3RocketMQ 分布式事务实践 · SpringCloud微服务实战 · 看云

导航

本节代码地址


RocketMQ 的事务原理之前在RocketMQ 介绍篇已经介绍了,现在把图再拿过来,跟着图片的步骤,我们来实际操作一遍
eff5927c7ca11e0c31a93ca8375bf7ea_MD5.png

1. 基于RocketMQ 实例演示

本模块中我们将新建两个模块,我们将模拟电商购物环节中的下单->发货的过程。两个模块的名称分别为fw-cloud-transaction-rocketmq-orderfw-cloud-transaction-rocketmq-send,order 作为Producer,send 作为Consumer

1.1 maven 依赖

两个模块所使用的依赖也是相同的,这里可以看到,我们依赖了上面封装的fw-cloud-transaction-base-dao,不需要再自己写基本的mapper等,因为是基于RocketMQ调用的,因此引入rocketmq-spring-boot-starter

<dependencies>
    <!--基础包-->
    <dependency>
        <groupId>com.yisu.cloud</groupId>
        <artifactId>fw-cloud-transaction-base-dao</artifactId>
        <version>${version}</version>
    </dependency>
    <!--rocketmq-->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>${rocketmq.version}</version>
    </dependency>
    <!-- 数据连接池-->
    <dependency>
        <groupId>com.zaxxer</groupId>
        <artifactId>HikariCP</artifactId>
        <version>${HikariCP.version}</version>
    </dependency>
</dependencies>

并在MySql中新建fw_transaction库,并执行以下脚本,供fw-cloud-transaction-rocktmq-orderfw-cloud-transaction-rocktmq-send使用

