Kafka Timer & TimingWheel - tenji/ks GitHub Wiki

Kafka 中的时间轮

  • 时间轮由来已久,Linux 内核里有它,大大小小的应用里也用它;
  • Kafka 里主要用它来作大量的定时任务,超时判断等;
  • 这里我们主要分析 Kafka 中时间轮实现中用到的各个类。

TimerTask

  • 所在文件:core/src/main/scala/kafka/utils/timer/TimerTask.scala
  • 继承于Runnable,需要放在时间轮里执行的任务都要继承这个TimerTask
  • 每个TimerTask必须和一个TimerTaskEntry绑定,实现上放到时间轮里的是TimerTaskEntry
  • def cancel(): 取消当前的 Task, 实际是解除在当前TaskEntry上的绑定
def cancel(): Unit = {
  synchronized {
    if (timerTaskEntry != null) timerTaskEntry.remove()
    timerTaskEntry = null
  }
}

TimerTaskEntry

  • 所在文件:core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
  • 作用:绑定一个TimerTask对象,然后被加入到一个TimerTaskLIst中;
  • 它是TimerTaskList这个双向列表 中的元素,因此有如下三个成员:
var list: TimerTaskList = null // 属于哪一个TimerTaskList
var next: TimerTaskEntry = null // 指向其后一个元素 
var prev: TimerTaskEntry = null // 指向其前一个元素 
  • TimerTaskEntry对象在构造成需要一个TimerTask对象,并且调用
timerTask.setTimerTaskEntry(this)

TimerTask对象绑定到TimerTaskEntry上,如果这个TimerTask对象之前已经绑定到了一个TimerTaskEntry上, 先调用timerTaskEntry.remove()解除绑定。

  • 关于def remove()
def remove(): Unit = {
  var currentList = list
  // If remove is called when another thread is moving the entry from a task entry list to another,
  // this may fail to remove the entry due to the change of value of list. Thus, we retry until the list becomes null.
  // In a rare case, this thread sees null and exits the loop, but the other thread insert the entry to another list later.
  while (currentList != null) {
    currentList.remove(this)
    currentList = list
  }
}

实际上就是把自己从当前所在TimerTaskList上摘掉, 为什么要使用一个while(...)来作,简单说就是相当于用个自旋锁代替读写锁来尽量保证这个 remove 的操作的彻底。

TimerTaskList

  • 所在文件:core/src/main/scala/kafka/utils/timer/TimerTaskList.scala
  • 作为时间轮上的一个 bucket, 是一个有头指针的双向链表
  • 双向链表结构:
// TimerTaskList forms a doubly linked cyclic list using a dummy root entry
// root.next points to the head
// root.prev points to the tail
private[this] val root = new TimerTaskEntry(null, -1)
root.next = root
root.prev = root
  • 继承于 Java 的 Delayed,说明这个对象应该是要被放入 Java 的 DelayQueue,自然要实现下面的两个接口:
def getDelay(unit: TimeUnit): Long = {
  unit.convert(max(getExpiration - Time.SYSTEM.hiResClockMs, 0), TimeUnit.MILLISECONDS)
}

def compareTo(d: Delayed): Int = {

  val other = d.asInstanceOf[TimerTaskList]

  if(getExpiration < other.getExpiration) -1
  else if(getExpiration > other.getExpiration) 1
  else 0
}
  • 每个TimerTaskList都是时间轮上的一个 bucket,自然也要关联一个过期时间
private[this] val expiration = new AtomicLong(-1L)

// Set the bucket's expiration time
// Returns true if the expiration time is changed
def setExpiration(expirationMs: Long): Boolean = {
  expiration.getAndSet(expirationMs) != expirationMs
}
  • addremove方法,用来添加和删除TimerTaskEntry
  • foreach方法:在链表的每个元素上应用给定的函数
  • flush方法:在链表的每个元素上应用给定的函数,并清空整个链表,同时超时时间也设置为-1

TimingWheel

  • 所在文件:core/src/main/scala/kafka/utils/timer/TimingWheel.scala
  • 上面说了这么多,终于到这个时间轮出场了,说简单也简单,说复杂也复杂

待更新...

Timer

  • 所在文件:core/src/main/scala/kafka/utils/timer/Timer.scala
  • 上面讲了这么多,现在是时候把这些组装起来了,这就是个用TimingWheel实现的定时器,可以添加任务,任务可以取消,可以到期被执行

待更新...

参考链接