突然发现公司订单小哥的消息列队是用redis做的,而且用的是list。这里自己也实现下简单的阻塞列队。用于处理延时消息的问题。
- 延时列队就是一种带有延迟功能的消息列队。通常具备消息存储、过期消息的实时获取、高可用。消费熔断。
- 业务场景
- 订单未支付超时。
- 订单发货提醒。
- 短信提醒。
- 自动收货。
- 自动评论。
- 自动取消订单,不发货的情况下。
- 常用的解决方案
- 定时轮询任务,比如jdk中TimerThread轮询数据库表、缓存中的数据。频繁的轮询容易出现过度资源消耗。对数据和缓存也有一定的影响。但是可以作为辅助手段,通常用于补偿或者初始化数据。
- ScheduledExecutorService 周期性线程池
- 时间轮(kafka、Netty的HashedWheelTimer)
- 使用mysql通常是定时扫描表,找出符合条件的数据进行处理。消费成功则更新数据。
- 使用redis可以使用zset,通过分值进行排序。定时轮询的方式去获取符合条件的记录。消费数据后删除消息。失败则重新进入队列。
- Java中java.util.concurrent.DelayQueue
- Jdk实现,列队处于jvm中,不支持分布式和消息持久化。
- Rocketmp延时列队
- 消息持久、重试、分布式等等特性。
- 不支持任意时间精度的,支持level级别的延时消息。
DelayQueue
- Leader-follower 模式

