Spring 事务 - litter-fish/ReadSource GitHub Wiki

事务的开启

<!-- 声明式事务管理 配置事务的注解方式注入-->
<tx:annotation-driven transaction-manager="transactionManager"/>

annotation-driven标签解析器的注册

/**
 * Namespace handler for the {@code tx} XML elements: registers one bean definition
 * parser per supported element name.
 */
public class TxNamespaceHandler extends NamespaceHandlerSupport {

	// Name of the XML attribute that carries the transaction manager bean name.
	static final String TRANSACTION_MANAGER_ATTRIBUTE = "transaction-manager";
	// Bean name used when the element does not declare that attribute.
	static final String DEFAULT_TRANSACTION_MANAGER_BEAN_NAME = "transactionManager";

	// Returns the element's "transaction-manager" attribute value, or the default bean name if absent.
	static String getTransactionManagerName(Element element) {
		return (element.hasAttribute(TRANSACTION_MANAGER_ATTRIBUTE) ?
				element.getAttribute(TRANSACTION_MANAGER_ATTRIBUTE) : DEFAULT_TRANSACTION_MANAGER_BEAN_NAME);
	}

	// Registers the parsers for the "advice", "annotation-driven" and "jta-transaction-manager" elements.
	public void init() {
		registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
		registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
		registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
	}

}

annotation-driven标签解析时序图

annotation-driven标签解析

// org/springframework/transaction/config/AnnotationDrivenBeanDefinitionParser.java
// Parses the annotation-driven element by dispatching on its "mode" attribute.
// Always returns null; the work is done through registrations on the parser context.
public BeanDefinition parse(Element element, ParserContext parserContext) {
    String mode = element.getAttribute("mode");
    if ("aspectj".equals(mode)) {
        // mode="aspectj"
        registerTransactionAspect(element, parserContext);
    }
    else {
        // mode="proxy" (also taken for any value other than "aspectj")
        AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
    }
    return null;
}

proxy模式的处理

// org/springframework/transaction/config/AnnotationDrivenBeanDefinitionParser.java
// Proxy-mode setup: registers the auto-proxy creator and then the attribute source,
// interceptor and advisor bean definitions used for annotation-driven transactions.
private static class AopAutoProxyConfigurer {
    // The auto-proxy creator is always requested; the three transaction bean definitions are
    // only registered when the advisor bean name is not yet present in the registry.
    public static void configureAutoProxyCreator(Element element, ParserContext parserContext) {
        // Register the auto-proxy creator bean (InfrastructureAdvisorAutoProxyCreator per the original note)
        AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element);

        // Per the original note, this constant's value is
        // org.springframework.transaction.config.internalTransactionAdvisor
        String txAdvisorBeanName = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME;
        if (!parserContext.getRegistry().containsBeanDefinition(txAdvisorBeanName)) {
            Object eleSource = parserContext.extractSource(element);

            // Create the TransactionAttributeSource definition.
            // The bean class is AnnotationTransactionAttributeSource, given by class name.
            RootBeanDefinition sourceDef = new RootBeanDefinition(
                    "org.springframework.transaction.annotation.AnnotationTransactionAttributeSource");
            sourceDef.setSource(eleSource);
            sourceDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
            // Register the definition under a generated bean name and keep that name
            String sourceName = parserContext.getReaderContext().registerWithGeneratedName(sourceDef);

            // Create the TransactionInterceptor definition.
            // This is the bean that acts as the transactional advice.
            RootBeanDefinition interceptorDef = new RootBeanDefinition(TransactionInterceptor.class);
            interceptorDef.setSource(eleSource);
            interceptorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
            registerTransactionManager(element, interceptorDef);
            interceptorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
            String interceptorName = parserContext.getReaderContext().registerWithGeneratedName(interceptorDef);

            // Create the TransactionAttributeSourceAdvisor definition.

            RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class);
            advisorDef.setSource(eleSource);
            advisorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
            // Wire the attribute source bean into the advisor's transactionAttributeSource property
            advisorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
            // Give the advisor the interceptor's bean name (a plain string, not a bean reference)
            // through its adviceBeanName property
            advisorDef.getPropertyValues().add("adviceBeanName", interceptorName);
            if (element.hasAttribute("order")) {
                advisorDef.getPropertyValues().add("order", element.getAttribute("order"));
            }
            parserContext.getRegistry().registerBeanDefinition(txAdvisorBeanName, advisorDef);

            // Group the three definitions into one composite component and register it with the parser context
            CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(), eleSource);
            compositeDef.addNestedComponent(new BeanComponentDefinition(sourceDef, sourceName));
            compositeDef.addNestedComponent(new BeanComponentDefinition(interceptorDef, interceptorName));
            compositeDef.addNestedComponent(new BeanComponentDefinition(advisorDef, txAdvisorBeanName));
            parserContext.registerComponent(compositeDef);
        }
    }
}

注册InfrastructureAdvisorAutoProxyCreator类型bean

// org/springframework/aop/config/AopNamespaceUtils.java
// Registers the auto-proxy creator for the given source element, applies the element's
// proxying attributes, and registers the resulting definition as a component.
public static void registerAutoProxyCreatorIfNecessary(
        ParserContext parserContext, Element sourceElement) {

    // Register (or obtain) the auto-proxy creator bean definition
    BeanDefinition beanDefinition = AopConfigUtils.registerAutoProxyCreatorIfNecessary(
            parserContext.getRegistry(), parserContext.extractSource(sourceElement));
    // Handle the proxy-target-class and expose-proxy attributes (per the original note)
    useClassProxyingIfNecessary(parserContext.getRegistry(), sourceElement);
    // Register the component so that listeners can be notified and process it further (per the original note)
    registerComponentIfNecessary(beanDefinition, parserContext);
}

// org/springframework/aop/config/AopConfigUtils.java
// Requests InfrastructureAdvisorAutoProxyCreator as the auto-proxy creator class and
// returns whatever registerOrEscalateApcAsRequired yields for it.
public static BeanDefinition registerAutoProxyCreatorIfNecessary(BeanDefinitionRegistry registry, Object source) {
    return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);
}

InfrastructureAdvisorAutoProxyCreator的继承结构

从上图可知,其间接实现了SmartInstantiationAwareBeanPostProcessor接口,即该bean在实例化和初始化的时候会进行相应回调函数的调用

// org/springframework/aop/framework/autoproxy/AbstractAutoProxyCreator.java
// Called after a bean has been initialized.
// Returns either the bean itself or the result of wrapIfNecessary for it.
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    if (bean != null) {
        // Build the cache key from the bean's class and name
        // (the original note gives the format as beanClassName_beanName; not verifiable from this excerpt)
        Object cacheKey = getCacheKey(bean.getClass(), beanName);
        if (!this.earlyProxyReferences.containsKey(cacheKey)) {
            // Not recorded as an early proxy reference: wrap the bean if it is eligible for proxying
            return wrapIfNecessary(bean, beanName, cacheKey);
        }
    }
    return bean;
}

接着查找指定bean的增强方法并创建代理

// org/springframework/aop/framework/autoproxy/AbstractAutoProxyCreator.java
// Returns a proxy for the bean when advice applies to it, otherwise the bean itself.
// The decision is recorded in advisedBeans under cacheKey (TRUE = proxied, FALSE = left alone).
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    // Already handled (bean name present in targetSourcedBeans): return as is
    if (beanName != null && this.targetSourcedBeans.containsKey(beanName)) {
        return bean;
    }
    // Previously recorded as not needing advice: return as is
    if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {
        return bean;
    }
    // Infrastructure classes (Advice, Advisor, AopInfrastructureBean per the original note) and
    // beans that shouldSkip excludes are never proxied; remember that and return as is
    if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) {
        this.advisedBeans.put(cacheKey, Boolean.FALSE);
        return bean;
    }

    // Create proxy if we have advice.
    // Look up the advices/advisors that apply to this bean
    Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
    // Anything other than the DO_NOT_PROXY marker means a proxy must be created
    if (specificInterceptors != DO_NOT_PROXY) {
        this.advisedBeans.put(cacheKey, Boolean.TRUE);
        // Create the proxy around a SingletonTargetSource holding the bean
        Object proxy = createProxy(bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
        this.proxyTypes.put(cacheKey, proxy.getClass());
        return proxy;
    }

    this.advisedBeans.put(cacheKey, Boolean.FALSE);
    return bean;
}

首先,获取与class匹配的增强器

// org/springframework/aop/framework/autoproxy/AbstractAdvisorAutoProxyCreator.java
// Collects the advisors that apply to the given bean class: find all candidates, filter
// them against the class, let extendAdvisors adjust the list, and sort when non-empty.
protected List<Advisor> findEligibleAdvisors(Class beanClass, String beanName) {
    // Fetch all candidate advisors
    List<Advisor> candidateAdvisors = findCandidateAdvisors();
    // Keep only the advisors that match this class
    List<Advisor> eligibleAdvisors = findAdvisorsThatCanApply(candidateAdvisors, beanClass, beanName);
    extendAdvisors(eligibleAdvisors);
    if (!eligibleAdvisors.isEmpty()) {
        eligibleAdvisors = sortAdvisors(eligibleAdvisors);
    }
    return eligibleAdvisors;
}

