对于最新稳定版本,请使用 Spring Batch 文档 6.0.3spring-doc.cadn.net.cn

扩展与并行处理

许多批处理问题可以通过单线程、单进程的作业来解决, 因此在考虑更复杂的实现之前,务必先确认这种简单方案是否满足您的需求。 请先测量一个实际作业的性能,看看最简单的实现是否足以应对。 即使使用标准硬件,读取和写入几百兆字节的文件也完全可以在一分钟内完成。spring-doc.cadn.net.cn

当您准备开始实施涉及并行处理的任务时,Spring Batch 提供了一系列选项,本章将对此进行描述,尽管部分功能在其他章节中已有涵盖。从高层角度来看,并行处理主要有两种模式:spring-doc.cadn.net.cn

这些也可以分为以下几类:spring-doc.cadn.net.cn

首先,我们回顾单进程选项。然后,我们回顾多进程选项。spring-doc.cadn.net.cn

多线程步骤

启动并行处理的最简单方法是在您的 Step 配置中添加一个 TaskExecutorspring-doc.cadn.net.cn

使用 Java 配置时,您可以向步骤中添加一个 TaskExecutor,如下例所示:spring-doc.cadn.net.cn

Java 配置
@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.taskExecutor(taskExecutor)
				.build();
}

例如,您可以向 tasklet 添加一个属性,如下所示:spring-doc.cadn.net.cn

<step id="loading">
    <tasklet task-executor="taskExecutor">...</tasklet>
</step>

在此示例中,taskExecutor 是对另一个实现了 TaskExecutor 接口的 Bean 定义的引用。 TaskExecutor 是一个标准的 Spring 接口,因此请参阅 Spring 用户指南以了解可用实现的详细信息。最简单的多线程 TaskExecutorSimpleAsyncTaskExecutorspring-doc.cadn.net.cn

上述配置的结果是,Step 通过在独立的执行线程中读取、处理和写入每一块数据项(每个提交间隔)来执行。请注意,这意味着数据项的处理没有固定的顺序,并且与单线程情况相比,一个数据块可能包含非连续的数据项。除了任务执行器施加的任何限制(例如是否由线程池支持)之外,tasklet 配置还有一个节流限制(默认值:4)。您可能需要增加此限制,以确保线程池得到充分利用。spring-doc.cadn.net.cn

使用 Java 配置时,构建器可提供对限流阈值的访问,如下所示:spring-doc.cadn.net.cn

Java 配置
@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.taskExecutor(taskExecutor)
				.throttleLimit(20)
				.build();
}

例如,您可以按以下方式增加节流限制:spring-doc.cadn.net.cn

<step id="loading"> <tasklet
    task-executor="taskExecutor"
    throttle-limit="20">...</tasklet>
</step>

另请注意,您的步骤中使用的任何池化资源(例如 DataSource)都可能对并发数施加限制。请确保这些资源中的池大小至少等于步骤中所需的并发线程数。spring-doc.cadn.net.cn

限流阈值已弃用

自 v5.0 起,节流限制已被弃用且无替代方案。如果您希望替换默认 TaskExecutorRepeatTemplate 中的当前节流机制,则需要提供一个自定义的 RepeatOperations 实现(基于带有有界任务队列的 TaskExecutor),并通过 StepBuilder#stepOperations 将其设置到步骤中:spring-doc.cadn.net.cn

Java 配置
@Bean
public Step sampleStep(RepeatOperations customRepeatOperations, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.stepOperations(customRepeatOperations)
				.build();
}

在某些常见的批处理使用场景中,使用多线程 Step 实现存在一些实际限制。Step 中的许多参与者(如读取器和写入器)都是有状态的。如果状态未按线程隔离,这些组件将无法在多线程 Step 中使用。特别是,Spring Batch 中的大多数读取器和写入器并非为多线程使用而设计。然而,可以使用无状态或线程安全的读取器和写入器,并且在 Spring Batch 示例 中提供了一个名为 parallelJob 的示例,该示例展示了如何使用处理指示器(参见 防止状态持久化)来跟踪已在数据库输入表中处理过的项目。spring-doc.cadn.net.cn

