springbatch-使用

springbatch-使用

起男 310 2023-05-24

springbatch-使用

springbatch一共有4个重要角色:

  • JobLauncher:任务启动器,通过它来启动任务,可以看作是程序的入口
  • Job:一个具体的任务
  • Step:一个具体的步骤,一个job可以包含多个step
  • JobRepository:存储数据的地方,可以看做是一个数据库的接口,在任务执行的时候需要通过它来记录任务状态等信息

整合springboot

  1. 导入依赖

    		<dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-batch</artifactId>
            </dependency>		
    		<dependency>
                <groupId>org.springframework.batch</groupId>
                <artifactId>spring-batch-test</artifactId>
                <scope>test</scope>
            </dependency>
    		<!--持久化-->
            <!--<dependency>
                <groupId>com.h2database</groupId>
                <artifactId>h2</artifactId>
                <scope>runtime</scope>
            </dependency>-->
    
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.23</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-jdbc</artifactId>
            </dependency>
    
  2. 修改配置文件(配置数据库访问)

    spring:
      datasource:
        url: 
        username: 
        password: 
        driver-class-name: 
    
  3. 引导类添加注解,开启springbatch

    @SpringBootApplication
    @EnableBatchProcessing
    public class BatchDemoApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(BatchDemoApplication.class, args);
        }
    
    }
    
  4. 使用

    @Configuration
    public class JobConfig {
    
        //注入创建任务对象的对象
        @Autowired
        private JobBuilderFactory jobBuilderFactory;
        //注入创建step对象的对象(任务的执行由step决定)
        @Autowired
        private StepBuilderFactory stepBuilderFactory;
    
        //创建任务对象
        @Bean
        public Job helloWorldJob(){
            return jobBuilderFactory
                    .get("helloWorld")
                    .start(step1())
                    .build();
        }
    
        @Bean
        public Step step1(){
            return stepBuilderFactory
                    .get("step1")
                    .tasklet(new Tasklet() {
                        @Override
                        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                            System.out.println("hello world");
                            return RepeatStatus.FINISHED;
                        }
                    }).build();
        }
    }
    

执行多个step

@Configuration
public class JobDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job jobDemoJob(){
        return jobBuilderFactory
                .get("jobDemoJob")
                .start(step1())
                .next(step2())
                .next(step3())
                .build();
    }

    @Bean
    public Step step1(){
        return stepBuilderFactory
                .get("step1")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("step1...");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }
    @Bean
    public Step step2(){
        return stepBuilderFactory
                .get("step2")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("step2...");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }
    @Bean
    public Step step3(){
        return stepBuilderFactory
                .get("step3")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("step3...");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }
}

使用flow

flow可以把step进行统一管理类型,相当于一种分组。大大提高了step的复用性

@Configuration
public class FlowDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step step1(){
        return stepBuilderFactory
                .get("step1")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("step1...");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }
    @Bean
    public Step step2(){
        return stepBuilderFactory
                .get("step2")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("step2...");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }
    @Bean
    public Step step3(){
        return stepBuilderFactory
                .get("step3")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("step3...");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }

    @Bean
    public Flow flowDemoFlow(){
        return new FlowBuilder<Flow>("flowDemoFlow")
                .start(step1())
                .next(step2())
                .next(step3())
                .build();
    }

    @Bean
    public Job fowDemoJob(){
        return jobBuilderFactory
                .get("jobBuilderJob")
                .start(flowDemoFlow())
                .next(step3())
                .end()
                .build();
    }
}

split使用

split可以使同一个job下的step并发执行

@Configuration
@EnableBatchProcessing
public class SplitDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step splitDemoStep1(){
        return stepBuilderFactory
                .get("splitDemoStep1")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("step1...");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }

    @Bean
    public Step splitDemoStep2(){
        return stepBuilderFactory
                .get("splitDemoStep2")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("step2...");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }

    @Bean
    public Step splitDemoStep3(){
        return stepBuilderFactory
                .get("splitDemoStep3")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("step3...");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }

    @Bean
    public Flow splitDemoFlow1(){
        return new FlowBuilder<Flow>("splitDemoFlow1")
                .start(splitDemoStep1())
                .build();
    }
    @Bean
    public Flow splitDemoFlow2(){
        return new FlowBuilder<Flow>("splitDemoFlow2")
                .start(splitDemoStep2())
                .next(splitDemoStep3())
                .build();
    }

    @Bean
    public Job splitDemoJob(){
        return jobBuilderFactory
                .get("splitDemoJob")
                .start(splitDemoFlow1())
                .split(new SimpleAsyncTaskExecutor())//并发执行
                .add(splitDemoFlow2())
                .end()
                .build();
    }
}

自定义决策器

决策器

public class MyDecider implements JobExecutionDecider {

    private int count;

    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        count++;
        if (count%2==0)
            return new FlowExecutionStatus("even");
        else
            return new FlowExecutionStatus("odd");
    }
}

使用

