Elastic-Job
整体架构
-
app:应用程序,内部包含任务执行业务逻辑和elastic-job-lite组件,其中执行任务需要实现ElasticJob接口完成与Elastic-Job-Lite组件的集成,并进行任务的相关配置。应用程序可启动多个实例,也就出现了多个任务执行实例
-
elastic-job-lite:elastic-job-lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务,此组件负责任务的调度,并产生任务调度纪录
无中心化,是指没有调度中心这一概念,每个运行在集群中的作业服务都是对等的,各个作业节点是自治的、平等的、节点之间通过注册中心进行分布式协调
-
registry:以zookeeper作为elastic-job的注册中心组件,存储了执行任务的相关信息。同时,elastic-job利用该组件进行执行任务实例的选举
-
console:elastic-job提供了运维平台,它通过读取zookeeper数据展现任务执行状态,或更新zookeeper数据修改全局配置。通过elastic-job-lite组件产生的数据来查看任务执行历史纪录
应用程序在启动时,在其内嵌的elastic-job-lite组件会向zookeeper注册该实例的信息,并触发选举(此时可能已经启动了该应用程序的其他实例),从众多实例中选举出一个leader,让其执行任务。当到达任务执行时间时,elastic-job-lite组件会调用由应用程序实现的任务业务逻辑,任务执行后会产生任务执行纪录。当应用程序的某一个实例宕机时,zookeeper组件会感知到并重新触发leader选举
zookeeper的功能
- 对执行任务信息的存储(任务名称、任务参与实例、任务执行策略)
- 实现选举机制,在任务执行实例数量变化时,会触发选举机制来决定让哪个实例去执行该任务
zookeeper存储信息
- instances:同一个elastic-job的部署实例。一台机器上可以启动多个job实例,也就是jar包。instances的命名规则是:[IP+@-@+PID]
- leader:任务实例的主节点信息,通过zk的主节点选举
- election:主节点选举
- instance:当前主节点的实例id
- latch:一个永久节点用于选举的时候实现分布式锁
- sharding:分配
- necessary:释放需要重新分片的标记
- failover:失效转移
- election:主节点选举
- sharding:任务的分片信息,子节点是分片项序号
- config:配置信息
- servers:机器列表
zookeeper选举过程
- 任意一个实例启动时首先创建一个/server的持久节点
- 多个实例同时创建/server/leader临时子节点
- /server/leader子节点只能创建一个,后创建的会失败。创建成功的实例被选为leader节点,用来执行任务
- 所有任务实例监听/server/leader的变化,一旦节点被删除,就重新进行选举,抢占式的创建/server/leader节点,谁创建成功谁就是leader
原生开发
-
导入依赖
<dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>2.1.5</version> </dependency>
-
设置zookeeper
//注册中心配置 ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("localhost:2181", "job_namespace"); //zk超时时间 zookeeperConfiguration.setSessionTimeoutMilliseconds(100); //创建注册中心 ZookeeperRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration); zookeeperRegistryCenter.init();
-
编写任务
public class MyJob implements SimpleJob { //任务逻辑 @Override public void execute(ShardingContext shardingContext) { //...... } }
-
任务配置
//创建JobCoreConfiguration 参数1任务名称 2cron表达式 3分片 JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration .newBuilder("job-name", "0/3 * * * * ?", 1) .build(); //创建SimpleJobConfiguration 参数1JobCoreConfiguration 2任务类的全限定类名 SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, MyJob.class.getCanonicalName()); //启动任务 new JobScheduler(zookeeperRegistryCenter, LiteJobConfiguration .newBuilder(simpleJobConfiguration) .overwrite(true).build()) .init();
整合springboot
-
导入依赖
<dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>2.1.5</version> </dependency>
-
配置zookeeper
@Bean(initMethod = "init")//调用init实现初始化 ZookeeperRegistryCenter zookeeperRegistryCenter(){ //注册中心配置 ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("localhost:2181", "job_namespace"); //zk超时时间 zookeeperConfiguration.setSessionTimeoutMilliseconds(100); //创建注册中心 return new ZookeeperRegistryCenter(zookeeperConfiguration); }
-
配置任务
@Autowired private MyJob myJob; @Autowired private ZookeeperRegistryCenter zookeeperRegistryCenter; @Bean(initMethod = "init") SpringJobScheduler initSimpleElasticJob(){ //创建SpringJobScheduler return new SpringJobScheduler(myJob, zookeeperRegistryCenter, createJobConfiguration(myJob.getClass(), "0/5 * * * * ?", 1, null)); } /** * 配置任务想象信息 * @param jobClass 任务执行类 * @param cron 执行策略 * @param shardingTotalCount 分片数量 * @param shardingItemParameters 分片个性化参数 * @return */ private LiteJobConfiguration createJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters){ //定义核心配置 JobCoreConfiguration.Builder jobCoreConfigurationBuilder = JobCoreConfiguration .newBuilder(jobClass.getName(), cron, shardingTotalCount); //设置shardingItemParameters if (shardingItemParameters != null){ jobCoreConfigurationBuilder.shardingItemParameters(shardingItemParameters); } JobCoreConfiguration jobCoreConfiguration = jobCoreConfigurationBuilder.build(); //创建SimpleJobConfiguration SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, jobClass.getCanonicalName()); //创建liteJobConfiguration LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration .newBuilder(simpleJobConfiguration) .overwrite(true) .build(); return liteJobConfiguration; }
作业分片
作业分片是指任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的应用实例分别执行某一个或几个分片项
自定义分片
- 设置分片参数
@Bean(initMethod = "init")
SpringJobScheduler initSimpleElasticJob(){
//创建SpringJobScheduler
return new SpringJobScheduler(myJob,
zookeeperRegistryCenter,
createJobConfiguration(myJob.getClass(), "0/5 * * * * ?", 3, "0=a,1=b,2=c"));//数量要和分片数量相同,下标从0开始
}
-
在任务中获取分片参数:shardingContext.getShardingParameter();
例如当前是分片0时,获取的参数就是a
事件追踪
-
配置文件中添加事务追踪配置
@Autowired private DataSource dataSource; @Bean(initMethod = "init") SpringJobScheduler initSimpleElasticJob(){ //事件追踪配置 JobEventRdbConfiguration jobEventRdbConfiguration = new JobEventRdbConfiguration(dataSource); //创建SpringJobScheduler return new SpringJobScheduler(myJob, zookeeperRegistryCenter, createJobConfiguration(myJob.getClass(), "0/5 * * * * ?", 3, "0=a,1=b,2=c"),jobEventRdbConfiguration); }
-
启动项目,此时数据库中会多两张表
- job_execution_log:纪录每次作业的执行历史
- job_status_trace_log:纪录作业状态变更痕迹