扩展与并行处理
许多批处理问题可以通过单线程、单进程作业来解决, 所以在考虑之前,最好先确认一下它是否符合你的需求 关于更复杂的实现。衡量一个现实工作的表现,看看是否 最简单的实现先满足你的需求。你可以读写一个文件 即使是标准硬件,也不到一分钟就能处理数百兆字节。
当你准备开始实现带有并行处理的作业时,Spring 批次提供了多种选项,本章将详细说明,尽管部分内容 相关内容在其他地方有介绍。从高层次来看,并联有两种模式 加工:
-
单进程,多线程
-
多进程
这些也细分为以下几类:
-
多线程步骤(单进程)
-
并行步骤(单进程)
-
局部分块步骤(单过程)
-
远程分块步骤(多进程)
-
分区步骤(单进程或多进程)
-
远程步骤(多进程)
首先,我们回顾单流程选项。然后我们会回顾多流程选项。
多线程步骤
开始并行处理最简单的方法是添加一个任务执行者到你的阶梯
配置。
-
Java
-
XML
使用 Java 配置时,你可以添加一个任务执行者到台阶上,
如下示例所示:
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10).transactionManager(transactionManager)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.build();
}
例如,你可以在任务小如下:
<step id="loading">
<tasklet task-executor="taskExecutor">...</tasklet>
</step>
在这个例子中,任务执行者是对另一个Beans定义的引用,使得
实现任务执行者接口。任务执行者是标准的 Spring 接口,因此请查看 Spring 用户指南以获取可用的详细信息
实现。最简单的多线程任务执行者是SimpleAsyncTaskExecutor.
前述配置的结果是步通过读取、处理执行,
并将每个项目块(每个提交间隔)写入独立的执行线程。
注意,这意味着处理物品没有固定顺序,且有一个块
可能包含与单线程大写不同为非连续的项。
还请注意,任何被集中的资源对并发可能有限制。
你的步伐,比如数据来源.至少要确保这些资源的池子
与步中期望的并发线程数相同。
使用多线程有一些实际限制步实现
一些常见的批处理使用场景。许多参与者步(比如读者和作者)
是有状态的。如果状态没有被线程隔离,这些组件就不会被隔离
可用于多线程步.尤其是大多数读者和
Spring Batch 的写作者并非为多线程使用设计。然而,它确实如此,
可以与无状态或线程安全的读写器合作,并且有一个示例
(称为parallelJob)在Spring
批量样本,展示使用过程指示器(参见防止状态持久化)以跟踪
数据库输入表中已处理的项目。
Spring Batch 提供了一些ItemWriter和ItemReader.通常
Javadoc里会说这些设备是否线程安全,或者你需要做些什么以避免
同时发生的环境中的问题。如果 Javadoc 里没有相关信息,你可以
检查实现是否存在任何状态。如果读卡器不安全,
你可以用提供的装饰品同步项目流阅读器或者用在你自己的游戏里
同步委托器。你可以将调用同步到read(),且,只要
处理和写作是整个数据块中最昂贵的部分,你的步骤可能仍然存在
比单线程配置更快完成。
平行步骤
只要需要并行化的应用逻辑可以拆分成不同的 职责并分配给各个步骤,可以在 单一过程。并行步执行易于配置和使用。
-
Java
-
XML
使用 Java 配置时,执行步骤(第1步,第2步)并行于步骤3是直接的,具体如下:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(splitFlow())
.next(step4())
.build() //builds FlowJobBuilder instance
.build(); //builds Job instance
}
@Bean
public Flow splitFlow() {
return new FlowBuilder<SimpleFlow>("splitFlow")
.split(taskExecutor())
.add(flow1(), flow2())
.build();
}
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
}
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
例如,执行步骤(第1步,第2步)并行于步骤3很简单,
如下:
<job id="job1">
<split id="split1" task-executor="taskExecutor" next="step4">
<flow>
<step id="step1" parent="s1" next="step2"/>
<step id="step2" parent="s2"/>
</flow>
<flow>
<step id="step3" parent="s3"/>
</flow>
</split>
<step id="step4" parent="s4"/>
</job>
<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>
可配置任务执行器用于指定具体任务任务执行者实现应执行各个流程。默认为同步任务执行器,但任务执行者运行 步骤
平行。注意,作业确保分拆中的所有流都在之前完成
汇总退出状态和过渡。
详情请参见分流部分。
本地分块
本地分块是 v6.0 中的一项新功能,允许你在同一 JVM 内部用多个线程并行处理项目块。 当你需要处理大量项目并希望利用多核处理器时,这尤其有用。 通过本地分块,你可以配置一个块导向步骤,使用多个线程同时处理项目块。 每个线程独立读取、处理和写入自己的项目块,而该步骤则负责整体执行并提交结果。
此功能通过使用区块消息频道项目写作,这是一个提交分块的项目编写器
向当地工人提出请求任务执行者:
@Bean
public ChunkTaskExecutorItemWriter<Vet> itemWriter(ChunkProcessor<Vet> chunkProcessor) {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(4);
taskExecutor.setThreadNamePrefix("worker-thread-");
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.afterPropertiesSet();
return new ChunkTaskExecutorItemWriter<>(chunkProcessor, taskExecutor);
}
这区块消息频道项目写作需要一个任务执行者并发处理块
以及块处理器定义每个区块该怎么处理。这里有一个示例
一个块处理器,将每个块的项目写入关系数据库表:
@Bean
public ChunkProcessor<Vet> chunkProcessor(DataSource dataSource, TransactionTemplate transactionTemplate) {
String sql = "insert into vets (firstname, lastname) values (?, ?)";
JdbcBatchItemWriter<Vet> itemWriter = new JdbcBatchItemWriterBuilder<Vet>().dataSource(dataSource)
.sql(sql)
.itemPreparedStatementSetter((item, ps) -> {
ps.setString(1, item.firstname());
ps.setString(2, item.lastname());
})
.build();
return (chunk, contribution) -> transactionTemplate.executeWithoutResult(transactionStatus -> {
try {
itemWriter.write(chunk);
contribution.incrementWriteCount(chunk.size());
contribution.setExitStatus(ExitStatus.COMPLETED);
}
catch (Exception e) {
transactionStatus.setRollbackOnly();
contribution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
}
});
}
你可以在局部块化样本中找到这种缩放技术的一个示例。
远程分块
在远程分块中,步处理分为多个流程,
通过某种中间件相互通信。下图显示了
模式:
管理者组件是一个单一进程,工人是多个远程进程。 这种模式在管理者不是瓶颈时效果最佳,因此处理必须更多 比阅读物品本身还贵(这在实际作中很常见)。
管理器是春季批处理的实现步其中ItemWriter取代
通过一个通用版本,知道如何将项目块发送到中间件,比如
消息。这些工作者是所使用的中间件的标准监听器(对于
例如,对于JMS,它们会是MesssageListener实现),其作用为
通过使用标准处理这些物品块ItemWriter或ItemProcessor加一个ItemWriter,通过块处理器接口。使用它的一个优势
模式是读者、处理器和写入组件都是现成的(相同)
与该步的局部执行所用相同)。这些物品被动态分配,
而且工作通过中间件共享,所以如果听众都很渴望
消费者,负载均衡是自动的。
中间件必须耐用,保证交付,每个中间件都有单一用户 消息。JMS 是显而易见的选择,但也有其他选项(比如 JavaSpaces)存在 网格计算与共享内存产品空间。
详情请参见 Spring Batch 集成 - 远程分块部分。
分区
Spring Batch 还提供了一个用于分区步执行与执行
远程。在这种情况下,远程参与者是步这些实例本可以像
很容易配置并用于本地处理。下图显示了
模式:
这工作在左侧以序列形式运行步实例,以及其中一个步实例被标记为管理器。这张照片中的工人都是一模一样的
实例步,实际上可以取代经理的位置,导致
结果与工作.工作人员通常是远程服务,但
也可以是执行的本地线程。经理发给工人的信息
在这种模式下,不需要耐用或保证送达。Spring Batch
元数据中的JobRepository确保每个工作者执行一次,且仅执行一次,满足
每工作执行。
Spring Batch中的SPI包含一个特殊的实现步(称为分区步)以及两个需要为特定
环境。策略界面包括分区处理和StepExecutionSplitter,
以下序列图展示了它们的作用:
这步右边是“远程”工作人员,所以可能有
许多对象和/或过程扮演此角色,且分区步画面显示他驾驶
处决。
-
Java
-
XML
以下示例展示了分区步使用 Java 时的配置
配置:
@Bean
public Step step1Manager(JobRepository jobRepository) {
return new StepBuilder("step1.manager", jobRepository)
.<String, String>partitioner("step1", partitioner())
.step(step1())
.gridSize(10)
.taskExecutor(taskExecutor())
.build();
}
类似于多线程步骤的节气门限制方法,网格大小方法防止任务执行器被单个请求淹没
步。
以下示例展示了分区步使用 XML 时的配置
配置:
<step id="step1.manager">
<partition step="step1" partitioner="partitioner">
<handler grid-size="10" task-executor="taskExecutor"/>
</partition>
</step>
类似于多线程步骤的油门限制属性,该网格大小属性防止任务执行器被单个请求所淹没
步。
Spring 的单元测试套件
批次样本(参见分区*Job.xmlconfiguration)有一个简单的示例,你可以复制并扩展。
Spring Batch 为名为 的分区创建步骤执行步骤1:分区0因此
上。很多人更愿意称经理为“Step”步骤1:经理为了保持一致性。您可以
为该步骤使用别名(通过指定名称属性代替身份证属性)。
分区处理
分区处理是知道远程结构的组件,或者
网格环境。它能够发送步执行向远程发送请求步实例,被包裹成某种特定织物格式,比如DTO。它不必知道
如何拆分输入数据或如何汇总多个数据的结果步执行。
一般来说,它可能也不需要知道韧性或故障切换,
因为这些在很多情况下都是面料的特征。无论如何,春季总有
提供独立于织物的可重启性。一个失败的工作总能重启,
在这种情况下,只有失败的步骤被重新执行。
这分区处理接口可以针对各种
织体类型,包括简单的RMI远程处理、EJB远程处理、自定义Web服务、JMS、Java
空间、共享内存网格(如Terracotta或Coherence)以及网格执行织物
(例如GridGain)。Spring Batch 不包含任何专有网格的实现
或者是可传送的面料。
不过,Spring Batch 确实提供了有用的实现分区处理那
执行步实例在本地的不同执行线程中,使用任务执行者Spring的策略。实现如下:任务执行者分区处理器.
-
Java
-
XML
你可以显式配置任务执行者分区处理器Java 配置,
如下:
@Bean
public Step step1Manager(JobRepository jobRepository) {
return new StepBuilder("step1.manager", jobRepository)
.partitioner("step1", partitioner())
.partitionHandler(partitionHandler())
.build();
}
@Bean
public PartitionHandler partitionHandler() {
TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
retVal.setTaskExecutor(taskExecutor());
retVal.setStep(step1());
retVal.setGridSize(10);
return retVal;
}
这任务执行者分区处理器是配置为 XML 的步骤的默认值
名称空间,之前显示。你也可以显式配置,具体如下:
<step id="step1.manager">
<partition step="step1" handler="handler"/>
</step>
<bean class="org.spr...TaskExecutorPartitionHandler">
<property name="taskExecutor" ref="taskExecutor"/>
<property name="step" ref="step1" />
<property name="gridSize" value="10" />
</bean>
这网格大小属性决定要创建的独立步骤执行次数,因此
它可以与任务执行者.或者,它
可以设置为大于可用线程数,这使得
做得更小。
这任务执行者分区处理器对于输入输出密集型(IO)非常有用步实例,例如
复制大量文件或将文件系统复制到内容管理中
系统。它也可以通过提供步实现
即远程调用的代理(如使用Spring Remoting)。
分区器
这分区器其职责更简单:作为输入生成执行上下文
仅用于新步骤执行的参数(无需担心重启)。它有一个
单一方法,如下接口定义所示:
public interface Partitioner {
Map<String, ExecutionContext> partition(int gridSize);
}
该方法的返回值为每个步骤执行关联一个唯一名称(该步骤字符串) 其输入参数为执行上下文.名字都出现了
在批处理的后面,作为分区中的步长名步进执行.这执行上下文只是一个由名值对组成的袋子,因此可能包含一个范围
主键、行号或输入文件的位置。遥控器步然后
通常通过 绑定上下文输入#{…}占位符(步进后期绑定)
范围),如下一节所示。
步骤执行的名称(即地图返回者分区器)
在 的步骤执行中唯一工作但没有其他具体的
要求。实现这一点最简单的方法(并且让名字对用户有意义)是
使用前缀+后缀命名惯例,其中前缀是步骤名称
正在执行(该本身在工作而后缀只是一个
计数器。有一个SimplePartitioner在使用该约定的框架中。
你可以使用一个可选接口,叫做PartitionNameProvider以提供分区
名称与分区本身分开。如果分区器实现了这一点
界面,重启时只查询名称。如果分区成本高,
这可以是一个有用的优化方法。以下名称由PartitionNameProvider必须
与分区器.
将输入数据绑定到步
对于由分区处理拥有
配置相同,且输入参数在运行时从执行上下文.这在Spring Batch的StepScope功能中非常简单
(在“迟绑定”章节中有更详细的介绍)。为
例如,如果分区器创建执行上下文带有属性键的实例
叫文件名,每个步调用指向不同的文件(或目录),
这分区器输出可能类似于下表的内容:
步骤执行名称(键) |
ExecutionContext(值) |
文件复制:partition0 |
fileName=/home/data/one |
文件复制:分区1 |
fileName=/home/data/two |
文件复制:partition2 |
fileName=/home/data/three |
然后可以通过晚期绑定绑定到执行上下文,将文件名绑定到步骤。
-
Java
-
XML
以下示例展示了如何在 Java 中定义晚绑定:
@Bean
public MultiResourceItemReader itemReader(
@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
return new MultiResourceItemReaderBuilder<String>()
.delegate(fileReader())
.name("itemReader")
.resources(resources)
.build();
}
以下示例展示了如何在XML中定义晚绑定:
<bean id="itemReader" scope="step"
class="org.spr...MultiResourceItemReader">
<property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>
远程步骤执行
从 v6.0 起,Spring Batch 支持远程步骤执行,允许你在远程机器或集群上执行批处理作业的步骤。
此功能在大规模批处理场景中尤为有用,这些场景希望将工作负载分散到多个节点以提升性能和可扩展性。
远程步骤执行由远程步进该类使用Spring Integration消息通道,使本地作业执行环境与远程步骤执行器之间能够通信。
一个远程步进通过提供远程步骤名称和消息模板,将步骤执行请求分配给远程工作人员,将其配置为常规步骤:
@Bean
public Step step(MessagingTemplate messagingTemplate, JobRepository jobRepository) {
return new RemoteStep("step", "workerStep", jobRepository, messagingTemplate);
}
在工作端,你需要定义执行的远程步骤(workerStep在此示例中)并配置
一个 Spring 集成流程,用于拦截步骤执行请求并调用StepExecutionRequestHandler:
@Bean
public Step workerStep(JobRepository jobRepository, JdbcTransactionManager transactionManager) {
return new StepBuilder("workerStep", jobRepository)
// define step logic
.build();
}
/*
* Configure inbound flow (requests coming from the manager)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory, JobRepository jobRepository,
StepLocator stepLocator) {
StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
stepExecutionRequestHandler.setJobRepository(jobRepository);
stepExecutionRequestHandler.setStepLocator(stepLocator);
return IntegrationFlow.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
.channel(requests())
.handle(stepExecutionRequestHandler, "handle")
.get();
}
@Bean
public StepLocator stepLocator(BeanFactory beanFactory) {
BeanFactoryStepLocator beanFactoryStepLocator = new BeanFactoryStepLocator();
beanFactoryStepLocator.setBeanFactory(beanFactory);
return beanFactoryStepLocator;
}
你可以在远程步骤示例中找到完整的示例。