SpringBoot与RabbitMQ深度实践:四大核心场景高效解耦方案
在现代分布式系统中,**消息队列(MQ)**是系统解耦、削峰填谷和异步处理的关键组件。
本文将基于 Spring Boot + RabbitMQ,实现四大核心场景,并结合生产级优化实践,帮助你快速掌握从入门到实战的完整方案。
一、方案概述
本实践涵盖 RabbitMQ 的四大核心使用模式:
- 简单队列模式(点对点通信)
- 工作队列模式(竞争消费者)
- 发布/订阅模式(广播消息)
- 路由模式(选择性消费)
并结合以下生产环境优化点:
- 消息持久化
- 手动确认(ACK)机制
- 预取(Prefetch)限制
- 死信队列(DLX)配置
- 发布确认与返回机制
- 幂等性与重试机制
二、项目配置
1. Maven 依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
</dependencies>
2. application.yml 配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
publisher-confirm-type: correlated # 启用发布确认
publisher-returns: true # 启用回退
listener:
simple:
acknowledge-mode: manual # 手动确认
prefetch: 1 # 每次只取一条消息,公平分发
3. 通用配置类(推荐)
@Configuration
public class RabbitConfig {
// 使用 JSON 转换器,方便消息序列化
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter converter) {
RabbitTemplate rt = new RabbitTemplate(connectionFactory);
rt.setMessageConverter(converter);
rt.setMandatory(true);
return rt;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory, MessageConverter converter) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(converter);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setPrefetchCount(1);
return factory;
}
}
三、核心场景实现
1. 简单队列模式(点对点)
配置
@Bean
public Queue simpleQueue() {
return new Queue("simple.queue", true);
}
生产者
@Service
public class SimpleQueueProducer {
private final RabbitTemplate rabbitTemplate;
public SimpleQueueProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; }
public void sendMessage(String msg) {
rabbitTemplate.convertAndSend("simple.queue", msg);
System.out.println("发送消息: " + msg);
}
}
消费者
@Component
@RabbitListener(queues = "simple.queue")
public class SimpleQueueConsumer {
@RabbitHandler
public void receiveMessage(Message message, Channel channel) throws IOException {
try {
String body = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("接收消息: " + body);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}
2. 工作队列模式(竞争消费者)
@Bean
public Queue workQueue() {
return new Queue("work.queue", true);
}
生产者
@Service
public class WorkQueueProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String msg) {
rabbitTemplate.convertAndSend("work.queue", msg);
}
}
消费者(两个竞争)
@Component
@RabbitListener(queues = "work.queue", concurrency = "2-5")
public class WorkQueueConsumer {
@RabbitHandler
public void process(String msg) throws InterruptedException {
System.out.println("处理任务: " + msg);
Thread.sleep(200);
}
}
3. 发布/订阅模式(Fanout Exchange)
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout.exchange");
}
@Bean public Queue fanoutQueue1() { return new Queue("fanout.queue1", true); }
@Bean public Queue fanoutQueue2() { return new Queue("fanout.queue2", true); }
@Bean public Binding binding1(FanoutExchange e, Queue q1) { return BindingBuilder.bind(q1).to(e); }
@Bean public Binding binding2(FanoutExchange e, Queue q2) { return BindingBuilder.bind(q2).to(e); }
生产者
@Service
public class FanoutProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String msg) {
rabbitTemplate.convertAndSend("fanout.exchange", "", msg);
}
}
4. 路由模式(Direct Exchange)
@Bean
public DirectExchange directExchange() { return new DirectExchange("direct.exchange"); }
@Bean public Queue qError() { return new Queue("direct.queue1", true); }
@Bean public Queue qInfoWarn() { return new Queue("direct.queue2", true); }
@Bean public Binding bError(DirectExchange e, Queue qError) {
return BindingBuilder.bind(qError).to(e).with("error");
}
@Bean public Binding bInfo(DirectExchange e, Queue qInfoWarn) {
return BindingBuilder.bind(qInfoWarn).to(e).with("info");
}
@Bean public Binding bWarn(DirectExchange e, Queue qInfoWarn) {
return BindingBuilder.bind(qInfoWarn).to(e).with("warning");
}
生产者
@Service
public class DirectProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String type, String msg) {
rabbitTemplate.convertAndSend("direct.exchange", type, msg);
}
}
四、控制器与测试
@RestController
@RequestMapping("/mq")
public class MQController {
@Autowired private SimpleQueueProducer simpleQueueProducer;
@Autowired private WorkQueueProducer workQueueProducer;
@Autowired private FanoutProducer fanoutProducer;
@Autowired private DirectProducer directProducer;
@PostMapping("/simple")
public String simple(@RequestParam String message) {
simpleQueueProducer.sendMessage(message);
return "Simple 队列消息已发送";
}
@PostMapping("/work")
public String work(@RequestParam String message, @RequestParam int count) {
for (int i = 0; i < count; i++) {
workQueueProducer.sendMessage(message + "-" + i);
}
return "Work 队列消息已发送";
}
@PostMapping("/fanout")
public String fanout(@RequestParam String message) {
fanoutProducer.sendMessage(message);
return "Fanout 广播消息已发送";
}
@PostMapping("/direct")
public String direct(@RequestParam String type, @RequestParam String message) {
directProducer.send(type, message);
return "Direct 路由消息已发送";
}
}
测试示例
# 简单队列
curl -X POST "http://localhost:8080/mq/simple?message=HelloSimple"
# 工作队列(批量10条)
curl -X POST "http://localhost:8080/mq/work?message=Task&count=10"
# 广播消息
curl -X POST "http://localhost:8080/mq/fanout?message=Broadcast"
# 路由消息
curl -X POST "http://localhost:8080/mq/direct?type=error&message=ErrorOccurred"
五、高级特性与优化
- 消息持久化
队列、交换机、消息均持久化,防止 Broker 重启丢失。 - 手动 ACK + 死信队列
消费失败时 basicNack,并路由到 DLX(Dead Letter Exchange)。 - 预取(Prefetch)限制
保证公平分发,避免慢消费者阻塞队列。 - 发布确认 + Returns 回调
确认消息是否成功到达 Broker / 队列,防止消息丢失。 - 幂等性与重试
业务逻辑需设计幂等,避免重复消费带来副作用。 - 监控与报警
开启 RabbitMQ 管理插件,监控队列积压、消费者状态,结合 Prometheus/Grafana 报警。
六、架构图建议
- 简单队列:Producer → Queue → Consumer
- 工作队列:Queue → 多个消费者(竞争)
- Fanout:消息广播到多个队列
- Direct:根据 routingKey 精准路由
七、总结
本文完整实现了 SpringBoot + RabbitMQ 的 四大核心场景:
- 简单队列:点对点通信
- 工作队列:多个消费者竞争任务
- 发布/订阅:广播消息
- 路由模式:精准消息投递
并结合 持久化、ACK机制、预取、公平分发、死信队列、幂等性、监控 等生产级优化,构建出一个可扩展、高可靠的消息系统。
这种架构能有效解耦系统,提高可扩展性与健壮性,适用于电商订单、日志处理、消息通知等典型业务场景。