醋醋百科网

Good Luck To You!

SpringBoot与RabbitMQ深度实践_springboot+vue+redis+rabbitmq项目实战

SpringBoot与RabbitMQ深度实践:四大核心场景高效解耦方案

在现代分布式系统中,**消息队列(MQ)**是系统解耦、削峰填谷和异步处理的关键组件。
本文将基于 Spring Boot + RabbitMQ,实现四大核心场景,并结合生产级优化实践,帮助你快速掌握从入门到实战的完整方案。


一、方案概述

本实践涵盖 RabbitMQ 的四大核心使用模式:

  1. 简单队列模式(点对点通信)
  2. 工作队列模式(竞争消费者)
  3. 发布/订阅模式(广播消息)
  4. 路由模式(选择性消费)

并结合以下生产环境优化点:

  • 消息持久化
  • 手动确认(ACK)机制
  • 预取(Prefetch)限制
  • 死信队列(DLX)配置
  • 发布确认与返回机制
  • 幂等性与重试机制

二、项目配置

1. Maven 依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.retry</groupId>
        <artifactId>spring-retry</artifactId>
    </dependency>
</dependencies>

2. application.yml 配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-confirm-type: correlated   # 启用发布确认
    publisher-returns: true              # 启用回退
  listener:
    simple:
      acknowledge-mode: manual           # 手动确认
      prefetch: 1                        # 每次只取一条消息,公平分发

3. 通用配置类(推荐)

@Configuration
public class RabbitConfig {

    // 使用 JSON 转换器,方便消息序列化
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter converter) {
        RabbitTemplate rt = new RabbitTemplate(connectionFactory);
        rt.setMessageConverter(converter);
        rt.setMandatory(true);
        return rt;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory, MessageConverter converter) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(converter);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setPrefetchCount(1);
        return factory;
    }
}

三、核心场景实现

1. 简单队列模式(点对点)

配置

@Bean
public Queue simpleQueue() {
    return new Queue("simple.queue", true);
}

生产者

@Service
public class SimpleQueueProducer {
    private final RabbitTemplate rabbitTemplate;
    public SimpleQueueProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; }

    public void sendMessage(String msg) {
        rabbitTemplate.convertAndSend("simple.queue", msg);
        System.out.println("发送消息: " + msg);
    }
}

消费者

@Component
@RabbitListener(queues = "simple.queue")
public class SimpleQueueConsumer {
    @RabbitHandler
    public void receiveMessage(Message message, Channel channel) throws IOException {
        try {
            String body = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("接收消息: " + body);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}

2. 工作队列模式(竞争消费者)

@Bean
public Queue workQueue() {
    return new Queue("work.queue", true);
}

生产者

@Service
public class WorkQueueProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void sendMessage(String msg) {
        rabbitTemplate.convertAndSend("work.queue", msg);
    }
}

消费者(两个竞争)

@Component
@RabbitListener(queues = "work.queue", concurrency = "2-5")
public class WorkQueueConsumer {
    @RabbitHandler
    public void process(String msg) throws InterruptedException {
        System.out.println("处理任务: " + msg);
        Thread.sleep(200);
    }
}

3. 发布/订阅模式(Fanout Exchange)

@Bean
public FanoutExchange fanoutExchange() {
    return new FanoutExchange("fanout.exchange");
}
@Bean public Queue fanoutQueue1() { return new Queue("fanout.queue1", true); }
@Bean public Queue fanoutQueue2() { return new Queue("fanout.queue2", true); }
@Bean public Binding binding1(FanoutExchange e, Queue q1) { return BindingBuilder.bind(q1).to(e); }
@Bean public Binding binding2(FanoutExchange e, Queue q2) { return BindingBuilder.bind(q2).to(e); }

生产者

@Service
public class FanoutProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void sendMessage(String msg) {
        rabbitTemplate.convertAndSend("fanout.exchange", "", msg);
    }
}

