RabbitMQ笔记
使用RabbitMQ实现订单超时管理
方案分析
- JDK延迟队列
- 定时任务
- 被动取消
- Redis Sorted Set
- Redis事件通知
- 时间轮算法
- RabbitMQ
JDK延迟队列
该方案是利用JDK自带的JUC包中的DelayQueue队列。
pubilc class DelayQueue extends AbstractQueue implements BlockingQueue
这是一个无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素,放入DelayQueue中的对象,必须实现Delayed接口

- offer(): 添加元素
- poll(): 获取并移出队列的超时元素,没有则返回空
- take(): 获取并移出队列的超时元素,没有则wait当前线程,直到有元素满足超时条件时返回结果
定时任务
这种方式是最简单的,启动一个计划任务,每隔一定时间(假设1分钟)去扫描数据库一次,通过订单时间来判断是否超时,然后进行UPDATE 或DELETE操作。如Quartz
被动取消
利用懒加载的思想,当用户或商户查询订单时,再判断该订单是否超时,超时则进行业务处理。
这种方式依赖于用户的查询操作触发,如果用户不进行查询订单的操作,该订单就永远不会被取消。所以,实际应用中,也是被动取消+定时任务的组合方式来实现。这种情况下定时任务的时间可以设置的稍微“长”一点。
Redis Sorted Set
Redis有序集合(Sorted Set)每个元素都会关联一个double类型的分数score。Redis可以通过分数来为集合中的成员进行从小到大的排序。
添加元素: ZADD key score member [[score member] [score member]...] 按顺序查询元素: ZRANGE key start end [WITHSCORES] 查询元素score: ZSCORE key member 移出元素: ZREM key member [member ...]
该方案可以将订单超时时间戳与订单编号分别设置为score和member。系统扫描第一个元素判断是否超时,超时则进行业务处理。
然而,这一版存在一个致命的硬伤,在高并发条件下,多个消费者会取到同一个订单编号,又需要编写Lua脚本保证原子性或使用分布式锁,用了分布式锁性能又下降了。
RabbitMQ
RabbitMQ is the most widely deployed open source message broker
延迟队列
队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。
延迟队列,最重要的特性就体现在它的延时属性上,跟普通队列不一样的是,普通队列中的元素总是等着希望被早点取出消费,而延迟队列中的元素则是希望在指定时间被取出消费,所以延迟队列中的元素是都是带时间属性的。
简单来说,延迟队列就是用来存放需要在指定时间被处理的元素的队列。
本文使用RabbitMQ也是通过延迟队列的机制来实现订单超时的处理。然而 RabbitMg自身并没有延迟队列这个功能,实现该功能一般有以下两种方式:
- 利用TTL(Time To Live) 和 DLX(Dead Letter Exchanges)实现延迟队列
- 利用延迟队列插件rabbitmq_delayed_message_exchange实现
RabbitMQ延迟队列实现
实现方案一

