胖胖的枫叶
主页
博客
产品设计
企业架构
全栈开发
效率工具
数据分析
项目管理
方法论
面试
  • openJdk-docs
  • spring-projects-docs
  • mysql-docs
  • redis-commands
  • redis-projects
  • apache-rocketmq
  • docker-docs
  • mybatis-docs
  • netty-docs
  • journaldev
  • geeksforgeeks
  • 后端进阶
  • 并发编程网
  • 英语肌肉记忆锻炼软件
  • 墨菲安全
  • Redisson-docs
  • jmh-Visual
  • 美团技术
  • MavenSearch
主页
博客
产品设计
企业架构
全栈开发
效率工具
数据分析
项目管理
方法论
面试
  • openJdk-docs
  • spring-projects-docs
  • mysql-docs
  • redis-commands
  • redis-projects
  • apache-rocketmq
  • docker-docs
  • mybatis-docs
  • netty-docs
  • journaldev
  • geeksforgeeks
  • 后端进阶
  • 并发编程网
  • 英语肌肉记忆锻炼软件
  • 墨菲安全
  • Redisson-docs
  • jmh-Visual
  • 美团技术
  • MavenSearch
  • 标签索引
  • 2024年

    • 配置Mac环境
    • 业务知识会计管理
    • 业务知识会计基础
    • 业务知识什么是财务
  • 2023年

    • 项目 Boi
  • 2022年

    • 企业架构故障管理
    • 企业架构开发债务
  • 2021年

    • Python3.8 Matplotlib员工数据分析
    • Python3.8 Matplotlib IP折线图
    • Python3.8 词云 IP地址
    • Redis RediSearch
    • Rust第一个CLI程序
    • Rust所有权
    • Rust函数与控制流
    • Rust变量与数据类型
    • Rust入门
    • 企业架构分布式系统
    • 编程式权限设计
    • Java JVM优化
    • SpringBoot MyBatis 批量
    • SpringBoot 测试Mock
    • SpringBoot Redis布隆过滤器
    • CentOS7 Jenkins 部署
    • SpringBoot WebClient
    • Docker Drone 部署
    • SpringBoot MyBatis
    • SpringBoot Redisson
    • SpringBoot MyBatis 雪花算法
    • Java Netty
    • Redis 扫描
    • CentOS7 Jenkins本地部署分级
    • Mac 安装 Neo4j Jupyter
    • Mac OpenJDK11 JavaFX 环境
    • Mac 安装 Jenv
    • SpringBoot Redis 延时队列
    • SpringBoot MDC日志
    • SpringBoot 定时任务
    • CentOS7 Nginx GoAccess
    • SpringBoot MyBatis 分析
    • SpringBoot Lucene
    • 企业架构分布式锁
    • 学习技巧减少学习排斥心理
    • SpringBoot 动态数据源
    • Docker Compose SpringBoot MySQL Redis
    • SpringBoot 阻塞队列
    • Docker Compose Redis 哨兵
    • Docker Compose Redis 主从
    • 网络通信
  • 2020年

    • SpringBoot 延时队列
    • MySQL基础(四)
    • Java 雪花算法
    • Redis Geo
    • 网络通信 Tcpdump
    • Spring SPI
    • Java Zookeeper
    • SpringBoot JMH
    • 网络通信 Wireshark
    • Docker Compose Redis MySQL
    • CentOS7 Docker 部署
    • Netty 源码环境搭建
    • MySQL基础(三)
    • CentOS7 Selenium运行环境
    • CentOS7 Nginx HTTPS
    • Java JMH
    • SpringBoot 修改Tomcat版本
    • Java Eureka 钉钉通知
    • SpringBoot 错误钉钉通知
    • Java JVM
    • Git 合并提交
    • CentOS7 OpenResty 部署
  • 2019年

    • Redis CLI
    • CentOS7 Nginx 日志
    • 编程式代码风格
    • IDEA 插件
    • Skywalking 源码环境搭建
    • SpringBoot Redis 超时错误
    • 编程式 gRPC
    • Java Arthas
    • Docker Compose Redis 缓存击穿
    • Docker ElasticSearch5.6.8 部署
    • Docker Mysql5.7 部署
    • Spring Redis 字符串
    • Docker Zookeeper 部署
    • Docker Redis 部署
    • SpringBoot Dubbo
    • CentOS7 CMake 部署
    • 应用程序性能指标
    • Java Code 递归
    • CentOS7 ELK 部署
    • CentOS7 Sonarqube 部署
    • Java Selenium
    • Java JJWT JUnit4
    • Spring 源码环境搭建
    • Java JUnit4
    • Java Web JSON Token
    • 编程式 FastDFS
    • Java XPath
    • Redis基础(二)
    • Redis基础(一)
    • Java MyBatis JUnit4
    • Java MyBatis H2 JUnit4
    • MyBatis 源码环境搭建
    • Git 配置
    • Java 核心
    • Java Dubbo
    • Java JavaCollecionsFramework
    • Java Maven
    • Java MyBatis
    • Java Spring
    • Java SpringMVC
    • MySQL
    • Redis
  • 2018年

    • Java HashMap
    • Java HashSet
    • Java Code 交换值
    • Spring Upgrade SpringBoot
    • Mac 编程环境
    • Java Log4j
    • 网络通信 Modbus
    • MySQL基础(二)
    • MySQL基础(一)
    • Java Stack
    • Java Vector
    • CentOS7 RabbitMQ 部署
    • CentOS7 Redis 部署
    • CentOS7 MongoDB 部署
    • CentOS7 基础命令
    • Java Eureka Zookeeper
    • CentOS7 MySQL 部署
    • Git 分支
    • CentOS7 Java环境配置
    • Java LinkedList
    • Java ArrayList
    • Spring Annotation Aop

