胖胖的枫叶
主页
博客
知识图谱
产品设计
数据分析
企业架构
项目管理
效率工具
全栈开发
后端
前端
测试
运维
数据
面试
  • openJdk-docs
  • spring-projects-docs
  • mysql-docs
  • redis-commands
  • redis-projects
  • apache-rocketmq
  • docker-docs
  • mybatis-docs
  • netty-docs
  • journaldev
  • geeksforgeeks
  • 浮生若梦
  • 后端进阶
  • 并发编程网
  • 英语肌肉记忆锻炼软件
  • 墨菲安全
  • Redisson-docs
  • jmh-Visual
  • 美团技术
  • MavenSearch
主页
博客
知识图谱
产品设计
数据分析
企业架构
项目管理
效率工具
全栈开发
后端
前端
测试
运维
数据
面试
  • openJdk-docs
  • spring-projects-docs
  • mysql-docs
  • redis-commands
  • redis-projects
  • apache-rocketmq
  • docker-docs
  • mybatis-docs
  • netty-docs
  • journaldev
  • geeksforgeeks
  • 浮生若梦
  • 后端进阶
  • 并发编程网
  • 英语肌肉记忆锻炼软件
  • 墨菲安全
  • Redisson-docs
  • jmh-Visual
  • 美团技术
  • MavenSearch
  • 博客

    • 博客迁移说明
    • 2024年
    • 2023年
    • 2022年
    • 2021年
    • 2020年
    • 2019年
    • 2018年

之前有介绍过SpringBoot-Delayed-Queue基于jdk实现DelayedQueue。在实际开发中使用redis zset来实现的也比较常见。

  • Redis 延迟队列实现的思路,利用 zrangebyscore 查询符合条件的所有待处理任务,循环执行队列任务。或者每次查询最早的一条消息,判断这条信息的执行时间是否小于等于此刻的时间,如果是则执行此任务,否则继续循环检测。

Redis-Sorted-Sets

每个元素都会关联一个 double 类型的分数。redis 正是通过分数来为集合中的成员进行从小到大的排序。

  • zrangebyscore 返回排序集合中的所有元素,key其分数在min and之间max(包括分数等于minor的元素max)。元素被认为是从低到高排序的。
  • zrem 从排序集中删除的成员数,不包括不存在的成员。
  • zadd 将具有指定分数的所有指定成员添加到存储在的排序集中key。可以指定多个分数/成员对。如果指定的成员已经是排序集的成员,则更新分数并将元素重新插入到正确的位置以确保正确的排序。

演示代码

/**
 * @author z201.coding@gmail.com
 **/
@Slf4j
@Lazy(false)
@Component
public class DelayOrderImpl implements DelayOrderI<OrderBo> {

    private final static String ORDER_DELAY_KEY = "order:delay";

    public final static int ORDER_DELAY_TIME = 1;

    @Autowired
    @Qualifier("defExecutor")
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * 初始化时加载数据库中需处理超时的订单
     */
    @PostConstruct
    public void init() {
        /*启动一个线程,去取延迟订单*/
        threadPoolTaskExecutor.execute(() -> {
            log.info(" 待处理订单数量 count {} ", size());
            for (; ; ) {
                try {
                    Long startTime = DateTool.conversion(DateTool.localDateTime().plusDays(-1));
                    // 测试下直接获取所有数据。
                    Set<Integer> data = redisTemplate.opsForZSet().rangeByScore(ORDER_DELAY_KEY, 0, DateTool.currentTimeMillis());
                    data.stream().forEach(i -> {
                        //处理超时订单
                        log.info("处理订单 {}", i);
                        removeToOrderDelayQueueById(Long.valueOf(i));
                    });
                } catch (Exception e) {
                    log.error("执行订单的_延迟队列_异常:" + e);
                } finally {
                    try {
                        // 1秒执行一次,实际情况可以快点。
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        log.error("执行订单的_延迟队列_异常:" + e);
                    }
                }
            }
        });
    }

    @PreDestroy
    public void destroy() {
        threadPoolTaskExecutor.shutdown();
    }

