延时任务的实现方法
延时任务的实现方法
一、Java延迟队列实现
该方案是利用 JDK 自带的 DelayQueue 来实现,这是一个无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入 DelayQueue 中的对象,是必须实现 Delayed 接口的。
DelayedQueue 实现工作流程如下图所示
其中Poll()
:获取并移除队列的超时元素,没有则返回空
take()
:获取并移除队列的超时元素,如果没有则 wait 当前线程,直到有元素满足超时条件,返回结果。
1、举例订单取消实现:
2、总结
优点
- 基于jvm内存,效率高,任务触发时间延迟低
缺点
- 存在jvm内存中,服务器重启后,数据全部丢失
- 依赖代码硬编码,集群扩展麻烦
- 依赖jvm内存,如果订单量过大,无界队列内容扩充,容易出现OOM
- 需要代码实现,多线程处理业务,复杂度较高
- 多线程处理时,数据频繁触发等待和唤醒,多了无谓的竞争
3, 这里说一种spring中常用的方式:
使用注解@EnableScheduling
开启;然后在需要的地方注入bean TaskScheduler
即可
taskScheduler.schedule(() -> {
System.out.println("延迟60秒执行");
}, Instant.now().plusSeconds(60));
这里是spring 封装了jdk 中的 ScheduledThreadPoolExecutor
优缺点同上,存在jvm内存中,服务器重启后,数据全部丢失。
二、时间轮算法
时间轮算法具体参考资料。有很多工具都有实现:netty、hutool 等等。这里以netty来演示
/**
* @author ruan
*/
public class HashedWheelTimerTest {
static class MyTimerTask implements TimerTask {
boolean flag;
public MyTimerTask(boolean flag) {
this.flag = flag;
}
public void run(Timeout timeout) {
System.out.println("要去数据库删除订单了。。。。");
this.flag = false;
}
}
public static void main(String[] argv) {
MyTimerTask timerTask = new MyTimerTask(true);
Timer timer = new HashedWheelTimer();
//5秒后执行
timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);
int i = 1;
while (timerTask.flag) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(i + "秒过去了");
i++;
}
}
}
(# 资料)
三、redis实现
思路一、利用zet
利用 redis 的 zset,zset 是一个有序集合,每一个元素(member)都关联了一个 score,通过 score 排序来取集合中的值
添加元素:
ZADD key score member [[score member] [score member] …]
按顺序查询元素:
ZRANGE key start stop [WITHSCORES]
查询元素 score:
ZSCORE key member
移除元素:
ZREM key member [member …]
测试如下:
# 添加单个元素
redis> ZADD page_rank 10 google.com
(integer) 1
# 添加多个元素
redis> ZADD page_rank 9 baidu.com 8 bing.com
(integer) 2
redis> ZRANGE page_rank 0 -1 WITHSCORES
1) "bing.com"
2) "8"
3) "baidu.com"
4) "9"
5) "google.com"
6) "10"
# 查询元素的score值
redis> ZSCORE page_rank bing.com
"8"
# 移除单个元素
redis> ZREM page_rank google.com
(integer) 1
redis> ZRANGE page_rank 0 -1 WITHSCORES
1) "bing.com"
2) "8"
3) "baidu.com"
4) "9"
那么如何实现呢?我们将订单超时时间戳与订单号分别设置为 score 和 member,系统扫描第一个元素判断是否超时,具体如下图所示:
代码实现:
import java.util.Calendar;
import java.util.Set;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Tuple;
public class AppTest {
private static final String ADDR = "127.0.0.1";
private static final int PORT = 6379;
private static JedisPool jedisPool = new JedisPool(ADDR, PORT);
public static Jedis getJedis() {
return jedisPool.getResource();
}
//生产者,生成5个订单放进去
public void productionDelayMessage(){
for(int i=0;i<5;i++){
//延迟3秒
Calendar cal1 = Calendar.getInstance();
cal1.add(Calendar.SECOND, 3);
int second3later = (int) (cal1.getTimeInMillis() / 1000);
AppTest.getJedis().zadd("OrderId",second3later,"OID0000001"+i);
System.out.println(System.currentTimeMillis()+"ms:redis生成了一个订单任务:订单ID为"+"OID0000001"+i);
}
}
//消费者,取订单
public void consumerDelayMessage(){
Jedis jedis = AppTest.getJedis();
while(true){
Set<Tuple> items = jedis.zrangeWithScores("OrderId", 0, 1);
if(items == null || items.isEmpty()){
System.out.println("当前没有等待的任务");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
continue;
}
int score = (int) ((Tuple)items.toArray()[0]).getScore();
Calendar cal = Calendar.getInstance();
int nowSecond = (int) (cal.getTimeInMillis() / 1000);
if(nowSecond >= score){
String orderId = ((Tuple)items.toArray()[0]).getElement();
Long num = jedis.zrem("OrderId", orderId);
if( num != null && num>0){
System.out.println(System.currentTimeMillis()+"ms:redis消费了一个任务:消费的订单OrderId为"+orderId);
}
}
}
}
public static void main(String[] args) {
AppTest appTest =new AppTest();
appTest.productionDelayMessage();
appTest.consumerDelayMessage();
}
}
思路二、监听key过期
- 利用redis的notify-keyspace-events,该选项默认为空,改为Ex开启过期事件,配置消息监听。每下一单在redis中放置一个key(如订单id),并设置过期时间
具体实现看文章: 在程序中监听Redis Key 过期
总结:
优点
- 消息都存储在Redis中,不占用应用内存
- 外部redis存储,应用down机不会丢失数据
- 做集群扩展相当方便
- 依赖redis超时,时间准确度高
缺点
- 订单量大时,每一单都要存储redis内存,需要大量redis服务器资源
四、消息队列
1,rabbitmq
2、RocketMQ
RocketMQ不支持自定义延时时间设置
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("192.168.10.11:9876");
producer.start();
Message msg1 = new Message(JmsConfig.TOPIC, "订单001".getBytes());
msg1.setDelayTimeLevel(2);//延迟5秒
Message msg2 = new Message(JmsConfig.TOPIC,"订单001".getBytes());
msg2.setDelayTimeLevel(4);//延迟30秒
SendResult sendResult1 = producer.send(msg1);
SendResult sendResult2 = producer.send(msg2
producer.shutdown();
RocketMQ 不支持任意时间自定义的延迟消息,仅支持内置预设值的18种延迟时间间隔的延迟消息
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
3、总结
优点
- 可以随时在队列移除,实现实时取消订单,及时恢复订单占用的资源(如商品)
- 消息存储在mq中,不占用应用服务器资源
- 异步化处理,一旦处理能力不足,consumer集群可以很方便的扩容
缺点
- 可能会导致消息大量堆积
- mq服务器一旦故障重启后,持久化的队列过期时间会被重新计算,造成精度不足
- 死信消息可能会导致监控系统频繁预
参考资料:
本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。