ActiveMQ详细使用(含高级篇)

ActiveMQ详细使用(含高级篇)

ActiveMQ

文章目录

ActiveMQ1、概述2、使用安装启动&停止

3、Java编码实现ActiveMQ通讯环境搭建点对点的消息传递域——队列(Queue)消息生产者消息消费者方式一:阻塞式消费者(receive)方式二:异步监听式消费者(监听器onMessage())

发布订阅消息传递域——主题(topic)发布主题生产者订阅主题消费者方式一:阻塞式消费者(receive)方式二:异步监听式消费者(监听器onMessage())

总结

4、JMS规范和落地产品简介JMS的组成结构和特点JMS ProviderJMS ProducerJMS ConsumerJSM Message(*)消息头消息属性消息体

JMS的可靠性持久性(Persistent)持久性——队列(Queue)持久性——主题(Topic)

事务(Transaction)非事务事务

签收(Acknowledge)非事务情况下使用签收事务情况下使用签收

5、Spring整合ActiveMQ环境搭建队列(Queue)主题(Topic)消费者使用监听接收消息

6、SpringBoot整合ActiveMQ环境配置队列(Queue)添加配置文件配置JMS生产者消费者

主题发布订阅(Topic)添加配置文件配置JMS生产者消费者

7、ActiveMQ的传输协议协议配置NIO配置 NIO增强

8、ActiveMQ的消息存储和持久化KahaDB消息存储(默认)JDBC存储消息步骤总结

JDBC Message store with ActiveMQ Journal

9、ActiveMQ的多节点集群zookeeper+replicated-leveldb-store的主从集群部署(待更新)

1、概述

官网:http://activemq.apache.org/定义:发送者把消息发送给消息服务器,消息服务器将消息存放在若干队列/主题topic中,在合适的时候,消息服务器回将消息转发给接受者。在这个过程中,发送和接收是异步的,也就是发送无需等待,而且发送者和接受者的生命周期也没有必然的关系特点:解耦、削峰、异步

2、使用

安装

到官网下载后解压即可使用修改activemq的conf目录中jetty.xml文件(为了让外部设备访问)

启动&停止

普通启动:进入到activemq的bin目录中

./activemq start

3、Java编码实现ActiveMQ通讯

目的地Destination:队列(Queue)和主题(Topic)

环境搭建

第一步:创建Maven工程

第二步:引入配置(pom.xml)

org.apache.activemq

activemq-all

5.15.11

org.apache.xbean

xbean-spring

4.15

点对点的消息传递域——队列(Queue)

特点

1、每个消息只能有一个消费者,类似于1对1的关系。2、消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,消费者都可以提取消息。好比我们的发送短信,发送者发送后不见得接收者会即收即看3、消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息

消息生产者

public class JmsProduce {

public static final String ACTIVEMQ_URL = "tcp://192.168.37.130:61616"; //ActiveMQ地址

public static final String QUEUE_NAME = "queue001"; //队列名称

public static void main(String[] args) throws JMSException {

//1、创建连接工厂,按照给定的URL地址,采用默认的用户名和密码

ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

//2、通过连接工厂,获得连接connection并启动访问

Connection connection = activeMQConnectionFactory.createConnection();

connection.start();

//3、创建会话Session(两个参数:transacted=是否开启事务,acknowledgeMode=签收模式)

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

//4、创建目的地(具体是队列(queue)还是主题(top))

Queue queue = session.createQueue(QUEUE_NAME);

//5、创建消息的生产者

MessageProducer messageProducer = session.createProducer(queue);

//6、创建消息并赋值

TextMessage textMessage = session.createTextMessage(); //String类型的

textMessage.setText("fzk");

//7、发送消息到队列或主题

messageProducer.send(textMessage);

messageProducer.send(mapMessage);

//8、关闭资源

messageProducer.close();

session.close();

connection.close();

System.out.println("消息发送完毕");

}

}

控制台

控制台说明

Name=队列名称Number Of Pending Messages=等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数Number Of Consumers=消费者数量,消费者端的消费者数量。Messages Enqueued=进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。Messages Dequeued=出队消息数,可以理解为是消费者消费掉的数量。

消息消费者

方式一:阻塞式消费者(receive)

