RabbitMQ的死信队列和延时队列
RabbitMQ的死信队列和延时队列
1. 死信队列&死信交换器:
DLX 全称(Dead-Letter-Exchange),称之为死信交换器,当消息变成一个死信之后,如果这个消息所在的队列存在x-dead-letter-exchange参数,那么它会被发送到x-dead-letter-exchange对应值的交换器上,这个交换器就称之为死信交换器,与这个死信交换器绑定的队列就是死信队列。
说实在的,死信队列和普通队列没啥区别,都需要自己创建Queue、Exchange,然后通过RoutingKey绑定到Exchange上。只不过死信队列的RoutingKey和Exchange要作为参数,绑定到正常的队列上。
2. 死信消息:
- (1)消息被拒绝(Basic.Reject或Basic.Nack)并且设置 requeue 参数的值为 false
- (2)消息过期了
- (3)队列达到最大的长度
3. 过期消息:
在 rabbitmq 中存在2种方法可设置消息的过期时间:
- 第一种通过对队列进行设置,这种设置后,该队列中所有的消息都存在相同的过期时间;
- 第二种通过对消息本身进行设置,那么每条消息的过期时间都不一样。
/**
* 模拟下单超过x分钟没有支付则取消订单
* 这里2分钟
* <p>
* 创建订单后发送一条消息:
* <pre>
* amqpTemplate.convertAndSend("order-event-exchange", "order.create.order", msg);
* </pre>
*
* @author ruan
*/
@Component
public class DeadQueueTest {
/**
* 死信队列
*
* @return
*/
@Bean
public Queue orderDelayQueue() {
return QueueBuilder
//持久化;名字
.durable("order.delay.queue")
//ttl 2分钟;
.ttl(1000 * 60 * 2)
//2分钟后(死信)会到那个交换机;
.deadLetterExchange("order-event-exchange")
//死信后到交换机的key
.deadLetterRoutingKey("order.release.order")
.build();
}
/**
* 普通队列
* (取消订单)用于处理死信
*
* @return
*/
@Bean
public Queue orderReleaseQueue() {
return QueueBuilder.durable("order.release.order.queue").build();
}
/**
* TopicExchange
*
* @return
*/
@Bean
public Exchange orderEventExchange() {
return ExchangeBuilder.topicExchange("order-event-exchange").build();
}
/**
* 把死信队列和交换机绑定切来,他们的之间的key:order.create.order
*
* @return
*/
@Bean
public Binding orderCreateBinding(@Qualifier("orderDelayQueue") Queue orderDelayQueue, Exchange orderEventExchange) {
return BindingBuilder.bind(orderDelayQueue)
//绑定交换机
.to(orderEventExchange)
//路由key
.with("order.create.order")
.noargs();
}
/**
* 把处理私信的队列和交换机绑定
*
* @return
*/
@Bean
public Binding orderReleaseBinding(@Qualifier("orderReleaseQueue") Queue orderReleaseQueue, Exchange orderEventExchange) {
return BindingBuilder
.bind(orderReleaseQueue)
.to(orderEventExchange)
//和交换机路由key ;注意这里的key 和死信队列里面deadLetterRoutingKey设置的一样
.with("order.release.order")
.noargs();
}
@RabbitListener(queues = "order.release.order.queue")
public void releaseOrder(Message<Object> message, Channel channel) {
// 1. 收到消息以后进行业务端消费处理
System.err.println("-----------------------");
System.err.println("releaseOrder:" + message.getPayload());
System.err.println("接收时间:" + LocalDateTime.now());
}
//发送消息
amqpTemplate.convertAndSend("order-event-exchange", "order.create.order", MQMsg.build("这是一条延时消息!"));
上面例子中是直接为队列设置的ttl ,也可以针对消息设置
amqpTemplate.convertAndSend("order-event-exchange", "order.create.order", MQMsg.build("这是一条延时消息!"), message -> {
message.getMessageProperties().setExpiration("130000");
return message;
});
资料:
本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。