4. 路由模式(Direct Exchange)

@Bean
public DirectExchange directExchange() { return new DirectExchange("direct.exchange"); }

@Bean public Queue qError() { return new Queue("direct.queue1", true); }
@Bean public Queue qInfoWarn() { return new Queue("direct.queue2", true); }

@Bean public Binding bError(DirectExchange e, Queue qError) {
    return BindingBuilder.bind(qError).to(e).with("error");
}
@Bean public Binding bInfo(DirectExchange e, Queue qInfoWarn) {
    return BindingBuilder.bind(qInfoWarn).to(e).with("info");
}
@Bean public Binding bWarn(DirectExchange e, Queue qInfoWarn) {
    return BindingBuilder.bind(qInfoWarn).to(e).with("warning");
}

生产者

@Service
public class DirectProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String type, String msg) {
        rabbitTemplate.convertAndSend("direct.exchange", type, msg);
    }
}

四、控制器与测试

@RestController
@RequestMapping("/mq")
public class MQController {

    @Autowired private SimpleQueueProducer simpleQueueProducer;
    @Autowired private WorkQueueProducer workQueueProducer;
    @Autowired private FanoutProducer fanoutProducer;
    @Autowired private DirectProducer directProducer;

    @PostMapping("/simple")
    public String simple(@RequestParam String message) {
        simpleQueueProducer.sendMessage(message);
        return "Simple 队列消息已发送";
    }

    @PostMapping("/work")
    public String work(@RequestParam String message, @RequestParam int count) {
        for (int i = 0; i < count; i++) {
            workQueueProducer.sendMessage(message + "-" + i);
        }
        return "Work 队列消息已发送";
    }

    @PostMapping("/fanout")
    public String fanout(@RequestParam String message) {
        fanoutProducer.sendMessage(message);
        return "Fanout 广播消息已发送";
    }

    @PostMapping("/direct")
    public String direct(@RequestParam String type, @RequestParam String message) {
        directProducer.send(type, message);
        return "Direct 路由消息已发送";
    }
}

测试示例

# 简单队列
curl -X POST "http://localhost:8080/mq/simple?message=HelloSimple"

# 工作队列(批量10条)
curl -X POST "http://localhost:8080/mq/work?message=Task&count=10"

# 广播消息
curl -X POST "http://localhost:8080/mq/fanout?message=Broadcast"

# 路由消息
curl -X POST "http://localhost:8080/mq/direct?type=error&message=ErrorOccurred"

五、高级特性与优化

  1. 消息持久化
    队列、交换机、消息均持久化,防止 Broker 重启丢失。
  2. 手动 ACK + 死信队列
    消费失败时 basicNack,并路由到 DLX(Dead Letter Exchange)。
  3. 预取(Prefetch)限制
    保证公平分发,避免慢消费者阻塞队列。
  4. 发布确认 + Returns 回调
    确认消息是否成功到达 Broker / 队列,防止消息丢失。
  5. 幂等性与重试
    业务逻辑需设计幂等,避免重复消费带来副作用。
  6. 监控与报警
    开启 RabbitMQ 管理插件,监控队列积压、消费者状态,结合 Prometheus/Grafana 报警。

六、架构图建议



  • 简单队列:Producer → Queue → Consumer
  • 工作队列:Queue → 多个消费者(竞争)
  • Fanout:消息广播到多个队列
  • Direct:根据 routingKey 精准路由

七、总结

本文完整实现了 SpringBoot + RabbitMQ 的 四大核心场景

  • 简单队列:点对点通信
  • 工作队列:多个消费者竞争任务
  • 发布/订阅:广播消息
  • 路由模式:精准消息投递

并结合 持久化、ACK机制、预取、公平分发、死信队列、幂等性、监控 等生产级优化,构建出一个可扩展、高可靠的消息系统。

这种架构能有效解耦系统,提高可扩展性与健壮性,适用于电商订单、日志处理、消息通知等典型业务场景。


控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言