Kafka NIO 网络通信模型 - tenji/ks GitHub Wiki

Kafka NIO 网络通信模型

一、整体框架概述

Kafka 的网络通信模型是基于 NIO 的 Reactor 多线程模型来设计的(关于多线程模型,可以参考:JAVA IO 第三章节)。这里先引用 Kafka 源码中注释的一段话:

An NIO socket server. The threading model is 1 Acceptor thread that handles new connections. Acceptor has N Processor threads that each have their own selector and read requests from sockets. M Handler threads that handle requests and produce responses back to the processor threads for writing.

相信大家看了上面的这段引文注释后,大致可以了解到 Kafka 的网络通信层模型,主要采用了 1(1个Acceptor线程) + N(N个Processor线程) + M(M个业务处理线程)。下面的表格简要的列举了下(这里先简单的看下后面还会详细说明):

线程数 线程名 线程具体说明
1 kafka-socket-acceptor_%x Acceptor 线程,负责监听 Client 端发起的请求
N kafka-network-thread_%d Processor 线程,负责对 Socket 进行读写
M kafka-request-handler-_%d Worker 线程,处理具体的业务逻辑并生成 Response 返回

Kafka 网络通信层的完整框架图如下图所示:

Kafka 消息队列的通信层模型 - 1 + N + M 模型

刚开始看到上面的这个框架图可能会有一些不太理解,并不要紧,这里可以先对 Kafka 的网络通信层框架结构有一个大致了解。本文后面会结合 Kafka 的部分重要源码来详细阐述上面的过程。这里可以简单总结一下其网络通信模型中的几个重要概念:

  1. Acceptor:1 个接收线程,负责监听新的连接请求,同时注册 OP_ACCEPT 事件,将新的连接按照round robin方式交给对应的 Processor 线程处理;
  2. Processor:N 个处理器线程,其中每个 Processor 都有自己的 selector,它会向 Acceptor 分配的 SocketChannel 注册相应的 OP_READ 事件,N 的大小由num.networker.threads决定;
  3. KafkaRequestHandler:M 个请求处理线程,包含在线程池 KafkaRequestHandlerPool 内部,从 RequestChannel 的全局请求队列 requestQueue 中获取请求数据并交给 KafkaApis 处理,M的大小由num.io.threads决定;
  4. RequestChannel:其为 Kafka 服务端的请求通道,该数据结构中包含了一个全局的请求队列 requestQueue 和多个与 Processor 处理器相对应的响应队列 responseQueue,提供给 Processor 与请求处理线程 KafkaRequestHandler 和 KafkaApis 交换数据的地方。
  5. NetworkClient:其底层是对 Java NIO 进行相应的封装,位于 Kafka 的网络接口层。Kafka 消息生产者对象 KafkaProducer 的 send 方法主要调用 NetworkClient 完成消息发送;
  6. SocketServer:其是一个 NIO 的服务,它同时启动一个 Acceptor 接收线程和多个 Processor 处理器线程。提供了一种典型的 Reactor 多线程模式,将接收客户端请求和处理请求相分离;
  7. KafkaServer:代表了一个 Kafka Broker 的实例;其 startup 方法为实例启动的入口;
  8. KafkaApis:Kafka 的业务逻辑处理 Api,负责处理不同类型的请求;比如“发送消息”、“获取消息偏移量—offset”和“处理心跳请求”等;

二、网络通信层的设计与具体实现

这一节将结合 Kafka 网络通信层的源码来分析其设计与实现,这里主要详细介绍网络通信层的几个重要元素 — SocketServer、Acceptor、Processor、RequestChannel和KafkaRequestHandler。本文分析的源码部分均基于 Kafka 的0.11.0版本。

2.1 SocketServer

SocketServer 是接收客户端 Socket 请求连接、处理请求并返回处理结果的核心类,Acceptor 及 Processor 的初始化、处理逻辑都是在这里实现的。在 KafkaServer 实例启动时会调用其startup的初始化方法,会初始化 1 个 Acceptor 和 N 个 Processor 线程(每个 EndPoint 都会初始化,一般来说一个 Server 只会设置一个端口),其实现如下:

