事件监听设计 - 969251639/study GitHub Wiki

项目中经常需要定义各种事件来解耦代码。在 Spring 集成 MQ 时,可以按以下方式定义事件的发布与消费,使用十分方便:

  1. 定义生产者发布事件
// JMS template used below to publish messages; the field is populated by
// dependency injection (@Autowired).
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;

/**
 * Producer side: hands the text message to the JMS template for the given destination.
 *
 * @param destination destination passed through to the template unchanged
 * @param message     text payload passed through to the template unchanged
 */
public void sendMessage(Destination destination, final String message) {
	jmsMessagingTemplate.convertAndSend(destination, message);
}
  2. 定义消费目的地进行消费
  // Consumer side of the JMS example. Both occurrences of `xxx` are placeholders
  // (destination name and method name) to be filled in by the reader; the method
  // receives the message text as its single String argument.
  @JmsListener(destination = xxx)
  public void xxx(String text) {	
  }

这种方式下事件发送与事件消费完全解耦,使用简单方便。下面仿照这种用法,实现一个本地(进程内)的事件监听器来解耦代码。

注:spring等很多框架都有一套事件监听方案,可直接使用,但个人觉得都不如以上方式使用方便,故造个小轮子

实现流程

同步模式

异步模式

实现模式

  1. 定义监听注解
/**
 * Marks a method as an event listener.
 *
 * <p>The annotation is retained at runtime so it can be discovered reflectively,
 * and it may only be placed on methods.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Listener {

	/**
	 * Event type the annotated method subscribes to.
	 *
	 * @return the event type name
	 */
	String value();
}
  2. 定义事件抽象
/**
 * Marker interface for event payloads.
 *
 * <p>It declares no methods; concrete event classes implement it so they can be
 * passed to listener methods.
 */
public interface Event {
}
/**
 * Dispatch modes of the event bus. Each constant carries an integer key and a
 * human-readable label.
 */
public enum EventModeEnum {
	/** Standard mode (key 1). */
	STANDARD(1, "标准模式"),
	/** Asynchronous mode (key 2). */
	ASYN(2, "异步模式");

	private int key;
	private String value;

	EventModeEnum(int key, String value) {
		this.key = key;
		this.value = value;
	}

	/**
	 * Finds a mode by its integer key.
	 *
	 * @param id key to look for
	 * @return the first constant, in declaration order, whose key equals {@code id};
	 *         {@code null} when no constant matches
	 */
	public static EventModeEnum get(int id) {
		EventModeEnum[] modes = values();
		for (int i = 0; i < modes.length; i++) {
			if (modes[i].key == id) {
				return modes[i];
			}
		}
		return null;
	}

	/** @return the integer key of this mode */
	public int getKey() {
		return this.key;
	}

	/**
	 * Replaces the key of this constant.
	 *
	 * <p>NOTE(review): enum constants are shared singletons, so this changes what
	 * {@link #get(int)} returns for every user of the enum.
	 *
	 * @param key new key
	 */
	public void setKey(int key) {
		this.key = key;
	}

	/** @return the label of this mode */
	public String getValue() {
		return this.value;
	}

	/**
	 * Replaces the label of this constant (shared by every user of the enum).
	 *
	 * @param value new label
	 */
	public void setValue(String value) {
		this.value = value;
	}
}
  3. 定义观察者信息
public class ListenerClass {
	private Class<?> clazz;
	private Method method;
	public Class<?> getClazz() {
		return clazz;
	}
	public void setClazz(Class<?> clazz) {
		this.clazz = clazz;
	}
	public Method getMethod() {
		return method;
	}
	public void setMethod(Method method) {
		this.method = method;
	}
	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + ((clazz == null) ? 0 : clazz.hashCode());
		result = prime * result + ((method == null) ? 0 : method.hashCode());
		return result;
	}
	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		ListenerClass other = (ListenerClass) obj;
		if (clazz == null) {
			if (other.clazz != null)
				return false;
		} else if (!clazz.equals(other.clazz))
			return false;
		if (method == null) {
			if (other.method != null)
				return false;
		} else if (!method.equals(other.method))
			return false;
		return true;
	}
}
/**
 * Pairs a listener descriptor with the event that should be delivered to it.
 * The pair is fixed at construction time; there are no setters.
 */
