dubbo服务调用过程 - litter-fish/ReadSource GitHub Wiki

Dubbo 服务调用过程

远程调用请求的发送与接收过程

发送方处理

  1. 首先服务消费者通过代理对象 Proxy 发起远程调用,
  2. 接着通过网络客户端 Client 将编码后的请求发送给服务提供方的网络层上,也就是 Server。

接收方处理

  1. 首先要做的事情是对数据包进行解码。
  2. 然后将解码后的请求发送至分发器 Dispatcher,
  3. 再由分发器将请求派发到指定的线程池上,
  4. 最后由线程池调用具体的服务

Dubbo 数据包结构

格式说明:

偏移量(Bit)         字段                  取值
0 ~ 7           魔数高位             0xda00
8 ~ 15          魔数低位             0xbb
  16            数据包类型            0 - Response,
                                    1 - Request

  17            调用方式             仅在第16位被设为1的情况下有效,
                                    0 - 单向调用,1 - 双向调用

  18            事件标识             0 - 当前数据包是请求或响应包,
                                    1 - 当前数据包是心跳包

19 ~ 23         序列化器编号          2 - Hessian2Serialization
                                    3 - JavaSerialization
                                    4 - CompactedJavaSerialization
                                    6 - FastJsonSerialization
                                    7 - NativeJavaSerialization
                                    8 - KryoSerialization
                                    9 - FstSerialization

24 ~ 31         状态                 20 - OK
                                    30 - CLIENT_TIMEOUT
                                    31 - SERVER_TIMEOUT
                                    40 - BAD_REQUEST
                                    50 - BAD_RESPONSE
                                    ......

32 ~ 95         请求编号             共8字节,运行时生成
96 ~ 127        消息体长度           运行时计算

发送请求

调用路径

proxy0#sayHello(String)
  —> InvokerInvocationHandler#invoke(Object, Method, Object[])
    —> MockClusterInvoker#invoke(Invocation)
      —> AbstractClusterInvoker#invoke(Invocation)
        —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
          —> Filter#invoke(Invoker, Invocation)  // 包含多个 Filter 调用
            —> ListenerInvokerWrapper#invoke(Invocation)
              —> AbstractInvoker#invoke(Invocation)
                —> DubboInvoker#doInvoke(Invocation)
                  —> ReferenceCountExchangeClient#request(Object, int)
                    —> HeaderExchangeClient#request(Object, int)
                      —> HeaderExchangeChannel#request(Object, int)
                        —> AbstractPeer#send(Object)
                          —> AbstractClient#send(Object, boolean)
                            —> NettyChannel#send(Object, boolean)
                              —> NioClientSocketChannel#write(Object)

Dubbo 默认使用 Javassist 框架为服务接口生成动态代理类,使用阿里开源 Java 应用诊断工具 Arthas 反编译代理类,结果如下:

/**
 * Arthas 反编译步骤:
 * 1. 启动 Arthas
 *    java -jar arthas-boot.jar
 *
 * 2. 输入编号选择进程
 *    Arthas 启动后,会打印 Java 应用进程列表,如下:
 *    [1]: 11232 org.jetbrains.jps.cmdline.Launcher
 *    [2]: 22370 org.jetbrains.jps.cmdline.Launcher
 *    [3]: 22371 com.alibaba.dubbo.demo.consumer.Consumer
 *    [4]: 22362 com.alibaba.dubbo.demo.provider.Provider
 *    [5]: 2074 org.apache.zookeeper.server.quorum.QuorumPeerMain
 * 这里输入编号 3,让 Arthas 关联到启动类为 com.....Consumer 的 Java 进程上
 *
 * 3. 由于 Demo 项目中只有一个服务接口,因此此接口的代理类类名为 proxy0,此时使用 sc 命令搜索这个类名。
 *    $ sc *.proxy0
 *    com.alibaba.dubbo.common.bytecode.proxy0
 *
 * 4. 使用 jad 命令反编译 com.alibaba.dubbo.common.bytecode.proxy0
 *    $ jad com.alibaba.dubbo.common.bytecode.proxy0
 *
 * 更多使用方法请参考 Arthas 官方文档:
 *   https://alibaba.github.io/arthas/quick-start.html
 */
public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
    // 方法数组
    public static Method[] methods;
    // InvokerInvocationHandler
    private InvocationHandler handler;

    public proxy0(InvocationHandler invocationHandler) {
        this.handler = invocationHandler;
    }

    public proxy0() {
    }

    public String sayHello(String string) {
        // 将参数存储到 Object 数组中
        Object[] arrobject = new Object[]{string};
        // 调用 InvocationHandler 实现类的 invoke 方法得到调用结果
        Object object = this.handler.invoke(this, methods[0], arrobject);
        // 返回调用结果
        return (String)object;
    }

    /** 回声测试方法 */
    public Object $echo(Object object) {
        Object[] arrobject = new Object[]{object};
        Object object2 = this.handler.invoke(this, methods[1], arrobject);
        return object2;
    }
}

调用 InvocationHandler 实现类的 invoke 方法得到调用结果

// InvokerInvocationHandler
public Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable {
    String methodName = method.getName();
    Class<?>[] parameterTypes = method.getParameterTypes();
    // 拦截定义在 Object 类中的方法(未被子类重写),比如 wait/notify
    if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
    }
    // 如果 toString、hashCode 和 equals 等方法被子类重写了,这里也直接调用
    if ("toString".equals(methodName)
            && parameterTypes.length == 0) {
        return invoker.toString();
    }
    if ("hashCode".equals(methodName)
            && parameterTypes.length == 0) {
        return invoker.hashCode();
    }
    if ("equals".equals(methodName)
            && parameterTypes.length == 1) {
        return invoker.equals(args[0]);
    }
    // 将 method 和 args 封装到 RpcInvocation 中,并执行后续的调用
    return invoker
                .invoke(new RpcInvocation(method, args))
                .recreate();
}

invoker结构???

类型为 MockClusterInvoker,MockClusterInvoker 内部封装了服务降级逻辑

// MockClusterInvoker
public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;

    // 获取 mock 配置值
    String value =
            directory
                .getUrl()
                .getMethodParameter(
                    invocation.getMethodName(),
                    Constants.MOCK_KEY,
                    Boolean.FALSE.toString())
                .trim();
    if (value.length() == 0 || value.equalsIgnoreCase("false")) {
        // 无 mock 逻辑,直接调用其他 Invoker 对象的 invoke 方法,
        // 比如 FailoverClusterInvoker
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {
        // force:xxx 直接执行 mock 逻辑,不发起远程调用
        result = doMockInvoke(invocation, null);
    } else {
        // fail:xxx 表示消费方对调用服务失败后,再执行 mock 逻辑,不抛出异常
        try {
            // 调用其他 Invoker 对象的 invoke 方法
            result = this.invoker.invoke(invocation);
        } catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            } else {
                // 调用失败,执行 mock 逻辑
                result = doMockInvoke(invocation, e);
            }
        }
    }
    return result;
}

直接调用其他 Invoker 对象的 invoke 方法

// com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();
    LoadBalance loadbalance = null;

    // 绑定 attachments 到 invocation
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }
    // 获取符合条件的所有 invoker
    List<Invoker<T>> invokers = list(invocation);
    if (invokers != null && !invokers.isEmpty()) {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    // 调用集群容错处理逻辑
    return doInvoke(invocation, invokers, loadbalance);
}

