缩放和并行处理
许多批处理问题可以通过单线程、单进程作业来解决, 因此,在考虑之前正确检查它是否满足您的需求总是一个好主意 关于更复杂的实现。衡量实际作业的绩效,看看是否 最简单的实现首先满足您的需求。您可以读取和写入 即使在标准硬件下,也能在不到一分钟的时间内传输数百兆字节。
当您准备好开始使用一些并行处理来实现作业时,Spring Batch 提供了一系列选项,本章将介绍这些选项,尽管有些 功能将在其他地方介绍。在高层次上,有两种并行模式 加工:
-
单进程、多线程
-
多进程
这些也分为几类,如下所示:
-
多线程步骤(单进程)
-
并行步骤(单进程)
-
步骤的远程分块(多进程)
-
对步骤进行分区(单进程或多进程)
首先,我们回顾单进程选项。然后我们回顾多进程选项。
多线程步骤
启动并行处理的最简单方法是添加一个TaskExecutor
到你的步骤
配置。
-
Java
-
XML
使用 Java 配置时,可以添加TaskExecutor
到步骤,
如以下示例所示:
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.build();
}
例如,您可以向tasklet
如下:
<step id="loading">
<tasklet task-executor="taskExecutor">...</tasklet>
</step>
在此示例中,taskExecutor
是对另一个 bean 定义的引用,其中
实现TaskExecutor
接口。TaskExecutor
是一个标准的 Spring 接口,因此请参阅 Spring 用户指南以了解可用的详细信息
实现。最简单的多线程TaskExecutor
是一个SimpleAsyncTaskExecutor
.
上述配置的结果是Step
通过读取、处理、
并在单独的执行线程中写入每个项目块(每个提交间隔)。
请注意,这意味着要处理的项目没有固定的顺序,并且块
可能包含与单线程情况相比不连续的项。在
添加到任务执行器设置的任何限制(例如它是否由
thread pool),任务 let 配置具有节流限制(默认值:4)。
您可能需要增加此限制以确保线程池得到充分利用。
-
Java
-
XML
使用 Java 配置时,构建器提供对节流限制的访问,如 遵循:
@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.throttleLimit(20)
.build();
}
例如,您可以增加 throtttle-limit,如下所示:
<step id="loading"> <tasklet
task-executor="taskExecutor"
throttle-limit="20">...</tasklet>
</step>
另请注意,在
您的步骤,例如DataSource
.确保至少在这些资源中设置池
与步骤中所需的并发线程数一样大。
节流限制弃用
从 v5.0 开始,节流限制已弃用,无需替换。如果要将
默认中的当前限制机制 Java 配置
|
使用多线程有一些实际限制Step
实现
一些常见的批处理用例。许多参与者Step
(如读者和写作者)
是有状态的。如果状态未按线程隔离,则这些组件不是
可用于多线程Step
.特别是大多数读者和
Spring Batch 的编写器不是为多线程使用而设计的。然而,事实确实如此,
可以使用无状态或线程安全的读取器和写入器,并且有一个示例
(称为parallelJob
) 在Spring
批处理示例,显示使用进程指示器(请参阅防止状态持久性)来跟踪
在数据库输入表中处理的项目。
Spring Batch 提供了一些ItemWriter
和ItemReader
.通常
他们在 Javadoc 中说它们是否是线程安全的,或者您必须做什么来避免
并发环境中的问题。如果 Javadoc 中没有信息,您可以
检查实现以查看是否有任何状态。如果读取器不是线程安全的,
你可以用提供的SynchronizedItemStreamReader
或在您自己的中使用它
synchronizing 委托人。您可以将调用同步到read()
,并且,只要
处理和写入是块中最昂贵的部分,你的步骤可能仍然
比在单线程配置中更快地完成。
平行步骤
只要需要并行化的应用逻辑可以拆分为不同的 职责并分配给各个步骤,它可以在 单一进程。并行步骤执行易于配置和使用。
-
Java
-
XML
使用 Java 配置时,执行步骤(step1,step2)
与step3
很简单,如下所示:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(splitFlow())
.next(step4())
.build() //builds FlowJobBuilder instance
.build(); //builds Job instance
}
@Bean
public Flow splitFlow() {
return new FlowBuilder<SimpleFlow>("splitFlow")
.split(taskExecutor())
.add(flow1(), flow2())
.build();
}
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
}
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
例如,执行步骤(step1,step2)
与step3
直截了当,
如下:
<job id="job1">
<split id="split1" task-executor="taskExecutor" next="step4">
<flow>
<step id="step1" parent="s1" next="step2"/>
<step id="step2" parent="s2"/>
</flow>
<flow>
<step id="step3" parent="s3"/>
</flow>
</split>
<step id="step4" parent="s4"/>
</job>
<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>
可配置的任务执行器用于指定哪个TaskExecutor
实现应执行各个流。默认值为SyncTaskExecutor
,但异步TaskExecutor
需要运行
平行。请注意,该作业确保拆分中的每个流在之前完成
聚合退出状态并转换。
有关更多详细信息,请参阅拆分流部分。
远程分块
在远程分块中,Step
处理分为多个流程,
通过一些中间件相互通信。下图显示了
模式:

