Spring Batch 批处理框架

这篇具有很好参考价值的文章主要介绍了Spring Batch 批处理框架。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、SpringBatch 介绍

Spring Batch 是一个轻量级、全面的批处理框架,旨在支持开发对企业系统的日常操作至关重要的健壮的批处理应用程序。Spring Batch 建立在人们期望的 Spring Framework 特性(生产力、基于 POJO 的开发方法和一般易用性)的基础上,同时使开发人员可以在必要时轻松访问和使用更高级的企业服务。

Spring Batch 不是一个调度框架。在商业和开源领域都有许多优秀的企业调度程序(例如 Quartz、Tivoli、Control-M 等)。Spring Batch 旨在与调度程序结合使用,而不是替代调度程序。
Spring Batch 批处理框架,spring,batch,java

二、业务场景

我们在业务开发中经常遇到这种情况:
Spring Batch 批处理框架,spring,batch,java

Spring Batch 支持以下业务场景:

  • 定期提交批处理。
  • 并发批处理:并行处理作业。
  • 分阶段的企业消息驱动处理。
  • 大规模并行批处理。
  • 失败后手动或计划重启。
  • 相关步骤的顺序处理(扩展到工作流驱动的批次)。
  • 部分处理:跳过记录(例如,在回滚时)。
  • 整批交易,适用于批量较小或已有存储过程或脚本的情况。

三、基础知识

3.1、整体架构

官方文档:https://docs.spring.io/spring-batch/docs/current/reference/html/index-single.html#domainLanguageOfBatch
Spring Batch 批处理框架,spring,batch,java

名称 作用
JobRepository 为所有的原型(Job、JobInstance、Step)提供持久化的机制
JobLauncher JobLauncher表示一个简单的接口,用于启动一个Job给定的集合 JobParameters
Job Job是封装了整个批处理过程的实体
Step Step是一个域对象,它封装了批处理作业的一个独立的顺序阶段

3.2、核心接口

  • ItemReader: is an abstraction that represents the output of a Step,
    one batch or chunk of items at a time
  • ItemProcessor:an abstraction that represents the business processing
    of an item.
  • ItemWriter: is an abstraction that represents the output of a Step,
    one batch or chunk of items at a time.
    Spring Batch 批处理框架,spring,batch,java
    大体即为 输入→数据加工→输出 ,一个Job定义多个Step及处理流程,一个Step通常涵盖ItemReader、ItemProcessor、ItemWriter

四、基础实操

4.0、引入 SpringBatch

pom 文件引入 springboot

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.2.5.RELEASE</version>
  <relativePath/> <!-- lookup parent from repository -->
</parent>

pom 文件引入 spring-batch 及相关依赖

<dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-validation</artifactId>
    </dependency>

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
  </dependencies>

mysql 创建依赖的库表
Spring Batch 批处理框架,spring,batch,java
sql 脚本的 jar 包路径:…\maven\repository\org\springframework\batch\spring-batch-core\4.2.1.RELEASE\spring-batch-core-4.2.1.RELEASE.jar!\org\springframework\batch\core\schema-mysql.sql

启动类标志@EnableBatchProcessing

@SpringBootApplication
@EnableBatchProcessing
public class SpringBatchStartApplication
{
    public static void main(String[] args) {
        SpringApplication.run(SpringBatchStartApplication.class, args);
    }
}

FirstJobDemo

@Component
public class FirstJobDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job firstJob() {
        return jobBuilderFactory.get("firstJob")
                .start(step())
                .build();
    }

    private Step step() {
        return stepBuilderFactory.get("step")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("执行步骤....");
                    return RepeatStatus.FINISHED;
                }).build();
    }
}

4.1、流程控制

A、多步骤任务

@Bean
public Job multiStepJob() {
    return jobBuilderFactory.get("multiStepJob2")
            .start(step1())
            .on(ExitStatus.COMPLETED.getExitCode()).to(step2())
            .from(step2())
            .on(ExitStatus.COMPLETED.getExitCode()).to(step3())
            .from(step3()).end()
            .build();
}


