3RocketMQ 分布式事务实践 · SpringCloud微服务实战 · 看云
导航
本节代码地址
GitHub:https://github.com/xuyisu/fw-spring-cloud/tree/master/fw-cloud-transaction/fw-cloud-transaction-rocketmq/fw-cloud-transaction-rocketmq-order
GitHub:https://github.com/xuyisu/fw-spring-cloud/tree/master/fw-cloud-transaction/fw-cloud-transaction-rocketmq/fw-cloud-transaction-rocketmq-send
RocketMQ 的事务原理之前在RocketMQ 介绍篇已经介绍了,现在把图再拿过来,跟着图片的步骤,我们来实际操作一遍
1. 基于RocketMQ 实例演示
本模块中我们将新建两个模块,我们将模拟电商购物环节中的下单->发货的过程。两个模块的名称分别为fw-cloud-transaction-rocketmq-order和fw-cloud-transaction-rocketmq-send,order 作为Producer,send 作为Consumer
1.1 maven 依赖
两个模块所使用的依赖也是相同的,这里可以看到,我们依赖了上面封装的fw-cloud-transaction-base-dao,不需要再自己写基本的mapper等,因为是基于RocketMQ调用的,因此引入rocketmq-spring-boot-starter包
<dependencies>
<!--基础包-->
<dependency>
<groupId>com.yisu.cloud</groupId>
<artifactId>fw-cloud-transaction-base-dao</artifactId>
<version>${version}</version>
</dependency>
<!--rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<!-- 数据连接池-->
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>${HikariCP.version}</version>
</dependency>
</dependencies>
并在MySql中新建fw_transaction库,并执行以下脚本,供fw-cloud-transaction-rocktmq-order和fw-cloud-transaction-rocktmq-send使用
CREATE TABLE `fw_trade_log` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键id',
`status` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '订单状态 1.待支付 2.待发货 3.待收货 4.订单完成 5.订单关闭',
`status_dsc` varchar(100) CHARACTER SET utf8 DEFAULT '' COMMENT '状态描述',
`product_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '商品id',
`product_name` varchar(100) NOT NULL DEFAULT '' COMMENT '商品名称',
`user_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '用户id',
`order_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '订单id',
`order_amount` decimal(10,2) NOT NULL DEFAULT '0.00' COMMENT '订单总额',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单表';
CREATE TABLE `fw_transaction_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
`transaction_Id` varchar(50) NOT NULL COMMENT '事务id',
`remark` varchar(50) NOT NULL COMMENT '备注',
PRIMARY KEY (`id`,`transaction_Id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COMMENT='事务日志表';
1.2 fw-cloud-transaction-rocketmq-order模块
1.2.1 订单接口创建
再订单模块中创建一个订单接口,在里面分别设置下单和支付的接口,因为是半方法发送,支付成功后的业务查分出来,二次确认支付的时候方便调用,因此拆开
public interface OrderService{
void saveAndPayOrder(String productName);
void payOrder(FwTradeLog fwTradeLog,String transactionId);
}
1.2.2 订单接口实现
第1步和第2步:向RocketMQ发送半消息,我们通过RocketMQTemplate的sendMessageInTransaction方法发送了一条半消息,但是并没有实际操作数据库,数据库操作过程抽出来放到了payOrder方法中,然后,半消息发送成功之后,会告知Producer 成功收到了半消息,待会我们在半消息的监听RocketMQLocalTransactionListener讲到,由半消息的监听来确认支付方法是否成功
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Autowired
private FwTradeLogService fwTradeLogService;
@Autowired
private FwTransactionLogService fwTransactionLogService;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void saveAndPayOrder(String productName) {
FwTradeLog fwTradeLog =new FwTradeLog(StatusEnum.TWO);
fwTradeLog.setProductId(System.currentTimeMillis());
fwTradeLog.setProductName(productName);
String transactionId = UUID.randomUUID().toString();
this.rocketMQTemplate.sendMessageInTransaction(
"fw-pay-order-group",
"pay-success",
MessageBuilder.withPayload(fwTradeLog)
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.build(),
fwTradeLog
);
}
@Override
@Transactional
public void payOrder(FwTradeLog fwTradeLog,String transactionId) {
fwTradeLogService.save(fwTradeLog);
log.info("[订单状态{}]=>{},当前商品id=>{},商品名称=>{}",fwTradeLog.getOrderId(), StatusEnum.TWO.getDesc(),fwTradeLog.getProductId(),fwTradeLog.getProductName());
FwTransactionLog transactionLog = new FwTransactionLog();
transactionLog.setTransactionId(transactionId);
String remark = String.format("事务ID为%s的本地事务执行成功", transactionId);
transactionLog.setRemark(remark);
fwTransactionLogService.save(transactionLog);
log.info("事务ID=>{} 本地事务执行成功", transactionId);
}
}
1.2.3 RocketMQ 本地事务配置
这里我们实现RocketMQLocalTransactionListener并在类上使用@RocketMQTransactionListener注解标注,注解属性txProducerGroup值和前面发送的事务消息分组组名必须一致。
实现了RocketMQLocalTransactionListener接口的executeLocalTransaction和checkLocalTransaction方法。executeLocalTransaction用于指定执行本地事务逻辑,checkLocalTransaction方法用于RocketMQ回查本地事务状态
当监听器对应的半消息发送成功后,就会执行executeLocalTransaction方法;如果本地事务已经提交了,但是RocketMQ并未收到COMMIT确认时。可能网络突然中断或者程序故障,这种情况下,当网络和应用恢复正常后,RocketMQ会执行本地事务回查操作,因为RocketMQ会定期扫描消息集群中的事物消息,如果发现了Prepared消息,它会向消息发送端(生产者)确认。
为了方便,我们创建一张事务日志表,在本地事务执行成功后,往本地事务日志表中也插入一条事务日志。事务回查时,只要通过transicationId能够找到对应的日志,则说明本地事务执行成功,否则认为执行失败。
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "fw-pay-order-group")
public class LocalTransactionRocketMQListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderService orderService;
@Autowired
private FwTransactionLogService fwTransactionLogService;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
MessageHeaders headers = message.getHeaders();
String transicationId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
try {
FwTradeLog tradeLog = (FwTradeLog) o;
orderService.payOrder(tradeLog,transicationId);
log.info("本地事务=>{} 执行成功,往RocketMQ发送COMMIT",transicationId);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e){
log.info("本地事务=>{} 回滚,往RocketMQ发送ROLLBACK",transicationId ,e);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
MessageHeaders headers = message.getHeaders();
String transicationId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
log.info("RocketMQ事务状态回查=>{}",transicationId);
FwTransactionLog transactionLog = fwTransactionLogService.getOne(
new LambdaQueryWrapper<FwTransactionLog>().eq(FwTransactionLog::getTransactionId, transicationId)
);
return transactionLog != null ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
}
}
1.2.4 Controller 控制层
配置订单服务对外的接口,方便演示
@RestController
public class OrderController {
@Autowired
private OrderService orderService;
@GetMapping("saveOrder")
public FwResult saveOrder(String productName){
orderService.saveAndPayOrder(productName);
return FwResult.ok();
}
}
1.2.5 启动类配置
这里就时是普通的应用启动类
@SpringBootApplication
public class FwTransactionRocketmqOrderApplication {
public static void main(String[] args) {
SpringApplication.run(FwTransactionRocketmqOrderApplication.class, args);
}
}
1.2.6 应用配置
配置中需要配置数据库,以及RocketMq的连接信息
server:
port: 9002
spring:
#数据库配置 start
datasource:
type: com.zaxxer.hikari.HikariDataSource
url: jdbc:mysql://${dbIp}:3306/fw_transaction?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false
username: root
password: 123456
driver-class-name: com.mysql.jdbc.Driver
hikari:
connection-timeout: 20000
auto-commit: true
max-lifetime: 1200000
minimum-idle: 5
maximum-pool-size: 12
idle-timeout: 300000
rocketmq:
name-server: localhost:9876
producer:
group: order-group
1.3 fw-cloud-transaction-rocketmq-send模块
1.3.1 发货接口创建
配置发货接口,并且创建发货的方法
public interface SendService {
void sendOrder(FwTradeLog fwTradeLog);
}
1.3.2 发货接口实现
这里在实现方法上加上@Transactional已实现本地事务的控制,并且设置订单的状态为待收货
@Service
@Slf4j
public class SendServiceImpl implements SendService {
@Autowired
private FwTradeLogService fwTradeLogService;
@Override
public void sendOrder(FwTradeLog fwTradeLog) {
fwTradeLog.setStatus(StatusEnum.THREE.getValue());
fwTradeLog.setStatusDsc(StatusEnum.THREE.getDesc());
fwTradeLogService.save(fwTradeLog);
log.info("[订单状态{}]=>{},当前商品id=>{},商品名称=>{}",fwTradeLog.getOrderId(), StatusEnum.THREE.getDesc(),fwTradeLog.getProductId(),fwTradeLog.getProductName());
}
}
1.3.3 Send 消费者监听
这里需要注意的是在类上添加@RocketMQMessageListener注解,注意consumerGroup 和topic 不要搞错了
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "fw-pay-order-group", topic = "pay-success")
public class PayRocketMQListener implements RocketMQListener<FwTradeLog> {
@Autowired
private PayService payService;
@Override
public void onMessage(FwTradeLog fwTradeLog) {
log.info("监听到用户已经下单成功订单id=>{},名称=>{}的商品", fwTradeLog.getOrderId(), fwTradeLog.getProductName());
payService.sendOrder(fwTradeLog);
}
}
1.3.4 发货控制层实现
提供给订单模块调用的接口,注意方法名和请求方式要保持一致
@RestController
public class SendController {
@Autowired
private SendService sendService;
@PostMapping("send")
public void sendOrder(@RequestBody FwTradeLog tradeLog) {
sendService.sendOrder(tradeLog);
}
}
1.3.5 启动类
这里没什么特别的注解
@SpringBootApplication
public class FwTransactionRocketmqPayApplication {
public static void main(String[] args) {
SpringApplication.run(FwTransactionRocketmqPayApplication.class, args);
}
}
1.3.6 应用配置
配置中需要配置数据库,以及设置了RocketMQ的连接信息
server:
port: 9003
spring:
#数据库配置 start
datasource:
type: com.zaxxer.hikari.HikariDataSource
url: jdbc:mysql://${dbIp}:3306/fw_transaction?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false
username: root
password: 123456
driver-class-name: com.mysql.jdbc.Driver
hikari:
connection-timeout: 20000
auto-commit: true
max-lifetime: 1200000
minimum-idle: 5
maximum-pool-size: 12
idle-timeout: 300000
rocketmq:
name-server: localhost:9876
1.4 启动服务
分别启动RocketMQ、fw-transaction-rocketmq-send、fw-transaction-rocketmq-order
Postman输入localhost:9002/saveOrder
可以看到订单服务的日志如下:
2020-04-11 19:19:35.944 INFO 12564 --- [nio-9002-exec-3] c.y.t.r.o.service.impl.OrderServiceImpl : [订单状态771390]=>待发货,当前商品id=>1586603975086,商品名称=>Mac pro 2019款
2020-04-11 19:19:35.977 INFO 12564 --- [nio-9002-exec-3] c.y.t.r.o.service.impl.OrderServiceImpl : 事务ID=>f1f08d8c-da88-4641-975f-fe6848dd4fb1 本地事务执行成功
2020-04-11 19:19:35.977 INFO 12564 --- [nio-9002-exec-3] t.r.o.l.LocalTransactionRocketMQListener : 本地事务=>f1f08d8c-da88-4641-975f-fe6848dd4fb1 执行成功,往RocketMQ发送COMMIT
发货服务的日志如下:
2020-04-11 19:19:36.882 INFO 22984 --- [MessageThread_1] c.y.t.r.s.listener.PayRocketMQListener : 监听到用户已经下单成功订单id=>771390,名称=>Mac pro 2019款的商品
2020-04-11 19:19:36.937 INFO 22984 --- [MessageThread_1] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2020-04-11 19:19:37.400 INFO 22984 --- [MessageThread_1] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2020-04-11 19:19:37.522 INFO 22984 --- [MessageThread_1] c.y.t.r.s.service.impl.PayServiceImpl : [订单状态771390]=>待收货,当前商品id=>1586603975086,商品名称=>Mac pro 2019款
数据库正常进入两条数据
MQ控制台 信息
现在让订单服务的方法抛出异常
直接在方法中加一个抛异常的语句int i=1/0;,如下所示
@Override
@Transactional
public void payOrder(FwTradeLog fwTradeLog,String transactionId) {
fwTradeLogService.save(fwTradeLog);
log.info("[订单状态{}]=>{},当前商品id=>{},商品名称=>{}",fwTradeLog.getOrderId(), StatusEnum.TWO.getDesc(),fwTradeLog.getProductId(),fwTradeLog.getProductName());
FwTransactionLog transactionLog = new FwTransactionLog();
transactionLog.setTransactionId(transactionId);
String remark = String.format("事务ID为%s的本地事务执行成功", transactionId);
transactionLog.setRemark(remark);
fwTransactionLogService.save(transactionLog);
log.info("事务ID=>{} 本地事务执行成功", transactionId);
int i=10/0;
}
重启订单服务,删除之前fw_trade_log表的数据
Postman输入localhost:9002/saveOrder
订单服务日志如下,可以看到RocketMQ 本地事务回滚了
2020-04-11 19:40:05.111 INFO 28016 --- [nio-9002-exec-1] c.y.t.r.o.service.impl.OrderServiceImpl : [订单状态246287]=>待发货,当前商品id=>1586605204008,商品名称=>Mac pro 2020款
2020-04-11 19:40:05.144 INFO 28016 --- [nio-9002-exec-1] c.y.t.r.o.service.impl.OrderServiceImpl : 事务ID=>7d7a3274-6333-49ac-bd11-7aed62c5a544 本地事务执行成功
2020-04-11 19:40:05.177 INFO 28016 --- [nio-9002-exec-1] t.r.o.l.LocalTransactionRocketMQListener : 本地事务=>7d7a3274-6333-49ac-bd11-7aed62c5a544 回滚,往RocketMQ发送ROLLBACK
java.lang.ArithmeticException: / by zero
发送服务并没有被调用到,因此没有日志
因此数据库里并没有数据







