我们日常开发中,经常跟MQ(消息队列)打交道。本文梳理了MQ的8种使用场景。
![]()
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
- 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
- 视频教程:https://doc.iocoder.cn/video/
面试官在问我们MQ作用时,很多伙伴马上想到异步处理、解耦、流量削锋 等等。
MQ 最常见的应用场景之一就是异步处理 。
比如,在用户注册场景中,当用户信息保存成功后,系统需要发送一个短信、或者邮箱消息,通知用户注册成功。如果这个短信或者邮箱消息发送比较耗时 ,则可能拖垮注册接口 。又或者如果调用第三方短信、邮件发送接口失败,也会影响注册接口。一般我们不希望一个通知类的功能,去影响注册主流程,这时候,则可以使用MQ来实现异步处理 。
简要代码如下 :先保存用户信息,然后发送注册成功的MQ消息
// 用户注册方法
public void registerUser(String username, String email, String phoneNumber) {
// 保存用户信息(简化版)
userService.add(buildUser(username,email,phoneNumber))
// 发送消息
String registrationMessage = "User " + username + " has registered successfully.";
// 发送消息到队列
rabbitTemplate.convertAndSend("registrationQueue", registrationMessage);
}
消费者从队列中读取消息并发送短信或邮件 :
@Service
public class NotificationService {
// 监听消息队列中的消息并发送短信/邮件
@RabbitListener(queues = "registrationQueue")
public void handleRegistrationNotification(String message) {
// 这里可以进行短信或邮件的发送操作
System.out.println("Sending registration notification: " + message);
// 假设这里是发送短信的操作
sendSms(message);
// 也可以做其他通知(比如发邮件等)
sendEmail(message);
}
}
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
- 项目地址:https://github.com/YunaiV/yudao-cloud
- 视频教程:https://doc.iocoder.cn/video/
在微服务架构中,各个服务通常需要进行相互通信 。使用 MQ 可以帮助解耦服务 ,避免直接调用导致的强耦合。
![]()
一个电商平台的库存服务和支付服务。支付服务在处理支付后,需要向库存服务发送扣库存的请求,但不直接调用 API,而是通过 MQ 发送消息,让库存服务异步处理。
支付服务在支付成功后将消息发送到 RocketMQ:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class PaymentService {
private DefaultMQProducer producer;
public PaymentService() throws Exception {
producer = new DefaultMQProducer("PaymentProducerGroup");
producer.setNamesrvAddr("localhost:9876"); // RocketMQ NameServer 地址
producer.start();
}
public void processPayment(String orderId, int quantity) throws Exception {
// 1. 模拟调用支付接口(例如:支付宝、微信支付等)
boolean paymentSuccessful = callPayment(orderId, quantity);
if (paymentSuccessful) {
// 2. 支付成功后,创建支付消息并发送到 RocketMQ
String messageBody = "OrderId: " + orderId + ", Quantity: " + quantity;
Message message = new Message("paymentTopic", "paymentTag", messageBody.getBytes());
producer.send(message);
}
}
}
库存服务从 RocketMQ 中消费支付消息,并处理扣库存的逻辑:
public class InventoryService {
private DefaultMQPushConsumer consumer;
public InventoryService() throws Exception {
consumer = new DefaultMQPushConsumer("InventoryConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("paymentTopic", "paymentTag");
// 消息监听器
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
String messageBody = new String(msg.getBody());
// 执行扣库存操作
reduceStock(messageBody);
}
return null; // 返回消费成功
});
consumer.start();
System.out.println("InventoryService started...");
}
}
在高并发的情况下,有些请求可能会产生瞬时流量峰值,直接处理可能会导致服务过载。比如:
- 秒杀的时候,也需要避免流量暴涨,打垮应用系统的风险
这些场景,我们都可以使用MQ来进行流量的削峰填谷,确保系统平稳运行。
![]()
假设秒杀系统每秒最多可以处理2k个请求,每秒却有5k的请求过来,可以引入消息队列,秒杀系统每秒从消息队列拉2k请求处理得了。
在电商平台的订单处理中,如果用户下单后一定时间内未支付,需要自动取消订单。通过MQ的延时队列功能 ,可以设置消息延迟消费的时间,当消息到达延迟时间后,由消费者处理取消订单的逻辑。
当用户下单时,生成一个订单并发送一条延迟消息到RocketMQ。延迟时间可以根据订单的超时时间设置:
@Service
public class OrderService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void createOrder(Order order) {
// 保存订单逻辑(省略)
// 计算延迟时间(单位:毫秒)
long delay = order.getTimeout();
// 发送延迟消息
rocketMQTemplate.syncSend("orderCancelTopic:delay" + delay,
MessageBuilder.withPayload(order).build(),
10000, // 消息发送超时时间(单位:毫秒)
(int) (delay / 1000) // RocketMQ的延迟级别是以秒为单位的,因此需要转换为秒
);
}
}
注意:RocketMQ的延迟级别是固定的,如1s、5s、10s等。如果订单的延迟时间不是RocketMQ支持的延迟级别的整数倍,那么消息将不会精确地在预期的延迟时间后被消费。为了解决这个问题,你可以选择最接近的延迟级别,或者根据业务需求进行适当的调整。
创建一个用来消费延迟消息的消费者,处理取消订单的逻辑。例如:
@Component
@RocketMQMessageListener(topic = "orderCancelTopic", consumerGroup = "order-cancel-consumer-group")
public class OrderCancelListener implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
// 取消订单逻辑
// 检查订单状态,如果订单仍处于未支付状态则进行取消
System.out.println("Cancelling order: " + order.getOrderId());
// (省略实际的取消订单逻辑)
}
}
消息队列常用于日志系统中,将应用生成的日志异步地发送到日志处理系统,进行统一存储和分析。
假设你有一个微服务架构,每个微服务都会生成日志。你可以将这些日志通过消息队列(如Kafka)发送到一个集中式的日志收集系统(如 ELK(Elasticsearch, Logstash, Kibana) 或 Fluentd),从而实现日志的统一管理。
生产者(发送日志到 Kafka)
// 配置和发送日志到 Kafka 主题 "app-logs"
KafkaProducer producer = new KafkaProducer<>(config);
String logMessage = "{\"level\": \"INFO\", \"message\": \"Application started\", \"timestamp\": \"2024-12-29T20:30:59\"}";
producer.send(new ProducerRecord<>("app-logs", "log-key", logMessage));
消费者(收集日志信息)
@Service
public class LogConsumer {
// 使用 @KafkaListener 注解来消费 Kafka 中的日志
@KafkaListener(topics = "app-logs", groupId = "log-consumer-group")
public void consumeLog(String logMessage) {
// 打印或处理收到的日志
System.out.println("Received log: " + logMessage);
}
}
业界经常使用MQ
来实现分布式事务。
我举个下订单的场景,使用MQ实现分布式事务的例子吧。
我们先来看,一条普通的MQ消息,从产生到被消费,大概流程如下:
![]()
- MQ服务器收到ACK,认为消息消费成功,即在存储中删除消息。
回到下订单 这个例子,订单系统创建完订单后,再发送消息给下游系统。如果订单创建成功,然后消息没有成功发送出去,下游系统就无法感知这个事情,出导致数据不一致。
这时候就可以使用MQ实现分布式事务消息。大家看下这个流程:
![]()
- MQ收到消息后,将消息持久化到存储系统,这条消息的状态是待发送状态。
- MQ服务器返回ACK确认到生产者,此时MQ不会触发消息推送事件
- 如果本地事务执行成功,即commit执行结果到MQ服务器;如果执行失败,发送rollback。
- 如果是正常的commit,MQ服务器更新消息状态为可发送;如果是rollback,即删除消息。
- 如果消息状态更新为可发送,则MQ服务器会push消息给消费者。消费者消费完就回ACK。
- 如果MQ服务器长时间没有收到生产者的commit或者rollback,它会反查生产者,然后根据查询到的结果执行最终状态。
我以前公司(微众)基于MQ(RocketMQ),自研了远程调用框架 。
RocketMQ 作为远程调用框架,主要就是金融场景的适配性。
- 消息查询功能 :RocketMQ提供了消息查询功能,方便微众银行在需要时进行消息对账或问题排查。
- 金融级稳定性 :RocketMQ在金融领域的应用非常广泛,得到了众多金融机构的认可。其稳定性和可靠性能够满足微众银行对金融级消息服务的需求。
还有可以基于RocketMQ的定制开发:多中心多活、灰度发布、流量权重与消息去重、背压模式 (能够根据后续服务的治理能力决定拉取的消息数量,确保系统的稳定运行。)
消息队列(MQ) 可以非常适合用于 广播通知。在广播通知场景中,消息队列可以将消息推送到多个订阅者,让不同的服务或者应用接收到通知。
- 系统通知 :向所有用户广播应用更新、系统维护、公告通知等。
- 事件驱动的消息通知 :如库存更新、用户状态变化、订单支付成功等事件通知,多个系统可以订阅这个事件。
针对事件驱动的消息通知,我们以 订单支付成功 事件为例,假设多个系统(如库存管理系统、用户积分系统、财务系统等)都需要监听这个事件来进行相应处理。
![]()
当订单支付成功 事件发生时,系统会通过消息队列广播一个事件通知(比如消息内容是订单ID、支付金额等),其他系统可以根据这个事件来执行相应的操作,如:
发送订单支付成功事件:
// 创建订单支付成功事件消息
String orderEventData = "{\"orderId\": 12345, \"userId\": 67890, \"amount\": 100.0, \"event\": \"ORDER_PAYMENT_SUCCESS\"}";
Message msg = new Message("order_event_topic", "order_payment_success", orderEventData.getBytes());
// 发送消息
producer.send(msg);
事件消费者(接收并处理订单支付成功事件):
// 注册消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (Message msg : msgs) {
String eventData = new String(msg.getBody());
System.out.println("Inventory system received: " + eventData);
// 处理库存减少逻辑
// 解析消息(假设是 JSON 格式)
// updateInventory(eventData); // 假设调用库存更新方法
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 注册消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (Message msg : msgs) {
String eventData = new String(msg.getBody());
System.out.println("Points system received: " + eventData);
// 处理用户积分增加逻辑
// updateUserPoints(eventData); // 假设调用积分更新方法
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 注册消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (Message msg : msgs) {
String eventData = new String(msg.getBody());
System.out.println("Finance system received: " + eventData);
// 处理财务记录逻辑
// recordPaymentTransaction(eventData); // 假设调用财务记录方法
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; });