private Step step1() {
    return stepBuilderFactory.get("step1")
            .tasklet((stepContribution, chunkContext) -> {
                System.out.println("执行步骤一操作。。。");
                return RepeatStatus.FINISHED;
            }).build();
}

private Step step2() {
    return stepBuilderFactory.get("step2")
            .tasklet((stepContribution, chunkContext) -> {
                System.out.println("执行步骤二操作。。。");
                return RepeatStatus.FINISHED;
            }).build();
}

private Step step3() {
    return stepBuilderFactory.get("step3")
            .tasklet((stepContribution, chunkContext) -> {
                System.out.println("执行步骤三操作。。。");
                return RepeatStatus.FINISHED;
            }).build();
}

B、并行执行
创建了两个 Flow:flow1(包含 step1 和 step2)和 flow2(包含 step3)。然后通过JobBuilderFactory的split方法,指定一个异步执行器,将 flow1 和 flow2 异步执行(也就是并行)

@Component
public class SplitJobDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job splitJob() {
        return jobBuilderFactory.get("splitJob")
                .start(flow1())
                .split(new SimpleAsyncTaskExecutor()).add(flow2())
                .end()
                .build();

    }

    private Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤一操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤二操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Step step3() {
        return stepBuilderFactory.get("step3")
                .tasklet((stepContribution, chunkContext) -> {
                    System.out.println("执行步骤三操作。。。");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    private Flow flow1() {
        return new FlowBuilder<Flow>("flow1")
                .start(step1())
                .next(step2())
                .build();
    }

    private Flow flow2() {
        return new FlowBuilder<Flow>("flow2")
                .start(step3())
                .build();
    }
}

C、任务决策
决策器的作用就是可以指定程序在不同的情况下运行不同的任务流程,比如今天是周末,则让任务执行 step1 和 step2,如果是工作日,则之心 step1 和 step3。

@Component
public class MyDecider implements JobExecutionDecider {
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        LocalDate now = LocalDate.now();
        DayOfWeek dayOfWeek = now.getDayOfWeek();

        if (dayOfWeek == DayOfWeek.SATURDAY || dayOfWeek == DayOfWeek.SUNDAY) {
            return new FlowExecutionStatus("weekend");
        } else {
            return new FlowExecutionStatus("workingDay");
        }
    }
}
@Bean
public Job deciderJob() {
 return jobBuilderFactory.get("deciderJob")
   .start(step1())
   .next(myDecider)
   .from(myDecider).on("weekend").to(step2())
   .from(myDecider).on("workingDay").to(step3())
   .from(step3()).on("*").to(step4())
   .end()
   .build();
}
private Step step1() {
 return stepBuilderFactory.get("step1")
   .tasklet((stepContribution, chunkContext) -> {
    System.out.println("执行步骤一操作。。。");
    return RepeatStatus.FINISHED;
   }).build();
}

private Step step2() {
 return stepBuilderFactory.get("step2")
   .tasklet((stepContribution, chunkContext) -> {
    System.out.println("执行步骤二操作。。。");
    return RepeatStatus.FINISHED;
   }).build();
}

private Step step3() {
 return stepBuilderFactory.get("step3")
   .tasklet((stepContribution, chunkContext) -> {
    System.out.println("执行步骤三操作。。。");
    return RepeatStatus.FINISHED;
   }).build();
}


private Step step4() {
 return stepBuilderFactory.get("step4")
   .tasklet((stepContribution, chunkContext) -> {
    System.out.println("执行步骤四操作。。。");
    return RepeatStatus.FINISHED;
   }).build();
}

D、任务嵌套
任务 Job 除了可以由 Step 或者 Flow 构成外,我们还可以将多个任务 Job 转换为特殊的 Step,然后再赋给另一个任务 Job,这就是任务的嵌套。

@Component
public class NestedJobDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private JobRepository jobRepository;
    @Autowired
    private PlatformTransactionManager platformTransactionManager;

    // 父任务
    @Bean
    public Job parentJob() {
        return jobBuilderFactory.get("parentJob")
                .start(childJobOneStep())
                .next(childJobTwoStep())
                .build();
    }


    // 将任务转换为特殊的步骤
    private Step childJobOneStep() {
        return new JobStepBuilder(new StepBuilder("childJobOneStep"))
                .job(childJobOne())
                .launcher(jobLauncher)
                .repository(jobRepository)
                .transactionManager(platformTransactionManager)
                .build();
    }

    // 将任务转换为特殊的步骤
    private Step childJobTwoStep() {
        return new JobStepBuilder(new StepBuilder("childJobTwoStep"))
                .job(childJobTwo())
                .launcher(jobLauncher)
                .repository(jobRepository)
                .transactionManager(platformTransactionManager)
                .build();
    }

    // 子任务一
    private Job childJobOne() {
        return jobBuilderFactory.get("childJobOne")
                .start(
                    stepBuilderFactory.get("childJobOneStep")
                            .tasklet((stepContribution, chunkContext) -> {
                                System.out.println("子任务一执行步骤。。。");
                                return RepeatStatus.FINISHED;
                            }).build()
                ).build();
    }

    // 子任务二
    private Job childJobTwo() {
        return jobBuilderFactory.get("childJobTwo")
                .start(
                    stepBuilderFactory.get("childJobTwoStep")
                            .tasklet((stepContribution, chunkContext) -> {
                                System.out.println("子任务二执行步骤。。。");
                                return RepeatStatus.FINISHED;
                            }).build()
                ).build();
    }
}

