胖胖的枫叶
主页
博客
产品设计
企业架构
全栈开发
效率工具
数据分析
项目管理
方法论
面试
  • openJdk-docs
  • spring-projects-docs
  • mysql-docs
  • redis-commands
  • redis-projects
  • apache-rocketmq
  • docker-docs
  • mybatis-docs
  • netty-docs
  • journaldev
  • geeksforgeeks
  • 后端进阶
  • 并发编程网
  • 英语肌肉记忆锻炼软件
  • 墨菲安全
  • Redisson-docs
  • jmh-Visual
  • 美团技术
  • MavenSearch
主页
博客
产品设计
企业架构
全栈开发
效率工具
数据分析
项目管理
方法论
面试
  • openJdk-docs
  • spring-projects-docs
  • mysql-docs
  • redis-commands
  • redis-projects
  • apache-rocketmq
  • docker-docs
  • mybatis-docs
  • netty-docs
  • journaldev
  • geeksforgeeks
  • 后端进阶
  • 并发编程网
  • 英语肌肉记忆锻炼软件
  • 墨菲安全
  • Redisson-docs
  • jmh-Visual
  • 美团技术
  • MavenSearch
  • 标签索引
  • 2024年

    • 配置Mac环境
    • 业务知识会计管理
    • 业务知识会计基础
    • 业务知识什么是财务
  • 2023年

    • 项目 Boi
  • 2022年

    • 企业架构故障管理
    • 企业架构开发债务
  • 2021年

    • Python3.8 Matplotlib员工数据分析
    • Python3.8 Matplotlib IP折线图
    • Python3.8 词云 IP地址
    • Redis RediSearch
    • Rust第一个CLI程序
    • Rust所有权
    • Rust函数与控制流
    • Rust变量与数据类型
    • Rust入门
    • 企业架构分布式系统
    • 编程式权限设计
    • Java JVM优化
    • SpringBoot MyBatis 批量
    • SpringBoot 测试Mock
    • SpringBoot Redis布隆过滤器
    • CentOS7 Jenkins 部署
    • SpringBoot WebClient
    • Docker Drone 部署
    • SpringBoot MyBatis
    • SpringBoot Redisson
    • SpringBoot MyBatis 雪花算法
    • Java Netty
    • Redis 扫描
    • CentOS7 Jenkins本地部署分级
    • Mac 安装 Neo4j Jupyter
    • Mac OpenJDK11 JavaFX 环境
    • Mac 安装 Jenv
    • SpringBoot Redis 延时队列
    • SpringBoot MDC日志
    • SpringBoot 定时任务
    • CentOS7 Nginx GoAccess
    • SpringBoot MyBatis 分析
    • SpringBoot Lucene
    • 企业架构分布式锁
    • 学习技巧减少学习排斥心理
    • SpringBoot 动态数据源
    • Docker Compose SpringBoot MySQL Redis
    • SpringBoot 阻塞队列
    • Docker Compose Redis 哨兵
    • Docker Compose Redis 主从
    • 网络通信
  • 2020年

    • SpringBoot 延时队列
    • MySQL基础(四)
    • Java 雪花算法
    • Redis Geo
    • 网络通信 Tcpdump
    • Spring SPI
    • Java Zookeeper
    • SpringBoot JMH
    • 网络通信 Wireshark
    • Docker Compose Redis MySQL
    • CentOS7 Docker 部署
    • Netty 源码环境搭建
    • MySQL基础(三)
    • CentOS7 Selenium运行环境
    • CentOS7 Nginx HTTPS
    • Java JMH
    • SpringBoot 修改Tomcat版本
    • Java Eureka 钉钉通知
    • SpringBoot 错误钉钉通知
    • Java JVM
    • Git 合并提交
    • CentOS7 OpenResty 部署
  • 2019年

    • Redis CLI
    • CentOS7 Nginx 日志
    • 编程式代码风格
    • IDEA 插件
    • Skywalking 源码环境搭建
    • SpringBoot Redis 超时错误
    • 编程式 gRPC
    • Java Arthas
    • Docker Compose Redis 缓存击穿
    • Docker ElasticSearch5.6.8 部署
    • Docker Mysql5.7 部署
    • Spring Redis 字符串
    • Docker Zookeeper 部署
    • Docker Redis 部署
    • SpringBoot Dubbo
    • CentOS7 CMake 部署
    • 应用程序性能指标
    • Java Code 递归
    • CentOS7 ELK 部署
    • CentOS7 Sonarqube 部署
    • Java Selenium
    • Java JJWT JUnit4
    • Spring 源码环境搭建
    • Java JUnit4
    • Java Web JSON Token
    • 编程式 FastDFS
    • Java XPath
    • Redis基础(二)
    • Redis基础(一)
    • Java MyBatis JUnit4
    • Java MyBatis H2 JUnit4
    • MyBatis 源码环境搭建
    • Git 配置
    • Java 核心
    • Java Dubbo
    • Java JavaCollecionsFramework
    • Java Maven
    • Java MyBatis
    • Java Spring
    • Java SpringMVC
    • MySQL
    • Redis
  • 2018年

    • Java HashMap
    • Java HashSet
    • Java Code 交换值
    • Spring Upgrade SpringBoot
    • Mac 编程环境
    • Java Log4j
    • 网络通信 Modbus
    • MySQL基础(二)
    • MySQL基础(一)
    • Java Stack
    • Java Vector
    • CentOS7 RabbitMQ 部署
    • CentOS7 Redis 部署
    • CentOS7 MongoDB 部署
    • CentOS7 基础命令
    • Java Eureka Zookeeper
    • CentOS7 MySQL 部署
    • Git 分支
    • CentOS7 Java环境配置
    • Java LinkedList
    • Java ArrayList
    • Spring Annotation Aop