调用集群容错处理逻辑

public Result doInvoke(Invocation invocation,
        final List<Invoker<T>> invokers, LoadBalance loadbalance)
            throws RpcException {
    List<Invoker<T>> copyinvokers = invokers;
    // 检查invokers
    checkInvokers(copyinvokers, invocation);
    // 获取失败是重新尝试调用的次数
    int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        // 重新尝试调用之前,需要重新获取invoker列表,如果invoker发生了变更
        if (i > 0) {
            checkWhetherDestroyed();
            copyinvokers = list(invocation);
            // check again
            checkInvokers(copyinvokers, invocation);
        }
        // 通过负载均衡算法选择一个Invoker
        Invoker<T> invoker = select(loadbalance,
            invocation, copyinvokers, invoked);
        // 加入一般调用Invoker列表中,下次重新选择Invoker的时候,如果选择了需要重新再次选
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
            }
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException("......");
}

拦截器的调用

// com/alibaba/dubbo/rpc/protocol/ProtocolFilterWrapper.java
private static <T> Invoker<T> buildInvokerChain(
        final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (!filters.isEmpty()) {
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new Invoker<T>() {

                // 省略其他方法
                @Override
                public Result invoke(Invocation invocation) throws RpcException {
                    return filter.invoke(next, invocation);
                }
               // 省略其他方法
            };
        }
    }
    return last;
}

依次调用拦截器链中的每个拦截器,

调用监听器的invoke方法

// com/alibaba/dubbo/rpc/listener/ListenerInvokerWrapper.java
public Result invoke(Invocation invocation) throws RpcException {
    return invoker.invoke(invocation);
}

经过前面集群容错及负载均衡的处理,接着将会调用具体的invoke

public Result invoke(Invocation inv) throws RpcException {
    // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
    if (destroyed.get()) {
        logger.warn("");
    }

    RpcInvocation invocation = (RpcInvocation) inv;
    invocation.setInvoker(this);
    if (attachment != null && attachment.size() > 0) {
        invocation.addAttachmentsIfAbsent(attachment);
    }
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        // 为何处理???
        invocation.addAttachments(contextAttachments);
    }
    if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
        invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);


    try {
        return doInvoke(invocation);
    } catch (InvocationTargetException e) { // biz exception
        //
    } catch (RpcException e) {
        //
    } catch (Throwable e) {
        return new RpcResult(e);
    }
}

首先处理attachments,为何要再次处理????, 接着调用子类的doInvoke方法

执行invoker调用

AbstractInvoker 类,Dubbo 实现同步和异步调用比较关键的一点就在于由谁调用 ResponseFuture 的 get 方法。同步调用模式下,由框架自身调用 ResponseFuture 的 get 方法。异步调用模式下,则由用户调用该方法

// DubboInvoker
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    // 设置 path 和 version 到 attachment 中
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        // 从 clients 数组中获取 ExchangeClient
        currentClient = clients[0];
    } else {
        currentClient =
            clients[index.getAndIncrement() % clients.length];
    }
    try {
        // 获取异步配置
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        // isOneway 为 true,表示“单向”通信
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout =
                getUrl()
                .getMethodParameter(
                        methodName,
                        Constants.TIMEOUT_KEY,
                        Constants.DEFAULT_TIMEOUT);

        // 异步无返回值
        if (isOneway) {
            boolean isSent =
                    getUrl()
                    .getMethodParameter(
                        methodName,
                        Constants.SENT_KEY,
                        false);
            // 发送请求
            currentClient.send(inv, isSent);
            // 设置上下文中的 future 字段为 null
            RpcContext.getContext().setFuture(null);
            // 返回一个空的 RpcResult
            return new RpcResult();
        }

        // 异步有返回值
        else if (isAsync) {
            // 发送请求,并得到一个 ResponseFuture 实例
            ResponseFuture future =
                currentClient.request(inv, timeout);
            // 设置 future 到上下文中
            RpcContext
                .getContext()
                .setFuture(new FutureAdapter<Object>(future));
            // 暂时返回一个空结果
            return new RpcResult();
        }

        // 同步调用
        else {
            RpcContext.getContext().setFuture(null);
            // 发送请求,得到一个 ResponseFuture 实例,并调用该实例的 get 方法进行等待
            return (Result) currentClient
                .request(inv, timeout)
                .get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(..., "Invoke remote method timeout....");
    } catch (RemotingException e) {
        throw new RpcException(..., "Failed to invoke remote method: ...");
    }
}

ReferenceCountExchangeClient 源码

/**
 * 引用计数变量 referenceCount作用???
 */
final class ReferenceCountExchangeClient implements ExchangeClient {
    private final URL url;
    private final AtomicInteger referenceCount = new AtomicInteger(0);

    public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) {
        this.client = client;
        // 引用计数自增
        referenceCount.incrementAndGet();
        this.url = client.getUrl();
        // ...
    }

    @Override
    public ResponseFuture request(Object request) throws RemotingException {
        // 直接调用被装饰对象的同签名方法
        return client.request(request);
    }

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        // 直接调用被装饰对象的同签名方法
        return client.request(request, timeout);
    }

    /** 引用计数自增,该方法由外部调用 */
    public void incrementAndGetCount() {
        // referenceCount 自增
        referenceCount.incrementAndGet();
    }
    @Override
    public void close(int timeout) {
        // referenceCount 自减
        if (referenceCount.decrementAndGet() <= 0) {
            if (timeout == 0) {
                client.close();
            } else {
                client.close(timeout);
            }
            client = replaceWithLazyClient();
        }
    }
    // 省略部分方法
}

HeaderExchangeClient :封装了一些关于心跳检测的逻辑

public class HeaderExchangeClient implements ExchangeClient {
    private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
    private final Client client;
    private final ExchangeChannel channel;
    private ScheduledFuture<?> heartbeatTimer;
    private int heartbeat;
    private int heartbeatTimeout;

    public HeaderExchangeClient(Client client, boolean needHeartbeat) {
        if (client == null) {
            throw new IllegalArgumentException("client == null");
        }
        this.client = client;
        // 创建 HeaderExchangeChannel 对象
        this.channel = new HeaderExchangeChannel(client);
        // 以下代码均与心跳检测逻辑有关
        String dubbo =
            client
                .getUrl()
                .getParameter(Constants.DUBBO_VERSION_KEY);

        this.heartbeat =
            client
                .getUrl()
                .getParameter(
                    Constants.HEARTBEAT_KEY,
                    dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);

        this.heartbeatTimeout =
            client
                .getUrl()
                .getParameter(
                    Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        if (needHeartbeat) {
            // 开启心跳检测定时器
            startHeartbeatTimer();
        }
    }

    @Override
    public ResponseFuture request(Object request) throws RemotingException {
        // 直接 HeaderExchangeChannel 对象的同签名方法
        return channel.request(request);
    }

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        // 直接 HeaderExchangeChannel 对象的同签名方法
        return channel.request(request, timeout);
    }

    @Override
    public void close() {
        doClose();
        channel.close();
    }

    private void doClose() {
        // 停止心跳检测定时器
        stopHeartbeatTimer();
    }

