0%

SpringBoot-Redis-Delayed-Queue

之前有介绍过SpringBoot-Delayed-Queue基于jdk实现DelayedQueue。在实际开发中使用redis zset来实现的也比较常见。

  • Redis 延迟队列实现的思路,利用 zrangebyscore 查询符合条件的所有待处理任务,循环执行队列任务。或者每次查询最早的一条消息,判断这条信息的执行时间是否小于等于此刻的时间,如果是则执行此任务,否则继续循环检测。

Redis-Sorted-Sets

每个元素都会关联一个 double 类型的分数。redis 正是通过分数来为集合中的成员进行从小到大的排序。

  • zrangebyscore 返回排序集合中的所有元素,key其分数在min and之间max(包括分数等于minor的元素max)。元素被认为是从低到高排序的。
  • zrem 从排序集中删除的成员数,不包括不存在的成员。
  • zadd 将具有指定分数的所有指定成员添加到存储在的排序集中key。可以指定多个分数/成员对。如果指定的成员已经是排序集的成员,则更新分数并将元素重新插入到正确的位置以确保正确的排序。

演示代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/**
* @author z201.coding@gmail.com
**/
@Slf4j
@Lazy(false)
@Component
public class DelayOrderImpl implements DelayOrderI<OrderBo> {

private final static String ORDER_DELAY_KEY = "order:delay";

public final static int ORDER_DELAY_TIME = 1;

@Autowired
@Qualifier("defExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;

@Autowired
private RedisTemplate redisTemplate;

/**
* 初始化时加载数据库中需处理超时的订单
*/
@PostConstruct
public void init() {
/*启动一个线程,去取延迟订单*/
threadPoolTaskExecutor.execute(() -> {
log.info(" 待处理订单数量 count {} ", size());
for (; ; ) {
try {
Long startTime = DateTool.conversion(DateTool.localDateTime().plusDays(-1));
// 测试下直接获取所有数据。
Set<Integer> data = redisTemplate.opsForZSet().rangeByScore(ORDER_DELAY_KEY, 0, DateTool.currentTimeMillis());
data.stream().forEach(i -> {
//处理超时订单
log.info("处理订单 {}", i);
removeToOrderDelayQueueById(Long.valueOf(i));
});
} catch (Exception e) {
log.error("执行订单的_延迟队列_异常:" + e);
} finally {
try {
// 1秒执行一次,实际情况可以快点。
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("执行订单的_延迟队列_异常:" + e);
}
}
}
});
}

@PreDestroy
public void destroy() {
threadPoolTaskExecutor.shutdown();
}

/**
* 加入延迟消息队列
**/
@Override
public boolean addToOrderDelayQueue(ItemDelayedI<OrderBo> itemDelayed) {
log.info("添加订单超时列队 {} ", itemDelayed.getDataId());
return redisTemplate.opsForZSet().add(ORDER_DELAY_KEY, itemDelayed.getDataId(), itemDelayed.getExpire());
}

/**
* 加入延迟消息队列
**/
@Override
public boolean addToDelayQueue(OrderBo order) {
ItemDelayedI<OrderBo> orderDelayed = new ItemDelayedI<>(order.getId(), order.getCreateTime(), order.getOrderDeadlineTime());
return addToOrderDelayQueue(orderDelayed);
}

/**
* 移除列队
*
* @param order
*/
@Override
public void removeToOrderDelayQueue(OrderBo order) {
if (null == order || null == order.getId()) {
return;
}
removeToOrderDelayQueueById(order.getId());
}

public void removeToOrderDelayQueueById(Long id) {
if (id == null) {
return;
}
log.info("移除订单超时列队 {}", id);
redisTemplate.opsForZSet().remove(ORDER_DELAY_KEY, id);
}

@Override
public List<OrderBo> all() {
List<OrderBo> list = new ArrayList<>();
Map<String, Long> orderZSetMap = zScan(ORDER_DELAY_KEY, "*", 100);
orderZSetMap.forEach((key, value) -> {
list.add(OrderBo.builder()
.id(Long.valueOf(key))
.orderDeadlineTime(value)
.createTime(DateTool.conversion(DateTool.conversion(value).plusMinutes(-ORDER_DELAY_TIME)))
.build());
});
return list;
}

@Override
public Long size() {
return redisTemplate.opsForZSet().size(ORDER_DELAY_KEY);
}

/**
* {@link = http://redisdoc.com/database/scan.html#zscan}
* <p>
* 可用版本: >= 2.8.0
* 时间复杂度:增量式迭代命令每次执行的复杂度为 O(1) , 对数据集进行一次完整迭代的复杂度为 O(N) , 其中 N 为数据集中的元素数量。
*
* @param key
* @param pattern
* @param count
* @return
*/
private Map<String, Long> zScan(String key, String pattern, Integer count) {
Map<String, Long> map = new HashMap<>();
RedisSerializer<String> stringRedisSerializer = redisTemplate.getStringSerializer();
RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
ScanOptions scanOptions = ScanOptions.scanOptions()
.match(pattern)
.count(count)
.build();
Cursor<RedisZSetCommands.Tuple> cursor = connection.zScan(stringRedisSerializer.serialize(key), scanOptions);
while (cursor.hasNext()) {
RedisZSetCommands.Tuple data = cursor.next();
if (null != data) {
map.put(stringRedisSerializer.deserialize(data.getValue()), data.getScore().longValue());
}
}
return map;
}
}