订阅者或接收者抵用MessageConsumer的receive()方法来接收消息,receive方法在能接收到消息之前(或超时之前)将一直阻塞。

public class JmsConsumer {

public static final String ACTIVEMQ_URL = "tcp://192.168.37.130:61616"; //ActiveMQ地址

public static final String QUEUE_NAME = "queue001"; //队列名称

public static void main(String[] args) throws JMSException {

//1、创建连接工厂,按照给定的URL地址,采用默认的用户名和密码

ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

//2、通过连接工厂,获得连接connection并启动访问

Connection connection = activeMQConnectionFactory.createConnection();

connection.start();

//3、创建会话Session(两个参数:transacted=是否开启事务,acknowledgeMode=签收模式)

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

//4、创建目的地(具体是队列(queue)还是主题(top))

Queue queue = session.createQueue(QUEUE_NAME);

//5、创建消息的生产者

MessageConsumer messageConsumer = session.createConsumer(queue);

//阻塞式

while(true){

//6、接收队列或主题的消息

TextMessage textMessage = (TextMessage) messageConsumer.receive();

if(textMessage != null){

//7、获取消息中的数据

String text = textMessage.getText();

System.out.println(text);

}else{

break;

}

}

//8、关闭资源

messageConsumer.close();

session.close();

connection.close();

}

}

方式二:异步监听式消费者(监听器onMessage())

订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统会自动调用监听器MessageListener的onMessage(Message message)方法。

public class JmsConsumer {

public static final String ACTIVEMQ_URL = "tcp://192.168.37.130:61616";

public static final String QUEUE_NAME = "queue001";

public static void main(String[] args) throws JMSException, IOException {

System.out.println("消费者1");

//1、创建连接工厂,按照给定的URL地址,采用默认的用户名和密码

ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

//2、通过连接工厂,获得连接connection并启动访问

Connection connection = activeMQConnectionFactory.createConnection();

connection.start();

//3、创建会话Session(两个参数:transacted=是否开启事务,acknowledgeMode=签收模式)

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

//4、创建目的地(具体是队列(queue)还是主题(top))

Queue queue = session.createQueue(QUEUE_NAME);

//5、创建消息的生产者

MessageConsumer messageConsumer = session.createConsumer(queue);

//6、使用异步监听式来获取消息

messageConsumer.setMessageListener(new MessageListener() {

public void onMessage(Message message) {

if(message != null && message instanceof TextMessage){

//7、获取消息并通过消息获得数据

TextMessage textMessage = (TextMessage) message;

try {

System.out.println(textMessage.getText());

} catch (JMSException e) {

e.printStackTrace();

}

}

}

});

System.in.read(); //输入键盘任意键结束

//8、关闭资源

messageConsumer.close();

session.close();

connection.close();

}

}

发布订阅消息传递域——主题(topic)

特点

1、生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系2、生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息3、生产者生产时,topic不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者 先启动订阅者再启动生产者,不然发送的消息是废消息

发布主题生产者

public class JmsProduceTopic {

public static final String ACTIVEMQ_URL = "tcp://192.168.37.130:61616";

public static final String TOPIC_NAME = "topic001";

public static void main(String[] args) throws JMSException {

//1、创建连接工厂,按照给定的URL地址,采用默认的用户名和密码

ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

//2、通过连接工厂,获得连接connection并启动访问

Connection connection = activeMQConnectionFactory.createConnection();

connection.start();

//3、创建会话Session

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

//4、创建目的地(具体是队列(queue)还是主题(top))

Topic topic = session.createTopic(TOPIC_NAME);

//5、创建消息的生产者

MessageProducer messageProducer = session.createProducer(topic);

//6、创建消息并赋值

TextMessage textMessage = session.createTextMessage();

textMessage.setText("fzk");

//7、发送消息到队列或主题

messageProducer.send(textMessage);

//8、关闭资源

messageProducer.close();

session.close();

connection.close();

System.out.println("消息发送完毕");

}

}

订阅主题消费者

方式一:阻塞式消费者(receive)

