通过消息启动批处理作业
使用核心的 Spring Batch API 启动批处理作业时,你 基本上有两个选择:
-
从命令行中,使用
命令行作业操作员 -
在程序化方面,任一
JobOperator.start()
例如,你可能想使用命令行作业操作员调用批处理作业时
使用shell脚本。或者,你也可以使用作业操作员直接(例如,当使用
作为网页应用的一部分使用 Spring Batch)。但是,那又如何呢
更复杂的用例?也许你需要轮询一个远程(S)FTP
服务器以获取批处理作业或应用的数据
必须同时支持多个不同的数据源。为
例如,您可能不仅从网络接收数据文件,还可能来自
FTP和其他渠道。也许对输入文件进行额外的转换是
在调用Spring Batch之前,需要。
因此,执行批量作业会更强大
通过使用 Spring Integration 及其众多适配器。例如
你可以使用文件入站通道适配器
监控文件系统中的一个目录,并以
输入文件一到就要立足。此外,你还可以创建Spring
集成流程使用多个不同适配器,方便使用
从多个来源获取批处理作业的数据
同时仅使用配置即可实现。实现这些
使用 Spring 集成的场景很简单,因为它允许
解耦、事件驱动的执行作业操作员.
春季批处理集成提供了JobLaunchingMessageHandler你能
用来启动批处理作业。输入JobLaunchingMessageHandler由
Spring Integration 消息,其有效载荷类型为JobLaunchRequest.该类是包裹工作将发射并绕过作业参数即
启动批次作业所需的。
下图展示了典型的Spring积分 启动批处理作业所需的消息流。EIP(企业集成模式)网站提供了消息图标及其描述的完整概述。
将文件转换为 JobLaunchRequest
以下示例将文件转换为JobLaunchRequest:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;
import java.io.File;
public class FileMessageToJobRequest {
private Job job;
private String fileParameterName;
public void setFileParameterName(String fileParameterName) {
this.fileParameterName = fileParameterName;
}
public void setJob(Job job) {
this.job = job;
}
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder =
new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName,
message.getPayload().getAbsolutePath());
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
}
JobExecution 响应
当批处理作业正在执行时,一个作业执行实例被返回。你可以用这个
实例以确定执行状态。如果
一个作业执行能够被创造出来
成功时,无论是否成功,它都会被返回
或者实际执行失败。
具体行为如何作业执行实例返回取决于所提供的任务执行者.如果同步(单螺纹)任务执行者实现时,使用作业执行仅返回后任务完成。当使用异步
任务执行者这作业执行实例返回
马上。然后你可以选择身份证之作业执行实例
(其中JobExecution.getJobInstanceId())并查询JobRepository职位的最新状态
使用JobExplorer.更多内容
信息,请参见查询仓库。
Spring Batch集成配置
考虑一个情况,有人需要创建一个文件入站信道适配器去倾听
对于提供目录中的CSV文件,交给变换器
(文件消息到工作请求),通过作业启动网关启动作业,并且
log 的输出作业执行其中日志通道适配器.
-
Java
-
XML
以下示例展示了如何在 Java 中配置这种常见情况:
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
fileMessageToJobRequest.setJob(personJob());
return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);
return jobLaunchingGateway;
}
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
filter(new SimplePatternFileListFilter("*.csv")),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
transform(fileMessageToJobRequest()).
handle(jobLaunchingGateway).
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
get();
}
以下示例展示了如何在XML中配置这种常见情况:
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<int-file:inbound-channel-adapter id="filePoller"
channel="inboundFileChannel"
directory="file:/tmp/myfiles/"
filename-pattern="*.csv">
<int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>
<int:transformer input-channel="inboundFileChannel"
output-channel="outboundJobRequestChannel">
<bean class="io.spring.sbi.FileMessageToJobRequest">
<property name="job" ref="personJob"/>
<property name="fileParameterName" value="input.file.name"/>
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel"/>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
示例 ItemReader 配置
既然我们已经轮询文件并启动作业,就需要配置我们的 Spring
批ItemReader(例如)使用作业定义位置上的文件
参数称为“input.file.name”,如下豆状配置所示:
-
Java
-
XML
以下 Java 示例展示了所需的豆配置:
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
flatFileItemReader.setResource(new FileSystemResource(resource));
...
return flatFileItemReader;
}
以下XML示例展示了所需的豆子配置:
<bean id="itemReader" class="org.springframework.batch.infrastructure.item.file.FlatFileItemReader"
scope="step">
<property name="resource" value="file://#{jobParameters['input.file.name']}"/>
...
</bean>
前述示例的主要关注点是如何注入#{jobParameters['input.file.name']}作为资源属性值,并将ItemReader豆
用步进镜。将豆子设置为步进范围可以利用
晚期绑定支持,允许访问jobParameters(工作参数)变量。