DelayQueue
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
// 并发控制,可重入锁。
private final transient ReentrantLock lock = new ReentrantLock();
// 根据Delay时间排序,优先级列队
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 线程指定等待队列头部的元素,标记当前是否有线程在排队(仅用于取元素时)
private Thread leader = null;
// 条件,用于阻塞和通知的Condition对象,表示现在是否有可取的元素
private final Condition available = lock.newCondition();
...
PriorityQueue
public class PriorityQueue<E> extends AbstractQueue<E>
implements java.io.Serializable {
private static final long serialVersionUID = -7720805057305804111L;
// 默认容器大小
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 用数组存储元素
transient Object[] queue; // non-private to simplify nested class access
// 记录元素个数
private int size = 0;
// 比较器
private final Comparator<? super E> comparator;
// 安全修改计数,Fail-Fast 机制。
transient int modCount = 0; // non-private to simplify nested class access
...
offer方法
// 添加元素到列队中
public boolean add(E e) {
return offer(e);
}
// 添加元素到列队中
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 判断优先级入列
q.offer(e);
// 如果添加的元素是列头,栈顶元素。
if (q.peek() == e) {
// leader存储了被阻塞等待列头的线程,
leader = null;
// 唤醒在等待条件的线程。
available.signal();
}
// 添加元素返回成功,可以OOM
return true;
} finally {
// 释放锁
lock.unlock();
}
}
take()方法
public E take() throws InterruptedException {
// 获取锁
final ReentrantLock lock = this.lock;
// 可中断锁,防止应为await()方法中断异常导致异常退出。
lock.lockInterruptibly();
try {
// 死循环
for (;;) {
// 获取列队头元素
E first = q.peek();
if (first == null)
// 列队头null,继续等待。
available.await();
else {
// 获取头元素剩余时间。
long delay = first.getDelay(NANOSECONDS);
// 如果时间已经生效直接返回头元素。
if (delay <= 0)
return q.poll();
// 时间还没生效,释放当前引用。
first = null; // don't retain ref while waiting
if (leader != null)
// leader不是null,应该有其他线程在出列队。继续等待。
available.await();
else {
// 标识当前线程处于等待列头
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 继续等待列头元素。
available.awaitNanos(delay);
} finally {
// 释放当前线程
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
// 当前线程出队成功,通知其他线程继续执行。
available.signal();
// 释放锁
lock.unlock();
}
}
演示代码
/**
* @author z201.coding@gmail.com
**/
@Setter
@Getter
public class ItemDelayedI<T> implements Delayed {
/**
* 数据id
*/
private Long dataId;
/**
* 开始时间
*/
private long startTime;
/**
* 到期时间
*/
private long expire;
/**
* 泛型data
*/
private T data;
private ItemDelayedI() {
}
public ItemDelayedI(Long dataId, long startTime, long expire) {
super();
this.dataId = dataId;
this.startTime = startTime;
this.expire = expire;
}
@Override
public int compareTo(Delayed o) {
// 入队时需要判断任务放到队列的哪个位置,过期时间短的放在前面
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public long getDelay(TimeUnit unit) {
// 和当前时间比较,判断是否过期。
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
// 重写 equals 和 hashcode方法用id作为唯一标志,添加列队的时候做防重复判断。
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ItemDelayedI<?> that = (ItemDelayedI<?>) o;
return dataId.equals(that.dataId);
}
@Override
public int hashCode() {
return dataId.hashCode();
}
}
工具类
/**
* @author z201.coding@gmail.com
**/
@Slf4j
@Lazy(false)
@Component
public class DelayOrderImpl implements DelayOrderI<OrderBo> {
@Autowired
@Qualifier("defExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
private final static DelayQueue<ItemDelayedI<OrderBo>> DELAY_QUEUE = new DelayQueue<>();
/**
* 初始化时加载数据库中需处理超时的订单
*/
@PostConstruct
public void init() {
/*启动一个线程,去取延迟订单*/
threadPoolTaskExecutor.execute(() -> {
ItemDelayedI<OrderBo> orderDelayed;
// 循环处理
for (; ; ) {
try {
orderDelayed = DELAY_QUEUE.take();
//处理超时订单
log.info("处理超时订单 {} {}", orderDelayed.getDataId(), DateTool.conversionNowFormat());
} catch (Exception e) {
log.error("执行自营超时订单的_延迟队列_异常:" + e);
}
}
});
}
/**
* 加入延迟消息队列
**/
@Override
public boolean addToOrderDelayQueue(ItemDelayedI<OrderBo> itemDelayed) {
return DELAY_QUEUE.add(itemDelayed);
}
/**
* 加入延迟消息队列
**/
@Override
public boolean addToDelayQueue(OrderBo order) {
ItemDelayedI<OrderBo> orderDelayed = new ItemDelayedI<>(order.getId(), order.getCreateTime(), order.getOrderDeadlineTime());
if (DELAY_QUEUE.contains(orderDelayed)) {
return true;
}
log.info("添加订单超时列队 {} {}", JsonTool.toString(order), DateTool.conversionNowFormat());
return DELAY_QUEUE.add(orderDelayed);
}
/**
* 移除列队
*
* @param order
*/
@Override
public void removeToOrderDelayQueue(OrderBo order) {
if (order == null) {
return;
}
for (Iterator<ItemDelayedI<OrderBo>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) {
ItemDelayedI<OrderBo> queue = iterator.next();
if (queue.getDataId().equals(order.getId())) {
log.info("移除订单超时列队 {} {}", order, DateTool.conversionNowFormat());
DELAY_QUEUE.remove(queue);
}
}
}
public void removeToOrderDelayQueueById(Long id) {
if (id == null) {
return;
}
log.info("移除订单超时列队 {} {}", id, DateTool.conversionNowFormat());
for (Iterator<ItemDelayedI<OrderBo>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) {
ItemDelayedI<OrderBo> queue = iterator.next();
if (queue.getDataId().equals(id)) {
DELAY_QUEUE.remove(queue);
}
}
}
@Override
public List<OrderBo> all() {
List<OrderBo> list = new ArrayList<>();
OrderBo orderBoTemp = null;
for (Iterator<ItemDelayedI<OrderBo>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) {
ItemDelayedI<OrderBo> queue = iterator.next();
orderBoTemp = OrderBo.builder()
.id(queue.getDataId())
.orderDeadlineTime(queue.getExpire())
.createTime(queue.getStartTime())
.build();
list.add(orderBoTemp);
}
return list;
}
}
测试代码
/**
* @author z201.coding@gmail.com
**/
@RestController
public class AppController {
@Autowired
private DelayOrderImpl delayOrder;
@RequestMapping(value = "add/{id}")
public Object add(@PathVariable(required = false) Long id) {
if (null == id) {
id = RandomUtil.randomLong(10,1000);
}
OrderBo orderBo = OrderBo.builder()
.id(id)
.createTime(DateTool.currentTimeMillis())
.orderDeadlineTime(DateTool.currentTimeMillis() + 1 * 60 * 1000).build();
delayOrder.addToDelayQueue(orderBo);
Map<String, Object> data = new HashMap<>();
data.put("code", "200");
data.put("data", JsonTool.toString(orderBo));
return data;
}
@RequestMapping(value = "list")
public Object list() {
List<OrderBo> list = delayOrder.all();
Map<String, Object> data = new HashMap<>();
data.put("code", "200");
data.put("data",list);
return data;
}
@RequestMapping(value = "del/{id}")
public Object del(@PathVariable(required = false) Long id) {
delayOrder.removeToOrderDelayQueueById(id);
List<OrderBo> list = delayOrder.all();
Map<String, Object> data = new HashMap<>();
data.put("code", "200");
data.put("data", list);
return data;
}
}
Console
➜ blog curl http://localhost:9016/delayed/add/1
➜ blog curl http://localhost:9016/delayed/add/1
➜ blog curl http://localhost:9016/delayed/add/2
➜ blog curl http://localhost:9016/delayed/add/2
➜ blog curl http://localhost:9016/delayed/list
# 分别添加两次相同的id,不会重复添加进入。
{"code":"200","data":[{"id":1,"createTime":1643699260192,"orderDeadlineTime":1643699320192},{"id":2,"createTime":1643699263491,"orderDeadlineTime":1643699323491}]}%
[XNIO-1 I/O-4] 添加订单超时列队 {"id":1,"createTime":1643699260192,"orderDeadlineTime":1643699320192} 2022-02-01 15:07:40
[XNIO-1 I/O-4] 添加订单超时列队 {"id":2,"createTime":1643699263491,"orderDeadlineTime":1643699323491} 2022-02-01 15:07:43
[def-Executor-1] 处理超时订单 1 2022-02-01 15:08:40
[def-Executor-1] 处理超时订单 2 2022-02-01 15:08:43