public class JmsConsumerTopic {

public static final String ACTIVEMQ_URL = "tcp://192.168.37.130:61616";

public static final String TOPIC_NAME = "topic001";

public static void main(String[] args) throws JMSException {

//1、创建连接工厂,按照给定的URL地址,采用默认的用户名和密码

ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

//2、通过连接工厂,获得连接connection并启动访问

Connection connection = activeMQConnectionFactory.createConnection();

connection.start();

//3、创建会话Session

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

//4、创建目的地(具体是队列(queue)还是主题(top))

Queue queue = session.createQueue(QUEUE_NAME);

//5、创建消息的生产者

MessageConsumer messageConsumer = session.createConsumer(queue);

//阻塞式

while(true){

//6、接收队列或主题的消息

TextMessage textMessage = (TextMessage) messageConsumer.receive();

if(textMessage != null){

//7、获取消息中的数据

String text = textMessage.getText();

System.out.println(text);

}else{

break;

}

}

//8、关闭资源

messageConsumer.close();

session.close();

connection.close();

}

}

方式二:异步监听式消费者(监听器onMessage())

public class JmsConsumerTopic {

public static final String ACTIVEMQ_URL = "tcp://192.168.37.130:61616";

public static final String TOPIC_NAME = "topic001";

public static void main(String[] args) throws JMSException, IOException {

System.out.println("消费者1");

//1、创建连接工厂,按照给定的URL地址,采用默认的用户名和密码

ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

//2、通过连接工厂,获得连接connection并启动访问

Connection connection = activeMQConnectionFactory.createConnection();

connection.start();

//3、创建会话Session

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

//4、创建目的地(具体是队列(queue)还是主题(top))

Topic topic = session.createTopic(TOPIC_NAME);

//5、创建消息的生产者

MessageConsumer messageConsumer = session.createConsumer(topic);

//6、使用异步监听式来获取消息

messageConsumer.setMessageListener(new MessageListener() {

public void onMessage(Message message) {

if(message != null && message instanceof TextMessage){

//7、获取消息并通过消息获得数据

TextMessage textMessage = (TextMessage) message;

try {

System.out.println(textMessage.getText());

} catch (JMSException e) {

e.printStackTrace();

}

}

}

});

System.in.read(); //输入键盘任意键结束

//8、关闭资源

messageConsumer.close();

session.close();

connection.close();

}

}

总结

比较项目Topic队列模式Queue队列模式工作模式"订阅-发布"模式,如果当前没有订阅者,消息将会被丢弃,如果有多个订阅者,那么这些订阅者都会收到消息"负载均衡"模式,如果当前没有消费者,消息也不会丢弃;如果有多个消费者,那么一条消息也只会发送给其中一个消费者,并且要求消费者ack信息有无状态无状态Queue数据默认会在mq服务器上已文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面,也可以配置成DB存储传递完整性如果没有订阅者,消息会被丢弃消息不会被丢弃处理效率由于消息要按照订阅者的数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同消息协议自身的性能差异由于一条消息只发送给一个消费者,所以就算消费者再多,性能也不会有明显降低。当然不同消息协议的具体性能也是有差异的

4、JMS规范和落地产品

简介

Java Message Service(Java消息服务是JavaEE中的一个技术)

Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果

JMS的组成结构和特点

JMS Provider

实现JMS接口和规范的消息中间件,也就是我们说的MQ服务器

JMS Producer

消息生产者,创建和发送JMS消息的客户端应用

JMS Consumer

消息消费者,接收和处理JMS消息的客户端应用

JSM Message(*)

消息头

JMSDestination:消息发送的目的地,主要是指队列Queue和主题TopicJMSDeliveryMode:持久模式和非持久模式

一条持久性的消息:应该被传送“一次仅仅一次”,这就意味着如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递一条非持久的消息:最多会传递一次,这意味着服务器出现故障,该消息将会永远丢失 JMSExpiration:过期时间(默认是永不过期)

消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值如果timeToLive值等于0,则JMSExpiration被设为0,表示该消息永不过期如果发送后,在消息过期时间之后还没有被发送到目的地,则该消息被清除 JMSPriority:消息优先级

从0-9十个级别,0-4是普通消息5-9是加急消息JMS不要求MQ严格按照这十个优先级发送消息但必须保证加急消息要先于普通消息到达。默认是4级 JMSMessageID:唯一标识每个消息的标识由MQ产生

消息属性

发送和接收的消息类型必须一致对应

消息格式

TxtMessage : 普通字符串消息,包含一个String

