ActiveMQ的什么?
ActiveMQ是基于Java的消息中间件(MOM,Message Oriented Middleware)实现,依据JMS规范,提供了一种消息传递服务,可以使分布式应用程序在异构环境中进行简单、快捷、有效的数据通信,是开源社区中最知名的MOM之一。
ActiveMQ的核心思想就是消息,消息具有很强的灵活性和可扩展性。ActiveMQ处理消息的模型基于JMS规范,实现了两种消息传递模型:点对点模型和发布/订阅模型。点对点模型基于队列,消息生产者向队列中发送消息,消息消费者从队列中接收消息。发布/订阅模型基于主题,消息生产者向主题发送消息,主题将消息传递给订阅者。
ActiveMQ的消息传递是异步的,当消息到达ActiveMQ Broker后,会被保存在队列或主题中等待消费。Broker会将消息推送给消息消费者,消费者可以选择立即消费,也可以等待消费。ActiveMQ支持多种传输通信协议,包括TCP、SSL、HTTP等,可以保证消息的安全传输。
一个ActiveMQ架构通常由Broker、连接器、目的地和客户端4个主要组成部分组成。Broker是消息服务器,负责接收、存储和转发消息。客户端可以连接到Broker,生产或消费消息。连接器是处理外部连接和转发消息的中介组件。目的地是消息的发送和接收地址,可以是队列或主题。
ActiveMQ还支持多种高级特性,如消息持久化、事务处理、消息分组、顺序消息、流量控制、消息查询、动态网络优化、集群等,这些特性使得ActiveMQ成为一种高效的、可靠的、灵活的、易于集成的消息传递系统。由于开源和活跃的社区支持,ActiveMQ已经广泛应用于很多企业级应用程序,如电子商务平台、金融交易系统、商品交易系统、电信和网络系统等。
ActiveMQ通常拿来做什么和解决什么问题?
ActiveMQ主要用于解决分布式应用程序之间的消息传递和数据通信问题。它为企业应用提供了一种异步消息传递机制,允许系统组件之间用消息进行通信,并且不需要直接关注目标系统的状态和可用性。 ActiveMQ通常拿来做下面几项应用和解决以下问题:
-
应用解耦:ActiveMQ支持基于队列和主题的消息传递模型,可以将消息发送到目标系统中的特定队列或主题,从而确保消息能被异步地传送给目标系统。应用系统可以继续运行,而不必直接处理目标系统的状态和可用性。这种解耦可以提高应用系统之间的松耦合,简化应用的设计和维护。
-
高可用性和容错性:ActiveMQ支持集群和主从部署模式,可以确保系统的高可用性和容错性。当一个节点宕机或无法处理请求时,ActiveMQ将自动切换到另一个活动节点,从而确保系统可用性。
-
异步通信:ActiveMQ的消息传递机制是异步的,可以在消息到达之前进行其他操作。这种异步通信模型可以提高系统的响应时间和并发性,并且能够处理大量的请求。
-
数据持久化:ActiveMQ支持持久化消息,可以将消息保存在磁盘上,确保在发生故障时不会丢失数据。这可以增加数据安全性,避免数据损失。
-
分布式事务管理:ActiveMQ支持JTA分布式事务管理,可以处理消息的事务性,保证消息的一致性和可靠性。
ActiveMQ主要用于解决分布式应用程序之间的异步消息传递和数据通信问题,提供源系统和目标系统之间的消息传递机制,支持高可用性、容错性、异步通信、数据持久化和分布式事务管理等功能。它可以应用于各种企业应用程序,如金融交易系统、电子商务平台、商品交易系统、电信和网络系统等。
ActiveMQ的消息模型
ActiveMQ支持两种消息传递模型,分别是点对点模型和发布/订阅模型。
- 点对点模型(Point-to-Point Model)
点对点模型基于队列(Queue),消息生产者(生产消息的客户端)将消息发送到队列中,消息消费者(消费消息的客户端)从队列中接收消息,每条消息只能被一个消费者消费。当有多个消费者时,ActiveMQ通过负载均衡机制来分配消费者。点对点模型被广泛应用于请求/响应系统、任务分配和任务处理系统等场景。点对点模型可以通过以下方式创建:
- 在ActiveMQ Server端创建Queue,在客户端通过JNDI查找
- 在客户端通过JMS API创建Queue,不需要在ActiveMQ Server端创建
- 发布/订阅模型(Publish/Subscribe Model)
发布/订阅模型基于主题(Topic),消息生产者将消息发布到主题中,订阅者(消费消息的客户端)从主题中订阅消息。当消息发布到主题中时,所有订阅者都可以接收到相同的消息副本。发布/订阅模型被广泛应用于新闻订阅、股票行情、即时聊天等场景。发布/订阅模型可以通过以下方式创建:
- 在ActiveMQ Server端创建Topic,在客户端通过JNDI查找
- 在客户端通过JMS API创建Topic,不需要在ActiveMQ Server端创建
ActiveMQ支持点对点模型和发布/订阅模型。点对点模型适用于请求/响应系统、任务分配和任务处理系统等场景,而发布/订阅模型适用于新闻订阅、股票行情、即时聊天等场景。无论是点对点模型还是发布/订阅模型,在消息处理过程中,ActiveMQ都提供了可靠性、可伸缩性、高吞吐量和灵活性等特性,是一种高效的、可靠的、易于集成的消息传递系统。
安装教程
在Windows上安装ActiveMQ:
- 下载ActiveMQ二进制安装包,地址:http://activemq.apache.org/components/classic/download/
- 解压缩ActiveMQ安装包到指定目录,例如:D:\activemq
- 打开命令提示符,切换到D:\activemq\bin目录,运行以下命令启动ActiveMQ: activemq.bat start
- 使用浏览器访问http://localhost:8161/admin/ ,查看ActiveMQ管理控制台,可以进行队列(Queue)和主题(Topic)的管理、消息发送和接收等操作
在Linux上安装ActiveMQ:
- 下载ActiveMQ二进制安装包,地址:http://activemq.apache.org/components/classic/download/
- 将ActiveMQ安装包上传到指定目录,例如:/opt
- 在命令行终端中进入/opt目录,运行以下命令解压缩ActiveMQ安装包: tar -zxf apache-activemq-x.x.x-bin.tar.gz
- 进入/opt/apache-activemq-x.x.x/bin目录,运行以下命令启动ActiveMQ: ./activemq start
- 使用浏览器访问http://localhost:8161/admin/ ,查看ActiveMQ管理控制台,可以进行队列(Queue)和主题(Topic)的管理、消息发送和接收等操作
注意:在Linux上启动ActiveMQ时,可能需要以root用户身份运行,或者在/opt/apache-activemq-x.x.x/bin/activemq脚本中加上sudo命令。
在Windows上安装ActiveMQ需要下载ActiveMQ二进制安装包,解压缩到指定目录,然后在命令提示符中运行activemq.bat脚本启动ActiveMQ;在Linux上安装ActiveMQ同样需要下载ActiveMQ二进制安装包,解压缩到指定目录,然后在命令行终端中进入bin目录,运行activemq脚本启动ActiveMQ。
SpringBoot整合
Spring Boot整合ActiveMQ主要需要通过Spring Boot提供的activeMQ包以及相关的注解、配置文件等来完成,下面是具体的步骤和代码实现:
- 引入相关依赖包
在Spring Boot项目的pom.xml文件中,引入以下依赖包:
<!--spring boot整合ActiveMQ依赖包-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
- 添加ActiveMQ配置信息
在Spring Boot项目的application.yml或application.properties文件中添加以下配置信息:
# ActiveMQ配置
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.pool.max-connections=10
其中,broker-url是ActiveMQ服务端的地址和端口号,user和password是访问ActiveMQ的用户名和密码,max-connections是连接池的最大连接数。
- 编写消息生产者和消费者
添加消息生产者和消费者类,使用Spring Boot提供的注解完成ActiveMQ的配置和操作。
生产者类:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import javax.jms.Queue;
@Component
public class Producer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
// 发送消息
public void send(String message) {
jmsMessagingTemplate.convertAndSend(queue, message);
}
}
消费者类:
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
// 接收消息
@JmsListener(destination = "${spring.activemq.queue-name}")
public void receive(String message) {
System.out.println("接收到消息:" + message);
}
}
其中,Producer类使用@Autowired注解注入Spring Boot自动配置的JMSMessagingTemplate和Queue;Consumer类使用@JmsListener注解指定接收消息的目的地。
- 测试消息发送和接收
创建一个Controller接口,注入消息生产者类,在接口方法中调用生产者的send方法发送消息。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
@Autowired
private Producer producer;
@GetMapping("/send/{message}")
public String send(@PathVariable String message) {
producer.send(message);
return "消息发送成功:" + message;
}
}
启动Spring Boot项目,在浏览器访问http://localhost:8080/send/{message} 接口,发送消息。在控制台中可以看到消费者接收到消息,并输出消息内容。
完整代码实现:
- pom.xml文件中添加依赖:
<dependencies>
<!--Spring Boot依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--spring boot整合ActiveMQ依赖包-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
</dependencies>
- application.yml文件中添加ActiveMQ配置信息:
# ActiveMQ配置
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.pool.max-connections=10
- Producer类和Consumer类:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import javax.jms.Queue;
@Component
public class Producer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
// 发送消息
public void send(String message) {
jmsMessagingTemplate.convertAndSend(queue, message);
}
}
--------------------------------------------------------
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
// 接收消息
@JmsListener(destination = "${spring.activemq.queue-name}")
public void receive(String message) {
System.out.println("接收到消息:" + message);
}
}
- Controller类:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
@Autowired
private Producer producer;
@GetMapping("/send/{message}")
public String send(@PathVariable String message) {
producer.send(message);
return "消息发送成功:" + message;
}
}
注意:在消费者类中,@JmsListener注解的destination属性值可以通过配置文件中的${spring.activemq.queue-name}动态获取,也可以直接指定为字符串类型的目的地名称。如果使用动态获取的方式,需要在配置文件中添加以下配置:
spring.activemq.queue-name=test.queue
其中,test.queue为自定义的队列名称。在消息发送时,要保证目的地的名称一致,否则消息将不能正确发送和接收。
文章评论