寻找所有增强器中适合于当前class的增强器

// org/springframework/aop/support/AopUtils.java
// Selects, from all candidate advisors, those that can apply to the given class.
// Introduction advisors are evaluated first; the remaining advisors are then evaluated
// with knowledge of whether any introduction matched.
public static List<Advisor> findAdvisorsThatCanApply(List<Advisor> candidateAdvisors, Class<?> clazz) {
    if (candidateAdvisors.isEmpty()) {
        return candidateAdvisors;
    }
    List<Advisor> eligibleAdvisors = new LinkedList<Advisor>();
    // First pass: IntroductionAdvisor candidates only
    for (Advisor candidate : candidateAdvisors) {
        if (candidate instanceof IntroductionAdvisor && canApply(candidate, clazz)) {
            eligibleAdvisors.add(candidate);
        }
    }
    boolean hasIntroductions = !eligibleAdvisors.isEmpty();
    for (Advisor candidate : candidateAdvisors) {
        if (candidate instanceof IntroductionAdvisor) {
            // already processed
            continue;
        }
        // Second pass: all non-introduction advisors
        if (canApply(candidate, clazz, hasIntroductions)) {
            eligibleAdvisors.add(candidate);
        }
    }
    return eligibleAdvisors;
}

回顾之前开启事务自定义标签解析过程,在解析中会向Spring注册一个BeanFactoryTransactionAttributeSourceAdvisor的bean, 查看该bean的类继承结构,其间接实现了Advisor接口,所以上面的candidateAdvisors中会包含该bean,且该bean也实现了PointcutAdvisor接口

// org/springframework/aop/support/AopUtils.java
// Decides whether an advisor can apply to the target class: introduction advisors are
// checked by class filter, pointcut advisors by their pointcut, anything else always applies.
public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {
    if (advisor instanceof IntroductionAdvisor) {
        return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);
    }
    // BeanFactoryTransactionAttributeSourceAdvisor is a PointcutAdvisor (per the surrounding
    // article), so the transaction advisor takes this branch
    else if (advisor instanceof PointcutAdvisor) {
        PointcutAdvisor pca = (PointcutAdvisor) advisor;
        return canApply(pca.getPointcut(), targetClass, hasIntroductions);
    }
    else {
        // It doesn't have a pointcut so we assume it applies.
        return true;
    }
}

canApply 方法的第一个参数pca.getPointcut(),会在BeanFactoryTransactionAttributeSourceAdvisor创建时被赋值

// Advisor whose pointcut is created together with the advisor instance (abridged excerpt).
public class BeanFactoryTransactionAttributeSourceAdvisor extends AbstractBeanFactoryPointcutAdvisor {

	// Anonymous pointcut subclass that hands back this advisor's transactionAttributeSource
	// (that field is not shown in this excerpt).
	private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
		@Override
		protected TransactionAttributeSource getTransactionAttributeSource() {
			return transactionAttributeSource;
		}
	};
}

canApply逻辑

// Returns true when the pointcut's class filter accepts targetClass and at least one public
// method of targetClass or of any of its interfaces is matched by the pointcut's method matcher.
public static boolean canApply(Pointcut pc, Class<?> targetClass, boolean hasIntroductions) {
    Assert.notNull(pc, "Pointcut must not be null");
    if (!pc.getClassFilter().matches(targetClass)) {
        return false;
    }
    // For the transaction advisor pc is a TransactionAttributeSourcePointcut; per the original
    // note its getMethodMatcher() returns the pointcut itself
    MethodMatcher methodMatcher = pc.getMethodMatcher();
    IntroductionAwareMethodMatcher introductionAwareMethodMatcher = null;
    if (methodMatcher instanceof IntroductionAwareMethodMatcher) {
        introductionAwareMethodMatcher = (IntroductionAwareMethodMatcher) methodMatcher;
    }

    // Candidate types: every interface of the target class, then the target class itself
    Set<Class> classes = new LinkedHashSet<Class>(ClassUtils.getAllInterfacesForClassAsSet(targetClass));
    classes.add(targetClass);
    for (Class<?> clazz : classes) {
        Method[] methods = clazz.getMethods();
        for (Method method : methods) {
            if ((introductionAwareMethodMatcher != null &&
                    introductionAwareMethodMatcher.matches(method, targetClass, hasIntroductions)) ||
                    // For the transaction advisor, methodMatcher is the TransactionAttributeSourcePointcut
                    methodMatcher.matches(method, targetClass)) {
                return true;
            }
        }
    }

    return false;
}

进行事务标签匹配

// org/springframework/transaction/interceptor/TransactionAttributeSourcePointcut.java
// Matches when no attribute source is available, or when the source yields a non-null
// transaction attribute for the method on the target class.
public boolean matches(Method method, Class targetClass) {
    // With the annotation-driven setup, tas refers to the AnnotationTransactionAttributeSource (per the original note)
    TransactionAttributeSource tas = getTransactionAttributeSource();
    return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}

AnnotationTransactionAttributeSource 继承自 AbstractFallbackTransactionAttributeSource,所以事务标签的提取将委托给其父类处理

// org/springframework/transaction/interceptor/AbstractFallbackTransactionAttributeSource.java
// Returns the transaction attribute for the method/target class pair, or null when there is none.
// Results, including the "no attribute" outcome, are stored in attributeCache.
public TransactionAttribute getTransactionAttribute(Method method, Class<?> targetClass) {
    // First, see if we have a cached value.
    Object cacheKey = getCacheKey(method, targetClass);
    Object cached = this.attributeCache.get(cacheKey);
    if (cached != null) {
        // Value will either be canonical value indicating there is no transaction attribute,
        // or an actual transaction attribute.
        if (cached == NULL_TRANSACTION_ATTRIBUTE) {
            return null;
        }
        else {
            return (TransactionAttribute) cached;
        }
    }
    else {
        // We need to work it out.
        // Extract the transaction attribute for this method
        TransactionAttribute txAtt = computeTransactionAttribute(method, targetClass);
        // Put it in the cache.
        if (txAtt == null) {
            // Cache the marker object so the "nothing found" result is remembered too
            this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
        }
        else {
            if (logger.isDebugEnabled()) {
                Class<?> classToLog = (targetClass != null ? targetClass : method.getDeclaringClass());
                logger.debug("Adding transactional method '" + classToLog.getSimpleName() + "." +
                        method.getName() + "' with attribute: " + txAtt);
            }
            // Store the resolved attribute in the cache
            this.attributeCache.put(cacheKey, txAtt);
        }
        return txAtt;
    }
}

首先会根据方法和类检查缓存中是否已经存在提取过的事务标签,如果存在直接取,否则进行标签提取操作

// org/springframework/transaction/interceptor/AbstractFallbackTransactionAttributeSource.java
// Extracts the transaction attribute.
// This method only defines the search order; the actual lookups are delegated to findTransactionAttribute.
// Order: most specific method, its declaring class, then (if different) the original method and its declaring class.
private TransactionAttribute computeTransactionAttribute(Method method, Class<?> targetClass) {
    // Don't allow no-public methods as required.
    if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
        return null;
    }

    // Ignore CGLIB subclasses - introspect the actual user class.
    Class<?> userClass = ClassUtils.getUserClass(targetClass);
    // The method may be on an interface, but we need attributes from the target class.
    // If the target class is null, the method will be unchanged.
    // method may be the interface method; specificMethod is its counterpart on the implementing class
    Method specificMethod = ClassUtils.getMostSpecificMethod(method, userClass);
    // If we are dealing with method with generic parameters, find the original method.
    specificMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

    // First try is the method in the target class.
    // Check whether the method itself carries a transaction declaration
    TransactionAttribute txAtt = findTransactionAttribute(specificMethod);
    if (txAtt != null) {
        return txAtt;
    }

    // Second try is the transaction attribute on the target class.
    // Check whether the class declaring the method carries a transaction declaration
    txAtt = findTransactionAttribute(specificMethod.getDeclaringClass());
    if (txAtt != null) {
        return txAtt;
    }
    // The specific method differs from the original one (e.g. an interface method): look there as well
    if (specificMethod != method) {
        // Fallback is to look at the original method.
        txAtt = findTransactionAttribute(method);
        if (txAtt != null) {
            return txAtt;
        }
        // Last fallback is the class of the original method.
        return findTransactionAttribute(method.getDeclaringClass());
    }
    return null;
}

对于标签的提取,首先查看方法中是否存在事务声明,如果没有则查看方法所在类是否存在事务声明,最后去接口中查找。真正查找操作委托给findTransactionAttribute方法

