之前有介绍过SpringBoot-Delayed-Queue基于jdk实现DelayedQueue。在实际开发中使用redis zset来实现的也比较常见。
- Redis 延迟队列实现的思路,利用 zrangebyscore 查询符合条件的所有待处理任务,循环执行队列任务。或者每次查询最早的一条消息,判断这条信息的执行时间是否小于等于此刻的时间,如果是则执行此任务,否则继续循环检测。
Redis-Sorted-Sets
每个元素都会关联一个 double 类型的分数。redis 正是通过分数来为集合中的成员进行从小到大的排序。
- zrangebyscore 返回排序集合中的所有元素,
key
其分数在min
and之间max
(包括分数等于min
or的元素max
)。元素被认为是从低到高排序的。 - zrem 从排序集中删除的成员数,不包括不存在的成员。
- zadd 将具有指定分数的所有指定成员添加到存储在的排序集中
key
。可以指定多个分数/成员对。如果指定的成员已经是排序集的成员,则更新分数并将元素重新插入到正确的位置以确保正确的排序。
演示代码
/**
* @author z201.coding@gmail.com
**/
@Slf4j
@Lazy(false)
@Component
public class DelayOrderImpl implements DelayOrderI<OrderBo> {
private final static String ORDER_DELAY_KEY = "order:delay";
public final static int ORDER_DELAY_TIME = 1;
@Autowired
@Qualifier("defExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Autowired
private RedisTemplate redisTemplate;
/**
* 初始化时加载数据库中需处理超时的订单
*/
@PostConstruct
public void init() {
/*启动一个线程,去取延迟订单*/
threadPoolTaskExecutor.execute(() -> {
log.info(" 待处理订单数量 count {} ", size());
for (; ; ) {
try {
Long startTime = DateTool.conversion(DateTool.localDateTime().plusDays(-1));
// 测试下直接获取所有数据。
Set<Integer> data = redisTemplate.opsForZSet().rangeByScore(ORDER_DELAY_KEY, 0, DateTool.currentTimeMillis());
data.stream().forEach(i -> {
//处理超时订单
log.info("处理订单 {}", i);
removeToOrderDelayQueueById(Long.valueOf(i));
});
} catch (Exception e) {
log.error("执行订单的_延迟队列_异常:" + e);
} finally {
try {
// 1秒执行一次,实际情况可以快点。
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("执行订单的_延迟队列_异常:" + e);
}
}
}
});
}
@PreDestroy
public void destroy() {
threadPoolTaskExecutor.shutdown();
}
/**
* 加入延迟消息队列
**/
@Override
public boolean addToOrderDelayQueue(ItemDelayedI<OrderBo> itemDelayed) {
log.info("添加订单超时列队 {} ", itemDelayed.getDataId());
return redisTemplate.opsForZSet().add(ORDER_DELAY_KEY, itemDelayed.getDataId(), itemDelayed.getExpire());
}
/**
* 加入延迟消息队列
**/
@Override
public boolean addToDelayQueue(OrderBo order) {
ItemDelayedI<OrderBo> orderDelayed = new ItemDelayedI<>(order.getId(), order.getCreateTime(), order.getOrderDeadlineTime());
return addToOrderDelayQueue(orderDelayed);
}
/**
* 移除列队
*
* @param order
*/
@Override
public void removeToOrderDelayQueue(OrderBo order) {
if (null == order || null == order.getId()) {
return;
}
removeToOrderDelayQueueById(order.getId());
}
public void removeToOrderDelayQueueById(Long id) {
if (id == null) {
return;
}
log.info("移除订单超时列队 {}", id);
redisTemplate.opsForZSet().remove(ORDER_DELAY_KEY, id);
}
@Override
public List<OrderBo> all() {
List<OrderBo> list = new ArrayList<>();
Map<String, Long> orderZSetMap = zScan(ORDER_DELAY_KEY, "*", 100);
orderZSetMap.forEach((key, value) -> {
list.add(OrderBo.builder()
.id(Long.valueOf(key))
.orderDeadlineTime(value)
.createTime(DateTool.conversion(DateTool.conversion(value).plusMinutes(-ORDER_DELAY_TIME)))
.build());
});
return list;
}
@Override
public Long size() {
return redisTemplate.opsForZSet().size(ORDER_DELAY_KEY);
}
/**
* {@link = http://redisdoc.com/database/scan.html#zscan}
* <p>
* 可用版本: >= 2.8.0
* 时间复杂度:增量式迭代命令每次执行的复杂度为 O(1) , 对数据集进行一次完整迭代的复杂度为 O(N) , 其中 N 为数据集中的元素数量。
*
* @param key
* @param pattern
* @param count
* @return
*/
private Map<String, Long> zScan(String key, String pattern, Integer count) {
Map<String, Long> map = new HashMap<>();
RedisSerializer<String> stringRedisSerializer = redisTemplate.getStringSerializer();
RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
ScanOptions scanOptions = ScanOptions.scanOptions()
.match(pattern)
.count(count)
.build();
Cursor<RedisZSetCommands.Tuple> cursor = connection.zScan(stringRedisSerializer.serialize(key), scanOptions);
while (cursor.hasNext()) {
RedisZSetCommands.Tuple data = cursor.next();
if (null != data) {
map.put(stringRedisSerializer.deserialize(data.getValue()), data.getScore().longValue());
}
}
return map;
}
}
- Console
curl http://127.0.0.1:9033/add/3
{"code":"200","data":3}%
[XNIO-1 task-1] [DelayOrderImpl.java : 83] 添加订单超时列队 787
[XNIO-1 task-1] [DelayOrderImpl.java : 83] 添加订单超时列队 815
[XNIO-1 task-1] [DelayOrderImpl.java : 83] 添加订单超时列队 689
[def-Executor-1] [DelayOrderImpl.java : 57] 处理订单 787
[def-Executor-1] [DelayOrderImpl.java : 113] 移除订单超时列队 787
[def-Executor-1] [DelayOrderImpl.java : 57] 处理订单 815
[def-Executor-1] [DelayOrderImpl.java : 113] 移除订单超时列队 815
[def-Executor-1] [DelayOrderImpl.java : 57] 处理订单 689
[def-Executor-1] [DelayOrderImpl.java : 113] 移除订单超时列队 689