springbatch-错误处理

springbatch-错误处理

起男 417 2023-05-31

springbatch-错误处理

默认情况下,当任务执行过程中出现异常时,Spring Batch 会终止该任务;重启任务时,Spring Batch 会从失败的步骤开始,继续执行尚未完成的剩余步骤。

例如

@Configuration
public class ErrorDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job errorDemoJob(){
        return jobBuilderFactory
                .get("errorDemoJob")
                .start(errorStep1())
                .next(errorStep2())
                .build();
    }
    @Bean
    public Step errorStep1(){
        return stepBuilderFactory
                .get("errorStep1")
                .tasklet(errorHandling())
                .build();
    }
    @Bean
    public Step errorStep2(){
        return stepBuilderFactory
                .get("errorStep2")
                .tasklet(errorHandling())
                .build();
    }

    @Bean
    @StepScope
    public Tasklet errorHandling(){
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                Map<String, Object> stepExecutionContext = chunkContext
                        .getStepContext().getStepExecutionContext();
                if (stepExecutionContext.containsKey("aaa")){
                    System.out.println("ok...");
                    return RepeatStatus.FINISHED;
                }else {
                    System.out.println("error...");
                    chunkContext.getStepContext()
                            .getStepExecution()
                            .getExecutionContext()
                            .put("aaa",true);
                    throw new RuntimeException("error");
                }
            }
        };
    }
  1. 第一次执行:step1 执行时报错,任务终止,step2 没有执行。
  2. 第二次执行:step1 正常执行,step2 报错,任务终止。
  3. 第三次执行:step1 已执行成功,不再重复执行;step2 正常执行。

错误重试

/**
 * Demo of retry in a fault-tolerant chunk step: the processor throws for the item
 * {@code "26"} the first three times it sees it, and the step is configured to retry
 * on {@link RuntimeException}.
 */
@Configuration
public class RetryDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Builds the single-step job {@code retryDemoJob}.
     *
     * @return the configured job
     */
    @Bean
    public Job retryDemoJob() {
        return jobBuilderFactory.get("retryDemoJob")
                .start(retryDemoStep())
                .build();
    }

    /**
     * Chunk-oriented step (chunk size 5) that reads from {@link #myReader()}, processes
     * with {@link #myProcessor()} and prints what it is given to write.
     *
     * @return the configured step
     */
    @Bean
    public Step retryDemoStep() {
        return stepBuilderFactory.get("retryDemoStep")
                .<String, String>chunk(5)
                .reader(myReader())
                .processor(myProcessor())
                .writer(System.out::println)
                .faultTolerant() // enable fault tolerance
                .retry(RuntimeException.class) // retry when this exception type is thrown
                .retryLimit(5) // retry limit
                .build();
    }

    /**
     * Pass-through processor that logs every item and throws for the item {@code "26"}
     * the first three times it is processed.
     *
     * @return the processor
     */
    @Bean
    public ItemProcessor<String, String> myProcessor() {
        return new ItemProcessor<String, String>() {
            /** How many times the item "26" has been handed to this processor. */
            private int timesSeen26 = 0;

            @Override
            public String process(String item) throws Exception {
                System.out.println("processor:" + item);
                // The counter is only advanced for the item "26" (short-circuit &&).
                boolean isTargetItem = item.equalsIgnoreCase("26");
                if (isTargetItem && ++timesSeen26 <= 3) {
                    throw new RuntimeException("发生异常");
                }
                return item;
            }
        };
    }

    /**
     * Step-scoped reader over the strings {@code "0"} through {@code "49"}.
     *
     * @return the reader
     */
    @Bean
    @StepScope
    public ListItemReader<String> myReader() {
        List<String> items = new ArrayList<>();
        int next = 0;
        while (next < 50) {
            items.add(Integer.toString(next));
            next++;
        }
        return new ListItemReader<>(items);
    }
}

错误跳过

@Configuration
public class SkipDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job skipDemoJob(){
        return jobBuilderFactory
                .get("skipDemoJob")
                .start(skipDemoStep())
                .build();
    }
    @Bean
    public Step skipDemoStep(){
        return stepBuilderFactory
                .get("skipDemoStep")
                .<String,String>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(System.out::println)
                .faultTolerant()
                .skip(RuntimeException.class)//当发生RuntimeException时跳过
                .skipLimit(5)//跳过5次
                .build();
    }

    @Bean
    public ItemProcessor<String,String> processor(){
        return new ItemProcessor<String, String>() {
            private int count = 0;
            @Override
            public String process(String s) throws Exception {
                System.out.println("processor:"+s);
                if (s.equalsIgnoreCase("26")){
                    count++;
                    if (count<=3){
                        throw new RuntimeException("发生异常");
                    }

                }
                return s;
            }
        };
    }

    @Bean
    @StepScope
    public ListItemReader<String> reader(){
        List<String> list = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            list.add(""+i);
        }
        return new ListItemReader<>(list);
    }

跳过监听器

    /**
     * Chunk-oriented step (chunk size 10) configured to skip on RuntimeException,
     * with the listener returned by skipListener() registered on the step.
     */
    @Bean
    public Step skipDemoStep(){
        return stepBuilderFactory
                .get("skipDemoStep")
                .<String,String>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(System.out::println)
                .faultTolerant()
                .skip(RuntimeException.class)// skip when a RuntimeException is thrown
                .skipLimit(5)// skip limit of 5
                .listener(skipListener())
                .build();
    }
    /**
     * Skip listener that reports items skipped during processing and ignores skips
     * that happen while reading or writing.
     *
     * @return the listener
     */
    @Bean
    public SkipListener<String, String> skipListener() {
        return new SkipListener<String, String>() {
            /** Called for a skip caused by an error while reading; nothing is reported. */
            @Override
            public void onSkipInRead(Throwable cause) {
                // intentionally empty
            }

            /** Called for a skip caused by an error while processing; prints the item and the error message. */
            @Override
            public void onSkipInProcess(String item, Throwable cause) {
                System.out.println("数据项:" + item);
                System.out.println("发生异常:" + cause.getMessage());
            }

            /** Called for a skip caused by an error while writing; nothing is reported. */
            @Override
            public void onSkipInWrite(String item, Throwable cause) {
                // intentionally empty
            }
        };
    }