RocketMQ是什么?
RocketMQ是一种高性能、高可用、可扩展的分布式消息中间件。它是阿里巴巴集团自主研发的分布式消息中间件,旨在为大规模分布式系统提供高效可靠的消息投递服务。RocketMQ最初是基于Apache Kafka开发的,后来在Kafka的基础上进行了很多改进和优化,如支持事务消息、消费者拉取消息、消息轨迹追踪等功能。
RocketMQ采用了分布式架构,可以水平扩展,支持多种发布订阅模式和点对点模式,并拥有消息事务、消息幂等、消息定时等多种高级特性。此外,RocketMQ还支持多种高效的消息传输协议(TCP、HTTP、HTTPS、JMS)、各种操作系统(Linux、Windows、MacOSX)以及多种不同的编程语言(Java、C++、Python、Ruby、PHP等),便于开发者对其进行定制和扩展。
RocketMQ的核心架构包括Producer、Consumer、Broker、Name Server和Cluster等。Producer和Consumer是RocketMQ中最基本的组件,分别用于生产消息和消费消息。Broker是消息队列服务器,负责保存所有生产的消息,同时协调消费者对消息的访问。Name Server用于管理Broker集群中的元数据(如Broker的地址、主题、队列等信息),提供基于主题和队列的路由服务。Cluster则是一组具有相同主题的Broker的集群,一个大规模消息系统可以包含多个Cluster,每个Cluster包含多个Broker和多个Topic。
RocketMQ的主要特点包括:
-
高吞吐量和低延迟:RocketMQ支持异步消息和顺序发送,从而实现高效的消息通信,可以满足高并发、低延迟的需求。
-
数据安全和可靠性:RocketMQ提供多种存储模式(内存和磁盘)和复制模式(同步和异步)以实现数据的持久化存储和高可用,可以保证消息的可靠性和数据安全性。
-
灵活性和可扩展性:RocketMQ可灵活地支持多种发布订阅模式和点对点模式,并支持多种编程语言和操作系统,易于定制和扩展。
-
高可用性:RocketMQ支持自动容错和自动故障处理机制,能够保证消息系统的高可用性和可靠性。
-
实时可观测性:RocketMQ提供实时监测和诊断工具,能够帮助用户实时发现和解决问题。
RocketMQ是一种高性能、可靠、灵活、可扩展的分布式消息中间件,可以满足大规模分布式系统的高效可靠消息投递需求。
解决什么问题?
RocketMQ通常用于分布式系统中的消息传递,可以在各种应用程序和服务之间传递数据、事件和通知。具体来说,RocketMQ可以解决以下问题:
-
异步通信:RocketMQ支持异步消息传递,并且可以支持消息持久化,从而可以在不同的节点之间实现异步通信,提高系统响应速度。
-
高并发流量处理:RocketMQ可以支持高吞吐量的消息处理,适用于处理高并发的数据流量,如日志处理、实时数据分析等。
-
分布式系统的协调:RocketMQ在分布式系统中被用来实现协调和集成各种服务和应用程序,从而提高整个系统的可管理性,实现各种分布式应用模式和场景。
-
事务消息:RocketMQ支持事务消息,保证了消息的可靠传递和原子性处理,可用于实现分布式事务。
-
实时消息处理:RocketMQ可以处理实时的数据流,支持实时消息分析和实时监控,以及各种实时应用场景,如信息推送、实时广播等。
RocketMQ可以在各种应用程序和服务之间传递数据、事件和通知,从而帮助开发人员构建更可靠、可扩展、高性能的分布式系统。同时,RocketMQ可以解决一系列消息传递的问题,如异步通信、高并发流量处理、事务消息等,具备广泛的应用场景。
RocketMQ的消息模式
RocketMQ主要提供了两种模式的消息传递方式:
-
发布/订阅模式(Publish/Subscribe,简称Pub/Sub):发布者将消息发送到一个主题(Topic),多个订阅者可以订阅该主题,从而接收消息。发布者和订阅者之间是完全解耦的,可以随时增加或删除订阅者。Pub/Sub模式适用于消息广播、事件通知等场景。
-
点对点模式(Point-to-Point,简称P2P):生产者将消息发送到一个队列(Queue),消费者从该队列中取出消息进行消费。一个消息只会被一个消费者处理,处理后就会从队列中删除。P2P模式适用于任务分发、数据传输等场景。
除了这两种基本模式,RocketMQ还支持两种高级模式:
-
发布/订阅模式+标签模式(Publish/Subscribe + Topic Tag):在Pub/Sub模式下,对主题进行细分和分类,使用标签(Tag)来区分不同的消息类型。不同的订阅者可以订阅不同的标签,从而只接收自己关心的消息。Tag模式适用于针对同一主题的不同消息类型进行处理的场景。
-
分布式事务模式(Distributed Transaction):在事务消息的模式下,RocketMQ支持分布式事务,保证消息的原子性处理。生产者将消息发送到一个半消息(Half Message)队列中,等待确认。同时在本地事务中进行操作。如果本地事务成功,则确认半消息并提交,如果失败则回滚半消息。消费者在处理到被确认的半消息后进行业务处理,如果失败则重新消费,如果成功则提交事务。分布式事务模式适用于需要保证多个主机之间事务一致性的分布式系统。
RocketMQ提供了多种不同的消息传递模式,可以根据具体的应用场景和需求进行选择和配置。其中,Pub/Sub模式和P2P模式是最基本的两种模式,其它高级模式则可满足更为复杂的需求,如分布式事务、精细化数据过滤等。
安装教程
好的,下面是详细步骤和命令,供参考:
Windows 安装 RocketMQ 的步骤:
-
下载 RocketMQ 的压缩包,将其解压到本地某个目录下,如 D:\programs\rocketmq-all-4.7.0-bin-release。
-
设置环境变量:在系统环境变量中新建 ROCKETMQ_HOME,其值为 RocketMQ 的安装目录,即 D:\programs\rocketmq-all-4.7.0-bin-release。
-
打开 PowerShell 或命令提示符(CMD),进入 RocketMQ 安装目录 D:\programs\rocketmq-all-4.7.0-bin-release\bin 目录下。
-
启动 Namesrv:执行以下命令
cd D:\programs\rocketmq-all-4.7.0-bin-release\bin
mqnamesrv.cmd
mqnamesrv.cmd
文件是 Namesrv 的启动脚本,执行后会在后台启动 Namesrv。如果一切正常,会显示类似下面的信息:
The Name Server boot success...
- 启动 Broker:执行以下命令:
cd D:\programs\rocketmq-all-4.7.0-bin-release\bin
mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
mqbroker.cmd
文件是 Broker 的启动脚本,-n 参数后面是设置 Namesrv 的地址,autoCreateTopicEnable=true 表示自动创建 Topic。执行后会在后台启动 Broker。如果一切正常,会显示类似下面的信息:
The broker[%s,%s] boot success...
-
打开 JConsole 监控控制台:在命令提示符中执行 jconsole 命令,打开 JConsole 监控控制台,选择一个进程进行监控。常见的监控对象有:
- org.apache.rocketmq.namesrv.NamesrvStartup:Namesrv 进程
- org.apache.rocketmq.broker.BrokerStartup:Broker 进程
- org.apache.rocketmq.remoting.netty.NettyRemotingClient:客户端
Linux 安装 RocketMQ 的步骤:
-
下载 RocketMQ 的压缩包,将其解压到本地某个目录下,如 /usr/local/rocketmq。
-
设置环境变量:在 /etc/profile 文件中添加如下环境变量:
export ROCKETMQ_HOME=/usr/local/rocketmq
export PATH=$PATH:$ROCKETMQ_HOME/bin
然后运行 source /etc/profile 命令,使环境变量生效。
-
进入 RocketMQ 安装目录 /usr/local/rocketmq/bin 目录下。
-
启动 Namesrv:执行以下命令:
nohup sh mqnamesrv &
mqnamesrv
文件是 Namesrv 的启动脚本,& 表示在后台运行。如果一切正常,会生成一个 nohup.out 文件,可以用 tail -f nohup.out 命令查看启动信息。
- 启动 Broker:执行以下命令:
nohup sh mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true &
mqbroker
文件是 Broker 的启动脚本,-n 参数后面是设置 Namesrv 的地址,autoCreateTopicEnable=true 表示自动创建 Topic。& 表示在后台运行。如果一切正常,会生成一个 nohup.out 文件,可以用 tail -f nohup.out 命令查看启动信息。
-
打开 JConsole 监控控制台:在终端中执行 jconsole 命令,打开 JConsole 监控控制台,选择一个进程进行监控。常见的监控对象有:
- org.apache.rocketmq.namesrv.NamesrvStartup:Namesrv 进程
- org.apache.rocketmq.broker.BrokerStartup:Broker 进程
- org.apache.rocketmq.remoting.netty.NettyRemotingClient:客户端
以上是相应操作系统下安装 RocketMQ 的详细命令和步骤。
SpringBoot整合
下面是 SpringBoot 整合 RocketMQ 的详细步骤和完整的代码,供参考:
步骤:
- 添加依赖:
在 pom.xml 文件中添加 RocketMQ 客户端依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
其中,${rocketmq.version} 可以设置为 4.7.0 等版本号。
- 配置 RocketMQ 属性:
在 application.yml 文件中添加 RocketMQ 的配置属性:
rocketmq:
name-server: 127.0.0.1:9876 # Namesrv 的地址
producer:
group: test-group # 生产者的组名
consumer:
group: test-group # 消费者的组名
- 编写生产者:
@Service
public class RocketMQProducerService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendMessage(String topic, String message) {
Message msg = MessageBuilder.withPayload(message).build();
SendResult sendResult = rocketMQTemplate.syncSend(topic, msg);
System.out.printf("MQ send message: %s%n", sendResult);
}
}
其中,RocketMQTemplate 是 RocketMQ 的消息操作模板。
- 编写消费者:
@Service
public class RocketMQConsumerService implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.printf("MQ receive message: %s%n", message);
}
@Override
public void rocketMQExceptionOccurred(RocketMQListenerException e) {
System.err.printf("MQ exception: %s%n", e);
}
}
其中,RocketMQListener 是 RocketMQ 的消息监听器。
- 发送消息:
在控制器中使用生产者发送消息:
@RestController
@RequestMapping("/rocketmq")
public class RocketMQController {
@Autowired
private RocketMQProducerService rocketMQProducerService;
@PostMapping("/{topic}/{message}")
public String sendMessage(@PathVariable String topic, @PathVariable String message) {
rocketMQProducerService.sendMessage(topic, message);
return "success";
}
}
其中,@PathVariable 表示从 URL 中获取参数。
完整代码:
RocketMQConfiguration.java:
@Configuration
public class RocketMQConfiguration {
@Value("${rocketmq.name-server}")
private String nameServer;
@Value("${rocketmq.producer.group}")
private String producerGroup;
@Value("${rocketmq.consumer.group}")
private String consumerGroup;
@Bean
public DefaultMQProducer defaultProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(nameServer); // 设置 Namesrv 地址
producer.start();
return producer;
}
@Bean
public RocketMQTemplate rocketMQTemplate() throws Exception {
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
rocketMQTemplate.setProducer(defaultProducer());
rocketMQTemplate.setDefaultTopicQueueNums(1); // 设置默认 Topic 的队列数量为 1
return rocketMQTemplate;
}
@Bean
public DefaultMQPushConsumer defaultConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(nameServer); // 设置 Namesrv 地址
consumer.subscribe("test-topic", "*"); // 订阅 Topic
consumer.registerMessageListener(new RocketMQConsumerService()); // 注册消息监听器
consumer.start();
return consumer;
}
}
RocketMQProducerService.java:
@Service
public class RocketMQProducerService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendMessage(String topic, String message) {
Message msg = MessageBuilder.withPayload(message).build();
SendResult sendResult = rocketMQTemplate.syncSend(topic, msg);
System.out.printf("MQ send message: %s%n", sendResult);
}
}
RocketMQConsumerService.java:
@Service
public class RocketMQConsumerService implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.printf("MQ receive message: %s%n", message);
}
@Override
public void rocketMQExceptionOccurred(RocketMQListenerException e) {
System.err.printf("MQ exception: %s%n", e);
}
}
RocketMQController.java:
@RestController
@RequestMapping("/rocketmq")
public class RocketMQController {
@Autowired
private RocketMQProducerService rocketMQProducerService;
@PostMapping("/{topic}/{message}")
public String sendMessage(@PathVariable String topic, @PathVariable String message) {
rocketMQProducerService.sendMessage(topic, message);
return "success";
}
}
以上是 SpringBoot 整合 RocketMQ 的详细步骤和完整的代码。
文章评论