CREATE TABLE `fw_trade_log` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键id',
  `status` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '订单状态 1.待支付 2.待发货 3.待收货 4.订单完成 5.订单关闭',
  `status_dsc` varchar(100) CHARACTER SET utf8 DEFAULT '' COMMENT '状态描述',
  `product_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '商品id',
  `product_name` varchar(100) NOT NULL DEFAULT '' COMMENT '商品名称',
  `user_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '用户id',
  `order_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '订单id',
  `order_amount` decimal(10,2) NOT NULL DEFAULT '0.00' COMMENT '订单总额',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单表';
CREATE TABLE `fw_transaction_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `transaction_Id` varchar(50) NOT NULL COMMENT '事务id',
  `remark` varchar(50) NOT NULL COMMENT '备注',
  PRIMARY KEY (`id`,`transaction_Id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4 COMMENT='事务日志表';

1.2 fw-cloud-transaction-rocketmq-order模块

1.2.1 订单接口创建

再订单模块中创建一个订单接口,在里面分别设置下单和支付的接口,因为是半方法发送,支付成功后的业务查分出来,二次确认支付的时候方便调用,因此拆开


public interface OrderService{

    
    void saveAndPayOrder(String productName);

    
    void payOrder(FwTradeLog fwTradeLog,String transactionId);
}

1.2.2 订单接口实现

第1步和第2步:向RocketMQ发送半消息,我们通过RocketMQTemplate的sendMessageInTransaction方法发送了一条半消息,但是并没有实际操作数据库,数据库操作过程抽出来放到了payOrder方法中,然后,半消息发送成功之后,会告知Producer 成功收到了半消息,待会我们在半消息的监听RocketMQLocalTransactionListener讲到,由半消息的监听来确认支付方法是否成功


@Service
@Slf4j
public class OrderServiceImpl implements OrderService {

    @Autowired
    private FwTradeLogService fwTradeLogService;
    @Autowired
    private FwTransactionLogService fwTransactionLogService;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void saveAndPayOrder(String productName) {

        FwTradeLog fwTradeLog =new FwTradeLog(StatusEnum.TWO);
        fwTradeLog.setProductId(System.currentTimeMillis());
        fwTradeLog.setProductName(productName);
        String transactionId = UUID.randomUUID().toString();
        
        this.rocketMQTemplate.sendMessageInTransaction(
                "fw-pay-order-group", 
                "pay-success", 
                MessageBuilder.withPayload(fwTradeLog)
                        .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                        .build(), 
                fwTradeLog 
        );

    }

    @Override
    @Transactional
    public void payOrder(FwTradeLog fwTradeLog,String transactionId) {
        fwTradeLogService.save(fwTradeLog);
        log.info("[订单状态{}]=>{},当前商品id=>{},商品名称=>{}",fwTradeLog.getOrderId(), StatusEnum.TWO.getDesc(),fwTradeLog.getProductId(),fwTradeLog.getProductName());

        
        FwTransactionLog transactionLog = new FwTransactionLog();
        transactionLog.setTransactionId(transactionId);
        String remark = String.format("事务ID为%s的本地事务执行成功", transactionId);
        transactionLog.setRemark(remark);
        fwTransactionLogService.save(transactionLog);
        log.info("事务ID=>{} 本地事务执行成功", transactionId);

    }
}

1.2.3 RocketMQ 本地事务配置

这里我们实现RocketMQLocalTransactionListener并在类上使用@RocketMQTransactionListener注解标注,注解属性txProducerGroup值和前面发送的事务消息分组组名必须一致。
实现了RocketMQLocalTransactionListener接口的executeLocalTransactioncheckLocalTransaction方法。executeLocalTransaction用于指定执行本地事务逻辑,checkLocalTransaction方法用于RocketMQ回查本地事务状态
当监听器对应的半消息发送成功后,就会执行executeLocalTransaction方法;如果本地事务已经提交了,但是RocketMQ并未收到COMMIT确认时。可能网络突然中断或者程序故障,这种情况下,当网络和应用恢复正常后,RocketMQ会执行本地事务回查操作,因为RocketMQ会定期扫描消息集群中的事物消息,如果发现了Prepared消息,它会向消息发送端(生产者)确认。

为了方便,我们创建一张事务日志表,在本地事务执行成功后,往本地事务日志表中也插入一条事务日志。事务回查时,只要通过transicationId能够找到对应的日志,则说明本地事务执行成功,否则认为执行失败。


@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "fw-pay-order-group")
public class LocalTransactionRocketMQListener implements RocketMQLocalTransactionListener {

    @Autowired
    private OrderService orderService;
    @Autowired
    private FwTransactionLogService fwTransactionLogService;

    
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        MessageHeaders headers = message.getHeaders();
        String transicationId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

        try {
            FwTradeLog tradeLog = (FwTradeLog) o;
            orderService.payOrder(tradeLog,transicationId); 
            log.info("本地事务=>{} 执行成功,往RocketMQ发送COMMIT",transicationId);
            return RocketMQLocalTransactionState.COMMIT; 
        } catch (Exception e){
            log.info("本地事务=>{} 回滚,往RocketMQ发送ROLLBACK",transicationId ,e);
            return RocketMQLocalTransactionState.ROLLBACK; 
        }
    }

    
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        MessageHeaders headers = message.getHeaders();
        String transicationId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        log.info("RocketMQ事务状态回查=>{}",transicationId);
        
        FwTransactionLog transactionLog = fwTransactionLogService.getOne(
                new LambdaQueryWrapper<FwTransactionLog>().eq(FwTransactionLog::getTransactionId, transicationId)
        );
        
        return transactionLog != null ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
    }
}

1.2.4 Controller 控制层

配置订单服务对外的接口,方便演示


@RestController
public class OrderController {

    @Autowired
    private OrderService orderService;

    
    @GetMapping("saveOrder")
    public FwResult saveOrder(String productName){
         orderService.saveAndPayOrder(productName);
         return FwResult.ok();
    }


}

1.2.5 启动类配置

这里就时是普通的应用启动类

@SpringBootApplication
public class FwTransactionRocketmqOrderApplication {
    public static void main(String[] args) {
        SpringApplication.run(FwTransactionRocketmqOrderApplication.class, args);
    }
}

1.2.6 应用配置

配置中需要配置数据库,以及RocketMq的连接信息

server:
  port: 9002
spring:
  #数据库配置  start
  datasource:
    type: com.zaxxer.hikari.HikariDataSource
    url: jdbc:mysql://${dbIp}:3306/fw_transaction?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false
    username: root
    password: 123456
    driver-class-name: com.mysql.jdbc.Driver
    hikari:
      connection-timeout: 20000
      auto-commit: true
      max-lifetime: 1200000
      minimum-idle: 5
      maximum-pool-size: 12
      idle-timeout: 300000
rocketmq:
  name-server: localhost:9876
  producer:
    group: order-group

1.3 fw-cloud-transaction-rocketmq-send模块

1.3.1 发货接口创建

配置发货接口,并且创建发货的方法


public interface SendService {

    
    void sendOrder(FwTradeLog fwTradeLog);

}

1.3.2 发货接口实现

这里在实现方法上加上@Transactional已实现本地事务的控制,并且设置订单的状态为待收货


@Service
@Slf4j
public class SendServiceImpl implements SendService {

    @Autowired
    private FwTradeLogService fwTradeLogService;

    @Override
    public void sendOrder(FwTradeLog fwTradeLog) {
        fwTradeLog.setStatus(StatusEnum.THREE.getValue());
        fwTradeLog.setStatusDsc(StatusEnum.THREE.getDesc());
        fwTradeLogService.save(fwTradeLog);
        log.info("[订单状态{}]=>{},当前商品id=>{},商品名称=>{}",fwTradeLog.getOrderId(), StatusEnum.THREE.getDesc(),fwTradeLog.getProductId(),fwTradeLog.getProductName());
    }
}

1.3.3 Send 消费者监听

这里需要注意的是在类上添加@RocketMQMessageListener注解,注意consumerGroup 和topic 不要搞错了


@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "fw-pay-order-group", topic = "pay-success")
public class PayRocketMQListener implements RocketMQListener<FwTradeLog> {

    @Autowired
    private PayService payService;


    @Override
    public void onMessage(FwTradeLog fwTradeLog) {
        log.info("监听到用户已经下单成功订单id=>{},名称=>{}的商品", fwTradeLog.getOrderId(), fwTradeLog.getProductName());
        payService.sendOrder(fwTradeLog);
    }

}

1.3.4 发货控制层实现

提供给订单模块调用的接口,注意方法名和请求方式要保持一致


@RestController
public class SendController {

    @Autowired
    private SendService sendService;

    
    @PostMapping("send")
    public void sendOrder(@RequestBody FwTradeLog tradeLog) {
         sendService.sendOrder(tradeLog);
    }
}

1.3.5 启动类

这里没什么特别的注解


@SpringBootApplication
public class FwTransactionRocketmqPayApplication {
    public static void main(String[] args) {
        SpringApplication.run(FwTransactionRocketmqPayApplication.class, args);
    }


}

1.3.6 应用配置

配置中需要配置数据库,以及设置了RocketMQ的连接信息

server:
  port: 9003
spring:
  #数据库配置  start
  datasource:
    type: com.zaxxer.hikari.HikariDataSource
    url: jdbc:mysql://${dbIp}:3306/fw_transaction?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false
    username: root
    password: 123456
    driver-class-name: com.mysql.jdbc.Driver
    hikari:
      connection-timeout: 20000
      auto-commit: true
      max-lifetime: 1200000
      minimum-idle: 5
      maximum-pool-size: 12
      idle-timeout: 300000
rocketmq:
  name-server: localhost:9876

1.4 启动服务

分别启动RocketMQ、fw-transaction-rocketmq-send、fw-transaction-rocketmq-order
ca6fa205dc1aed93e437eaf9ae24ac06_MD5.png

可以看到订单服务的日志如下:

2020-04-11 19:19:35.944  INFO 12564 --- [nio-9002-exec-3] c.y.t.r.o.service.impl.OrderServiceImpl  : [订单状态771390]=>待发货,当前商品id=>1586603975086,商品名称=>Mac pro 2019款
2020-04-11 19:19:35.977  INFO 12564 --- [nio-9002-exec-3] c.y.t.r.o.service.impl.OrderServiceImpl  : 事务ID=>f1f08d8c-da88-4641-975f-fe6848dd4fb1 本地事务执行成功
2020-04-11 19:19:35.977  INFO 12564 --- [nio-9002-exec-3] t.r.o.l.LocalTransactionRocketMQListener : 本地事务=>f1f08d8c-da88-4641-975f-fe6848dd4fb1 执行成功,往RocketMQ发送COMMIT


发货服务的日志如下:

2020-04-11 19:19:36.882  INFO 22984 --- [MessageThread_1] c.y.t.r.s.listener.PayRocketMQListener   : 监听到用户已经下单成功订单id=>771390,名称=>Mac pro 2019款的商品
2020-04-11 19:19:36.937  INFO 22984 --- [MessageThread_1] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2020-04-11 19:19:37.400  INFO 22984 --- [MessageThread_1] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2020-04-11 19:19:37.522  INFO 22984 --- [MessageThread_1] c.y.t.r.s.service.impl.PayServiceImpl    : [订单状态771390]=>待收货,当前商品id=>1586603975086,商品名称=>Mac pro 2019款


数据库正常进入两条数据
0a20aaf25fd79947fa7871d9afadfe81_MD5.png

MQ控制台 信息
038825ddcc287b653af0cd171ec6a4dd_MD5.png

22132464decff338d311cc05d6cb28c2_MD5.png

现在让订单服务的方法抛出异常
直接在方法中加一个抛异常的语句int i=1/0;,如下所示

@Override
@Transactional
public void payOrder(FwTradeLog fwTradeLog,String transactionId) {
    fwTradeLogService.save(fwTradeLog);
    log.info("[订单状态{}]=>{},当前商品id=>{},商品名称=>{}",fwTradeLog.getOrderId(), StatusEnum.TWO.getDesc(),fwTradeLog.getProductId(),fwTradeLog.getProductName());

    
    FwTransactionLog transactionLog = new FwTransactionLog();
    transactionLog.setTransactionId(transactionId);
    String remark = String.format("事务ID为%s的本地事务执行成功", transactionId);
    transactionLog.setRemark(remark);
    fwTransactionLogService.save(transactionLog);
    log.info("事务ID=>{} 本地事务执行成功", transactionId);
    int i=10/0;
}

重启订单服务,删除之前fw_trade_log表的数据
Postman输入localhost:9002/saveOrder
f15a6f74d633c63562e5c42743b75573_MD5.png

订单服务日志如下,可以看到RocketMQ 本地事务回滚了

2020-04-11 19:40:05.111  INFO 28016 --- [nio-9002-exec-1] c.y.t.r.o.service.impl.OrderServiceImpl  : [订单状态246287]=>待发货,当前商品id=>1586605204008,商品名称=>Mac pro 2020款
2020-04-11 19:40:05.144  INFO 28016 --- [nio-9002-exec-1] c.y.t.r.o.service.impl.OrderServiceImpl  : 事务ID=>7d7a3274-6333-49ac-bd11-7aed62c5a544 本地事务执行成功
2020-04-11 19:40:05.177  INFO 28016 --- [nio-9002-exec-1] t.r.o.l.LocalTransactionRocketMQListener : 本地事务=>7d7a3274-6333-49ac-bd11-7aed62c5a544 回滚,往RocketMQ发送ROLLBACK

java.lang.ArithmeticException: / by zero

发送服务并没有被调用到,因此没有日志

因此数据库里并没有数据
317c6b67db6b4e8a1d1dc325d084941a_MD5.png