Vertx Thread Model 线程模型 - baoyingwang/LibrariesEvaluation GitHub Wiki

Introduction

Vertx线程模型在官方文档,以及多个blog中都有总结, 如

这里,我想从使用角度描述一下,以避免写代码时导致线程问题。

先看如下几行代码,想一想它的输出是什么以及为什么

public static void main (String[] args) throws java.lang.Exception
{
	Vertx vertx = Vertx.vertx();
	
	vertx.runOnContext((v)->{
		System.out.println("thread:" + Thread.currentThread());
	});
	
	vertx.runOnContext((v)->{
		System.out.println("thread:" + Thread.currentThread());
	 });

}

测试输出,为什么是两个不同的线程呢(thread-0, thread-1)? vertx文档中不是说单线程模型么?是的文档中是说了multi reactor, 但是文档中也说了,vertx会保证handler被同一eventloop调用的呀。文档中相关描述并不准确("Even though a Vertx instance maintains multiple event loops, any particular handler will never be executed concurrently, and in most cases (with the exception of worker verticles) will always be called using the exact same event loop."),可能对相关开发造成困扰,甚至引入bug。线程相关的bug是很难调查与重现的,所以尽量从源头上避免

thread:Thread[vert.x-eventloop-thread-0,5,main]
thread:Thread[vert.x-eventloop-thread-1,5,main]

解释:

  1. 该问题源于一个实际的问题。不能默认vertx没有线程/同步问题。但是,可以默认,在eventloop线程中(即verticle,或者tcp server/tcp socket中)运行的任何程序都是线程安全的。
  2. 上面的代码中,我们看到两个方法在不同的另个eventloop中调用。原因就是两者分别属于不同的两个context. 在runOnContext(调用getOrCreateContext)的时候,vertx会检查当前线程是否为eventloop线程(这里不是,而是main线程);如果是,则重用eventloop线程和相关的context;如果不是,则创建新的context并link下一个eventloop线程(round robin - Check the ContextImp::getEventLoop, it will find that the EventLoopGroup.next() is used to pick the event loop for current Context. Then check the VertxEventLoopGroup::next(), you will find a pos(as int) to record the next eventloop(worker thread) should be picked. If the pos reach the worker size, it will be reset to 0;). 上面代码中,因为不是eventloop线程而为main线程,所以两次调用创建了两个conext分别关联两个不同的eventloop线程。
  3. 如何改造上面代码使他们在同一个线程中使用呢?
Vertx vertx = Vertx.vertx();
Context context = vertx.getOrCreateContext();

context.runOnContext((v)->{
	System.out.println("thread:" + Thread.currentThread());
});
context.runOnContext((v)->{
	System.out.println("thread:" + Thread.currentThread());
});

测试输出:

thread:Thread[vert.x-eventloop-thread-0,5,main]
thread:Thread[vert.x-eventloop-thread-0,5,main]
  1. 请想一想,在verticle中,或者tcp server/socket, 或者http server回调中,注册的handler是否会出现上面的被不同的eventloop线程调用的情况呢?

  2. 如果一般在我们的代码中(可能)很少使用runOnContext,但是理解了它对于我们深入理解vertx线程模型非常有帮助

扩展 : schedule程序的线程,以下代码将在哪个线程中执行呢

_vertx.setPeriodic(_snapshotRequestIntervalInSecond * 1000, id ->{
	for(String symbol: _engine._symbols){
		_engine.addOrdBookRequest(new MarketDataMessage.AggregatedOrderBookRequest(String.valueOf(System.nanoTime()), symbol,5));
		_engine.addOrdBookRequest(new MarketDataMessage.DetailOrderBookRequest(String.valueOf(System.nanoTime()), symbol,5));
	}
});

通过查看源代码,可以发现与之前runOnContext类似的情况,即里面调用vertx.getOrCreateContext()获取一个context。这个获取过程也就是上面描述的,通过判断当前线程是否为eventloop线程来决定,是否创建一个新的context(并link next eventloop thread)用于当前handler.

由于并没有_vertx.setPeriodic(handler, context)方法,我们并不能指定使用哪个context。所以,请确认你的schedule代码将在哪个/哪些线程中使用。不过,一般来说,不会有问题,如果你只是在verticle中使用或者在tcp/httpserver回调中使用,因为他们都在eventloop线程中运行。

扩展 - 在一个vertx中启动多个相同verticle实例时,多个verticle实例的线程关系是什么?

即: https://vertx.io/docs/vertx-core/java/#_specifying_number_of_verticle_instances

如文档中所述,这样做(启动多个instance)的目的是为了充分利用多cpu core的能力。所以,其必然是分别属于不同的eventloop线程(或者几个verticle对应一个eventloop线程,另外几个对应于另一个eventloop线程). 这是怎么做到的呢? 查看DeploymentManager::doDeploy代码,你可以看到部署verticle都将创建一个新的Context(与上面所提到的round robin关联eventloop线程是同一条代码调用链)

    for (Verticle verticle: verticles) {
      WorkerExecutorImpl workerExec = poolName != null ? vertx.createSharedWorkerExecutor(poolName, options.getWorkerPoolSize()) : null;
      WorkerPool pool = workerExec != null ? workerExec.getPool() : null;
      ContextImpl context = options.isWorker() ? vertx.createWorkerContext(options.isMultiThreaded(), deploymentID, pool, conf, tccl) :
        vertx.createEventLoopContext(deploymentID, pool, conf, tccl);
      if (workerExec != null) {
        context.addCloseHook(workerExec);
      }
      context.setDeployment(deployment);
      deployment.addVerticle(new VerticleHolder(verticle, context));
      context.runOnContext(v -> {
        try {
          verticle.init(vertx, context);
          Future<Void> startFuture = Future.future();
          verticle.start(startFuture);
          startFuture.setHandler(ar -> {

注意: 上面代码中包含options.isWorker(), 但这里并没有用解释关于worker thread的使用。如果你打算使用work pool thread, 则线程模型需要单独考虑了,因为worker thread使用并不保证单线程保护关键变量的。还好,默认都是使用eventloop线程。

⚠️ **GitHub.com Fallback** ⚠️