// org/springframework/transaction/annotation/AnnotationTransactionAttributeSource.java
// Method-level lookup: simply delegates to determineTransactionAttribute.
protected TransactionAttribute findTransactionAttribute(Method method) {
    return determineTransactionAttribute(method);
}

//org/springframework/transaction/annotation/AnnotationTransactionAttributeSource.java
// Asks each configured annotation parser in turn and returns the first non-null
// attribute; returns null when no parser produces one.
protected TransactionAttribute determineTransactionAttribute(AnnotatedElement ae) {
    // this.annotationParsers is populated in the constructor and contains a SpringTransactionAnnotationParser
    for (TransactionAnnotationParser annotationParser : this.annotationParsers) {
        TransactionAttribute attr = annotationParser.parseTransactionAnnotation(ae);
        if (attr != null) {
            return attr;
        }
    }
    return null;
}

上述方法中this.annotationParsers会在bean的创建中进行初始化时加入SpringTransactionAnnotationParser处理器

// org/springframework/transaction/annotation/AnnotationTransactionAttributeSource.java
// Stores the publicMethodsOnly flag and sets up the annotation parsers: the Spring parser
// is always added, the EJB3 parser only when the ejb3Present flag is set.
public AnnotationTransactionAttributeSource(boolean publicMethodsOnly) {
    this.publicMethodsOnly = publicMethodsOnly;
    // LinkedHashSet keeps insertion order, so the Spring parser is consulted first
    this.annotationParsers = new LinkedHashSet<TransactionAnnotationParser>(2);
    this.annotationParsers.add(new SpringTransactionAnnotationParser());
    if (ejb3Present) {
        this.annotationParsers.add(new Ejb3TransactionAnnotationParser());
    }
}

委托SpringTransactionAnnotationParser进行解析

// org/springframework/transaction/annotation/SpringTransactionAnnotationParser.java
// Looks up the Transactional annotation on the element and, when present, converts it into
// a TransactionAttribute; returns null when the element is not annotated.
public TransactionAttribute parseTransactionAnnotation(AnnotatedElement ae) {
    // Fetch the Transactional annotation from the element
    Transactional ann = AnnotationUtils.getAnnotation(ae, Transactional.class);
    if (ann != null) {
        return parseTransactionAnnotation(ann);
    }
    else {
        return null;
    }
}

上面代码中获取了Transactional注解,接着创建事务对象并进行事务属性设置

// org/springframework/transaction/annotation/SpringTransactionAnnotationParser.java
// Copies the settings of a Transactional annotation into a new RuleBasedTransactionAttribute:
// propagation, isolation, timeout, read-only flag, qualifier and the rollback / no-rollback rules.
public TransactionAttribute parseTransactionAnnotation(Transactional ann) {
    // Object that carries the transaction settings
    RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
    // Propagation behavior
    rbta.setPropagationBehavior(ann.propagation().value());
    // Isolation level
    rbta.setIsolationLevel(ann.isolation().value());
    rbta.setTimeout(ann.timeout());
    rbta.setReadOnly(ann.readOnly());
    rbta.setQualifier(ann.value());
    ArrayList<RollbackRuleAttribute> rollBackRules = new ArrayList<RollbackRuleAttribute>();
    // Rollback rules given as classes
    Class[] rbf = ann.rollbackFor();
    for (Class rbRule : rbf) {
        RollbackRuleAttribute rule = new RollbackRuleAttribute(rbRule);
        rollBackRules.add(rule);
    }
    // Rollback rules given as class names
    String[] rbfc = ann.rollbackForClassName();
    for (String rbRule : rbfc) {
        RollbackRuleAttribute rule = new RollbackRuleAttribute(rbRule);
        rollBackRules.add(rule);
    }
    // No-rollback rules given as classes
    Class[] nrbf = ann.noRollbackFor();
    for (Class rbRule : nrbf) {
        NoRollbackRuleAttribute rule = new NoRollbackRuleAttribute(rbRule);
        rollBackRules.add(rule);
    }
    // No-rollback rules given as class names
    String[] nrbfc = ann.noRollbackForClassName();
    for (String rbRule : nrbfc) {
        NoRollbackRuleAttribute rule = new NoRollbackRuleAttribute(rbRule);
        rollBackRules.add(rule);
    }
    rbta.getRollbackRules().addAll(rollBackRules);
    return rbta;
}

经过上面处理事务标签可以提取了,接着可以进行代理类的创建

// org/springframework/aop/framework/autoproxy/AbstractAutoProxyCreator.java
// Builds a ProxyFactory for the bean (copied config, interfaces, advisors, target source)
// and returns the proxy it creates with this creator's proxyClassLoader.
protected Object createProxy(
        Class<?> beanClass, String beanName, Object[] specificInterceptors, TargetSource targetSource) {

    ProxyFactory proxyFactory = new ProxyFactory();
    // Copy our properties (proxyTargetClass etc) inherited from ProxyConfig.
    proxyFactory.copyFrom(this);

    if (!shouldProxyTargetClass(beanClass, beanName)) {
        // Must allow for introductions; can't just set interfaces to
        // the target's interfaces only.
        Class<?>[] targetInterfaces = ClassUtils.getAllInterfacesForClass(beanClass, this.proxyClassLoader);
        for (Class<?> targetInterface : targetInterfaces) {
            // Add each interface of the bean class as a proxied interface
            proxyFactory.addInterface(targetInterface);
        }
    }

    // Interceptors, advisors and plain advice can all be supplied here; buildAdvisors
    // normalizes them into Advisor instances so the proxy is built from one uniform type
    Advisor[] advisors = buildAdvisors(beanName, specificInterceptors);
    for (Advisor advisor : advisors) {
        // Add the advisor to the factory
        proxyFactory.addAdvisor(advisor);
    }

    // Set the target to be proxied
    proxyFactory.setTargetSource(targetSource);
    // Hook for customizing the proxy factory
    customizeProxyFactory(proxyFactory);
    // Controls whether advice may still be changed once the factory is configured
    // (default is false per the original note)
    proxyFactory.setFrozen(this.freezeProxy);
    if (advisorsPreFiltered()) {
        proxyFactory.setPreFiltered(true);
    }

    // Create the proxy
    return proxyFactory.getProxy(this.proxyClassLoader);
}

首先会先创建一个代理工厂对象,然后设置代理工厂的一些属性、封装上一步过滤出来的增强器,最后创建代理。

// org/springframework/aop/framework/autoproxy/AbstractAutoProxyCreator.java
// Merges the bean-specific interceptors with the common interceptors and wraps every
// entry as an Advisor. Common interceptors are only merged when specificInterceptors is non-null.
protected Advisor[] buildAdvisors(String beanName, Object[] specificInterceptors) {
    // Handle prototypes correctly...
    // Resolve the registered interceptor names into advisors
    Advisor[] commonInterceptors = resolveInterceptorNames();

    List<Object> allInterceptors = new ArrayList<Object>();
    if (specificInterceptors != null) {
        // Add the bean-specific interceptors
        allInterceptors.addAll(Arrays.asList(specificInterceptors));
        if (commonInterceptors != null) {
            // Common interceptors go before or after the specific ones, depending on the flag
            if (this.applyCommonInterceptorsFirst) {
                allInterceptors.addAll(0, Arrays.asList(commonInterceptors));
            }
            else {
                allInterceptors.addAll(Arrays.asList(commonInterceptors));
            }
        }
    }
    if (logger.isDebugEnabled()) {
        int nrOfCommonInterceptors = (commonInterceptors != null ? commonInterceptors.length : 0);
        int nrOfSpecificInterceptors = (specificInterceptors != null ? specificInterceptors.length : 0);
        logger.debug("Creating implicit proxy for bean '" + beanName + "' with " + nrOfCommonInterceptors +
                " common interceptors and " + nrOfSpecificInterceptors + " specific interceptors");
    }

    Advisor[] advisors = new Advisor[allInterceptors.size()];
    for (int i = 0; i < allInterceptors.size(); i++) {
        // Wrap each interceptor so it becomes an Advisor
        advisors[i] = this.advisorAdapterRegistry.wrap(allInterceptors.get(i));
    }
    return advisors;
}

获取所有公共拦截器及过滤出来的,然后进行封装转换为Advisor,对于事务处理BeanFactoryTransactionAttributeSourceAdvisor也会在这边进行封装

