采用延时队列实现延时任务

首先看看什么是延时队列:

延时队列

  • 通过源码我们得知,延时队列实现了BlockingQueue,说明它也是个阻塞队列,它的泛型是Delayed。

  • 所以我们首先要实现这个延时对象,对应我们业务也就是延时订单(当然也可以抽象成功任何延时业务):

实现Delayed

package vc.coding.juc.queue;

import lombok.Data;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
* 延时对象
*
* @author HeTongHao
* @since 2020/3/6 22:06
*/
@Data
public class DelayedOrder implements Delayed {
/**
* 记录id
*/
private Long id;
/**
* 创建时间
*/
private LocalDateTime createTime;
/**
* 到期时间
*/
private LocalDateTime expireTime;

public DelayedOrder(Long id, long delay, TimeUnit timeUnit) {
this.id = id;
this.createTime = LocalDateTime.now();
this.expireTime = this.createTime.plusNanos(timeUnit.toNanos(delay));
}

public DelayedOrder(Long id, LocalDateTime expireTime) {
this.id = id;
this.createTime = LocalDateTime.now();
this.expireTime = expireTime;
}

/**
* 将LocalDateTime转为时间戳
*
* @param localDateTime
* @return
*/
public long parseToTimestamp(LocalDateTime localDateTime) {
return localDateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
}

@Override
public long getDelay(TimeUnit unit) {
return parseToTimestamp(this.expireTime) - parseToTimestamp(LocalDateTime.now());
}

@Override
public int compareTo(Delayed delayed) {
return this.expireTime.compareTo(((DelayedOrder) delayed).getExpireTime());
}
}

Delayed实现细节

业务记录、创建时间、到期时间

  • id用来记录我们业务id,创建时间、到期时间需要维护好,用于下面两个方法的实现

关键我们需实现getDelay、compareTo两个方法

  • getDelay:获取剩余的延迟时间,用于判断这个对象是否可以被取出,返回的值小于0代表剩余延时实现没有了可以取出,反之不能取出,实现:到期时间-当前时间=剩余的延迟时间。

  • compareTo:用于比较两个对象在延迟队列里的排序,我们直接比较两个对象的超时时间即可,比较越小的越排在队列前面,下面代码可以验证:

到期时间最近的,排在队列前面

维护延时队列、创建订单消费者take()与提供者put()

  • 接下来,这里关键的是创建一个消费者线程用take()来持续消费队列里的延迟订单对象,提供者则是用户源源不断产生的订单put(),直接上代码和一张图来了解延时队列与take()的工作方式:
package vc.coding.juc.queue;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.*;

/**
* 延时处理
*
* @author HeTongHao
* @since 2020/3/6 22:58
*/
public class Main {
private static ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0
, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)
, Executors.privilegedThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

private static DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("mm:ss");

public static void main(String[] args) {
DelayQueue<DelayedOrder> delayedOrders = new DelayQueue<>();
//启动消费者
startConsumption(delayedOrders);
//启动提供者
startSupplier(delayedOrders);
}

/**
* 启动消费者
*
* @param delayedOrders
*/
private static void startConsumption(DelayQueue<DelayedOrder> delayedOrders) {
executorService.execute(() -> {
System.out.println("消费者已启动");
while (true) {
try {
System.out.println("消费者尝试获取订单(如果队列没有可取的订单则阻塞,直到出现可取订单)");
DelayedOrder delayedOrder = delayedOrders.take();
System.out.println("消费者,"
+ dateTimeFormatter.format(LocalDateTime.now()) + "消费订单id:" + delayedOrder.getId());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}


/**
* 启动提供者
*
* @param delayedOrders
*/
private static void startSupplier(DelayQueue<DelayedOrder> delayedOrders) {
executorService.execute(() -> {
System.out.println("提供者已启动");
long id = 0;
for (int i = 1; i <= 5; i++) {
//每个订单延迟25秒消费
DelayedOrder delayedOrder = new DelayedOrder(++id, 30, TimeUnit.SECONDS);
delayedOrders.put(delayedOrder);
System.out.println("提供者,初始化订单:" + dateTimeFormatter.format(delayedOrder.getCreateTime())
+ "产生一个订单id:" + delayedOrder.getId()
+ ",应在" + dateTimeFormatter.format(delayedOrder.getExpireTime()) + "消费");
}
//模拟提供者,源源不断创建订单
while (true) {
//每个订单延迟5秒消费
DelayedOrder delayedOrder = new DelayedOrder(++id, 5, TimeUnit.SECONDS);
System.out.println("提供者," + dateTimeFormatter.format(delayedOrder.getCreateTime())
+ "产生一个订单id:" + delayedOrder.getId()
+ ",应在" + dateTimeFormatter.format(delayedOrder.getExpireTime()) + "消费");
delayedOrders.put(delayedOrder);
try {
//毎20秒创建一个订单
TimeUnit.SECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}

消费者与提供者的工作方式

资源消耗测试

  • 我根据实际业务情况来测试,main方法加入性能打印、修改了提供者的代码:首先瞬间创建十万条订单,再持续100毫秒一条订单(一秒10单)都是延迟30分钟消费,看下内存消耗与cpu占用怎么样:
@SneakyThrows
public static void main(String[] args) {
long startMemory = Runtime.getRuntime().freeMemory();
System.out.println("开始时消耗内存" + consumeMB(startMemory));
DelayQueue<DelayedOrder> delayedOrders = new DelayQueue<>();
//启动消费者
startConsumption(delayedOrders);
//启动提供者
startSupplier(delayedOrders);
while (true) {
TimeUnit.SECONDS.sleep(2);
System.out.println("实时时消耗内存" + consumeMB(startMemory));
System.out.println("-实时cpu占用"+ CPUMonitorCalc.getInstance().getProcessCpu());
}
}

private static String consumeMB(long startMemory) {
return (startMemory - Runtime.getRuntime().freeMemory()) / 1024 / 1024 + "MB";
}


/**
* 启动提供者
*
* @param delayedOrders
*/
private static void startSupplier(DelayQueue<DelayedOrder> delayedOrders) {
executorService.execute(() -> {
System.out.println("提供者已启动");
long id = 0;
for (int i = 1; i <= 10_0000; i++) {
//每个订单延迟30分钟消费
DelayedOrder delayedOrder = new DelayedOrder(++id, 30, TimeUnit.MINUTES);
delayedOrders.put(delayedOrder);
}
//模拟提供者,源源不断创建订单
while (true) {
//每个订单延迟30分钟消费
DelayedOrder delayedOrder = new DelayedOrder(++id, 30, TimeUnit.MINUTES);
delayedOrders.put(delayedOrder);
try {
//毎100毫秒创建一个订单,一秒10单
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}

资源消耗截图

  • 看起来这么大数据量下内存占用与cpu占用也非常的低!并且在实际业务中我们只需跑1个额外的线程(消费者)即可,消耗的资源算是非常的少了。

  • 因为是自己的电脑所以不敢测试千万甚至亿级订单数量,各位有兴趣可以尝试~

以上就是我最近研究的延时队列,希望能为大家现有的延时策略提供改进的思想~

文章作者: 何同昊
文章链接: http://hetonghao.cn/2020/03/采用延时队列实现延时任务/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 何同昊 Blog
支付宝超级火箭🚀
微信超级火箭🚀