首页   

日常工作,MQ的8种常用使用场景

芋道源码  · Java  · 5 天前

正文

👉 这是一个或许对你有用的社群

🐱 一对一交流/面试小册/简历优化/求职解惑,欢迎加入芋道快速开发平台知识星球。下面是星球提供的部分资料: 

👉这是一个或许对你有用的开源项目

国产 Star 破 10w+ 的开源项目,前端包括管理后台 + 微信小程序,后端支持单体和微服务架构。

功能涵盖 RBAC 权限、SaaS 多租户、数据权限、商城、支付、工作流、大屏报表、微信公众号、ERPCRMAI 大模型等等功能:

  • Boot 多模块架构:https://gitee.com/zhijiantianya/ruoyi-vue-pro
  • Cloud 微服务架构:https://gitee.com/zhijiantianya/yudao-cloud
  • 视频教程:https://doc.iocoder.cn
【国内首批】支持 JDK 17/21 + SpringBoot 3.3、JDK 8/11 + Spring Boot 2.7 双版本 

来源:捡田螺的小男孩


前言

我们日常开发中,经常跟MQ(消息队列)打交道。本文梳理了MQ的8种使用场景。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

1. 异步处理

面试官在问我们MQ作用时,很多伙伴马上想到异步处理、解耦、流量削锋 等等。

MQ 最常见的应用场景之一就是异步处理

比如,在用户注册场景中,当用户信息保存成功后,系统需要发送一个短信、或者邮箱消息,通知用户注册成功。如果这个短信或者邮箱消息发送比较耗时 ,则可能拖垮注册接口 。又或者如果调用第三方短信、邮件发送接口失败,也会影响注册接口。一般我们不希望一个通知类的功能,去影响注册主流程,这时候,则可以使用MQ来实现异步处理

简要代码如下 :先保存用户信息,然后发送注册成功的MQ消息

  // 用户注册方法
  public void registerUser(String username, String email, String phoneNumber) {
      // 保存用户信息(简化版)
      userService.add(buildUser(username,email,phoneNumber))
      // 发送消息
      String registrationMessage = "User " + username + " has registered successfully.";
      // 发送消息到队列
      rabbitTemplate.convertAndSend("registrationQueue", registrationMessage);
  }

消费者从队列中读取消息并发送短信或邮件

@Service
public class NotificationService {

    // 监听消息队列中的消息并发送短信/邮件
    @RabbitListener(queues = "registrationQueue")
    public void handleRegistrationNotification(String message) {
        // 这里可以进行短信或邮件的发送操作
        System.out.println("Sending registration notification: " + message);

        // 假设这里是发送短信的操作
        sendSms(message);

        // 也可以做其他通知(比如发邮件等)
        sendEmail(message);
    }
  }

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/yudao-cloud
  • 视频教程:https://doc.iocoder.cn/video/

2. 解耦

在微服务架构中,各个服务通常需要进行相互通信 。使用 MQ 可以帮助解耦服务 ,避免直接调用导致的强耦合。

一个电商平台的库存服务和支付服务。支付服务在处理支付后,需要向库存服务发送扣库存的请求,但不直接调用 API,而是通过 MQ 发送消息,让库存服务异步处理。

支付服务在支付成功后将消息发送到 RocketMQ:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class PaymentService {

    private DefaultMQProducer producer;

    public PaymentService() throws Exception {
        producer = new DefaultMQProducer("PaymentProducerGroup");
        producer.setNamesrvAddr("localhost:9876");  // RocketMQ NameServer 地址
        producer.start();
    }

    public void processPayment(String orderId, int quantity) throws Exception {
        // 1. 模拟调用支付接口(例如:支付宝、微信支付等)
        boolean paymentSuccessful = callPayment(orderId, quantity);

        if (paymentSuccessful) {
            // 2. 支付成功后,创建支付消息并发送到 RocketMQ
            String messageBody = "OrderId: " + orderId + ", Quantity: " + quantity;
            Message message = new Message("paymentTopic""paymentTag", messageBody.getBytes());
            producer.send(message);    
        }
    }
}

库存服务从 RocketMQ 中消费支付消息,并处理扣库存的逻辑:

public class InventoryService {

    private DefaultMQPushConsumer consumer;