// org/springframework/aop/framework/adapter/DefaultAdvisorAdapterRegistry.java
// Converts an advice object into an Advisor. Advisors are returned unchanged; supported
// Advice instances are wrapped in a DefaultPointcutAdvisor; anything else is rejected
// with UnknownAdviceTypeException.
public Advisor wrap(Object adviceObject) throws UnknownAdviceTypeException {
    // Already an Advisor: return it as is
    if (adviceObject instanceof Advisor) {
        return (Advisor) adviceObject;
    }
    // Only Advisor and Advice are supported
    if (!(adviceObject instanceof Advice)) {
        throw new UnknownAdviceTypeException(adviceObject);
    }
    Advice advice = (Advice) adviceObject;
    if (advice instanceof MethodInterceptor) {
        // So well-known it doesn't even need an adapter.
        // Wrap the MethodInterceptor in a DefaultPointcutAdvisor
        return new DefaultPointcutAdvisor(advice);
    }
    // Otherwise wrap the advice if one of the registered adapters supports it
    for (AdvisorAdapter adapter : this.adapters) {
        // Check that it is supported.
        if (adapter.supportsAdvice(advice)) {
            return new DefaultPointcutAdvisor(advice);
        }
    }
    throw new UnknownAdviceTypeException(advice);
}

BeanFactoryTransactionAttributeSourceAdvisor本身就是一个Advisor,因此会走第一个分支,全部拦截器转换好后会将 这些增强器加入到代理工厂对象的advisors中,接着回到创建代理逻辑

// org/springframework/aop/framework/ProxyFactory.java
// Obtains an AopProxy from createAopProxy() and asks it for the proxy object,
// passing the class loader through.
public Object getProxy(ClassLoader classLoader) {
    return createAopProxy().getProxy(classLoader);
}

getProxy委托ProxyCreatorSupport的createAopProxy方法获取用于创建代理的AopProxy

// org/springframework/aop/framework/ProxyCreatorSupport.java
// Activates this support object on first use, then asks the AopProxyFactory to create
// the AopProxy for this configuration.
protected final synchronized AopProxy createAopProxy() {
    if (!this.active) {
        activate();
    }
    // Obtain the AopProxy that will create the actual proxy
    return getAopProxyFactory().createAopProxy(this);
}

首先createAopProxy会根据被代理bean是否实现接口或是否设置相关参数(proxy-target-class)进行确定使用jdk还是cglib创建代理

// org/springframework/aop/framework/DefaultAopProxyFactory.java
// Chooses between a JDK dynamic proxy and a CGLIB proxy for the given configuration.
// Throws AopConfigException when a class-based decision is needed but no target class is known.
public AopProxy createAopProxy(AdvisedSupport config) throws AopConfigException {
    if (config.isOptimize() // whether CGLIB-created proxies should use aggressive optimization (per the original note)
            || config.isProxyTargetClass() // true means the target class is proxied rather than its interfaces
            || hasNoUserSuppliedProxyInterfaces(config)) { // no proxy interfaces were supplied
        Class targetClass = config.getTargetClass();
        if (targetClass == null) {
            throw new AopConfigException("TargetSource cannot determine target class: " +
                    "Either an interface or a target is required for proxy creation.");
        }
        // When proxy-target-class is true, createProxy does not add the bean's interfaces to the
        // configuration; a target class that is itself an interface still gets a JDK proxy
        if (targetClass.isInterface()) {
            return new JdkDynamicAopProxy(config);
        }
        return CglibProxyFactory.createCglibProxy(config);
    }
    else {
        return new JdkDynamicAopProxy(config);
    }
}

根据返回的AopProxy决定使用哪种方式创建代理,先看JDK动态代理的创建方式。查看JdkDynamicAopProxy类,其实现了InvocationHandler接口,并实现了

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
public Object getProxy(ClassLoader classLoader)

这两个重要方法,通过getProxy获取代理对象,在执行方法的逻辑时会调用invoke方法

// org/springframework/aop/framework/JdkDynamicAopProxy.java
// Creates the JDK dynamic proxy for the advised configuration, with this object as the
// invocation handler.
public Object getProxy(ClassLoader classLoader) {
    if (logger.isDebugEnabled()) {
        logger.debug("Creating JDK dynamic proxy: target source is " + this.advised.getTargetSource());
    }
    Class<?>[] proxiedInterfaces = AopProxyUtils.completeProxiedInterfaces(this.advised);
    findDefinedEqualsAndHashCodeMethods(proxiedInterfaces);
    // Create the proxy object; method calls made on it are dispatched to this handler's invoke method
    return Proxy.newProxyInstance(classLoader, proxiedInterfaces, this);
}

通过上述逻辑即可完成代理类的创建。

事务执行

对于JDK的代理而言,当执行事务时会触发invoke方法的调用

// org/springframework/aop/framework/JdkDynamicAopProxy.java
// Invocation handler entry point for the JDK proxy. Handles equals/hashCode and Advised
// interface calls specially, otherwise runs the method through its interceptor chain (or
// straight on the target when the chain is empty) and adjusts the return value.
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    MethodInvocation invocation;
    Object oldProxy = null;
    boolean setProxyContext = false;

    TargetSource targetSource = this.advised.targetSource;
    Class<?> targetClass = null;
    Object target = null;

    try {
        // equals handling
        if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) {
            // The target does not implement the equals(Object) method itself.
            return equals(args[0]);
        }
        // hashCode handling
        if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) {
            // The target does not implement the hashCode() method itself.
            return hashCode();
        }
        // Methods declared on an interface that Advised is assignable to are invoked on the
        // proxy configuration itself instead of on the target (skipped when the config is opaque)
        if (!this.advised.opaque && method.getDeclaringClass().isInterface() &&
                method.getDeclaringClass().isAssignableFrom(Advised.class)) {
            // Service invocations on ProxyConfig with the proxy config...
            return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args);
        }

        Object retVal;
        // expose-proxy handling: publish the current proxy and remember the previous one
        if (this.advised.exposeProxy) {
            // Make invocation available if necessary.
            oldProxy = AopContext.setCurrentProxy(proxy);
            setProxyContext = true;
        }

        // May be null. Get as late as possible to minimize the time we "own" the target,
        // in case it comes from a pool.
        target = targetSource.getTarget();
        if (target != null) {
            targetClass = target.getClass();
        }

        // Get the interception chain for this method.
        List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);

        // Check whether we have any advice. If we don't, we can fallback on direct
        // reflective invocation of the target, and avoid creating a MethodInvocation.
        if (chain.isEmpty()) {
            // We can skip creating a MethodInvocation: just invoke the target directly
            // Note that the final invoker must be an InvokerInterceptor so we know it does
            // nothing but a reflective operation on the target, and no hot swapping or fancy proxying.
            // No interceptors found: call the joinpoint method directly
            retVal = AopUtils.invokeJoinpointUsingReflection(target, method, args);
        }
        else {
            // We need to create a method invocation...
            // Wrap the chain in a ReflectiveMethodInvocation so that proceed() can walk through it
            invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
            // Proceed to the joinpoint through the interceptor chain.
            retVal = invocation.proceed();
        }

        // Massage return value if necessary.
        Class<?> returnType = method.getReturnType();
        if (retVal != null && retVal == target && returnType.isInstance(proxy) &&
                !RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) {
            // Special case: it returned "this" and the return type of the method
            // is type-compatible. Note that we can't help if the target sets
            // a reference to itself in another returned object.
            retVal = proxy;
        }
        else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) {
            throw new AopInvocationException(
                    "Null return value from advice does not match primitive return type for: " + method);
        }
        return retVal;
    }
    finally {
        if (target != null && !targetSource.isStatic()) {
            // Must have come from TargetSource.
            targetSource.releaseTarget(target);
        }
        if (setProxyContext) {
            // Restore old proxy.
            AopContext.setCurrentProxy(oldProxy);
        }
    }
}

首先会根据方法是否是equals、hashCode进行处理,还会判断方法是否声明在Advised接口上,是则直接在代理配置对象上反射调用,接着会获取当前方法适用的拦截器链, 对于事务,TransactionInterceptor会在这里被匹配,然后执行拦截器链操作(调用其invoke)。

TransactionInterceptor

在进行annotation-driven自定义标签解析时,会将TransactionInterceptor注册进Spring中并将其名字设置为BeanFactoryTransactionAttributeSourceAdvisor的adviceBeanName属性,且实现MethodInterceptor接口,当执行事务时会调用MethodInterceptor接口的invoke方法。

// Abridged repeat of the proxy-mode configurer, showing only how the TransactionInterceptor
// is registered and linked to the advisor.
private static class AopAutoProxyConfigurer {

    public static void configureAutoProxyCreator(Element element, ParserContext parserContext) {

        // Per the original note, this constant's value is
        // org.springframework.transaction.config.internalTransactionAdvisor
        String txAdvisorBeanName = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME;
        if (!parserContext.getRegistry().containsBeanDefinition(txAdvisorBeanName)) {
            // ... (omitted)
            // Create the TransactionInterceptor definition.
            // TransactionInterceptor is the transactional advice: per the original note, it is
            // executed first when the proxied method is invoked through the transaction advisor
            RootBeanDefinition interceptorDef = new RootBeanDefinition(TransactionInterceptor.class);
            // Register the definition under a generated bean name
            String interceptorName = parserContext.getReaderContext().registerWithGeneratedName(interceptorDef);
            // Create the BeanFactoryTransactionAttributeSourceAdvisor bean definition
            RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class);
            // Give the advisor the interceptor's bean name through its adviceBeanName property
            advisorDef.getPropertyValues().add("adviceBeanName", interceptorName);
            // ... (omitted)
        }
    }
}