4.2、读取数据

定义 Model TestData,下面同一

@Data
public class TestData {
    private int id;
    private String field1;
    private String field2;
    private String field3;
}

读取数据包含:文本数据读取、数据库数据读取、XML 数据读取、JSON 数据读取等,具体自己查资料。

文本数据读取 Demo

@Component
public class FileItemReaderDemo {

    // 任务创建工厂
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    // 步骤创建工厂
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job fileItemReaderJob() {
        return jobBuilderFactory.get("fileItemReaderJob2")
                .start(step())
                .build();
    }

    private Step step() {
        return stepBuilderFactory.get("step")
                .<TestData, TestData>chunk(2)
                .reader(fileItemReader())
                .writer(list -> list.forEach(System.out::println))
                .build();
    }

    private ItemReader<TestData> fileItemReader() {
        FlatFileItemReader<TestData> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("reader/file")); // 设置文件资源地址
        reader.setLinesToSkip(1); // 忽略第一行

        // AbstractLineTokenizer的三个实现类之一,以固定分隔符处理行数据读取,
        // 使用默认构造器的时候,使用逗号作为分隔符,也可以通过有参构造器来指定分隔符
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();

        // 设置属性名,类似于表头
        tokenizer.setNames("id", "field1", "field2", "field3");

        // 将每行数据转换为TestData对象
        DefaultLineMapper<TestData> mapper = new DefaultLineMapper<>();
        // 设置LineTokenizer
        mapper.setLineTokenizer(tokenizer);

        // 设置映射方式,即读取到的文本怎么转换为对应的POJO
        mapper.setFieldSetMapper(fieldSet -> {
            TestData data = new TestData();
            data.setId(fieldSet.readInt("id"));
            data.setField1(fieldSet.readString("field1"));
            data.setField2(fieldSet.readString("field2"));
            data.setField3(fieldSet.readString("field3"));
            return data;
        });
        reader.setLineMapper(mapper);
        return reader;
    }

}

4.3、输出数据

输出数据也包含:文本数据读取、数据库数据读取、XML 数据读取、JSON 数据读取等

