SpringBoot集成RabbitMQ实战指南(增强版)
RabbitMQ 是一个轻量级、高可用的消息中间件,在分布式系统中扮演着“异步解耦、流量削峰、可靠通信”的重要角色。本文将带你从 环境准备 → 基础配置 → 常见模式 → 高级特性 → 可靠性保障 → 实战案例 → 集群与监控,完整掌握 SpringBoot 集成 RabbitMQ 的最佳实践。
一、环境准备
- 安装RabbitMQ
- 本地安装:RabbitMQ官网
- Docker安装:
docker run -d --hostname my-rabbit --name some-rabbit \
-p 5672:5672 -p 15672:15672 \
rabbitmq:3-management
- 访问管理界面:http://localhost:15672 (默认账号 guest/guest)
- 创建SpringBoot项目
- 使用 Spring Initializr 创建项目
- 添加依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置连接信息
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
二、基础收发示例
- 配置类
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_NAME = "demo.queue";
public static final String EXCHANGE_NAME = "demo.exchange";
public static final String ROUTING_KEY = "demo.routingkey";
@Bean
Queue queue() {
return new Queue(QUEUE_NAME, true);
}
@Bean
DirectExchange exchange() {
return new DirectExchange(EXCHANGE_NAME);
}
@Bean
Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}
}
- 消息生产者
@Component
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String message) {
rabbitTemplate.convertAndSend(
RabbitMQConfig.EXCHANGE_NAME,
RabbitMQConfig.ROUTING_KEY,
message
);
System.out.println("发送消息: " + message);
}
}
- 消息消费者
@Component
public class MessageConsumer {
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void receiveMessage(String message) {
System.out.println("接收消息: " + message);
}
}
- 测试接口
@RestController
@RequestMapping("/rabbit")
public class RabbitMQController {
@Autowired
private MessageProducer producer;
@GetMapping("/send")
public String sendMessage(@RequestParam String message) {
producer.sendMessage(message);
return "消息发送成功: " + message;
}
}
三、常见消息模式与应用场景
模式 | 特点 | 应用场景 |
Work Queue | 多个消费者竞争消息 | 异步任务处理(如图片转码) |
Fanout(发布订阅) | 消息广播给所有队列 | 日志、系统通知 |
Direct(路由) | 按路由键精准投递 | 多级日志处理(error/info) |
Topic(主题) | 支持通配符匹配 | 电商系统中订单/支付/物流消息分类 |
1. 工作队列模式
@Configuration
public class WorkQueueConfig {
public static final String WORK_QUEUE = "work.queue";
@Bean
Queue workQueue() {
return new Queue(WORK_QUEUE, true);
}
}
// 生产者
public void sendWorkMessage(String message) {
rabbitTemplate.convertAndSend(WorkQueueConfig.WORK_QUEUE, message);
}
// 消费者1
@RabbitListener(queues = WorkQueueConfig.WORK_QUEUE)
public void worker1(String message) {
System.out.println("Worker1 接收消息: " + message);
}
// 消费者2
@RabbitListener(queues = WorkQueueConfig.WORK_QUEUE)
public void worker2(String message) {
System.out.println("Worker2 接收消息: " + message);
}
2. 发布/订阅模式
@Configuration
public class FanoutConfig {
public static final String FANOUT_EXCHANGE = "fanout.exchange";
public static final String FANOUT_QUEUE1 = "fanout.queue1";
public static final String FANOUT_QUEUE2 = "fanout.queue2";
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE);
}
@Bean
Queue fanoutQueue1() {
return new Queue(FANOUT_QUEUE1, false);
}
@Bean
Queue fanoutQueue2() {
return new Queue(FANOUT_QUEUE2, false);
}
@Bean
Binding binding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
@Bean
Binding binding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
3. 路由模式
@Configuration
public class DirectConfig {
public static final String DIRECT_EXCHANGE = "direct.exchange";
public static final String DIRECT_QUEUE1 = "direct.queue1";
public static final String DIRECT_QUEUE2 = "direct.queue2";
public static final String ROUTING_KEY1 = "routing.key1";
public static final String ROUTING_KEY2 = "routing.key2";
@Bean
DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE);
}
@Bean
Queue directQueue1() {
return new Queue(DIRECT_QUEUE1, false);
}
@Bean
Queue directQueue2() {
return new Queue(DIRECT_QUEUE2, false);
}
@Bean
Binding binding1(Queue directQueue1, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue1).to(directExchange).with(ROUTING_KEY1);
}
@Bean
Binding binding2(Queue directQueue2, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue2).to(directExchange).with(ROUTING_KEY2);
}
}
4. 主题模式
@Configuration
public class TopicConfig {
public static final String TOPIC_EXCHANGE = "topic.exchange";
public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";
public static final String ROUTING_KEY1 = "topic.key1.*";
public static final String ROUTING_KEY2 = "topic.key2.#";
@Bean
TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
Queue topicQueue1() {
return new Queue(TOPIC_QUEUE1, false);
}
@Bean
Queue topicQueue2() {
return new Queue(TOPIC_QUEUE2, false);
}
@Bean
Binding binding1(Queue topicQueue1, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueue1).to(topicExchange).with(ROUTING_KEY1);
}
@Bean
Binding binding2(Queue topicQueue2, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueue2).to(topicExchange).with(ROUTING_KEY2);
}
}
四、高级特性
- 消息确认机制(Confirm & Return)
- 确保消息可靠投递到交换机/队列
- 配置 RabbitTemplate.setConfirmCallback 和 setReturnsCallback
- 消费者手动ACK
@RabbitListener(queues = "demo.queue")
public void receive(String msg, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
// 业务逻辑
channel.basicAck(tag, false);
} catch (Exception e) {
channel.basicNack(tag, false, true);
}
}
- 消息序列化(JSON)
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
五、可靠性保障
- 持久化
new Queue("durable.queue", true);
new DirectExchange("durable.exchange", true, false);
- 消息TTL
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 60秒
return new Queue("ttl.queue", true, false, false, args);
- 死信队列(DLQ)
args.put("x-dead-letter-exchange", "dlx.exchange");
args.put("x-dead-letter-routing-key", "dlq.queue");
- 延迟队列(插件实现)
- 安装 rabbitmq_delayed_message_exchange 插件
- 声明延迟交换机:
@Bean
CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("delayed.exchange", "x-delayed-message", true, false, args);
}
- 发送延迟消息:
rabbitTemplate.convertAndSend("delayed.exchange", "routing.key", message, msg -> {
msg.getMessageProperties().setDelay(5000); // 5秒延迟
return msg;
});
5.优先级队列
Map<String, Object> args = new HashMap<>();
args.put("x-max-priority", 10);
return new Queue("priority.queue", true, false, false, args);
六、实战案例
1. 订单超时取消(延迟队列)
- 用户下单 → 发送延迟消息(30分钟)
- 若未支付,延迟消息触发 → 取消订单 & 回滚库存
2. 秒杀系统削峰填谷(工作队列)
- 请求先写入 MQ → 异步处理扣减库存
- 防止数据库被瞬时高并发打挂
3. 日志采集(Fanout)
- 应用产生日志 → Fanout广播给 Elasticsearch、Kafka、文件存储等队列
七、集群与高可用配置
- 镜像队列(高可用)
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
- 负载均衡
- 使用 Nginx/LB 做 MQ 节点的负载均衡
- SpringBoot 配置多个地址:
spring:
rabbitmq:
addresses: host1:5672,host2:5672,host3:5672
八、监控与排错
- RabbitMQ管理界面
http://localhost:15672 → 查看队列、交换机、消费者状态 - Spring Boot Actuator
management:
endpoints:
web:
exposure:
include: health,metrics,rabbit
- Prometheus + Grafana
- 使用 rabbitmq_exporter
- 实时监控消息堆积、消费速率、连接数
九、常见问题
- 消息重复消费:幂等性(使用 Redis 存储已处理消息ID)
- 消息堆积:扩展消费者数量 / 惰性队列 (x-queue-mode=lazy)
- 消息顺序:单队列单消费者 / 业务层控制
- 连接断开:设置 requested-heartbeat 和 connection-timeout
十、总结
- 选模式:根据业务场景选择 Work/Fanout/Direct/Topic/延迟/优先级等模式
- 保可靠:持久化、ACK机制、死信/延迟队列
- 抗高并发:异步削峰、集群部署、连接池优化
- 可监控:管理界面 + Actuator + Prometheus
通过以上配置和实践,你可以在 Spring Boot 中灵活、稳定地集成 RabbitMQ,满足绝大多数业务需求。