SpringBoot Redis 延时队列

之前有介绍过SpringBoot-Delayed-Queue基于jdk实现DelayedQueue。在实际开发中使用redis zset来实现的也比较常见。

  • Redis 延迟队列实现的思路,利用 zrangebyscore 查询符合条件的所有待处理任务,循环执行队列任务。或者每次查询最早的一条消息,判断这条信息的执行时间是否小于等于此刻的时间,如果是则执行此任务,否则继续循环检测。

Redis-Sorted-Sets

每个元素都会关联一个 double 类型的分数。redis 正是通过分数来为集合中的成员进行从小到大的排序。

  • zrangebyscore 返回排序集合中的所有元素,key其分数在min and之间max(包括分数等于minor的元素max)。元素被认为是从低到高排序的。
  • zrem 从排序集中删除的成员数,不包括不存在的成员。
  • zadd 将具有指定分数的所有指定成员添加到存储在的排序集中key。可以指定多个分数/成员对。如果指定的成员已经是排序集的成员,则更新分数并将元素重新插入到正确的位置以确保正确的排序。

演示代码

/**
 * @author z201.coding@gmail.com
 **/
@Slf4j
@Lazy(false)
@Component
public class DelayOrderImpl implements DelayOrderI<OrderBo> {

    private final static String ORDER_DELAY_KEY = "order:delay";

    public final static int ORDER_DELAY_TIME = 1;

    @Autowired
    @Qualifier("defExecutor")
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * 初始化时加载数据库中需处理超时的订单
     */
    @PostConstruct
    public void init() {
        /*启动一个线程,去取延迟订单*/
        threadPoolTaskExecutor.execute(() -> {
            log.info(" 待处理订单数量 count {} ", size());
            for (; ; ) {
                try {
                    Long startTime = DateTool.conversion(DateTool.localDateTime().plusDays(-1));
                    // 测试下直接获取所有数据。
                    Set<Integer> data = redisTemplate.opsForZSet().rangeByScore(ORDER_DELAY_KEY, 0, DateTool.currentTimeMillis());
                    data.stream().forEach(i -> {
                        //处理超时订单
                        log.info("处理订单 {}", i);
                        removeToOrderDelayQueueById(Long.valueOf(i));
                    });
                } catch (Exception e) {
                    log.error("执行订单的_延迟队列_异常:" + e);
                } finally {
                    try {
                        // 1秒执行一次,实际情况可以快点。
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        log.error("执行订单的_延迟队列_异常:" + e);
                    }
                }
            }
        });
    }

    @PreDestroy
    public void destroy() {
        threadPoolTaskExecutor.shutdown();
    }

