一、分布式调度框架Elastic Job - mouse3150/mouse3150.github.io GitHub Wiki

简介

Elastic-Job介绍是一个由当当开源的分布式调度框架,有两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务;Elastic-Job-Cloud采用自研Mesos Framework的解决方案,额外提供资源治理、应用分发以及进程隔离等功能。

本文主要介绍Elastic-Job-Lite,该项目基于开源产品Quartz和Zookeeper及其客户端Curator进行的二次开发。我们从以下几个方面探索Elastic-Job-Lite原理,并进行源码分析:

  • 定时调度应用场景;

  • 为什么要分布式任务调度;

  • Elastic-Job核心概念;

    (1)任务分片,及分片策略

    (2)Elastic-Job特性:弹性伸缩;失效转移;容错处理...

  • Elastic-Job源码分析;

定时调度应用场景

典型场景:比如计算余额宝里的昨日收益,系统需要job在每天某个时间点开始,给所有余额宝用户计算收益。如果用户体量特别大,可能第二天之前处理不完这么多用户。而且我们部署job的时候也得注意,可能会把job直接放在我们的webapp里,webapp通常是多节点部署的,这样,我们的job也就是多节点,多个job同时执行,很容易造成重复执行,比如用户重复计息,为了避免这种情况,我们可能会对job的执行加锁,保证始终只有一个节点能执行,或者干脆让job从webapp里剥离出来,独自部署一个节点。 Elastic-job就可以帮助我们解决上面2个问题,Elastic-job底层的任务调度还是使用的quartz,通过zookeeper来动态给job节点分片。

为什么要分布式任务调度

Elastic-Job核心概念

分片概念:

任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。例如:有一个遍历数据库某张表的作业,现有2台服务器。为了快速的执行作业,那么每台服务器应执行作业的50%。 为满足此需求,可将作业分成2片,每台服务器执行1片。作业遍历数据的逻辑应为:服务器A遍历ID以奇数结尾的数据;服务器B遍历ID以偶数结尾的数据。 如果分成10片,则作业遍历数据的逻辑应为:每片分到的分片项应为ID%10,而服务器A被分配到分片项0,1,2,3,4;服务器B被分配到分片项5,6,7,8,9,直接的结果就是服务器A遍历ID以0-4结尾的数据;服务器B遍历ID以5-9结尾的数据。

分片项与业务处理解耦 Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。

个性化参数的使用场景 个性化参数即shardingItemParameter,可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。例如:按照地区水平拆分数据库,数据库A是北京的数据;数据库B是上海的数据;数据库C是广州的数据。 如果仅按照分片项配置,开发者需要了解0表示北京;1表示上海;2表示广州。 合理使用个性化参数可以让代码更可读,如果配置为0=北京,1=上海,2=广州,那么代码中直接使用北京,上海,广州的枚举值即可完成分片项和业务逻辑的对应关系。

作业类型

Elastic-job提供了三种类型的作业:

  • Simple类型作业
  • Dataflow类型作业
  • Script类型作业

(1) SimpleJob需要实现SimpleJob接口,意为简单实现,未经过任何封装,与quartz原生接口相似。

(2) Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。可通过DataflowJobConfiguration配置是否流式处理。流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。

(3) Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本,使用不多,可以参见github文档。

作业配置

Elastic-Job配置分为3个层级:

  1. Core
  2. Type
  3. Root

每个层级使用相似于装饰者模式的方式装配。Core对应JobCoreConfiguration,用于提供作业核心配置信息。Type对应JobTypeConfiguration,有3个子类分别对应SIMPLE,DATAFLOW,SCRIPT类型作业,提供3种作业需要的不同配置。Root对应JobRootConfiguration,有2个子类分别对应Lite和Cloud部署类型,提供不同部署类型所需的配置。

作业事件追踪

任务监听

Elastic-Job源码分析

zookeeper上的节点

注册中心在定义的命名空间下,创建作业名称节点,用于区分不同作业,所以作业一旦创建则不能修改作业名称,如果修改名称将视为新的作业。

作业名称节点下又包含5个数据子节点,分别是:

  • config,config节点保存作业配置信息,以JSON格式存储;
  • instances, instances节点保存作业运行实例信息,子节点是当前作业运行实例的主键;
  • sharding,sharding节点保存作业分片信息,子节点是分片项序号,从零开始,至分片总数减一;
  • servers,servers节点保存作业服务器信息,子节点是作业服务器的IP地址;
  • leader,leader节点保存作业服务器主节点信息,分为election,sharding和failover三个子节点,分别用于主节点选举,分片和失效转移处理。

Elastic-Job-Lite源码由两个入口,一个是JobSchedule类的init方法,当启动服务时,程序会进入该方法,进行任务的初始化等操作;另一个是LiteJob类,它实现了Quartz框架的Job接口,当定时任务启动时,会进入该实现类,完成失效转移项执行、重新分片、获取并执行本机任务项、错过任务重触发等操作。

Elastic-Job-Lite初始化的入口是JobSchedule,应用服务器启动时,会调用JobSchedule的init方法,开启作业启动流程。首先添加或更新作业配置信息,并将配置信息持久化到zk上;接着创建quartz调度器,作业的调度执行依赖quartz技术;然后启动所有的监听器,包括leader选举监听、失效转移监听、分片监听等,并发起主节点选举,将leader节点信息set到leader/election/instance节点下;然后将服务器信息、实例信息注册到zk上,并且在leader/sharding下创建necessary节点,作为重新分片的标记;最后由quartz调度器根据cron表达式调度执行。

job配置

⚠️ **GitHub.com Fallback** ⚠️