RabbitMQ实现延迟队列

Rabbit安装方式,以及对应延迟队列插件安装方式:见:Centos7 安装 RabbitMQ 教程

一、方式一

1.1 配置常量

public class DelayConstant {

/**
* 延迟队列 TTL 名称
* 延迟队列名称
*/
public static final String DELAY_QUEUE_NAME = "delay.queue";

/**
* 延时队列交换机
* 延时消息就是发送到该交换机的
*/
public static final String DELAY_EXCHANGE_NAME = "delay.exchange";

/**
* 延迟路由键
* routing key 名称 路由键
* 具体延时消息发送在该 routingKey 的
*/
public static final String DELAY_ROUTING_KEY = "delay.routing_key";



/**
* 死信队列名称
*/
public static final String DEAD_QUEUE_NAME = "dead.queue";

/**
* 死信队列交换机
* 死信队列交换机 DLX,dead letter发送到的 exchange
*/
public static final String DEAD_EXCHANGE_NAME = "dead.exchange";

/**
* 死信路由键
*/
public static final String DEAD_ROUTING_KEY = "dead.exchange";
}
  • 实体类
@Data
@ApiModel("订单信息")
public class Order implements Serializable {

@ApiModelProperty("订单id")
private Long orderId;

@ApiModelProperty("订单金额")
private Double orderMoney;

@ApiModelProperty("订单信息")
private String orderMsg;

@ApiModelProperty("订单状态")
private Integer orderStatus;

@ApiModelProperty("延迟执行时间/s")
private Integer delayTime;

}

*

1.2 队列配置

@Configuration
public class DelayRabbitConfig {


/**
* 1.死信队列
*/
@Bean
public Queue orderQueue() {
return new Queue(DelayConstant.DEAD_QUEUE_NAME, true);
}

/**
* 2.死信交换机
* 通过死信交换机把死信消息发送到指定的队列中去
* 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
*/
@Bean
public TopicExchange orderTopicExchange() {
return new TopicExchange(DelayConstant.DEAD_EXCHANGE_NAME);
}

/**
* 3.死信队列(绑定交换机)
*/
@Bean
public Binding orderBinding() {
/**
* 将死信队列绑定到死信交换机上,通过死信路由键
* TODO 如果要让延迟队列之间有关联,这里的 routingKey 和 绑定的交换机很关键
*/
return BindingBuilder.bind(orderQueue()).to(orderTopicExchange())
.with(DelayConstant.DEAD_ROUTING_KEY);
}


/**
* 4.延时队列配置
* <p>
* 1、第一种方式是直接设置 Queue 延迟时间 但如果直接给队列设置过期时间,这种做法不是很灵活,(当然二者是兼容的,默认是时间小的优先)
* params.put("x-message-ttl", 5 * 1000);
* 2、第二种就是每次发送消息动态设置延迟时间,这样我们可以灵活控制
*/
@Bean
public Queue delayOrderQueue() {
Map<String, Object> params = new HashMap<>();
// x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,即死信消息转发到那个队列
params.put("x-dead-letter-exchange", DelayConstant.DEAD_EXCHANGE_NAME);
// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
params.put("x-dead-letter-routing-key", DelayConstant.DEAD_OUTING_KEY);
return new Queue(DelayConstant.DELAY_QUEUE, true, false, false, params);
}

/**
* 5. 延迟交换机
* 延时队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
*/
@Bean
public DirectExchange orderDelayExchange() {
return new DirectExchange(DelayConstant.DELAY_EXCHANGE_NAME);
}

/**
* 6.将延迟队列绑定到延迟交换机上,通过延迟路由键
*/
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange())
.with(DelayConstant.DELAY_ROUTING_KEY);
}
}

1.3 生产者,消费者

@RestController
@RequestMapping("/delayQueue")
@Slf4j
@Api("无跳过延迟队列")
public class ProductAndConsumer {

@Resource
private RabbitTemplate rabbitTemplate;

/**
* @param order 订单实体信息
*/
@Anonymous
@GetMapping("/product")
public void product(Order order) {
/* 确认的回调 确认消息是否到达 Broker 服务器 其实就是是否到达交换器
* 如果发送时候指定的交换器不存在 ack 就是 false 代表消息不可达
*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("correlationData:{} , ack:{}", correlationData.getId(), ack);
if (!ack) {
// TODO 消息不可达的后续操作
System.out.println("进行对应的消息补偿机制");
}
});
/* 消息失败的回调
* 例如消息已经到达交换器上,但路由键匹配任何绑定到该交换器的队列,会触发这个回调,此时 replyText: NO_ROUTE
*/
rabbitTemplate.setReturnsCallback(returnedMessage -> {
log.info("message:{}; replyCode: {}; replyText: {} ; exchange:{} ; routingKey:{}",
returnedMessage.getMessage(), returnedMessage.getReplyCode(), returnedMessage.getReplyText(),
returnedMessage.getExchange(), returnedMessage.getRoutingKey());
});
// 在实际中ID 应该是全局唯一 能够唯一标识消息 消息不可达的时候触发ConfirmCallback回调方法时可以获取该值,进行对应的错误处理
CorrelationData correlationData = new CorrelationData(order.getOrderId().toString());
rabbitTemplate.convertAndSend(DelayConstant.DELAY_EXCHANGE_NAME,
DelayConstant.DELAY_ROUTING_KEY, order, message -> {
/**
* 如果配置了 params.put("x-message-ttl", 60 * 1000 * 30);
* 那么这一句也可以省略,具体根据业务需要是声明 Queue 的时候就指定好延迟时间还是在发送自己控制时间
* 这里为了演示设置为 30 s
*/
message.getMessageProperties().setExpiration(order.getDelayTime() * 1000 + "");
return message;
}, correlationData);
}


@RabbitListener(queues = {DelayConstant.DEAD_QUEUE_NAME})
public void consumer(Order order, Message message, Channel channel) {
System.out.println("###########################################");
System.out.println("【orderDelayQueue 监听的消息】 - 【消费时间】 - ["
+ new Date() + "]- 【订单内容】 - [" + order.toString() + "]");

if (order.getOrderStatus() == 0) {
order.setOrderStatus(2);
System.out.println("【该订单未支付,取消订单】" + order.toString());
} else if (order.getOrderStatus() == 1) {
System.out.println("【该订单已完成支付】");
} else if (order.getOrderStatus() == 2) {
System.out.println("【该订单已取消】");
}
System.out.println("###########################################");
}

}