def startup() {
    this.synchronized {

      connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

      val sendBufferSize = config.socketSendBufferBytes
      val recvBufferSize = config.socketReceiveBufferBytes
      val brokerId = config.brokerId

      var processorBeginIndex = 0
      // 一个broker一般只设置一个端口
      config.listeners.foreach { endpoint =>
        val listenerName = endpoint.listenerName
        val securityProtocol = endpoint.securityProtocol
        val processorEndIndex = processorBeginIndex + numProcessorThreads
        // N 个 processor
        for (i <- processorBeginIndex until processorEndIndex)
          processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol, memoryPool)
        // 1个 Acceptor
        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
        acceptors.put(endpoint, acceptor)
        KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
        acceptor.awaitStartup()

        processorBeginIndex = processorEndIndex
      }
    }
}

2.2 Acceptor

Acceptor 是一个继承自抽象类 AbstractServerThread 的线程类。Acceptor 的主要任务是监听并且接收客户端的请求,同时建立数据传输通道 — SocketChannel,然后以轮询的方式交给一个后端的 Processor 线程处理(具体的方式是添加 socketChannel 至并发队列并唤醒 Processor 线程处理)。

在该线程类中主要可以关注以下两个重要的变量:

  1. nioSelector:通过 NSelector.open() 方法创建的变量,封装了 JAVA NIO Selector 的相关操作;
  2. serverChannel:用于监听端口的服务端 Socket 套接字对象;

下面来看下 Acceptor 主要的 run 方法的源码:

