RabbitMQ的死信队列和延时队列

1. 死信队列&死信交换器:

DLX 全称(Dead-Letter-Exchange),称之为死信交换器,当消息变成一个死信之后,如果这个消息所在的队列存在x-dead-letter-exchange参数,那么它会被发送到x-dead-letter-exchange对应值的交换器上,这个交换器就称之为死信交换器,与这个死信交换器绑定的队列就是死信队列。

说实在的,死信队列和普通队列没啥区别,都需要自己创建Queue、Exchange,然后通过RoutingKey绑定到Exchange上。只不过死信队列的RoutingKey和Exchange要作为参数,绑定到正常的队列上。

2. 死信消息:

  • (1)消息被拒绝(Basic.Reject或Basic.Nack)并且设置 requeue 参数的值为 false
  • (2)消息过期了
  • (3)队列达到最大的长度

3. 过期消息:

在 rabbitmq 中存在2种方法可设置消息的过期时间:

  • 第一种通过对队列进行设置,这种设置后,该队列中所有的消息都存在相同的过期时间;
  • 第二种通过对消息本身进行设置,那么每条消息的过期时间都不一样。
/**
 * 模拟下单超过x分钟没有支付则取消订单
 * 这里2分钟
 * <p>
 * 创建订单后发送一条消息:
 * <pre>
 * amqpTemplate.convertAndSend("order-event-exchange", "order.create.order", msg);
 * </pre>
 *
 * @author ruan
 */
@Component
public class DeadQueueTest {

    /**
     * 死信队列
     *
     * @return
     */
    @Bean
    public Queue orderDelayQueue() {
        return QueueBuilder
                //持久化;名字
                .durable("order.delay.queue")
                //ttl 2分钟;
                .ttl(1000 * 60 * 2)
                //2分钟后(死信)会到那个交换机;
                .deadLetterExchange("order-event-exchange")
                //死信后到交换机的key
                .deadLetterRoutingKey("order.release.order")
                .build();
    }

    /**
     * 普通队列
     * (取消订单)用于处理死信
     *
     * @return
     */
    @Bean
    public Queue orderReleaseQueue() {
        return QueueBuilder.durable("order.release.order.queue").build();
    }

    /**
     * TopicExchange
     *
     * @return
     */
    @Bean
    public Exchange orderEventExchange() {
        return ExchangeBuilder.topicExchange("order-event-exchange").build();
    }

    /**
     * 把死信队列和交换机绑定切来,他们的之间的key:order.create.order
     *
     * @return
     */
    @Bean
    public Binding orderCreateBinding(@Qualifier("orderDelayQueue") Queue orderDelayQueue, Exchange orderEventExchange) {
        return BindingBuilder.bind(orderDelayQueue)
                //绑定交换机
                .to(orderEventExchange)
                //路由key
                .with("order.create.order")
                .noargs();
    }

    /**
     * 把处理私信的队列和交换机绑定
     *
     * @return
     */
    @Bean
    public Binding orderReleaseBinding(@Qualifier("orderReleaseQueue") Queue orderReleaseQueue, Exchange orderEventExchange) {
        return BindingBuilder
                .bind(orderReleaseQueue)
                .to(orderEventExchange)
                //和交换机路由key ;注意这里的key 和死信队列里面deadLetterRoutingKey设置的一样
                .with("order.release.order")
                .noargs();
    }

    @RabbitListener(queues = "order.release.order.queue")
    public void releaseOrder(Message<Object> message, Channel channel) {
        //    1. 收到消息以后进行业务端消费处理
        System.err.println("-----------------------");
        System.err.println("releaseOrder:" + message.getPayload());
        System.err.println("接收时间:" + LocalDateTime.now());
    }
//发送消息
amqpTemplate.convertAndSend("order-event-exchange", "order.create.order", MQMsg.build("这是一条延时消息!"));

上面例子中是直接为队列设置的ttl ,也可以针对消息设置

amqpTemplate.convertAndSend("order-event-exchange", "order.create.order", MQMsg.build("这是一条延时消息!"), message -> {
            message.getMessageProperties().setExpiration("130000");
            return message;
        });

资料:

文章目录