RabbitMQ-笔记
四大核心概念
- 生产者:产生数据发送消息的程序是生产者
- 交换机:交换机是rabbitmq非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或是把消息丢弃,这个要交换机类型决定
- 队列:队列是rabbitmq内部使用的一种数据机构,尽管消息流经rabbitmq和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
- 消费者:消费与接收具有相同的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又可以是消费者
六大模式
- 简单模式
- 工作模式
- 发布订阅模式
- 路由模式
- 主题模式
- rpc模式
名词解释
- broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker
- virtual host:出于多租户和安全因素设计的,把amqp的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等
- connection:publisher/consumer和broker之间的tcp连接
- channel:如果每一次访问RabbitMq都建立一个connection,在消息量大的时候开销是巨大的,效率也较低。channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通信,amqp method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。channel作为轻量级的connection极大减少了操作系统建立tcp connection的开销
- exchange:message到达broker的第一站,根据分发规则匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct(point-to-point)、topic(publish-subscribe)和fanout(multicast)
- queue:消息最终被送到这里等待consumer取走
- binding:exchange和queue之间的虚拟连接,binding中可以包含routing key,bingding信息被保存到exchange中查询表中,用于message的分发依据
权限
创建用户
rabbitmqctl add_user 账号 密码
设置用户角色
rabbitmqctl set_user_tags 账号 administrator
设置用户权限
set_permissions [-p<vhostpath>]<user><conf><write><read>
rabbitmqctl set_permissions -p "/" 账号 ".*" ".*" ".*"
查看当前用户
rabbitmqctl list_users
简单队列
生产者
/**
* 生产者
* 发消息
*/
public class Producer {
//队列
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//工厂ip,连接mq队列
factory.setHost("ip地址");
//用户名
factory.setUsername("账号");
//密码
factory.setPassword("密码");
//创建连接
Connection connection = factory.newConnection();
//获取信道
Channel channel = connection.createChannel();
/**
* 创建队列
* 参数
* 队列名称
* 队列里的消息是否持久化(默认存储在内存中,持久化指磁盘)
* 该队列是否只供一个消费者进行消费,是否进行消息共享,true允许多个消费者消费
* 是否自动删除,最后一个消费者断开链接后,该队列是否自动删除,true自动删除
* 其它参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 发送消息
* 参数
* 发送到哪个交换机
* 路由的key值
* 其它参数
* 消息
*/
channel.basicPublish("",QUEUE_NAME,null,"hello world".getBytes());
}
}
消费者
/**
* 消费者
* 接收消息
*/
public class Consumer {
//队列
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip地址");
factory.setUsername("账号");
factory.setPassword("密码");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//接收消息
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println(new String(message.getBody()));
};
//取消消息
CancelCallback cancelCallback = consumerTag->{
System.out.println("消费消息——被中断");
};
/**
* 消费消息
* 参数
* 消费哪个队列
* 消费成功后是否自动应答 true代表自动应答
* 消费者未成功消费回调
* 消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
工作队列
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务
一个消息只能被处理一次,多个工作进程(消费者)之间是竞争的关系
/**
* 生产者
* 发送大量消息
*/
public class Task {
public static void main(String[] args) throws IOException, TimeoutException {
//获取信道
Channel channel = RabbitMQUtil.getChannel();
//队列的声明
channel.queueDeclare(RabbitMQUtil.QUEUE_NAME,false,false,false,null);
//发送消息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String next = scanner.next();
channel.basicPublish("",RabbitMQUtil.QUEUE_NAME,null,next.getBytes());
System.out.println("发送消息完成:"+next);
}
}
}
/**
* 工作线程
* 相当于消费者
*/
public class Worker {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
//消息接收
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("工作线程---"+new String(message.getBody()));
};
//消息被取消
CancelCallback cancelCallback = (consumerTag)->{
System.out.println("工作线程---消息取消:"+consumerTag);
};
//消息接收
channel.basicConsume(RabbitMQUtil.QUEUE_NAME,false,deliverCallback,cancelCallback);
}
}
可以使用idea的Allow parallel run 实现多个工作线程
会发现消息以轮询的方式发给各个工作线程
消息应答
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅完成了部分突然它挂掉了,会发生什么情况。rabbitmq一旦向消费者传递了一条消息,便立即将消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费者的消息,因为它无法接收到
为了保证消息在发送过程中不丢失,rabbitmq引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息后,告诉rabbitmq它已经处理了,rabbitmq可以把该消息删除了
自动应答
消息发送后立刻被认为已经成功传输,这种模式需要在高吞吐量和数据传输安全性上权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者channel关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息的数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速度能处理这些消息的情况下使用
手动应答的方法
- basicAck:肯定应答,成功处理消息,可以将其丢弃
- basicNack:否定应答,不处理消息,可以将其丢弃
- basicReject:否定应答,比basicNack少一个批量应答参数,不处理消息,可以将其丢弃
批量应答
- true:代表批量应答channel上未应答的消息。比如channel上有传送tag的消息5、6、7,当前tag是7那么此时5、6的这些还未应答的消息都会被确认收到消息应答
- false:同上面相比,只会应答8的消息,5、6依然不会被确认收到消息应答
消息自动重新入队
如果消费者由于某些原因失去连接(其通道已经关闭,连接已关闭或tcp连接丢失),导致消息未发送ack确认,rabbitmq将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者死亡,也可以保证不会丢失消息
示例
/**
* 消费者
*/
public class Worker {
public static void main(String[] args) throws IOException, TimeoutException {
//获取信道
Channel channel = RabbitMQUtil.getChannel();
System.out.println("消费者等待处理");
//接收消息,采用手动应答
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("工作线程1接收:" + new String(message.getBody()));
/**
* 手动应答
* 参数
* 消息标记
* 是否批量应答
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
channel.basicConsume(RabbitMQUtil.QUEUE_NAME,false,deliverCallback,consumerTag->{});
}
}
持久化
如何保证rabbitmq服务停掉以后消息生产者发送过来的消息不丢失。默认情况下rabbitmq退出或者由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:需要将队列和消息都标记为持久化
队列持久化
持久化后可以在控制页面看到队列信息的Features字段为D
,这个时候即使重启rabbit队列依然存在
代码
//声明队列时,第二个参数设置为true
channel.queueDeclare(RabbitMQUtil.QUEUE_NAME,true,false,false,null);
如果之前已经创建的非持久化队列后再修改,会抛出异常
需要把原本队列先删除,或创建一个新的
这样重启服务后队列依然存在,但是队列里的消息就不存在了
消息持久化
生产者在发送消息时,通知队列消息需要持久化
//发送消息时,第三个参数设置为MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish("",RabbitMQUtil.QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes("utf-8"));
将消息标记为持久化并不能完全保证不会丢失消息
不公平分发
rabbitmq默认采用轮询分发,但是在某种场景下这种策略并不是很好,比如说有两个消费者,其中一个处理速度非常快,而另一个非常慢,这个时候我们如果还是采用轮询分发,会出现快的长时间空闲,慢的一直在干活,这种情况就不太好
接收消息时要求消费放对信道进行设置
//设置为1为不公平分发,0和不设置为轮询分发
channel.basicQos(1);
设置完成后控制页面的channels里的Prefetch count为1
发布确认
持久化时,就是设置队列和消息持久化也不能保证数据安全,因为可能在数据持久化到磁盘上前发生问题,这时就需要使用发布确认,在生产者向mq发布消息之后,需要mq持久化到磁盘之后,再通知生产者完成发布
开启
在生产者中开启,默认是不开启的
//开启发布确认
channel.confirmSelect();
单个发布确认
这是一种同步确认发布的方式
缺点:发布速度特别的慢
Channel channel = RabbitMQUtil.getChannel();
//声明队列
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,false,false,false,null);
//开启发布确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();
//批量发布
for (int i=0; i<count; i++){
String mag = i+"";
channel.basicPublish("",queueName,null,mag.getBytes());
//发布确认,等候确认
boolean b = channel.waitForConfirms();
if (b){
System.out.println("消息发送成功");
}
}
//结束时间
long end = System.currentTimeMillis();
System.out.println("耗时"+(end-begin));//18894
批量发布确认
缺点:当发生故障时,不知道是哪个消息出现的问题
Channel channel = RabbitMQUtil.getChannel();
//声明队列
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,false,false,false,null);
//开启发布确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();
//批量确认大小
int batchSize = 100;
//批量发送消息,批量确认
for (int i = 1; i <= count; i++) {
String mag = i+"";
channel.basicPublish("",queueName,null,mag.getBytes());
if (i%batchSize==0){
System.out.println("确认发布");
//确认发布
channel.waitForConfirms();
}
}
//结束时间
long end = System.currentTimeMillis();
System.out.println("耗时"+(end-begin)); //425
异步确认发布
效率高,性能好,并且利用回调函数来达到消息的可靠性传递
Channel channel = RabbitMQUtil.getChannel();
//声明队列
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,false,false,false,null);
//开启发布确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();
//准备消息监听器
//成功回调,参数 1消息的标记、2是否批量
ConfirmCallback ackCallback = (deliveryTag, multiple)->{
System.out.println("确认消息:"+deliveryTag);
};
//失败回调
ConfirmCallback nackCallback = (deliveryTag, multiple)->{
System.out.println("未确认消息:"+deliveryTag);
};
//添加监听器(异步通知)
channel.addConfirmListener(ackCallback,nackCallback);
//批量发送
for (int i = 0; i < count; i++) {
String mag = i + "";
channel.basicPublish("",queueName,null,mag.getBytes());
}
//结束时间
long end = System.currentTimeMillis();
System.out.println("耗时"+(end-begin)); //116
交换机exchange
rabbitmq消息传递的核心思想是:生产者生产的消息从不会直接发送到队列。实际上,通常生产者都不知道这些消息传递到了哪些队列中
相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是许多队列,或者丢弃他们。这就由交换机的类型来决定
类型
- 直接(direct):路由类型
- 主题(topic)
- 标题(headers)
- 扇出(fanout):发布订阅类型
- 无名:默认交换机,用户
""
进行标识
临时队列
就是没有持久化的队列
每当我们连接到rabbit时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除
创建临时队列
String queueName = channel.queueDeclare().getQueue();
创建好的临时队列控制页面里可以看到,队列的Features为AD Exd
绑定bindings
binding其实是exchange和queue直接的桥梁,它告诉我们exchange和那个队列进行了绑定
发布订阅模式fanout
它是将收到的所有消息广播到它知道的所有队列中
/**
* 发布信息
*/
public class EmitLog {
//交换机的名称
public static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String next = scanner.next();
//发送消息,设置交换机 不需要设置路由key
channel.basicPublish(EXCHANGE_NAME,"",null,next.getBytes());
System.out.println("发送消息:"+next);
}
}
}
/**
* 接收消息
*/
public class ReceiveLogs {
//交换机名称
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
//声明交换机参数 名称、类型
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//声明队列(临时),队列名称是随机的,断开连接的时候,队列自动删除
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列 参数 队列名、交换机名、路由key
channel.queueBind(queueName,EXCHANGE_NAME,"");
//接收消息
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("接收消息:"+new String(message.getBody()));
};
channel.basicConsume(queueName,true,deliverCallback,(consumerTag)->{});
}
}
路由模式direct
和发布订阅模式的唯一区别在于绑定具有路由key,这样就可以将消息发送到指定的一个或多个队列中
/**
* 提供者
*/
public class DirectLogs {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String next = scanner.next();
channel.basicPublish(EXCHANGE_NAME,next,null,next.getBytes());
System.out.println("生产者发送消息:"+next);
}
}
}
/**
* 消费者
*/
public class ReceiveLogsDirect01 {
public static final String EXCHANGE_NAME = "direct_logs";
public static final String QUEUE_NAME = "console";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
//多重绑定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");
//接收
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("console接收消息:"+new String(message.getBody()));
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,(consumerTag)->{});
}
}
主题模式topics
topic交换机的消息的routing_key不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号隔开
这些单词可以是任意单词,比如说:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。当然这个单词最多不能超过255个字节
替换符:
- *:可以代替一个单词
- #:可以代替0个或多个单词
当一个队列路由key是
#
,那么这个队列将接收所有数据,就有点像fanout了如果队列路由key中没有
#
和*
,那么该队列绑定类型就是direct了
/**
* 消费者1
*/
public class ReceiveLogsTopic01 {
public static final String EXCHANGE_NAME = "topic_logs";
public static final String QUEUE_NAME = "q1";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*");
//接受
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("q1接收消息:"+new String(message.getBody())+" 路由key:"+message.getEnvelope().getRoutingKey());
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag->{});
}
}
死信队列
死信,顾名思义就是无法被消费的消息,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续处理,就变成了死信,有死信自然就有了死信队列
应用场景:为了保证订单业务的消息数据不丢失,需要使用到rabbitmq的死信队列机制,当消息消费发生异常时,将消息投入死信队列中,比如,用户下单成功并点击支付后,在指定时间未支付自动失败
死信的来源
- 消息ttl过期
- 队列达到最大长度(队列满了,无法再添加数据到mq中)
- 消息被拒绝(basicReject或者basicNack)并且requeue=false(不放回队列中)
普通队列消费者
/**
* 消费者1
*/
public class Consumer01 {
//普通交换机名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机名称
public static final String DEAD_EXCHANGE = "dead_exchange";
//普通队列名称
public static final String NORMAL_QUEUE = "normal_queue";
//死信队列名称
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
//声明交换机死信和普通
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明普通队列
Map<String,Object> arguments = new HashMap<>();
//过期时间(单位毫秒)
//arguments.put("x-message-ttl",10000);
//正常队列设置死信交换机
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//设置死信路由key
arguments.put("x-dead-letter-routing-key","lisi");
//设置正常队列的长度限制
//arguments.put("x-max-length",6);
channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
//声明死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
//绑定
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
//接收消息
DeliverCallback deliverCallback = (consumerTag, message)->{
//System.out.println("接收消息:"+new String(message.getBody()));
String mag = new String(message.getBody());
if ("info5".equals(mag)){
System.out.println("拒收消息");
channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
}else {
System.out.println("接收消息:"+mag);
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}
};
channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,consumerTag->{});
}
}
死信队列消费者
/**
* 消费者2
*/
public class Consumer02 {
//死信队列名称
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
//接收消息
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("接收消息:"+new String(message.getBody()));
};
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag->{});
}
}
消息生产者
/**
* 生产者
*/
public class Producer {
//普通交换机名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
//发送死信消息,设置ttl
//设置参数
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder()
//.expiration("10000")
.build();
for (int i = 1; i < 11; i++) {
String mag = "info"+i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,mag.getBytes());
}
}
}
延迟队列
延迟队列内部是有序的,最重要的特性就体现在它的延迟属性上,延迟队列中的元素是希望在指定时间到了之后或之前取出和处理,简单来说,延迟队列就是用来存放需要在指定时间被处理的元素队列
延迟队列本质是死信队列的消息过期
使用场景
- 订单在十分钟之内未支付则自动取消
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
- 用户注册成功后,如果三体内没有登录则进行短信提醒
- 用户发起退款,如果三天内没有得到处理则通知运营人员
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会员
整合springboot实现延迟队列
配置文件
spring:
rabbitmq:
host: ip地址
port: 端口号
username: 账号
password: 密码
配置类
/**
* ttl队列 配置文件
*/
@Configuration
public class TtlQueueConfig {
//普通交换机名称
public static final String X_EXCHANGE = "X";
//死信交换机名称
public static final String Y_DEAD_EXCHANGE = "Y";
//普通队列名称
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String QUEUE_C = "QC";
//死信队列名称
public static final String DEAD_QUEUE_D = "QD";
/**
* x交换机
* @return
*/
@Bean
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
/**
* y交换机(死信)
* @return
*/
@Bean
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_EXCHANGE);
}
/**
* 普通队列a
* @return
*/
@Bean
public Queue queueA(){
//设置参数
Map<String, Object> arguments = new HashMap<>();
//设置死信交换机
arguments.put("x-dead-letter-exchange",Y_DEAD_EXCHANGE);
//设置路由key
arguments.put("x-dead-letter-routing-key","YD");
//设置过期时间
arguments.put("x-message-ttl",10000);
return QueueBuilder
.durable(QUEUE_A)
.withArguments(arguments)
.build();
}
/**
* 普通队列b
* @return
*/
@Bean
public Queue queueB(){
//设置参数
Map<String, Object> arguments = new HashMap<>();
//设置死信交换机
arguments.put("x-dead-letter-exchange",Y_DEAD_EXCHANGE);
//设置路由key
arguments.put("x-dead-letter-routing-key","YD");
//设置过期时间
arguments.put("x-message-ttl",40000);
return QueueBuilder
.durable(QUEUE_B)
.withArguments(arguments)
.build();
}
/**
* 死信队列d
* @return
*/
@Bean
public Queue queueD(){
return QueueBuilder
.durable(DEAD_QUEUE_D)
.build();
}
/**
* 队列a绑定交换机x
* @return
*/
@Bean
public Binding queueABindingX(DirectExchange xExchange,Queue queueA){
return BindingBuilder
.bind(queueA)
.to(xExchange)
.with("XA");
}
/**
* 队列b绑定交换机x
* @return
*/
@Bean
public Binding queueBBindingX(DirectExchange xExchange,Queue queueB){
return BindingBuilder
.bind(queueB)
.to(xExchange)
.with("XB");
}
/**
* 队列d绑定交换机y
* @return
*/
@Bean
public Binding queueDBindingY(DirectExchange yExchange,Queue queueD){
return BindingBuilder
.bind(queueD)
.to(yExchange)
.with("YD");
}
/**
* 普通队列c 不设置过期时间,由生产者设置
* @return
*/
@Bean
public Queue queueC(){
Map<String, Object> arguments = new HashMap<>();
//当前队列绑定的死信交换机
arguments.put("x-dead-letter-exchange",Y_DEAD_EXCHANGE);
//和死信队列的路由key
arguments.put("x-dead-letter-routing-key","YD");
return QueueBuilder
.durable(QUEUE_C)
.withArguments(arguments)
.build();
}
/**
* 绑定队列c和交换机x
* @param queueC
* @param xExchange
* @return
*/
@Bean
public Binding queueCBindingX(Queue queueC,DirectExchange xExchange){
return BindingBuilder
.bind(queueC)
.to(xExchange)
.with("XC");
}
}
生产者
/**
* 发送延迟消息
*/
@Slf4j
@RestController
@RequestMapping("ttl")
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 固定时间
* @param msg
*/
@GetMapping("send/{msg}")
public void sendMsg(@PathVariable String msg){
log.info("当前时间:{},发送一条信息:{}",new Date().toString(),msg);
rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列:"+msg);
rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:"+msg);
}
/**
* 设置时间
* @param msg
* @param ttl
*/
@GetMapping("send/{msg}/{ttl}")
public void sendMsgTtl(@PathVariable String msg,@PathVariable String ttl){
log.info("当前时间:{},发送一条时长{}毫秒,的消息给队列:{}",new Date().toString(),msg,ttl);
//缺点:如果发送多条消息,哪怕后续消息ttl更短,也会在前一条数据接收到后再接收(会排队)
//因为rabbitmq只会检测第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息延迟很长,第二个很短,第二行不会优先执行
//解决这个问题需要安装插件 rabbitmq_delayed_message_exchange,这样会在交换机进行延迟
rabbitTemplate.convertAndSend("X","XC",msg,message->{
message.getMessageProperties().setExpiration(ttl);
return message;
});
}
}
消费者
/**
* ttl消费者
*/
@Component
@Slf4j
public class DeadLetterQueueConsumer {
/**
* 接收消息
* @param message
* @param channel
*/
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel){
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);
}
}
基于插件方式实现延迟队列
安装插件
将插件rabbitmq_delayed_message_exchange-3.8.0.ez放入/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins目录下,并执行命令rabbitmq-plugins enable rabbitmq_delayed_message_exchange,最后重启服务systemctl restart rabbitmq-server
重启完服务后,会发现交换机类型多了x-delayed-message
原理:将消息缓存在交换机中
配置类
//队列
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
//交换机
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
//路由key
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
/**
* 声明队列
* @return
*/
@Bean
Queue delayedQueue(){
return QueueBuilder
.durable(DELAYED_QUEUE_NAME)
.build();
}
/**
* 声明交换机(自定义)
*/
@Bean
CustomExchange delayedExchange(){
Map<String, Object> arguments = new HashMap<>();
//延迟类型
arguments.put("x-delayed-type","direct");
/**
* 参数
* 1. 交换机名称
* 2. 交换机类型
* 3. 是否持久化
* 4. 是否自动删除
* 5. 参数
*/
return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);
}
/**
* 绑定延迟队列和延迟交换机
* @param delayedQueue
* @param delayedExchange
* @return
*/
@Bean
Binding queueDelayedBindingDelayedExchange(Queue delayedQueue,CustomExchange delayedExchange){
return BindingBuilder
.bind(delayedQueue)
.to(delayedExchange)
.with(DELAYED_ROUTING_KEY)
.noargs();
}
生产者
/**
* 发送延迟消息
* @param msg
* @param delayedTime
*/
@GetMapping("sendDelayMsg/{msg}/{delayedTime}")
public void sendMsgDelayTtl(@PathVariable String msg,@PathVariable Integer delayedTime){
log.info("当前时间:{},发送一条时长{}毫秒,的消息给延迟队列:{}",new Date().toString(),msg,delayedTime);
//发送消息
rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,msg, message->{
//延迟时长 单位ms
message.getMessageProperties().setDelay(delayedTime);
return message;
});
}
消费者
@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
public void receiverDelayQueue(Message message){
String msg = new String(message.getBody());
log.info("当前时间:{},收到的消息是:{}",new Date().toString(),msg);
}
延迟队列在需要延时的场景下非常有用,使用rabbitmq来实现延时队列可以很好的利用rabbitmq的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过rabbitmq集群的特性,可以很好的解决单点故障的问题,不会因为单个节点挂掉导致延迟队列消息丢失
发布确认高级
在生产环境中由于一些不明原因,导致rabbitmq重启,在rabbitmq重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行rabbitmq的消息可靠投递呢?特别是在这样比较极端的情况,rabbitmq集群不可用的时候,无法投递的消息该如何处理呢?
配置文件
spring:
rabbitmq:
#none:禁用发布确认模式,是默认值
#correlated:异步发布确认,发布消息成功到交换机后会触发回调方法
#simple:同步发布确认
publisher-confirm-type: correlated
自定义回调
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
//注入
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
/**
* 交换机确认回调方法
* 发消息成功或失败都会回调
* @param correlationData 保存回调消息的id及相关信息
* @param ack 交换机是否受到消息
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack){
log.info("发送消息成功:id为:{}",correlationData.getId());
}else {
log.info("发送消息失败:id为:{},失败原因:{}",correlationData.getId(),cause);
}
}
}
生产者
@RestController
@RequestMapping("producer")
@Slf4j
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping("{info}")
public void sendInfo(@PathVariable String info){
log.info("发送消息:{}",info);
//回调接口中使用的CorrelationData对象需要在生产者创建
CorrelationData correlationData = new CorrelationData("1");
//发送消息,参数多了CorrelationData
rabbitTemplate.convertAndSend(RabbitConfigConfirm.CONFIRM_EXCHANGE_NAME,RabbitConfigConfirm.CONFIRM_ROUTING_KEY,info,correlationData);
}
}
注意:这种方式只能判断交换机是否接收到消息,如果队列或路由发生问题依然会回调成功
回退消息
Mandatory参数,在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢失这个事件的。通过设置mandatory参数可以在当消息传递过程中不可达目的地时将消息返回给生产者
配置文件
spring:
rabbitmq:
#none:禁用发布确认模式,是默认值
#correlated:异步发布确认,发布消息成功到交换机后会触发回调方法
#simple:同步发布确认
publisher-confirm-type: correlated
# 开启回退消息
publisher-returns: true
自定义回调
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
//注入
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
/**
* 交换机确认回调方法
* 发消息成功或失败都会回调
* @param correlationData 保存回调消息的id及相关信息
* @param ack 交换机是否受到消息
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack){
log.info("发送消息成功:id为:{}",correlationData.getId());
}else {
log.info("发送消息失败:id为:{},失败原因:{}",correlationData.getId(),cause);
}
}
/**
* 回退方法
* 可以在当前消息传递过程中不可达目的地的时将消息返给生产者
* @param returned
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("消息{},被交换机{},退回,原因:{},路由key:{}",
new String(returned.getMessage().getBody()),
returned.getExchange(),
returned.getReplyText(),
returned.getRoutingKey());
}
}
备份交换机
在原本交换机上添加配置
@Bean
Exchange confirmExchange(){
return ExchangeBuilder
.directExchange(CONFIRM_EXCHANGE_NAME)
.durable(true)
//指向备份交换机,参数为备份交换机名称
.withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME)
.build();
}
当发生交换机无法将消息路由到队列时,会把消息备份到备份交换机,可以通过消费备份交换机中的消息进行异常处理
如果mandatory参数和备份交换机一起使用,备份交换机的优先级更高
注意:消息未到达交换机发生异常,无法进行处理
幂等性
用户对于同一操作发起的一次或多次请求的结果是一致的,不会因为多次点击而产生副作用
简单说就是消息被重复消费了
解决思路:一般使用全局id,uuid或时间戳,在消费前利用该属性判断是否已经被消费
唯一id+指纹码机制
指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是系统生成的,基本都是由我们的业务规则拼接来的,但是一定要保证唯一性,然后就利用查询语句进行判断这个id是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能
redis原子性
利用redis执行setnx命令,天然具有幂等性。从而实现不重复消费
优先级队列
队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费,因为这样才有机会对消息进行排序
配置文件
@Bean
Queue queueA(){
return QueueBuilder
.durable("qa")
//优先级取值范围0-10(最大为0-255)
.withArgument("x-max-priority",20)
.build();
}
消息生产者
for (int i = 0; i <20 ; i++) {
int priority = i;
rabbitTemplate.convertAndSend("ea","aa",info+i,message -> {
//数字越大优先级越大,越先被执行
message.getMessageProperties().setPriority(priority);
return message;
});
}
惰性队列
rabbitmq从3.6.0版本开始引入惰性队列概念,惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因而致长时间内不能消费消息造成堆积时,惰性队列就很有必要了
通过设置x-queue-mode
为lazy
,切换队列模式为惰性队列
惰性队列内存开销更小,但性能更差
镜像队列
在集群状态下,队列的消息并不会保存在其它节点中,当mq宕机重启后,就有可能会丢失消息,此时需要将消息保存到一个或多个其它节点中
设置:
- 在控制页面的admin中,找到Policies
- Add / update a policy
- name策略名称,随意
- pattern:正则表达式,表示什么队列会进行镜像
- apply to:范围
- definition:参数
- add/update policy添加策略
添加后mq集群会自动选择节点备份消息
即使集群中只剩一台机器也可以正常工作