def run() {
    // 首先注册 OP_ACCEPT 事件
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    startupComplete()
    try {
      var currentProcessor = 0
      // 以轮询方式查询并等待关注的事件发生
      while (isRunning) {
        try {
          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()
                if (key.isAcceptable)
                  // 如果事件发生则调用 accept 方法对 OP_ACCEPT 事件处理
                  accept(key, processors(currentProcessor))
                else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
                // 轮询算法
                // round robin to the next processor thread
                currentProcessor = (currentProcessor + 1) % processors.length
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }
       // 代码省略
  }

  def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()
    try {
      connectionQuotas.inc(socketChannel.socket().getInetAddress)
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setKeepAlive(true)
      if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socketChannel.socket().setSendBufferSize(sendBufferSize)

      processor.accept(socketChannel)
    } catch {
        // 省略部分代码
    }
  }

  def accept(socketChannel: SocketChannel) {
    newConnections.add(socketChannel)
    wakeup()
  }

在上面源码中可以看到,Acceptor 线程启动后,首先会向用于监听端口的服务端套接字对象 — ServerSocketChannel 上注册 OP_ACCEPT 事件。然后以轮询的方式等待所关注的事件发生。如果该事件发生,则调用 accept() 方法对 OP_ACCEPT 事件进行处理。这里,Processor 是通过 round robin 方法选择的,这样可以保证后面多个 Processor 线程的负载基本均匀。

Acceptor 的 accept() 方法的作用主要如下:

  1. 通过 SelectionKey 取得与之对应的 serverSocketChannel 实例,并调用它的 accept() 方法与客户端建立连接;
  2. 调用 connectionQuotas.inc() 方法增加连接统计计数;并同时设置第(1)步中创建返回的 socketChannel 属性(如 sendBufferSize、KeepAlive、TcpNoDelay、configureBlocking 等)
  3. 将 socketChannel 交给 processor.accept() 方法进行处理。这里主要是将 socketChannel 加入 Processor 处理器的并发队列 newConnections 队列中,然后唤醒 Processor 线程从队列中获取socketChannel 并处理。其中,newConnections 会被 Acceptor 线程和 Processor 线程并发访问操作,所以 newConnections 是 ConcurrentLinkedQueue 队列(一个基于链接节点的无界线程安全队列)

2.3 Processor

Processor 同 Acceptor 一样,也是一个线程类,继承了抽象类 AbstractServerThread。其主要是从客户端的请求中读取数据和将 KafkaRequestHandler 处理完响应结果返回给客户端。在该线程类中主要关注以下几个重要的变量:

  1. newConnections:在上面的 Acceptor 一节中已经提到过,它是一种 ConcurrentLinkedQueue[SocketChannel] 类型的队列,用于保存新连接交由 Processor 处理的 socketChannel;
  2. inflightResponses:是一个 Map[String, RequestChannel.Response] 类型的集合,用于记录尚未发送的响应;
  3. selector:是一个类型为 KSelector 变量,用于管理网络连接;

下面先给出 Processor 处理器线程 run 方法执行的流程图:

Kafk Processor 线程的处理流程图

从上面的流程图中能够可以看出 Processor 处理器线程在其主流程中主要完成了这样子几步操作:

  1. 处理 newConnections 队列中的 socketChannel。遍历取出队列中的每个 socketChannel 并将其在 selector 上注册 OP_READ 事件;
  2. 处理 RequestChannel 中与当前 Processor 对应响应队列中的 Response。在这一步中会根据 responseAction 的类型(NoOpAction/SendAction/CloseConnectionAction)进行判断,若为“NoOpAction”,表示该连接对应的请求无需响应;若为“SendAction”,表示该Response需要发送给客户端,则会通过“selector.send”注册OP_WRITE事件,并且将该Response从responseQueue响应队列中移至inflightResponses集合中;“CloseConnectionAction”,表示该连接是要关闭的;
  3. 调用 selector.poll() 方法进行处理。该方法底层即为调用 nioSelector.select() 方法进行处理。
  4. 处理已接受完成的数据包队列 — completedReceives。在 processCompletedReceives 方法中调用“requestChannel.sendRequest”方法将请求 Request 添加至 requestChannel 的全局请求队列 — requestQueue 中,等待 KafkaRequestHandler 来处理。同时,调用“selector.mute”方法取消与该请求对应的连接通道上的 OP_READ 事件;
  5. 处理已发送完的队列 — completedSends。当已经完成将 response 发送给客户端,则将其从 inflightResponses 移除,同时通过调用“selector.unmute”方法为对应的连接通道重新注册 OP_READ 事件;
  6. 处理断开连接的队列。将该 response 从 inflightResponses 集合中移除,同时将 connectionQuotas 统计计数减 1;

2.4 RequestChannel

在 Kafka 的网络通信层中,RequestChannel 为 Processor 处理器线程与 KafkaRequestHandler 线程之间的数据交换提供了一个数据缓冲区,是通信过程中 Request 和 Response 缓存的地方。因此,其作用就是在通信中起到了一个数据缓冲队列的作用。Processor 线程将读取到的请求添加至 RequestChannel 的全局请求队列 — requestQueue 中;KafkaRequestHandler 线程从请求队列中获取并处理,处理完以后将 Response 添加至 RequestChannel 的响应队列 — responseQueue 中,并通过 responseListeners 唤醒对应的 Processor 线程,最后 Processor 线程从响应队列中取出后发送至客户端。

2.5 KafkaRequestHandler

KafkaRequestHandler 也是一种线程类,在 KafkaServer 实例启动时候会实例化一个线程池 — KafkaRequestHandlerPool 对象(包含了若干个 KafkaRequestHandler 线程),这些线程以守护线程的方式在后台运行。在 KafkaRequestHandler 的 run 方法中会循环地从 RequestChannel 中阻塞式读取 request,读取后再交由 KafkaApis 来具体处理。

2.6 KafkaApis

KafkaApis 是用于处理对通信网络传输过来的业务消息请求的中心转发组件。该组件反映出 Kafka Broker Server 可以提供哪些服务。

ApiKeys.forId(request.requestId) match {
  case ApiKeys.PRODUCE => handleProducerRequest(request)
  case ApiKeys.FETCH => handleFetchRequest(request)
  case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
  case ApiKeys.METADATA => handleTopicMetadataRequest(request)
  case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
  case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
  case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
  case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
  case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
  case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
  case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
  case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
  case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
  case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
  case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
  case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
  case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
  case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
  case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
  case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
  case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
  case requestId => throw new KafkaException("Unknown api code " + requestId)
}

三、总结

仔细阅读 Kafka 的 NIO 网络通信层的源码过程中还是可以收获不少关于 NIO 网络通信模块的关键技术。Apache 的任何一款开源中间件都有其设计独到之处,值得借鉴和学习。对于任何一位使用 Kafka 这款分布式消息队列的同学来说,如果能够在一定实践的基础上,再通过阅读其源码能起到更为深入理解的效果,对于大规模 Kafka 集群的性能调优和问题定位都大有裨益。

对于刚接触 Kafka 的同学来说,想要自己掌握其 NIO 网络通信层模型的关键设计,还需要不断地使用本地环境进行 debug 调试和阅读源码反复思考。

参考链接