    /**
     * 加入延迟消息队列
     **/
    @Override
    public boolean addToOrderDelayQueue(ItemDelayedI<OrderBo> itemDelayed) {
        log.info("添加订单超时列队 {} ", itemDelayed.getDataId());
        return redisTemplate.opsForZSet().add(ORDER_DELAY_KEY, itemDelayed.getDataId(), itemDelayed.getExpire());
    }

    /**
     * 加入延迟消息队列
     **/
    @Override
    public boolean addToDelayQueue(OrderBo order) {
        ItemDelayedI<OrderBo> orderDelayed = new ItemDelayedI<>(order.getId(), order.getCreateTime(), order.getOrderDeadlineTime());
        return addToOrderDelayQueue(orderDelayed);
    }

    /**
     * 移除列队
     *
     * @param order
     */
    @Override
    public void removeToOrderDelayQueue(OrderBo order) {
        if (null == order || null == order.getId()) {
            return;
        }
        removeToOrderDelayQueueById(order.getId());
    }

    public void removeToOrderDelayQueueById(Long id) {
        if (id == null) {
            return;
        }
        log.info("移除订单超时列队 {}", id);
        redisTemplate.opsForZSet().remove(ORDER_DELAY_KEY, id);
    }

    @Override
    public List<OrderBo> all() {
        List<OrderBo> list = new ArrayList<>();
        Map<String, Long> orderZSetMap = zScan(ORDER_DELAY_KEY, "*", 100);
        orderZSetMap.forEach((key, value) -> {
            list.add(OrderBo.builder()
                    .id(Long.valueOf(key))
                    .orderDeadlineTime(value)
                    .createTime(DateTool.conversion(DateTool.conversion(value).plusMinutes(-ORDER_DELAY_TIME)))
                    .build());
        });
        return list;
    }

    @Override
    public Long size() {
        return redisTemplate.opsForZSet().size(ORDER_DELAY_KEY);
    }

    /**
     * {@link = http://redisdoc.com/database/scan.html#zscan}
     * <p>
     * 可用版本: >= 2.8.0
     * 时间复杂度:增量式迭代命令每次执行的复杂度为 O(1) , 对数据集进行一次完整迭代的复杂度为 O(N) , 其中 N 为数据集中的元素数量。
     *
     * @param key
     * @param pattern
     * @param count
     * @return
     */
    private Map<String, Long> zScan(String key, String pattern, Integer count) {
        Map<String, Long> map = new HashMap<>();
        RedisSerializer<String> stringRedisSerializer = redisTemplate.getStringSerializer();
        RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
        ScanOptions scanOptions = ScanOptions.scanOptions()
                .match(pattern)
                .count(count)
                .build();
        Cursor<RedisZSetCommands.Tuple> cursor = connection.zScan(stringRedisSerializer.serialize(key), scanOptions);
        while (cursor.hasNext()) {
            RedisZSetCommands.Tuple data = cursor.next();
            if (null != data) {
                map.put(stringRedisSerializer.deserialize(data.getValue()), data.getScore().longValue());
            }
        }
        return map;
    }
}


  • Console
curl http://127.0.0.1:9033/add/3
{"code":"200","data":3}%        
[XNIO-1 task-1] [DelayOrderImpl.java : 83] 添加订单超时列队 787 
[XNIO-1 task-1] [DelayOrderImpl.java : 83] 添加订单超时列队 815 
[XNIO-1 task-1] [DelayOrderImpl.java : 83] 添加订单超时列队 689 
[def-Executor-1] [DelayOrderImpl.java : 57] 处理订单 787
[def-Executor-1] [DelayOrderImpl.java : 113] 移除订单超时列队 787
[def-Executor-1] [DelayOrderImpl.java : 57] 处理订单 815
[def-Executor-1] [DelayOrderImpl.java : 113] 移除订单超时列队 815
[def-Executor-1] [DelayOrderImpl.java : 57] 处理订单 689
[def-Executor-1] [DelayOrderImpl.java : 113] 移除订单超时列队 689

END

最近更新: 2025/12/27 18:51
Contributors: 庆峰
Prev
Mac 安装 Jenv
Next
SpringBoot MDC日志