TextMessage textMessage = session.createTextMessage(); //生产者

TextMessage textMessage = (TextMessage) message; //消费者

MapMessage :一个Map类型的消息,key为Strng类型,而值为Java基本类型

MapMessage mapMessage = session.createMapMessage(); //生产者

MapMessage mapMessage = (MapMessage) message; //消费者

BytesMessage : 二进制数组消息,包含一个byte[]

BytesMessage bytesMessage = session.createBytesMessage(); //生产者

BytesMessage bytesMessage = (BytesMessage) message; //消费者

StreamMessage : Java数据流消息,用标准流操作来顺序填充和读取

StreamMessage streamMessage = session.createStreamMessage(); //生产者

StreamMessage streamMessage = (StreamMessage) message; //消费者

ObjectMessage :对象消息,包含一个可序列化的Java对象

ObjectMessage objectMessage = session.createObjectMessage(); //生产者

ObjectMessage objectMessage = (ObjectMessage) message; //消费者

消息体

如果需要除消息字段以外的值,那么可以使用消息属性

识别/去重/重点标注等操作非常有用的方法

发送和接收的消息体类型必须一致对应

//生产者中消息体的使用

TextMessage textMessage = session.createTextMessage(); //String类型的

textMessage.setText("fzk");

textMessage.setStringProperty("name", "property"); //消息体

//消费者中消息体的获取(发送和接收的消息体必须一一对应)

TextMessage textMessage = (TextMessage) message;

try {

System.out.println(textMessage.getText());

System.out.println(textMessage.getStringProperty("name")); //消息体

} catch (JMSException e) {

e.printStackTrace();

}

JMS的可靠性

持久性(Persistent)

非持久性:当服务器宕机,消息不存在

设置:MessageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT) 持久性:当服务器宕机,消息依然存在

设置:MessageProducer.setDeliveryMode(DeliveryMode.PERSISTENT)

持久性——队列(Queue)

队列(Queue)默认是持久性

持久性——主题(Topic)

发布主题生产者代码

public class JmsProduceTopicPersistent {

public static final String ACTIVEMQ_URL = "tcp://192.168.37.130:61616";

public static final String TOPIC_NAME = "topic_persistent";

public static void main(String[] args) throws JMSException {

ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

Connection connection = activeMQConnectionFactory.createConnection();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Topic topic = session.createTopic(TOPIC_NAME);

MessageProducer messageProducer = session.createProducer(topic);

//将发布主题设为持久性

messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

//设置为持久性后启动连接

connection.start();

for (int i = 0; i < 3; i++) {

TextMessage textMessage = session.createTextMessage();

textMessage.setText("生产:" + (i + 1));

messageProducer.send(textMessage);

}

messageProducer.close();

session.close();

connection.close();

System.out.println("发送成功");

}

}

定阅主题消费者代码

public class JmsConsumerTopicPersistent {

public static final String ACTIVEMQ_URL = "tcp://192.168.37.130:61616";

public static final String topic_persistent = "topic_persistent";

public static void main(String[] args) throws JMSException {

System.out.println("消费者:1");

ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

Connection connection = activeMQConnectionFactory.createConnection();

//设置客户端id

connection.setClientID("fzk");

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Topic topic = session.createTopic(topic_persistent);

//订阅主题(两个参数:topic=主题,name=订阅名称)

TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "fzk123");

//启动连接

connection.start();

//订阅消息

Message message = topicSubscriber.receive();

while (message != null){

TextMessage textMessage = (TextMessage) message;

System.out.println(textMessage.getText());

message = topicSubscriber.receive();

}

topicSubscriber.close();

session.close();

connection.close();

}

}

定阅主题消费者在线

定阅主题消费者不在线

事务(Transaction)

非事务

只要执行send方法,就进入到队列中

例如:messageProducer.send(textMessage);

事务

先执行send方法再执行commit方法,消息才被真正提交到队列中

例如:

messageProducer.send(textMessage);session.commit();

生产者