@Component
public class FileItemWriterDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Resource(name = "writerSimpleReader")
    private ListItemReader<TestData> writerSimpleReader;

    @Bean
    public Job fileItemWriterJob() throws Exception {
        return jobBuilderFactory.get("fileItemWriterJob")
                .start(step())
                .build();
    }

    private Step step() throws Exception {
        return stepBuilderFactory.get("step")
                .<TestData, TestData>chunk(2)
                .reader(writerSimpleReader)
                .writer(fileItemWriter())
                .build();
    }

    private FlatFileItemWriter<TestData> fileItemWriter() throws Exception {
        FlatFileItemWriter<TestData> writer = new FlatFileItemWriter<>();

        FileSystemResource file = new FileSystemResource("D:/code/spring-batch-demo/src/main/resources/writer/writer-file");
        Path path = Paths.get(file.getPath());
        if (!Files.exists(path)) {
            Files.createFile(path);
        }
        // 设置输出文件路径
        writer.setResource(file);

        // 把读到的每个TestData对象转换为JSON字符串
        LineAggregator<TestData> aggregator = item -> {
            try {
                ObjectMapper mapper = new ObjectMapper();
                return mapper.writeValueAsString(item);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
            return "";
        };

        writer.setLineAggregator(aggregator);
        writer.afterPropertiesSet();
        return writer;
    }

}

4.5、处理数据

@Component
public class ValidatingItemProcessorDemo {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Resource(name = "processorSimpleReader")
    private ListItemReader<TestData> processorSimpleReader;

    @Bean
    public Job validatingItemProcessorJob() throws Exception {
        return jobBuilderFactory.get("validatingItemProcessorJob3")
                .start(step())
                .build();
    }

    private Step step() throws Exception {
        return stepBuilderFactory.get("step")
                .<TestData, TestData>chunk(2)
                .reader(processorSimpleReader)
                .processor(beanValidatingItemProcessor())
                .writer(list -> list.forEach(System.out::println))
                .build();
    }

//    private ValidatingItemProcessor<TestData> validatingItemProcessor() {
//        ValidatingItemProcessor<TestData> processor = new ValidatingItemProcessor<>();
//        processor.setValidator(value -> {
//            // 对每一条数据进行校验
//            if ("".equals(value.getField3())) {
//                // 如果field3的值为空串,则抛异常
//                throw new ValidationException("field3的值不合法");
//            }
//        });
//        return processor;
//    }

    private BeanValidatingItemProcessor<TestData> beanValidatingItemProcessor() throws Exception {
        BeanValidatingItemProcessor<TestData> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
        // 开启过滤,不符合规则的数据被过滤掉;
//        beanValidatingItemProcessor.setFilter(true);
        beanValidatingItemProcessor.afterPropertiesSet();
        return beanValidatingItemProcessor;
    }

}

4.6、任务调度

可以配合 quartz 或者 xxljob 实现定时任务执行文章来源地址https://www.toymoban.com/news/detail-531660.html

@RestController
@RequestMapping("job")
public class JobController {

    @Autowired
    private Job job;
    @Autowired
    private JobLauncher jobLauncher;

    @GetMapping("launcher/{message}")
    public String launcher(@PathVariable String message) throws Exception {
        JobParameters parameters = new JobParametersBuilder()
                .addString("message", message)
                .toJobParameters();
        // 将参数传递给任务
        jobLauncher.run(job, parameters);
        return "success";
    }
}