    private void startHeartbeatTimer() {
        stopHeartbeatTimer();
        if (heartbeat > 0) {
            heartbeatTimer = scheduled.scheduleWithFixedDelay(
                    new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                        @Override
                        public Collection<Channel> getChannels() {
                            return Collections.<Channel>singletonList(HeaderExchangeClient.this);
                        }
                    }, heartbeat, heartbeatTimeout),
                    heartbeat, heartbeat, TimeUnit.MILLISECONDS);
        }
    }

    private void stopHeartbeatTimer() {
        if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) {
            try {
                heartbeatTimer.cancel(true);
                scheduled.purge();
            } catch (Throwable e) {
                if (logger.isWarnEnabled()) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
        heartbeatTimer = null;
    }
    // 省略部分方法
}

HeaderExchangeChannel

final class HeaderExchangeChannel implements ExchangeChannel {
    private final Channel channel;
    HeaderExchangeChannel(Channel channel) {
        if (channel == null) {
            throw new IllegalArgumentException("channel == null");
        }
        // 这里的 channel 指向的是 NettyClient
        this.channel = channel;
    }
    @Override
    public ResponseFuture request(Object request) throws RemotingException {
        return request(
            request,
            channel
                .getUrl()
                .getPositiveParameter(
                    Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
    }

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(..., "Failed to send request ...);
        }
        // 创建 Request 对象
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        // 设置双向通信标志为 true
        req.setTwoWay(true);
        // 这里的 request 变量类型为 RpcInvocation
        req.setData(request);
        // 创建 DefaultFuture 对象
        DefaultFuture future =
            new DefaultFuture(channel, req, timeout);
        try {
            // 调用 NettyClient 的 send 方法发送请求
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        // 返回 DefaultFuture 对象
        return future;
    }
}

发送请求

// AbstractPeer
public void send(Object message) throws RemotingException {
    // 该方法由 AbstractClient 类实现
    send(message, url.getParameter(Constants.SENT_KEY, false));
}

public abstract class AbstractClient extends
    AbstractEndpoint implements Client {
    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        if (send_reconnect && !isConnected()) {
            connect();
        }
        // 获取 Channel,getChannel 是一个抽象方法,具体由子类实现
        Channel channel = getChannel();
        if (channel == null || !channel.isConnected()) {
            throw new RemotingException(this, "message can not send ...");
        }
        // 继续向下调用
        channel.send(message, sent);
    }
    protected abstract Channel getChannel();
    // 省略其他方法
}

public class NettyClient extends AbstractClient {
    // 这里的 Channel 全限定名称为 org.jboss.netty.channel.Channel
    private volatile Channel channel;

    @Override
    protected com.alibaba.dubbo.remoting.Channel getChannel() {
        Channel c = channel;
        if (c == null || !c.isConnected())
            return null;
        // 获取一个 NettyChannel 类型对象
        return NettyChannel.getOrAddChannel(c, getUrl(), this);
    }
}

final class NettyChannel extends AbstractChannel {

    private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap =
        new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();

    private final org.jboss.netty.channel.Channel channel;
    /** 私有构造方法 */
    private NettyChannel(org.jboss.netty.channel.Channel channel, URL url, ChannelHandler handler) {
        super(url, handler);
        if (channel == null) {
            throw new IllegalArgumentException("netty channel == null;");
        }
        this.channel = channel;
    }

    static NettyChannel getOrAddChannel(
                                    org.jboss.netty.channel.Channel ch,
                                    URL url,
                                    ChannelHandler handler) {
        if (ch == null) {
            return null;
        }
        // 尝试从集合中获取 NettyChannel 实例
        NettyChannel ret = channelMap.get(ch);
        if (ret == null) {
            // 如果 ret = null,则创建一个新的 NettyChannel 实例
            NettyChannel nc = new NettyChannel(ch, url, handler);
            if (ch.isConnected()) {
                // 将 <Channel, NettyChannel> 键值对存入 channelMap 集合中
                ret = channelMap.putIfAbsent(ch, nc);
            }
            if (ret == null) {
                ret = nc;
            }
        }
        return ret;
    }

    public void send(Object message, boolean sent)
            throws RemotingException {
        super.send(message, sent);

        boolean success = true;
        int timeout = 0;
        try {
            // 发送消息(包含请求和响应消息)
            ChannelFuture future = channel.write(message);
            // sent 的值源于 <dubbo:method sent="true/false" /> 中 sent 的配置值,有两种配置值:
            //   1. true: 等待消息发出,消息发送失败将抛出异常
            //   2. false: 不等待消息发出,将消息放入 IO 队列,即刻返回
            // 默认情况下 sent = false;
            if (sent) {
                timeout =
                    getUrl()
                        .getPositiveParameter(
                            Constants.TIMEOUT_KEY,
                            Constants.DEFAULT_TIMEOUT);
                // 等待消息发出,若在规定时间没能发出,success 会被置为 false
                success = future.await(timeout);
            }
            Throwable cause = future.getCause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed to send message ...");
        }

        // 若 success 为 false,这里抛出异常
        if (!success) {
            throw new RemotingException(this, "Failed to send message ...");
        }
    }
}

通过ResponseFuture获取返回调用的结果

ResponseFuture 接口的默认实现DefaultFuture

public class DefaultFuture implements ResponseFuture {

    private static final Map<Long, Channel> CHANNELS =
            new ConcurrentHashMap<Long, Channel>();

    private static final Map<Long, DefaultFuture> FUTURES =
            new ConcurrentHashMap<Long, DefaultFuture>();
    private final long id;
    private final Channel channel;
    private final Request request;
    private final int timeout;
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile Response response;

    public DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        // 获取请求 id,这个 id 很重要,后面还会见到
        this.id = request.getId();
        this.timeout =
            timeout > 0
            ? timeout
            : channel
                .getUrl()
                .getPositiveParameter(
                    Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // 存储 <requestId, DefaultFuture> 映射关系到 FUTURES 中
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }

    @Override
    public Object get() throws RemotingException {
        return get(timeout);
    }

    @Override
    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }

        // 检测服务提供方是否成功返回了调用结果
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                // 循环检测服务提供方是否成功返回了调用结果
                while (!isDone()) {
                    // 如果调用结果尚未返回,这里等待一段时间
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    // 如果调用结果成功返回,或等待超时,此时跳出 while 循环,执行后续的逻辑
                    if (isDone() ||
                        System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }

            // 如果调用结果仍未返回,则抛出超时异常
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }

        // 返回调用结果
        return returnFromResponse();
    }

    @Override
    public boolean isDone() {
        // 通过检测 response 字段为空与否,判断是否收到了调用结果
        return response != null;
    }
    private Object returnFromResponse()
            throws RemotingException {
        Response res = response;
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }

        // 如果调用结果的状态为 Response.OK,则表示调用过程正常,服务提供方成功返回了调用结果
        if (res.getStatus() == Response.OK) {
            return res.getResult();
        }

        // 抛出异常
        if (res.getStatus() == Response.CLIENT_TIMEOUT ||
                res.getStatus() == Response.SERVER_TIMEOUT) {
            throw new TimeoutException(
                res.getStatus() == Response.SERVER_TIMEOUT,
                channel,
                res.getErrorMessage());
        }
        throw new RemotingException(channel, res.getErrorMessage());
    }
    // 省略其他方法
}

请求编码

消息的一些常量