执行事务时调用处理器链

// org/springframework/transaction/interceptor/TransactionInterceptor.java
// Interceptor entry point: resolves the target class from the invocation's "this" object
// (null when there is none) and runs the invocation inside invokeWithinTransaction.
public Object invoke(final MethodInvocation invocation) throws Throwable {

    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

    // Adapt to TransactionAspectSupport's invokeWithinTransaction...
    return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
        // Callback that continues the interceptor chain
        public Object proceedWithInvocation() throws Throwable {
            return invocation.proceed();
        }
    });
}

事务执行框架时序图

事务执行框架

// org/springframework/transaction/interceptor/TransactionAspectSupport.java
/**
 * Template for running an invocation with transaction demarcation: looks up the
 * transaction attribute and transaction manager, creates the TransactionInfo,
 * runs the callback, then either completes after a throwable or commits after
 * a normal return. The CallbackPreferringPlatformTransactionManager branch is
 * elided in this excerpt.
 *
 * @param method the method being invoked
 * @param targetClass the target class used for attribute lookup
 * @param invocation callback that proceeds with the actual invocation
 * @return the value returned by the callback
 * @throws Throwable whatever the callback throws (rethrown after completion handling)
 */
protected Object invokeWithinTransaction(Method method, Class targetClass, final InvocationCallback invocation)
        throws Throwable {

    // If the transaction attribute is null, the method is non-transactional.
    // Look up the transaction attribute that applies to this method.
    final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
    // Resolve the transaction manager (original note: the transactionManager from the beanFactory).
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    final String joinpointIdentification = methodIdentification(method, targetClass);

    // Declarative transaction handling
    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Standard transaction demarcation with getTransaction and commit/rollback calls.
        // Create the TransactionInfo (starts a transaction if one is needed).
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        Object retVal = null;
        try {
            // This is an around advice: Invoke the next interceptor in the chain.
            // This will normally result in a target object being invoked.
            // Run the advised (target) method.
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            // target invocation exception
            // Complete the transaction for the failure case (rollback or commit), then rethrow.
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            // Clean up the transaction info; runs on both the success and failure paths.
            cleanupTransactionInfo(txInfo);
        }
        // Commit the transaction; only reached when the invocation returned normally.
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }

    else {
        // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
        // Programmatic-transaction branch omitted in this excerpt.
    }
}

创建事务的整个过程

创建事务

// org/springframework/transaction/interceptor/TransactionAspectSupport.java
/**
 * Obtains a transaction status from the transaction manager when both a
 * transaction attribute and a manager are present, then builds the
 * TransactionInfo for the join point.
 *
 * @param tm the transaction manager (may be null)
 * @param txAttr the transaction attribute (null for non-transactional methods)
 * @param joinpointIdentification method identification, used as the transaction name
 *        when the attribute does not specify one
 * @return the TransactionInfo produced by {@code prepareTransactionInfo}
 */
protected TransactionInfo createTransactionIfNecessary(
        PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {

    // An attribute without a name is wrapped so that it reports the join point as its name.
    TransactionAttribute namedAttr = txAttr;
    if (namedAttr != null && namedAttr.getName() == null) {
        namedAttr = new DelegatingTransactionAttribute(namedAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    // Only ask the manager for a transaction when both the attribute and the manager exist.
    TransactionStatus status = null;
    if (namedAttr != null && tm != null) {
        status = tm.getTransaction(namedAttr);
    }

    // Assemble the transaction info from the pieces gathered above.
    return prepareTransactionInfo(tm, namedAttr, joinpointIdentification, status);
}

获取事务的时序图

获取事务

// org/springframework/transaction/support/AbstractPlatformTransactionManager.java
/**
 * Returns a transaction status for the given definition. If a transaction already
 * exists, handling is delegated to {@code handleExistingTransaction}. Otherwise the
 * timeout is validated and the propagation behavior decides whether to throw
 * (MANDATORY), begin a new transaction (REQUIRED / REQUIRES_NEW / NESTED), or
 * return an "empty" status without an actual transaction.
 *
 * @param definition the transaction definition; defaults are used when null
 * @return the resulting transaction status
 * @throws TransactionException on invalid timeout, illegal state, or begin failure
 */
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
    // 1.1 Obtain the transaction object.
    Object transaction = doGetTransaction();

    // Cache debug flag to avoid repeated checks.
    boolean debugEnabled = logger.isDebugEnabled();

    if (definition == null) {
        // 1.2 Use defaults if no transaction definition given.
        definition = new DefaultTransactionDefinition();
    }
    // 1.3 Check for an existing transaction (original note: connectionHolder is non-null
    // and its transactionActive flag is true).
    if (isExistingTransaction(transaction)) {
        // The current thread already has a transaction, e.g. the PROPAGATION_NESTED and
        // PROPAGATION_REQUIRES_NEW cases.
        // 1.4 Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(definition, transaction, debugEnabled);
    }

    // Check definition settings for new transaction.
    // Validate the timeout setting.
    if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
    }

    // No existing transaction found -> check propagation behavior to find out how to proceed.
    // No transaction exists on the current thread, yet PROPAGATION_MANDATORY requires one: throw.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
        definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW and PROPAGATION_NESTED all need a new transaction.
        // 1.5 Suspend with a null transaction (no transaction object is passed here).
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
        }
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // 1.6 Create the status object, flagged as a new transaction.
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // 1.7 Set up the transaction, including the ConnectionHolder, isolation level and timeout.
            doBegin(transaction, definition);
            // 1.8 Record the transaction information on the current thread.
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException ex) {
            // 1.9 Begin failed: resume what was suspended, then rethrow.
            resume(null, suspendedResources);
            throw ex;
        }
        catch (Error err) {
            // 1.10 Begin failed: resume what was suspended, then rethrow.
            resume(null, suspendedResources);
            throw err;
        }
    }
    else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        // 1.11 Prepare a status that carries no transaction object.
        return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
    }
}

获取事务,基于JDBC创建

// org/springframework/jdbc/datasource/DataSourceTransactionManager.java
/**
 * Builds the JDBC transaction object for the current thread. The object is
 * given whatever ConnectionHolder is currently registered for this manager's
 * DataSource in TransactionSynchronizationManager (possibly none).
 *
 * @return a new DataSourceTransactionObject
 */
protected Object doGetTransaction() {
    DataSourceTransactionObject transactionObject = new DataSourceTransactionObject();
    // Savepoints are allowed exactly when nested transactions are allowed.
    transactionObject.setSavepointAllowed(isNestedTransactionAllowed());
    // Reuse the connection holder already recorded for this DataSource, if any.
    Object boundResource = TransactionSynchronizationManager.getResource(this.dataSource);
    // 'false' marks the holder as not newly created by this transaction object.
    transactionObject.setConnectionHolder((ConnectionHolder) boundResource, false);
    return transactionObject;
}

transaction属性的设置及将连接绑定到当前线程

// org/springframework/jdbc/datasource/DataSourceTransactionManager.java
/**
 * Begins a JDBC transaction: makes sure the transaction object has a connection
 * holder (fetching a new connection from the DataSource when needed), prepares
 * the connection (isolation level / read-only), switches off auto-commit if it
 * was on, marks the holder as transaction-active, applies the timeout, and binds
 * a newly created holder to the thread. On any failure, a newly obtained
 * connection is released and a CannotCreateTransactionException is thrown.
 *
 * @param transaction the DataSourceTransactionObject returned by doGetTransaction
 * @param definition the transaction definition to apply
 */
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        if (txObject.getConnectionHolder() == null || // the transaction has no connection holder yet
                txObject.getConnectionHolder().isSynchronizedWithTransaction()) { // holder is already synchronized with a transaction
            // Obtain a new connection.
            Connection newCon = this.dataSource.getConnection();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }

        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        // Fetch the connection from the holder.
        con = txObject.getConnectionHolder().getConnection();

        // Apply isolation level and read-only flag; remember the previous isolation level.
        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);

        // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
        // so we don't want to do it unnecessarily (for example if we've explicitly
        // configured the connection pool to set it already).
        // Turn off auto-commit so that Spring controls when the commit happens.
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            if (logger.isDebugEnabled()) {
                logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }
            con.setAutoCommit(false);
        }
        // Set the flag used to decide whether the current thread already has a transaction.
        txObject.getConnectionHolder().setTransactionActive(true);

        // Apply the timeout, unless it is the default.
        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        // Bind the session holder to the thread.
        if (txObject.isNewConnectionHolder()) {
            // Bind the newly obtained connection holder to the current thread.
            TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
        }
    }

    catch (Throwable ex) {
        // Only a holder created in this call is released and detached; a pre-existing one is left alone.
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, this.dataSource);
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}