    public InventoryService() throws Exception {
        consumer = new DefaultMQPushConsumer("InventoryConsumerGroup");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("paymentTopic""paymentTag");

        // 消息监听器
        consumer.registerMessageListener((msgs, context) -> {
            for (MessageExt msg : msgs) {
                String messageBody = new String(msg.getBody());
                // 执行扣库存操作
                reduceStock(messageBody);
            }
            return null// 返回消费成功
        });

        consumer.start();
        System.out.println("InventoryService started...");
    }
}

3. 流量削锋

在高并发的情况下,有些请求可能会产生瞬时流量峰值,直接处理可能会导致服务过载。比如:

  • 春运快到了,12306的抢票就是这种案例。
  • 又或者双12这种大促,订单压力会比较大。
  • 秒杀的时候,也需要避免流量暴涨,打垮应用系统的风险

这些场景,我们都可以使用MQ来进行流量的削峰填谷,确保系统平稳运行。

假设秒杀系统每秒最多可以处理2k个请求,每秒却有5k的请求过来,可以引入消息队列,秒杀系统每秒从消息队列拉2k请求处理得了。

4. 延时任务

在电商平台的订单处理中,如果用户下单后一定时间内未支付,需要自动取消订单。通过MQ的延时队列功能 ,可以设置消息延迟消费的时间,当消息到达延迟时间后,由消费者处理取消订单的逻辑。

当用户下单时,生成一个订单并发送一条延迟消息到RocketMQ。延迟时间可以根据订单的超时时间设置:

@Service
public class OrderService {
 
 @Autowired
 private RocketMQTemplate rocketMQTemplate;
 
 public void createOrder(Order order) {
  // 保存订单逻辑(省略)
 
  // 计算延迟时间(单位:毫秒)
  long delay = order.getTimeout();
 
  // 发送延迟消息
  rocketMQTemplate.syncSend("orderCancelTopic:delay" + delay,
    MessageBuilder.withPayload(order).build(),
    10000// 消息发送超时时间(单位:毫秒)
    (int) (delay / 1000// RocketMQ的延迟级别是以秒为单位的,因此需要转换为秒
  );
 }
}

注意:RocketMQ的延迟级别是固定的,如1s、5s、10s等。如果订单的延迟时间不是RocketMQ支持的延迟级别的整数倍,那么消息将不会精确地在预期的延迟时间后被消费。为了解决这个问题,你可以选择最接近的延迟级别,或者根据业务需求进行适当的调整。

创建一个用来消费延迟消息的消费者,处理取消订单的逻辑。例如:

@Component
@RocketMQMessageListener(topic = "orderCancelTopic", consumerGroup = "order-cancel-consumer-group")
public class OrderCancelListener implements RocketMQListener<Order{
 
 @Override
 public void onMessage(Order order) {
  // 取消订单逻辑
  // 检查订单状态,如果订单仍处于未支付状态则进行取消
  System.out.println("Cancelling order: " + order.getOrderId());
  // (省略实际的取消订单逻辑)
 }
}

5. 日志收集

消息队列常用于日志系统中,将应用生成的日志异步地发送到日志处理系统,进行统一存储和分析。

假设你有一个微服务架构,每个微服务都会生成日志。你可以将这些日志通过消息队列(如Kafka)发送到一个集中式的日志收集系统(如 ELK(Elasticsearch, Logstash, Kibana) 或 Fluentd),从而实现日志的统一管理。

生产者(发送日志到 Kafka)

// 配置和发送日志到 Kafka 主题 "app-logs"
KafkaProducer producer = new KafkaProducer<>(config);
String logMessage = "{\"level\": \"INFO\", \"message\": \"Application started\", \"timestamp\": \"2024-12-29T20:30:59\"}";
producer.send(new ProducerRecord<>("app-logs""log-key", logMessage));

消费者(收集日志信息)

@Service
public class LogConsumer {
    // 使用 @KafkaListener 注解来消费 Kafka 中的日志
    @KafkaListener(topics = "app-logs", groupId = "log-consumer-group")
    public void consumeLog(String logMessage) {
        // 打印或处理收到的日志
        System.out.println("Received log: " + logMessage);
    }
}

6. 分布式事务

业界经常使用MQ来实现分布式事务。

我举个下订单的场景,使用MQ实现分布式事务的例子吧。

我们先来看,一条普通的MQ消息,从产生到被消费,大概流程如下:

  • 生产者产生消息,发送带MQ服务器
  • MQ收到消息后,将消息持久化到存储系统。
  • MQ服务器返回ACk到生产者。
  • MQ服务器把消息push给消费者
  • 消费者消费完消息,响应ACK
  • MQ服务器收到ACK,认为消息消费成功,即在存储中删除消息。

回到下订单 这个例子,订单系统创建完订单后,再发送消息给下游系统。如果订单创建成功,然后消息没有成功发送出去,下游系统就无法感知这个事情,出导致数据不一致。

这时候就可以使用MQ实现分布式事务消息。大家看下这个流程:

  1. 生产者产生消息,发送一条半事务消息到MQ服务器
  2. MQ收到消息后,将消息持久化到存储系统,这条消息的状态是待发送状态。
  3. MQ服务器返回ACK确认到生产者,此时MQ不会触发消息推送事件
  4. 生产者执行本地事务
  5. 如果本地事务执行成功,即commit执行结果到MQ服务器;如果执行失败,发送rollback。
  6. 如果是正常的commit,MQ服务器更新消息状态为可发送;如果是rollback,即删除消息。
  7. 如果消息状态更新为可发送,则MQ服务器会push消息给消费者。消费者消费完就回ACK。
  8. 如果MQ服务器长时间没有收到生产者的commit或者rollback,它会反查生产者,然后根据查询到的结果执行最终状态。

7. 远程调用

我以前公司(微众)基于MQ(RocketMQ),自研了远程调用框架

RocketMQ 作为远程调用框架,主要就是金融场景的适配性。

  • 消息查询功能 :RocketMQ提供了消息查询功能,方便微众银行在需要时进行消息对账或问题排查。
  • 金融级稳定性 :RocketMQ在金融领域的应用非常广泛,得到了众多金融机构的认可。其稳定性和可靠性能够满足微众银行对金融级消息服务的需求。

还有可以基于RocketMQ的定制开发:多中心多活、灰度发布、流量权重与消息去重、背压模式 (能够根据后续服务的治理能力决定拉取的消息数量,确保系统的稳定运行。)

8. 广播通知:事件驱动的消息通知

消息队列(MQ) 可以非常适合用于 广播通知。在广播通知场景中,消息队列可以将消息推送到多个订阅者,让不同的服务或者应用接收到通知。

  • 系统通知 :向所有用户广播应用更新、系统维护、公告通知等。
  • 事件驱动的消息通知 :如库存更新、用户状态变化、订单支付成功等事件通知,多个系统可以订阅这个事件。

针对事件驱动的消息通知,我们以 订单支付成功 事件为例,假设多个系统(如库存管理系统、用户积分系统、财务系统等)都需要监听这个事件来进行相应处理。

当订单支付成功 事件发生时,系统会通过消息队列广播一个事件通知(比如消息内容是订单ID、支付金额等),其他系统可以根据这个事件来执行相应的操作,如:

  • 库存系统:根据订单信息减少库存。
  • 用户积分系统:增加用户积分。
  • 财务系统:记录支付流水。

发送订单支付成功事件:

// 创建订单支付成功事件消息
String orderEventData = "{\"orderId\": 12345, \"userId\": 67890, \"amount\": 100.0, \"event\": \"ORDER_PAYMENT_SUCCESS\"}";
Message msg = new Message("order_event_topic""order_payment_success", orderEventData.getBytes());

// 发送消息
producer.send(msg);

事件消费者(接收并处理订单支付成功事件):

  • 库存系统:
    // 注册消息监听器
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (Message msg : msgs) {
                String eventData = new String(msg.getBody());
                System.out.println("Inventory system received: " + eventData);
                
                // 处理库存减少逻辑
                // 解析消息(假设是 JSON 格式)
                // updateInventory(eventData);  // 假设调用库存更新方法
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
  • 积分系统:
        // 注册消息监听器
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (Message msg : msgs) {
                String eventData = new String(msg.getBody());
                System.out.println("Points system received: " + eventData);

                // 处理用户积分增加逻辑
                // updateUserPoints(eventData);  // 假设调用积分更新方法
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
  • 财务系统:
  // 注册消息监听器
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (Message msg : msgs) {
                String eventData = new String(msg.getBody());
                System.out.println("Finance system received: " + eventData);

                // 处理财务记录逻辑
                // recordPaymentTransaction(eventData);  // 假设调用财务记录方法
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        });


欢迎加入我的知识星球,全面提升技术能力。

👉 加入方式,长按”或“扫描”下方二维码噢

星球的内容包括:项目实战、面试招聘、源码解析、学习路线。

文章有帮助的话,在看,转发吧。

谢谢支持哟 (*^__^*)

© 2024 精读
删除内容请联系邮箱 2879853325@qq.com