// ExchangeCodec
// 消息头长度
protected static final int HEADER_LENGTH = 16;
// 魔数内容
protected static final short MAGIC = (short) 0xdabb;
protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
protected static final byte FLAG_REQUEST = (byte) 0x80;
protected static final byte FLAG_TWOWAY = (byte) 0x40;
protected static final byte FLAG_EVENT = (byte) 0x20;
protected static final int SERIALIZATION_MASK = 0x1f;

根据消息类型对消息进行编码处理

public void encode(Channel channel, ChannelBuffer buffer, Object msg)
        throws IOException {
    if (msg instanceof Request) {
        // 对 Request 对象进行编码
        encodeRequest(channel, buffer, (Request) msg);
    } else if (msg instanceof Response) {
        // 对 Response 对象进行编码,后面分析
        encodeResponse(channel, buffer, (Response) msg);
    } else {
        super.encode(channel, buffer, msg);
    }
}

对 Request 对象进行编码

protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
    Serialization serialization = getSerialization(channel);

    // 创建消息头字节数组,长度为 16
    byte[] header = new byte[HEADER_LENGTH];

    // 设置魔数
    Bytes.short2bytes(MAGIC, header);

    // 设置数据包类型(Request/Response)和序列化器编号
    header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

    // 设置通信方式(单向/双向)
    if (req.isTwoWay()) {
        header[2] |= FLAG_TWOWAY;
    }

    // 设置事件标识
    if (req.isEvent()) {
        header[2] |= FLAG_EVENT;
    }

    // 设置请求编号,8个字节,从第4个字节开始设置
    Bytes.long2bytes(req.getId(), header, 4);

    // 获取 buffer 当前的写位置
    int savedWriteIndex = buffer.writerIndex();
    // 更新 writerIndex,为消息头预留 16 个字节的空间
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    // 创建序列化器,比如 Hessian2ObjectOutput
    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
    if (req.isEvent()) {
        // 对事件数据进行序列化操作
        encodeEventData(channel, out, req.getData());
    } else {
        // 对请求数据进行序列化操作
        encodeRequestData(channel, out, req.getData(), req.getVersion());
    }
    out.flushBuffer();
    if (out instanceof Cleanable) {
        ((Cleanable) out).cleanup();
    }
    bos.flush();
    bos.close();

    // 获取写入的字节数,也就是消息体长度
    int len = bos.writtenBytes();
    checkPayload(channel, len);

    // 将消息体长度写入到消息头中
    Bytes.int2bytes(len, header, 12);

    // 将 buffer 指针移动到 savedWriteIndex,为写消息头做准备
    buffer.writerIndex(savedWriteIndex);
    // 从 savedWriteIndex 下标处写入消息头
    buffer.writeBytes(header);
    // 设置新的 writerIndex,writerIndex = 原写下标 + 消息头长度 + 消息体长度
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}

Request 对象的 data 字段序列化过程

// DubboCodec
protected void encodeRequestData(Channel channel, ObjectOutput out,
        Object data, String version) throws IOException {
    RpcInvocation inv = (RpcInvocation) data;

    // 依次序列化 dubbo version、path、version
    out.writeUTF(version);
    out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
    out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));

    // 序列化调用方法名
    out.writeUTF(inv.getMethodName());
    // 将参数类型转换为字符串,并进行序列化
    out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
    Object[] args = inv.getArguments();
    if (args != null)
        for (int i = 0; i < args.length; i++) {
            // 对运行时参数进行序列化
            out.writeObject(encodeInvocationArgument(channel, inv, i));
        }

    // 序列化 attachments
    out.writeObject(inv.getAttachments());
}

经过上述对请求的编码处理后,请求经过TCP长连接进行发送,在服务端接收到请求后,netty服务器会先对请求进行解码操作,然后交给服务器进行方法调用。

总结请求的过程

  1. 请求首先经过降级服务的处理,
  2. 其次根据集群容错方案,选择对应的集群容错处理模式,
  3. 然后根据负载均衡配置,选择一个合适的服务,
  4. 接着经过一系列的Filter及Listener,
  5. 紧接着经过netty对请求进行编码处理,
  6. 最后通过netty客户端发送请求给服务器

数据解码过程

读取请求头数据

// ExchangeCodec
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int readable = buffer.readableBytes();
    // 创建消息头字节数组
    byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
    // 读取消息头数据
    buffer.readBytes(header);
    // 调用重载方法进行后续解码工作
    return decode(channel, buffer, readable, header);
}

根据请求头做一些判断,检测

