springbatch-ItemWriter使用

springbatch-ItemWriter使用

起男 330 2023-05-29

springbatch-ItemWriter使用

ItemReader 每次只读取一条数据,而 ItemWriter 则是按 chunk(批)一次输出一批数据

把数据输出到数据库

/**
 * Demo configuration: reads {@code User} items from an in-memory list and
 * inserts them into the {@code t_user} table through a
 * {@link JdbcBatchItemWriter}.
 */
@Configuration
public class ItemWriterDbDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    /**
     * Job made of the single database-writing step.
     *
     * @return the configured job
     */
    @Bean
    public Job itemWriterDbDemoJob(){
        return jobBuilderFactory
                .get("itemWriterDbDemoJob")
                .start(itemWriterDbDemoStep())
                .build();
    }

    /**
     * Chunk-oriented step: items are read one by one and handed to the
     * writer in groups of two.
     *
     * @return the configured step
     */
    @Bean
    public Step itemWriterDbDemoStep(){
        return stepBuilderFactory
                .get("itemWriterDbDemoStep")
                .<User,User>chunk(2)
                .reader(myReader())
                .writer(dbWriter())
                .build();
    }

    /**
     * Writer that inserts each {@code User} into {@code t_user}.
     *
     * @return the JDBC batch writer
     */
    @Bean
    public JdbcBatchItemWriter<User> dbWriter(){
        JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(dataSource);
        // Insert statement using named parameters (:id, :name, :pass).
        writer.setSql("insert into t_user(id,name,pass) values(:id,:name,:pass)");
        // Resolve the named parameters from the matching bean properties of the item.
        // Explicit type argument instead of a raw type avoids an unchecked conversion.
        writer.setItemSqlParameterSourceProvider(
                new BeanPropertyItemSqlParameterSourceProvider<User>());
        return writer;
    }

    /**
     * In-memory reader producing 50 users with ids 0..49.
     *
     * @return a list-backed reader
     */
    @Bean
    public ItemReader<User> myReader(){
        List<User> users = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            User user = new User()
                    .setId(i)
                    .setName("name:" + i)
                    .setPass("pass:" + i);
            users.add(user);
        }
        return new ListItemReader<>(users);
    }
}

把数据输出到文件

/**
 * Demo configuration: pages {@code User} rows out of {@code t_user} and writes
 * each one as a JSON line to a flat file.
 */
@Configuration
public class FileItemWriterDemo {

    /**
     * Shared JSON serializer. Building an ObjectMapper is comparatively
     * expensive, so one instance is reused for every item instead of creating
     * a new mapper per line.
     */
    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private DataSource dataSource;

    /**
     * Job made of the single file-writing step.
     *
     * @return the configured job
     */
    @Bean
    public Job fileItemWriterDemoJob(){
        return jobBuilderFactory
                .get("fileItemWriterDemoJob")
                .start(fileItemWriterDemoStep())
                .build();
    }

    /**
     * Chunk-oriented step handing items to the file writer in groups of ten.
     *
     * @return the configured step
     */
    @Bean
    public Step fileItemWriterDemoStep(){
        return stepBuilderFactory
                .get("fileItemWriterDemoStep")
                .<User,User>chunk(10)
                .reader(dbJdbcReader())
                .writer(fileWriter())
                .build();
    }

    /**
     * Flat-file writer that emits one JSON document per line.
     *
     * @return the configured writer
     */
    @SneakyThrows
    @Bean
    public ItemWriter<User> fileWriter(){
        FlatFileItemWriter<User> writer = new FlatFileItemWriter<>();
        // Output location.
        writer.setResource(new FileSystemResource("D:\\test\\users.txt"));
        // Turn each item into the text of one output line.
        writer.setLineAggregator(new LineAggregator<User>() {
            @SneakyThrows
            @Override
            public String aggregate(User user) {
                return JSON_MAPPER.writeValueAsString(user);
            }
        });
        // Validate the configuration before the writer is handed out.
        writer.afterPropertiesSet();
        return writer;
    }

    /**
     * Paging JDBC reader over {@code t_user}, ordered by {@code id} descending.
     *
     * @return the configured reader
     */
    @Bean
    public ItemReader<User> dbJdbcReader(){
        JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(dataSource);
        // Number of rows to fetch at a time.
        reader.setFetchSize(2);
        // Map each result row (id, name, pass by column position) to a User.
        reader.setRowMapper(new RowMapper<User>() {
            @Override
            public User mapRow(ResultSet rs, int rowNum) throws SQLException {
                User user = new User();
                user.setId(rs.getInt(1));
                user.setName(rs.getString(2));
                user.setPass(rs.getString(3));
                return user;
            }
        });
        // Build the paging query.
        MySqlPagingQueryProvider provider = new MySqlPagingQueryProvider();
        provider.setSelectClause("id,name,pass"); // columns to select
        provider.setFromClause("from t_user");    // table to read from
        // Sort key and direction.
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id",Order.DESCENDING);
        provider.setSortKeys(sortKeys);

        reader.setQueryProvider(provider);
        return reader;
    }
}

