0%

SpringBoot-Delayed-Queue

突然发现公司订单小哥的消息列队是用redis做的,而且用的是list。这里自己也实现下简单的阻塞列队。用于处理延时消息的问题。

  • 延时列队就是一种带有延迟功能的消息列队。通常具备消息存储、过期消息的实时获取、高可用。消费熔断。
  • 业务场景
    • 订单未支付超时。
    • 订单发货提醒。
    • 短信提醒。
    • 自动收货。
    • 自动评论。
    • 自动取消订单,不发货的情况下。
  • 常用的解决方案
    • 定时轮询任务,比如jdk中TimerThread轮询数据库表、缓存中的数据。频繁的轮询容易出现过度资源消耗。对数据和缓存也有一定的影响。但是可以作为辅助手段,通常用于补偿或者初始化数据。
    • ScheduledExecutorService 周期性线程池
    • 时间轮(kafka、Netty的HashedWheelTimer)
    • 使用mysql通常是定时扫描表,找出符合条件的数据进行处理。消费成功则更新数据。
    • 使用redis可以使用zset,通过分值进行排序。定时轮询的方式去获取符合条件的记录。消费数据后删除消息。失败则重新进入队列。
    • Java中java.util.concurrent.DelayQueue
      • Jdk实现,列队处于jvm中,不支持分布式和消息持久化。
    • Rocketmp延时列队
      • 消息持久、重试、分布式等等特性。
      • 不支持任意时间精度的,支持level级别的延时消息。

DelayQueue

  • Leader-follower 模式

DelayQueue

1
2
3
4
5
6
7
8
9
10
11
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
// 并发控制,可重入锁。
private final transient ReentrantLock lock = new ReentrantLock();
// 根据Delay时间排序,优先级列队
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 线程指定等待队列头部的元素,标记当前是否有线程在排队(仅用于取元素时)
private Thread leader = null;
// 条件,用于阻塞和通知的Condition对象,表示现在是否有可取的元素
private final Condition available = lock.newCondition();
...

PriorityQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class PriorityQueue<E> extends AbstractQueue<E>
implements java.io.Serializable {

private static final long serialVersionUID = -7720805057305804111L;
// 默认容器大小
private static final int DEFAULT_INITIAL_CAPACITY = 11;

// 用数组存储元素
transient Object[] queue; // non-private to simplify nested class access

// 记录元素个数
private int size = 0;

// 比较器
private final Comparator<? super E> comparator;

// 安全修改计数,Fail-Fast 机制。
transient int modCount = 0; // non-private to simplify nested class access
...

offer方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 添加元素到列队中
public boolean add(E e) {
return offer(e);
}

// 添加元素到列队中
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 判断优先级入列
q.offer(e);
// 如果添加的元素是列头,栈顶元素。
if (q.peek() == e) {
// leader存储了被阻塞等待列头的线程,
leader = null;
// 唤醒在等待条件的线程。
available.signal();
}
// 添加元素返回成功,可以OOM
return true;
} finally {
// 释放锁
lock.unlock();
}
}

