dubbo Filter - litter-fish/ReadSource GitHub Wiki
ProtocolFilterWrapper是Protocol的包装类,所以会在加载的Extension的时候被自动包装进来(理解这里的前提是理解Dubbo的SPI机制),然后我们看一下这个Filter链是如何构造的。
/**
* ListenerProtocol
*/
public class ProtocolFilterWrapper implements Protocol {
private final Protocol protocol;
public ProtocolFilterWrapper(Protocol protocol) {
this.protocol = protocol;
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
//获得所有激活的Filter(已经排好序的)
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
//复制引用,构建filter调用链
final Invoker<T> next = last;
//这里只是构造一个最简化的Invoker作为调用链的载体Invoker
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
@Override
public int getDefaultPort() {
return protocol.getDefaultPort();
}
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//向注册中心引用服务的时候并不会进行filter调用链
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
@Override
public void destroy() {
protocol.destroy();
}
}
public List<T> getActivateExtension(URL url, String key, String group) {
// 获取<dubbo:service filter="filter1,default,filter2,-token"/>配置中filter属性的值
String value = url.getParameter(key);
return getActivateExtension(url, value == null || value.length() == 0 ? null : Constants.COMMA_SPLIT_PATTERN.split(value), group);
}
public List<T> getActivateExtension(URL url, String[] values, String group) {
List<T> exts = new ArrayList<T>();
// <dubbo:service filter="filter1,-default,filter2,-token"/>配置中filter属性的值,按照,转换成数组
List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
// 配置指定的项包含'-default'时, 则不加载默认的filter链组, 反之则加载
if (!names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
getExtensionClasses();
for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
String name = entry.getKey();
Activate activate = entry.getValue();
// group的取值范围限于provider/consumer, 标明作用的场景
if (isMatchGroup(group, activate.group())) {
T ext = getExtension(name);
//这里以Filter为例:三个判断条件的含义依次是:
//1.用户配置的filter列表中不包含当前ext
//2.用户配置的filter列表中不包含当前ext的加-的key
//3.如果用户的配置信息(url中体现)中有可以激活的配置key并且数据不为0,false,null,N/A,也就是说有正常的使用
if (!names.contains(name)
&& !names.contains(Constants.REMOVE_VALUE_PREFIX + name)
&& isActive(activate, url)) {
exts.add(ext);
}
}
}
// 根据Activate注解上的order排序, 有点类似css的zindex属性, 数值越小排在前面
Collections.sort(exts, ActivateComparator.COMPARATOR);
}
// 进行到此步骤的时候Dubbo提供的原生的Filter已经被添加完毕了,下面处理用户自己扩展的Filter
List<T> usrs = new ArrayList<T>();
for (int i = 0; i < names.size(); i++) {
String name = names.get(i);
//如果单个name不是以-开头并且所有的key里面并不包含-'name'(也就是说如果配置成了"dubbo,-dubbo"这种的可以,这个if是进不去的)
if (!name.startsWith(Constants.REMOVE_VALUE_PREFIX)
&& !names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
// 可以通过default关键字替换Dubbo原生的Filter链,主要用来控制调用链顺序
if (Constants.DEFAULT_KEY.equals(name)) {
if (!usrs.isEmpty()) {
// 加入用户自己定义的扩展Filter
exts.addAll(0, usrs);
usrs.clear();
}
} else {
//加入用户自己定义的扩展Filter
T ext = getExtension(name);
usrs.add(ext);
}
}
}
if (!usrs.isEmpty()) {
exts.addAll(usrs);
}
return exts;
}
@Activate(group = Constants.CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 在当前的RpcContext中记录本地调用的一 些状态信息
RpcContext.getContext()
.setInvoker(invoker)
.setInvocation(invocation)
.setLocalAddress(NetUtils.getLocalHost(), 0)
.setRemoteAddress(invoker.getUrl().getHost(),
invoker.getUrl().getPort());
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
try {
RpcResult result = (RpcResult) invoker.invoke(invocation);
RpcContext.getServerContext().setAttachments(result.getAttachments());
return result;
} finally {
RpcContext.getContext().clearAttachments();
}
}
}
/**
* MonitorFilter. (SPI, Singleton, ThreadSafe)
*/
@Activate(group = {Constants.PROVIDER, Constants.CONSUMER})
public class MonitorFilter implements Filter {
private static final Logger logger = LoggerFactory.getLogger(MonitorFilter.class);
// key: 接口名.方法名 value: 当前的并发数
private final ConcurrentMap<String, AtomicInteger> concurrents = new ConcurrentHashMap<String, AtomicInteger>();
// MonitorFactory$Adaptive
private MonitorFactory monitorFactory;
public void setMonitorFactory(MonitorFactory monitorFactory) {
this.monitorFactory = monitorFactory;
}
// intercepting invocation
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 开启了monitor监控
if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {
RpcContext context = RpcContext.getContext(); // provider must fetch context before invoke() gets called
String remoteHost = context.getRemoteHost();
long start = System.currentTimeMillis(); // record start timestamp
// 并发数+1
getConcurrent(invoker, invocation).incrementAndGet(); // count up
try {
Result result = invoker.invoke(invocation); // proceed invocation chain
// 收集统计数据
collect(invoker, invocation, result, remoteHost, start, false);
return result;
} catch (RpcException e) {
// 发生异常时收集统计数据
collect(invoker, invocation, null, remoteHost, start, true);
throw e;
} finally {
// 并发数-1
getConcurrent(invoker, invocation).decrementAndGet(); // count down
}
} else {
return invoker.invoke(invocation);
}
}
// collect info
private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
try {
// ---- service statistics ----
// 此次调用花费的时间
long elapsed = System.currentTimeMillis() - start; // invocation cost
int concurrent = getConcurrent(invoker, invocation).get(); // current concurrent count
String application = invoker.getUrl().getParameter(Constants.APPLICATION_KEY);
String service = invoker.getInterface().getName(); // service name
String method = RpcUtils.getMethodName(invocation); // method name
String group = invoker.getUrl().getParameter(Constants.GROUP_KEY);
String version = invoker.getUrl().getParameter(Constants.VERSION_KEY);
URL url = invoker.getUrl().getUrlParameter(Constants.MONITOR_KEY);
//根据monitorUrl获取Monitor实现(默认使用DubboMonitor)
Monitor monitor = monitorFactory.getMonitor(url);
if (monitor == null) {
return;
}
int localPort;
String remoteKey;
String remoteValue;
if (Constants.CONSUMER_SIDE.equals(invoker.getUrl().getParameter(Constants.SIDE_KEY))) {
// ---- for service consumer ----
localPort = 0;
remoteKey = MonitorService.PROVIDER;
remoteValue = invoker.getUrl().getAddress();
} else {
// ---- for service provider ----
localPort = invoker.getUrl().getPort();
remoteKey = MonitorService.CONSUMER;
remoteValue = remoteHost;
}
String input = "", output = "";
if (invocation.getAttachment(Constants.INPUT_KEY) != null) {
input = invocation.getAttachment(Constants.INPUT_KEY);
}
if (result != null && result.getAttachment(Constants.OUTPUT_KEY) != null) {
output = result.getAttachment(Constants.OUTPUT_KEY);
}
// 将统计数据构造成url
monitor.collect(new URL(Constants.COUNT_PROTOCOL,
NetUtils.getLocalHost(), localPort,
service + "/" + method,
MonitorService.APPLICATION, application,
MonitorService.INTERFACE, service,
MonitorService.METHOD, method,
remoteKey, remoteValue,
error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1", // 成功失败数
MonitorService.ELAPSED, String.valueOf(elapsed), // 调用消耗的时间
MonitorService.CONCURRENT, String.valueOf(concurrent), // 并发数
Constants.INPUT_KEY, input,
Constants.OUTPUT_KEY, output,
Constants.GROUP_KEY, group,
Constants.VERSION_KEY, version));
} catch (Throwable t) {
logger.error("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
}
}
// concurrent counter
private AtomicInteger getConcurrent(Invoker<?> invoker, Invocation invocation) {
String key = invoker.getInterface().getName() + "." + invocation.getMethodName();
AtomicInteger concurrent = concurrents.get(key);
if (concurrent == null) {
concurrents.putIfAbsent(key, new AtomicInteger());
concurrent = concurrents.get(key);
}
return concurrent;
}
}
/**
* LimitInvokerFilter
* 限制同一个客户端对于一个服务端方法的并发调用量
*/
@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)
public class ActiveLimitFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
// 主要记录每台机器针对某个方法的并发数量
RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
if (max > 0) {
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
long start = System.currentTimeMillis();
long remain = timeout;
int active = count.getActive();
if (active >= max) {
synchronized (count) {
// 这个while循环是必要的,因为在一次wait结束后,可能线程调用已经结束了,腾出来consumer的空间
while ((active = count.getActive()) >= max) {
try {
count.wait(remain);
} catch (InterruptedException e) {
}
// 如果wait方法被中断的话,remain这时候有可能大于0
// 如果其中一个线程运行结束自后调用notify方法的话,也有可能remain大于0
long elapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
if (remain <= 0) {
throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
+ invoker.getInterface().getName() + ", method: "
+ invocation.getMethodName() + ", elapsed: " + elapsed
+ ", timeout: " + timeout + ". concurrent invokes: " + active
+ ". max concurrent invoke limit: " + max);
}
}
}
}
}
try {
long begin = System.currentTimeMillis();
// 调用开始和结束后增减并发数量
RpcStatus.beginCount(url, methodName);
try {
Result result = invoker.invoke(invocation);
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
return result;
} catch (RuntimeException t) {
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
throw t;
}
} finally {
if (max > 0) {
// 这里很关键,因为一个调用完成后要通知正在等待执行的队列
synchronized (count) {
count.notify();
}
}
}
}
}
@Activate(group = Constants.CONSUMER)
public class FutureFilter implements Filter {
protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);
@Override
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
// 这里主要处理回调逻辑,主要区分三个时间:oninvoke:调用前触发,onreturn:调用后触发 onthrow:出现异常情况时候触发
// oninvoke方法的处理
fireInvokeCallback(invoker, invocation);
Result result = invoker.invoke(invocation);
if (isAsync) {
// 异步回调oninvoke和onthrow
asyncCallback(invoker, invocation);
} else {
// oninvoke和onthrow的处理
syncCallback(invoker, invocation, result);
}
return result;
}
private void syncCallback(final Invoker<?> invoker, final Invocation invocation, final Result result) {
if (result.hasException()) {
fireThrowCallback(invoker, invocation, result.getException());
} else {
fireReturnCallback(invoker, invocation, result.getValue());
}
}
private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
Future<?> f = RpcContext.getContext().getFuture();
if (f instanceof FutureAdapter) {
ResponseFuture future = ((FutureAdapter<?>) f).getFuture();
future.setCallback(new ResponseCallback() {
@Override
public void done(Object rpcResult) {
if (rpcResult == null) {
logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
return;
}
if (!(rpcResult instanceof Result)) {
logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
return;
}
Result result = (Result) rpcResult;
if (result.hasException()) {
// 异常情况触发onthrow
fireThrowCallback(invoker, invocation, result.getException());
} else {
// 否则触发onreturn
fireReturnCallback(invoker, invocation, result.getValue());
}
}
@Override
public void caught(Throwable exception) {
fireThrowCallback(invoker, invocation, exception);
}
});
}
}
private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY));
final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY));
if (onInvokeMethod == null && onInvokeInst == null) {
return;
}
if (onInvokeMethod == null || onInvokeInst == null) {
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
}
//由于JDK的安全检查耗时较多.所以通过setAccessible(true)的方式关闭安全检查就可以达到提升反射速度的目的
if (!onInvokeMethod.isAccessible()) {
onInvokeMethod.setAccessible(true);
}
Object[] params = invocation.getArguments();
try {
onInvokeMethod.invoke(onInvokeInst, params);
} catch (InvocationTargetException e) {
fireThrowCallback(invoker, invocation, e.getTargetException());
} catch (Throwable e) {
fireThrowCallback(invoker, invocation, e);
}
}
private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) {
// onInvokeMethod 即为java的一个Method对象,代表Notify的onInvoke方法
final Method onReturnMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_METHOD_KEY));
// Notify对象
final Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_INSTANCE_KEY));
// 没有设置直接返回
if (onReturnMethod == null && onReturnInst == null) {
return;
}
if (onReturnMethod == null || onReturnInst == null) {
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
}
if (!onReturnMethod.isAccessible()) {
onReturnMethod.setAccessible(true);
}
Object[] args = invocation.getArguments();
Object[] params;
Class<?>[] rParaTypes = onReturnMethod.getParameterTypes();
// 如果Notify方法的参数有多个
if (rParaTypes.length > 1) {
// 有两个参数,且第二个参数为Object的数组
if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
// 构造两个参数的数组,一个为结果result,一个为请求入参
params = new Object[2];
params[0] = result;
params[1] = args;
} else {
// 这种情况,假设Notify有3个参数,如果本来方法入参有2个
// 那么第一个为结果,后面为入参,如果入参只有1个,那么会导致异常,因为参数不匹配
params = new Object[args.length + 1];
params[0] = result;
System.arraycopy(args, 0, params, 1, args.length);
}
} else {
params = new Object[]{result};
}
try {
// 反射调用
onReturnMethod.invoke(onReturnInst, params);
} catch (InvocationTargetException e) {
// 异常情况调用onthrow配置的方法
fireThrowCallback(invoker, invocation, e.getTargetException());
} catch (Throwable e) {
fireThrowCallback(invoker, invocation, e);
}
}
private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) {
final Method onthrowMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_METHOD_KEY));
final Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_INSTANCE_KEY));
//onthrow callback not configured
if (onthrowMethod == null && onthrowInst == null) {
return;
}
if (onthrowMethod == null || onthrowInst == null) {
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onthrow callback config , but no such " + (onthrowMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
}
if (!onthrowMethod.isAccessible()) {
onthrowMethod.setAccessible(true);
}
Class<?>[] rParaTypes = onthrowMethod.getParameterTypes();
if (rParaTypes[0].isAssignableFrom(exception.getClass())) {
try {
// 因为onthrow方法的参数第一个值必须为异常信息,所以这里需要构造参数列表
Object[] args = invocation.getArguments();
Object[] params;
if (rParaTypes.length > 1) {
if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
params = new Object[2];
params[0] = exception;
params[1] = args;
} else {
params = new Object[args.length + 1];
params[0] = exception;
System.arraycopy(args, 0, params, 1, args.length);
}
} else {
params = new Object[]{exception};
}
onthrowMethod.invoke(onthrowInst, params);
} catch (Throwable e) {
logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), e);
}
} else {
logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception);
}
}
}
@Activate(group = {Constants.CONSUMER, Constants.PROVIDER}, value = Constants.CACHE_KEY)
public class CacheFilter implements Filter {
private CacheFactory cacheFactory;
public void setCacheFactory(CacheFactory cacheFactory) {
this.cacheFactory = cacheFactory;
}
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (cacheFactory != null && ConfigUtils.isNotEmpty(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.CACHE_KEY))) {
// 根据url获取缓存对象,
// dubbo://127.0.0.1:20880/com.alibaba.dubbo.demo.DemoService?accesslog=./test.log&actives=100&anyhost=true&application=demo-consumer&bean.name=com.alibaba.dubbo.demo.DemoService&cache=true&check=false&deprecated=true&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=leastactive&methods=sayHello,$invoke&pid=10652&qos.port=33333®ister.ip=172.25.104.193&remote.timestamp=1564834317960&side=consumer×tamp=1564834410981&token=697d545f-9f16-4501-80d5-a85c9f34fc67
Cache cache = cacheFactory.getCache(invoker.getUrl(), invocation);
if (cache != null) {
// 拼接请求参数,按照“,”分割
String key = StringUtils.toArgumentString(invocation.getArguments());
Object value = cache.get(key);
// 缓存命中,直接构造RpcResult对象返回
if (value != null) {
return new RpcResult(value);
}
Result result = invoker.invoke(invocation);
if (!result.hasException() && result.getValue() != null) {
// 将结果放到缓存中
cache.put(key, result.getValue());
}
return result;
}
}
return invoker.invoke(invocation);
}
/**
* EchoInvokerFilter
* 检测服务是否正常(网络状态)
*/
@Activate(group = Constants.PROVIDER, order = -110000)
public class EchoFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
if (inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1)
// 单纯的检测网络情况的话其实不需要执行真正的业务逻辑的,所以通过Filter验证一下即可
return new RpcResult(inv.getArguments()[0]);
return invoker.invoke(inv);
}
}
@Activate(group = Constants.PROVIDER, order = -30000)
public class ClassLoaderFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 获取当前线程的contextClassLoader
ClassLoader ocl = Thread.currentThread().getContextClassLoader();
// 将其ContextClassLoader设置为invoker.getInterface().getClassLoader()
Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader());
try {
return invoker.invoke(invocation);
} finally {
// 将当前线程的classLoader重置为原来的contextClassLoader
Thread.currentThread().setContextClassLoader(ocl);
}
}
}
/**
* ContextInvokerFilter
* 将RpcInvocation对象重新设置回RpcContext中供服务端逻辑重新获取隐式参数
*/
@Activate(group = Constants.PROVIDER, order = -10000)
public class ContextFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
Map<String, String> attachments = invocation.getAttachments();
if (attachments != null) {
attachments = new HashMap<String, String>(attachments);
attachments.remove(Constants.PATH_KEY);
attachments.remove(Constants.GROUP_KEY);
attachments.remove(Constants.VERSION_KEY);
attachments.remove(Constants.DUBBO_VERSION_KEY);
attachments.remove(Constants.TOKEN_KEY);
attachments.remove(Constants.TIMEOUT_KEY);
attachments.remove(Constants.ASYNC_KEY); // 清空服务端异步设置
// Remove async property to avoid being passed to the following invoke chain.
}
// 重新将invocation和attachments信息设置到RpcContext,这里设置以后provider的代码就可以获取到consumer端传递的一些隐式参数了
RpcContext.getContext()
.setInvoker(invoker)
.setInvocation(invocation)
// .setAttachments(attachments) // merged from dubbox
.setLocalAddress(invoker.getUrl().getHost(),
invoker.getUrl().getPort());
// mreged from dubbox
// we may already added some attachments into RpcContext before this filter (e.g. in rest protocol)
if (attachments != null) {
if (RpcContext.getContext().getAttachments() != null) {
RpcContext.getContext().getAttachments().putAll(attachments);
} else {
RpcContext.getContext().setAttachments(attachments);
}
}
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
try {
RpcResult result = (RpcResult) invoker.invoke(invocation);
// pass attachments to result
result.addAttachments(RpcContext.getServerContext().getAttachments());
return result;
} finally {
RpcContext.removeContext();
RpcContext.getServerContext().clearAttachments();
}
}
}
/**
* Log any invocation timeout, but don't stop server from running
*/
@Activate(group = Constants.PROVIDER)
public class TimeoutFilter implements Filter {
private static final Logger logger = LoggerFactory.getLogger(TimeoutFilter.class);
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 获取调用开始时间
long start = System.currentTimeMillis();
Result result = invoker.invoke(invocation);
// 获取调用所花时间:结束时间 - 开始时间
long elapsed = System.currentTimeMillis() - start;
if (invoker.getUrl() != null // 获取url中配置的timeout属性值
&& elapsed > invoker.getUrl().getMethodParameter(invocation.getMethodName(),
"timeout", Integer.MAX_VALUE)) {
if (logger.isWarnEnabled()) {
logger.warn("invoke time out. method: " + invocation.getMethodName()
+ " arguments: " + Arrays.toString(invocation.getArguments()) + " , url is "
+ invoker.getUrl() + ", invoke elapsed " + elapsed + " ms.");
}
}
return result;
}
}
Dubbo 对于异常的处理规则
如果是checked异常则直接抛出 如果是unchecked异常但是在接口上有声明,也会直接抛出 如果异常类和接口类在同一jar包里,直接抛出 如果是JDK自带的异常,直接抛出 如果是Dubbo的异常,直接抛出其余的都包装成RuntimeException然后抛出(避免异常在Client出不能反序列化问题)
@Activate(group = Constants.PROVIDER)
public class ExceptionFilter implements Filter {
private final Logger logger;
public ExceptionFilter() {
this(LoggerFactory.getLogger(ExceptionFilter.class));
}
public ExceptionFilter(Logger logger) {
this.logger = logger;
}
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
try {
Result result = invoker.invoke(invocation);
if (result.hasException() && GenericService.class != invoker.getInterface()) {
try {
Throwable exception = result.getException();
// directly throw if it's checked exception
// 如果是checked异常,直接抛出
if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {
return result;
}
// directly throw if the exception appears in the signature
// 运行时异常,并且在方法签名上有声明,直接抛出
try {
Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
Class<?>[] exceptionClassses = method.getExceptionTypes();
for (Class<?> exceptionClass : exceptionClassses) {
if (exception.getClass().equals(exceptionClass)) {
return result;
}
}
} catch (NoSuchMethodException e) {
return result;
}
// for the exception not found in method's signature, print ERROR message in server's log.
// 未在方法签名上定义的异常,在服务器端打印ERROR日志
logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
+ ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
+ ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);
// directly throw if exception class and interface class are in the same jar file.
// 异常类和接口类在同一jar包里,直接抛出
String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
return result;
}
// directly throw if it's JDK exception
// 是JDK自带的异常,直接抛出
String className = exception.getClass().getName();
if (className.startsWith("java.") || className.startsWith("javax.")) {
return result;
}
// directly throw if it's dubbo exception
// 是Dubbo本身的异常,直接抛出
if (exception instanceof RpcException) {
return result;
}
// otherwise, wrap with RuntimeException and throw back to the client
// 否则,包装成RuntimeException抛给客户端
return new RpcResult(new RuntimeException(StringUtils.toString(exception)));
} catch (Throwable e) {
logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost()
+ ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
+ ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
return result;
}
}
return result;
} catch (RuntimeException e) {
logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
+ ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
+ ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
throw e;
}
}
}
服务提供方进行并发控制配置方式
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" group="dev" version="1.0.0" timeout="3000" executes="10" />
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" group="dev" version="1.0.0" timeout="3000" >
<dubbo:method name="sayHello" executes="10"/>
</dubbo:service>
@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY)
public class ExecuteLimitFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
Semaphore executesLimit = null;
boolean acquireResult = false;
int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
// 如果该接口/方法设置了executes并且值大于0
if (max > 0) {
// 取出该接口/方法对应的计数器
RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
// if (count.getActive() >= max) {
/**
* http://manzhizhen.iteye.com/blog/2386408
* use semaphore for concurrency control (to limit thread number)
* 通过信号量来做并发控制(即限制能使用的线程数量)
*/
executesLimit = count.getSemaphore(max);
// 可知如果并发处理数量大于设置的值,则直接会抛出异常
if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
}
}
long begin = System.currentTimeMillis();
boolean isSuccess = true;
// 计数器+1
RpcStatus.beginCount(url, methodName);
try {
Result result = invoker.invoke(invocation);
return result;
} catch (Throwable t) {
isSuccess = false;
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
}
} finally {
// 在finally中进行计数器-1
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
if(acquireResult) {
executesLimit.release();
}
}
}
}