到了这里,关于Spring Batch 批处理框架的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 9 | Tensorflow中的batch批处理

    TensorFlow支持批处理(batch processing)。批处理是指同时处理多个样本或数据点而不是单个样本。在深度学习中,批处理通常用于提高训练的效率和稳定性。 在TensorFlow中,可以使用 tf.data.Dataset API来设置和处理批处理数据。这允许以批处理的方式加载和处理数据,适用于训练神

    2024年01月19日
    浏览(33)
  • Spring Batch -配置步骤 (XML/Java)

    面向块的处理 Spring Batch 在最常见的情况下使用“面向块”的处理方式 实现。面向块的处理是指一次读取一个数据, 创建在事务边界内写出的“块”。一次的数量 读取的项目等于提交间隔,整个块由 写出,然后提交事务。 以下伪代码以简化的形式显示了​​ItemWriter​​相

    2024年02月09日
    浏览(37)
  • Spring Batch 作业对象-作业参数设置与获取

    目录 引言 JobParameters 作业参数设置 作业参数获取 方案1:使用ChunkContext类   方案2:使用@Value 延时获取 转视频版 书接上篇Spring Batch批处理-作业Job简介,上篇带小伙伴们了解色作业Job对象,那这篇就看一下作业参数是啥一回事,同时要怎么设置参数并获取参数的。 前面提到

    2023年04月15日
    浏览(43)
  • Spring Batch之读数据—读多文件(三十三)

            前面的所有文件的读取基本上是对单文件执行的,在实际应用中,我们经常操作批量的文件。         Spring Batch框架提供了现有的组件MultiResourceItemReader支持对多文件的读取,通过MultiResourceItemReader读取批量文件非常简单。MultiResourceItemReader通过代理的ItemReade

    2024年02月16日
    浏览(35)
  • Spring Batch之读数据库—JdbcPagingItemReader(四十一)

            Spring Batch框架提供了对JDBC分页读取支持的组件JdbcPagingItemReader。JdbcPaginItemReader实现ItemReader接口,核心作用是将数据库中记录通过分页的方式转换为Java对象。在JdbcPagingItemReader将数据库记录转换为Java对象是主要有两步工作:首先根据SimpleJdbcTemplate与PagingQueryProvid

    2024年02月16日
    浏览(33)
  • Spring Kafka消费模式(single, batch)及确认模式(自动、手动)示例

    Spring Kafka消费消息的模式分为2种模式(对应spring.kafka.listener.type配置): single - 每次消费单条记录 batch - 批量消费消息列表 且每种模式都分为2种提交已消费消息offset的ack模式: 自动确认 手动确认 接下来依次讲解这两种消费模式及其对应的ack模式的示例配置及代码。 本章节

    2023年04月08日
    浏览(37)
  • 初探Flink的Java实现流处理和批处理

    端午假期,夏日炎炎,温度连续40度以上,在家学习Flink相关知识,记录下来,方便备查。 开发工具 :IntelliJ Idea Flink版本 :1.13.0 本次主要用Flink实现 批处理 (DataSet API) 和 流处理 (DataStream API)简单实现。 第一步、创建项目与添加依赖 1)新建项目 打开Idea,新建Maven项目

    2024年02月10日
    浏览(49)
  • sql文件批处理程序-java桌面应用

        支持sql文件夹批处理,选中文件夹或者sql文件 支持测试连接,可以校验数据库配置 支持报错回显,弹出报错文件名以及问题语句 支持在程序中修改错误语句,用户可以选择保存修改内容继续执行或不保存修改只执行 支持动态显示执行进度 支持自动识别文件编码进行解

    2024年02月06日
    浏览(46)
  • ‘java‘ 不是内部或外部命令,也不是可运行的程序 或批处理文件

    1.‘java’ 不是内部或外部命令,也不是可运行的程序 或批处理文件。 这种情况一般来说是没有配置环境变量或者是没有配置好 (1)找到安装java的位置 (2)进入控制面板==》系统与安全==》系统==》高级设置–》环境变量 2.开始配置 (1)系统变量中新建 变量名:JAVA_HOME 变

    2024年02月07日
    浏览(41)
  • 批处理命令大全 | Windows批处理教程 - ChatGPT

    批处理以.bat或.cmd文件的形式存在,在Windows命令提示符下运行,也可以通过双击批处理文件来运行。批处理文件由一系列命令组成,可以按照顺序执行,也可以根据条件或循环控制选择性地执行。 在Windows上创建一个批处理文件非常简单,在编辑器中输入一系列命令并保存为

    2024年02月04日
    浏览(80)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包