RocketMQ是阿里巴巴开源的一款分布式消息中间件,支持高可用、高并发、高吞吐量的特性,广泛应用于各个领域的消息通信。在分布式系统中,由于数据的分散性和复杂性,通常需要实现分布式事务来确保系统的一致性和可靠性。本文将介绍如何使用RocketMQ实现分布式事务,并结合代码demo进行系统且完善的讲解。
一、RocketMQ分布式事务原理
RocketMQ支持使用半消息(Half Message)机制实现分布式事务,其主要流程如下:
- 事务消息的生产者向RocketMQ发送半消息,半消息中包含了待处理的业务数据,但该消息并不会立即被消费者接收。
- 事务消息的生产者执行本地事务,如果本地事务执行成功,则向RocketMQ发送确认消息(Commit Message);如果本地事务执行失败,则向RocketMQ发送回滚消息(Rollback Message)。
- 如果RocketMQ接收到了确认消息,则该半消息被视为已经提交,并对外提供消费;如果RocketMQ接收到了回滚消息,则该半消息被视为已经回滚,不再对外提供消费。
通过半消息机制,RocketMQ实现了分布式事务的基本逻辑,但在实际使用中还需要考虑以下问题:
- 事务消息的生产者执行本地事务成功后,可能由于网络或机器故障等原因无法向RocketMQ发送确认消息,导致事务消息的状态不一致。因此,RocketMQ支持定时任务和回查机制来解决该问题。
- RocketMQ的消费者可能会在消费时发生异常,导致事务消息的状态不一致。因此,RocketMQ需要支持幂等性消费,确保消费者在重复消费时不会影响事务消息的状态。
二、使用RocketMQ实现分布式事务的示例
系统设计
我们将设计一个简单的订单系统,包含以下三个服务:
- 订单服务:用于创建订单,订单包含商品信息和用户信息,创建订单时会发送一个消息到MQ,通知其他服务扣减商品库存和用户余额。
- 商品服务:用于扣减商品库存,接收到订单服务发送的消息后,扣减商品库存。
- 用户服务:用于扣减用户余额,接收到订单服务发送的消息后,扣减用户余额。
订单服务和商品服务需要在同一个事务中进行,只有当两个服务都执行成功后,才会提交事务,否则回滚事务。用户服务和订单服务没有直接关联,不需要在同一个事务中进行。
系统架构
我们将使用Spring Boot搭建整个系统,其中订单服务和商品服务将在同一个Spring Boot应用中实现,用户服务将在另一个Spring Boot应用中实现。订单服务和商品服务需要在同一个事务中进行,所以我们将使用RocketMQ实现分布式事务。
系统实现
首先,我们需要创建一个RocketMQ生产者和消费者,分别用于发送消息和接收消息。在订单服务和商品服务中,我们需要创建一个RocketMQ事务监听器,用于处理本地事务和发送半消息。
1. RocketMQ生产者
在订单服务中创建一个RocketMQ生产者,用于发送创建订单的消息。
javaCopy code
@Component
public class OrderProducer {
private static final String TOPIC = "create_order";
@Autowired
private DefaultMQProducer producer;
public void sendCreateOrderMessage(Order order) throws Exception {
Message message = new Message(TOPIC, "create_order", JSON.toJSONString(order).getBytes());
TransactionSendResult result = producer.sendMessageInTransaction(message, null);
System.out.printf("发送消息: %s%n", result);
}
}
2. RocketMQ消费者
在商品服务中创建一个RocketMQ消费者,用于接收创建订单的消息。
javaCopy code
@Component
public class OrderConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
try {
String body = new String(message.getBody());
Order order = JSON.parseObject(body, Order.class);
System.out.printf("接收到消息: %s%n", order);
// TODO 处理本地事务
} catch (Exception e) {
e.printStackTrace();
}
}
}
3. RocketMQ事务监听器
在订单服务中创建一个RocketMQ事务监听器,用于处理本地事务和发送半消息。
首先,我们需要在订单服务的pom.xml文件中添加RocketMQ的依赖:
xmlCopy code
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.0</version>
</dependency>
接下来,我们需要创建一个RocketMQ事务监听器。在订单服务中创建一个名为OrderTransactionListener的类,实现RocketMQ的TransactionListener接口:
javaCopy code
public class OrderTransactionListener implements TransactionListener {
private final OrderService orderService;
public OrderTransactionListener(OrderService orderService) {
this.orderService = orderService;
}
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
try {
orderService.createOrder(msg.getBody());
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 检查本地事务状态
Order order = JSON.parseObject(msg.getBody(), Order.class);
if (order.getStatus() == OrderStatus.PAY_SUCCESS) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.UNKNOW;
}
}
}
在这个监听器中,我们实现了executeLocalTransaction()和checkLocalTransaction()方法。executeLocalTransaction()方法用于执行本地事务,即创建订单;checkLocalTransaction()方法用于检查本地事务状态,如果订单状态是支付成功,就返回COMMIT_MESSAGE,否则返回UNKNOW,表示需要再次确认本地事务状态。
接下来,我们需要在订单服务中实现创建订单的方法。在OrderService中添加createOrder()方法:
javaCopy code
@Service
public class OrderService {
private final RocketMQTemplate rocketMQTemplate;
public OrderService(RocketMQTemplate rocketMQTemplate) {
this.rocketMQTemplate = rocketMQTemplate;
}
@Transactional
public void createOrder(String message) {
Order order = JSON.parseObject(message, Order.class);
// 创建订单
// 更新库存
// 扣款
order.setStatus(OrderStatus.PAY_SUCCESS);
rocketMQTemplate.sendMessageInTransaction("create_order_group", "create_order_topic", MessageBuilder.withPayload(JSON.toJSONString(order)).build(), order);
}
}
在这个方法中,我们首先从消息中解析出Order对象,然后创建订单、更新库存和扣款。最后,我们使用RocketMQ的sendMessageInTransaction()方法发送半消息,并将Order对象作为参数传递给该方法。
最后,我们需要在应用程序中配置RocketMQ事务监听器。在application.yml文件中添加以下配置:
yamlCopy code
rocketmq:
name-server: localhost:9876
producer:
group: create_order_group
consumer:
group: create_order_group
transaction-listener:
enable: true
bean-name: orderTransactionListener
在这个配置中,我们指定了RocketMQ的Name Server地址、生产者组、消费者组和事务监听器,分别解释一下:
- rocketmq.name-server:指定了RocketMQ的Name Server地址,用于连接到RocketMQ集群。
- rocketmq.producer.group:指定了生产者组,同一个生产者组中的多个生产者可以共同向同一个主题发送消息,这样可以增强消息发送的可靠性和吞吐量。
- rocketmq.consumer.group:指定了消费者组,同一个消费者组中的多个消费者可以共同消费同一个主题的消息,这样可以增强消息的并发处理能力和可扩展性。
- rocketmq.producer.transactionListenerName:指定了事务监听器的名称,在上一步中创建的TransactionListenerImpl实现了TransactionListener接口,这里我们将其命名为transactionListener,用于在事务发生时执行本地事务和发送半消息。
配置完毕之后,我们就可以在代码中使用TransactionMQProducer进行消息发送,同时将事务监听器和本地事务处理器传递给TransactionMQProducer,具体代码实现如下:
javaCopy code
@Configuration
public class RocketMQConfig {
@Value("${rocketmq.name-server}")
private String nameServer;
@Value("${rocketmq.producer.group}")
private String producerGroup;
@Value("${rocketmq.consumer.group}")
private String consumerGroup;
@Value("${rocketmq.producer.transactionListenerName}")
private String transactionListenerName;
@Autowired
private OrderTransactionListener orderTransactionListener;
@Autowired
private OrderService orderService;
@Bean
public TransactionMQProducer transactionMQProducer() throws MQClientException {
TransactionMQProducer producer = new TransactionMQProducer(producerGroup);
producer.setNamesrvAddr(nameServer);
producer.setTransactionListener(orderTransactionListener);
producer.start();
return producer;
}
@Bean
public DefaultMQPushConsumer defaultMQPushConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(nameServer);
consumer.subscribe("OrderTopic", "*");
consumer.registerMessageListener(new OrderMessageListener(orderService));
consumer.start();
return consumer;
}
}
在上面的代码中,我们首先将TransactionMQProducer和DefaultMQPushConsumer作为@Bean注入到Spring容器中。TransactionMQProducer用于发送半消息,DefaultMQPushConsumer用于消费消息。
在TransactionMQProducer的配置中,我们设置了生产者组、Name Server地址和事务监听器。其中,事务监听器使用了在之前创建的OrderTransactionListener实现类。在这个实现类中,我们通过OrderService处理本地事务。而在DefaultMQPushConsumer的配置中,我们设置了消费者组、Name Server地址和订阅的主题,同时也指定了消息监听器OrderMessageListener,用于消费消息。
接下来,我们实现OrderTransactionListener和OrderMessageListener的代码。
首先实现OrderTransactionListener,代码如下:
javaCopy code
@Component
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderService orderService;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
String body = new String((byte[]) msg.getPayload(), Charset.forName("UTF-8"));
JSONObject jsonObject = JSON.parseObject(body);
Long orderId = jsonObject.getLong("orderId");
Integer amount = jsonObject.getInteger("amount");
orderService.createOrder(orderId, amount);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
e.printStackTrace();
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String body = new String((byte[]) msg.getPayload(), Charset.forName("UTF-8"));
JSONObject jsonObject = JSON.parseObject(body);
Long orderId = jsonObject.getLong("orderId");
Order order = orderService.getOrder(orderId);
if (order == null) {
return RocketMQLocalTransactionState.ROLLBACK;
}
return RocketMQLocalTransactionState.COMMIT;
}
}
这里我们实现了
RocketMQLocalTransactionListener接口,并注入了OrderService,用于创建订单和查询订单。
executeLocalTransaction方法用于执行本地事务,根据消息中的订单ID和金额创建订单,并返回COMMIT状态,如果创建订单失败则返回ROLLBACK状态。
checkLocalTransaction方法用于回查本地事务,根据消息中的订单ID查询订单,如果订单不存在则返回ROLLBACK状态,否则返回COMMIT状态。
接下来实现OrderMessageListener,代码如下:
javaCopy code
@Component
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "order-consumer-group")
public class OrderMessageListener implements RocketMQListener<Message> {
@Autowired
private OrderService orderService;
@Override
public void onMessage(Message message) {
try {
String body = new String((byte[]) message.getPayload(), Charset.forName("UTF-8"));
JSONObject jsonObject = JSON.parseObject(body);
Long orderId = jsonObject.getLong("orderId");
Integer amount = jsonObject.getInteger("amount");
orderService.confirmOrder(orderId, amount);
} catch (Exception e) {
e.printStackTrace();
// 如果出现异常,则向RocketMQ发送消息,触发回查事务
throw new RocketMQLocalTransactionException("Transaction check failed");
}
}
}
这里我们使用@RocketMQMessageListener注解声明消费者,指定消费的主题和消费者组,并实现RocketMQListener接口,用于消费RocketMQ的消息。
onMessage方法用于消费消息,并调用OrderService的confirmOrder方法确认订单,如果出现异常,则向RocketMQ发送消息,触发回查事务。
这样我们就实现了RocketMQ分布式事务的整个流程。
总结
使用RocketMQ实现分布式事务可以保证系统的数据一致性,适用于需要异步处理的场景。在RocketMQ中,分布式事务需要通过发送半消息、本地事务执行和二次确认来实现。实现步骤如下:
- 首先需要在订单服务中创建一个RocketMQ事务监听器OrderTransactionListener,用于处理本地事务和发送半消息。
- 在RocketMQ的配置文件中指定Name Server地址、生产者组、消费者组和事务监听器。
- 实现OrderTransactionListener和OrderMessageListener的代码,分别处理本地事务和消息消费。
- 在发送消息时,通过TransactionMQProducer发送半消息,并将本地事务与消息关联起来。
- 在本地事务执行完毕后,根据事务执行结果来确定是否提交或回滚本地事务,并发送二次确认消息。
- 在RocketMQ的消费者端,通过设置MessageListenerConcurrently来处理消息的消费,并设置消费模式为集群消费。在消费消息时,如果消费失败,则进行消息重试,最终若还是无法消费成功,则将消息发往一个特殊的Topic中,供后续手动处理。
RocketMQ分布式事务的实现需要进行一定的代码编写和配置,但相比于其他分布式事务的实现,RocketMQ具有较高的性能和灵活性,适用于高并发、异步处理的场景。