0%

基于JMS的activeMQ

Java消息服务(Java Message Service)是面向消息中间件的API,用于在两个应用程序,或是分布式系统中发送消息,进行异步通信。

JMS提供者

连接面向消息中间件的,JMS接口的一个实现。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。

常见的JMS提供者有:

  • Kafka
  • Apache ActiveMQ
  • JBoss HornetQ
  • WebLogic Server JMS

步骤

以activemq为例,JMS的基本步骤如下:

准备依赖jar包

Spring Boot项目,直接增加依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

启动activemq

使用activemq.bat启动。

  • 默认端口8161用于自带后台管理系统。
  • 默认端口61616用于java的tcp调用。
  • 默认用户名admin
  • 默认密码admin

ConnectionFactory(连接工厂)

ConnectionFactory接口用于创建Connection。

1
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://127.0.0.1:61616");

Connection(连接)

Connection接口表示应用程序和消息服务器之间的连接。

1
2
3
4
5
Connection connection = null;
//获取连接
connection = connectionFactory.createConnection();
//打开连接
connection.start();

Session(会话)

表示一个单线程的上下文,用于发送和接收数据。也允许用户创建消费生产者来发送消息,创建消息消费者接收消息。

1
2
3
Session session = null;
//创建会话
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Destination(目标)

消息目标是消息发送和接受的地点,可以是队列,也可以是主题。

1
2
//获取目标,参数是队列名,需提前在activeMQ中创建。
Destination destination = session.createQueue("FirstQueue");

MessageProducer(消息生产者)

由会话创建的对象,为某个目标创建,或创建通用的生产者,在发送时指定目标。

1
2
MessageProducer producer = null;
producer = session.createProducer(destination);

Message(消息)

消费者和生产者之间传送的对象

1
2
3
4
//生成消息
TextMessage textMessage = session.createTextMessage("Helloooooooo");
//发送消息
producer.send(textMessage);

关闭资源

在资源使用完毕后,需要关闭资源。

1
2
3
4
5
6
7
8
9
10
11
finally {
if (connection != null) {
connection.close();
}
if (session != null) {
session.close();
}
if (producer != null) {
producer.close();
}
}

Spring boot集成

JMS按功能大致分为两部分,消息的生产和消费。

配置文件application.yml如下

1
2
3
4
5
spring:
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin

JmsTemplate

JmsTemplate类是Spring JMS的核心类。用于消息生产和同步消息的接收。在发送和同步接收消息的时候,处理了资源的创建和释放,简化了JMS的使用。

测试代码如下:

1
2
3
4
5
6
7
8
9
10
11
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringJMSTest {
@Autowired
private JmsTemplate jmsTemplate;

@Test
public void sendMessage(){
jmsTemplate.send("FirstQueue", session -> session.createTextMessage("Helloooooooo"));
}
}

MessageConverter

MessageConverter接口在Java对象JMS message之间定义了一个简单的协议。默认的实现类SimpleMessageConverter支持以下类型之间的转化:String和TextMessage、byte[]和BytesMesssage、java.util.Map和MapMessage。通过使用MessageConverter,让应用可以更关注于被发送和接收的业务逻辑对象。

测试代码如下:

dto对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Goods implements Serializable {
private static final long serialVersionUID = 5101052610913704018L;
private String name;
private int size;

public Goods(String name, int size) {
this.name = name;
this.size = size;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public int getSize() {
return size;
}

public void setSize(int size) {
this.size = size;
}

@Override
public String toString() {
return "Goods{" +
"name='" + name + '\'' +
", size=" + size +
'}';
}
}

实现MessageConverter接口

1
2
3
4
5
6
7
8
9
10
11
12
public class GoodsMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object o, Session session) throws JMSException, MessageConversionException {
return session.createObjectMessage((Serializable) o);
}

@Override
public Object fromMessage(Message message) throws JMSException, MessageConversionException {
ObjectMessage objectMessage = (ObjectMessage)message;
return objectMessage.getObject();
}
}

dto作为信息发送接收

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void sendAndConvertGoods(){
jmsTemplate.setMessageConverter(new GoodsMessageConverter());
Goods tv = new Goods("tv", 75);
jmsTemplate.convertAndSend("FirstQueue", tv);
}

@Test
public void receiveAndConvertGoods(){
jmsTemplate.setMessageConverter(new GoodsMessageConverter());
Goods goods = (Goods) jmsTemplate.receiveAndConvert("FirstQueue");
System.out.println(goods);
}

activeMQ为安全考虑,需要配置信任的dto路径

1
2
3
4
spring:
activemq:
packages:
trust-all: true