public class JmsProduceQueueTransaction {

public static final String ACTIVEMQ_URL = "tcp://192.168.37.130:61616";

public static final String QUEUE_NAME = "queue_transacton";

public static void main(String[] args) throws JMSException {

ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

Connection connection = activeMQConnectionFactory.createConnection();

connection.start();

//将第一个参数设为true --> 开启事务

Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

Queue queue = session.createQueue(QUEUE_NAME);

MessageProducer messageProducer = session.createProducer(queue);

//发送数据

try {

for (int i = 0; i < 3; i++) {

TextMessage textMessage = session.createTextMessage("生产:" + (i + 1));

messageProducer.send(textMessage);

}

//数据发送成功,提交

session.commit();

System.out.println("消息发送完毕");

} catch (Exception e){

//出现异常,回滚

System.out.println("消息发送异常,回滚");

session.rollback();

} finally {

//关闭资源

if(messageProducer != null){

messageProducer.close();

}

if(session != null){

session.close();

}

if(connection != null){

connection.close();

}

}

}

}

消费者

public class JmsConsumerQueueTransaction {

public static final String ACTIVEMQ_URL = "tcp://192.168.37.130:61616";

public static final String QUEUE_NAME = "queue_transacton";

public static void main(String[] args) throws JMSException {

System.out.println("消费者:fzk");

ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

Connection connection = activeMQConnectionFactory.createConnection();

connection.start();

//将第一个参数设为true --> 开启事务

Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

Queue queue = session.createQueue(QUEUE_NAME);

MessageConsumer messageConsumer = session.createConsumer(queue);

Message message = messageConsumer.receive();

while(message != null){

try {

TextMessage textMessage = (TextMessage) message;

System.out.println(textMessage.getText());

//消费成功,提交

session.commit();

message = messageConsumer.receive(4000L);

} catch (Exception e){

System.out.println("出现异常,消费回滚");

//消费异常,回滚

session.rollback();

}

}

messageConsumer.close();

session.close();

connection.close();

}

}

签收(Acknowledge)

自动签收(默认):Session.AUTO_ACKNOWLEDGE

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 手动签收:Session.CLIENT_ACKNOWLEDGE

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);客户端调用acknowledge方法手动签收(没有手动签收会重复消费)

例如:textMessage.acknowledge();

非事务情况下使用签收

生产者

public class JmsProduceQueueAcknowledge {

public static final String ACTIVEMQ_URL = "tcp://192.168.37.130:61616";

public static final String QUEUE_NAME = "queue_acknowledge";

public static void main(String[] args) throws JMSException {

ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

Connection connection = activeMQConnectionFactory.createConnection();

connection.start();

//生产者:不开启事务,自动签收

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Queue queue = session.createQueue(QUEUE_NAME);

MessageProducer messageProducer = session.createProducer(queue);

for (int i = 0; i < 3; i++) {

TextMessage textMessage = session.createTextMessage("生产:" + (i + 1));

messageProducer.send(textMessage);

}

messageProducer.close();

session.close();

connection.close();

System.out.println("消息发送完毕");

}

}

消费者

public class JmsConsumerQueueAcknowledge {

public static final String ACTIVEMQ_URL = "tcp://192.168.37.130:61616";

public static final String QUEUE_NAME = "queue_acknowledge";

public static void main(String[] args) throws JMSException {

System.out.println("消费者:fzk");

ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

Connection connection = activeMQConnectionFactory.createConnection();

connection.start();

//消费者:不开启事务,手动签收

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

Queue queue = session.createQueue(QUEUE_NAME);

MessageConsumer messageConsumer = session.createConsumer(queue);

Message message = messageConsumer.receive();

while(message != null){

TextMessage textMessage = (TextMessage) message;

System.out.println(textMessage.getText());

//执行完 手动签收

textMessage.acknowledge();

message = messageConsumer.receive(1000L);

}

messageConsumer.close();

session.close();

connection.close();

}

}

事务情况下使用签收

事务提交后签收将自动为签收(事务提交权限大于签收权限)在事务性会话中,当一个事务被成功提交则消息被自动签收。如果事务回滚,则消息会被再次传送非事务性会话中,消息何时被确认取决于创建会话是的应答模式(acknowledgement mode)

5、Spring整合ActiveMQ

环境搭建

第一步:创建Maven工程,引入配置(pom.xml)

org.springframework

spring-context

5.2.0.RELEASE

org.springframework

spring-test

5.2.0.RELEASE

test

org.springframework

spring-jdbc

5.2.0.RELEASE

org.springframework

spring-tx

5.2.0.RELEASE

