事务发起方 - 969251639/study GitHub Wiki
如果ServiceA -> ServiceB,那么请看下面的一个官方例子
@Service
public class ServiceA {
@Autowired
private ValueDao valueDao; //本地db操作
@Autowired
private ServiceB serviceB;//远程B模块业务
@LcnTransaction //分布式事务注解
@Transactional //本地事务注解
public String execute(String value) throws BusinessException {
// step1. call remote service B
String result = serviceB.rpc(value); // (1)
// step2. local store operate. DTX commit if save success, rollback if not.
valueDao.save(value); // (2)
valueDao.saveBackup(value); // (3)
return result + " > " + "ok-A";
}
}
@Service
public class ServiceB {
@Autowired
private ValueDao valueDao; //本地db操作
@LcnTransaction //分布式事务注解
@Transactional //本地事务注解
public String rpc(String value) throws BusinessException {
valueDao.save(value); // (4)
valueDao.saveBackup(value); // (5)
return "ok-B";
}
}
# 默认之配置为TM的本机默认端口
tx-lcn.client.manager-address=127.0.0.1:8070
所以使用LCN最重要的就是拦截LcnTransaction 注解
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface LcnTransaction {
/**
* 分布式事务传播行为
*
* @return 传播行为
* @see DTXPropagation
*/
DTXPropagation propagation() default DTXPropagation.REQUIRED;
}
这个注解只有一个参数propagation,表示事务的传播机制,而LCN框架有两种传播机制
public enum DTXPropagation {
/**
* 当前没有分布式事务,就创建。当前有分布式事务,就加入
*/
REQUIRED,
/**
* 当前没有分布式事务,非分布式事务运行。当前有分布式事务,就加入
*/
SUPPORTS;
public static DTXPropagation parser(String code){
switch (code){
case "REQUIRED":{
return REQUIRED;
}
case "SUPPORTS":{
return SUPPORTS;
}
default:{
return REQUIRED;
}
}
}
}
接下来看它的拦截点
/**
* LCN 事务拦截器
* create by lorne on 2018/1/5
*/
@Aspect
@Component
@Slf4j
public class TransactionAspect implements Ordered {
...
/**
* DTC Aspect (Type of LCN)
*/
@Pointcut("@annotation(com.codingapi.txlcn.tc.annotation.LcnTransaction)")
public void lcnTransactionPointcut() {
}
...
@Around("lcnTransactionPointcut() && !txcTransactionPointcut()" +
"&& !tccTransactionPointcut() && !txTransactionPointcut()")
public Object runWithLcnTransaction(ProceedingJoinPoint point) throws Throwable {
DTXInfo dtxInfo = DTXInfo.getFromCache(point);
LcnTransaction lcnTransaction = dtxInfo.getBusinessMethod().getAnnotation(LcnTransaction.class);
dtxInfo.setTransactionType(Transactions.LCN);
dtxInfo.setTransactionPropagation(lcnTransaction.propagation());
return dtxLogicWeaver.runTransaction(dtxInfo, point::proceed);
}
...
}
第一行是获取事务的数据信息DTXInfo
public class DTXInfo {
...
private String transactionType;
private DTXPropagation transactionPropagation;
private TransactionInfo transactionInfo;
/**
* 用户实例对象的业务方法(包含注解信息)
*/
private Method businessMethod;
private String unitId;
private DTXInfo(Method method, Object[] args, Class<?> targetClass) {
this.transactionInfo = new TransactionInfo();
this.transactionInfo.setTargetClazz(targetClass);
this.transactionInfo.setArgumentValues(args);
this.transactionInfo.setMethod(method.getName());
this.transactionInfo.setMethodStr(method.toString());
this.transactionInfo.setParameterTypes(method.getParameterTypes());
this.businessMethod = method;
this.unitId = Transactions.unitId(method.toString());
}
...
}
它包括以下几个方面的内容
transactionType:事务类型(lcn,txc,tcc)
transactionPropagation:事务传播机制(默认为REQUIRED)
transactionInfo:包含事务的执行方法信息
businessMethod:分布式事务执行方法
unitId:单元ID,缓存DTXInfo用
其中transactionInfo包含了执行方法的所有可用内容,比如方法所在的class,参数,方法名等
public class TransactionInfo implements Serializable{
/**
* 事务执行器
*/
private Class targetClazz;
/**
* 方法
*/
private String method;
/**
* 参数值
*/
private Object[] argumentValues;
/**
* 参数类型
*/
private Class[] parameterTypes;
/**
* 方法字符串
*/
private String methodStr;
...
}
回到lcnTransactionPointcut方法上,首先会先从缓存中获取DTXInfo,获取不到就用方法名生成一个ID作为缓存键
public static DTXInfo getFromCache(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
String signature = proceedingJoinPoint.getSignature().toString();
String unitId = Transactions.unitId(signature);
DTXInfo dtxInfo = dtxInfoCache.get(unitId);
if (Objects.isNull(dtxInfo)) {
MethodSignature methodSignature = (MethodSignature) proceedingJoinPoint.getSignature();
Method method = methodSignature.getMethod();
Class<?> targetClass = proceedingJoinPoint.getTarget().getClass();
Method thisMethod = targetClass.getMethod(method.getName(), method.getParameterTypes());
dtxInfo = new DTXInfo(thisMethod, proceedingJoinPoint.getArgs(), targetClass);
dtxInfoCache.put(unitId, dtxInfo);
}
dtxInfo.reanalyseMethodArgs(proceedingJoinPoint.getArgs());
return dtxInfo;
}
public static String unitId(String methodSignature) {
return DigestUtils.md5DigestAsHex((APPLICATION_ID_WHEN_RUNNING + methodSignature).getBytes());
}
接下来是获取注解上的信息,继续完善DTXInfo
最后调用runTransaction方法正式开启分布式事务
这个方法是在DTXLogicWeaver类中实现,而DTXLogicWeaver类是通过IOC注入进来
public class TransactionAspect implements Ordered {
private final TxClientConfig txClientConfig;
private final DTXLogicWeaver dtxLogicWeaver;
public TransactionAspect(TxClientConfig txClientConfig, DTXLogicWeaver dtxLogicWeaver) {
this.txClientConfig = txClientConfig;
this.dtxLogicWeaver = dtxLogicWeaver;
}
...
}
而在生成DTXLogicWeaver这个类时也同时会依赖注入DTXServiceExecutor类和TCGlobalContext类,他们的作用分别如下:
DTXServiceExecutor:LCN分布式事务业务执行器
TCGlobalContext:LCN分布式事务业务全局上下文,默认实现类是(DefaultGlobalContext)
接下来回到DTXLogicWeaver.runTransaction方法
public Object runTransaction(DTXInfo dtxInfo, BusinessCallback business) throws Throwable {
if (Objects.isNull(DTXLocalContext.cur())) {
DTXLocalContext.getOrNew();
} else {
return business.call();
}
log.debug("<---- TxLcn start ---->");
DTXLocalContext dtxLocalContext = DTXLocalContext.getOrNew();
TxContext txContext;
if (globalContext.hasTxContext()) {
// 有事务上下文的获取父上下文
txContext = globalContext.txContext();
dtxLocalContext.setInGroup(true);
log.debug("Unit[{}] used parent's TxContext[{}].", dtxInfo.getUnitId(), txContext.getGroupId());
} else {
// 没有的开启本地事务上下文
txContext = globalContext.startTx();
}
// 本地事务调用
if (Objects.nonNull(dtxLocalContext.getGroupId())) {
dtxLocalContext.setDestroy(false);
}
dtxLocalContext.setUnitId(dtxInfo.getUnitId());
dtxLocalContext.setGroupId(txContext.getGroupId());
dtxLocalContext.setTransactionType(dtxInfo.getTransactionType());
// 事务参数
TxTransactionInfo info = new TxTransactionInfo();
info.setBusinessCallback(business);
info.setGroupId(txContext.getGroupId());
info.setUnitId(dtxInfo.getUnitId());
info.setPointMethod(dtxInfo.getBusinessMethod());
info.setPropagation(dtxInfo.getTransactionPropagation());
info.setTransactionInfo(dtxInfo.getTransactionInfo());
info.setTransactionType(dtxInfo.getTransactionType());
info.setTransactionStart(txContext.isDtxStart());
//LCN事务处理器
try {
return transactionServiceExecutor.transactionRunning(info);
} finally {
if (dtxLocalContext.isDestroy()) {
// 获取事务上下文通知事务执行完毕
synchronized (txContext.getLock()) {
txContext.getLock().notifyAll();
}
// TxContext生命周期是? 和事务组一样(不与具体模块相关的)
if (!dtxLocalContext.isInGroup()) {
globalContext.destroyTx();
}
DTXLocalContext.makeNeverAppeared();
TracingContext.tracing().destroy();
}
log.debug("<---- TxLcn end ---->");
}
}
方法略长,逐一分析:
- 判断有没有产生事务,一般是没有,所以进入DTXLocalContext.getOrNew();
if (Objects.isNull(DTXLocalContext.cur())) {//创建分布式事务远程调用控制对象DTXLocalContext
DTXLocalContext.getOrNew();
} else {
return business.call();
}
- 生成一个事务组进行传播
log.debug("<---- TxLcn start ---->");
DTXLocalContext dtxLocalContext = DTXLocalContext.getOrNew();
TxContext txContext;
if (globalContext.hasTxContext()) {//参与方时会进入下面的分支
// 有事务上下文的获取父上下文
txContext = globalContext.txContext();
dtxLocalContext.setInGroup(true);//参与进来
log.debug("Unit[{}] used parent's TxContext[{}].", dtxInfo.getUnitId(), txContext.getGroupId());
} else {
// 没有的开启本地事务上下文
txContext = globalContext.startTx();
}
@Override
public TxContext startTx() {
TxContext txContext = new TxContext();
// 事务发起方判断
txContext.setDtxStart(!TracingContext.tracing().hasGroup());//启动前肯定没有group,取反,则将txContext的dtxStart设置成true
if (txContext.isDtxStart()) {
//生成组ID
TracingContext.tracing().beginTransactionGroup();
}
//上下文设置组ID
txContext.setGroupId(TracingContext.tracing().groupId());
//缓存用
String txContextKey = txContext.getGroupId() + ".dtx";
attachmentCache.attach(txContextKey, txContext);
log.debug("Start TxContext[{}]", txContext.getGroupId());
return txContext;
}
- 设置事务信息
// 事务参数
TxTransactionInfo info = new TxTransactionInfo();
info.setBusinessCallback(business);
info.setGroupId(txContext.getGroupId());
info.setUnitId(dtxInfo.getUnitId());
info.setPointMethod(dtxInfo.getBusinessMethod());
info.setPropagation(dtxInfo.getTransactionPropagation());
info.setTransactionInfo(dtxInfo.getTransactionInfo());
info.setTransactionType(dtxInfo.getTransactionType());
info.setTransactionStart(txContext.isDtxStart());
- 用事务执行器执行事务
public Object transactionRunning(TxTransactionInfo info) throws Throwable {
// 1. 获取事务类型
String transactionType = info.getTransactionType();
// 2. 获取事务传播状态
DTXPropagationState propagationState = propagationResolver.resolvePropagationState(info);
// 2.1 如果不参与分布式事务立即终止
if (propagationState.isIgnored()) {
return info.getBusinessCallback().call();
}
// 3. 获取本地分布式事务控制器
DTXLocalControl dtxLocalControl = txLcnBeanHelper.loadDTXLocalControl(transactionType, propagationState);
// 4. 织入事务操作
try {
// 4.1 记录事务类型到事务上下文
Set<String> transactionTypeSet = globalContext.txContext(info.getGroupId()).getTransactionTypes();
transactionTypeSet.add(transactionType);
dtxLocalControl.preBusinessCode(info);
// 4.2 业务执行前
txLogger.transactionInfo(
info.getGroupId(), info.getUnitId(), "pre business code, unit type: {}", transactionType);
// 4.3 执行业务
Object result = dtxLocalControl.doBusinessCode(info);
// 4.4 业务执行成功
txLogger.transactionInfo(info.getGroupId(), info.getUnitId(), "business success");
dtxLocalControl.onBusinessCodeSuccess(info, result);
return result;
} catch (TransactionException e) {
txLogger.error(info.getGroupId(), info.getUnitId(), "before business code error");
throw e;
} catch (Throwable e) {
// 4.5 业务执行失败
txLogger.error(info.getGroupId(), info.getUnitId(), Transactions.TAG_TRANSACTION,
"business code error");
dtxLocalControl.onBusinessCodeError(info, e);
throw e;
} finally {
// 4.6 业务执行完毕
dtxLocalControl.postBusinessCode(info);
}
}
其中如果是create的话DTXLocalControl返回LcnStartingTransaction,否则返回LcnRunningTransaction,所以发起方会调用LcnStartingTransaction.preBusinessCode,而参与方则会调用LcnRunningTransaction.preBusinessCode
public class LcnStartingTransaction implements DTXLocalControl {
...
@Override
public void preBusinessCode(TxTransactionInfo info) throws TransactionException {
// create DTX group
transactionControlTemplate.createGroup(
info.getGroupId(), info.getUnitId(), info.getTransactionInfo(), info.getTransactionType());
// lcn type need connection proxy
DTXLocalContext.makeProxy();
}
...
}
这段代码就两个功能,第一个是将创建的事务组通过消息的方式发送给LCN协调者服务器,然后生成一个代理(Connection时用)
而参与方则设置代理即可,事务组消息会通过拦截时获取到
public class LcnStartingTransaction implements DTXLocalControl {
@Override
public void preBusinessCode(TxTransactionInfo info) {
// lcn type need connection proxy
DTXLocalContext.makeProxy();
}
}
再往下就比较简单,执行业务方法,如果成功则将上下文中DTXLocalContext的成员变量sysTransactionState设置成1,表示执行成功,否则在一场中将其置为0,表示事务执行失败,最后调用postBusinessCode,根据sysTransactionState的值来通知所有参与者是否提交事务还是回滚事务
另外,LCN还会代理数据库连接
@Aspect
@Component
@Slf4j
public class DataSourceAspect implements Ordered {
@Around("execution(* javax.sql.DataSource.getConnection(..))")
public Object around(ProceedingJoinPoint point) throws Throwable {
return dtxResourceWeaver.getConnection(() -> (Connection) point.proceed());
}
}
public Object getConnection(ConnectionCallback connectionCallback) throws Throwable {
DTXLocalContext dtxLocalContext = DTXLocalContext.cur();
if (Objects.nonNull(dtxLocalContext) && dtxLocalContext.isProxy()) {
String transactionType = dtxLocalContext.getTransactionType();
TransactionResourceProxy resourceProxy = txLcnBeanHelper.loadTransactionResourceProxy(transactionType);
Connection connection = resourceProxy.proxyConnection(connectionCallback);
log.debug("proxy a sql connection: {}.", connection);
return connection;
}
return connectionCallback.call();
}
最后操作数据拿到的连接是LCN代理的连接
@Override
public Connection proxyConnection(ConnectionCallback connectionCallback) throws Throwable {
String groupId = DTXLocalContext.cur().getGroupId();
try {
return globalContext.getLcnConnection(groupId);
} catch (TCGlobalContextException e) {
LcnConnectionProxy lcnConnectionProxy = new LcnConnectionProxy(connectionCallback.call());
globalContext.setLcnConnection(groupId, lcnConnectionProxy);
lcnConnectionProxy.setAutoCommit(false);
return lcnConnectionProxy;
}
}
public class LcnConnectionProxy implements Connection {
...
@Override
public void commit() throws SQLException {
//connection.commit();
}
@Override
public void rollback() throws SQLException {
//connection.rollback();
}
@Override
public void close() throws SQLException {
//connection.close();
}
...
}
当spring提交事务方法时其实执行的都是空方法,官方称之为假关闭
而真正的提交或回滚是有协调者来触发
public class ClientRpcAnswer implements RpcAnswer, DisposableBean {
//接收到协调者服务的消息的回调方法
@Override
public void callback(RpcCmd rpcCmd) {
executorService.submit(() -> {
...
Serializable message = executeService.execute(transactionCmd);
...
});
}
}
public class DefaultNotifiedUnitService implements RpcExecuteService {
...
@Override
public Serializable execute(TransactionCmd transactionCmd) throws TxClientException {
...
// 事务清理操作
transactionCleanTemplate.clean(
notifyUnitParams.getGroupId(),
notifyUnitParams.getUnitId(),
notifyUnitParams.getUnitType(),
notifyUnitParams.getState());
}
...
}
@Component
@Slf4j
public class TransactionCleanTemplate {
...
public void clean(String groupId, String unitId, String unitType, int state) throws TransactionClearException {
...
transactionBeanHelper.loadTransactionCleanService(unitType).clear(
groupId, state, unitId, unitType
);
...
}
...
}
@Component
@Slf4j
public class LcnTransactionCleanService implements TransactionCleanService {
...
@Override
public void clear(String groupId, int state, String unitId, String unitType) throws TransactionClearException {
try {
LcnConnectionProxy connectionProxy = globalContext.getLcnConnection(groupId);
connectionProxy.notify(state);
// todo notify exception
} catch (TCGlobalContextException e) {
log.warn("Non lcn connection when clear transaction.");
}
}
}
@Slf4j
public class LcnConnectionProxy implements Connection {
private Connection connection;
public LcnConnectionProxy(Connection connection) {
this.connection = connection;//真正的数据库连接
}
/**
* notify connection
*
* @param state transactionState
* @return RpcResponseState RpcResponseState
*/
public RpcResponseState notify(int state) {
try {
if (state == 1) {//1表示成功,提交事务
log.debug("commit transaction type[lcn] proxy connection:{}.", this);
connection.commit();
} else {//非1表示成功,回滚事务
log.debug("rollback transaction type[lcn] proxy connection:{}.", this);
connection.rollback();
}
connection.close();//执行真正的关闭
log.debug("transaction type[lcn] proxy connection:{} closed.", this);
return RpcResponseState.success;
} catch (Exception e) {
log.error(e.getLocalizedMessage(), e);
return RpcResponseState.fail;
}
}
...
}
整个流程大概如下官图所示: 核心步骤
- 创建事务组
是指在事务发起方开始执行业务代码之前先调用TxManager创建事务组对象,然后拿到事务标示GroupId的过程。 - 加入事务组
添加事务组是指参与方在执行完业务方法以后,将该模块的事务信息通知给TxManager的操作。 - 通知事务组
是指在发起方执行完业务代码以后,将发起方执行结果状态通知给TxManager,TxManager将根据事务最终状态和事务组的信息来通知相应的参与模块提交或回滚事务,并返回结果给事务发起方。