take()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public E take() throws InterruptedException {
// 获取锁
final ReentrantLock lock = this.lock;
// 可中断锁,防止应为await()方法中断异常导致异常退出。
lock.lockInterruptibly();
try {
// 死循环
for (;;) {
// 获取列队头元素
E first = q.peek();
if (first == null)
// 列队头null,继续等待。
available.await();
else {
// 获取头元素剩余时间。
long delay = first.getDelay(NANOSECONDS);
// 如果时间已经生效直接返回头元素。
if (delay <= 0)
return q.poll();
// 时间还没生效,释放当前引用。
first = null; // don't retain ref while waiting
if (leader != null)
// leader不是null,应该有其他线程在出列队。继续等待。
available.await();
else {
// 标识当前线程处于等待列头
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 继续等待列头元素。
available.awaitNanos(delay);
} finally {
// 释放当前线程
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
// 当前线程出队成功,通知其他线程继续执行。
available.signal();
// 释放锁
lock.unlock();
}
}

演示代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* @author z201.coding@gmail.com
**/
@Setter
@Getter
public class ItemDelayedI<T> implements Delayed {

/**
* 数据id
*/
private Long dataId;
/**
* 开始时间
*/
private long startTime;
/**
* 到期时间
*/
private long expire;
/**
* 泛型data
*/
private T data;

private ItemDelayedI() {

}

public ItemDelayedI(Long dataId, long startTime, long expire) {
super();
this.dataId = dataId;
this.startTime = startTime;
this.expire = expire;
}

@Override
public int compareTo(Delayed o) {
// 入队时需要判断任务放到队列的哪个位置,过期时间短的放在前面
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}

@Override
public long getDelay(TimeUnit unit) {
// 和当前时间比较,判断是否过期。
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

// 重写 equals 和 hashcode方法用id作为唯一标志,添加列队的时候做防重复判断。
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ItemDelayedI<?> that = (ItemDelayedI<?>) o;

return dataId.equals(that.dataId);
}

@Override
public int hashCode() {
return dataId.hashCode();
}
}

工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
/**
* @author z201.coding@gmail.com
**/
@Slf4j
@Lazy(false)
@Component
public class DelayOrderImpl implements DelayOrderI<OrderBo> {

@Autowired
@Qualifier("defExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;

private final static DelayQueue<ItemDelayedI<OrderBo>> DELAY_QUEUE = new DelayQueue<>();

/**
* 初始化时加载数据库中需处理超时的订单
*/
@PostConstruct
public void init() {
/*启动一个线程,去取延迟订单*/
threadPoolTaskExecutor.execute(() -> {
ItemDelayedI<OrderBo> orderDelayed;
// 循环处理
for (; ; ) {
try {
orderDelayed = DELAY_QUEUE.take();
//处理超时订单
log.info("处理超时订单 {} {}", orderDelayed.getDataId(), DateTool.conversionNowFormat());
} catch (Exception e) {
log.error("执行自营超时订单的_延迟队列_异常:" + e);
}
}
});
}

/**
* 加入延迟消息队列
**/
@Override
public boolean addToOrderDelayQueue(ItemDelayedI<OrderBo> itemDelayed) {
return DELAY_QUEUE.add(itemDelayed);
}

/**
* 加入延迟消息队列
**/
@Override
public boolean addToDelayQueue(OrderBo order) {
ItemDelayedI<OrderBo> orderDelayed = new ItemDelayedI<>(order.getId(), order.getCreateTime(), order.getOrderDeadlineTime());
if (DELAY_QUEUE.contains(orderDelayed)) {
return true;
}
log.info("添加订单超时列队 {} {}", JsonTool.toString(order), DateTool.conversionNowFormat());
return DELAY_QUEUE.add(orderDelayed);
}

/**
* 移除列队
*
* @param order
*/
@Override
public void removeToOrderDelayQueue(OrderBo order) {
if (order == null) {
return;
}
for (Iterator<ItemDelayedI<OrderBo>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) {
ItemDelayedI<OrderBo> queue = iterator.next();
if (queue.getDataId().equals(order.getId())) {
log.info("移除订单超时列队 {} {}", order, DateTool.conversionNowFormat());
DELAY_QUEUE.remove(queue);
}
}
}

public void removeToOrderDelayQueueById(Long id) {
if (id == null) {
return;
}
log.info("移除订单超时列队 {} {}", id, DateTool.conversionNowFormat());
for (Iterator<ItemDelayedI<OrderBo>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) {
ItemDelayedI<OrderBo> queue = iterator.next();
if (queue.getDataId().equals(id)) {
DELAY_QUEUE.remove(queue);
}
}
}

@Override
public List<OrderBo> all() {
List<OrderBo> list = new ArrayList<>();
OrderBo orderBoTemp = null;
for (Iterator<ItemDelayedI<OrderBo>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) {
ItemDelayedI<OrderBo> queue = iterator.next();
orderBoTemp = OrderBo.builder()
.id(queue.getDataId())
.orderDeadlineTime(queue.getExpire())
.createTime(queue.getStartTime())
.build();
list.add(orderBoTemp);
}
return list;
}
}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* @author z201.coding@gmail.com
**/
@RestController
public class AppController {

@Autowired
private DelayOrderImpl delayOrder;

@RequestMapping(value = "add/{id}")
public Object add(@PathVariable(required = false) Long id) {
if (null == id) {
id = RandomUtil.randomLong(10,1000);
}
OrderBo orderBo = OrderBo.builder()
.id(id)
.createTime(DateTool.currentTimeMillis())
.orderDeadlineTime(DateTool.currentTimeMillis() + 1 * 60 * 1000).build();
delayOrder.addToDelayQueue(orderBo);
Map<String, Object> data = new HashMap<>();
data.put("code", "200");
data.put("data", JsonTool.toString(orderBo));
return data;
}

@RequestMapping(value = "list")
public Object list() {
List<OrderBo> list = delayOrder.all();
Map<String, Object> data = new HashMap<>();
data.put("code", "200");
data.put("data",list);
return data;
}


@RequestMapping(value = "del/{id}")
public Object del(@PathVariable(required = false) Long id) {
delayOrder.removeToOrderDelayQueueById(id);
List<OrderBo> list = delayOrder.all();
Map<String, Object> data = new HashMap<>();
data.put("code", "200");
data.put("data", list);
return data;
}

}

Console

1
2
3
4
5
6
7
8
9
10
11
12
➜  blog curl http://localhost:9016/delayed/add/1
➜ blog curl http://localhost:9016/delayed/add/1
➜ blog curl http://localhost:9016/delayed/add/2
➜ blog curl http://localhost:9016/delayed/add/2
➜ blog curl http://localhost:9016/delayed/list
# 分别添加两次相同的id,不会重复添加进入。
{"code":"200","data":[{"id":1,"createTime":1643699260192,"orderDeadlineTime":1643699320192},{"id":2,"createTime":1643699263491,"orderDeadlineTime":1643699323491}]}%

[XNIO-1 I/O-4] 添加订单超时列队 {"id":1,"createTime":1643699260192,"orderDeadlineTime":1643699320192} 2022-02-01 15:07:40
[XNIO-1 I/O-4] 添加订单超时列队 {"id":2,"createTime":1643699263491,"orderDeadlineTime":1643699323491} 2022-02-01 15:07:43
[def-Executor-1] 处理超时订单 1 2022-02-01 15:08:40
[def-Executor-1] 处理超时订单 2 2022-02-01 15:08:43

END