Spring Batch 提供了一些ItemWriterItemReader的实现。通常, 它们会在 Javadoc 中说明是否线程安全,或者在并发环境中需要采取哪些措施以避免 问题。如果 Javadoc 中没有相关信息,您可以检查实现代码以查看是否存在任何状态。如果某个读取器不是线程安全的, 您可以使用提供的SynchronizedItemStreamReader对其进行装饰,或者在您自己的同步委托器中使用它。您可以同步对read()的调用,并且,只要 处理和写入是块中最耗时的部分,您的步骤仍然可能比单线程配置完成得快得多。spring-doc.cadn.net.cn

并行步骤

只要需要并行化的应用程序逻辑可以拆分为不同的职责并分配给独立的步骤,就可以在单个进程中进行并行化。并行步骤的执行易于配置和使用。spring-doc.cadn.net.cn

使用 Java 配置时,并行执行步骤 (step1,step2)step3 非常简单,如下所示:spring-doc.cadn.net.cn

Java 配置
@Bean
public Job job(JobRepository jobRepository) {
    return new JobBuilder("job", jobRepository)
        .start(splitFlow())
        .next(step4())
        .build()        //builds FlowJobBuilder instance
        .build();       //builds Job instance
}

@Bean
public Flow splitFlow() {
    return new FlowBuilder<SimpleFlow>("splitFlow")
        .split(taskExecutor())
        .add(flow1(), flow2())
        .build();
}

@Bean
public Flow flow1() {
    return new FlowBuilder<SimpleFlow>("flow1")
        .start(step1())
        .next(step2())
        .build();
}

@Bean
public Flow flow2() {
    return new FlowBuilder<SimpleFlow>("flow2")
        .start(step3())
        .build();
}

@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

例如,并行执行步骤 (step1,step2)step3 非常直接, 如下所示:spring-doc.cadn.net.cn

<job id="job1">
    <split id="split1" task-executor="taskExecutor" next="step4">
        <flow>
            <step id="step1" parent="s1" next="step2"/>
            <step id="step2" parent="s2"/>
        </flow>
        <flow>
            <step id="step3" parent="s3"/>
        </flow>
    </split>
    <step id="step4" parent="s4"/>
</job>

<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>

可配置的任务执行器用于指定应由哪个 TaskExecutor 实现来执行各个流程。默认值为 SyncTaskExecutor,但若需并行运行步骤,则需要一个异步的 TaskExecutor。请注意,作业会确保拆分中的每个流程在完成后再聚合退出状态并进行转换。spring-doc.cadn.net.cn

有关更多详细信息,请参阅 拆分流程 部分。spring-doc.cadn.net.cn

远程分块

在远程分片中,Step 处理被拆分到多个进程中, 并通过某种中间件相互通信。下图展示了该 模式:spring-doc.cadn.net.cn

Remote Chunking
图 1. 远程分块

管理器组件是一个单一进程,而工作器是多个远程进程。 如果管理器不是瓶颈,这种模式效果最佳,因此处理操作的开销必须大于读取项的开销(实际情况通常如此)。spring-doc.cadn.net.cn

管理器是 Spring Batch Step 的一个实现,其中ItemWriter被替换为一个通用版本,该版本知道如何将数据块作为消息发送到中间件。工作者是所用任意中间件的标准监听器(例如,对于 JMS,它们将是MesssageListener实现),其角色是通过ChunkProcessor接口,使用标准的ItemWriterItemProcessor加上一个ItemWriter来处理数据块。使用此模式的优势之一是读取器、处理器和写入器组件均为现成可用(与本地执行步骤时所使用的组件相同)。数据项会被动态划分,工作通过中间件进行共享,因此如果所有监听器都是积极的消费者,则负载均衡将自动实现。spring-doc.cadn.net.cn

中间件必须具备持久性,保证消息交付,并且每条消息只能有一个消费者。JMS 是显而易见的选择,但在网格计算和共享内存产品领域也存在其他选项(例如 JavaSpaces)。spring-doc.cadn.net.cn

请参阅 Spring Batch 集成 - 远程分块 部分以获取更多详情。spring-doc.cadn.net.cn

分区

Spring Batch 还提供了用于分区 Step 执行并远程执行它的 SPI。在这种情况下,远程参与者是 Step 实例,它们同样可以被配置并用于本地处理。下图展示了该模式:spring-doc.cadn.net.cn

Partitioning Overview
图 2. 分区