public class ListenerClassEvent {
	private final ListenerClass listenerClass;
	private final Event event;

	/**
	 * @param listenerClass listener that should receive the event
	 * @param event         event to deliver
	 */
	public ListenerClassEvent(ListenerClass listenerClass, Event event) {
		this.listenerClass = listenerClass;
		this.event = event;
	}

	/** @return the listener that should receive the event */
	public ListenerClass getListenerClass() {
		return this.listenerClass;
	}

	/** @return the event to deliver */
	public Event getEvent() {
		return this.event;
	}
}
  4. 启动时扫描注解
/**
 * Startup helper: scans packages for methods annotated with {@link Listener} and
 * registers each of them with {@link Publisher}.
 */
public class Boostrap {
	private static final Logger LOGGER = LoggerFactory.getLogger(Boostrap.class);

	/**
	 * Scans and registers listeners in standard mode (no queue is installed).
	 *
	 * @param packageBase one or more package names separated by {@code |}
	 */
	public static void standardStartup(String packageBase) {
		startup(packageBase, EventModeEnum.STANDARD.getKey());
	}

	/**
	 * Scans and registers listeners in asynchronous mode: a queue is handed to
	 * {@link Publisher} before scanning.
	 *
	 * @param packageBase one or more package names separated by {@code |}
	 */
	public static void asynStartup(String packageBase) {
		startup(packageBase, EventModeEnum.ASYN.getKey());
	}

	/**
	 * Collects the classes of every listed package and registers each
	 * {@link Listener}-annotated method under the event type named by the annotation.
	 * An invalid listener signature is logged and terminates the JVM with a
	 * non-zero exit status.
	 */
	private static void startup(String packageBase, int mode) {
		if (EventModeEnum.ASYN.getKey() == mode) {
			Publisher.setQueue(new LinkedBlockingQueue<ListenerClassEvent>());
		}

		// Classes found under all listed packages.
		Set<Class<?>> clazzes = new HashSet<>();
		for (String pack : packageBase.split("\\|")) {
			clazzes.addAll(ClassUtils.getClasses(pack));
		}

		for (Class<?> clazz : clazzes) {
			for (Method method : clazz.getDeclaredMethods()) {
				Listener listener = method.getAnnotation(Listener.class);
				if (listener == null) {
					continue;
				}
				// Make sure the annotated method has a valid signature before registering it.
				try {
					checkParam(method);
				} catch (IllegalArgumentException e) {
					LOGGER.error("启动监听异常: " + e.getMessage(), e);
					// Non-zero status: this is a failed startup, not a normal exit.
					System.exit(1);
				}
				ListenerClass listenerClass = new ListenerClass();
				listenerClass.setClazz(clazz);
				listenerClass.setMethod(method);
				Publisher.registerListener(listener.value(), listenerClass);
			}
		}
	}

	/**
	 * Verifies that a listener method takes exactly one parameter whose type is
	 * {@link Event} or any subtype of it (directly or through a superclass or
	 * sub-interface).
	 *
	 * @throws IllegalArgumentException if the signature does not match
	 */
	private static void checkParam(Method method) {
		Class<?>[] parameterTypes = method.getParameterTypes();
		if (parameterTypes.length != 1) {
			throw new IllegalArgumentException("方法" + method.getName() + "参数个数必须为一个");
		}
		// Event must be assignable FROM the parameter type, i.e. the parameter is an Event.
		if (!Event.class.isAssignableFrom(parameterTypes[0])) {
			throw new IllegalArgumentException("方法" + method.getName() + "参数必须为com.yujinyi.event.Event类型");
		}
	}

}
  5. 实现生产者
