第6步: 集成分布式事务(Seata) - YuHaiQing233/explore-cloud-alibaba GitHub Wiki
# 下载地址 https://github.com/seata/seata/releases
全局事务会话信息由3块内容构成:
全局事务表:global_table
分支事务表:branch_table
全局锁表:lock_table
# file.conf
store.mode: 设置数据源
store.db:配置数据库相关的配置
# registry.conf
1. 设置注册中心 registry,以便于其他服务可以使用Seata
2. 设置配置中心 config,Seata 相关配置保存在配置中心,启动时从配置中心加载
# 后台启动命令
nohup sh seata-server.sh -h 127.0.0.1 &
# -h: 注册到注册中心的ip
# -p: Server rpc 监听端口
# -m: 全局事务会话信息存储模式,file、db、redis,优先读取启动参数 (Seata-Server 1.3及以上版本支持redis)
# -n: Server node,多个Server时,需区分各自节点,用于生成不同区间的transactionId,以免冲突
# -e: 多环境配置参考 http://seata.io/en-us/docs/ops/multi-configuration-isolation.html
<!-- 分布式事务依赖 Seata -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.4.2</version>
</dependency>
seata:
enabled: true
tx-service-group: cloud_order_tx_group
registry:
type: nacos
nacos:
server-addr: 192.168.174.11:8848
namespace: 2b21d738-f843-46c5-9063-7c6413fb003a
group: SEATA_GROUP
application: seata-server
config:
type: nacos
nacos:
namespace: 2b21d738-f843-46c5-9063-7c6413fb003a
group: SEATA_GROUP
server-addr: 192.168.174.11:8848
package com.explore.order;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
/**
* 订单服务 启动类
*
* @author: YuHaiQing
* @time: 2022/2/1 18:56
*/
@EnableFeignClients // 此注解为启用Feign
@EnableDiscoveryClient
@SpringBootApplication
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class,args);
}
}
# 第一种:不处理分布式事务
调用链中的其中一个服务宕机或不可用,都会导致此次调用失败,但对分布式进行事务处理,则会导致部分业务执行成功,部分业务执行失败,最终导致数据不一致问题
# 第二种:处理分布式事务,在调用的发起方添加注解 @GlobalTransactional
跨服务调用链中任何一个服务宕机或者服务熔断、降级,都能保存数据一致性
/**
* 下单业务
*
* @author: HaiQing.Yu
* @time: 2022/2/1 22:25
*/
/**
* 订单 控制器类
*
* @author: YuHaiQing
* @time: 2022/2/1 18:57
*/
@Slf4j
@RestController
@RequestMapping("/api/order")
public class OrderController {
@Value("${server.port}")
private Integer port;
@Autowired
private IOrderService orderService;
@Resource
private IUserFeign userFeign;
@Resource
private IProductFeign productFeign;
@PostMapping("/placeOrder")
@GlobalTransactional(rollbackFor = Exception.class)
public ResultResponse<Boolean> placeOrder(@RequestBody Order order){
try {
// 1. 创建订单
log.info("【下单】 - 【创建订单】 - 【开始】");
order.setOrderNo("ECA" + System.currentTimeMillis());
order.setCreateTime(LocalDateTime.now());
Boolean createFlag = orderService.createOrder(order);
log.info("【下单】 - 【创建订单】 - 【结束】, 结果:{}",createFlag);
if(!createFlag){
this.rollback();
return ResultResponse.error(444,"订单创建失败");
}
// 2. 用户扣款
log.info("【下单】 - 【用户扣款】 - 【开始】");
UserReductionRequest request = new UserReductionRequest();
request.setUserId(order.getUserId());
request.setMoney(order.getPrice().multiply(new BigDecimal(order.getQuantity())));
Boolean reduction = userFeign.reduction(request);
log.info("【下单】 - 【用户扣款】 - 【结束】, 结果:{}",reduction);
if(!reduction){
this.rollback();
return ResultResponse.error(444,"用户扣款失败");
}
// 3. 产品减库存
log.info("【下单】 - 【产品减库存】 - 【开始】");
ProductReductionRequest productRequest = new ProductReductionRequest();
productRequest.setProductId(order.getProductId());
productRequest.setQuantity(order.getQuantity());
final Boolean pFlag = productFeign.reduction(productRequest);
log.info("【下单】 - 【产品减库存】 - 【结束】, 结果:{}",pFlag);
if(!pFlag){
this.rollback();
return ResultResponse.error(444,"产品减库存失败");
}
// 4. 更新订单状态
log.info("【下单】 - 【更新订单状态】 - 【开始】");
final Boolean aBoolean = orderService.updateOrderStatus(order.getId(), Order.OrderStatus.complete.getStatus());
log.info("【下单】 - 【更新订单状态】 - 【结束】, 结果:{}",aBoolean);
if(!aBoolean){
this.rollback();
return ResultResponse.error(444,"更新订单状态失败");
}
} catch (Exception exception) {
this.rollback();
return ResultResponse.error(444,"下单业务异常");
}
return ResultResponse.success(Boolean.TRUE);
}
/**
* 手动指定全局事务回滚业务(非运行时异常以外的异常,调用此方法进行处理)
*
* @author: HaiQing.Yu
* @time: 2022/2/4 18:57
*/
private void rollback(){
try {
log.info("【下单】 - 【业务异常】 - 【回滚全局事务】 - 【开始】");
final String xid = RootContext.getXID();
final GlobalStatus rollback = TransactionManagerHolder.get().rollback(xid);
log.info("【下单】 - 【业务异常】 - 【回滚全局事务】 - 【结束】, 结果:{}",rollback.toString());
} catch (TransactionException e) {
log.error("【下单业务】 - 【回滚全局事务】 - 【异常】,异常:{}",e.getMessage());
}
}
}
什么是Seata?
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
AT事务模式:
前提:
(1) 基于支持本地ACID(A原子性、C一致性、I隔离性、D持久性)事务的关系型数据库
(2) Java应用,通过JDBC访问数据
整体机制(两阶段提交):
一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
二阶段:
(1) 提交异步化,非常快速的完成
(2) 回滚通过一阶段的回滚日志进行反向补偿
写隔离:
(1) 一阶段本地事务提交前,需要确保先拿到 全局锁。
(2) 拿不到 全局锁,不能提交本地事务。
(3) 拿 全局锁 的尝试被限制在一定范围内,超出范围将放弃,并且回滚本地事务,释放本地锁。
读隔离:
在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT模式) 的默认全局隔离级别是 读未提交(Read Uncommitted)。
TCC模式:
描述:一个分布式的全局事务,整体是 两阶段提交 的模型。全局事务是由若干分支事务组成的,分支事务要满足 两阶段提交 的模型要求,即需要每个分支事务都具有自己的:
(1) 一阶段prepare行为
(2) 二阶段 commit 或者 rollback 行为
根据两阶段行为模式的不同,我们将分支事务划分为 Automatic (Branch) Transaction Mode 和 Manual (Branch) Transaction Mode.
AT 模式基于 支持本地 ACID 事务 的 关系型数据库:
(1) 一阶段 prepare 行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录。
(2) 二阶段 commit 行为:马上成功结束,自动 异步批量清理回滚日志。
(3) 二阶段 rollback 行为:通过回滚日志,自动 生成补偿操作,完成数据回滚。
相应的,TCC 模式,不依赖于底层数据资源的事务支持:
(1) 一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
(2) 二阶段 commit 行为:调用 自定义 的 commit 逻辑。
(3) 二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。
Saga模式:
Saga模式是Seata提供的长事务解决方案,在Saga模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发实现。
适用场景:
(1) 业务流程长、业务流程多
(2) 参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口
优势:
(1) 一阶段提交本地事务,无锁,高性能
(2) 事件驱动架构,参与者可异步执行,高吞吐
(3) 补偿服务易于实现
缺点:
(1) 不保证隔离性
TC - 事务协调者
(1) 维护全局和分支事务的状态,驱动全局事务提交或回滚。
TM - 事务管理器
(1) 定义全局事务的范围:开始全局事务、提交或回滚全局事务。
RM - 资源管理器
(1) 管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。