org.hamcrest

hamcrest-core

1.3

junit

junit

4.12

cglib

cglib

2.1_3

org.aspectj

aspectjweaver

1.9.5

org.apache.activemq

activemq-all

5.15.11

org.apache.xbean

xbean-spring

4.15

com.fasterxml.jackson.core

jackson-databind

2.10.4

org.springframework

spring-jms

5.2.0.RELEASE

org.apache.activemq

activemq-pool

5.15.11

第二步:Spring配置文件

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans

https://www.springframework.org/schema/beans/spring-beans.xsd

http://www.springframework.org/schema/context

https://www.springframework.org/schema/context/spring-context.xsd">

队列(Queue)

配置文件(pom.xml)

中的 ref 设置为队列(Queue)

生产者

@Service

public class Produce {

@Autowired

private JmsTemplate jmsTemplate;

public static void main(String[] args) {

ApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");

Produce produce = (Produce) applicationContext.getBean("produce");

//发送消息

produce.jmsTemplate.send(new MessageCreator() {

public Message createMessage(Session session) throws JMSException {

TextMessage textMessage = session.createTextMessage("生产者-》消费者");

return textMessage;

}

});

System.out.println("发送成功");

}

}

消费者

@Service

public class Consumer {

@Autowired

private JmsTemplate jmsTemplate;

public static void main(String[] args) {

ApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");

Consumer consumer = (Consumer) applicationContext.getBean("consumer");

String strMessage = (String) consumer.jmsTemplate.receiveAndConvert();

while (strMessage != null && !"".equals(strMessage)){

System.out.println(strMessage);

strMessage = (String) consumer.jmsTemplate.receiveAndConvert();

}

//接收消息

System.out.println("消费者接收到的消息:" + strMessage);

}

}

主题(Topic)

配置文件(pom.xml)

中的 ref 设置为主题(Topic)

生产者/消费者的程序与队列(Queue)相同

消费者使用监听接收消息

第一步:添加配置文件

第二步:编写一个类来实现消息监听

@Component

public class MyMessageListener implements MessageListener {

public void onMessage(Message message) {

TextMessage textMessage = (TextMessage) message;

try {

System.out.println(textMessage.getText());

} catch (JMSException e) {

e.printStackTrace();

}

}

}

6、SpringBoot整合ActiveMQ

环境配置

第一步:创建SpringBoot工程,引入依赖

org.springframework.boot

spring-boot-starter-activemq

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-devtools

runtime

true

org.springframework.boot

spring-boot-configuration-processor

true

org.projectlombok

lombok

true

org.springframework.boot

spring-boot-starter-test

test

第二步:编写配置文件(application.yml)

server:

port: 8999

# ActiveMQ 配置

spring:

activemq:

broker-url: tcp://192.168.37.130:61616 # 地址

user: admin # 用户名

password: admin # 密码

jms:

pub-sub-domain: false # 指定连接队列还是主题 false:队列 true:主题 (默认是false)

# 自定义队列名称

myQueue: SpringBoot_Queue

# 自定义主题名称

myTopic: SpringBoot_Topic

队列(Queue)

添加配置文件

# ActiveMQ 配置

spring:

jms:

pub-sub-domain: false # 指定连接队列还是主题 false:队列 true:主题 (默认是false)

# 自定义队列名称

myQueue: SpringBoot_Queue

配置JMS

在类上添加**@EnableJms**注解

@Configuration

@EnableJms //开启 JMS

public class ActiveMQConfig {

//获取配置文件中的 myQueue 字段

@Value("${myQueue}")

private String myQueue;

//配置队列

@Bean

public ActiveMQQueue queue(){

return new ActiveMQQueue(myQueue);

}

}

生产者

@Service

public class ProduceQueue {

@Autowired

private ActiveMQQueue activeMQQueue;

@Autowired

private JmsMessagingTemplate jmsMessagingTemplate;

public void produce(){

//发送消息

jmsMessagingTemplate.convertAndSend(activeMQQueue, UUID.randomUUID().toString());

}

}

消费者

使用SpringBoot的消息监听注解**@JmsListener**

监听过后会随着springboot一起启动,有消息就执行加了该注解的方法

@Service

