外部化批处理执行
迄今为止讨论的集成方法表明,存在 Spring Integration 像外壳一样封装 Spring Batch 的用例。 然而,Spring Batch 也可以在内部使用 Spring Integration。 通过采用这种方法,Spring Batch 用户可以将条目甚至数据块的处理委托给外部进程。这使您能够卸载复杂的处理任务。Spring Batch Integration 为以下方面提供了专门支持:
-
远程分块
-
远程分区
远程分块
下图展示了当您结合使用 Spring Batch 和 Spring Integration 时,远程分块的一种工作方式:
更进一步,您还可以通过使用
ChunkMessageChannelItemWriter
(由 Spring Batch Integration 提供)将分块处理外部化,该组件负责发送项并收集结果。发送完成后,Spring Batch 将继续读取和分组项的过程,而无需等待结果。
相反,收集结果并将其重新集成到 Spring Batch 流程中是 ChunkMessageChannelItemWriter 的职责。
使用 Spring Integration,您可以完全控制进程的并发性(例如,通过使用 QueueChannel 而不是 DirectChannel)。此外,借助 Spring Integration 丰富的通道适配器集合(如 JMS 和 AMQP),您可以将批处理作业的分片分发到外部系统进行处理。
-
Java
-
XML
一个包含要远程分块步骤的作业,在 Java 中可能具有如下配置:
public Job chunkJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new JobBuilder("personJob", jobRepository)
.start(new StepBuilder("step1", jobRepository)
.<Person, Person>chunk(200, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.build())
.build();
}
一个包含要远程分块步骤的作业,其在 XML 中的配置可能类似于以下内容:
<job id="personJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="200"/>
</tasklet>
...
</step>
</job>
ItemReader 引用指向您希望在管理器上用于读取数据的 Bean。ItemWriter 引用指向一个特殊的ItemWriter(称为ChunkMessageChannelItemWriter),如前所述。处理器(如果有)未包含在管理器配置中,因为它是在工作器上配置的。在实现您的用例时,您应检查任何其他组件属性,例如节流限制等。
-
Java
-
XML
以下 Java 配置提供了一个基础的管理器设置:
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://localhost:61616");
return factory;
}
/*
* Configure outbound flow (requests going to workers)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(requests())
.handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
.get();
}
/*
* Configure inbound flow (replies coming from workers)
*/
@Bean
public QueueChannel replies() {
return new QueueChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
.channel(replies())
.get();
}
/*
* Configure the ChunkMessageChannelItemWriter
*/
@Bean
public ItemWriter<Integer> itemWriter() {
MessagingTemplate messagingTemplate = new MessagingTemplate();
messagingTemplate.setDefaultChannel(requests());
messagingTemplate.setReceiveTimeout(2000);
ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
= new ChunkMessageChannelItemWriter<>();
chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
chunkMessageChannelItemWriter.setReplyChannel(replies());
return chunkMessageChannelItemWriter;
}
以下 XML 配置提供了基本的管理器设置:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<int-jms:outbound-channel-adapter id="jmsRequests" destination-name="requests"/>
<bean id="messagingTemplate"
class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="requests"/>
<property name="receiveTimeout" value="2000"/>
</bean>
<bean id="itemWriter"
class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
scope="step">
<property name="messagingOperations" ref="messagingTemplate"/>
<property name="replyChannel" ref="replies"/>
</bean>
<int:channel id="replies">
<int:queue/>
</int:channel>
<int-jms:message-driven-channel-adapter id="jmsReplies"
destination-name="replies"
channel="replies"/>
上述配置为我们提供了多个 Bean。我们使用 ActiveMQ 以及 Spring Integration 提供的入站和出站 JMS 适配器来配置消息中间件。如图所示,我们的 itemWriter Bean(由作业步骤引用)会使用 ChunkMessageChannelItemWriter 通过配置的中间件写入数据块。
现在我们可以继续进行工作器配置,如下例所示:
-
Java
-
XML
以下示例展示了 Java 中的工作器配置:
@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://localhost:61616");
return factory;
}
/*
* Configure inbound flow (requests coming from the manager)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
.channel(requests())
.get();
}
/*
* Configure outbound flow (replies going to the manager)
*/
@Bean
public DirectChannel replies() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlow
.from(replies())
.handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
.get();
}
/*
* Configure the ChunkProcessorChunkHandler
*/
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
ChunkProcessor<Integer> chunkProcessor
= new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
= new ChunkProcessorChunkHandler<>();
chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
return chunkProcessorChunkHandler;
}
以下示例展示了 XML 中的 worker 配置:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<int:channel id="requests"/>
<int:channel id="replies"/>
<int-jms:message-driven-channel-adapter id="incomingRequests"
destination-name="requests"
channel="requests"/>
<int-jms:outbound-channel-adapter id="outgoingReplies"
destination-name="replies"
channel="replies">
</int-jms:outbound-channel-adapter>
<int:service-activator id="serviceActivator"
input-channel="requests"
output-channel="replies"
ref="chunkProcessorChunkHandler"
method="handleChunk"/>
<bean id="chunkProcessorChunkHandler"
class="org.springframework.batch.integration.chunk.ChunkProcessorChunkRequestHandler">
<property name="chunkProcessor">
<bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
<property name="itemWriter">
<bean class="io.spring.sbi.PersonItemWriter"/>
</property>
<property name="itemProcessor">
<bean class="io.spring.sbi.PersonItemProcessor"/>
</property>
</bean>
</property>
</bean>
这些配置项中的大多数在管理器配置中应该看起来很熟悉。工作节点不需要访问 Spring Batch JobRepository,也不需要访问实际的作业配置文件。主要关注的 Bean 是chunkProcessorChunkHandler。ChunkProcessorChunkRequestHandler的chunkProcessor属性接受一个已配置的SimpleChunkProcessor,在这里您可以提供对您的ItemWriter(以及可选的ItemProcessor)的引用,当工作节点从管理器接收到数据块时,将运行这些组件。
更多信息,请参阅“可扩展性”章节中关于 远程分块的部分。
从 4.1 版本开始,Spring Batch Integration 引入了 @EnableBatchIntegration
注解,可用于简化远程分块设置。该注解提供
两个 Bean,您可以在应用程序上下文中自动装配它们:
-
RemoteChunkingManagerStepBuilderFactory: 配置管理器步骤 -
RemoteChunkingWorkerBuilder: 配置远程工作者集成流程
这些 API 负责配置多个组件,如下图所示:
在管理器端,RemoteChunkingManagerStepBuilderFactory 允许您通过声明来逐步配置管理器:
-
用于读取项目并将其发送给工作器的项目读取器
-
用于向工作节点发送请求的输出通道(“传出请求”)
-
用于接收来自工作节点回复的输入通道(“传入回复”)
您无需显式配置 ChunkMessageChannelItemWriter 和 MessagingTemplate。
(如果您有理由这样做,仍然可以显式配置它们)。
在工作器端,RemoteChunkingWorkerBuilder 允许您配置工作器以:
-
监听管理器在输入通道(“传入请求”)上发送的请求
-
为每个请求调用
ChunkProcessorChunkRequestHandler的handleChunk方法, 并使用配置的ItemProcessor和ItemWriter -
在输出通道(“传出回复”)上向管理器发送回复
您无需显式配置 SimpleChunkProcessor
和 ChunkProcessorChunkRequestHandler。(如果您有理由这样做,仍然可以显式配置它们)。
以下示例展示了如何使用这些 API:
@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {
@Configuration
public static class ManagerConfiguration {
@Autowired
private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory;
@Bean
public TaskletStep managerStep() {
return this.managerStepBuilderFactory.get("managerStep")
.chunk(100)
.reader(itemReader())
.outputChannel(requests()) // requests sent to workers
.inputChannel(replies()) // replies received from workers
.build();
}
// Middleware beans setup omitted
}
@Configuration
public static class WorkerConfiguration {
@Autowired
private RemoteChunkingWorkerBuilder workerBuilder;
@Bean
public IntegrationFlow workerFlow() {
return this.workerBuilder
.itemProcessor(itemProcessor())
.itemWriter(itemWriter())
.inputChannel(requests()) // requests received from the manager
.outputChannel(replies()) // replies sent to the manager
.build();
}
// Middleware beans setup omitted
}
}
您可以在这里找到一个远程分块作业的完整示例。
远程分区
下图展示了一个典型的远程分区场景:
另一方面,远程分区在处理项本身不是瓶颈,而是相关的 I/O 操作导致瓶颈时非常有用。通过远程分区,您可以将工作发送给执行完整 Spring Batch 步骤的工作节点。因此,每个工作节点都拥有自己的 ItemReader、ItemProcessor 和 ItemWriter。为此,Spring Batch Integration 提供了 MessageChannelPartitionHandler。
PartitionHandler 接口的此实现使用 MessageChannel 实例向远程工作节点发送指令并接收其响应。
这为用于与远程工作节点通信的传输层(如 JMS 和 AMQP)提供了良好的抽象。
“可扩展性”章节中涉及
远程分区的部分,概述了配置远程分区所需的概念和组件,并展示了使用默认
TaskExecutorPartitionHandler在独立的本地执行线程中进行分区的示例。若要实现跨多个 JVM 的远程分区,则需要另外两个组件:
-
一个远程处理结构或网格环境
-
一个支持所需远程处理架构或网格环境的
PartitionHandler实现
与远程分块类似,您可以使用 JMS 作为“远程通信架构”。在这种情况下,如前所述,请使用一个 MessageChannelPartitionHandler 实例作为 PartitionHandler 实现。
-
Java
-
XML
以下示例假设已存在一个分区作业,并重点关注 Java 中的 MessageChannelPartitionHandler 和 JMS 配置:
/*
* Configuration of the manager side
*/
@Bean
public PartitionHandler partitionHandler() {
MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
partitionHandler.setStepName("step1");
partitionHandler.setGridSize(3);
partitionHandler.setReplyChannel(outboundReplies());
MessagingTemplate template = new MessagingTemplate();
template.setDefaultChannel(outboundRequests());
template.setReceiveTimeout(100000);
partitionHandler.setMessagingOperations(template);
return partitionHandler;
}
@Bean
public QueueChannel outboundReplies() {
return new QueueChannel();
}
@Bean
public DirectChannel outboundRequests() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundJmsRequests() {
return IntegrationFlow.from("outboundRequests")
.handle(Jms.outboundGateway(connectionFactory())
.requestDestination("requestsQueue"))
.get();
}
@Bean
@ServiceActivator(inputChannel = "inboundStaging")
public AggregatorFactoryBean partitioningMessageHandler() throws Exception {
AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
aggregatorFactoryBean.setProcessorBean(partitionHandler());
aggregatorFactoryBean.setOutputChannel(outboundReplies());
// configure other propeties of the aggregatorFactoryBean
return aggregatorFactoryBean;
}
@Bean
public DirectChannel inboundStaging() {
return new DirectChannel();
}
@Bean
public IntegrationFlow inboundJmsStaging() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("stagingQueue"))
.channel(inboundStaging())
.get();
}
/*
* Configuration of the worker side
*/
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() {
StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
stepExecutionRequestHandler.setJobExplorer(jobExplorer);
stepExecutionRequestHandler.setStepLocator(stepLocator());
return stepExecutionRequestHandler;
}
@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator() throws Exception {
return stepExecutionRequestHandler();
}
@Bean
public DirectChannel inboundRequests() {
return new DirectChannel();
}
public IntegrationFlow inboundJmsRequests() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("requestsQueue"))
.channel(inboundRequests())
.get();
}
@Bean
public DirectChannel outboundStaging() {
return new DirectChannel();
}
@Bean
public IntegrationFlow outboundJmsStaging() {
return IntegrationFlow.from("outboundStaging")
.handle(Jms.outboundGateway(connectionFactory())
.requestDestination("stagingQueue"))
.get();
}
以下示例假设已存在一个分区作业,并重点关注
MessageChannelPartitionHandler 和 XML 中的 JMS 配置:
<bean id="partitionHandler"
class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
<property name="stepName" value="step1"/>
<property name="gridSize" value="3"/>
<property name="replyChannel" ref="outbound-replies"/>
<property name="messagingOperations">
<bean class="org.springframework.integration.core.MessagingTemplate">
<property name="defaultChannel" ref="outbound-requests"/>
<property name="receiveTimeout" value="100000"/>
</bean>
</property>
</bean>
<int:channel id="outbound-requests"/>
<int-jms:outbound-channel-adapter destination="requestsQueue"
channel="outbound-requests"/>
<int:channel id="inbound-requests"/>
<int-jms:message-driven-channel-adapter destination="requestsQueue"
channel="inbound-requests"/>
<bean id="stepExecutionRequestHandler"
class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
<property name="jobExplorer" ref="jobExplorer"/>
<property name="stepLocator" ref="stepLocator"/>
</bean>
<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests"
output-channel="outbound-staging"/>
<int:channel id="outbound-staging"/>
<int-jms:outbound-channel-adapter destination="stagingQueue"
channel="outbound-staging"/>
<int:channel id="inbound-staging"/>
<int-jms:message-driven-channel-adapter destination="stagingQueue"
channel="inbound-staging"/>
<int:aggregator ref="partitionHandler" input-channel="inbound-staging"
output-channel="outbound-replies"/>
<int:channel id="outbound-replies">
<int:queue/>
</int:channel>
<bean id="stepLocator"
class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />
您还必须确保分区 handler 属性映射到 partitionHandler bean。
-
Java
-
XML
以下示例将分区 handler 属性映射到 Java 中的 partitionHandler:
public Job personJob(JobRepository jobRepository) {
return new JobBuilder("personJob", jobRepository)
.start(new StepBuilder("step1.manager", jobRepository)
.partitioner("step1.worker", partitioner())
.partitionHandler(partitionHandler())
.build())
.build();
}
以下示例将分区 handler 属性映射到 XML 中的 partitionHandler:
<job id="personJob">
<step id="step1.manager">
<partition partitioner="partitioner" handler="partitionHandler"/>
...
</step>
</job>
您可以在这里找到一个远程分区作业的完整示例。
您可以使用 @EnableBatchIntegration 注解来简化远程分区设置。此注解提供了两个对远程分区有用的 Bean:
-
RemotePartitioningManagerStepBuilderFactory: 配置管理器步骤 -
RemotePartitioningWorkerStepBuilderFactory: 配置工作步骤
这些 API 负责配置多个组件,如下图所示:
在管理器端,RemotePartitioningManagerStepBuilderFactory 允许您通过声明来逐步配置管理器:
-
用于分区数据的
Partitioner -
用于向工作节点发送请求的输出通道(“传出请求”)
-
用于接收来自工作线程的回复的输入通道(“传入回复”)(在配置回复聚合时)
-
轮询间隔和超时参数(在配置作业存储库轮询时)
您无需显式配置 MessageChannelPartitionHandler 和 MessagingTemplate。
(如果您有理由这样做,仍然可以显式配置它们)。
在工作器端,RemotePartitioningWorkerStepBuilderFactory 允许您配置工作器以:
-
监听管理器在输入通道(“传入请求”)上发送的请求
-
为每个请求调用
StepExecutionRequestHandler的handle方法 -
在输出通道(“传出回复”)上向管理器发送回复
您无需显式配置 StepExecutionRequestHandler。
(如果您有理由这样做,也可以显式配置它)。
以下示例展示了如何使用这些 API:
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {
@Configuration
public static class ManagerConfiguration {
@Autowired
private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
@Bean
public Step managerStep() {
return this.managerStepBuilderFactory
.get("managerStep")
.partitioner("workerStep", partitioner())
.gridSize(10)
.outputChannel(outgoingRequestsToWorkers())
.inputChannel(incomingRepliesFromWorkers())
.build();
}
// Middleware beans setup omitted
}
@Configuration
public static class WorkerConfiguration {
@Autowired
private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
@Bean
public Step workerStep() {
return this.workerStepBuilderFactory
.get("workerStep")
.inputChannel(incomingRequestsFromManager())
.outputChannel(outgoingRepliesToManager())
.chunk(100)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
// Middleware beans setup omitted
}
}