0%

RabbitMQ

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息队列中间件,由Erlang语言编写而成,负责接收和发送消息。

使用原因

在基于Java平台之间传递消息,使用JMS就足够了,但如果是不同的厂商或平台,AMQP就派上了用场。
主要有三大特性:

  • 解耦
  • 异步
  • 削峰

安装Erlang

在windows上安装测试RabbitMQ,首先安装Erlang,并配置环境变量。

安装RabbitMQ

官网下载响应版本的压缩包,解压。

在sbin下执行命令,开启RabbitMQ服务:

1
./rabbitmq-server

开启RabbitMQ管理工具:

1
./rabbitmq-plugins enable rabbitmq_management

关闭RabbitMQ服务:

1
./rabbitmqctl stop

端口占用

  • 默认端口5672用于客户端调用。
  • 默认http://localhost:15672用于后台管理系统。

准备依赖jar包

Spring Boot项目,直接增加依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

基础概念

发送消息的是生产者,接收消息的是消费者,并且生产者、消费者、中间件没有必要在同一台主机上。

ConnectionFactory(连接工厂)

ConnectionFactory用于创建Connection。

1
2
3
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1" , 5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

Connection(连接)

Connection接口是一条TCP连接,表示应用程序和消息服务器之间的连接。

1
Connection connection = connectionFactory.createConnection();

Channel(信道)

tcp连接的建立需要经过三次握手,连接的释放需要四次挥手,开销昂贵,影响系统性能。所以需要信道,channel是在Connection中的一个虚拟连接,可以多次使用,但每个线程需要一个单独的channel,负责发送AMQP指令。
创建不开启事务的Channel:

1
channel = connection.createChannel(false);

Queue(队列)

队列中存储还未被消费的信息,是一个巨大的消息缓存区,多个生产者可以发送消息到一个队列,多个消费者也可以从一个队列接收消息。
创建一个指定名称的队列,自动绑定到一个空的Exchange,绑定的routing key和队列名相同:

1
channel.queueDeclare("hello",false,false,false,null);

Exchange(交换机)

有四种Direct exchangeFanout exchangeTopic exchangeHeaders exchange

  • Direct exchange:默认exchange,使用时不需要指定routing key的名字,创建的Queue都有一个默认的routing key。循环发放给不同的消费者。
  • Fanout exchange:忽略routing key的存在,将message广播到所有的Queue中。
  • Topic exchange:根据匹配routing key到不同格式,将message发送到一个或者多个Queue中。
  • Headers exchange:根据message的头信息,分发过滤Message。

默认exchange发送消息:

1
channel.basicPublish("", "hello",null,messageA.getBytes());

fanout exchange发送消息,a、b队列同时收到消息

1
2
3
4
5
6
7
String messageA = "ni hao";
channel.exchangeDeclare("ex","fanout");
channel.queueDeclare("a",false,false,false,null);
channel.queueDeclare("b",false,false,false,null);
channel.queueBind("a","ex","");
channel.queueBind("b","ex","");
channel.basicPublish("ex", "",null,messageA.getBytes());

topic exchange发送消息,满足条件的a和b队列同时收到消息。

1
2
3
4
5
6
7
8
9
String messageA = "ni hao";
channel.exchangeDeclare("ex","topic");
channel.queueDeclare("a",false,false,false,null);
channel.queueDeclare("b",false,false,false,null);
channel.queueDeclare("c",false,false,false,null);
channel.queueBind("a","ex","a.yuan.*");
channel.queueBind("b","ex","*.yuan.b");
channel.queueBind("c","ex","uana.b*");
channel.basicPublish("ex", "a.yuan.b",null,messageA.getBytes());

Binding(绑定)

  • 队列使用routing key绑定到exchange
  • 消息根据routing key被发送到exchange,然后exchange分发消息到队列。

spring boot

Spring提供了方便的连接管理(CachingConnectionFactory),消息发布(RabbitTemplate),消息消费(RabbitListener)等工具类。

配置文件

application.yml

1
2
3
4
5
6
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
user: guest
password: guest

配置一个队列

配置自定义队列

1
2
3
4
5
6
7
8
@Configuration
public class Config {

@Bean
public Queue myQueue() {
return new Queue("myQueue", false);
}
}

发送测试

使用rabbitTemplate直接发送

1
2
3
4
5
6
7
8
9
10
11
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testRabbit(){
rabbitTemplate.convertAndSend("myQueue","hhhhhhhhhhello");
}
}

信息消费

通过@RabbitListener注解,实现信息的消费。

1
2
3
4
5
6
7
8
@Component
public class MessageHandler {

@RabbitListener(queues = "myQueue")
public void handleMessage(String in){
System.out.println("Message read from myQueue : " + in);
}
}

spring boot使用topic exchange

Spring AMQP提供了Declarables,可以使队列、交换机、绑定三者放在一起。

配置Declarables

配置两个队列在一个exchange上,两个队列分别有自己的绑定格式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Bean
public Declarables topicBindings() {
Queue topicQueue1 = new Queue("a", false);
Queue topicQueue2 = new Queue("b", false);

TopicExchange topicExchange = new TopicExchange("ex");

return new Declarables(
topicQueue1,
topicQueue2,
topicExchange,
BindingBuilder
.bind(topicQueue1)
.to(topicExchange).with("*.important.*"),
BindingBuilder
.bind(topicQueue2)
.to(topicExchange).with("#.error"));
}

发送测试

代码如下:

1
2
3
4
5
@Test
public void sendRabbit(){
rabbitTemplate.convertAndSend("topicEx","successs","ok");
rabbitTemplate.convertAndSend("topicEx","y.error","occur wrong");
}

结果b队列中添加了一条”occur wrong”的字符串数据。