    /**
     * 加入延迟消息队列
     **/
    @Override
    public boolean addToOrderDelayQueue(ItemDelayedI<OrderBo> itemDelayed) {
        log.info("添加订单超时列队 {} ", itemDelayed.getDataId());
        return redisTemplate.opsForZSet().add(ORDER_DELAY_KEY, itemDelayed.getDataId(), itemDelayed.getExpire());
    }

    /**
     * 加入延迟消息队列
     **/
    @Override
    public boolean addToDelayQueue(OrderBo order) {
        ItemDelayedI<OrderBo> orderDelayed = new ItemDelayedI<>(order.getId(), order.getCreateTime(), order.getOrderDeadlineTime());
        return addToOrderDelayQueue(orderDelayed);
    }

    /**
     * 移除列队
     *
     * @param order
     */
    @Override
    public void removeToOrderDelayQueue(OrderBo order) {
        if (null == order || null == order.getId()) {
            return;
        }
        removeToOrderDelayQueueById(order.getId());
    }

    public void removeToOrderDelayQueueById(Long id) {
        if (id == null) {
            return;
        }
        log.info("移除订单超时列队 {}", id);
        redisTemplate.opsForZSet().remove(ORDER_DELAY_KEY, id);
    }

    @Override
    public List<OrderBo> all() {
        List<OrderBo> list = new ArrayList<>();
        Map<String, Long> orderZSetMap = zScan(ORDER_DELAY_KEY, "*", 100);
        orderZSetMap.forEach((key, value) -> {
            list.add(OrderBo.builder()
                    .id(Long.valueOf(key))
                    .orderDeadlineTime(value)
                    .createTime(DateTool.conversion(DateTool.conversion(value).plusMinutes(-ORDER_DELAY_TIME)))
                    .build());
        });
        return list;
    }

    @Override
    public Long size() {
        return redisTemplate.opsForZSet().size(ORDER_DELAY_KEY);
    }

    /**
     * {@link = http://redisdoc.com/database/scan.html#zscan}
     * <p>
     * 可用版本: >= 2.8.0
     * 时间复杂度:增量式迭代命令每次执行的复杂度为 O(1) , 对数据集进行一次完整迭代的复杂度为 O(N) , 其中 N 为数据集中的元素数量。
     *
     * @param key
     * @param pattern
     * @param count
     * @return
     */
    private Map<String, Long> zScan(String key, String pattern, Integer count) {
        Map<String, Long> map = new HashMap<>();
        RedisSerializer<String> stringRedisSerializer = redisTemplate.getStringSerializer();
        RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
        ScanOptions scanOptions = ScanOptions.scanOptions()
                .match(pattern)
                .count(count)
                .build();
        Cursor<RedisZSetCommands.Tuple> cursor = connection.zScan(stringRedisSerializer.serialize(key), scanOptions);
        while (cursor.hasNext()) {
            RedisZSetCommands.Tuple data = cursor.next();
            if (null != data) {
                map.put(stringRedisSerializer.deserialize(data.getValue()), data.getScore().longValue());
            }
        }
        return map;
    }
}


  • Console
curl http://127.0.0.1:9033/add/3
{"code":"200","data":3}%        
[XNIO-1 task-1] [DelayOrderImpl.java : 83] 添加订单超时列队 787 
[XNIO-1 task-1] [DelayOrderImpl.java : 83] 添加订单超时列队 815 
[XNIO-1 task-1] [DelayOrderImpl.java : 83] 添加订单超时列队 689 
[def-Executor-1] [DelayOrderImpl.java : 57] 处理订单 787
[def-Executor-1] [DelayOrderImpl.java : 113] 移除订单超时列队 787
[def-Executor-1] [DelayOrderImpl.java : 57] 处理订单 815
[def-Executor-1] [DelayOrderImpl.java : 113] 移除订单超时列队 815
[def-Executor-1] [DelayOrderImpl.java : 57] 处理订单 689
[def-Executor-1] [DelayOrderImpl.java : 113] 移除订单超时列队 689

END

Last Updated:
Contributors: 庆峰