Spring动态数据源与运行时动态添加数据源 - zhangxiulin/RuoYi-Vue GitHub Wiki
参考资料 Scala版本 https://blog.csdn.net/UFO___/article/details/98475522
使用Spring提供的动态数据源AbstractRoutingDataSource
AbstractRoutingDataSource是一个抽象类,他实现了DataSource接口,内部可以存放多个DataSource,可以在需要的时候返回不同的DataSource。
接下来解析AbstractRoutingDataSource的部分源码:
//AbstractRoutingDataSource的内部使用了一个map存放多个数据源,key是数据源的唯一名字(可以任意命名,但是要保证唯一),value是对应的DataSource
private Map targetDataSources;
//提供一个默认使用的数据源
private Object defaultTargetDataSource;
//这个是我们要实现的一个抽象方法,返回值是DataSource的唯一名字,表示使用该名字对应的DataSource
protected abstract Object determineCurrentLookupKey();
//这个是决定使用哪个数据源的方法,根据determineCurrentLookupKey的返回值来决定
protected DataSource determineTargetDataSource() {
Object lookupKey = determineCurrentLookupKey();
DataSource dataSource = this.resolvedDataSources.get(lookupKey);
if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
dataSource = this.resolvedDefaultDataSource;
}
if (dataSource == null) {
throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
}
return dataSource;
}
//可以设置存放多个数据源的map
public void setTargetDataSources(Map targetDataSources) {
this.targetDataSources = targetDataSources;
}
//设置默认的数据源
public void setDefaultTargetDataSource(Object defaultTargetDataSource) {
this.defaultTargetDataSource = defaultTargetDataSource;
}
我们要做的就是继承这个AbstractRoutingDataSource抽象类,并且实现determineCurrentLookupKey() 这个方法,通过返回值来动态改变需要使用的数据源。
也就是说我们需要在标注了@Transactional注解的事务方法执行之前,根据方法签名去动态改变使用的DataSource。
这里可以编写一个切面去拦截该要执行的事务方法,然后在切面当中去判断执行的方法的说明,将获取的结果信息保存在一个ThreradLocal当中,这样就可以在AbstractRoutingDataSource的determineCurrentLookupKey方法中从ThreadLocal中获取这个信息并且返回对应的数据源的名字。使用ThreadLocal的主要原因是因为事务方法总是并发执行的,为了防止相互的干扰。
编写一个DynamicDataSource
去继承了AbstractRoutingDataSource,并且实现了determineCurrentLookupKey方法。
public class DynamicDataSource extends AbstractRoutingDataSource
{
public DynamicDataSource(DataSource defaultTargetDataSource, Map targetDataSources)
{
super.setDefaultTargetDataSource(defaultTargetDataSource);
super.setTargetDataSources(targetDataSources);
super.afterPropertiesSet();
}
@Override
protected Object determineCurrentLookupKey()
{
return DynamicDataSourceContextHolder.getDataSourceType();
}
}
线程级数据源存储器DynamicDataSourceContextHolder
public class DynamicDataSourceContextHolder
{
public static final Logger log = LoggerFactory.getLogger(DynamicDataSourceContextHolder.class);
/**
* 使用ThreadLocal维护变量,ThreadLocal为每个使用该变量的线程提供独立的变量副本,
* 所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。
*/
private static final ThreadLocal CONTEXT_HOLDER = new ThreadLocal<>();
/**
* 设置数据源的变量
*/
public static void setDataSourceType(String dsType)
{
log.info("切换到{}数据源", dsType);
CONTEXT_HOLDER.set(dsType);
}
/**
* 获得数据源的变量
*/
public static String getDataSourceType()
{
return CONTEXT_HOLDER.get();
}
/**
* 清空数据源变量
*/
public static void clearDataSourceType()
{
CONTEXT_HOLDER.remove();
}
}
配置类DruidConfig
@Configuration
public class DruidConfig
{
@Bean
@ConfigurationProperties("spring.datasource.druid.master")
public DataSource masterDataSource(DruidProperties druidProperties)
{
DruidDataSource dataSource = DruidDataSourceBuilder.create().build();
return druidProperties.dataSource(dataSource);
}
@Bean
@ConfigurationProperties("spring.datasource.druid.slave")
@ConditionalOnProperty(prefix = "spring.datasource.druid.slave", name = "enabled", havingValue = "true")
public DataSource slaveDataSource(DruidProperties druidProperties)
{
DruidDataSource dataSource = DruidDataSourceBuilder.create().build();
return druidProperties.dataSource(dataSource);
}
@Bean(name = "dynamicDataSource")
@Primary
public DynamicDataSource dataSource(DataSource masterDataSource)
{
Map targetDataSources = new HashMap<>();
targetDataSources.put(DataSourceType.MASTER.name(), masterDataSource);
setDataSource(targetDataSources, DataSourceType.SLAVE.name(), "slaveDataSource");
return new DynamicDataSource(masterDataSource, targetDataSources);
}
/**
* 设置数据源
*
* @param targetDataSources 备选数据源集合
* @param sourceName 数据源名称
* @param beanName bean名称
*/
public void setDataSource(Map targetDataSources, String sourceName, String beanName)
{
try
{
DataSource dataSource = SpringUtils.getBean(beanName);
targetDataSources.put(sourceName, dataSource);
}
catch (Exception e)
{
}
}
}
拦截事务方法的切面DataSourceAspect
进行多数据源判断处理
@Aspect
@Order(1)
@Component
public class DataSourceAspect
{
protected Logger logger = LoggerFactory.getLogger(getClass());
@Pointcut("@annotation(com.ruoyi.common.annotation.DataSource)"
+ "|| @within(com.ruoyi.common.annotation.DataSource)")
public void dsPointCut()
{
}
@Around("dsPointCut()")
public Object around(ProceedingJoinPoint point) throws Throwable
{
DataSource dataSource = getDataSource(point);
if (StringUtils.isNotNull(dataSource))
{
DynamicDataSourceContextHolder.setDataSourceType(dataSource.value().name());
}
try
{
return point.proceed();
}
finally
{
// 销毁数据源 在执行方法之后
DynamicDataSourceContextHolder.clearDataSourceType();
}
}
/**
* 获取需要切换的数据源
*/
public DataSource getDataSource(ProceedingJoinPoint point)
{
MethodSignature signature = (MethodSignature) point.getSignature();
DataSource dataSource = AnnotationUtils.findAnnotation(signature.getMethod(), DataSource.class);
if (Objects.nonNull(dataSource))
{
return dataSource;
}
return AnnotationUtils.findAnnotation(signature.getDeclaringType(), DataSource.class);
}
}
数据库操作方法注解DataSource
@Target({ ElementType.METHOD, ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface DataSource
{
/**
* 切换数据源名称
*/
public DataSourceType value() default DataSourceType.MASTER;
}
Spring当中有一个事务切面来帮我们处理事务方法,他主要是在事务方法执行之前从数据源中拿取connection,设置开启事务,如果成功执行则提交,抛出异常则回滚(具体的源码在Spring的事务管理器DataSourceTransactionManager中)。
而我们的切面是要在事务方法执行之前进行使用数据源判断的,也就是是说这两个切面是有执行的先后顺序的。
例如:假设开始的时候Spring的事务切面先执行,他从数据源中拿取connection,因为ThreadLocal当中没有值,所以拿取到的connection是默认数据源master的,在他拿取connection之后我们才改变了使用的数据源,这个显然是错误的。
那么如何改变切面的执行顺序呢?
Spring关于切面执行顺序的描述
大致内容就是可以给切面加上@Order注解,@Order注解内部有一个int类型的值表示优先级,该值越小则切面越优先被执行。
因此给我们配置的切面加上@Order(1)注解,就可以保证我们的优先执行了。
RuntimeDataSource
@Target({ ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RuntimeDataSource {
/**
* 切换数据源名称
*/
public String value() default "";
}
RuntimeDataSourceAspect
@Aspect
@Order(1)
@Component
public class RuntimeDataSourceAspect {
@Pointcut("@annotation(com.ruoyi.common.annotation.RuntimeDataSource)"
+ "|| @within(com.ruoyi.common.annotation.RuntimeDataSource)")
public void dsPointCut()
{
}
@Around("dsPointCut()")
public Object around(ProceedingJoinPoint point) throws Throwable
{
RuntimeDataSource runtimeDataSource = getDataSource(point);
if (StringUtils.isNotNull(runtimeDataSource))
{
RuntimeDynamicDataSourceContextHolder.setDataSourceType(runtimeDataSource.value());
}
try
{
return point.proceed();
}
finally
{
// 销毁数据源 在执行方法之后
RuntimeDynamicDataSourceContextHolder.clearDataSourceType();
}
}
/**
* 获取需要切换的数据源
*/
public RuntimeDataSource getDataSource(ProceedingJoinPoint point)
{
MethodSignature signature = (MethodSignature) point.getSignature();
RuntimeDataSource runtimeDataSource = AnnotationUtils.findAnnotation(signature.getMethod(), RuntimeDataSource.class);
if (Objects.nonNull(runtimeDataSource))
{
return runtimeDataSource;
}
return AnnotationUtils.findAnnotation(signature.getDeclaringType(), RuntimeDataSource.class);
}
}
RuntimeDruidConfig
@Configuration
public class RuntimeDruidConfig {
@Bean
@ConfigurationProperties("spring.datasource.druid.master")
public DataSource masterDataSource(DruidProperties druidProperties)
{
DruidDataSource dataSource = DruidDataSourceBuilder.create().build();
return druidProperties.dataSource(dataSource);
}
@Bean
@ConfigurationProperties("spring.datasource.druid.slave")
@ConditionalOnProperty(prefix = "spring.datasource.druid.slave", name = "enabled", havingValue = "true")
public DataSource slaveDataSource(DruidProperties druidProperties)
{
DruidDataSource dataSource = DruidDataSourceBuilder.create().build();
return druidProperties.dataSource(dataSource);
}
@Bean(name = "runtimeDynamicDataSource")
@Primary
public RuntimeDynamicDataSource dataSource(DataSource masterDataSource)
{
Map targetDataSources = new HashMap<>();
targetDataSources.put(DataSourceType.MASTER.name(), masterDataSource);
setDataSource(targetDataSources, DataSourceType.SLAVE.name(), "slaveDataSource");
setCustomizedDataSources(targetDataSources);
return new RuntimeDynamicDataSource(masterDataSource, targetDataSources);
}
/**
* 设置数据源
*
* @param targetDataSources 备选数据源集合
* @param sourceName 数据源名称
* @param beanName bean名称
*/
public void setDataSource(Map targetDataSources, String sourceName, String beanName)
{
try
{
DataSource dataSource = SpringUtils.getBean(beanName);
targetDataSources.put(sourceName, dataSource);
}
catch (Exception e)
{
}
}
/**将已经配置了的数据源存放在自定义的容器里**/
public void setCustomizedDataSources(Map targetDataSources){
if (targetDataSources != null){
targetDataSources.forEach((k, v) -> {
DatabaseAccessLayer.DATASOURCE.put((String) k, (DataSource) v);
});
}
}
}
存放数据源DatabaseAccessLayer
public class DatabaseAccessLayer {
private static final Logger log = LoggerFactory.getLogger(DatabaseAccessLayer.class);
// ConcurrentHashMap#key不能为null
public static final Map DATASOURCE = new ConcurrentHashMap<>();
public static Connection getConnection(String dataSourceName) {
try {
if (!DatabaseAccessLayer.DATASOURCE.containsKey(dataSourceName)) {
return null;
}
return DatabaseAccessLayer.DATASOURCE.get(dataSourceName).getConnection();
} catch (SQLException e) {
log.error("数据库连接获取失败", e);
return null;
}
}
}
RuntimeDynamicDataSource重写determineTargetDataSource和determineCurrentLookupKey
public class RuntimeDynamicDataSource extends AbstractRoutingDataSource {
public RuntimeDynamicDataSource(DataSource defaultTargetDataSource, Map targetDataSources)
{
super.setDefaultTargetDataSource(defaultTargetDataSource);
super.setTargetDataSources(targetDataSources);
super.afterPropertiesSet();
}
@Override
protected Object determineCurrentLookupKey()
{
return RuntimeDynamicDataSourceContextHolder.getDataSourceType();
}
@Override
protected DataSource determineTargetDataSource(){
String dataSourceName = (String) this.determineCurrentLookupKey();
DataSource dataSource = null;
if (dataSourceName == null){ // 没有指定dataSourceName,则使用默认数据源,交由父类处理,也可以从DatabaseAccessLayer硬编码指定获取
dataSource = super.determineTargetDataSource();
} else {
dataSource = DatabaseAccessLayer.DATASOURCE.get(dataSourceName);
}
return dataSource;
}
}
使用案例
@Service
public class InDataSourceTestServiceImpl implements IInDataSourceTestService {
private static final Logger log = LoggerFactory.getLogger(InDataSourceTestServiceImpl.class);
@Autowired
private InDatasourceMapper inDatasourceMapper;
@Override
@RuntimeDataSource("ruoyi")
public InDatasource selectInDatasourceById(String datasourceId) {
return inDatasourceMapper.selectInDatasourceById(datasourceId);
}
@Override
public InDatasource selectInDatasourceByIdConn(String datasourceId) {
InDatasource inDatasource = new InDatasource();
// 获取Connection
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
try {
conn = DatabaseAccessLayer.getConnection("ruoyi");
conn.setAutoCommit(false);
ps = conn.prepareStatement("select * from in_datasource where datasource_id = ?");
ps.setString(1, datasourceId);
rs = ps.executeQuery();
Map colMap = new HashMap<>();
ResultSetMetaData rsmd = rs.getMetaData();
for (int i=0,cc=rsmd.getColumnCount(); i < cc; i++){
colMap.put(rsmd.getColumnName(i+1), i);
}
if (rs != null){
rs.next();
inDatasource.setDatasourceId(rs.getString("datasource_id"));
inDatasource.setDatasourceName(rs.getString("datasource_name"));
inDatasource.setCreateTime((Date) rs.getObject("create_time"));
}
} catch (SQLException e) {
String errMsg = "数据库操作异常";
log.error(errMsg, e);
throw new CustomException(errMsg);
} finally {
// 释放资源
if (rs!=null){
try {
rs.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if (ps == null) {
try {
ps.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if (conn == null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
return inDatasource;
}
}