- RabbitMQ配置类
@Configuration
public class RabbitMQConfig {
// 声明 4个路由key 4个队列 2个交换机 属性
// 延迟交换机
public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
// 延迟队列
public static final String DELAY_Queue_A_NAME = "delay.queue.a";
public static final String DELAY_Queue_B_NAME = "delay.queue.b";
// 延迟队列路由key
public static final String DELAY_Queue_A_ROUTING_KEY = "delay.queue.a.routingkey";
public static final String DELAY_Queue_B_ROUTING_KEY = "delay.queue.b.routingkey";
// 死信交换机
public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";
// 死信队列
public static final String DEAD_LETTER_Queue_A_NAME = "dead.letter.queue.a";
public static final String DEAD_LETTER_Queue_B_NAME = "dead.letter.queue.b";
// 死信队列路由key
public static final String DEAD_LETTER_Queue_A_ROUTING_KEY = "dead.letter.delay_10s.routingkey";
public static final String DEAD_LETTER_Queue_B_ROUTING_KEY = "dead.letter.delay_60s.routingkey";
// 声明延迟交换机
@Bean
public DirectExchange delayExchange() {
return new DirectExchange(DELAY_EXCHANGE_NAME);
}
// 声明死信交换机
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
}
// 声明延迟队列A 延迟10s 并且绑定到对应的死信交换机中
@Bean
public Queue delayQueueA() {
Map args = new HashMap();
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
args.put("x-dead-letter-routing-key", DEAD_LETTER_Queue_A_ROUTING_KEY);
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(DELAY_Queue_A_NAME).withArguments(args).build();
}
// 声明延迟队列A 延迟60s 并且绑定到对应的死信交换机中
@Bean
public Queue delayQueueB() {
Map args = new HashMap();
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
args.put("x-dead-letter-routing-key", DEAD_LETTER_Queue_B_ROUTING_KEY);
args.put("x-message-ttl", 60000);
return QueueBuilder.durable(DELAY_Queue_B_NAME).withArguments(args).build();
}
// 声明延迟队列A的绑定关系
@Bean
public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DELAY_Queue_A_ROUTING_KEY);
}
// 声明延迟队列A的绑定关系
@Bean
public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DELAY_Queue_B_ROUTING_KEY);
}
// 声明死信队列A,用于接收延迟10s处理的消息
@Bean
public Queue deadLetterQueueA() {
return new Queue(DEAD_LETTER_Queue_A_NAME);
}
// 声明死信队列B,用于接收延迟60s处理的消息
@Bean
public Queue deadLetterQueueB() {
return new Queue(DEAD_LETTER_Queue_B_NAME);
}
// 声明死信队列A的绑定关系
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_Queue_A_ROUTING_KEY);
}
// 声明死信队列B的绑定关系
@Bean
public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_Queue_B_ROUTING_KEY);
}
}
- 枚举类
@Getter
@AllArgsConstructor
public enum DelayTypeEnum {
DELAY_10s(1),
DELAY_60s(2);
private Integer type;
public static DelayTypeEnum getDelayTypeEnum(Integer type) {
if (Objects.equals(type, DELAY_10s.type)) {
return DELAY_10s;
}
if (Objects.equals(type, DELAY_60s.type)) {
return DELAY_60s;
}
return null;
}
}
- 生产者
@Component
public class DelayMsgProducer {
@Resource
private RabbitTemplate rabbitTemplate;
public void send(String msg, DelayTypeEnum type) {
switch (type) {
case DELAY_10s:
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_Queue_A_ROUTING_KEY, msg);
break;
case DELAY_60s:
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_Queue_B_ROUTING_KEY, msg);
break;
default:
}
}
}
- 消费者
@Component
@Slf4j
public class DeadLetterQueueConsumer {
// 监听死信队列A
@RabbitListener(queues = DEAD_LETTER_Queue_A_NAME)
public void receiveA(Message message) {
// 获取消息
String msg = new String(message.getBody());
// 记录日志
log.info("当前时间: {}, 死信队列A收到消息: {}", LocalDateTime.now(), msg);
}
// 监听死信队列B
@RabbitListener(queues = DEAD_LETTER_Queue_B_NAME)
public void receiveB(Message message) {
// 获取消息
String msg = new String(message.getBody());
// 记录日志
log.info("当前时间: {}, 死信队列B收到消息: {}", LocalDateTime.now(), msg);
}
}
- 测试


实现方案二
方案一的问题可以通过将TTL设置在消息属性里来解决,然后添加一个延迟队列,用于接收设置为任意延迟时长的消息,再添加一个相应的死信队列和routingkey 即可,如下图:

- RabbitMQ配置文件
@Configuration
public class RabbitMQConfig {
// 声明 2个交换机 2个队列 2个路由key 属性
// 延迟交换机
public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
// 延迟队列
public static final String DELAY_Queue_NAME = "delay.queue";
// 延迟队列路由key
public static final String DELAY_Queue_ROUTING_KEY = "delay.queue.routingkey";
// 死信交换机
public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";
// 死信队列
public static final String DEAD_LETTER_Queue_NAME = "dead.letter.queue";
// 死信队列路由key
public static final String DEAD_LETTER_Queue_ROUTING_KEY = "dead.letter.routingkey";
// 声明延迟交换机
@Bean
public DirectExchange delayExchange() {
return new DirectExchange(DELAY_EXCHANGE_NAME);
}
// 声明死信交换机
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
}
// 声明延迟队列 并且绑定到对应的死信交换机中
@Bean
public Queue delayQueue() {
Map args = new HashMap();
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
args.put("x-dead-letter-routing-key", DEAD_LETTER_Queue_ROUTING_KEY);
return QueueBuilder.durable(DELAY_Queue_NAME).withArguments(args).build();
}
// 声明延迟队列的绑定关系
@Bean
public Binding delayBinding(@Qualifier("delayQueue") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DELAY_Queue_ROUTING_KEY);
}
// 声明死信队列
@Bean
public Queue deadLetterQueue() {
return new Queue(DEAD_LETTER_Queue_NAME);
}
// 声明死信队列的绑定关系
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueue") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_Queue_ROUTING_KEY);
}
}
- 生产者
@Component
public class DelayMsgProducer {
@Resource
private RabbitTemplate rabbitTemplate;
public void send(String message, String delayTime) {
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_Queue_ROUTING_KEY, message, msg -> {
// 设置消息到期时间
msg.getMessageProperties().setExpiration(delayTime);
return msg;
});
}
}
- 消费者
@Component
@Slf4j
public class DeadLetterQueueConsumer {
// 监听死信队列
@RabbitListener(queues = DEAD_LETTER_Queue_NAME)
public void receiveA(Message message) {
// 获取消息
String msg = new String(message.getBody());
// 记录日志
log.info("当前时间: {}, 死信队列收到消息: {}", LocalDateTime.now(), msg);
}
}
- controller
@RestController
@Slf4j
@RequestMapping("/rabbitmq")
public class RabbitMQController {
@Resource
private DelayMsgProducer producer;
@ApiOperation(value = "发送消息")
@GetMapping("/send/{msg}/{delayTime}")
public void send(@PathVariable String msg,
@PathVariable String delayTime) {
log.info("当前时间: {}, 消息: {}, 延迟时间: {}", LocalDateTime.now(), msg, delayTime);
producer.send(msg, delayTime);
}
}
- 测试