Job 在左侧作为一系列 Step 实例运行,其中一个 Step 实例被标记为管理器。图中的所有工作器都是 Step 的相同实例,它们实际上可以取代管理器,从而为 Job 产生相同的结果。工作器通常是远程服务,但也可能是本地执行线程。在此模式中,管理器发送给工作器的消息不需要具备持久性或保证交付。JobRepository 中的 Spring Batch 元数据确保每个工作器针对每次 Job 执行仅执行一次且只执行一次。spring-doc.cadn.net.cn

Spring Batch 中的 SPI 由 Step 的一个特殊实现(称为PartitionStep)以及两个需要针对特定环境实现的策略接口组成。这两个策略接口是 PartitionHandlerStepExecutionSplitter,下面的序列图展示了它们的作用:spring-doc.cadn.net.cn

Partitioning SPI
图 3. 分区 SPI

在这种情况下,右侧的 Step 是“远程”工作节点,因此可能有多个对象和/或进程扮演此角色,而 PartitionStep 则负责驱动执行。spring-doc.cadn.net.cn

以下示例展示了在使用 Java 配置时的 PartitionStep 配置:spring-doc.cadn.net.cn

Java 配置
@Bean
public Step step1Manager(JobRepository jobRepository) {
    return new StepBuilder("step1.manager", jobRepository)
        .<String, String>partitioner("step1", partitioner())
        .step(step1())
        .gridSize(10)
        .taskExecutor(taskExecutor())
        .build();
}

与多线程步骤的 throttleLimit 方法类似,gridSize 方法可防止任务执行器被单个步骤的请求占满。spring-doc.cadn.net.cn

以下示例展示了在使用 XML 配置时的 PartitionStep 配置:spring-doc.cadn.net.cn

<step id="step1.manager">
    <partition step="step1" partitioner="partitioner">
        <handler grid-size="10" task-executor="taskExecutor"/>
    </partition>
</step>

与多线程步骤的 throttle-limit 属性类似,grid-size 属性可防止任务执行器被单个步骤的请求占满。spring-doc.cadn.net.cn

用于 Spring Batch 示例 的单元测试套件(参见 partition*Job.xml 配置)提供了一个简单的示例,您可以复制并扩展它。spring-doc.cadn.net.cn

Spring Batch 为名为 step1:partition0 的分区创建步骤执行,依此类推。许多人为了保持一致性,更倾向于将管理器步骤称为 step1:manager。您可以为步骤使用别名(通过指定 name 属性而不是 id 属性)。spring-doc.cadn.net.cn

分区处理器

PartitionHandler 是了解远程或网格环境结构的组件。它能够向远程 Step 实例发送 StepExecution 请求,这些请求被封装在特定于结构的格式中(例如 DTO)。它无需知道如何拆分输入数据,也无需知道如何聚合多个 Step 执行的结果。一般而言,它可能也无需了解弹性或故障转移功能,因为在许多情况下,这些都是结构本身提供的特性。无论如何,Spring Batch 始终提供独立于结构的重新启动能力。失败的 Job 总是可以重新启动,在这种情况下,仅会重新执行失败的 Stepsspring-doc.cadn.net.cn

PartitionHandler 接口可以针对多种架构类型提供专用实现,包括简单的 RMI 远程调用、EJB 远程调用、自定义 Web 服务、JMS、Java Spaces、共享内存网格(如 Terracotta 或 Coherence)以及网格执行架构(如 GridGain)。Spring Batch 不包含任何专有网格或远程调用架构的实现。spring-doc.cadn.net.cn

然而,Spring Batch 提供了一个实用的 PartitionHandler 实现,该实现使用 Spring 的 TaskExecutor 策略,在独立的执行线程中本地执行 Step 实例。该实现称为 TaskExecutorPartitionHandlerspring-doc.cadn.net.cn

您可以使用 Java 配置显式配置 TaskExecutorPartitionHandler,如下所示:spring-doc.cadn.net.cn

Java 配置
@Bean
public Step step1Manager(JobRepository jobRepository) {
    return new StepBuilder("step1.manager", jobRepository)
        .partitioner("step1", partitioner())
        .partitionHandler(partitionHandler())
        .build();
}

@Bean
public PartitionHandler partitionHandler() {
    TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
    retVal.setTaskExecutor(taskExecutor());
    retVal.setStep(step1());
    retVal.setGridSize(10);
    return retVal;
}