/**
 * In-process event publisher.
 *
 * <p>Listeners are registered per event type. An event is either delivered directly
 * on the publishing thread or, once a queue has been installed through
 * {@link #setQueue(BlockingQueue)}, queued and delivered by one background consumer
 * thread.
 *
 * <p>NOTE(review): the registration map is a plain {@code HashMap}; this class
 * assumes all registrations finish before events are published from other threads.
 */
public class Publisher {
	private static final Logger LOGGER = LoggerFactory.getLogger(Publisher.class);
	/** Event type -> listeners registered for it. */
	private static final Map<String, Set<ListenerClass>> MAP = new HashMap<>();
	/** Lazily created listener instances; guarded by the class lock in instanceFor. */
	private static final Map<String, Object> OBJECT_MAP = new HashMap<>();
	private static volatile BlockingQueue<ListenerClassEvent> QUEUE = null;
	private static volatile boolean IS_MULTITHREADING_MODE = false;
	private static Thread THREAD = null;

	/**
	 * Installs the queue, switches to queued delivery and starts the consumer thread.
	 *
	 * @param queue queue that buffers pending deliveries
	 */
	protected static void setQueue(BlockingQueue<ListenerClassEvent> queue) {
		QUEUE = queue;
		IS_MULTITHREADING_MODE = true;
		THREAD = new Thread(Publisher::consumerQueue);
		THREAD.start();
	}

	/**
	 * Delivers an event to every listener registered for the event type. Nothing
	 * happens when no listener is registered for that type.
	 *
	 * @param event     event to deliver
	 * @param eventType event type whose listeners should receive the event
	 */
	public static void publishEvent(Event event, String eventType) {
		// 1. Find all listeners for the event type.
		Set<ListenerClass> listeners = MAP.get(eventType);
		if (listeners == null) {
			return;
		}

		// 2. Dispatch to each listener, directly or through the queue.
		for (ListenerClass listener : listeners) {
			if (IS_MULTITHREADING_MODE) {
				// The consumer blocks in take(), so enqueueing is enough to wake it.
				QUEUE.add(new ListenerClassEvent(listener, event));
			} else {
				invoke(listener, event);
			}
		}
	}

	/**
	 * Consumer loop: blocks until a delivery is queued, then performs it. The loop
	 * ends when the thread is interrupted, with the interrupt flag restored.
	 */
	private static void consumerQueue() {
		try {
			for (;;) {
				ListenerClassEvent next = QUEUE.take();
				invoke(next.getListenerClass(), next.getEvent());
			}
		} catch (InterruptedException e) {
			LOGGER.warn("event consumer thread interrupted, stopping");
			Thread.currentThread().interrupt();
		}
	}

	/** Calls the listener method with the event; any failure is logged, not rethrown. */
	private static void invoke(ListenerClass l, Event event) {
		try {
			l.getMethod().invoke(instanceFor(l), event);
		} catch (Exception e) {
			LOGGER.error("executor error", e);
		}
	}

	/**
	 * Returns the cached target object for a listener, creating it through the
	 * no-argument constructor on first use. The cache key uses the fully-qualified
	 * class name so that same-named classes in different packages do not share an
	 * instance. Synchronized because events may be published from several threads.
	 */
	private static synchronized Object instanceFor(ListenerClass l) throws ReflectiveOperationException {
		String key = l.getClazz().getName() + "." + l.getMethod().getName();
		Object target = OBJECT_MAP.get(key);
		if (target == null) {
			target = l.getClazz().getDeclaredConstructor().newInstance();
			OBJECT_MAP.put(key, target);
		}
		return target;
	}

	/**
	 * Registers a listener for an event type. Registering an equal listener twice
	 * has no additional effect.
	 *
	 * @param eventType     event type to subscribe to
	 * @param listenerClass listener descriptor
	 */
	public static void registerListener(String eventType, ListenerClass listenerClass) {
		MAP.computeIfAbsent(eventType, type -> new HashSet<ListenerClass>()).add(listenerClass);
	}
}