@Configuration
@EnableBatchProcessing
public class DeciderDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step deciderDemoStep1(){
        return stepBuilderFactory
                .get("deciderDemoStep1")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("step1...");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }
    @Bean
    public Step deciderDemoStep2(){
        return stepBuilderFactory
                .get("deciderDemoStep2")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("step2...");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }
    @Bean
    public Step deciderDemoStep3(){
        return stepBuilderFactory
                .get("deciderDemoStep3")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("step3...");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }

    @Bean
    public JobExecutionDecider myDecider(){
        return new MyDecider();
    }
    @Bean
    public Job deciderDemoJob(){
        return jobBuilderFactory
                .get("deciderDemoJob")
                .start(deciderDemoStep1())
                .next(myDecider())
                .from(myDecider()).on("even").to(deciderDemoStep2())
                .from(myDecider()).on("odd").to(deciderDemoStep3())
                .from(deciderDemoStep3()).on("*").to(myDecider())//*匹配所有
                .end()
                .build();
    }
}

job嵌套

子job1

@Configuration
public class ChildJob1 {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step childJob1Step1(){
        return stepBuilderFactory
                .get("childJob1Step1")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("step1...");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }

    @Bean
    public Job childJobOne(){
        return jobBuilderFactory
                .get("childJobOne")
                .start(childJob1Step1())
                .build();
    }
}

子job2

@Configuration
public class ChildJob2 {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step childJob1Stet1(){
        return stepBuilderFactory
                .get("childJob2Step1")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("step2-1...");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }

    @Bean
    public Step childJob1Stet2(){
        return stepBuilderFactory
                .get("childJob2Step2")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("step2-2...");
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }

    @Bean
    public Job childJobTwo(){
        return jobBuilderFactory
                .get("childJobOne")
                .start(childJob1Stet1())
                .next(childJob1Stet2())
                .build();
    }
}

父job

@Configuration
public class NestedDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private Job childJobOne;
    @Autowired
    private Job childJobTwo;

    @Autowired
    private JobRepository jobRepository;
    @Autowired
    private PlatformTransactionManager platformTransactionManager;
    @Autowired
    private JobLauncher jobLauncher;//启动对象

    @Bean
    public Job parentJob(){
        return jobBuilderFactory
                .get("parentJob1")
                .start(childJob1())
                .next(childJob2())
                .build();
    }
    //返回job类型的step(特殊的step)
    private Step childJob1(){
        return new JobStepBuilder(new StepBuilder("childJob1"))
                .job(childJobOne)
                .launcher(jobLauncher)
                .repository(jobRepository)
                .transactionManager(platformTransactionManager)
                .build();
    }
    private Step childJob2(){
        return new JobStepBuilder(new StepBuilder("childJob2"))
                .job(childJobTwo)
                .launcher(jobLauncher)
                .repository(jobRepository)
                .transactionManager(platformTransactionManager)
                .build();
    }
}

为防止子job执行可以在配置文件中指定执行的job

spring:
  batch:
    job:
      names: parentJob1

监听器

接口形式监听器

public class MyJobListener implements JobExecutionListener {
    @Override
    public void beforeJob(JobExecution jobExecution) {
        System.out.println("job---before:"+jobExecution.getJobInstance().getJobName());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println("job---after:"+jobExecution.getJobInstance().getJobName());
    }
}

注解形式监听器

public class MyChunkListener {

    @BeforeChunk
    public void beforeChunk(ChunkContext context){
        System.out.println(context.getStepContext().getStepName()+"before...");
    }

    @AfterChunk
    public void afterChunk(ChunkContext context){
        System.out.println(context.getStepContext().getStepName()+"after...");
    }
}

配置监听器

@Configuration
public class ListenerDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job listenerJob(){
        return jobBuilderFactory
                .get("listenerJob")
                .start(step1())
                .listener(new MyJobListener())
                .build();
    }
    @Bean
    public Step step1(){
        return stepBuilderFactory
                .get("step1")
                .<String,String>chunk(2)//每操作多少数据进行一次处理
                .listener(new MyChunkListener())
                .reader(reader())//读数据
                .writer(writer())//写数据
                .build();
    }

    @Bean
    public ItemReader<String> reader(){
        return new ListItemReader<>(Arrays.asList("a","b","c","d","e"));
    }
    @Bean
    public ItemWriter<String> writer(){
        return new ItemWriter<String>() {
            @Override
            public void write(List<? extends String> list) throws Exception {
                System.out.println(list);
            }
        };
    }
}

job参数

job执行的是step,job使用的数据肯定是在step中使用。所以所以我们只需要给step传递数据
使用step级别的监听来传递数据

@Configuration
public class ParametersDemo implements StepExecutionListener {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    private Map<String,JobParameter> parameterMap;

    @Bean
    public Job parameterJob(){
        return jobBuilderFactory
                .get("parameterJob")
                .start(parameterStep())
                .build();
    }
    @Bean
    public Step parameterStep(){
        return stepBuilderFactory
                .get("parameterStep")
                .listener(this)
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                        System.out.println("接收参数:"+parameterMap.get("info"));
                        return RepeatStatus.FINISHED;
                    }
                }).build();
    }

    @Override
    public void beforeStep(StepExecution stepExecution) {
        parameterMap=stepExecution.getJobParameters().getParameters();
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return null;
    }
}

在启动时添加参数info=xxx