醋醋百科网

Good Luck To You!

SpringBoot集成RabbitMQ实战指南_springboot集成flowable工作流

SpringBoot集成RabbitMQ实战指南(增强版)

RabbitMQ 是一个轻量级、高可用的消息中间件,在分布式系统中扮演着“异步解耦、流量削峰、可靠通信”的重要角色。本文将带你从 环境准备 → 基础配置 → 常见模式 → 高级特性 → 可靠性保障 → 实战案例 → 集群与监控,完整掌握 SpringBoot 集成 RabbitMQ 的最佳实践。


一、环境准备

  1. 安装RabbitMQ
  • 本地安装:RabbitMQ官网
  • Docker安装:
docker run -d --hostname my-rabbit --name some-rabbit \
  -p 5672:5672 -p 15672:15672 \
  rabbitmq:3-management
  • 访问管理界面:http://localhost:15672 (默认账号 guest/guest)
  1. 创建SpringBoot项目
  • 使用 Spring Initializr 创建项目
  • 添加依赖:
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 配置连接信息
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /

二、基础收发示例

  1. 配置类
@Configuration
public class RabbitMQConfig {
    public static final String QUEUE_NAME = "demo.queue";
    public static final String EXCHANGE_NAME = "demo.exchange";
    public static final String ROUTING_KEY = "demo.routingkey";

    @Bean
    Queue queue() {
        return new Queue(QUEUE_NAME, true);
    }

    @Bean
    DirectExchange exchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}
  1. 消息生产者
@Component
public class MessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend(
            RabbitMQConfig.EXCHANGE_NAME, 
            RabbitMQConfig.ROUTING_KEY, 
            message
        );
        System.out.println("发送消息: " + message);
    }
}
  1. 消息消费者
@Component
public class MessageConsumer {
    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void receiveMessage(String message) {
        System.out.println("接收消息: " + message);
    }
}
  1. 测试接口
@RestController
@RequestMapping("/rabbit")
public class RabbitMQController {
    @Autowired
    private MessageProducer producer;

    @GetMapping("/send")
    public String sendMessage(@RequestParam String message) {
        producer.sendMessage(message);
        return "消息发送成功: " + message;
    }
}

三、常见消息模式与应用场景

模式

特点

应用场景

Work Queue

多个消费者竞争消息

异步任务处理(如图片转码)

Fanout(发布订阅)

消息广播给所有队列

日志、系统通知

Direct(路由)

按路由键精准投递

多级日志处理(error/info)

Topic(主题)

支持通配符匹配

电商系统中订单/支付/物流消息分类

1. 工作队列模式

@Configuration
public class WorkQueueConfig {

    public static final String WORK_QUEUE = "work.queue";

    @Bean
    Queue workQueue() {
        return new Queue(WORK_QUEUE, true);
    }
}

// 生产者
public void sendWorkMessage(String message) {
    rabbitTemplate.convertAndSend(WorkQueueConfig.WORK_QUEUE, message);
}

// 消费者1
@RabbitListener(queues = WorkQueueConfig.WORK_QUEUE)
public void worker1(String message) {
    System.out.println("Worker1 接收消息: " + message);
}

// 消费者2
@RabbitListener(queues = WorkQueueConfig.WORK_QUEUE)
public void worker2(String message) {
    System.out.println("Worker2 接收消息: " + message);
}

2. 发布/订阅模式

@Configuration
public class FanoutConfig {

    public static final String FANOUT_EXCHANGE = "fanout.exchange";
    public static final String FANOUT_QUEUE1 = "fanout.queue1";
    public static final String FANOUT_QUEUE2 = "fanout.queue2";

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    @Bean
    Queue fanoutQueue1() {
        return new Queue(FANOUT_QUEUE1, false);
    }

    @Bean
    Queue fanoutQueue2() {
        return new Queue(FANOUT_QUEUE2, false);
    }

    @Bean
    Binding binding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    @Bean
    Binding binding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

3. 路由模式

@Configuration
public class DirectConfig {

    public static final String DIRECT_EXCHANGE = "direct.exchange";
    public static final String DIRECT_QUEUE1 = "direct.queue1";
    public static final String DIRECT_QUEUE2 = "direct.queue2";
    public static final String ROUTING_KEY1 = "routing.key1";
    public static final String ROUTING_KEY2 = "routing.key2";

    @Bean
    DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE);
    }

    @Bean
    Queue directQueue1() {
        return new Queue(DIRECT_QUEUE1, false);
    }

    @Bean
    Queue directQueue2() {
        return new Queue(DIRECT_QUEUE2, false);
    }

    @Bean
    Binding binding1(Queue directQueue1, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue1).to(directExchange).with(ROUTING_KEY1);
    }

    @Bean
    Binding binding2(Queue directQueue2, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue2).to(directExchange).with(ROUTING_KEY2);
    }
}