把数据输出到xml文件

/**
 * Demo configuration: writes three in-memory {@code User} items to an XML
 * file through a {@link StaxEventItemWriter}.
 */
@Configuration
public class XmlItemWriterDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Job made of the single XML-writing step.
     *
     * @return the configured job
     */
    @Bean
    public Job xmlItemWriterDemoJob(){
        return jobBuilderFactory
                .get("xmlItemWriterDemoJob")
                .start(xmlItemWriterDemoStep())
                .build();
    }

    /**
     * Chunk-oriented step handing items to the XML writer in groups of two.
     *
     * @return the configured step
     */
    @Bean
    public Step xmlItemWriterDemoStep(){
        return stepBuilderFactory
                .get("xmlItemWriterDemoStep")
                .<User,User>chunk(2)
                .reader(myReader())
                .writer(xmlItemWriter())
                .build();
    }

    /**
     * XML writer producing a {@code users} root element with one
     * {@code user} element per item.
     *
     * @return the configured writer
     */
    @SneakyThrows
    @Bean
    public ItemWriter<User> xmlItemWriter(){
        StaxEventItemWriter<User> writer = new StaxEventItemWriter<>();

        // Alias the User class to the element name "user".
        // Class<?> instead of the raw Class type keeps the map fully typed.
        XStreamMarshaller marshaller = new XStreamMarshaller();
        Map<String, Class<?>> aliases = new HashMap<>();
        aliases.put("user",User.class);
        marshaller.setAliases(aliases);

        writer.setMarshaller(marshaller);
        writer.setRootTagName("users"); // root element name
        // Output file location.
        String path = "D:\\test\\users.xml";
        writer.setResource(new FileSystemResource(path));
        // Validate the configuration before the writer is handed out.
        writer.afterPropertiesSet();
        return writer;
    }

    /**
     * In-memory reader with three sample users.
     *
     * @return a list-backed reader
     */
    @Bean
    public ItemReader<User> myReader(){
        List<User> users = new ArrayList<>();
        users.add(new User().setId(1).setName("zhangsan").setPass("123"));
        users.add(new User().setId(2).setName("lisi").setPass("123"));
        users.add(new User().setId(3).setName("wangwu").setPass("123"));
        return new ListItemReader<>(users);
    }
}

把数据同时输出到多个文件

@Configuration
public class MultiFileItemWriterDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job multiFileDemoJob(){
        return jobBuilderFactory
                .get("multiFileDemoJob")
                .start(multiFileDemoStep())
                .build();
    }

    @Bean
    public Step multiFileDemoStep(){
        return stepBuilderFactory
                .get("multiFileDemoStep")
                .<User,User>chunk(2)
                .reader(myReader())
                .writer(multiFileItemWriter())
                .build();
    }

    @SneakyThrows
    @Bean
    public CompositeItemWriter<User> multiFileItemWriter(){
        CompositeItemWriter<User> writer = new CompositeItemWriter();
        //设置由谁进行写
        writer.setDelegates(Arrays.asList(xmlFileItemWriter(),jsonFileItemWriter()));
        writer.afterPropertiesSet();
        return writer;
    }

    @SneakyThrows
    @Bean
    public ItemWriter<User> xmlFileItemWriter(){
        StaxEventItemWriter<User> writer = new StaxEventItemWriter<>();

        XStreamMarshaller marshaller = new XStreamMarshaller();
        Map<String,Class> aliases = new HashMap<>();
        aliases.put("user",User.class);
        marshaller.setAliases(aliases);

        writer.setRootTagName("users");
        writer.setMarshaller(marshaller);

        String path = "D:\\test\\u2.xml";
        writer.setResource(new FileSystemResource(path));
        writer.afterPropertiesSet();
        return writer;
    }

    @SneakyThrows
    @Bean
    public ItemWriter<User> jsonFileItemWriter(){
        FlatFileItemWriter<User> writer = new FlatFileItemWriter<>();
        String path = "D:\\test\\u1.txt";
        writer.setResource(new FileSystemResource(path));
        //把对象转换为字符串
        writer.setLineAggregator(new LineAggregator<User>() {
            @SneakyThrows
            @Override
            public String aggregate(User user) {
                return new ObjectMapper()
                        .writeValueAsString(user);
            }
        });
        writer.afterPropertiesSet();
        return writer;
    }

    @Bean
    public ItemReader<User> myReader(){
        List<User> userList = new ArrayList<>();
        userList.add(new User().setId(1).setName("zhangsan").setPass("123"));
        userList.add(new User().setId(2).setName("lisi").setPass("123"));
        userList.add(new User().setId(3).setName("wangwu").setPass("123"));
        ListItemReader<User> reader = new ListItemReader<>(userList);
        return reader;
    }
}