设置连接的隔离级别、只读标志

// org/springframework/jdbc/datasource/DataSourceUtils.java
/**
 * Prepares a connection for a transaction: sets the read-only flag when the
 * definition asks for it, and applies the definition's isolation level when it
 * is not ISOLATION_DEFAULT and differs from the connection's current level.
 * The catch clause of the read-only block is elided in this excerpt.
 *
 * @param con the connection to prepare (must not be null)
 * @param definition the transaction definition (may be null)
 * @return the connection's previous isolation level if it was changed, otherwise null
 * @throws SQLException if thrown by the JDBC calls
 */
public static Integer prepareConnectionForTransaction(Connection con, TransactionDefinition definition)
        throws SQLException {

    Assert.notNull(con, "No Connection specified");

    // Set read-only flag.
    // Mark the connection as read-only when the definition is read-only.
    if (definition != null && definition.isReadOnly()) {
        try {
            if (logger.isDebugEnabled()) {
                logger.debug("Setting JDBC Connection [" + con + "] read-only");
            }
            con.setReadOnly(true);
        }
        // exception handling omitted in this excerpt
    }

    // Apply specific isolation level, if any.
    // Set the isolation level on the connection.
    Integer previousIsolationLevel = null;
    if (definition != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
        if (logger.isDebugEnabled()) {
            logger.debug("Changing isolation level of JDBC Connection [" + con + "] to " +
                    definition.getIsolationLevel());
        }
        int currentIsolation = con.getTransactionIsolation();
        // Only touch the connection when the requested level actually differs.
        if (currentIsolation != definition.getIsolationLevel()) {
            previousIsolationLevel = currentIsolation;
            con.setTransactionIsolation(definition.getIsolationLevel());
        }
    }

    return previousIsolationLevel;
}

将事务信息记录到当前线程

// org.springframework.transaction.support.AbstractPlatformTransactionManager.java
/**
 * Publishes the transaction's characteristics to TransactionSynchronizationManager
 * and initializes synchronization. Does nothing unless the status represents a
 * new synchronization.
 *
 * @param status the status of the transaction being set up
 * @param definition the definition supplying isolation level, read-only flag and name
 */
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
    if (!status.isNewSynchronization()) {
        return;
    }

    TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());

    // The default isolation level is recorded as null; any other level is recorded as-is.
    Integer isolationLevel = null;
    if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
        isolationLevel = definition.getIsolationLevel();
    }
    TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(isolationLevel);

    TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
    TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
    TransactionSynchronizationManager.initSynchronization();
}

已经存在事务的处理

首先进行事务检测,接着根据事务传播行为分别处理: a. PROPAGATION_NOT_SUPPORTED:挂起事务、保存TransactionStatus b. PROPAGATION_REQUIRES_NEW:挂起事务、创建新事务、保存TransactionStatus c. PROPAGATION_NESTED:通过保存点或嵌套的begin/commit创建嵌套事务

// org/springframework/transaction/support/AbstractPlatformTransactionManager.java
/**
 * Creates a TransactionStatus for the case where a transaction already exists,
 * branching on the definition's propagation behavior: NEVER throws,
 * NOT_SUPPORTED suspends and continues without a transaction, REQUIRES_NEW
 * suspends and begins a new transaction, NESTED uses a savepoint or a nested
 * begin, and everything else participates in the existing transaction
 * (optionally validating isolation level and read-only compatibility).
 *
 * @param definition the requested transaction definition
 * @param transaction the existing transaction object
 * @param debugEnabled cached debug flag
 * @return the resulting transaction status
 * @throws TransactionException on illegal state or unsupported nesting
 */
private TransactionStatus handleExistingTransaction(
        TransactionDefinition definition, Object transaction, boolean debugEnabled)
        throws TransactionException {

    // PROPAGATION_NEVER: an existing transaction is not tolerated, so throw.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
    }
    // NOT_SUPPORTED: the method should not run in a transaction; suspend the existing one
    // and run without a transaction.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction");
        }
        // Suspend the existing transaction.
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);

        // Status carries no transaction object, only the suspended resources.
        return prepareTransactionStatus(
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }

    // REQUIRES_NEW: always run in a new transaction; suspend the existing one and begin a new one.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction, creating new transaction with name [" +
                    definition.getName() + "]");
        }
        // Suspend the existing transaction.
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // Begin a new transaction.
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException beginEx) {
            // Begin failed: resume the suspended transaction before rethrowing.
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
        catch (Error beginErr) {
            // Begin failed: resume the suspended transaction before rethrowing.
            resumeAfterBeginException(transaction, suspendedResources, beginErr);
            throw beginErr;
        }
    }

    // NESTED: run as a nested transaction of the existing one
    // (without an existing transaction, a new one would have been started by the caller).
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                    "Transaction manager does not allow nested transactions by default - " +
                    "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        if (debugEnabled) {
            logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
        }
        if (useSavepointForNestedTransaction()) {
            // Create savepoint within existing Spring-managed transaction,
            // through the SavepointManager API implemented by TransactionStatus.
            // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
            // Establish the initial savepoint.
            DefaultTransactionStatus status =
                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            status.createAndHoldSavepoint();
            return status;
        }
        else {
            // Nested transaction through nested begin and commit/rollback calls.
            // Usually only for JTA: Spring synchronization might get activated here
            // in case of a pre-existing JTA transaction.
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, null);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
    }

    // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    if (debugEnabled) {
        logger.debug("Participating in existing transaction");
    }
    // Optional validation that the requested definition is compatible with the existing transaction.
    if (isValidateExistingTransaction()) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] specifies isolation level which is incompatible with existing transaction: " +
                        (currentIsolationLevel != null ?
                                isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                "(unknown)"));
            }
        }
        if (!definition.isReadOnly()) {
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] is not marked as read-only but existing transaction is");
            }
        }
    }
    // Participate: the status reuses the existing transaction object and is not a new transaction.
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

挂起事务

// org/springframework/transaction/support/AbstractPlatformTransactionManager.java
/**
 * Suspends the given transaction. When synchronization is active, the
 * synchronizations are suspended first, then the transaction resources (if a
 * transaction object was given), and the thread-bound name / read-only flag /
 * isolation level / active flag are captured and reset. When only a transaction
 * is present, just its resources are suspended.
 *
 * @param transaction the transaction object to suspend (may be null)
 * @return a holder with the suspended state, or null when there was neither a
 *         transaction nor active synchronization
 * @throws TransactionException if thrown while suspending
 */
protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
        try {
            Object suspendedResources = null;
            if (transaction != null) {
                suspendedResources = doSuspend(transaction);
            }
            // Capture each thread-bound value, then reset it.
            String name = TransactionSynchronizationManager.getCurrentTransactionName();
            TransactionSynchronizationManager.setCurrentTransactionName(null);
            boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
            Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
            boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
            TransactionSynchronizationManager.setActualTransactionActive(false);
            // Create a SuspendedResourcesHolder that keeps the state of the suspended transaction.
            return new SuspendedResourcesHolder(
                    suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
        }
        catch (RuntimeException ex) {
            // doSuspend failed - original transaction is still active...
            doResumeSynchronization(suspendedSynchronizations);
            throw ex;
        }
        catch (Error err) {
            // doSuspend failed - original transaction is still active...
            doResumeSynchronization(suspendedSynchronizations);
            throw err;
        }
    }
    else if (transaction != null) {
        // Transaction active but no synchronization active.
        Object suspendedResources = doSuspend(transaction);
        return new SuspendedResourcesHolder(suspendedResources);
    }
    else {
        // Neither transaction nor synchronization active.
        return null;
    }
}

准备事务信息

// org/springframework/transaction/interceptor/TransactionAspectSupport.java
/**
 * Builds a TransactionInfo for the join point, records the transaction status
 * on it when a transaction attribute is present, and binds the info to the
 * current thread in either case.
 *
 * @param tm the transaction manager
 * @param txAttr the transaction attribute (null for non-transactional methods)
 * @param joinpointIdentification identification of the method being invoked
 * @param status the transaction status to record when txAttr is non-null
 * @return the thread-bound TransactionInfo
 */
protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,
        TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) {

    TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
    boolean transactional = (txAttr != null);

    if (!transactional) {
        // Nothing to record for a non-transactional method; just trace it.
        if (logger.isTraceEnabled()) {
            logger.trace("Don't need to create transaction for [" + joinpointIdentification +
                    "]: This method isn't transactional.");
        }
    }
    else {
        // We need a transaction for this method.
        if (logger.isTraceEnabled()) {
            logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
        }
        // The transaction manager will flag an error if an incompatible tx already exists.
        // Record the transaction status on the info object.
        txInfo.newTransactionStatus(status);
    }

    // Bound to the thread even when no transaction was created.
    txInfo.bindToThread();
    return txInfo;
}

回滚处理

/**
 * Completes the transaction after the invocation threw: rolls back when the
 * transaction attribute's rollbackOn(ex) returns true, otherwise commits.
 * Does nothing when there is no transaction. The catch clauses of both try
 * blocks are elided in this excerpt.
 *
 * @param txInfo the current transaction info (may be null)
 * @param ex the throwable raised by the invocation
 */
protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
    if (txInfo != null && txInfo.hasTransaction()) { // only act when a transaction actually exists
        // Decide whether to roll back: ask the transaction attribute whether this throwable
        // triggers a rollback (original note: RuntimeException or Error).
        if (txInfo.transactionAttribute.rollbackOn(ex)) {
            try {
                // Roll back using the information held in the TransactionStatus.
                txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
            }
            // exception handling omitted in this excerpt
        }
        else {
            // We don't roll back on this exception.
            // Will still roll back if TransactionStatus.isRollbackOnly() is true.
            try {
                // Rollback condition not met: the transaction is committed even though an exception was thrown.
                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
            }
            // exception handling omitted in this excerpt
        }
    }
}

回滚条件

// org/springframework/transaction/interceptor/DefaultTransactionAttribute.java
/**
 * Default rollback rule: roll back on unchecked exceptions and errors only.
 *
 * @param ex the throwable raised by the transactional code
 * @return true if {@code ex} is a RuntimeException or an Error, false otherwise
 */
public boolean rollbackOn(Throwable ex) {
    if (ex instanceof RuntimeException) {
        return true;
    }
    return (ex instanceof Error);
}

默认情况下异常处理机制只会对RuntimeException和Error两种异常做回滚处理,不过可以通过注解进行设置: @Transactional(propagation=Propagation.REQUIRED, rollbackFor=Exception.class)

回滚处理

/**
 * Rolls back the transaction described by the given status by delegating to
 * {@code processRollback}.
 *
 * @param status the status of the transaction to roll back
 * @throws TransactionException if the transaction has already been completed
 *         (IllegalTransactionStateException) or if the rollback processing fails
 */
public final void rollback(TransactionStatus status) throws TransactionException {
    if (!status.isCompleted()) {
        processRollback((DefaultTransactionStatus) status);
        return;
    }
    // Rolling back an already completed transaction is a usage error.
    throw new IllegalTransactionStateException(
            "Transaction is already completed - do not call commit or rollback more than once per transaction");
}

// org/springframework/transaction/support/AbstractPlatformTransactionManager.java
/**
 * Performs the actual rollback handling: triggers before-completion callbacks,
 * then rolls back to a held savepoint, rolls back a new transaction, or marks a
 * participating transaction rollback-only, depending on the status. Afterwards
 * after-completion callbacks are triggered (STATUS_UNKNOWN when a
 * RuntimeException or Error occurred, STATUS_ROLLED_BACK otherwise) and cleanup
 * always runs.
 *
 * @param status the status of the transaction to roll back
 */
private void processRollback(DefaultTransactionStatus status) {
    try {
        try {
            // Invoke the registered before-completion callbacks.
            triggerBeforeCompletion(status);
            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Rolling back transaction to savepoint");
                }
                // A savepoint is held: roll back to it.
                status.rollbackToHeldSavepoint();
            }
            else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction rollback");
                }
                // The current transaction is an independent new transaction: roll it back directly.
                doRollback(status);
            }
            else if (status.hasTransaction()) {
                if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                    if (status.isDebug()) {
                        logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                    }
                    // Not an independent transaction: only mark it rollback-only and let the
                    // rollback happen once the whole transaction chain has finished.
                    doSetRollbackOnly(status);
                }
                else {
                    if (status.isDebug()) {
                        logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                    }
                }
            }
            else {
                logger.debug("Should roll back transaction but cannot - no transaction available");
            }
        }
        catch (RuntimeException ex) {
            // Outcome is uncertain when the rollback itself failed.
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            throw ex;
        }
        catch (Error err) {
            // Outcome is uncertain when the rollback itself failed.
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            throw err;
        }
        // Invoke the after-completion callbacks of all TransactionSynchronization objects.
        triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
    }
    finally {
        // Clear the transaction information after the rollback.
        cleanupAfterCompletion(status);
    }
}

上述代码整体处理脉络:首先进行触发器的调用,接着根据保存点、是否是新事务、当前事务不是独立事务分支进行处理

根据保存点进行回滚 此分支主要作用于嵌套事务的回滚,内嵌的事务异常不会导致外部事务的回滚,主要根据底层连接进行回滚

// org/springframework/transaction/support/AbstractTransactionStatus.java
/**
 * Rolls back to the savepoint held by this status via the savepoint manager,
 * then drops the reference to that savepoint.
 *
 * @throws TransactionException if no savepoint is held (TransactionUsageException)
 *         or if the savepoint manager's rollback fails
 */
public void rollbackToHeldSavepoint() throws TransactionException {
    if (hasSavepoint()) {
        getSavepointManager().rollbackToSavepoint(getSavepoint());
        // The savepoint has been consumed; forget it.
        setSavepoint(null);
        return;
    }
    throw new TransactionUsageException("No savepoint associated with current transaction");
}

以JDBC为例,JdbcTransactionObjectSupport

// org/springframework/jdbc/datasource/JdbcTransactionObjectSupport.java
/**
 * Rolls the underlying JDBC connection back to the given savepoint.
 *
 * @param savepoint the savepoint to roll back to; cast to {@link Savepoint}
 * @throws TransactionException a TransactionSystemException wrapping any
 *         Throwable raised while rolling back
 */
public void rollbackToSavepoint(Object savepoint) throws TransactionException {
    try {
        // Delegate to the JDBC connection behind the savepoint-capable holder.
        getConnectionHolderForSavepoint()
                .getConnection()
                .rollback((Savepoint) savepoint);
    }
    catch (Throwable failure) {
        throw new TransactionSystemException("Could not roll back to JDBC savepoint", failure);
    }
}

是否是新事务分支,使用DataSourceTransactionManager 主要根据底层连接进行回滚

// org/springframework/jdbc/datasource/DataSourceTransactionManager.java
/**
 * Rolls back the JDBC connection held by the transaction object of the given status.
 *
 * @param status the status whose transaction is a DataSourceTransactionObject
 * @throws TransactionSystemException if the JDBC rollback raises an SQLException
 */
protected void doRollback(DefaultTransactionStatus status) {
    // Reach the JDBC connection through the transaction object's connection holder.
    Connection connection =
            ((DataSourceTransactionObject) status.getTransaction()).getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Rolling back JDBC transaction on Connection [" + connection + "]");
    }
    try {
        connection.rollback();
    }
    catch (SQLException sqlFailure) {
        throw new TransactionSystemException("Could not roll back JDBC transaction", sqlFailure);
    }
}

回滚信息的清除

// org/springframework/transaction/support/AbstractPlatformTransactionManager.java
/**
 * Cleans up after a transaction has completed: marks the status completed,
 * clears thread-bound synchronization state for a new synchronization, runs
 * resource cleanup for a new transaction, and resumes any transaction that was
 * suspended for this one.
 *
 * @param status the status of the completed transaction
 */
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    // Mark as completed first.
    status.setCompleted();

    // New synchronization: clear what was bound to the thread.
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.clear();
    }

    // New transaction: let the concrete manager clean up its resources.
    if (status.isNewTransaction()) {
        doCleanupAfterCompletion(status.getTransaction());
    }

    // Resume the transaction that was suspended before this one started, if any.
    Object suspended = status.getSuspendedResources();
    if (suspended != null) {
        if (status.isDebug()) {
            logger.debug("Resuming suspended transaction after completion of inner transaction");
        }
        resume(status.getTransaction(), (SuspendedResourcesHolder) suspended);
    }
}

首先设置完成状态,防止重复调用;然后,如果事务是新的同步状态,则将绑定到当前线程的事务信息清除

// org.springframework.transaction.support.TransactionSynchronizationManager.java
/**
 * Resets the transaction state kept by this manager: clears synchronization,
 * then resets the current transaction name, read-only flag, isolation level
 * and actual-transaction-active flag to null / false.
 */
public static void clear() {
    clearSynchronization();
    setCurrentTransactionName(null);
    setCurrentTransactionReadOnly(false);
    setCurrentTransactionIsolationLevel(null);
    setActualTransactionActive(false);
}

如果是一个新的事务,需要做一些清除资源的操作

// org/springframework/jdbc/datasource/DataSourceTransactionManager.java
/**
 * Cleans up JDBC resources after transaction completion: unbinds a newly
 * created connection holder from the thread, restores auto-commit if it was
 * switched off, resets the connection's transaction settings, releases the
 * connection when the holder was newly created, and clears the holder.
 * Failures while resetting the connection are only logged at debug level.
 *
 * @param transaction the DataSourceTransactionObject of the completed transaction
 */
protected void doCleanupAfterCompletion(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

    // Remove the connection holder from the thread, if exposed.
    if (txObject.isNewConnectionHolder()) {
        // Unbind the database connection holder from the current thread.
        TransactionSynchronizationManager.unbindResource(this.dataSource);
    }

    // Reset connection.
    // Restore the connection's state before it is released below.
    Connection con = txObject.getConnectionHolder().getConnection();
    try {

        if (txObject.isMustRestoreAutoCommit()) {
            // Restore auto-commit on the connection.
            con.setAutoCommit(true);
        }
        // Reset the connection (read-only flag and isolation level).
        DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
    }
    catch (Throwable ex) {
        // Best effort: a failed reset is logged and does not abort the cleanup.
        logger.debug("Could not reset JDBC Connection after transaction", ex);
    }

    if (txObject.isNewConnectionHolder()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
        }
        // The connection holder was created for this transaction: release the connection now
        // that the transaction has completed.
        DataSourceUtils.releaseConnection(con, this.dataSource);
    }

    txObject.getConnectionHolder().clear();
}

最后,需要恢复执行事务之前挂起的事务

// org/springframework/transaction/support/AbstractPlatformTransactionManager.java
/**
 * Resumes a previously suspended transaction from the given holder: first the
 * suspended resources, then (when synchronizations were suspended) the
 * thread-bound transaction characteristics and the synchronizations themselves.
 * A null holder means there is nothing to resume.
 *
 * @param transaction the current transaction object, passed through to {@code doResume}
 * @param resourcesHolder the state captured at suspension time (may be null)
 * @throws TransactionException if thrown while resuming
 */
protected final void resume(Object transaction, SuspendedResourcesHolder resourcesHolder)
        throws TransactionException {

    if (resourcesHolder == null) {
        return;
    }

    // Resources first.
    Object heldResources = resourcesHolder.suspendedResources;
    if (heldResources != null) {
        doResume(transaction, heldResources);
    }

    // Then the thread-bound state and the synchronizations.
    List<TransactionSynchronization> heldSynchronizations = resourcesHolder.suspendedSynchronizations;
    if (heldSynchronizations == null) {
        return;
    }
    TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
    TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
    TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
    TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
    doResumeSynchronization(heldSynchronizations);
}

事务提交

异常回滚处理的时候,如果某个事务是另外一个事务的嵌套事务,而这些事务没有保存点又不是新事务时,Spring会为其设置一个标识,spring通过这个标识来禁止提交事务

// org/springframework/transaction/interceptor/TransactionAspectSupport.java
/**
 * Commits the transaction after the invocation returned normally. Does nothing
 * when there is no transaction info or it holds no transaction.
 *
 * @param txInfo the current transaction info (may be null)
 */
protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
    if (txInfo == null || !txInfo.hasTransaction()) {
        return;
    }
    if (logger.isTraceEnabled()) {
        logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
    }
    // Hand the recorded status to the transaction manager for commit.
    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}

执行事务提交

// org/springframework/transaction/support/AbstractPlatformTransactionManager.java
/**
 * Commits the transaction described by the given status, unless it has been
 * marked rollback-only: a local rollback-only marker leads to a rollback, and a
 * global rollback-only marker (when shouldCommitOnGlobalRollbackOnly() is false)
 * leads to a rollback followed by an UnexpectedRollbackException at the
 * outermost boundary or when failing early is requested.
 *
 * @param status the status of the transaction to commit
 * @throws TransactionException if the transaction is already completed, was
 *         unexpectedly rolled back, or commit/rollback processing fails
 */
public final void commit(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    // Already marked rollback-only within the transaction chain: do not attempt to commit, roll back instead.
    if (defStatus.isLocalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Transactional code has requested rollback");
        }
        processRollback(defStatus);
        return;
    }
    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
        }
        processRollback(defStatus);
        // Throw UnexpectedRollbackException only at outermost transaction boundary
        // or if explicitly asked to.
        if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
            throw new UnexpectedRollbackException(
                    "Transaction rolled back because it has been marked as rollback-only");
        }
        return;
    }
    // Perform the actual commit processing.
    processCommit(defStatus);
}

对于嵌套事务,事务开启时会先设置一个保存点,如果执行过程中出现异常,则根据事务保存点进行回滚, 如果执行过程中没有出现异常,也不会进行事务的提交,只是清除了内嵌事务保存点,事务统一由外部事务进行提交

// org/springframework/transaction/support/AbstractPlatformTransactionManager.java
/**
 * Runs the commit sequence for the given transaction status.
 *
 * <p>Order of operations as written below: {@code prepareForCommit},
 * {@code triggerBeforeCommit}, {@code triggerBeforeCompletion}; then either the
 * held savepoint is released (when the status has one) or {@code doCommit} is
 * called (when the status is a new transaction); finally {@code triggerAfterCommit}
 * followed by {@code triggerAfterCompletion} with {@code STATUS_COMMITTED}.
 * {@code cleanupAfterCompletion} is always executed via the outer {@code finally}.
 *
 * @param status the status of the transaction being committed
 * @throws UnexpectedRollbackException if a global rollback-only marker was seen
 * (only checked for a new transaction or when fail-early is enabled)
 * @throws TransactionException rethrown after rollback / after-completion handling
 */
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        // Tracks whether triggerBeforeCompletion has already run, so the
        // RuntimeException/Error handlers below do not skip or repeat it.
        boolean beforeCompletionInvoked = false;
        try {
            prepareForCommit(status);
            // Invoke the corresponding TransactionSynchronization callbacks
            triggerBeforeCommit(status);
            triggerBeforeCompletion(status);
            beforeCompletionInvoked = true;
            boolean globalRollbackOnly = false;
            // The global rollback-only flag is only read for a new transaction
            // or when fail-early on global rollback-only is enabled.
            if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
                globalRollbackOnly = status.isGlobalRollbackOnly();
            }
            // For a nested transaction, a savepoint is set when the transaction begins; if an exception
            // occurs during execution, the rollback goes back to that savepoint. If no exception occurs,
            // nothing is committed here either: only the nested savepoint is released, and the commit
            // itself is performed by the outer transaction.
            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Releasing transaction savepoint");
                }
                // A savepoint exists: release it
                status.releaseHeldSavepoint();
            }
            else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction commit");
                }
                // This is a new (independent) transaction: commit it directly
                doCommit(status);
            }
            // Throw UnexpectedRollbackException if we have a global rollback-only
            // marker but still didn't get a corresponding exception from commit.
            if (globalRollbackOnly) {
                throw new UnexpectedRollbackException(
                        "Transaction silently rolled back because it has been marked as rollback-only");
            }
        }
        catch (UnexpectedRollbackException ex) {
            // can only be caused by doCommit
            // NOTE(review): the explicit throw just above (globalRollbackOnly) also lands here.
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
            throw ex;
        }
        catch (TransactionException ex) {
            // can only be caused by doCommit
            if (isRollbackOnCommitFailure()) {
                doRollbackOnCommitException(status, ex);
            }
            else {
                // No rollback attempted: report the outcome as unknown.
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            }
            throw ex;
        }
        catch (RuntimeException ex) {
            // Make sure the beforeCompletion callbacks have run before rolling back.
            if (!beforeCompletionInvoked) {
                triggerBeforeCompletion(status);
            }
            doRollbackOnCommitException(status, ex);
            throw ex;
        }
        catch (Error err) {
            // Same handling as for RuntimeException above.
            if (!beforeCompletionInvoked) {
                triggerBeforeCompletion(status);
            }
            // A failure occurred during commit: perform a rollback
            doRollbackOnCommitException(status, err);
            throw err;
        }

        // Trigger afterCommit callbacks, with an exception thrown there
        // propagated to callers but the transaction still considered as committed.
        try {
            triggerAfterCommit(status);
        }
        finally {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }

    }
    finally {
        // Always runs, whether the commit succeeded or an exception propagated.
        cleanupAfterCompletion(status);
    }
}

最终事务提交会被引导到底层数据库API

// org/springframework/jdbc/datasource/DataSourceTransactionManager.java
/**
 * Commits the JDBC {@link Connection} obtained from the connection holder of
 * the transaction object carried by the given status.
 *
 * @param status the status whose transaction object supplies the connection
 * @throws TransactionSystemException if the JDBC commit raises a {@link SQLException}
 */
protected void doCommit(DefaultTransactionStatus status) {
    // Transaction object -> connection holder -> the JDBC connection to commit.
    Connection jdbcConnection =
            ((DataSourceTransactionObject) status.getTransaction()).getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Committing JDBC transaction on Connection [" + jdbcConnection + "]");
    }
    try {
        jdbcConnection.commit();
    }
    catch (SQLException commitFailure) {
        // Wrap the checked SQLException, keeping it as the cause.
        throw new TransactionSystemException("Could not commit JDBC transaction", commitFailure);
    }
}
⚠️ **GitHub.com Fallback** ⚠️