突然发现公司订单小哥的消息队列是用 Redis 做的,而且用的是 list。这里自己也实现一个简单的阻塞队列,用于处理延时消息的问题。
延时队列就是一种带有延迟功能的消息队列,通常具备消息存储、过期消息的实时获取、高可用、消费熔断等特性。
业务场景
订单未支付超时。
订单发货提醒。
短信提醒。
自动收货。
自动评论。
自动取消订单,不发货的情况下。
常用的解决方案
定时轮询任务,比如jdk中TimerThread轮询数据库表、缓存中的数据。频繁的轮询容易出现过度资源消耗。对数据和缓存也有一定的影响。但是可以作为辅助手段,通常用于补偿或者初始化数据。
ScheduledExecutorService 周期性线程池
时间轮(kafka、Netty的HashedWheelTimer)
使用mysql通常是定时扫描表,找出符合条件的数据进行处理。消费成功则更新数据。
使用redis可以使用zset,通过分值进行排序。定时轮询的方式去获取符合条件的记录。消费数据后删除消息。失败则重新进入队列。
Java中java.util.concurrent.DelayQueue
JDK 实现,队列处于 JVM 中,不支持分布式和消息持久化。
RocketMQ 延时队列
消息持久、重试、分布式等等特性。
不支持任意时间精度,只支持按 level(延时级别)的延时消息。
DelayQueue
DelayQueue 1 2 3 4 5 6 7 8 9 10 11 public class DelayQueue <E extends Delayed > extends AbstractQueue <E> implements BlockingQueue <E> { private final transient ReentrantLock lock = new ReentrantLock (); private final PriorityQueue<E> q = new PriorityQueue <E>(); private Thread leader = null ; private final Condition available = lock.newCondition(); ...
PriorityQueue 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class PriorityQueue <E> extends AbstractQueue <E> implements java .io.Serializable { private static final long serialVersionUID = -7720805057305804111L ; private static final int DEFAULT_INITIAL_CAPACITY = 11 ; transient Object[] queue; private int size = 0 ; private final Comparator<? super E> comparator; transient int modCount = 0 ; ...
offer方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
// Inserts the element by delegating to offer(e) and returns its result.
public boolean add (E e) {
    return offer(e);
}

// Inserts the element into the backing priority queue q while holding lock.
// Always returns true; the lock is released in the finally block.
public boolean offer (E e) {
    final ReentrantLock lock = this .lock;
    lock.lock();
    try {
        q.offer(e);
        // If the element just added is now the head of q, the current leader
        // (if any) is waiting on an outdated head: clear leader and signal the
        // available condition so a waiting taker re-examines the head.
        if (q.peek() == e) {
            leader = null ;
            available.signal();
        }
        return true ;
    } finally {
        lock.unlock();
    }
}
take()方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
// Retrieves and removes the head of the queue, blocking until the head's
// delay has elapsed (getDelay(NANOSECONDS) <= 0).
// Throws InterruptedException if interrupted while acquiring the lock or waiting.
public E take () throws InterruptedException {
    final ReentrantLock lock = this .lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            // Empty queue: wait until some thread signals available.
            if (first == null )
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                // Head has expired: remove and return it.
                if (delay <= 0 )
                    return q.poll();
                // Drop the local reference to the head before waiting.
                first = null ;
                // Another thread is already the leader (doing the timed wait):
                // wait without a timeout until signalled.
                if (leader != null )
                    available.await();
                else {
                    // Become the leader and wait at most the head's remaining delay.
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        // Give up leadership only if it was not reassigned meanwhile
                        // (offer() resets leader to null when the head changes).
                        if (leader == thisThread)
                            leader = null ;
                    }
                }
            }
        }
    } finally {
        // On exit, if nobody is leading and elements remain, wake one waiter
        // so it can take over the timed wait; then release the lock.
        if (leader == null && q.peek() != null )
            available.signal();
        lock.unlock();
    }
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 @Setter @Getter public class ItemDelayedI <T> implements Delayed { private Long dataId; private long startTime; private long expire; private T data; private ItemDelayedI () { } public ItemDelayedI (Long dataId, long startTime, long expire) { super (); this .dataId = dataId; this .startTime = startTime; this .expire = expire; } @Override public int compareTo (Delayed o) { return (int ) (this .getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } @Override public long getDelay (TimeUnit unit) { return unit.convert(this .expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public boolean equals (Object o) { if (this == o) return true ; if (o == null || getClass() != o.getClass()) return false ; ItemDelayedI<?> that = (ItemDelayedI<?>) o; return dataId.equals(that.dataId); } @Override public int hashCode () { return dataId.hashCode(); } }
工具类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 @Slf4j @Lazy(false) @Component public class DelayOrderImpl implements DelayOrderI <OrderBo> { @Autowired @Qualifier("defExecutor") private ThreadPoolTaskExecutor threadPoolTaskExecutor; private final static DelayQueue<ItemDelayedI<OrderBo>> DELAY_QUEUE = new DelayQueue <>(); @PostConstruct public void init () { threadPoolTaskExecutor.execute(() -> { ItemDelayedI<OrderBo> orderDelayed; for (; ; ) { try { orderDelayed = DELAY_QUEUE.take(); log.info("处理超时订单 {} {}" , orderDelayed.getDataId(), DateTool.conversionNowFormat()); } catch (Exception e) { log.error("执行自营超时订单的_延迟队列_异常:" + e); } } }); } @Override public boolean addToOrderDelayQueue (ItemDelayedI<OrderBo> itemDelayed) { return DELAY_QUEUE.add(itemDelayed); } @Override public boolean addToDelayQueue (OrderBo order) { ItemDelayedI<OrderBo> orderDelayed = new ItemDelayedI <>(order.getId(), order.getCreateTime(), order.getOrderDeadlineTime()); if (DELAY_QUEUE.contains(orderDelayed)) { return true ; } log.info("添加订单超时列队 {} {}" , JsonTool.toString(order), DateTool.conversionNowFormat()); return DELAY_QUEUE.add(orderDelayed); } @Override public void removeToOrderDelayQueue (OrderBo order) { if (order == null ) { return ; } for (Iterator<ItemDelayedI<OrderBo>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) { ItemDelayedI<OrderBo> queue = iterator.next(); if (queue.getDataId().equals(order.getId())) { log.info("移除订单超时列队 {} {}" , order, DateTool.conversionNowFormat()); DELAY_QUEUE.remove(queue); } } } public void removeToOrderDelayQueueById (Long id) { if (id == null ) { return ; } log.info("移除订单超时列队 {} {}" , id, DateTool.conversionNowFormat()); for (Iterator<ItemDelayedI<OrderBo>> iterator = 
DELAY_QUEUE.iterator(); iterator.hasNext(); ) { ItemDelayedI<OrderBo> queue = iterator.next(); if (queue.getDataId().equals(id)) { DELAY_QUEUE.remove(queue); } } } @Override public List<OrderBo> all () { List<OrderBo> list = new ArrayList <>(); OrderBo orderBoTemp = null ; for (Iterator<ItemDelayedI<OrderBo>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) { ItemDelayedI<OrderBo> queue = iterator.next(); orderBoTemp = OrderBo.builder() .id(queue.getDataId()) .orderDeadlineTime(queue.getExpire()) .createTime(queue.getStartTime()) .build(); list.add(orderBoTemp); } return list; } }
测试代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 @RestController public class AppController { @Autowired private DelayOrderImpl delayOrder; @RequestMapping(value = "add/{id}") public Object add (@PathVariable(required = false) Long id) { if (null == id) { id = RandomUtil.randomLong(10 ,1000 ); } OrderBo orderBo = OrderBo.builder() .id(id) .createTime(DateTool.currentTimeMillis()) .orderDeadlineTime(DateTool.currentTimeMillis() + 1 * 60 * 1000 ).build(); delayOrder.addToDelayQueue(orderBo); Map<String, Object> data = new HashMap <>(); data.put("code" , "200" ); data.put("data" , JsonTool.toString(orderBo)); return data; } @RequestMapping(value = "list") public Object list () { List<OrderBo> list = delayOrder.all(); Map<String, Object> data = new HashMap <>(); data.put("code" , "200" ); data.put("data" ,list); return data; } @RequestMapping(value = "del/{id}") public Object del (@PathVariable(required = false) Long id) { delayOrder.removeToOrderDelayQueueById(id); List<OrderBo> list = delayOrder.all(); Map<String, Object> data = new HashMap <>(); data.put("code" , "200" ); data.put("data" , list); return data; } }
Console 1 2 3 4 5 6 7 8 9 10 11 12 ➜ blog curl http://localhost:9016/delayed/add/1 ➜ blog curl http://localhost:9016/delayed/add/1 ➜ blog curl http://localhost:9016/delayed/add/2 ➜ blog curl http://localhost:9016/delayed/add/2 ➜ blog curl http://localhost:9016/delayed/list {"code" :"200" ,"data" :[{"id" :1,"createTime" :1643699260192,"orderDeadlineTime" :1643699320192},{"id" :2,"createTime" :1643699263491,"orderDeadlineTime" :1643699323491}]}% [XNIO-1 I/O-4] 添加订单超时列队 {"id" :1,"createTime" :1643699260192,"orderDeadlineTime" :1643699320192} 2022-02-01 15:07:40 [XNIO-1 I/O-4] 添加订单超时列队 {"id" :2,"createTime" :1643699263491,"orderDeadlineTime" :1643699323491} 2022-02-01 15:07:43 [def-Executor-1] 处理超时订单 1 2022-02-01 15:08:40 [def-Executor-1] 处理超时订单 2 2022-02-01 15:08:43
END