把数据按分类输出到不同的文件(每条数据只写入其中一个文件)

/**
 * Demo configuration: routes each {@code User} item to exactly one of two
 * files through a {@link ClassifierCompositeItemWriter} — even ids go to the
 * JSON-lines text file, odd ids to the XML file.
 */
@Configuration
public class MultiFileItemWriterDemo {

    /**
     * Shared JSON serializer, reused for every item instead of creating a new
     * mapper per line.
     */
    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Job made of the single classifying step.
     *
     * @return the configured job
     */
    @Bean
    public Job multiFileDemoJob(){
        return jobBuilderFactory
                .get("multiFileDemoJob")
                .start(multiFileDemoStep())
                .build();
    }

    /**
     * Chunk-oriented step handing items to the classifying writer in groups
     * of two.
     *
     * @return the configured step
     */
    @Bean
    public Step multiFileDemoStep(){
        return stepBuilderFactory
                .get("multiFileDemoStep")
                .<User,User>chunk(2)
                .reader(myReader())
                // The classifying writer does not implement ItemStream, so the
                // file-backed delegates are registered as streams explicitly.
                .writer(multiFileItemWriter())
                .stream(jsonFileItemWriter())
                .stream(xmlFileItemWriter())
                .build();
    }

    /**
     * Writer that picks a delegate per item based on the parity of its id.
     *
     * @return the configured classifying writer
     */
    @Bean
    public ClassifierCompositeItemWriter<User> multiFileItemWriter(){
        ClassifierCompositeItemWriter<User> writer = new ClassifierCompositeItemWriter<>();
        writer.setClassifier(new Classifier<User, ItemWriter<? super User>>() {
            @Override
            public ItemWriter<? super User> classify(User user) {
                // Classify by id: even -> JSON file, odd -> XML file.
                return user.getId()%2==0
                        ?jsonFileItemWriter()
                        :xmlFileItemWriter();
            }
        });
        return writer;
    }

    /**
     * XML writer producing a {@code users} root element with one
     * {@code user} element per item.
     *
     * @return the configured writer
     */
    @SneakyThrows
    @Bean
    public StaxEventItemWriter<User> xmlFileItemWriter(){
        StaxEventItemWriter<User> writer = new StaxEventItemWriter<>();

        // Alias the User class to the element name "user".
        // Class<?> instead of the raw Class type keeps the map fully typed.
        XStreamMarshaller marshaller = new XStreamMarshaller();
        Map<String, Class<?>> aliases = new HashMap<>();
        aliases.put("user",User.class);
        marshaller.setAliases(aliases);

        writer.setRootTagName("users");
        writer.setMarshaller(marshaller);

        String path = "D:\\test\\u2.xml";
        writer.setResource(new FileSystemResource(path));
        writer.afterPropertiesSet();
        return writer;
    }

    /**
     * Flat-file writer that emits one JSON document per line.
     *
     * @return the configured writer
     */
    @SneakyThrows
    @Bean
    public FlatFileItemWriter<User> jsonFileItemWriter(){
        FlatFileItemWriter<User> writer = new FlatFileItemWriter<>();
        String path = "D:\\test\\u1.txt";
        writer.setResource(new FileSystemResource(path));
        // Turn each item into the text of one output line.
        writer.setLineAggregator(new LineAggregator<User>() {
            @SneakyThrows
            @Override
            public String aggregate(User user) {
                return JSON_MAPPER.writeValueAsString(user);
            }
        });
        writer.afterPropertiesSet();
        return writer;
    }

    /**
     * In-memory reader with three sample users.
     *
     * @return a list-backed reader
     */
    @Bean
    public ItemReader<User> myReader(){
        List<User> users = new ArrayList<>();
        users.add(new User().setId(1).setName("zhangsan").setPass("123"));
        users.add(new User().setId(2).setName("lisi").setPass("123"));
        users.add(new User().setId(3).setName("wangwu").setPass("123"));
        return new ListItemReader<>(users);
    }
}