TaskExecutorPartitionHandler 是使用前述 XML 命名空间配置的步骤的默认值。您也可以按如下方式显式配置它:spring-doc.cadn.net.cn

<step id="step1.manager">
    <partition step="step1" handler="handler"/>
</step>

<bean class="org.spr...TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="taskExecutor"/>
    <property name="step" ref="step1" />
    <property name="gridSize" value="10" />
</bean>

gridSize 属性用于确定要创建的独立步骤执行的数量,因此它可以与 TaskExecutor 中的线程池大小相匹配。或者,也可以将其设置为大于可用线程数,从而使工作块更小。spring-doc.cadn.net.cn

TaskExecutorPartitionHandler 对于 IO 密集型的 Step 实例非常有用,例如复制大量文件或将文件系统复制到内容管理系统中。它还可用于远程执行,只需提供一个作为远程调用代理的 Step 实现(例如使用 Spring Remoting)。spring-doc.cadn.net.cn

分区器

Partitioner 的职责更为简单:仅作为新步骤执行的输入参数来生成执行上下文(无需担心重启)。它只有一个方法,如下面的接口定义所示:spring-doc.cadn.net.cn

public interface Partitioner {
    Map<String, ExecutionContext> partition(int gridSize);
}

此方法的返回值为每个步骤执行关联一个唯一名称(即String),并将其与采用ExecutionContext形式的输入参数绑定。这些名称随后会作为分区StepExecutions中的步骤名称出现在批处理元数据中。ExecutionContext仅仅是一个名值对的集合,因此它可能包含一系列主键、行号或输入文件的位置。远程Step通常通过使用#{…​}占位符(步骤作用域中的延迟绑定)绑定到上下文输入,如下节所示。spring-doc.cadn.net.cn

步骤执行的名称(由Partitioner返回的Map中的键)在同一个Job的步骤执行中必须是唯一的,但没有其他特定要求。实现这一点的最简单方法(并使名称对用户具有意义)是使用“前缀 + 后缀”的命名约定,其中前缀是正在执行的步骤的名称(该名称在Job中本身是唯一的),后缀只是一个计数器。框架中提供了一个SimplePartitioner,它使用了此约定。spring-doc.cadn.net.cn

您可以使用一个名为 PartitionNameProvider 的可选接口,将分区名称与分区本身分开提供。如果 Partitioner 实现了此接口,则在重启时仅查询名称。如果分区操作开销较大,这可能是一项有用的优化。PartitionNameProvider 提供的名称必须与 Partitioner 提供的名称相匹配。spring-doc.cadn.net.cn

将输入数据绑定到步骤

对于由PartitionHandler执行的步骤而言,拥有相同的配置并在运行时从ExecutionContext绑定其输入参数是非常高效的。利用 Spring Batch 的 StepScope 特性(在关于延迟绑定的章节中有更详细的介绍),可以轻松地实现这一点。例如,如果Partitioner创建了带有名为fileName的属性键的ExecutionContext实例,且该键指向每个步骤调用时不同的文件(或目录),那么Partitioner的输出可能类似于下表所示内容:spring-doc.cadn.net.cn

表 1. 由 Partitioner 提供的示例步骤执行名称到执行上下文的映射,针对目录处理

步骤执行名称(键)spring-doc.cadn.net.cn

执行上下文 (value)spring-doc.cadn.net.cn

文件复制:分区 0spring-doc.cadn.net.cn

fileName=/home/data/onespring-doc.cadn.net.cn

文件复制:分区 1spring-doc.cadn.net.cn

fileName=/home/data/twospring-doc.cadn.net.cn

文件复制:分区 2spring-doc.cadn.net.cn

fileName=/home/data/三spring-doc.cadn.net.cn

然后,可以通过延迟绑定到执行上下文,将文件名绑定到一个步骤。spring-doc.cadn.net.cn

以下示例展示了如何在 Java 中定义迟绑定:spring-doc.cadn.net.cn

Java 配置
@Bean
public MultiResourceItemReader itemReader(
	@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
	return new MultiResourceItemReaderBuilder<String>()
			.delegate(fileReader())
			.name("itemReader")
			.resources(resources)
			.build();
}

以下示例展示了如何在 XML 中定义后期绑定:spring-doc.cadn.net.cn

XML 配置
<bean id="itemReader" scope="step"
      class="org.spr...MultiResourceItemReader">
    <property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>