3SEATA-AT模式 · SpringCloud微服务实战 · 看云
导航
本节代码地址
GitHub:https://github.com/xuyisu/fw-spring-cloud/tree/master/fw-cloud-transaction/fw-cloud-transaction-seata/fw-cloud-transaction-seata-at/fw-cloud-transaction-seata-at-order
GitHub:https://github.com/xuyisu/fw-spring-cloud/tree/master/fw-cloud-transaction/fw-cloud-transaction-seata/fw-cloud-transaction-seata-at/fw-cloud-transaction-seata-at-send
1. 前提
- 基于支持本地 ACID 事务的关系型数据库。
- Java 应用,通过 JDBC 访问数据库。
2. 整体机制
两阶段提交协议的演变:
-
一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
-
二阶段:
- 提交异步化,非常快速地完成。
- 回滚通过一阶段的回滚日志进行反向补偿。
3. 写隔离
- 一阶段本地事务提交前,需要确保先拿到全局锁。
- 拿不到全局锁,不能提交本地事务。
- 拿全局锁的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁。
以一个示例来说明:
两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。
tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的全局锁,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的全局锁,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待全局锁。
tx1 二阶段全局提交,释放全局锁。tx2 拿到全局锁提交本地事务。
如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。
此时,如果 tx2 仍在等待该数据的全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的全局锁等锁超时,放弃全局锁并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。
因为整个过程全局锁在 tx1 结束前一直是被 tx1 持有的,所以不会发生脏写的问题。
3. 读隔离
在数据库本地事务隔离级别读已提交(Read Committed)或以上的基础上,Seata(AT 模式)的默认全局隔离级别是读未提交(Read Uncommitted)。
如果应用在特定场景下,必需要求全局的读已提交,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理。
SELECT FOR UPDATE 语句的执行会申请全局锁,如果全局锁被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到全局锁拿到,即读取的相关数据是已提交的,才返回。
出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句。
4. 工作机制
以一个示例来说明整个 AT 分支的工作过程。
业务表:product
| Field | Type | Key |
|---|---|---|
| id | bigint(20) | PRI |
| name | varchar(100) | |
| since | varchar(100) |
AT 分支事务的业务逻辑:
update product set name = 'GTS' where name = 'TXC';
4.1 一阶段
过程:
- 解析 SQL:得到 SQL 的类型(UPDATE),表(product),条件(where name = 'TXC')等相关的信息。
- 查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据。
select id, name, since from product where name = 'TXC';
得到前镜像:
| id | name | since |
|---|---|---|
| 1 | TXC | 2014 |
- 执行业务 SQL:更新这条记录的 name 为 'GTS'。
- 查询后镜像:根据前镜像的结果,通过主键定位数据。
select id, name, since from product where id = 1`;
得到后镜像:
| id | name | since |
|---|---|---|
| 1 | GTS | 2014 |
- 插入回滚日志:把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到
UNDO_LOG表中。
{
"branchId": 641789253,
"undoItems": [{
"afterImage": {
"rows": [{
"fields": [{
"name": "id",
"type": 4,
"value": 1
}, {
"name": "name",
"type": 12,
"value": "GTS"
}, {
"name": "since",
"type": 12,
"value": "2014"
}]
}],
"tableName": "product"
},
"beforeImage": {
"rows": [{
"fields": [{
"name": "id",
"type": 4,
"value": 1
}, {
"name": "name",
"type": 12,
"value": "TXC"
}, {
"name": "since",
"type": 12,
"value": "2014"
}]
}],
"tableName": "product"
},
"sqlType": "UPDATE"
}],
"xid": "xid:xxx"
}
- 提交前,向 TC 注册分支:申请
product表中,主键值等于 1 的记录的全局锁。 - 本地事务提交:业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交。
- 将本地事务提交的结果上报给 TC。
4.2 二阶段-回滚
- 收到 TC 的分支回滚请求,开启一个本地事务,执行如下操作。
- 通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录。
- 数据校验:拿 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理,详细的说明在另外的文档中介绍。
- 根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句:
update product set name = 'TXC' where id = 1;
- 提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。
4.3 二阶段-提交
- 收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。
- 异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。
4.4 回滚日志表
UNDO_LOG Table:不同数据库在类型上会略有差别。
以 MySQL 为例:
| Field | Type |
|---|---|
| branch_id | bigint PK |
| xid | varchar(100) |
| context | varchar(128) |
| rollback_info | longblob |
| log_status | tinyint |
| log_created | datetime |
| log_modified | datetime |
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
5. AT 实例演示
本模块中我们将新建两个模块,我们将模拟电商购物环节中的下单->发货的过程,两个服务通过Feign进行远程调用。两个模块的名称分别为fw-cloud-transaction-seata-at-order和fw-cloud-transaction-seata-at-send
5.1 maven 依赖
两个模块所使用的依赖也是相同的,这里可以看到,我们依赖了上面封装的fw-cloud-transaction-base-dao,不需要再自己写基本的mapper等,因为是基于Nacos服务发现和Feign 远程调用的,因此引入了响应的包。其中最重要的就是引入spring-cloud-starter-alibaba-seata包,否者不能实现和Seata Server的通信及事务控制。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- base-dao -->
<dependency>
<groupId>com.yisu.cloud</groupId>
<artifactId>fw-cloud-transaction-base-dao</artifactId>
<version>${version}</version>
</dependency>
<!--seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<!--服务发现-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!--feign-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${druid.version}</version>
</dependency>
</dependencies>
并在MySql中新建fw_transaction_seata库,并执行以下脚本,供fw-cloud-transaction-seata-at-order和fw-cloud-transaction-seata-at-send使用,其中undo_log的作用前面有介绍。
CREATE TABLE `fw_trade_log` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键id',
`status` tinyint(3) unsigned NOT NULL DEFAULT '0' COMMENT '订单状态 1.待支付 2.待发货 3.待收货 4.订单完成 5.订单关闭',
`status_dsc` varchar(100) CHARACTER SET utf8 DEFAULT '' COMMENT '状态描述',
`product_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '商品id',
`product_name` varchar(100) NOT NULL DEFAULT '' COMMENT '商品名称',
`user_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '用户id',
`order_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '订单id',
`order_amount` decimal(10,2) NOT NULL DEFAULT '0.00' COMMENT '订单总额',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单表';
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'increment id',
`branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id',
`xid` varchar(100) NOT NULL COMMENT 'global transaction id',
`context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` longblob NOT NULL COMMENT 'rollback info',
`log_status` int(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` datetime NOT NULL COMMENT 'create datetime',
`log_modified` datetime NOT NULL COMMENT 'modify datetime',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=34 DEFAULT CHARSET=utf8 COMMENT='AT transaction mode undo table';
5.2 fw-cloud-transaction-seata-at-order模块
5.2.1 订单接口创建
再订单模块中创建一个订单接口,并在接口中设置支付成功的方法
public interface OrderService{
void saveAndPayOrder(String productName);
}
5.2.2 订单接口实现
订单在支付成功以后会远程调用发货服务进行发货,需要在Service 上加上@GlobalTransactional,开启AT模式的分布式事务控制,并且设置订单的状态为待支付
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Autowired
private FwTradeLogService fwTradeLogService;
@Autowired
private RemoteSendServiceFeign remoteSendServiceFeign;
@GlobalTransactional
@Override
public void saveAndPayOrder(String productName) {
FwTradeLog fwTradeLog =new FwTradeLog(StatusEnum.TWO);
fwTradeLog.setProductId(System.currentTimeMillis());
fwTradeLog.setProductName(productName);
fwTradeLogService.save(fwTradeLog);
log.info("[订单状态{}]=>{},当前商品id=>{},商品名称=>{}",fwTradeLog.getOrderId(), StatusEnum.TWO.getDesc(),fwTradeLog.getProductId(),fwTradeLog.getProductName());
remoteSendServiceFeign.sendOrder(fwTradeLog);
}
}
5.2.3 Feign 接口配置
这里我们配置远程调用的服务名称是fw-transaction-seata-at-send,并且设置了fallback,并且指定了远程调用的方法是sendOrder。
@FeignClient(value = "fw-transaction-seata-at-send", fallbackFactory = RemoteSendServiceFallback.class)
public interface RemoteSendServiceFeign {
@PostMapping("send")
void sendOrder(@RequestBody FwTradeLog tradeLog);
}
5.2.4 Fallback 接口配置
在fallback输出异常日志
@Component
@Slf4j
public class RemoteSendServiceFallback implements FallbackFactory<RemoteSendServiceFeign> {
@Override
public RemoteSendServiceFeign create(Throwable throwable) {
return tradeLog -> log.error("远程调用失败",throwable);
}
}
5.2.5 Controller 控制层
配置订单服务对外的接口,方便演示
@RestController
public class OrderController {
@Autowired
private OrderService orderService;
@GetMapping("saveOrder")
public FwResult saveOrder(String productName){
orderService.saveAndPayOrder(productName);
return FwResult.ok();
}
}
5.2.6 启动类配置
这里需要加上@EnableFeignClients开启Feign的远程调用,加上@EnableDiscoveryClient开启服务的注册和发现
@SpringBootApplication
@EnableFeignClients
@EnableDiscoveryClient
public class FwTransactionSeataAtOrderApplication {
public static void main(String[] args) {
SpringApplication.run(FwTransactionSeataAtOrderApplication .class, args);
}
}
5.2.7 应用配置
配置中需要配置数据库,以及Nacos的连接信息和开启Feign的远程调用,其中dbIp 作为变量传入,其中tx-service-group: fwcloud_tx_group需要和file.conf里面配置的一样。
server:
port: 9002
spring:
application:
name: fw-transaction-seata-at-order
#数据库配置 start
datasource:
type: com.alibaba.druid.pool.DruidDataSource
druid:
url: jdbc:mysql://${dbIp}:3306/fw_transaction_seata?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false
username: root
password: 123456
driver-class-name: com.mysql.jdbc.Driver
cloud:
nacos:
discovery:
server-addr: localhost:8848
alibaba:
seata:
tx-service-group: fwcloud_tx_group
feign:
hystrix:
enabled: true
hystrix:
shareSecurityContext: true
另外需要将Seata 里面的file.conf和registry.conf 复制到如下图所示的地方
5.3 fw-cloud-transaction-seata-at-send模块
5.3.1 发货接口创建
配置发货接口,并且创建发货的方法
public interface SendService {
void sendOrder(FwTradeLog fwTradeLog);
}
5.3.2 发货接口实现
这里在实现方法上加上@GlobalTransactional,并且设置订单的状态为待收货
@Service
@Slf4j
public class SendServiceImpl implements SendService {
@Autowired
private FwTradeLogService fwTradeLogService;
@GlobalTransactional
@Override
public void sendOrder(FwTradeLog fwTradeLog) {
fwTradeLog.setStatus(StatusEnum.THREE.getValue());
fwTradeLog.setStatusDsc(StatusEnum.THREE.getDesc());
fwTradeLogService.save(fwTradeLog);
log.info("[订单状态{}]=>{},当前商品id=>{},商品名称=>{}",fwTradeLog.getOrderId(), StatusEnum.THREE.getDesc(),fwTradeLog.getProductId(),fwTradeLog.getProductName());
}
}
5.3.3 发货控制层实现
提供给订单模块调用的接口,注意方法名和请求方式要保持一致
@RestController
public class SendController {
@Autowired
private SendService sendService;
@PostMapping("send")
public void sendOrder(@RequestBody FwTradeLog tradeLog) {
sendService.sendOrder(tradeLog);
}
}
5.3.4 启动类
需要加上@EnableDiscoveryClient注解开启服务的注册与发现
@SpringBootApplication
@EnableDiscoveryClient
public class FwTransactionSeataAtSendApplication {
public static void main(String[] args) {
SpringApplication.run(FwTransactionSeataAtSendApplication .class, args);
}
}
5.3.5 应用配置
配置中需要配置数据库,以及设置了Nacos的连接信息和开启Feign的远程调用,其中tx-service-group: fwcloud_tx_group需要和file.conf里面配置的一样。
server:
port: 9003
spring:
application:
name: fw-transaction-seata-at-send
#数据库配置 start
datasource:
druid:
url: jdbc:mysql://${dbIp}:3306/fw_transaction_seata?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=false
username: root
password: 123456
driver-class-name: com.mysql.jdbc.Driver
type: com.alibaba.druid.pool.DruidDataSource
cloud:
nacos:
discovery:
server-addr: localhost:8848
alibaba:
seata:
tx-service-group: fwcloud_tx_group
另外需要将Seata 里面的file.conf和registry.conf 复制到如下图所示的地方
5.4 启动服务
分别启动Nacos、Seata Server、fw-transaction-seata-at-send、fw-transaction-seata-at-order
Nacos可以看到两个服务和Seata Server 注册上来了
Postman输入localhost:9002/saveOrder
可以看到订单服务的日志如下
2020-04-10 23:14:33.951 INFO 32052 --- [nio-9002-exec-8] i.seata.tm.api.DefaultGlobalTransaction : Begin new global transaction [10.88.12.39:8091:2040256728]
2020-04-10 23:14:34.372 INFO 32052 --- [nio-9002-exec-8] c.y.t.s.a.o.s.impl.OrderServiceImpl : [订单状态978896]=>待发货,当前商品id=>1586531673951,商品名称=>Mac pro 2019款
2020-04-10 23:14:35.256 INFO 32052 --- [nio-9002-exec-8] i.seata.tm.api.DefaultGlobalTransaction : [10.88.12.39:8091:2040256728] commit status: Committed
2020-04-10 23:14:36.028 INFO 32052 --- [tch_RMROLE_1_16] i.s.core.rpc.netty.RmMessageListener : onMessage:xid=10.88.12.39:8091:2040256728,branchId=2040256730,branchType=AT,resourceId=jdbc:mysql://192.168.0.102:3306/fw_transaction_seata,applicationData=null
2020-04-10 23:14:36.029 INFO 32052 --- [tch_RMROLE_1_16] io.seata.rm.AbstractRMHandler : Branch committing: 10.88.12.39:8091:2040256728 2040256730 jdbc:mysql://192.168.0.102:3306/fw_transaction_seata null
2020-04-10 23:14:36.029 INFO 32052 --- [tch_RMROLE_1_16] io.seata.rm.AbstractRMHandler : Branch commit result: PhaseTwo_Committed
发货服务的日志如下
2020-04-10 23:14:34.834 INFO 31332 --- [nio-9003-exec-4] c.y.t.s.t.s.s.impl.SendServiceImpl : [订单状态978896]=>待收货,当前商品id=>1586531673951,商品名称=>Mac pro 2019款
2020-04-10 23:14:36.195 INFO 31332 --- [tch_RMROLE_1_16] i.s.core.rpc.netty.RmMessageListener : onMessage:xid=10.88.12.39:8091:2040256728,branchId=2040256733,branchType=AT,resourceId=jdbc:mysql://192.168.0.102:3306/fw_transaction_seata,applicationData=null
2020-04-10 23:14:36.195 INFO 31332 --- [tch_RMROLE_1_16] io.seata.rm.AbstractRMHandler : Branch committing: 10.88.12.39:8091:2040256728 2040256733 jdbc:mysql://192.168.0.102:3306/fw_transaction_seata null
2020-04-10 23:14:36.195 INFO 31332 --- [tch_RMROLE_1_16] io.seata.rm.AbstractRMHandler : Branch commit result: PhaseTwo_Committed
数据库记录两条记录
现在让订单服务的方法抛出异常
直接在方法中加一个抛异常的语句int i=1/0;,如下所示
@GlobalTransactional
@Override
public void saveAndPayOrder(String productName) {
FwTradeLog fwTradeLog =new FwTradeLog(StatusEnum.TWO);
fwTradeLog.setProductId(System.currentTimeMillis());
fwTradeLog.setProductName(productName);
fwTradeLogService.save(fwTradeLog);
log.info("[订单状态{}]=>{},当前商品id=>{},商品名称=>{}",fwTradeLog.getOrderId(), StatusEnum.TWO.getDesc(),fwTradeLog.getProductId(),fwTradeLog.getProductName());
remoteSendServiceFeign.sendOrder(fwTradeLog);
int i=1/0;
}
重启订单服务,删除之前fw_trade_log表的数据
Postman输入localhost:9002/saveOrder
可以看到订单服务的日志如下,可以看到seata PhaseTwo_Rollbacked 的日志信息
2020-04-10 23:25:18.663 INFO 14868 --- [nio-9002-exec-3] i.seata.tm.api.DefaultGlobalTransaction : Begin new global transaction [10.88.12.39:8091:2040260158]
2020-04-10 23:25:19.160 INFO 14868 --- [nio-9002-exec-3] c.y.t.s.a.o.s.impl.OrderServiceImpl : [订单状态239568]=>待发货,当前商品id=>1586532318664,商品名称=>Mac pro 2020款
2020-04-10 23:25:20.088 INFO 14868 --- [tch_RMROLE_1_16] i.s.core.rpc.netty.RmMessageListener : onMessage:xid=10.88.12.39:8091:2040260158,branchId=2040260161,branchType=AT,resourceId=jdbc:mysql://192.168.0.102:3306/fw_transaction_seata,applicationData=null
2020-04-10 23:25:20.088 INFO 14868 --- [tch_RMROLE_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 10.88.12.39:8091:2040260158 2040260161 jdbc:mysql://192.168.0.102:3306/fw_transaction_seata
2020-04-10 23:25:20.163 INFO 14868 --- [tch_RMROLE_1_16] i.s.r.d.undo.AbstractUndoLogManager : xid 10.88.12.39:8091:2040260158 branch 2040260161, undo_log deleted with GlobalFinished
2020-04-10 23:25:20.179 INFO 14868 --- [tch_RMROLE_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked
2020-04-10 23:25:20.472 INFO 14868 --- [nio-9002-exec-3] i.seata.tm.api.DefaultGlobalTransaction : [10.88.12.39:8091:2040260158] rollback status: Rollbacked
发货服务的日志如下,可以看到发送被调用到了
2020-04-10 23:25:19.606 INFO 2860 --- [nio-9003-exec-2] c.y.t.s.t.s.s.impl.SendServiceImpl : [订单状态239568]=>待收货,当前商品id=>1586532318664,商品名称=>Mac pro 2020款
2020-04-10 23:25:19.813 INFO 2860 --- [tch_RMROLE_1_16] i.s.core.rpc.netty.RmMessageListener : onMessage:xid=10.88.12.39:8091:2040260158,branchId=2040260164,branchType=AT,resourceId=jdbc:mysql://192.168.0.102:3306/fw_transaction_seata,applicationData=null
2020-04-10 23:25:19.813 INFO 2860 --- [tch_RMROLE_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 10.88.12.39:8091:2040260158 2040260164 jdbc:mysql://192.168.0.102:3306/fw_transaction_seata
2020-04-10 23:25:19.884 INFO 2860 --- [tch_RMROLE_1_16] i.s.r.d.undo.AbstractUndoLogManager : xid 10.88.12.39:8091:2040260158 branch 2040260164, undo_log deleted with GlobalFinished
2020-04-10 23:25:19.901 INFO 2860 --- [tch_RMROLE_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked
数据库记录发现没有数据









