延时任务的实现方法

一、Java延迟队列实现

该方案是利用 JDK 自带的 DelayQueue 来实现,这是一个无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入 DelayQueue 中的对象,是必须实现 Delayed 接口的。

DelayedQueue 实现工作流程如下图所示

DelayedQueue 实现工作流程

其中Poll():获取并移除队列的超时元素,没有则返回空

take():获取并移除队列的超时元素,如果没有则 wait 当前线程,直到有元素满足超时条件,返回结果。

1、举例订单取消实现:

使用延时队列DelayQueue实现订单超时关闭

2、总结

优点

  • 基于jvm内存,效率高,任务触发时间延迟低

缺点

  • 存在jvm内存中,服务器重启后,数据全部丢失
  • 依赖代码硬编码,集群扩展麻烦
  • 依赖jvm内存,如果订单量过大,无界队列内容扩充,容易出现OOM
  • 需要代码实现,多线程处理业务,复杂度较高
  • 多线程处理时,数据频繁触发等待和唤醒,多了无谓的竞争

3, 这里说一种spring中常用的方式:

使用注解@EnableScheduling开启;然后在需要的地方注入bean TaskScheduler 即可

 taskScheduler.schedule(() -> {
            System.out.println("延迟60秒执行");
        }, Instant.now().plusSeconds(60));

这里是spring 封装了jdk 中的 ScheduledThreadPoolExecutor

优缺点同上,存在jvm内存中,服务器重启后,数据全部丢失。

二、时间轮算法

时间轮算法具体参考资料。有很多工具都有实现:netty、hutool 等等。这里以netty来演示

/**
 * @author ruan
 */
public class HashedWheelTimerTest {
    static class MyTimerTask implements TimerTask {
        boolean flag;

        public MyTimerTask(boolean flag) {
            this.flag = flag;
        }

        public void run(Timeout timeout) {
            System.out.println("要去数据库删除订单了。。。。");
            this.flag = false;
        }
    }

    public static void main(String[] argv) {
        MyTimerTask timerTask = new MyTimerTask(true);
        Timer timer = new HashedWheelTimer();
        //5秒后执行
        timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);

        int i = 1;
        while (timerTask.flag) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(i + "秒过去了");
            i++;
        }
    }

}

(# 资料)

三、redis实现

思路一、利用zet

利用 redis 的 zset,zset 是一个有序集合,每一个元素(member)都关联了一个 score,通过 score 排序来取集合中的值

添加元素:

ZADD key score member [[score member] [score member] …]

按顺序查询元素:

ZRANGE key start stop [WITHSCORES]

查询元素 score:

ZSCORE key member

移除元素:

ZREM key member [member …]

测试如下:

# 添加单个元素
redis> ZADD page_rank 10 google.com
(integer) 1

# 添加多个元素
redis> ZADD page_rank 9 baidu.com 8 bing.com
(integer) 2
redis> ZRANGE page_rank 0 -1 WITHSCORES
1) "bing.com"
2) "8"
3) "baidu.com"
4) "9"
5) "google.com"
6) "10"

# 查询元素的score值
redis> ZSCORE page_rank bing.com
"8"

# 移除单个元素
redis> ZREM page_rank google.com
(integer) 1
redis> ZRANGE page_rank 0 -1 WITHSCORES
1) "bing.com"
2) "8"
3) "baidu.com"
4) "9"

那么如何实现呢?我们将订单超时时间戳与订单号分别设置为 score 和 member,系统扫描第一个元素判断是否超时,具体如下图所示:

zset-延迟任务

代码实现:

import java.util.Calendar;
import java.util.Set;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Tuple;

public class AppTest {
    private static final String ADDR = "127.0.0.1";
    private static final int PORT = 6379;
    private static JedisPool jedisPool = new JedisPool(ADDR, PORT);
    public static Jedis getJedis() {
       return jedisPool.getResource();
    }

    //生产者,生成5个订单放进去
    public void productionDelayMessage(){
        for(int i=0;i<5;i++){
            //延迟3秒
            Calendar cal1 = Calendar.getInstance();
            cal1.add(Calendar.SECOND, 3);
            int second3later = (int) (cal1.getTimeInMillis() / 1000);
            AppTest.getJedis().zadd("OrderId",second3later,"OID0000001"+i);
            System.out.println(System.currentTimeMillis()+"ms:redis生成了一个订单任务:订单ID为"+"OID0000001"+i);
        }
    }

    //消费者,取订单
    public void consumerDelayMessage(){
        Jedis jedis = AppTest.getJedis();
        while(true){
            Set<Tuple> items = jedis.zrangeWithScores("OrderId", 0, 1);
            if(items == null || items.isEmpty()){
                System.out.println("当前没有等待的任务");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                continue;
            }

            int  score = (int) ((Tuple)items.toArray()[0]).getScore();
            Calendar cal = Calendar.getInstance();
            int nowSecond = (int) (cal.getTimeInMillis() / 1000);
            if(nowSecond >= score){
                String orderId = ((Tuple)items.toArray()[0]).getElement();
                Long num = jedis.zrem("OrderId", orderId);
                if( num != null && num>0){
                    System.out.println(System.currentTimeMillis()+"ms:redis消费了一个任务:消费的订单OrderId为"+orderId);
            }
        }
        }
    }

    public static void main(String[] args) {
        AppTest appTest =new AppTest();
        appTest.productionDelayMessage();
        appTest.consumerDelayMessage();
    }
}

思路二、监听key过期

  • 利用redis的notify-keyspace-events,该选项默认为空,改为Ex开启过期事件,配置消息监听。每下一单在redis中放置一个key(如订单id),并设置过期时间

具体实现看文章在程序中监听Redis Key 过期

总结:

优点

  • 消息都存储在Redis中,不占用应用内存
  • 外部redis存储,应用down机不会丢失数据
  • 做集群扩展相当方便
  • 依赖redis超时,时间准确度高

缺点

  • 订单量大时,每一单都要存储redis内存,需要大量redis服务器资源

四、消息队列

1,rabbitmq

看本站文章

2、RocketMQ

RocketMQ不支持自定义延时时间设置

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("192.168.10.11:9876");
producer.start();

Message msg1 = new Message(JmsConfig.TOPIC, "订单001".getBytes());
msg1.setDelayTimeLevel(2);//延迟5秒

Message msg2 = new Message(JmsConfig.TOPIC,"订单001".getBytes());
msg2.setDelayTimeLevel(4);//延迟30秒

SendResult sendResult1 = producer.send(msg1);
SendResult sendResult2 = producer.send(msg2
producer.shutdown();

RocketMQ 不支持任意时间自定义的延迟消息,仅支持内置预设值的18种延迟时间间隔的延迟消息

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

3、总结

优点

  • 可以随时在队列移除,实现实时取消订单,及时恢复订单占用的资源(如商品)
  • 消息存储在mq中,不占用应用服务器资源
  • 异步化处理,一旦处理能力不足,consumer集群可以很方便的扩容

缺点

  • 可能会导致消息大量堆积
  • mq服务器一旦故障重启后,持久化的队列过期时间会被重新计算,造成精度不足
  • 死信消息可能会导致监控系统频繁预

参考资料: