批量任务存储设计 - 969251639/study GitHub Wiki
项目中的批量操作通常是直接用一个 for 循环逐个调用。如果循环次数很大,可以考虑改用持久化队列逐一消费;但如果用 MQ 来做持久化队列,就需要额外引入一个中间层,所以这里实现了一个无外部依赖的本地持久化队列方案
客户端提交任务到队列中,队列扫描器发现队列有任务进来后会进行回调执行
其实本质上就是对一个持久化队列的头尾指针的维护
- 存储队列
/**
 * Persistent FIFO queue backed by Berkeley DB (BDB).
 *
 * <p>Elements are stored in a BDB map keyed by a monotonically increasing {@code Long}.
 * {@code headIndex} is the key of the next element to take and {@code tailIndex} is the
 * key the next inserted element will receive, so the queue is essentially the maintenance
 * of those two pointers over a persistent map.
 *
 * <p>The class extends {@link AbstractQueue} rather than implementing {@code Queue}
 * directly so that only {@code offer}, {@code peek}, {@code poll}, {@code size} and
 * {@code iterator} have to be written; the remaining operations are derived from them.
 *
 * <p>Database handles are kept in a small pool; every handle obtained through
 * {@link #getDb()} must be handed back through {@link #release(BerkeleyQueueDb)}.
 */
public class BdbPersistentQueue<E extends Serializable> extends AbstractQueue<E> implements Serializable {
    private static final Logger logger = LoggerFactory.getLogger(BdbPersistentQueue.class);
    private static final long serialVersionUID = 3427799316155220967L;
    /** Name of the BDB database that holds the queue entries. */
    private static final String DB_NAME = "BATCH_QUEUE";
    /** Capacity of the handle pool, read from {@link BatchConfig} when this class is loaded. */
    private static final int MAX_CONN_POOL_SIZE = BatchConfig.getConfig().getMaxConnCount();
    /** Number of handles opened eagerly and kept pooled, read from {@link BatchConfig} at class load. */
    private static final int MIN_CONN_POOL_SIZE = BatchConfig.getConfig().getMinConnCount();
    /** Process-wide singleton returned by {@link #getInstance(String)}. */
    private static volatile BdbPersistentQueue<BatchTask> bdbPersistentQueue = null;

    /** Shared database environment; created lazily on the first handle and not serialized. */
    private transient BdbEnvironment dbEnv;
    /** Directory that holds the database files. */
    private transient String dbDir;
    /** Key of the element at the head of the queue; also used as the lock for head operations. */
    private AtomicLong headIndex;
    /** Key the next offered element will be stored under; also used as the lock for tail operations. */
    private AtomicLong tailIndex;
    /** Head element cached by {@link #peek()} until it is removed by {@link #poll()}. */
    private transient E peekItem = null;
    /** Pool of idle database handles. */
    private final transient ArrayBlockingQueue<BerkeleyQueueDb> poolQueue = new ArrayBlockingQueue<>(BdbPersistentQueue.MAX_CONN_POOL_SIZE);
    /** Runtime class of the stored values, needed to pick the BDB value binding. */
    private transient Class<E> valueClass;

    /**
     * Opens (or creates) the database under the configured data path and restores the
     * head and tail pointers from the keys that are already stored.
     *
     * @param valueClass runtime class of the queue elements
     * @param appName    application name, used as the sub-directory of the data path
     */
    private BdbPersistentQueue(Class<E> valueClass, String appName) {
        this.dbDir = getDbFilePath(appName);
        this.valueClass = valueClass;
        initConnPool(this.dbDir);
        BerkeleyQueueDb db = getDb();
        try {
            StoredMap<Long, E> map = db.getQueueMap();
            Iterator<Long> keys = map.keySet().iterator();
            if (keys.hasNext()) {
                // Keep the full long value: narrowing to int would corrupt large keys.
                long firstKey = keys.next();
                long nextKey = firstKey + map.size();
                if (map instanceof java.util.SortedMap) {
                    // Derive the tail from the real last key so that gaps in the key range
                    // (e.g. after a removal through the iterator) cannot make a new offer
                    // overwrite an existing record.
                    @SuppressWarnings("unchecked")
                    java.util.SortedMap<Long, E> sorted = (java.util.SortedMap<Long, E>) map;
                    nextKey = sorted.lastKey() + 1;
                }
                headIndex = new AtomicLong(firstKey);
                tailIndex = new AtomicLong(nextKey);
            } else {
                headIndex = new AtomicLong(0);
                tailIndex = new AtomicLong(0);
            }
        } finally {
            release(db);
        }
    }

    /**
     * Returns the process-wide queue, creating it on first use.
     *
     * @param appName application name; only honoured by the call that creates the instance
     * @return the shared queue instance
     */
    public static BdbPersistentQueue<BatchTask> getInstance(String appName) {
        if (bdbPersistentQueue == null) {
            synchronized (BdbPersistentQueue.class) {
                if (bdbPersistentQueue == null) {
                    bdbPersistentQueue = new BdbPersistentQueue<BatchTask>(BatchTask.class, appName);
                }
            }
        }
        return bdbPersistentQueue;
    }

    /**
     * Builds the database directory as {@code <configured data path>/<appName>}.
     */
    private static String getDbFilePath(String appName) {
        return BatchConfig.getConfig().getDataPath() + "/" + appName;
    }

    /**
     * Opens {@code MIN_CONN_POOL_SIZE} handles and places them in the pool.
     * Failures are logged, not propagated.
     */
    private void initConnPool(String dbDir) {
        BerkeleyQueueDb db = null;
        try {
            for (int i = 0; i < BdbPersistentQueue.MIN_CONN_POOL_SIZE; i++) {
                db = createAndBindDatabase(dbDir, DB_NAME, this.valueClass);
                poolQueue.put(db);
                // The pool owns the handle now; forget it so the error path below
                // never closes a handle that is already pooled.
                db = null;
            }
        } catch (Exception e) {
            logger.error("初始化Berkeleydb连接池异常", e);
            if (db != null) {
                try {
                    close(db);
                } catch (Exception e1) {
                    logger.error("关闭初始化Berkeleydb连接异常", e1);
                }
            }
        }
    }

    /**
     * Takes a handle from the pool, opening a new one when the pool is empty.
     *
     * @return a database handle, or {@code null} if a new handle could not be opened
     *         (the failure is logged)
     */
    public synchronized BerkeleyQueueDb getDb() {
        try {
            BerkeleyQueueDb db = poolQueue.poll();
            if (db == null) {
                // Pool exhausted: open an extra handle instead of waiting for a release.
                db = createAndBindDatabase(dbDir, DB_NAME, this.valueClass);
            }
            return db;
        } catch (Exception e) {
            logger.error("从连接池获取Berkeleydb异常", e);
        }
        return null;
    }

    /**
     * Hands a handle back. It is pooled while the pool holds fewer than
     * {@code MIN_CONN_POOL_SIZE} handles and closed otherwise.
     *
     * @param db handle to return; {@code null} is ignored
     */
    public synchronized void release(BerkeleyQueueDb db) {
        if (db == null) {
            return;
        }
        try {
            if (BdbPersistentQueue.MIN_CONN_POOL_SIZE > poolQueue.size()) {
                poolQueue.put(db);
            } else {
                close(db);
            }
        } catch (Exception e) {
            logger.error("归还Berkeleydb连接池异常", e);
        }
    }

    /**
     * Opens a database handle in the shared environment and wraps it in a stored map
     * keyed by {@code Long}.
     *
     * <p>The environment is created once and reused; previously a fresh environment was
     * opened for every handle and never closed.
     */
    private BerkeleyQueueDb createAndBindDatabase(String dbDir, String dbName, Class<E> valueClass) throws DatabaseNotFoundException,
            DatabaseExistsException, DatabaseException, IllegalArgumentException {
        if (dbEnv == null) {
            File envFile = new File(dbDir);
            if (!envFile.exists()) {
                // mkdirs: the configured data path itself may not exist yet.
                envFile.mkdirs();
            }
            EnvironmentConfig envConfig = new EnvironmentConfig();
            envConfig.setAllowCreate(true);
            envConfig.setTransactional(false);
            envConfig.setLocking(false);
            dbEnv = new BdbEnvironment(envFile, envConfig);
        }

        DatabaseConfig dbConfig = new DatabaseConfig();
        dbConfig.setAllowCreate(true);
        dbConfig.setTransactional(false);
        dbConfig.setDeferredWrite(true);
        Database db = dbEnv.openDatabase(null, dbName, dbConfig);

        // Use a tuple binding when one exists for the value class, otherwise fall back
        // to Java serialization through the environment's class catalog.
        EntryBinding<E> valueBinding = TupleBinding.getPrimitiveBinding(valueClass);
        if (valueBinding == null) {
            valueBinding = new SerialBinding<E>(dbEnv.getClassCatalog(), valueClass);
        }
        StoredSortedMap<Long, E> map = new StoredSortedMap<Long, E>(
                db,
                TupleBinding.getPrimitiveBinding(Long.class),
                valueBinding,
                true);
        return new BerkeleyQueueDb(db, map);
    }

    /**
     * Returns an iterator over the stored values.
     *
     * <p>NOTE(review): the handle obtained here is never released because the returned
     * iterator reads from it lazily; every call therefore consumes one handle. Releasing
     * it early could close the database under the iterator — needs a dedicated design.
     */
    @Override
    public Iterator<E> iterator() {
        return getDb().getQueueMap().values().iterator();
    }

    /**
     * Returns {@code tailIndex - headIndex}, narrowed to {@code int}.
     */
    @Override
    public int size() {
        synchronized (tailIndex) {
            synchronized (headIndex) {
                return (int) (tailIndex.get() - headIndex.get());
            }
        }
    }

    /**
     * Appends an element at the tail.
     *
     * @return always {@code true}
     */
    @Override
    public boolean offer(E e) {
        synchronized (tailIndex) {
            BerkeleyQueueDb db = getDb();
            try {
                db.getQueueMap().put(tailIndex.getAndIncrement(), e);
            } finally {
                release(db);
            }
        }
        return true;
    }

    /**
     * Returns the head element without removing it, or {@code null} when the queue is
     * empty. Keys without a stored value are skipped by advancing the head pointer.
     */
    @Override
    public E peek() {
        synchronized (headIndex) {
            if (peekItem != null) {
                return peekItem;
            }
            BerkeleyQueueDb db = getDb();
            try {
                while (headIndex.get() < tailIndex.get()) {
                    E candidate = db.getQueueMap().get(headIndex.get());
                    if (candidate != null) {
                        peekItem = candidate;
                        return candidate;
                    }
                    // Nothing stored under this key: move the head pointer forward.
                    headIndex.incrementAndGet();
                }
                return null;
            } finally {
                release(db);
            }
        }
    }

    /**
     * Removes and returns the head element, or returns {@code null} when the queue is empty.
     */
    @Override
    public E poll() {
        synchronized (headIndex) {
            E headItem = peek();
            if (headItem == null) {
                return null;
            }
            BerkeleyQueueDb db = getDb();
            try {
                db.getQueueMap().remove(headIndex.getAndIncrement());
                peekItem = null;
                return headItem;
            } finally {
                release(db);
            }
        }
    }

    /**
     * Syncs and closes the given database handle. The environment stays open.
     * Failures are logged, not propagated.
     */
    public void close(BerkeleyQueueDb berkeleyQueueDb) {
        try {
            if (berkeleyQueueDb != null && berkeleyQueueDb.getQueueDb() != null) {
                berkeleyQueueDb.getQueueDb().sync();
                berkeleyQueueDb.getQueueDb().close();
            }
        } catch (DatabaseException e) {
            logger.error("关闭Berkeleydb连接异常", e);
        } catch (UnsupportedOperationException e) {
            logger.error("关闭Berkeleydb连接异常", e);
        }
    }

    /**
     * Flushes the database to disk, returning the handle it used to the pool afterwards.
     */
    public void sync() {
        BerkeleyQueueDb db = getDb();
        try {
            db.getQueueDb().sync();
        } finally {
            release(db);
        }
    }

    /**
     * Pairs a database handle with the stored map view bound to it.
     */
    class BerkeleyQueueDb {
        /** Underlying database handle. */
        private Database queueDb;
        /** Persistent map view: key is the queue position, value is the element. */
        private StoredMap<Long, E> queueMap;

        public BerkeleyQueueDb(Database queueDb, StoredMap<Long, E> queueMap) {
            super();
            this.queueDb = queueDb;
            this.queueMap = queueMap;
        }

        public Database getQueueDb() {
            return queueDb;
        }

        public void setQueueDb(Database queueDb) {
            this.queueDb = queueDb;
        }

        public StoredMap<Long, E> getQueueMap() {
            return queueMap;
        }

        public void setQueueMap(StoredMap<Long, E> queueMap) {
            this.queueMap = queueMap;
        }
    }
}
/**
 * Database environment that additionally owns a lazily opened class catalog,
 * which serial bindings use to store class descriptions.
 */
public class BdbEnvironment extends Environment {
    /** Catalog built on {@link #classCatalogDB}; created on first request. */
    private StoredClassCatalog classCatalog;
    /** Database named "classCatalog" that backs the catalog. */
    private Database classCatalogDB;

    /**
     * Opens the environment.
     *
     * @param envHome   environment home directory
     * @param envConfig environment configuration
     * @throws DatabaseException if the environment cannot be opened
     */
    public BdbEnvironment(File envHome, EnvironmentConfig envConfig) throws DatabaseException {
        super(envHome, envConfig);
    }

    /**
     * Returns the class catalog, opening its database (created if absent) on first use.
     *
     * @return the cached class catalog
     * @throws RuntimeException wrapping the {@link DatabaseException} if the catalog
     *                          database cannot be opened
     */
    public StoredClassCatalog getClassCatalog() {
        if (classCatalog == null) {
            DatabaseConfig dbConfig = new DatabaseConfig();
            dbConfig.setAllowCreate(true);
            try {
                classCatalogDB = openDatabase(null, "classCatalog", dbConfig);
                classCatalog = new StoredClassCatalog(classCatalogDB);
            } catch (DatabaseException e) {
                throw new RuntimeException(e);
            }
        }
        return classCatalog;
    }

    /**
     * Closes the catalog database (if it was opened) and then the environment.
     * The environment is closed even when closing the catalog database fails.
     */
    @Override
    public synchronized void close() throws DatabaseException {
        try {
            if (classCatalogDB != null) {
                classCatalogDB.close();
            }
        } finally {
            super.close();
        }
    }
}
- 几个工具类的实现
/**
 * Classpath scanning helpers: list the classes of a package (from directories and
 * jars) and filter a set of classes by a common superclass.
 */
public class ClassUtils {
    /** Length of the ".class" file suffix. */
    private static final int CLASS_SUFFIX_LENGTH = 6;

    /**
     * Collects every class found under the given package, including sub-packages,
     * using the current thread's context class loader. Classes are loaded without
     * being initialized.
     *
     * @param pack dotted package name, e.g. {@code com.example.jobs}
     * @return the classes found, in discovery order; empty when nothing matches
     */
    public static Set<Class<?>> getClasses(String pack) {
        Set<Class<?>> classes = new LinkedHashSet<>();
        boolean recursive = true;
        String packageDirName = pack.replace('.', '/');
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        try {
            Enumeration<URL> dirs = loader.getResources(packageDirName);
            while (dirs.hasMoreElements()) {
                URL url = dirs.nextElement();
                String protocol = url.getProtocol();
                if ("file".equals(protocol)) {
                    // Classes stored as plain files: walk the directory tree.
                    String filePath = URLDecoder.decode(url.getFile(), "UTF-8");
                    findAndAddClassesInPackageByFile(pack, filePath, recursive, classes);
                } else if ("jar".equals(protocol)) {
                    addClassesFromJar(url, packageDirName, loader, classes);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return classes;
    }

    /**
     * Adds every class of the jar behind {@code url} whose entry lies inside
     * {@code packageDirName} (or one of its sub-packages).
     */
    private static void addClassesFromJar(URL url, String packageDirName, ClassLoader loader, Set<Class<?>> classes) {
        // Match on a directory boundary so that "com/foo" does not also pick up "com/foobar".
        String prefix = packageDirName.isEmpty() ? "" : packageDirName + '/';
        try {
            JarFile jar = ((JarURLConnection) url.openConnection()).getJarFile();
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                JarEntry entry = entries.nextElement();
                String name = entry.getName();
                if (!name.isEmpty() && name.charAt(0) == '/') {
                    name = name.substring(1);
                }
                if (entry.isDirectory() || !name.endsWith(".class") || !name.startsWith(prefix)) {
                    continue;
                }
                // The entry path minus ".class" is the binary class name with '/' for '.'.
                // It is computed per entry, so one entry never affects the next.
                String className = name.substring(0, name.length() - CLASS_SUFFIX_LENGTH).replace('/', '.');
                try {
                    classes.add(loader.loadClass(className));
                } catch (ClassNotFoundException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Adds the classes found as {@code .class} files under a directory.
     *
     * @param packageName dotted name of the package the directory represents
     * @param packagePath file-system path of that directory
     * @param recursive   whether sub-directories are scanned as sub-packages
     * @param classes     set that receives the loaded classes
     */
    public static void findAndAddClassesInPackageByFile(String packageName, String packagePath, final boolean recursive,
            Set<Class<?>> classes) {
        File dir = new File(packagePath);
        if (!dir.exists() || !dir.isDirectory()) {
            return;
        }
        // Keep sub-directories (when recursing) and compiled class files.
        File[] dirfiles = dir.listFiles(new FileFilter() {
            @Override
            public boolean accept(File file) {
                return (recursive && file.isDirectory()) || (file.getName().endsWith(".class"));
            }
        });
        if (dirfiles == null) {
            // listFiles returns null when the directory cannot be read.
            return;
        }
        for (File file : dirfiles) {
            if (file.isDirectory()) {
                findAndAddClassesInPackageByFile(qualify(packageName, file.getName()), file.getAbsolutePath(), recursive,
                        classes);
            } else {
                String className = file.getName().substring(0, file.getName().length() - CLASS_SUFFIX_LENGTH);
                try {
                    // loadClass instead of Class.forName: does not run static initializers.
                    classes.add(
                            Thread.currentThread().getContextClassLoader().loadClass(qualify(packageName, className)));
                } catch (ClassNotFoundException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Joins a package and a simple name, omitting the dot for the default package.
     */
    private static String qualify(String packageName, String simpleName) {
        return packageName.isEmpty() ? simpleName : packageName + '.' + simpleName;
    }

    /**
     * Returns the classes in {@code classesAll} that are assignable to {@code clazz},
     * excluding {@code clazz} itself.
     *
     * <p>NOTE(review): despite the method name, nothing is collected when {@code clazz}
     * is an interface — the filter only runs for non-interface types. Kept as is because
     * callers may rely on it; confirm the intended behaviour.
     *
     * @param clazz      the supertype to filter by
     * @param classesAll candidate classes
     * @return matching classes in iteration order; empty when {@code clazz} is an interface
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static Set<Class<?>> getByInterface(Class clazz, Set<Class<?>> classesAll) {
        Set<Class<?>> classes = new LinkedHashSet<Class<?>>();
        if (!clazz.isInterface()) {
            try {
                for (Class<?> cls : classesAll) {
                    if (clazz.isAssignableFrom(cls) && !clazz.equals(cls)) {
                        classes.add(cls);
                    }
                }
            } catch (Exception e) {
                System.out.println("出现异常");
            }
        }
        return classes;
    }
}
/**
 * Small null-safe checks for collections and character sequences.
 */
public class CommonUtils {
    /**
     * Tells whether a collection is {@code null} or has no elements.
     */
    public static boolean isEmpty(Collection<?> coll) {
        if (coll == null) {
            return true;
        }
        return coll.isEmpty();
    }

    /**
     * Tells whether a character sequence is {@code null}, empty, or made up
     * solely of whitespace characters.
     */
    public static boolean isBlank(final CharSequence cs) {
        if (cs == null) {
            return true;
        }
        final int length = cs.length();
        int pos = 0;
        while (pos < length) {
            // The first non-whitespace character settles the answer.
            if (!Character.isWhitespace(cs.charAt(pos))) {
                return false;
            }
            pos++;
        }
        return true;
    }

    /**
     * Negation of {@link #isBlank(CharSequence)}.
     */
    public static boolean isNotBlank(final CharSequence cs) {
        boolean blank = isBlank(cs);
        return !blank;
    }
}
/**
 * JSON helpers built on a single shared Gson instance (dates formatted as
 * {@code yyyy-MM-dd HH:mm:ss}, HTML escaping disabled).
 */
public class JsonUtils {
    private static final Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").disableHtmlEscaping().create();

    /**
     * Deserializes a JSON string into a non-generic type.
     *
     * @param json  JSON text
     * @param clazz target class
     * @return the deserialized object
     */
    public static <T> T fromJson(String json, Class<T> clazz) {
        return gson.fromJson(json, clazz);
    }

    /**
     * Serializes an object to JSON. {@code null} is rendered as JSON null, and
     * {@code JsonObject}/{@code JsonArray} arguments are rendered with their own
     * {@code toString()}.
     *
     * @param obj object to serialize, may be {@code null}
     * @return the JSON text
     */
    public static String toJson(Object obj) {
        if (null == obj) {
            return gson.toJson(JsonNull.INSTANCE);
        }
        if (obj instanceof JsonObject || obj instanceof JsonArray) {
            return obj.toString();
        }
        return gson.toJson(obj);
    }

    /**
     * Builds a result envelope of the form {@code {success: ..., data: ...}}.
     * A single payload is stored as is, several payloads are stored as an array,
     * and no payload leaves {@code data} out.
     *
     * @param seccess value of the {@code success} field
     * @param obj     optional payload(s)
     * @return the JSON text
     */
    public static String objToJson(boolean seccess, Object... obj) {
        Map<String, Object> m = new HashMap<String, Object>();
        m.put("success", seccess);
        if (null != obj && obj.length > 0) {
            if (obj.length > 1) {
                m.put("data", obj);
            } else {
                m.put("data", obj[0]);
            }
        }
        return gson.toJson(m);
    }

    /**
     * Deserializes a JSON string into a (possibly generic) type.
     *
     * @param json JSON text
     * @param t    target type
     * @return the deserialized object
     */
    @SuppressWarnings("unchecked")
    public static <T> T fromJson(String json, Type t) {
        return (T) gson.fromJson(json, t);
    }

    /**
     * Parses a JSON string into a {@code JsonElement} tree.
     *
     * @param json JSON text
     * @return the parsed element as returned by Gson
     */
    public static JsonElement fromJsonAsJsonElement(String json) {
        return gson.fromJson(json, JsonElement.class);
    }

    /**
     * Parses a JSON string and returns it as a {@code JsonObject}.
     *
     * @param json JSON text
     * @return the object, or {@code null} when the input is null/empty or is not a JSON object
     */
    public static JsonObject toJsonObj(String json) {
        JsonElement el = fromJsonAsJsonElement(json);
        // Gson yields null for null/empty input; treat it like "not an object".
        if (el != null && el.isJsonObject()) {
            return el.getAsJsonObject();
        }
        return null;
    }

    /**
     * Parses a JSON string and returns it as a {@code JsonArray}.
     *
     * @param json JSON text
     * @return the array, or {@code null} when the input is null/empty or is not a JSON array
     */
    public static JsonArray toJsonArray(String json) {
        JsonElement el = JsonUtils.fromJsonAsJsonElement(json);
        // Gson yields null for null/empty input; treat it like "not an array".
        if (el != null && el.isJsonArray()) {
            return el.getAsJsonArray();
        }
        return null;
    }

    /**
     * Converts a {@code Map} into an instance of {@code c} by round-tripping through JSON.
     *
     * @param obj source value; must be a {@code Map}
     * @param c   target class
     * @return the converted object
     * @throws ClassCastException if {@code obj} is not a {@code Map}
     */
    public static <T> T toObj(Object obj, Class<T> c) {
        @SuppressWarnings("rawtypes")
        Map map = (Map) obj;
        String jsonString = JsonUtils.toJson(map);
        return JsonUtils.fromJson(jsonString, c);
    }
}
/**
 * Static access point to the Spring {@code ApplicationContext}. The context is
 * stored in a static field once {@link #setApplicationContext} has been called.
 */
public class SpringContainerTools implements ApplicationContextAware {
    /** Context shared by all static lookups; {@code null} until it has been set. */
    private static ApplicationContext context = null;
    /** Lazily created helper instance returned by {@link #init()}. */
    private static SpringContainerTools tools = null;

    /**
     * Returns the shared helper instance, creating it on first call.
     */
    public synchronized static SpringContainerTools init() {
        if (tools == null) {
            tools = new SpringContainerTools();
        }
        return tools;
    }

    /**
     * Stores the given context for later static lookups.
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
            throws BeansException {
        context = applicationContext;
    }

    /**
     * Looks a bean up by name.
     *
     * @return the bean, or {@code null} when no context has been set yet
     *         (same convention as {@link #getBean(Class)})
     */
    public synchronized static Object getBean(String beanName) {
        if (context == null) {
            return null;
        }
        return context.getBean(beanName);
    }

    /**
     * Looks a bean up by type.
     *
     * @return the bean, or {@code null} when no context has been set yet
     */
    public synchronized static Object getBean(Class<?> requiredType) {
        if (context == null) {
            return null;
        }
        return context.getBean(requiredType);
    }
}
- task的封装
/**
 * One unit of queued work: the key that identifies the handler method and the
 * arguments to pass to it.
 */
public class BatchTask implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Arguments handed to the handler method. */
    public Object[] args;
    /** Key of the handler method this task is meant for. */
    private String key;

    /**
     * @param key  handler key
     * @param args arguments for the handler; may be omitted
     */
    public BatchTask(String key, Object... args) {
        this.args = args;
        this.key = key;
    }

    public Object[] getArgs() {
        return this.args;
    }

    public void setArgs(Object[] args) {
        this.args = args;
    }

    public String getKey() {
        return this.key;
    }

    public void setKey(String key) {
        this.key = key;
    }
}
- 配置处理
/**
 * Singleton holding the batch settings: packages to scan, storage path,
 * fetch size and handle-pool bounds.
 */
public class BatchConfig {
    private static final Logger logger = LoggerFactory.getLogger(BatchConfig.class);
    private static final BatchConfig BATCH_CONFIG = new BatchConfig();

    /** Packages to scan for batch methods; several packages are separated by '|'. */
    private String packageBase;
    /** Directory under which queue data is stored. */
    private String dataPath;
    /** Maximum number of tasks fetched per batch; values above 1 trade durability for throughput. */
    private int maxExecuteSize = 1;
    /** Upper bound of the database handle pool. */
    private int maxConnCount = 5;
    /** Number of database handles kept pooled. */
    private int minConnCount = 1;

    private BatchConfig() {
    }

    /**
     * Returns the single configuration instance.
     */
    public static BatchConfig getConfig() {
        return BATCH_CONFIG;
    }

    /**
     * Returns the configured scan packages, or {@code null} (after logging an
     * error) when none has been configured.
     */
    public String getPackageBase() {
        boolean configured = !CommonUtils.isBlank(packageBase);
        if (configured) {
            return packageBase;
        }
        logger.error("package base不能为空");
        return null;
    }

    public void setPackageBase(String packageBase) {
        this.packageBase = packageBase;
    }

    /**
     * Returns the configured data path, or {@code null} (after logging an
     * error) when none has been configured.
     */
    public String getDataPath() {
        boolean configured = !CommonUtils.isBlank(dataPath);
        if (configured) {
            return dataPath;
        }
        logger.error("data path不能为空");
        return null;
    }

    public void setDataPath(String dataPath) {
        this.dataPath = dataPath;
    }

    public int getMaxExecuteSize() {
        return this.maxExecuteSize;
    }

    public void setMaxExecuteSize(int maxExecuteSize) {
        this.maxExecuteSize = maxExecuteSize;
    }

    public int getMaxConnCount() {
        return this.maxConnCount;
    }

    public void setMaxConnCount(int maxConnCount) {
        this.maxConnCount = maxConnCount;
    }

    public int getMinConnCount() {
        return this.minConnCount;
    }

    public void setMinConnCount(int minConnCount) {
        this.minConnCount = minConnCount;
    }
}
- 实现handler,处理任务
/**
 * Discovers methods annotated with {@code @BatchMethod} and executes queued
 * tasks against them, either inline or on a fixed-size thread pool.
 */
public class BatchHandler {
    private static final Logger logger = LoggerFactory.getLogger(BatchHandler.class);
    /** Packages to scan, read from {@link BatchConfig} when this class is loaded. */
    private static final String PACKAGE_BASE = BatchConfig.getConfig().getPackageBase();
    /** Instances created for non-Spring handler classes, keyed by class name. */
    private static final Map<String, Object> BATCH_EXECUTOR_OBJECT_MAP = new HashMap<>();
    /** Registered handlers, keyed by the {@code @BatchMethod} value. */
    private static final Map<String, ExecutorHandler> BATCH_EXECUTOR_HANDLER_MAP = new HashMap<>();
    private static final int MAX_EXECUTOR_COUNT = 5;
    private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(MAX_EXECUTOR_COUNT, MAX_EXECUTOR_COUNT, 60l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    /** Number of tasks currently being executed by the pool. */
    private static final AtomicLong EXECUTOR_TASK_COUNT = new AtomicLong(0);

    /**
     * Returns the number of pool threads that are currently running tasks.
     */
    public static int getActiveCount() {
        return EXECUTOR.getActiveCount();
    }

    /**
     * Returns the size of the thread pool.
     */
    public static int getMaxActiveCount() {
        return MAX_EXECUTOR_COUNT;
    }

    /**
     * Tells whether at least one pool thread is idle.
     */
    public static boolean tryGetUseableThread() {
        return EXECUTOR.getActiveCount() < MAX_EXECUTOR_COUNT;
    }

    /**
     * Returns the number of tasks currently in execution.
     */
    public static long getExecutorTaskCount() {
        return EXECUTOR_TASK_COUNT.get();
    }

    /**
     * Executes a batch of tasks sequentially on one pool thread.
     *
     * @param list tasks to execute, in order
     */
    public static void handler(List<BatchTask> list) {
        EXECUTOR.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    for (BatchTask task : list) {
                        EXECUTOR_TASK_COUNT.incrementAndGet();
                        try {
                            logger.info("执行task: " + JsonUtils.toJson(task));
                            if (!execute(task)) {
                                logger.info("执行失败,task: " + JsonUtils.toJson(task));
                            }
                        } finally {
                            // Always balance the increment so the counter cannot drift upwards.
                            EXECUTOR_TASK_COUNT.decrementAndGet();
                        }
                    }
                } finally {
                    // This thread still counts as active here, so the check is true when
                    // the pool was saturated: wake the dispatcher so it can hand out more work.
                    if (!tryGetUseableThread()) {
                        LockSupport.unpark(BatchWorker.THREAD);
                    }
                }
            }
        });
    }

    /**
     * Executes a single task on the calling thread.
     */
    public static void handler(BatchTask task) {
        execute(task);
    }

    /**
     * Invokes the handler registered for the task's key.
     *
     * @return {@code true} on success, {@code false} when no handler is registered
     *         or the invocation throws (the failure is logged)
     */
    private static boolean execute(BatchTask task) {
        try {
            ExecutorHandler executorHandler = BATCH_EXECUTOR_HANDLER_MAP.get(task.getKey());
            if (executorHandler == null) {
                logger.error("执行回调操作异常, 未找到对应的BatchMethod: " + task.getKey());
                return false;
            }
            executorHandler.getMethod().setAccessible(true);
            executorHandler.getMethod().invoke(executorHandler.getObject(), task.getArgs());
            return true;
        } catch (Exception e) {
            logger.error("执行回调操作异常", e);
            return false;
        }
    }

    /**
     * Scans the configured packages and registers every {@code @BatchMethod} method.
     *
     * @throws IllegalStateException if no package base was configured
     * @throws Error                 if a handler class or method violates the rules
     *                               checked in {@link #register}
     * @throws Exception             if a handler class cannot be instantiated
     */
    public static void scanClasses() throws Exception {
        if (PACKAGE_BASE == null) {
            throw new IllegalStateException("package base不能为空");
        }
        for (String packageName : PACKAGE_BASE.split("\\|")) {
            for (Class<?> clazz : ClassUtils.getClasses(packageName)) {
                for (Method method : clazz.getDeclaredMethods()) {
                    BatchMethod batchMethod = method.getAnnotation(BatchMethod.class);
                    if (batchMethod != null) {
                        register(clazz, method, batchMethod.value());
                    }
                }
            }
        }
    }

    /**
     * Validates one annotated method and stores its handler under {@code key}.
     * The declaring class needs a public no-arg constructor, the method must be
     * public, and the key must not be registered yet.
     */
    private static void register(Class<?> clazz, Method method, String key) throws Exception {
        if (!hasPublicNoArgConstructor(clazz)) {
            throw new Error("不存在无参且访问修饰符为public的构造函数: " + key);
        }
        if (!Modifier.isPublic(method.getModifiers())) {
            throw new Error("BatchMethod方法的访问修饰符必须要为public: " + key);
        }
        if (BATCH_EXECUTOR_HANDLER_MAP.get(key) != null) {
            throw new Error("存在相同的batchMethod: " + key);
        }

        // Classes annotated as Spring @Service/@Component are fetched from the container.
        Service service = clazz.getAnnotation(Service.class);
        Component component = clazz.getAnnotation(Component.class);
        logger.info("key: " + key + "已添加BatchMethod");
        Object target;
        if (service != null || component != null) {
            String beanName = service == null ? component.value() : service.value();
            if (CommonUtils.isBlank(beanName)) {
                // Fall back to the simple class name with a lower-case first letter.
                char[] chars = clazz.getSimpleName().toCharArray();
                if (chars.length > 0) {
                    chars[0] = Character.toLowerCase(chars[0]);
                }
                beanName = String.valueOf(chars);
            }
            target = SpringContainerTools.getBean(clazz);
            if (target == null) {
                throw new Error("serviceName: " + beanName + ", batchMethod: " + key + "是Spring方法,必须在Spring容器加载完之后启动");
            }
            logger.info("key: " + key + "是spring类");
        } else {
            // Plain class: share one instance among all batch methods it declares.
            // The cache is read and written under the class name.
            String className = clazz.getName();
            target = BATCH_EXECUTOR_OBJECT_MAP.get(className);
            if (target == null) {
                target = clazz.newInstance();
                BATCH_EXECUTOR_OBJECT_MAP.put(className, target);
            }
        }
        BATCH_EXECUTOR_HANDLER_MAP.put(key, new ExecutorHandler(target, method));
    }

    /**
     * Tells whether the class declares a public constructor without parameters.
     */
    private static boolean hasPublicNoArgConstructor(Class<?> clazz) {
        for (Constructor<?> constructor : clazz.getConstructors()) {
            if (constructor.getParameterCount() == 0 && Modifier.isPublic(constructor.getModifiers())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Pairs a handler method with the instance it is invoked on.
     */
    static class ExecutorHandler {
        private Object object;
        private Method method;

        public ExecutorHandler(Object object, Method method) {
            super();
            this.object = object;
            this.method = method;
        }

        public Object getObject() {
            return object;
        }

        public void setObject(Object object) {
            this.object = object;
        }

        public Method getMethod() {
            return method;
        }

        public void setMethod(Method method) {
            this.method = method;
        }
    }
}
- 实现对任务线程的管理
/**
 * Owns the dispatcher thread that pulls tasks from the persistent queue and
 * passes them to {@link BatchHandler}, plus static helpers to enqueue and
 * dequeue tasks.
 */
public class BatchWorker {
    private static final Logger logger = LoggerFactory.getLogger(BatchWorker.class);
    private static volatile boolean STARTUP = false;
    /** Number of tasks handed to one pool thread at a time, read from {@link BatchConfig} at class load. */
    private static final int SIZE = BatchConfig.getConfig().getMaxExecuteSize();
    /** Dispatcher thread; producers and pool threads unpark it when there is work to do. */
    public static final Thread THREAD = new Thread(new BatchThread());
    private static final int SINGLE_THREAD_MODE = 1;
    private static final int MULTI_THREAD_MODE = 2;
    /** Upper bound for the number of tasks removed by one {@link #poll(int)} call. */
    private static final int MAX_POOL_SIZE = 100;
    /** Pause between checks while no pool thread is free. */
    private static final long BUSY_BACKOFF_NANOS = 1_000_000L;
    /** Current mode; volatile so a change made by another thread reaches the dispatcher. */
    private static volatile int MODE = BatchWorker.MULTI_THREAD_MODE;
    private static BdbPersistentQueue<BatchTask> BDB_PERSISTENT_QUEUE;

    /**
     * Sets the working mode: 1 for single-threaded, 2 for multi-threaded.
     */
    public static void setMode(int mode) {
        BatchWorker.MODE = mode;
    }

    /**
     * Returns the working mode.
     */
    public static int getMode() {
        return MODE;
    }

    /**
     * Returns the number of tasks handed to one pool thread at a time.
     */
    public static int getSize() {
        return SIZE;
    }

    /**
     * Returns the number of queued tasks.
     */
    public static int getQueueSize() {
        return BDB_PERSISTENT_QUEUE.size();
    }

    /**
     * Opens the queue for {@code appName} and starts the dispatcher thread. Further
     * calls after a successful start do nothing; failures are logged, not thrown.
     */
    public static synchronized void start(String appName) {
        logger.info("============Batch Worker开始启动============");
        try {
            if (!BatchWorker.STARTUP) {
                logger.info("============Batch Thread开始启动============");
                BDB_PERSISTENT_QUEUE = BdbPersistentQueue.getInstance(appName);
                BatchWorker.THREAD.start();
                // Mark as started only once everything above has succeeded.
                BatchWorker.STARTUP = true;
                logger.info("============Batch Thread启动成功============");
            } else {
                logger.info("============Batch Thread已启动============");
            }
        } catch (Exception e) {
            logger.error("============Batch Worker启动失败============", e);
        }
        logger.info("============Batch Worker启动结束============");
    }

    /**
     * Placeholder for deleting a finished job; currently does nothing.
     *
     * @return always 0
     */
    public static int delete() {
        return 0;
    }

    /**
     * Removes up to {@code size} tasks (capped at {@code MAX_POOL_SIZE}) from the head
     * of the queue and syncs the queue to disk.
     *
     * @return the removed tasks in queue order; empty when the queue is empty
     */
    public static List<BatchTask> poll(int size) {
        size = size > MAX_POOL_SIZE ? MAX_POOL_SIZE : size;
        List<BatchTask> list = new ArrayList<BatchTask>(size);
        for (int i = 0; i < size; i++) {
            BatchTask task = BDB_PERSISTENT_QUEUE.poll();
            if (task == null) {
                break;
            }
            list.add(task);
        }
        BDB_PERSISTENT_QUEUE.sync();
        return list;
    }

    /**
     * Removes the head task and syncs the queue to disk.
     *
     * @return the removed task, or {@code null} when the queue is empty
     */
    public static BatchTask poll() {
        BatchTask task = BDB_PERSISTENT_QUEUE.poll();
        BDB_PERSISTENT_QUEUE.sync();
        return task;
    }

    /**
     * Appends a task to the queue.
     */
    public static void offer(BatchTask batchTask) {
        BDB_PERSISTENT_QUEUE.offer(batchTask);
    }

    /**
     * Syncs the queue to disk.
     */
    public static void sync() {
        BDB_PERSISTENT_QUEUE.sync();
    }

    /**
     * Dispatcher loop: fetches tasks and executes them according to the current mode.
     */
    private static class BatchThread implements Runnable {
        @Override
        public void run() {
            for (;;) {
                if (BatchWorker.MODE == BatchWorker.SINGLE_THREAD_MODE) {
                    // Single-threaded: run each task on this thread. Previously the task
                    // was removed from the queue and then dropped without being executed.
                    BatchTask task = BatchWorker.poll();
                    if (task == null) {
                        LockSupport.park();
                    } else {
                        BatchHandler.handler(task);
                    }
                } else if (BatchWorker.MODE == BatchWorker.MULTI_THREAD_MODE) {
                    if (BatchHandler.tryGetUseableThread()) {
                        // 1. Fetch a batch from the queue.
                        List<BatchTask> list = BatchWorker.poll(SIZE);
                        if (CommonUtils.isEmpty(list)) {
                            // 2. Nothing queued: sleep until a producer unparks us.
                            LockSupport.park();
                        } else {
                            // 3. Hand the batch to the pool.
                            BatchHandler.handler(list);
                        }
                    } else {
                        // Pool saturated: back off briefly instead of spinning at full speed.
                        LockSupport.parkNanos(BUSY_BACKOFF_NANOS);
                    }
                } else {
                    // Unknown mode: nothing to do, avoid a hot loop.
                    LockSupport.parkNanos(BUSY_BACKOFF_NANOS);
                }
            }
        }
    }
}
- 显示对外使用的工具类
/**
 * Public entry point of the batch facility: starts it up and enqueues tasks.
 */
public class BatchUtils {
    private static volatile boolean STARTUP = false;
    /** Lists longer than this are enqueued on a background thread. */
    private static final int MAX_OFFER_SIZE = 1000;
    /** Port of the built-in HTTP monitor. */
    private static final int MONITOR_PORT = 11009;
    /** Path of the built-in HTTP monitor. */
    private static final String MONITOR_URL = "/batch";

    private BatchUtils() {
    }

    /**
     * Starts the batch facility without a Spring context.
     *
     * @param appName application name, used to locate the queue data
     */
    public static void startup(String appName) throws Exception {
        startup(appName, null);
    }

    /**
     * Starts the batch facility: registers the Spring context (when given), scans for
     * batch methods, starts the worker and the HTTP monitor. Only the first call
     * performs the startup steps.
     *
     * @param appName            application name, used to locate the queue data
     * @param applicationContext Spring context to resolve handler beans from, or {@code null}
     */
    public static void startup(String appName, ApplicationContext applicationContext) throws Exception {
        synchronized (BatchUtils.class) {
            if (applicationContext != null) {
                SpringContainerTools.init().setApplicationContext(applicationContext);
            }
            if (!BatchUtils.STARTUP) {
                BatchUtils.STARTUP = true;
                BatchHandler.scanClasses();
                BatchWorker.start(appName);
                startupHttpMonitorServer();
            }
        }
    }

    /**
     * Starts a small HTTP server that reports mode, queue size and thread usage as HTML.
     */
    private static void startupHttpMonitorServer() throws IOException {
        HttpServer httpServer = HttpServer.create(new InetSocketAddress(MONITOR_PORT), 0);
        httpServer.createContext(MONITOR_URL, new HttpHandler() {
            @Override
            public void handle(HttpExchange httpExchange) throws IOException {
                try {
                    StringBuilder sb = new StringBuilder();
                    sb.append("工作模式: ").append(BatchWorker.getMode() == 2 ? "多线程模式" : "单线程模式").append("<br/>");
                    sb.append("单个线程的处理任务数: ").append(BatchWorker.getSize()).append("<br/>");
                    sb.append("队列总数量: ").append(BatchWorker.getQueueSize()).append("<br/>");
                    sb.append("执行的任务数: ").append(BatchHandler.getExecutorTaskCount()).append("<br/>");
                    sb.append("最大执行的线程数: ").append(BatchHandler.getMaxActiveCount()).append("<br/>");
                    sb.append("正在执行的线程数: ").append(BatchHandler.getActiveCount()).append("<br/>");
                    byte[] respContents = sb.toString().getBytes("UTF-8");
                    httpExchange.getResponseHeaders().add("Content-Type", "text/html; charset=UTF-8");
                    httpExchange.sendResponseHeaders(200, respContents.length);
                    httpExchange.getResponseBody().write(respContents);
                } finally {
                    // Release the exchange even when building or writing the response fails.
                    httpExchange.close();
                }
            }
        });
        httpServer.start();
    }

    /**
     * Tells whether {@code startup} has been called.
     */
    public static boolean isStartup() {
        return BatchUtils.STARTUP;
    }

    /**
     * Enqueues one task for {@code key} with a single argument; a {@code null}
     * argument enqueues a task without arguments.
     */
    public static void batchAdd(String key, Object args) throws Exception {
        List<Object> list = new ArrayList<>(1);
        if (args != null) {
            list.add(args);
        }
        batchAdd(key, list);
    }

    /**
     * Enqueues one task per element of {@code args}; an empty or {@code null} list
     * enqueues a single task without arguments.
     *
     * <p>Lists longer than {@code MAX_OFFER_SIZE} are copied and enqueued on a background
     * thread, so this method may return before all of their tasks are queued.
     *
     * @throws Exception if the batch facility has not been started
     */
    public static void batchAdd(String key, List<?> args) throws Exception {
        if (!BatchUtils.STARTUP) {
            throw new Exception("批量功能未启动,请先调用startup方法");
        }
        if (CommonUtils.isEmpty(args)) {
            BatchWorker.offer(new BatchTask(key));
            BatchWorker.sync();
            LockSupport.unpark(BatchWorker.THREAD);
            return;
        }
        if (args.size() > BatchUtils.MAX_OFFER_SIZE) {
            // The original wrapped this work in Executors.callable(...) and never ran it,
            // so large lists were silently dropped. Run it for real on its own thread,
            // working on a copy so later changes to the caller's list cannot interfere.
            final List<?> snapshot = new ArrayList<Object>(args);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    enqueueAll(key, snapshot);
                }
            }, "batch-offer").start();
        } else {
            enqueueAll(key, args);
        }
    }

    /**
     * Offers one task per argument, syncs the queue and wakes the dispatcher thread.
     */
    private static void enqueueAll(String key, List<?> args) {
        for (Object arg : args) {
            BatchWorker.offer(new BatchTask(key, arg));
        }
        BatchWorker.sync();
        LockSupport.unpark(BatchWorker.THREAD);
    }
}
- 定义任务注解,快速生产
/**
 * Marks a method as a batch handler. The annotation is retained at runtime
 * and may only be placed on methods.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface BatchMethod {
    /**
     * Key under which the annotated method is registered.
     */
    String value();
}
- 使用
- 使用@BatchMethod注解在需要实现业务的方法标上
package com.yujinyi.test;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import com.yujinyi.batch.BatchConfig;
import com.yujinyi.batch.BatchMethod;
import com.yujinyi.batch.BatchUtils;
/**
 * Usage example: three handler methods registered under the keys
 * "test", "test1" and "test2".
 */
public class Test {
    /**
     * Handler without arguments; prints a fixed marker.
     */
    @BatchMethod("test")
    public void test() {
        System.out.println("test");
    }

    /**
     * Prints the argument prefixed with the current time.
     */
    @BatchMethod("test1")
    public void test1(String s) {
        printWithTimestamp(s);
    }

    /**
     * Prints the argument prefixed with the current time, then sleeps five
     * seconds to simulate slow work.
     */
    @BatchMethod("test2")
    public void test2(String s) {
        printWithTimestamp(s);
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the thread running this task can see it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Prints {@code "<yyyy-MM-dd HH:mm:ss>: <s>"}. A new formatter is created per call
     * because {@code SimpleDateFormat} is not thread-safe.
     */
    private void printWithTimestamp(String s) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(simpleDateFormat.format(new Date()) + ": " + s);
    }
}
- 启动前做两个配置项,一个是存储路径,一个是扫描存在@BatchMethod注解的包(多个用“|”隔开),最后调用启动方法
/**
 * Demo: configures the storage path and the package to scan, starts the batch
 * facility, then feeds the "test1" and "test2" handlers from the main thread
 * and from a background producer thread.
 */
public static void main(String[] args) throws Exception {
    // Storage path and package(s) to scan; several packages are separated by '|'.
    BatchConfig config = BatchConfig.getConfig();
    config.setDataPath("E:/test");
    config.setPackageBase("com.yujinyi.test");

    // Start the queue scanner; the argument is the application name.
    BatchUtils.startup("test");

    // First batch for "test1": "1", "2", "3".
    List<String> list = new ArrayList<String>();
    for (int n = 1; n <= 3; n++) {
        list.add(String.valueOf(n));
    }
    BatchUtils.batchAdd("test1", list);

    // Keep feeding "test1" with "4".."13", one value per second, from another thread.
    Thread producer = new Thread(new Runnable() {
        @Override
        public void run() {
            int i = 4;
            while (i < 14) {
                try {
                    BatchUtils.batchAdd("test1", String.valueOf(i));
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                i++;
            }
        }
    });
    producer.start();

    // Batch for "test2": "a".."h".
    List<String> list1 = new ArrayList<String>();
    for (char c = 'a'; c <= 'h'; c++) {
        list1.add(String.valueOf(c));
    }
    BatchUtils.batchAdd("test2", list1);
}
- 输出结果如下
2019-03-04 10:08:48: 2
2019-03-04 10:08:48: 3
2019-03-04 10:08:48: 1
2019-03-04 10:08:48: a
2019-03-04 10:08:48: b
2019-03-04 10:08:48: c
2019-03-04 10:08:48: d
2019-03-04 10:08:48: e
2019-03-04 10:08:53: 4
2019-03-04 10:08:53: f
2019-03-04 10:08:53: g
2019-03-04 10:08:53: h
2019-03-04 10:08:53: 5
2019-03-04 10:08:53: 6
2019-03-04 10:08:53: 7
2019-03-04 10:08:53: 8
2019-03-04 10:08:53: 9
2019-03-04 10:08:54: 10
2019-03-04 10:08:55: 11
2019-03-04 10:08:56: 12
2019-03-04 10:08:57: 13