4. 主题模式

 @Configuration
public class TopicConfig {

    public static final String TOPIC_EXCHANGE = "topic.exchange";
    public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";
    public static final String ROUTING_KEY1 = "topic.key1.*";
    public static final String ROUTING_KEY2 = "topic.key2.#";

    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    @Bean
    Queue topicQueue1() {
        return new Queue(TOPIC_QUEUE1, false);
    }

    @Bean
    Queue topicQueue2() {
        return new Queue(TOPIC_QUEUE2, false);
    }

    @Bean
    Binding binding1(Queue topicQueue1, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue1).to(topicExchange).with(ROUTING_KEY1);
    }

    @Bean
    Binding binding2(Queue topicQueue2, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue2).to(topicExchange).with(ROUTING_KEY2);
    }
}

四、高级特性

  1. 消息确认机制(Confirm & Return)
  • 确保消息可靠投递到交换机/队列
  • 配置 RabbitTemplate.setConfirmCallback 和 setReturnsCallback
  1. 消费者手动ACK
@RabbitListener(queues = "demo.queue")
public void receive(String msg, Channel channel, 
        @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    try {
        // 业务逻辑
        channel.basicAck(tag, false);
    } catch (Exception e) {
        channel.basicNack(tag, false, true);
    }
}
  1. 消息序列化(JSON)
@Bean
public MessageConverter jsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
}

五、可靠性保障

  1. 持久化
new Queue("durable.queue", true);
new DirectExchange("durable.exchange", true, false);
  1. 消息TTL
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 60秒
return new Queue("ttl.queue", true, false, false, args);
  1. 死信队列(DLQ)
args.put("x-dead-letter-exchange", "dlx.exchange");
args.put("x-dead-letter-routing-key", "dlq.queue");
  1. 延迟队列(插件实现)
  • 安装 rabbitmq_delayed_message_exchange 插件
  • 声明延迟交换机:
@Bean
CustomExchange delayedExchange() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");
    return new CustomExchange("delayed.exchange", "x-delayed-message", true, false, args);
}
  • 发送延迟消息:
rabbitTemplate.convertAndSend("delayed.exchange", "routing.key", message, msg -> {
    msg.getMessageProperties().setDelay(5000); // 5秒延迟
    return msg;
});

5.优先级队列

Map<String, Object> args = new HashMap<>();
args.put("x-max-priority", 10);
return new Queue("priority.queue", true, false, false, args);

六、实战案例

1. 订单超时取消(延迟队列)

  • 用户下单 → 发送延迟消息(30分钟)
  • 若未支付,延迟消息触发 → 取消订单 & 回滚库存

2. 秒杀系统削峰填谷(工作队列)

  • 请求先写入 MQ → 异步处理扣减库存
  • 防止数据库被瞬时高并发打挂

3. 日志采集(Fanout)

  • 应用产生日志 → Fanout广播给 Elasticsearch、Kafka、文件存储等队列

七、集群与高可用配置

  1. 镜像队列(高可用)
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
  1. 负载均衡
  • 使用 Nginx/LB 做 MQ 节点的负载均衡
  • SpringBoot 配置多个地址:
spring:
  rabbitmq:
    addresses: host1:5672,host2:5672,host3:5672

八、监控与排错

  1. RabbitMQ管理界面
    http://localhost:15672 → 查看队列、交换机、消费者状态
  2. Spring Boot Actuator
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,rabbit
  1. Prometheus + Grafana
  • 使用 rabbitmq_exporter
  • 实时监控消息堆积、消费速率、连接数

九、常见问题

  1. 消息重复消费:幂等性(使用 Redis 存储已处理消息ID)
  2. 消息堆积:扩展消费者数量 / 惰性队列 (x-queue-mode=lazy)
  3. 消息顺序:单队列单消费者 / 业务层控制
  4. 连接断开:设置 requested-heartbeat 和 connection-timeout

十、总结

  • 选模式:根据业务场景选择 Work/Fanout/Direct/Topic/延迟/优先级等模式
  • 保可靠:持久化、ACK机制、死信/延迟队列
  • 抗高并发:异步削峰、集群部署、连接池优化
  • 可监控:管理界面 + Actuator + Prometheus

通过以上配置和实践,你可以在 Spring Boot 中灵活、稳定地集成 RabbitMQ,满足绝大多数业务需求。


控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言