SpringBoot 延时队列

突然发现公司订单小哥的消息列队是用redis做的,而且用的是list。这里自己也实现下简单的阻塞列队。用于处理延时消息的问题。

  • 延时列队就是一种带有延迟功能的消息列队。通常具备消息存储、过期消息的实时获取、高可用。消费熔断。
  • 业务场景
    • 订单未支付超时。
    • 订单发货提醒。
    • 短信提醒。
    • 自动收货。
    • 自动评论。
    • 自动取消订单,不发货的情况下。
  • 常用的解决方案
    • 定时轮询任务,比如jdk中TimerThread轮询数据库表、缓存中的数据。频繁的轮询容易出现过度资源消耗。对数据和缓存也有一定的影响。但是可以作为辅助手段,通常用于补偿或者初始化数据。
    • ScheduledExecutorService 周期性线程池
    • 时间轮(kafka、Netty的HashedWheelTimer)
    • 使用mysql通常是定时扫描表,找出符合条件的数据进行处理。消费成功则更新数据。
    • 使用redis可以使用zset,通过分值进行排序。定时轮询的方式去获取符合条件的记录。消费数据后删除消息。失败则重新进入队列。
    • Java中java.util.concurrent.DelayQueue
      • Jdk实现,列队处于jvm中,不支持分布式和消息持久化。
    • Rocketmp延时列队
      • 消息持久、重试、分布式等等特性。
      • 不支持任意时间精度的,支持level级别的延时消息。

DelayQueue

  • Leader-follower 模式

DelayQueue

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
		// 并发控制,可重入锁。
    private final transient ReentrantLock lock = new ReentrantLock();
    // 根据Delay时间排序,优先级列队
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    // 线程指定等待队列头部的元素,标记当前是否有线程在排队(仅用于取元素时)
    private Thread leader = null;
    // 条件,用于阻塞和通知的Condition对象,表示现在是否有可取的元素
    private final Condition available = lock.newCondition();
...

PriorityQueue

public class PriorityQueue<E> extends AbstractQueue<E>
    implements java.io.Serializable {

    private static final long serialVersionUID = -7720805057305804111L;
    // 默认容器大小
    private static final int DEFAULT_INITIAL_CAPACITY = 11;

    // 用数组存储元素
    transient Object[] queue; // non-private to simplify nested class access

    // 记录元素个数
    private int size = 0;

    // 比较器
    private final Comparator<? super E> comparator;
 
    // 安全修改计数,Fail-Fast 机制。
    transient int modCount = 0; // non-private to simplify nested class access
...

offer方法

// 添加元素到列队中
public boolean add(E e) {
    return offer(e);
}

// 添加元素到列队中
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lock();
    try {
        // 判断优先级入列
        q.offer(e); 
        // 如果添加的元素是列头,栈顶元素。
        if (q.peek() == e) {
            // leader存储了被阻塞等待列头的线程,
            leader = null;
            // 唤醒在等待条件的线程。
            available.signal();
        }
        // 添加元素返回成功,可以OOM
        return true;
    } finally {
        // 释放锁
        lock.unlock();
    }
}

