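// Project-local types (DelayOrderI, ItemDelayedI, OrderBo, DateTool) are imported from the project's own packages.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Assumes a javax-based Spring Boot 2.x stack; on Spring Boot 3.x use jakarta.annotation instead.
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisZSetCommands;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

/**
 * Order delay queue backed by a Redis sorted set: the member is the order id
 * and the score is the order's expiry timestamp.
 */
@Slf4j
@Lazy(false)
@Component
public class DelayOrderImpl implements DelayOrderI<OrderBo> {

    /** Redis key of the sorted set that holds the delayed orders. */
    private static final String ORDER_DELAY_KEY = "order:delay";

    /** Order timeout; all() treats this value as minutes. */
    public static final int ORDER_DELAY_TIME = 1;

    @Autowired
    @Qualifier("defExecutor")
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Autowired
    private RedisTemplate redisTemplate;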
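
    /**
     * Starts a background worker that polls the sorted set once per second and
     * removes every order whose expiry (the member's score) has passed.
     */
    @PostConstruct
    public void init() {
        threadPoolTaskExecutor.execute(() -> {
            log.info("Pending order count: {}", size());
            // Stop polling once the worker thread is interrupted (for example on executor shutdown).
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Members with a score between 0 and the current time are due.
                    Set<?> data = redisTemplate.opsForZSet()
                            .rangeByScore(ORDER_DELAY_KEY, 0, DateTool.currentTimeMillis());
                    if (data != null) {
                        for (Object member : data) {
                            log.info("Processing order {}", member);
                            // Depending on the serializer, ids may come back as Integer, Long or String.
                            removeToOrderDelayQueueById(Long.valueOf(String.valueOf(member)));
                        }
                    }
                } catch (Exception e) {
                    log.error("Error while processing the order delay queue", e);
                } finally {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the loop condition ends the worker.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        });
    }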

    @PreDestroy
    public void destroy() {
        // Unless the executor is configured to wait for running tasks, shutdown() interrupts them.
        threadPoolTaskExecutor.shutdown();
    }
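
    @Override
    public boolean addToOrderDelayQueue(ItemDelayedI<OrderBo> itemDelayed) {
        log.info("Adding order to the timeout queue: {}", itemDelayed.getDataId());
        // ZADD with the expiry timestamp as the score; add() may return null inside a pipeline or transaction.
        return Boolean.TRUE.equals(redisTemplate.opsForZSet()
                .add(ORDER_DELAY_KEY, itemDelayed.getDataId(), itemDelayed.getExpire()));
    }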

    @Override
    public boolean addToDelayQueue(OrderBo order) {
        ItemDelayedI<OrderBo> orderDelayed =
                new ItemDelayedI<>(order.getId(), order.getCreateTime(), order.getOrderDeadlineTime());
        return addToOrderDelayQueue(orderDelayed);
    }

    @Override
    public void removeToOrderDelayQueue(OrderBo order) {
        if (null == order || null == order.getId()) {
            return;
        }
        removeToOrderDelayQueueById(order.getId());
    }

    public void removeToOrderDelayQueueById(Long id) {
        if (id == null) {
            return;
        }
        log.info("Removing order from the timeout queue: {}", id);
        redisTemplate.opsForZSet().remove(ORDER_DELAY_KEY, id);
    }

    @Override
    public List<OrderBo> all() {
        List<OrderBo> list = new ArrayList<>();
        Map<String, Long> orderZSetMap = zScan(ORDER_DELAY_KEY, "*", 100);
        orderZSetMap.forEach((key, value) -> {
            list.add(OrderBo.builder()
                    .id(Long.valueOf(key))
                    .orderDeadlineTime(value)
                    // Derive the creation time by subtracting the delay from the deadline.
                    .createTime(DateTool.conversion(DateTool.conversion(value).plusMinutes(-ORDER_DELAY_TIME)))
                    .build());
        });
        return list;
    }

    @Override
    public Long size() {
        return redisTemplate.opsForZSet().size(ORDER_DELAY_KEY);
    }

    /**
     * Iterates the sorted set with ZSCAN and returns member -> score.
     */
    private Map<String, Long> zScan(String key, String pattern, Integer count) {
        Map<String, Long> map = new HashMap<>();
        RedisSerializer<String> stringRedisSerializer = redisTemplate.getStringSerializer();
        ScanOptions scanOptions = ScanOptions.scanOptions()
                .match(pattern)
                .count(count)
                .build();
        RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
        Cursor<RedisZSetCommands.Tuple> cursor = null;
        try {
            cursor = connection.zScan(stringRedisSerializer.serialize(key), scanOptions);
            while (cursor.hasNext()) {
                RedisZSetCommands.Tuple data = cursor.next();
                if (null != data) {
                    map.put(stringRedisSerializer.deserialize(data.getValue()), data.getScore().longValue());
                }
            }
        } finally {
            // Release the cursor and the connection; the original version never closed either.
            if (cursor != null) {
                try {
                    cursor.close();
                } catch (Exception e) {
                    log.warn("Failed to close the ZSCAN cursor", e);
                }
            }
            connection.close();
        }
        return map;
    }
}