public class ConsumerQueue {

//监听接收消息

@JmsListener(destination = "${myQueue}") //jms监听注解

public void receive(TextMessage textMessage){

try {

System.out.println(textMessage.getText());

} catch (JMSException e) {

e.printStackTrace();

}

}

}

主题发布订阅(Topic)

添加配置文件

# ActiveMQ 配置

spring:

jms:

pub-sub-domain: false # 指定连接队列还是主题 false:队列 true:主题 (默认是false)

# 自定义主题名称

myTopic: SpringBoot_Topic

配置JMS

在类上添加**@EnableJms**注解

@Configuration

@EnableJms //开启 JMS

public class ActiveMQConfig {

//获取配置文件中的 myTopic 字段

@Value("${myTopic}")

private String myTopic;

//配置主题

@Bean

public ActiveMQTopic topic(){

return new ActiveMQTopic(myTopic);

}

}

生产者

@Service

public class ProduceQueue {

@Autowired

private ActiveMQTopic activeMQTopic;

@Autowired

private JmsMessagingTemplate jmsMessagingTemplate;

public void produceTopicScheduled(){

jmsMessagingTemplate.convertAndSend(activeMQTopic, UUID.randomUUID().toString());

}

}

消费者

@Service

public class ConsumerQueue {

//监听接收消息

@JmsListener(destination = "${myTopic}") //jms监听注解

public void receive(TextMessage textMessage){

try {

System.out.println(textMessage.getText());

} catch (JMSException e) {

e.printStackTrace();

}

}

}

7、ActiveMQ的传输协议

http://activemq.apache.org/configuring-version-5-transports.html

ActiveMQ支持的client-broker通讯协议有:TVP、NIO、UDP、SSL、Http(s)、VM。

配置TransportConnector的文件在ActiveMQ安装目录的conf/activemq.xml中的标签之内

协议

TCP、NIO、AMQP、Stomp、SSL、MQTT、WS协议

TCP(默认)

1.这是默认的Broker配置,TCP的Client监听端口616162.在网络传输数据前,必须要先序列化数据,消息是通过一个叫wire protocol的来序列化成字节流。3.TCP连接的URI形式如:tcp://HostName:port?key=value&key=value,后面的参数是可选的。4.TCP传输的的优点:

TCP协议传输可靠性高,稳定性强高效率:字节流方式传递,效率很高有效性、可用性:应用广泛,支持任何平台 5.关于Transport协议的配置参数可以参考官网http://activemq.apache.org/configuring-version-5-transports.html NIO

1.NIO协议和TCP协议类似,但NIO更侧重于底层的访问操作。它允许开发人员对同一资源可有更多的client调用和服务器端有更多的负载。2.适合使用NIO协议的场景:

可能有大量的Client去连接到Broker上,一般情况下,大量的Client去连接Broker是被操作系统的线程所限制的。因此,NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议。可能对于Broker有一个很迟钝的网络传输,NIO比TCP提供更好的性能。 3.NIO连接的URI形式:nio://hostname:port?key=value&key=value4.关于Transport协议的配置参数可以参考官网http://activemq.apache.org/configuring-version-5-transports.html AMQP

Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件限制。 STOP

Streaming Text Orientation Message Protocol,是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息中间件)设计的简单文本协议。 MQTT