问题:10秒的消息没有被提前释放,发送两条消息,如果后面的消息过期时间比前面的短,则要等着前面消息的过期时间到了再一起被释放
实现方案三(插件)
linux环境docker安装rabbitmq插件
- 下载插件
- docker run -d –name rabbitmq -p 5672:5672 -p 15672:15672 –hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.8.5
- docker cp rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez b40e96969299:/plugins/rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez
- docker exec -it rabbitmq /bin/bash
- rabbitmq-plugins enable rabbitmq_delayed_message_exchange
方案二的问题可以通过安装RabbitMQ的社区插件rabbitmq_delayed_message_exchange来解决。
安装插件后会生成新的Exchange类型x-delayed-message,该类型消息支持延迟投递机制,接收到消息后并未立即将消息投递至目标队列,而是存储在mnesia (一个分布式数据库)中,随后监测消息延迟时间,如达到可投递时间时将其通过x-delayed-type类型标记的交换机投递至目标队列。


- 配置类
@Configuration
public class RabbitMQConfig {
// 声明 1个交换机 1个队列 1个路由key 属性
// 延迟交换机
public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
// 延迟队列
public static final String DELAY_Queue_NAME = "delay.queue";
// 延迟队列路由key
public static final String DELAY_Queue_ROUTING_KEY = "delay.queue.routingkey";
// 声明延迟交换机
@Bean
public CustomExchange delayExchange() {
Map args = new HashMap();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
// 声明延迟队列
@Bean
public Queue delayQueue() {
return QueueBuilder.durable(DELAY_Queue_NAME).build();
}
// 声明延迟队列的绑定关系
@Bean
public Binding delayBinding(@Qualifier("delayQueue") Queue queue,
@Qualifier("delayExchange") CustomExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DELAY_Queue_ROUTING_KEY).noargs();
}
}
- 生产者
@Component
public class DelayMsgProducer {
@Resource
private RabbitTemplate rabbitTemplate;
public void send(String message, Integer delayTime) {
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_Queue_ROUTING_KEY, message, msg -> {
// 设置消息到期时间
msg.getMessageProperties().setDelay(delayTime);
return msg;
});
}
}
- 消费者
@Component
@Slf4j
public class DeadLetterQueueConsumer {
// 监听死信队列
@RabbitListener(queues = RabbitMQConfig.DELAY_Queue_NAME)
public void receiveA(Message message) {
// 获取消息
String msg = new String(message.getBody());
// 记录日志
log.info("当前时间: {}, 延迟队列收到消息: {}", LocalDateTime.now(), msg);
}
}
- 测试
–