take()方法

public E take() throws InterruptedException {
        // 获取锁
        final ReentrantLock lock = this.lock;
        // 可中断锁,防止应为await()方法中断异常导致异常退出。
        lock.lockInterruptibly();
        try {
            // 死循环
            for (;;) {
                // 获取列队头元素
                E first = q.peek();
                if (first == null)
                    // 列队头null,继续等待。
                    available.await();
                else {
                    // 获取头元素剩余时间。
                    long delay = first.getDelay(NANOSECONDS);
                    // 如果时间已经生效直接返回头元素。
                    if (delay <= 0)
                        return q.poll();
                    // 时间还没生效,释放当前引用。
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        // leader不是null,应该有其他线程在出列队。继续等待。
                        available.await();
                    else {
                        // 标识当前线程处于等待列头
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 继续等待列头元素。
                            available.awaitNanos(delay);
                        } finally { 
                            // 释放当前线程
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                // 当前线程出队成功,通知其他线程继续执行。
                available.signal();
            // 释放锁
            lock.unlock();
        }
    }

演示代码

/**
 * @author z201.coding@gmail.com
 **/
@Setter
@Getter
public class ItemDelayedI<T> implements Delayed {

    /**
     * 数据id
     */
    private Long dataId;
    /**
     * 开始时间
     */
    private long startTime;
    /**
     * 到期时间
     */
    private long expire;
    /**
     * 泛型data
     */
    private T data;

    private ItemDelayedI() {

    }

    public ItemDelayedI(Long dataId, long startTime, long expire) {
        super();
        this.dataId = dataId;
        this.startTime = startTime;
        this.expire = expire;
    }

    @Override
    public int compareTo(Delayed o) {
        // 入队时需要判断任务放到队列的哪个位置,过期时间短的放在前面
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // 和当前时间比较,判断是否过期。
        return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    // 重写 equals 和 hashcode方法用id作为唯一标志,添加列队的时候做防重复判断。
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        ItemDelayedI<?> that = (ItemDelayedI<?>) o;

        return dataId.equals(that.dataId);
    }

    @Override
    public int hashCode() {
        return dataId.hashCode();
    }
}

工具类

/**
 * @author z201.coding@gmail.com
 **/
@Slf4j
@Lazy(false)
@Component
public class DelayOrderImpl implements DelayOrderI<OrderBo> {

    @Autowired
    @Qualifier("defExecutor")
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    private final static DelayQueue<ItemDelayedI<OrderBo>> DELAY_QUEUE = new DelayQueue<>();

    /**
     * 初始化时加载数据库中需处理超时的订单
     */
    @PostConstruct
    public void init() {
        /*启动一个线程,去取延迟订单*/
        threadPoolTaskExecutor.execute(() -> {
            ItemDelayedI<OrderBo> orderDelayed;
            // 循环处理
            for (; ; ) {
                try {
                    orderDelayed = DELAY_QUEUE.take();
                    //处理超时订单
                    log.info("处理超时订单 {} {}", orderDelayed.getDataId(), DateTool.conversionNowFormat());
                } catch (Exception e) {
                    log.error("执行自营超时订单的_延迟队列_异常:" + e);
                }
            }
        });
    }

    /**
     * 加入延迟消息队列
     **/
    @Override
    public boolean addToOrderDelayQueue(ItemDelayedI<OrderBo> itemDelayed) {
        return DELAY_QUEUE.add(itemDelayed);
    }

    /**
     * 加入延迟消息队列
     **/
    @Override
    public boolean addToDelayQueue(OrderBo order) {
        ItemDelayedI<OrderBo> orderDelayed = new ItemDelayedI<>(order.getId(), order.getCreateTime(), order.getOrderDeadlineTime());
        if (DELAY_QUEUE.contains(orderDelayed)) {
            return true;
        }
        log.info("添加订单超时列队 {} {}", JsonTool.toString(order), DateTool.conversionNowFormat());
        return DELAY_QUEUE.add(orderDelayed);
    }

    /**
     * 移除列队
     *
     * @param order
     */
    @Override
    public void removeToOrderDelayQueue(OrderBo order) {
        if (order == null) {
            return;
        }
        for (Iterator<ItemDelayedI<OrderBo>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) {
            ItemDelayedI<OrderBo> queue = iterator.next();
            if (queue.getDataId().equals(order.getId())) {
                log.info("移除订单超时列队 {} {}", order, DateTool.conversionNowFormat());
                DELAY_QUEUE.remove(queue);
            }
        }
    }

    public void removeToOrderDelayQueueById(Long id) {
        if (id == null) {
            return;
        }
        log.info("移除订单超时列队 {} {}", id, DateTool.conversionNowFormat());
        for (Iterator<ItemDelayedI<OrderBo>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) {
            ItemDelayedI<OrderBo> queue = iterator.next();
            if (queue.getDataId().equals(id)) {
                DELAY_QUEUE.remove(queue);
            }
        }
    }

    @Override
    public List<OrderBo> all() {
        List<OrderBo> list = new ArrayList<>();
        OrderBo orderBoTemp = null;
        for (Iterator<ItemDelayedI<OrderBo>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) {
            ItemDelayedI<OrderBo> queue = iterator.next();
            orderBoTemp = OrderBo.builder()
                    .id(queue.getDataId())
                    .orderDeadlineTime(queue.getExpire())
                    .createTime(queue.getStartTime())
                    .build();
            list.add(orderBoTemp);
        }
        return list;
    }
}

测试代码

/**
 * @author z201.coding@gmail.com
 **/
@RestController
public class AppController {

    @Autowired
    private DelayOrderImpl delayOrder;

    @RequestMapping(value = "add/{id}")
    public Object add(@PathVariable(required = false) Long id) {
        if (null == id) {
            id = RandomUtil.randomLong(10,1000);
        }
        OrderBo orderBo = OrderBo.builder()
                .id(id)
                .createTime(DateTool.currentTimeMillis())
                .orderDeadlineTime(DateTool.currentTimeMillis() + 1 * 60 * 1000).build();
        delayOrder.addToDelayQueue(orderBo);
        Map<String, Object> data = new HashMap<>();
        data.put("code", "200");
        data.put("data", JsonTool.toString(orderBo));
        return data;
    }

    @RequestMapping(value = "list")
    public Object list() {
        List<OrderBo> list = delayOrder.all();
        Map<String, Object> data = new HashMap<>();
        data.put("code", "200");
        data.put("data",list);
        return data;
    }


    @RequestMapping(value = "del/{id}")
    public Object del(@PathVariable(required = false) Long id) {
        delayOrder.removeToOrderDelayQueueById(id);
        List<OrderBo> list = delayOrder.all();
        Map<String, Object> data = new HashMap<>();
        data.put("code", "200");
        data.put("data", list);
        return data;
    }

}

Console

➜  blog curl http://localhost:9016/delayed/add/1
➜  blog curl http://localhost:9016/delayed/add/1
➜  blog curl http://localhost:9016/delayed/add/2
➜  blog curl http://localhost:9016/delayed/add/2
➜  blog curl http://localhost:9016/delayed/list 
# 分别添加两次相同的id,不会重复添加进入。
{"code":"200","data":[{"id":1,"createTime":1643699260192,"orderDeadlineTime":1643699320192},{"id":2,"createTime":1643699263491,"orderDeadlineTime":1643699323491}]}% 

[XNIO-1 I/O-4]  添加订单超时列队 {"id":1,"createTime":1643699260192,"orderDeadlineTime":1643699320192} 2022-02-01 15:07:40
[XNIO-1 I/O-4]  添加订单超时列队 {"id":2,"createTime":1643699263491,"orderDeadlineTime":1643699323491} 2022-02-01 15:07:43
[def-Executor-1]  处理超时订单 1 2022-02-01 15:08:40
[def-Executor-1]  处理超时订单 2 2022-02-01 15:08:43

END

最近更新: 2025/12/27 18:51
Contributors: 庆峰
Prev
网络通信
Next
MySQL基础(四)