(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当作传感器和致动器(比如通过Twitter让房屋联网)的通信协议。

配置NIO

在ActiveMQ安装目录的conf/activemq.xml中的标签中添加

配置 NIO增强

让端口既支持NIO网络模型,又让他支持多个协议

方法:使用auto关键字,使用"+"符号来为端口设置多种特性

8、ActiveMQ的消息存储和持久化

官网:http://activemq.apache.org/persistence为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一半都会采用持久化机制。ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等。再试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试尝试发送。消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有,则会先把存储位置中的消息发出去。ActiveMQ消息持久化机制有:

AMQ 基于日志文件KahaDB 基于日志文件,从ActiveMQ5.4开始默认使用JDBC 基于第三方数据库Replicated LevelDB Store 从5.9开始提供了LevelDB和Zookeeper的数据复制方法,用于Master-slave方式的首选数据复制方案。

KahaDB消息存储(默认)

官网:http://activemq.aache.org/kahadb

基于日志文件,从ActiveMQ5.4开始默认的持久化插件

KahaDB是目前默认的存储方式,可用于任何场景,提高了性能和恢复能力。

消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。

KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模型进行了优化。

数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。

JDBC存储消息

官网:http://activemq.apache.org/persistence点对点

当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中当DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中。而且点对点类型中消息一旦被Consumer消费,就从数据中删除 发布/订阅

设置了持久订阅数据库里面会保存订阅者的信息

步骤

第一步:添加mysql数据库的驱动包到lib文件夹 (mysql-connector-java-8.0.17.jar)

第二步:jdbcPersistenceAdapter配置

第三步:数据库连接池配置

第四步:SQL建库和创表

数据库:建一个名为activemq的数据库

如果新建数据库ok,上述配置ok,启动项目ok,3张表会自动生成

如果表没生成,可能需要自己创建

ACTIVEMQ_MSGS

create table ACTIVEMQ_MSGS

(

ID bigint not null

primary key,

CONTAINER varchar(250) not null,

MSGID_PROD varchar(250) null,

MSGID_SEQ bigint null,

EXPIRATION bigint null,

MSG blob null,

PRIORITY bigint null,

XID varchar(250) null

);

ACTIVEMQ_ACKS

create table ACTIVEMQ_ACKS

(

CONTAINER varchar(250) not null comment '消息的Destination',

SUB_DEST varchar(250) null comment '如果使用的是Static集群,这个字段会有集群其他系统的信息',

CLIENT_ID varchar(250) not null comment '每个订阅者都必须有一个唯一的客户端ID用以区分',

SUB_NAME varchar(250) not null comment '订阅者名称',

SELECTOR varchar(250) null comment '选择器,可以选择只消费满足条件的消息,条件可以用自定义属性实现,可支持多属性AND和OR操作',

LAST_ACKED_ID bigint null comment '记录消费过消息的ID',

PRIORITY bigint default 5 not null comment '优先级,默认5',

XID varchar(250) null,

primary key (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)

);

ACTIVEMQ_LOCK

create table ACTIVEMQ_LOCK

(

ID bigint not null

primary key,

TIME bigint null,

BROKER_NAME varchar(250) null

);

第五步:代码运行验证

一定要开启持久化:messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);第六步 第六步:数据库情况

总结

队列(Queue)

在没有消费者消费的情况下会将消息保存到activemq_msgs表中,只要有任意一个消费者消费了,就会删除消费过的消息 主题(Topic)

一般是先启动消费订阅者然后再生产的情况下会将持久订阅者永久保存到qctivemq_acks,而消息则永久保存在activemq_msgs,在acks表中的订阅者有一个last_ack_id对应了activemq_msgs中的id字段,这样就知道订阅者最后收到的消息是哪一条。

JDBC Message store with ActiveMQ Journal

ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库读库JDBC是实时写入mysql,在使用了journal后,数据会被journal处理,如果在一定时间内journal处理(消费)完了,就不写入mysql,如果没消费完,就写入mysql,起到一个缓存的作用

配置

配置和JDBC相同

替换成下面的

journalLogFiles="5"

journalLogFileSize="32768"

useJournal="true"

useQuickJournal="true"

dataSource="#mysql-ds"

dataDirectory="../activemq-data" />

9、ActiveMQ的多节点集群

官网:http://activemq.apache.org/replicated-leveldb-store引入消息中间件后如何保证其高可用基于zookeeper和LevelDB搭建ActiveMQ集群。集群仅提供主备方式的高可用集群功能,避免单点故障

zookeeper+replicated-leveldb-store的主从集群部署(待更新)

待更新

相关新闻

如何将 iPhone 照片传到电脑?5 个方法传到 Mac 或 Windows 电脑上!
也许是工作型文件命名的终极参考规范
365黑道老大免费观看第一季在线

也许是工作型文件命名的终极参考规范

🕒 07-13 👽 7521
问道手游仙阳剑坐骑好用吗 仙阳剑属性技能详解
狗狗睡眠时间是多少?如何判断你的狗狗是否睡得够?
SPU和SKU的定义区别(SKU和SPU在电商中的含义)
office365怎么登陆

SPU和SKU的定义区别(SKU和SPU在电商中的含义)

🕒 07-01 👽 1088
百度雲可以保存多久?文件保質期和影響因素詳解