// ExchangeCodec
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    // 检查魔数是否相等
    if (readable > 0 && header[0] != MAGIC_HIGH
            || readable > 1 && header[1] != MAGIC_LOW) {
        int length = header.length;
        if (header.length < readable) {
            header = Bytes.copyOf(header, readable);
            buffer.readBytes(header, length, readable - length);
        }
        for (int i = 1; i < header.length - 1; i++) {
            if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                buffer.readerIndex(buffer.readerIndex() - header.length + i);
                header = Bytes.copyOf(header, i);
                break;
            }
        }
        // 通过 telnet 命令行发送的数据包不包含消息头,所以这里
        // 调用 TelnetCodec 的 decode 方法对数据包进行解码
        return super.decode(channel, buffer, readable, header);
    }

    // 检测可读数据量是否少于消息头长度,若小于则立即返回 DecodeResult.NEED_MORE_INPUT
    if (readable < HEADER_LENGTH) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // 从消息头中获取消息体长度
    int len = Bytes.bytes2int(header, 12);
    // 检测消息体长度是否超出限制,超出则抛出异常
    checkPayload(channel, len);

    int tt = len + HEADER_LENGTH;
    // 检测可读的字节数是否小于实际的字节数
    if (readable < tt) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

    try {
        // 继续进行解码工作
        return decodeBody(channel, is, header);
    } finally {
        if (is.available() > 0) {
            try {
                StreamUtils.skipUnusedStream(is);
            } catch (IOException e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }
}

读取请求体数据

// DubboCodec
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
    // 获取消息头中的第三个字节,并通过逻辑与运算得到序列化器编号
    byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
    Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
    // 获取调用编号
    long id = Bytes.bytes2long(header, 4);
    // 通过逻辑与运算得到调用类型,0 - Response,1 - Request
    if ((flag & FLAG_REQUEST) == 0) {
        // 对响应结果进行解码,得到 Response 对象。这个非本节内容,后面再分析
        // ...
    } else {
        // 创建 Request 对象
        Request req = new Request(id);
        req.setVersion(Version.getProtocolVersion());
        // 通过逻辑与运算得到通信方式,并设置到 Request 对象中
        req.setTwoWay((flag & FLAG_TWOWAY) != 0);

        // 通过位运算检测数据包是否为事件类型
        if ((flag & FLAG_EVENT) != 0) {
            // 设置心跳事件到 Request 对象中
            req.setEvent(Request.HEARTBEAT_EVENT);
        }
        try {
            Object data;
            if (req.isHeartbeat()) {
                // 对心跳包进行解码,该方法已被标注为废弃
                data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
            } else if (req.isEvent()) {
                // 对事件数据进行解码
                data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
            } else {
                DecodeableRpcInvocation inv;
                // 根据 url 参数判断是否在 IO 线程上对消息体进行解码
                if (channel.getUrl().getParameter(
                        Constants.DECODE_IN_IO_THREAD_KEY,
                        Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                    inv = new DecodeableRpcInvocation(channel, req, is, proto);
                    // 在当前线程,也就是 IO 线程上进行后续的解码工作。此工作完成后,可将
                    // 调用方法名、attachment、以及调用参数解析出来
                    inv.decode();
                } else {
                    // 仅创建 DecodeableRpcInvocation 对象,但不在当前线程上执行解码逻辑
                    inv = new DecodeableRpcInvocation(channel, req,
                            new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                }
                data = inv;
            }
            // 设置 data 到 Request 对象中
            req.setData(data);
        } catch (Throwable t) {
            // 若解码过程中出现异常,则将 broken 字段设为 true,
            // 并将异常对象设置到 Reqeust 对象中
            req.setBroken(true);
            req.setData(t);
        }
        return req;
    }
}

decodeBody 对部分字段进行了解码,并将解码得到的字段封装到 Request 中

反序列化将诸如 path、version、调用方法名、参数列表等信息依次解析出来,并设置到相应的字段中,最终得到一个具有完整调用信息的 DecodeableRpcInvocation 对象。

// DecodeableRpcInvocation
public Object decode(Channel channel, InputStream input) throws IOException {
    ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
            .deserialize(channel.getUrl(), input);

    // 通过反序列化得到 dubbo version,并保存到 attachments 变量中
    String dubboVersion = in.readUTF();
    request.setVersion(dubboVersion);
    setAttachment(Constants.DUBBO_VERSION_KEY, dubboVersion);

    // 通过反序列化得到 path,version,并保存到 attachments 变量中
    setAttachment(Constants.PATH_KEY, in.readUTF());
    setAttachment(Constants.VERSION_KEY, in.readUTF());

    // 通过反序列化得到调用方法名
    setMethodName(in.readUTF());
    try {
        Object[] args;
        Class<?>[] pts;
        // 通过反序列化得到参数类型字符串,比如 Ljava/lang/String;
        String desc = in.readUTF();
        if (desc.length() == 0) {
            pts = DubboCodec.EMPTY_CLASS_ARRAY;
            args = DubboCodec.EMPTY_OBJECT_ARRAY;
        } else {
            // 将 desc 解析为参数类型数组
            pts = ReflectUtils.desc2classArray(desc);
            args = new Object[pts.length];
            for (int i = 0; i < args.length; i++) {
                try {
                    // 解析运行时参数
                    args[i] = in.readObject(pts[i]);
                } catch (Exception e) {
                    if (log.isWarnEnabled()) {
                        log.warn("Decode argument failed: " + e.getMessage(), e);
                    }
                }
            }
        }
        // 设置参数类型数组
        setParameterTypes(pts);

        // 通过反序列化得到原 attachment 的内容
        Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
        if (map != null && map.size() > 0) {
            Map<String, String> attachment = getAttachments();
            if (attachment == null) {
                attachment = new HashMap<String, String>();
            }
            // 将 map 与当前对象中的 attachment 集合进行融合
            attachment.putAll(map);
            setAttachments(attachment);
        }
        // 对 callback 类型的参数进行处理
        for (int i = 0; i < args.length; i++) {
            args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
        }

        // 设置参数列表
        setArguments(args);

    } catch (ClassNotFoundException e) {
        throw new IOException(StringUtils.toString("Read invocation data failed.", e));
    } finally {
        if (in instanceof Cleanable) {
            ((Cleanable) in).cleanup();
        }
    }
    return this;
}

经过上面对请求的解码操作,接着会将请求交给NettyHandler处理

调用服务

调用栈

NettyHandler#messageReceived(ChannelHandlerContext, MessageEvent)
    —> AbstractPeer#received(Channel, Object)
        —> MultiMessageHandler#received(Channel, Object)
            —> HeartbeatHandler#received(Channel, Object)
                —> AllChannelHandler#received(Channel, Object)
                    —> ExecutorService#execute(Runnable)

// 由线程池执行后续的调用逻辑
ChannelEventRunnable#run()
  —> DecodeHandler#received(Channel, Object)
    —> HeaderExchangeHandler#received(Channel, Object)
      —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
        —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
          —> Filter#invoke(Invoker, Invocation)
            —> AbstractProxyInvoker#invoke(Invocation)
              —> Wrapper0#invokeMethod(Object, String, Class[], Object[])
                —> DemoServiceImpl#sayHello(String)

首先根据一些信息获取 NettyChannel 实例,然后将 NettyChannel 实例以及 Request 对象向下传递

// NettyHandler
public void messageReceived(
        ChannelHandlerContext ctx,
        MessageEvent e) throws Exception {
    // 获取 NettyChannel
    NettyChannel channel =
            NettyChannel
                .getOrAddChannel(ctx.getChannel(), url, handler);
    try {
        // 继续向下调用
        handler.received(channel, e.getMessage());
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }
}

请求交个NettyHandler的messageReceived处理后,会先获取 NettyChannel, 然后会根据配置的线程派发模型进行下一步。

Dubbo 中的线程派发模型

Dispatcher 就是线程派发器,Dispatcher 真实的职责创建具有线程派发能力的 ChannelHandler,比如 AllChannelHandler、MessageOnlyChannelHandler 和 ExecutionChannelHandler 等,其本身并不具备线程派发能力。 Dubbo 支持 5 种不同的线程派发策略

    策略                          用途
all         所有消息都派发到线程池,包括请求,响应,连接事件,断开事件等
direct      所有消息都不派发到线程池,全部在 IO 线程上直接执行
message     只有请求和响应消息派发到线程池,其它消息均在 IO 线程上执行
execution   只有请求消息派发到线程池,不含响应。其它消息均在 IO 线程上执行
connection  在 IO 线程上,将连接断开事件放入队列,有序逐个执行,
            其它消息派发到线程池

默认配置下,Dubbo 使用 all 派发策略,即将所有的消息都派发到线程池中

// AllChannelHandler
/**
 * 处理请求和响应消息,这里的 message 变量类型可能是 Request,
 * 也可能是 Response
 */
@Override
public void received(Channel channel, Object message)
        throws RemotingException {
    ExecutorService cexecutor = getExecutorService();
    try {
        // 将请求和响应消息派发到线程池中处理
        cexecutor.execute(
            new ChannelEventRunnable(
                channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        if(
            message instanceof Request
            && t instanceof RejectedExecutionException){
            Request request = (Request)message;
            // 如果通信方式为双向通信,此时将 Server side ... threadpool is exhausted
            // 错误信息封装到 Response 中,并返回给服务消费方。
            if(request.isTwoWay()){
                String msg = "Server side(" + url.getIp() + "," + url.getPort()
                    - ") threadpool is exhausted ,detail msg:" + t.getMessage();
                Response response =
                    new Response(
                        request.getId(), request.getVersion());
                response
                    .setStatus(
                        Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                response.setErrorMessage(msg);
                // 返回包含错误信息的 Response 对象
                channel.send(response);
                return;
            }
        }
        throw new ExecutionException(..., " error when process received event .", t);
    }
}

上述逻辑主要是先根据请求创建一个ChannelEventRunnable线程,然后将该线程加入到线程池中。

线程对请求任务的处理

// ChannelEventRunnable
public void run() {
    // 检测通道状态,对于请求或响应消息,此时 state = RECEIVED
    if (state == ChannelState.RECEIVED) {
        try {
            // 将 channel 和 message 传给 ChannelHandler 对象,进行后续的调用
            handler.received(channel, message);
        } catch (Exception e) {
            logger.warn("... operation error, channel is ... message is ...");
        }
    }
    // 省略一大堆代码

}

派发到线程池中的请求进行解码操作

DecodeHandler 解码处理: 请求解码可在 IO 线程上执行,也可在线程池中执行,这个取决于运行时配置。DecodeHandler 存在的意义就是保证请求或响应对象可在线程池中被解码。

public class DecodeHandler extends AbstractChannelHandlerDelegate {

    public DecodeHandler(ChannelHandler handler) {
        super(handler);
    }

    @Override
    public void received(Channel channel, Object message)
            throws RemotingException {
        if (message instanceof Decodeable) {
            // 对 Decodeable 接口实现类对象进行解码
            decode(message);
        }

        if (message instanceof Request) {
            // 对 Request 的 data 字段进行解码
            decode(((Request) message).getData());
        }

        if (message instanceof Response) {
            // 对 Request 的 result 字段进行解码
            decode(((Response) message).getResult());
        }

        // 执行后续逻辑
        handler.received(channel, message);
    }

    private void decode(Object message) {
        // Decodeable 接口目前有两个实现类,
        // 分别为 DecodeableRpcInvocation 和 DecodeableRpcResult
        if (message != null && message instanceof Decodeable) {
            try {
                // 执行解码逻辑
                ((Decodeable) message).decode();
            } catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
                }
            }
        }
    }
}

根据请求的类型及通信模式分别处理

// HeaderExchangeHandler
public void received(Channel channel, Object message)
        throws RemotingException {
    channel.setAttribute(
        KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel =
        HeaderExchangeChannel
            .getOrAddChannel(channel);
    try {
        // 处理请求对象
        if (message instanceof Request) {
            Request request = (Request) message;
            if (request.isEvent()) {
                // 处理事件
                handlerEvent(channel, request);
            }
            // 处理普通的请求
            else {
                // 双向通信
                /**
                 * 对于双向通信,HeaderExchangeHandler 首先向后进行调用,得到调用结果。
                 * 然后将调用结果封装到 Response 对象中,最后再将该对象返回给服务消费方。
                 * 如果请求不合法,或者调用失败,则将错误信息封装到 Response 对象中,并返回给服务消费方。
                 */
                if (request.isTwoWay()) {
                    // 向后调用服务,并得到调用结果
                    Response response =
                        handleRequest(exchangeChannel, request);
                    // 将调用结果返回给服务消费端
                    channel.send(response);
                }
                // 如果是单向通信,仅向后调用指定服务即可,无需返回调用结果
                else {
                    handler
                        .received(
                            exchangeChannel, request.getData());
                }
            }
        }
        // 处理响应对象,服务消费方会执行此处逻辑,后面分析
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

双向通信

Response handleRequest(ExchangeChannel channel, Request req)
        throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion());
    // 检测请求是否合法,不合法则返回状态码为 BAD_REQUEST 的响应
    if (req.isBroken()) {
        Object data = req.getData();

        String msg;
        if (data == null)
            msg = null;
        else if
            (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
        else
            msg = data.toString();
        res.setErrorMessage("Fail to decode request due to: " + msg);
        // 设置 BAD_REQUEST 状态
        res.setStatus(Response.BAD_REQUEST);

        return res;
    }

    // 获取 data 字段值,也就是 RpcInvocation 对象
    Object msg = req.getData();
    try {
        // 继续向下调用
        Object result = handler.reply(channel, msg);
        // 设置 OK 状态码
        res.setStatus(Response.OK);
        // 设置调用结果
        res.setResult(result);
    } catch (Throwable e) {
        // 若调用过程出现异常,则设置 SERVICE_ERROR,表示服务端异常
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
    }
    return res;
}

handler为 DubboProtocol 类中的匿名类的实例

public class DubboProtocol extends AbstractProtocol {

    public static final String NAME = "dubbo";

    /**
     * 获取与指定服务对应的 Invoker 实例,
     * 并通过 Invoker 的 invoke 方法调用服务逻辑。
     */
    private ExchangeHandler requestHandler =
            new ExchangeHandlerAdapter() {

        @Override
        public Object reply(ExchangeChannel channel, Object message)
                throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                // 获取 Invoker 实例
                Invoker<?> invoker = getInvoker(channel, inv);
                if (Boolean.TRUE.toString()
                        .equals(
                            inv.getAttachments()
                                .get(IS_CALLBACK_SERVICE_INVOKE))) {
                    // 回调相关,忽略
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                // 通过 Invoker 调用具体的服务
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: ...");
        }
        // 忽略其他方法
    }
    Invoker<?> getInvoker(Channel channel, Invocation inv)
            throws RemotingException {
        // 忽略回调和本地存根相关逻辑
        // ...
        int port = channel.getLocalAddress().getPort();
        // 计算 service key,
        // 格式为 groupName/serviceName:serviceVersion:port。比如:
        //   dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
        String serviceKey =
            serviceKey(
                port,
                path,
                inv
                    .getAttachments()
                    .get(Constants.VERSION_KEY),
                inv.getAttachments().get(Constants.GROUP_KEY));

        // 从 exporterMap 查找与 serviceKey 相对应的 DubboExporter 对象,
        // 服务导出过程中会将 <serviceKey, DubboExporter> 映射关系存储到 exporterMap 集合中
        DubboExporter<?> exporter =
            (DubboExporter<?>) exporterMap.get(serviceKey);

        if (exporter == null)
            throw new RemotingException(channel, "Not found exported service ...");

        // 获取 Invoker 对象,并返回
        return exporter.getInvoker();
    }
    // 忽略其他方法
}

通过 Invoker 调用具体的服务,AbstractProxyInvoker 中调用invoke

// AbstractProxyInvoker
public Result invoke(Invocation invocation) throws RpcException {
    try {
        // 调用 doInvoke 执行后续的调用,并将调用结果封装到 RpcResult 中,并
        return
            new RpcResult(
                doInvoke(
                    proxy,
                    invocation.getMethodName(),
                    invocation.getParameterTypes(),
                    invocation.getArguments()));
    } catch (InvocationTargetException e) {
        return new RpcResult(e.getTargetException());
    } catch (Throwable e) {
        throw new RpcException("Failed to invoke remote proxy method ...");
    }
}

经过前面的准备处理,此时的AbstractProxyInvoker既是服务在暴露时缓存的Invoker对象

Invoker 实例是在运行时通过 JavassistProxyFactory 创建, 创建逻辑

// JavassistProxyFactory
protected Object doInvoke(
                        T proxy,
                        String methodName,
                        Class<?>[] parameterTypes,
                        Object[] arguments)
                            throws Throwable {
    // 调用 invokeMethod 方法进行后续的调用
    return
        wrapper
            .invokeMethod(
                proxy,
                methodName,
                parameterTypes,
                arguments);
}

Wrapper 是一个抽象类,其中 invokeMethod 是一个抽象方法。 Dubbo 会在运行时通过 Javassist 框架为 Wrapper 生成实现类, 并实现 invokeMethod 方法,该方法最终会根据调用信息调用具体的服务。 以 DemoServiceImpl 为例,Javassist 为其生成的代理类如下。

/** Wrapper0 是在运行时生成的,大家可使用 Arthas 进行反编译 */
public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    // 省略其他方法

    public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
        DemoService demoService;
        try {
            // 类型转换
            demoService = (DemoService)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        try {
            // 根据方法名调用指定的方法
            if ("sayHello".equals(string) && arrclass.length == 1) {
                return demoService.sayHello((String)arrobject[0]);
            }
        }
        catch (Throwable throwable) {
            throw new InvocationTargetException(throwable);
        }
        throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString());
    }
}

经过上面的逻辑处理即可调用我们自定义服务的指定方法了,

调用服务总结

  1. 首先netty框架将请求流通过解码器进行解码,得到Request对象,
  2. 接着数据经过netty一系列的handler处理,
  3. 根据派发模式将经过解码的请求放入连接池中,
  4. 线程获取到的请求任务,首先会先对报文体进行解码操作,第一步中对于线程池处理的模式,并不会对报文体进行解码处理,
  5. 得到请求对象后,会根据请求调用模式分别进行不同的处理
  6. 在进行服务暴露过程中DubboProtocol协议会初始化一个ExchangeHandlerAdapter对象,对于双向通行在这一步就会调用这个对象中的reply方法,
  7. 接着获取服务暴露后缓存的Invoker对象
  8. 最终会根据方法名、参数列表找到Javassist生成具体服务的代理类的指定方法,执行并返回结果。

服务提供方返回调用结果

获取请求结果后返回给服务消费端

// HeaderExchangeHandler
public void received(Channel channel, Object message)
        throws RemotingException {
        // 省略其他代码
        if (request.isTwoWay()) {
            // 向后调用服务,并得到调用结果
            Response response =
                handleRequest(exchangeChannel, request);
            // 将调用结果返回给服务消费端
            channel.send(response);
        }
               // 省略其他代码
        // 处理响应对象,服务消费方会执行此处逻辑,后面分析
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

重点代码channel.send(response);

对响应对象进行编码

public class ExchangeCodec extends TelnetCodec {
    public void encode(
        Channel channel, ChannelBuffer buffer, Object msg)
            throws IOException {
        if (msg instanceof Request) {
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
            // 对响应对象进行编码
            encodeResponse(channel, buffer, (Response) msg);
        } else {
            super.encode(channel, buffer, msg);
        }
    }

    protected void encodeResponse(
        Channel channel, ChannelBuffer buffer, Response res)
            throws IOException {
        int savedWriteIndex = buffer.writerIndex();
        try {
            Serialization serialization = getSerialization(channel);
            // 创建消息头字节数组
            byte[] header = new byte[HEADER_LENGTH];
            // 设置魔数
            Bytes.short2bytes(MAGIC, header);
            // 设置序列化器编号
            header[2] = serialization.getContentTypeId();
            if (res.isHeartbeat()) header[2] |= FLAG_EVENT;
            // 获取响应状态
            byte status = res.getStatus();
            // 设置响应状态
            header[3] = status;
            // 设置请求编号
            Bytes.long2bytes(res.getId(), header, 4);

            // 更新 writerIndex,为消息头预留 16 个字节的空间
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
            ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
            ObjectOutput out =
                serialization.serialize(channel.getUrl(), bos);

            if (status == Response.OK) {
                if (res.isHeartbeat()) {
                    // 对心跳响应结果进行序列化,已废弃
                    encodeHeartbeatData(channel, out, res.getResult());
                } else {
                    // 对调用结果进行序列化
                    encodeResponseData(channel, out, res.getResult(), res.getVersion());
                }
            } else {
                // 对错误信息进行序列化
                out.writeUTF(res.getErrorMessage())
            };
            out.flushBuffer();
            if (out instanceof Cleanable) {
                ((Cleanable) out).cleanup();
            }
            bos.flush();
            bos.close();

            // 获取写入的字节数,也就是消息体长度
            int len = bos.writtenBytes();
            checkPayload(channel, len);

            // 将消息体长度写入到消息头中
            Bytes.int2bytes(len, header, 12);
            // 将 buffer 指针移动到 savedWriteIndex,为写消息头做准备
            buffer.writerIndex(savedWriteIndex);
            // 从 savedWriteIndex 下标处写入消息头
            buffer.writeBytes(header);
            // 设置新的 writerIndex
            // writerIndex = 原写下标 + 消息头长度 + 消息体长度
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
        } catch (Throwable t) {
            // 异常处理逻辑不是很难理解,但是代码略多,这里忽略了
        }
    }
}

序列化响应数据

public class DubboCodec extends ExchangeCodec implements Codec2 {

    protected void encodeResponseData(
        Channel channel, ObjectOutput out, Object data, String version)
            throws IOException {
        Result result = (Result) data;
        // 检测当前协议版本是否支持带有 attachment 集合的 Response 对象
        boolean attach = Version.isSupportResponseAttachment(version);
        Throwable th = result.getException();

        // 异常信息为空
        if (th == null) {
            Object ret = result.getValue();
            // 调用结果为空
            if (ret == null) {
                // 序列化响应类型
                out.writeByte(
                    attach
                    ? RESPONSE_NULL_VALUE_WITH_ATTACHMENTS
                    : RESPONSE_NULL_VALUE);
            }
            // 调用结果非空
            else {
                // 序列化响应类型
                out.writeByte(
                    attach
                    ? RESPONSE_VALUE_WITH_ATTACHMENTS
                    : RESPONSE_VALUE);
                // 序列化调用结果
                out.writeObject(ret);
            }
        }
        // 异常信息非空
        else {
            // 序列化响应类型
            out.writeByte(
                attach
                ? RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS
                : RESPONSE_WITH_EXCEPTION);
            // 序列化异常对象
            out.writeObject(th);
        }

        if (attach) {
            // 记录 Dubbo 协议版本
            result
                .getAttachments()
                .put(
                    Constants.DUBBO_VERSION_KEY,
                    Version.getProtocolVersion());
            // 序列化 attachments 集合
            out.writeObject(result.getAttachments());
        }
    }
}

服务消费方接收调用结果

服务消费方在收到响应数据后

  1. 首先要做的事情是对响应数据进行解码,得到 Response 对象。
  2. 然后再将该对象传递给下一个入站处理器,这个入站处理器就是 NettyHandler。
  3. 接下来 NettyHandler 会将这个对象继续向下传递,
  4. 最后 AllChannelHandler 的 received 方法会收到这个对象,并将这个对象派发到线程池中。这个过程和服务提供方接收请求的过程是一样

响应数据解码: DubboCodec

public class DubboCodec extends ExchangeCodec implements Codec2 {

    @Override
    protected Object decodeBody(
            Channel channel, InputStream is, byte[] header)
                    throws IOException {
        byte flag = header[2],
        proto = (byte) (flag & SERIALIZATION_MASK);
        // 获取序列化实现类
        Serialization s =
            CodecSupport.getSerialization(channel.getUrl(), proto);
        // 获取请求编号
        long id = Bytes.bytes2long(header, 4);
        // 检测消息类型,若下面的条件成立,表明消息类型为 Response
        if ((flag & FLAG_REQUEST) == 0) {
            // 创建 Response 对象
            Response res = new Response(id);
            // 检测事件标志位
            if ((flag & FLAG_EVENT) != 0) {
                // 设置心跳事件
                res.setEvent(Response.HEARTBEAT_EVENT);
            }
            // 获取响应状态
            byte status = header[3];
            // 设置响应状态
            res.setStatus(status);

            // 如果响应状态为 OK,表明调用过程正常
            if (status == Response.OK) {
                try {
                    Object data;
                    if (res.isHeartbeat()) {
                        // 反序列化心跳数据,已废弃
                        data =
                            decodeHeartbeatData(
                                channel,
                                deserialize(s, channel.getUrl(), is));
                    } else if (res.isEvent()) {
                        // 反序列化事件数据
                        data =
                            decodeEventData(
                                channel,
                                deserialize(s, channel.getUrl(), is));
                    } else {
                        DecodeableRpcResult result;
                        // 根据 url 参数决定是否在 IO 线程上执行解码逻辑
                        if (channel
                                .getUrl()
                                .getParameter(
                                    Constants.DECODE_IN_IO_THREAD_KEY,
                                    Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                            // 创建 DecodeableRpcResult 对象
                            result =
                                new DecodeableRpcResult(
                                    channel,
                                    res,
                                    is,
                                    (Invocation) getRequestData(id),
                                    proto);
                            // 进行后续的解码工作
                            result.decode();
                        } else {
                            // 创建 DecodeableRpcResult 对象
                            result =
                                new DecodeableRpcResult(
                                    channel,
                                    res,
                                    new UnsafeByteArrayInputStream(
                                        readMessageData(is)),
                                    (Invocation) getRequestData(id), proto);
                        }
                        data = result;
                    }
                    // 设置 DecodeableRpcResult 对象到 Response 对象中
                    res.setResult(data);
                } catch (Throwable t) {
                    // 解码过程中出现了错误,此时设置 CLIENT_ERROR 状态码到 Response 对象中
                    res.setStatus(Response.CLIENT_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
            }
            // 响应状态非 OK,表明调用过程出现了异常
            else {
                // 反序列化异常信息,并设置到 Response 对象中
                res.setErrorMessage(
                    deserialize(s, channel.getUrl(), is).readUTF());
            }
            return res;
        } else {
            // 对请求数据进行解码,前面已分析过,此处忽略
        }
    }
}

执行反序列化操作

public class DecodeableRpcResult extends RpcResult
        implements Codec, Decodeable {

    private Invocation invocation;

    @Override
    public void decode() throws Exception {
        if (!hasDecoded && channel != null && inputStream != null) {
            try {
                // 执行反序列化操作
                decode(channel, inputStream);
            } catch (Throwable e) {
                // 反序列化失败,设置 CLIENT_ERROR 状态到 Response 对象中
                response.setStatus(Response.CLIENT_ERROR);
                // 设置异常信息
                response.setErrorMessage(StringUtils.toString(e));
            } finally {
                hasDecoded = true;
            }
        }
    }

    @Override
    public Object decode(Channel channel, InputStream input)
            throws IOException {
        ObjectInput in =
            CodecSupport
                .getSerialization(channel.getUrl(), serializationType)
                .deserialize(channel.getUrl(), input);

        // 反序列化响应类型
        byte flag = in.readByte();
        switch (flag) {
            case DubboCodec.RESPONSE_NULL_VALUE:
                break;
            case DubboCodec.RESPONSE_VALUE:
                // ...
                break;
            case DubboCodec.RESPONSE_WITH_EXCEPTION:
                // ...
                break;

            // 返回值为空,且携带了 attachments 集合
            case DubboCodec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
                try {
                    // 反序列化 attachments 集合,并存储起来 
                    setAttachments((Map<String, String>) in.readObject(Map.class));
                } catch (ClassNotFoundException e) {
                    throw new IOException(StringUtils.toString("Read response data failed.", e));
                }
                break;

            // 返回值不为空,且携带了 attachments 集合
            case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS:
                try {
                    // 获取返回值类型
                    Type[] returnType = RpcUtils.getReturnTypes(invocation);
                    // 反序列化调用结果,并保存起来
                    setValue(
                        returnType == null || returnType.length == 0
                        ? in.readObject()
                        : (returnType.length == 1
                            ? in.readObject((Class<?>) returnType[0])
                            : in.readObject(
                                (Class<?>) returnType[0], returnType[1])));
                    // 反序列化 attachments 集合,并存储起来
                    setAttachments(
                        (Map<String, String>) in.readObject(Map.class));
                } catch (ClassNotFoundException e) {
                    throw new IOException(StringUtils.toString("Read response data failed.", e));
                }
                break;

            // 异常对象不为空,且携带了 attachments 集合
            case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
                try {
                    // 反序列化异常对象
                    Object obj = in.readObject();
                    if (obj instanceof Throwable == false)
                        throw new IOException("Response data error, expect Throwable, but get " + obj);
                    // 设置异常对象
                    setException((Throwable) obj);
                    // 反序列化 attachments 集合,并存储起来
                    setAttachments((Map<String, String>) in.readObject(Map.class));
                } catch (ClassNotFoundException e) {
                    throw new IOException(StringUtils.toString("Read response data failed.", e));
                }
                break;
            default:
                throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
        }
        if (in instanceof Cleanable) {
            ((Cleanable) in).cleanup();
        }
        return this;
    }
}

向用户线程传递调用结果

服务消费者根据报文类型进行处理

// HeaderExchangeHandler
public void received(Channel channel, Object message)
        throws RemotingException {
    channel
        .setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel =
        HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
            // 处理请求,前面已分析过,省略
        } else if (message instanceof Response) {
            // 处理响应
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            // telnet 相关,忽略
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

服务消费者处理响应

// HeaderExchangeHandler
static void handleResponse(Channel channel, Response response)
        throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            // 继续向下调用
            DefaultFuture.received(channel, response);
        }
    }
}

调用编号获取指定的 DefaultFuture 对象

// DefaultFuture
public static void received(Channel channel, Response response) {
    try {
        // 根据调用编号从 FUTURES 集合中查找指定的 DefaultFuture 对象
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            // 继续向下调用
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at ...");
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}

得到指定的 DefaultFuture 对象后,唤醒用户线程

private void doReceived(Response res) {
    lock.lock();
    try {
        // 保存响应对象
        response = res;
        if (done != null) {
            // 唤醒用户线程
            done.signal();
        }
    } finally {
        lock.unlock();
    }
    if (callback != null) {
        invokeCallback(callback);
    }
}

dubbo使用的设计模式

装饰器模式

抽象构件(Component)角色
    ChannelHandler
具体构件(ConcreteComponent)角色
    ChannelHandlerDispatcher
装饰角色
    WrappedChannelHandler
具体装饰角色
    MultiMessageHandler
    HeartbeatHandler
    ChannelHandlerDispatcher
    AllChannelHandler
    HeaderExchangeHandler
    DecodeHandler

Dubbo与Handler

MultiMessageHandler -> 处理received的 MultiMessage

HeartbeatHandler -> 处理心跳

Dispatcher (默认 AllChannelHandler )-> 从NioEventLoop IO线程分发到Dubbo线程池

DecodeHandler -> 解码消息

HeaderExchangeHandler ->  关键的received方法,在channelRead时触发,在client处理response请求(notify DefaultFuture),在server端处理reqeust请求。

DubboProtocol
⚠️ **GitHub.com Fallback** ⚠️