1.4 缺陷

  • 对于经典订单系统,用户在下单后30分钟未操作,系统将删除用户下的订单;
  • 此时使用传统的定时任务扫描数据库并更改也可以完成,但并不能达到实时更新,且当数据量非常大的时候,是否损耗性能
  • 使用rabbitmq的延迟队列,用户在下单后将订单数据放入延迟队列,并设置30分钟的延迟,到达时间后该操作放入死信队列
  • 以上使用rabbitmq存在一个非常大的问题,加入第一个入队列的订单的延时为100s,第二个加入队列的订单的延时为10s,明显第二个加入队列的更先进入死信队列被消费

  • 但rabbitmq的延迟队列必须按加入顺序出队列。显然是不合理的

二、方式二(推荐)

2.1 配置常量与队列配置

@Configuration
public class DelayMessageConfig {

// 延迟队列名称
public static final String DELAY_EXCHANGE_NAME = "plugin.delay.exchange";
// 延迟队列交换机
public static final String DELAY_QUEUE_NAME = "plugin.delay.queue";
// 延迟队列路由键
public static final String DELAY_ROUTING_KRY = "plugin.delay.routing_key";

/**
* 声明一个延迟队列
* @return
*/
@Bean
Queue delayQueue(){
return QueueBuilder.durable(DELAY_QUEUE_NAME).build();
}

/**
* 声明一个交换机
* @return
*/
@Bean
CustomExchange delayExchange(){

Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true,false, args);

}
/**
* 绑定
* @param delayQueue
* @param delayExchange
* @return
*/
@Bean
Binding queueBinding(Queue delayQueue, CustomExchange delayExchange){

return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KRY).noargs();

}
}

2.2 生成者与消费者

@RestController
@RequestMapping("/delayQueue2")
@Slf4j
@Api("可跳过延迟队列")
public class ProAndCum {

@Resource
private RabbitTemplate rabbitTemplate;


/**
* @param order 消息
*/
@GetMapping("/product")
public void product(Order order) {
/* 确认的回调 确认消息是否到达 Broker 服务器 其实就是是否到达交换器
* 如果发送时候指定的交换器不存在 ack 就是 false 代表消息不可达
*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("correlationData:{} , ack:{}", correlationData.getId(), ack);
if (!ack) {
// TODO 消息不可达,进行相应的c
System.out.println("进行对应的消息补偿机制");
}
});
/* 消息失败的回调
* 例如消息已经到达交换器上,但路由键匹配任何绑定到该交换器的队列,会触发这个回调,此时 replyText: NO_ROUTE
*/
rabbitTemplate.setReturnsCallback(returnedMessage -> {
log.info("message:{}; replyCode: {}; replyText: {} ; exchange:{} ; routingKey:{}",
returnedMessage.getMessage(), returnedMessage.getReplyCode(), returnedMessage.getReplyText(),
returnedMessage.getExchange(), returnedMessage.getRoutingKey());
});
// 在实际中ID 应该是全局唯一 能够唯一标识消息 消息不可达的时候触发ConfirmCallback回调方法时可以获取该值,进行对应的错误处理
CorrelationData correlationData = new CorrelationData(order.getOrderId().toString());
rabbitTemplate.convertAndSend(DelayMessageConfig.DELAY_EXCHANGE_NAME, DelayMessageConfig.ROUTING_KRY
, order, message -> {
// 设置延迟时间
message.getMessageProperties().setDelay(order.getDelayTime() * 1000);
return message;
}, correlationData);
}

@RabbitListener(queues = {DelayMessageConfig.DELAY_QUEUE_NAME})
public void consumer(Order order, Message message, Channel channel) {
System.out.println("###########################################");
System.out.println("【orderDelayQueue 监听的消息】 - 【消费时间】 - ["
+ new Date() + "]- 【订单内容】 - [" + order.toString() + "]");

if (order.getOrderStatus() == 0) {
order.setOrderStatus(2);
System.out.println("【该订单未支付,取消订单】" + order.toString());
} else if (order.getOrderStatus() == 1) {
System.out.println("【该订单已完成支付】");
} else if (order.getOrderStatus() == 2) {
System.out.println("【该订单已取消】");
}
System.out.println("###########################################");
}

}

2.3 注意事项

  • 使用第二种方式需要rabbitmq安装一个延迟队列插件
  • 具体看延迟队列安装:rabbitmq_delayed_message_exchange插件