使用步骤

  1. 定义event类,实现Event接口,作为listener方法的入参
/**
 * Sample event that carries a single text value.
 */
public class TestEvent implements Event {

	private String text;

	/** @return the text carried by this event */
	public String getText() {
		return this.text;
	}

	/** @param text the text carried by this event */
	public void setText(String text) {
		this.text = text;
	}
}
  2. 定义监听器,可以有多个
package com.yujinyi.test;

import com.yujinyi.event.Listener;

/**
 * First sample listener for the {@code "test"} event type.
 */
public class Listener1 {

	/**
	 * Prints the text of the received event to standard output.
	 *
	 * @param event the received event
	 */
	@Listener("test")
	public void test(TestEvent event) {
		String text = event.getText();
		System.out.println("im listener1, text: " + text);
	}
}

package com.yujinyi.test;

import com.yujinyi.event.Listener;

/**
 * Second sample listener for the {@code "test"} event type.
 */
public class Listener2 {

	/**
	 * Prints the text of the received event to standard output.
	 *
	 * @param event the received event
	 */
	@Listener("test")
	public void test(TestEvent event) {
		String text = event.getText();
		System.out.println("im listener2, text: " + text);
	}
}
  3. 启动
/**
 * Demo entry point: registers the listeners, then publishes ten {@code "test"}
 * events from a worker thread, printing a timestamp before and after a
 * two-second pause following each event.
 */
public class Test {
	public static void main(String[] args) {
		// Start in synchronous mode.
		Boostrap.standardStartup("com.yujinyi.event|com.yujinyi.test");
		// Asynchronous alternative:
		// Boostrap.asynStartup("com.yujinyi.event|com.yujinyi.test");

		// Alternative: publish from the main thread without pauses.
//		for(int i = 0; i < 10; i++) {
//			TestEvent event = new TestEvent();
//			event.setText("Text" + i);
//			Publisher.publishEvent(event, "test");
//		}
		SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		new Thread(() -> {
			for (int i = 0; i < 10; i++) {
				TestEvent event = new TestEvent();
				event.setText("Text" + i);
				Publisher.publishEvent(event, "test"); // publish a "test" event
				System.out.println(simpleDateFormat.format(new Date()));
				try {
					Thread.sleep(2000);
				} catch (InterruptedException e) {
					// Restore the interrupt flag and stop publishing instead of swallowing it.
					Thread.currentThread().interrupt();
					return;
				}
				System.out.println(simpleDateFormat.format(new Date()));
			}
		}).start();
	}
}

测试结果如下

im listener1, text: Text0
im listener2, text: Text0
2019-03-04 10:29:01
2019-03-04 10:29:03
im listener1, text: Text1
im listener2, text: Text1
2019-03-04 10:29:03
2019-03-04 10:29:05
im listener1, text: Text2
im listener2, text: Text2
2019-03-04 10:29:05
2019-03-04 10:29:07
im listener1, text: Text3
im listener2, text: Text3
2019-03-04 10:29:07
2019-03-04 10:29:09
im listener1, text: Text4
im listener2, text: Text4
2019-03-04 10:29:09
2019-03-04 10:29:11
im listener1, text: Text5
im listener2, text: Text5
2019-03-04 10:29:11
2019-03-04 10:29:13
im listener1, text: Text6
im listener2, text: Text6
2019-03-04 10:29:13
2019-03-04 10:29:15
im listener1, text: Text7
im listener2, text: Text7
2019-03-04 10:29:15
2019-03-04 10:29:17
im listener1, text: Text8
im listener2, text: Text8
2019-03-04 10:29:17
2019-03-04 10:29:19
im listener1, text: Text9
im listener2, text: Text9
2019-03-04 10:29:19
2019-03-04 10:29:21
⚠️ **GitHub.com Fallback** ⚠️