管理器组件是单个进程,工作进程是多个远程进程。 如果管理器不是瓶颈,则此模式效果最佳,因此处理必须更多 比阅读项目更昂贵(实践中经常出现这种情况)。
管理器是 Spring Batch 的实现Step
使用ItemWriter
取代 由一个通用版本,该版本知道如何将项目块发送到中间件作为 消息。 工作线程是正在使用的任何中间件的标准侦听器(对于例如,对于 JMS,它们将是MesssageListener
实现),它们的作用是使用标准处理项目块ItemWriter
或ItemProcessor
加上一个ItemWriter
,通过ChunkProcessor
接口。 使用此模式的优点之一是读取器、处理器和写入器组件是现成的(与用于步骤的本地执行相同)。项目是动态划分的,并且工作通过中间件共享,因此,如果监听器都渴望消费者,负载平衡是自动的。
中间件必须是持久的,有保证的交付,并且每个中间件都有一个消费者 消息。 JMS 是显而易见的候选者,但其他选项(例如 JavaSpaces)存在于网格计算和共享内存产品空间中。
有关更多详细信息,请参阅有关 Spring Batch Integration - 远程分块的部分。
分区
Spring Batch 还提供了一个 SPI,用于对Step
执行并执行它 远程。 在这种情况下,远程参与者是Step
实例可以很容易配置并用于本地处理。下图显示了 模式:

这Job
在左侧运行,作为Step
实例,以及其中一个Step
instances 被标记为 manager。这张图片中的 worker 都是相同的实例Step
,这实际上可以取代经理,从而导致相同的结果Job
. 工作线程通常是远程服务,但也可以是本地执行线程。管理器发送给工作线程的消息在这种模式下不需要是持久的或有保证的传递。Spring 批处理元数据JobRepository
确保每个工作程序执行一次且仅执行一次 每Job
执行。
Spring Batch 中的 SPI 由一个特殊的实现组成Step
(称为PartitionStep
)和两个策略接口,需要针对特定的 环境。 策略接口是PartitionHandler
和StepExecutionSplitter
, 下面的序列图显示了它们的作用:

这Step
在这种情况下,右侧是“远程”工作程序,因此,可能存在许多对象和/或进程扮演此角色,并且PartitionStep
显示驾驶执行。
-
Java
-
XML
以下示例显示了PartitionStep
使用 Java 时的配置 配置:
@Bean
public Step step1Manager(JobRepository jobRepository) {
return new StepBuilder("step1.manager", jobRepository)
.<String, String>partitioner("step1", partitioner())
.step(step1())
.gridSize(10)
.taskExecutor(taskExecutor())
.build();
}
类似于多线程步骤的throttleLimit
方法,则gridSize
方法可防止任务执行器因来自单个
步。
以下示例显示了PartitionStep
使用 XML 时的配置
配置:
<step id="step1.manager">
<partition step="step1" partitioner="partitioner">
<handler grid-size="10" task-executor="taskExecutor"/>
</partition>
</step>
类似于多线程步骤的throttle-limit
属性,则grid-size
属性可防止任务执行器因来自单个
步。
Spring 的单元测试套件
批处理
Samples(请参阅partition*Job.xml
configuration)有一个可以复制和扩展的简单示例。
Spring Batch 为名为step1:partition0
所以
上。许多人更喜欢调用经理步骤step1:manager
为了一致性。 您可以 为步骤使用别名(通过指定name
属性而不是id
属性)。
分区处理程序
PartitionHandler
是知道远程处理结构或grid 环境的组件。它能够发送StepExecution
对远程的请求Step
实例,以某种特定于结构的格式包装,例如 DTO。它不必知道如何拆分输入数据或如何聚合多个Step
执行。 一般来说,它可能也不需要了解弹性或故障转移,因为在许多情况下,这些都是结构的特性。无论如何,Spring Batch 总是提供独立于结构的可重启性。一个失败Job
始终可以重新启动,并且,在这种情况下,只有失败的Steps
被重新执行。
这PartitionHandler
接口可以有针对各种结构类型的专门实现,包括简单的 RMI 远程处理、EJB 远程处理、自定义 Web 服务、JMS、Java空间、共享内存网格(例如 Terracotta 或 Coherence)和网格执行结构(例如 GridGain)。Spring Batch 不包含任何专有网格的实现或远程结构。
但是,Spring Batch 确实提供了一个有用的实现PartitionHandler
那 执行Step
实例在本地的单独执行线程中,使用TaskExecutor
策略。实现称为TaskExecutorPartitionHandler
.
-
Java
-
XML
您可以显式配置TaskExecutorPartitionHandler
使用 Java 配置, 如下:
@Bean
public Step step1Manager(JobRepository jobRepository) {
return new StepBuilder("step1.manager", jobRepository)
.partitioner("step1", partitioner())
.partitionHandler(partitionHandler())
.build();
}
@Bean
public PartitionHandler partitionHandler() {
TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
retVal.setTaskExecutor(taskExecutor());
retVal.setStep(step1());
retVal.setGridSize(10);
return retVal;
}
这TaskExecutorPartitionHandler
是使用 XML 配置的步骤的默认值命名空间。您也可以显式配置它,如下所示:
<step id="step1.manager">
<partition step="step1" handler="handler"/>
</step>
<bean class="org.spr...TaskExecutorPartitionHandler">
<property name="taskExecutor" ref="taskExecutor"/>
<property name="step" ref="step1" />
<property name="gridSize" value="10" />
</bean>
这gridSize
属性确定要创建的单独步骤执行的数量,因此它可以与TaskExecutor
. 或者,它可以设置为大于可用线程数,这使得工作更小的块。
这TaskExecutorPartitionHandler
对于 IO 密集型Step
实例,例如复制大量文件或将文件系统复制到内容管理中 系统。 它还可以通过提供Step
实现 这是远程调用的代理(例如使用 Spring Remoting)。
分区器
这Partitioner
有一个更简单的职责:生成执行上下文作为输入
仅用于新步骤执行的参数(无需担心重新启动)。它有一个
single 方法,如以下接口定义所示:
public interface Partitioner {
Map<String, ExecutionContext> partition(int gridSize);
}
此方法的返回值为每个步骤执行关联一个唯一的名称(String
)的输入参数,其输入参数的形式为ExecutionContext
.名字出现了
稍后在批处理元数据中作为分区StepExecutions
. 这ExecutionContext
只是一个名称-值对的包,因此它可能包含一系列主键、行号或输入文件的位置。远程Step
然后 通常使用#{…}
占位符(步骤中的后期绑定scope),如下一节所示。
步骤执行的名称(Map
返回者Partitioner
) 需要在Job
但没有任何其他特定的 要求。 最简单的方法(并使名称对用户有意义)是使用前缀+后缀命名约定,其中前缀是正在执行的步骤的名称(它本身在Job
),后缀只是一个
计数器。有一个SimplePartitioner
在使用此约定的框架中。
您可以使用名为PartitionNameProvider
提供分区
名称与分区本身分开。如果Partitioner
实现了这个
接口,则在重新启动时仅查询名称。如果分区成本高昂,
这可能是一个有用的优化。由PartitionNameProvider
必须
匹配Partitioner
.
将输入数据绑定到步骤
对于执行的步骤来说,这是非常有效的PartitionHandler
拥有
相同的配置,并使其输入参数在运行时从ExecutionContext
.使用 Spring Batch 的 StepScope 功能很容易做到这一点
(在有关延迟绑定的部分中进行了更详细的介绍)。为
例如,如果Partitioner
创建ExecutionContext
具有属性键的实例
叫fileName
,指向每个步骤调用的不同文件(或目录),
这Partitioner
输出可能类似于下表的内容:
步骤执行名称(键) |
ExecutionContext(值) |
文件副本:分区0 |
文件名=/home/data/one |
文件副本:分区 1 |
文件名=/home/data/two |
文件复制:分区2 |
文件名=/home/data/three |
然后,可以使用对执行上下文的延迟绑定将文件名绑定到步骤。
-
Java
-
XML
以下示例显示了如何在 Java 中定义后期绑定:
@Bean
public MultiResourceItemReader itemReader(
@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
return new MultiResourceItemReaderBuilder<String>()
.delegate(fileReader())
.name("itemReader")
.resources(resources)
.build();
}
以下示例显示了如何在 XML 中定义后期绑定:
<bean id="itemReader" scope="step"
class="org.spr...MultiResourceItemReader">
<property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>