package vc.coding.juc.queue;
import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.concurrent.*;
public class Main { private static ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0 , TimeUnit.SECONDS, new ArrayBlockingQueue<>(1) , Executors.privilegedThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
private static DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("mm:ss");
public static void main(String[] args) { DelayQueue<DelayedOrder> delayedOrders = new DelayQueue<>(); startConsumption(delayedOrders); startSupplier(delayedOrders); }
private static void startConsumption(DelayQueue<DelayedOrder> delayedOrders) { executorService.execute(() -> { System.out.println("消费者已启动"); while (true) { try { System.out.println("消费者尝试获取订单(如果队列没有可取的订单则阻塞,直到出现可取订单)"); DelayedOrder delayedOrder = delayedOrders.take(); System.out.println("消费者," + dateTimeFormatter.format(LocalDateTime.now()) + "消费订单id:" + delayedOrder.getId()); } catch (InterruptedException e) { e.printStackTrace(); } } }); }
private static void startSupplier(DelayQueue<DelayedOrder> delayedOrders) { executorService.execute(() -> { System.out.println("提供者已启动"); long id = 0; for (int i = 1; i <= 5; i++) { DelayedOrder delayedOrder = new DelayedOrder(++id, 30, TimeUnit.SECONDS); delayedOrders.put(delayedOrder); System.out.println("提供者,初始化订单:" + dateTimeFormatter.format(delayedOrder.getCreateTime()) + "产生一个订单id:" + delayedOrder.getId() + ",应在" + dateTimeFormatter.format(delayedOrder.getExpireTime()) + "消费"); } while (true) { DelayedOrder delayedOrder = new DelayedOrder(++id, 5, TimeUnit.SECONDS); System.out.println("提供者," + dateTimeFormatter.format(delayedOrder.getCreateTime()) + "产生一个订单id:" + delayedOrder.getId() + ",应在" + dateTimeFormatter.format(delayedOrder.getExpireTime()) + "消费"); delayedOrders.put(delayedOrder); try { TimeUnit.SECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } } }); } }
|