没有因为前面一条消息延迟时间长而影响下一条消息
订单超时实战
- yml
spring: rabbitmq: host: 192.168.183.139 port: 5672 username: admin password: admin virtual-host: my_vhost datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false username: root password: 123456mybatis-plus: configuration: log-impl: org.apache.ibatis.logging.stdout.StdOutImpl map-underscore-to-camel-case: true mapper-locations: classpath:mapper/*.xml type-aliases-package: com.zdz.entity global-config: db-config: id-type: auto# 自定义配置order: delay: time: 60000 # 1分钟 单位毫秒
- enum
@Getter@AllArgsConstructorpublic enum OrderStatus { //0待确定 1已确定 2已收获 3已取消 4已完成 5已作废 no_confirm(0, "待确定"), has_confirm(1, "已确定"), has_receive(2, "已收获"), cancel(3, "已取消 "), complete(4, "已完成"), discard(5, "已作废"); private Integer status; private String message;}@Getter@AllArgsConstructorpublic enum PayStatus { //0 等待支付 1 已支付 2 部分支付 no_pay(0, "等待支付"), has_pay(1, "已支付"), part_pay(2, "部分支付"); private Integer status; //状态 private String message; //描述}- 配置类
@Configurationpublic class RabbitMQConfig { // 声明 1个交换机 1个队列 1个路由key 属性 // 延迟交换机 public static final String DELAY_EXCHANGE_NAME = "delay.exchange"; // 延迟队列 public static final String DELAY_Queue_NAME = "delay.queue"; // 延迟队列路由key public static final String DELAY_Queue_ROUTING_KEY = "delay.queue.routingkey"; // 声明延迟交换机 @Bean public CustomExchange delayExchange() { Map args = new HashMap(); args.put("x-delayed-type", "direct"); return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args); } // 声明延迟队列 @Bean public Queue delayQueue() { return QueueBuilder.durable(DELAY_Queue_NAME).build(); } // 声明延迟队列的绑定关系 @Bean public Binding delayBinding(@Qualifier("delayQueue") Queue queue, @Qualifier("delayExchange") CustomExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DELAY_Queue_ROUTING_KEY).noargs(); }}- service
@Servicepublic class OrderServiceImpl extends ServiceImpl implements OrderService{ @Resource private OrderMapper orderMapper; @Resource private DelayMsgProducer producer; @Value("${order.delay.time}") private Integer orderDelayTime; /** * 新增订单 * @param order * @return */ @Transactional @Override public Map saveOrder(Order order) { // 订单编号 order.setOrderSn(IdUtil.getSnowflake(1, 1).nextIdStr()); // 订单状态 0 待支付 order.setOrderStatus(OrderStatus.no_confirm.getStatus()); // 支付状态 0 等待支付 order.setPayStatus(PayStatus.no_pay.getStatus()); // 下单时间 order.setOrderTime(new Date()); // 新增订单 int insert = orderMapper.insert(order); Map map = new HashMap(); if (insert > 0) { map.put("code", 200); map.put("msg", "订单已提交"); // 发送消息到队列, 设置消息延迟时间 producer.send(order.getOrderSn(), orderDelayTime); }else { map.put("code", 400); map.put("msg", "订单提交失败"); } return map; } /** * 根据用户id查询订单列表 * @param userId * @return */ public List getAllByUserId(Integer userId) { QueryWrapper queryWrapper = new QueryWrapper(); queryWrapper.eq("user_id", userId); return orderMapper.selectList(queryWrapper); }}- controller
@PostMapping("/save") @ApiOperation(value = "提交订单") public Map saveOrder(@RequestBody Order order) { log.info("订单信息: {}", order); return orderService.saveOrder(order); }- producer
@Component@Slf4jpublic class DelayMsgProducer { @Resource private RabbitTemplate rabbitTemplate; public void send(String message, Integer delayTime) { log.info("发送订单编号到队列,当前时间: {}, 订单编号: {}", LocalDateTime.now(), message); rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_Queue_ROUTING_KEY, message, msg -> { // 设置消息到期时间 msg.getMessageProperties().setDelay(delayTime); return msg; }); }}- consumer
@Component@Slf4jpublic class DeadLetterQueueConsumer { @Resource private OrderMapper orderMapper; @Resource private OrderActionMapper orderActionMapper; // 监听延迟队列 @RabbitListener(queues = RabbitMQConfig.DELAY_Queue_NAME) public void receiveA(Message message) { // 获取消息 String orderSn = new String(message.getBody()); // 记录日志 log.info("当前时间: {}, 订单编号: {}", LocalDateTime.now(), orderSn); // 根据订单编号查询订单 QueryWrapper wrapper = new QueryWrapper(); wrapper.eq("order_sn", orderSn); Order order = orderMapper.selectOne(wrapper); log.info("订单信息: {}", order); // 如果订单不为空并且支付状态是未支付并且订单状态是待确认 if (order != null && OrderStatus.no_confirm.getStatus().equals(order.getOrderStatus()) && PayStatus.no_pay.getStatus().equals(order.getPayStatus())) { // 设置订单状态为3 已取消 order.setOrderStatus(OrderStatus.cancel.getStatus()); // 根据订单编写修改订单 int result = orderMapper.updateById(order); if (result > 0) { OrderAction orderAction = new OrderAction(); orderAction.setOrderSn(orderSn); orderAction.setOrderStatus(OrderStatus.cancel.getStatus()); orderAction.setPayStatus(PayStatus.no_pay.getStatus()); orderAction.setActionNote("支付超时,订单已取消"); orderAction.setActionTime(new Date()); orderAction.setStatusDesc("支付超时,订单已取消"); // 新增订单操作 orderActionMapper.insert(orderAction); } } }}- 测试

本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://net2asp.com/d4b45209fd.html
