通过消息启动批处理作业

当使用核心 Spring Batch API 启动批处理作业时,您基本上有两个选项:spring-doc.cadn.net.cn

例如,在使用 shell 脚本调用批处理作业时,您可能希望使用 CommandLineJobOperator。或者,您也可以直接使用 JobOperator(例如,当将 Spring Batch 作为 Web 应用程序的一部分使用时)。然而,对于更复杂的使用场景该怎么办呢?也许您需要轮询远程 (S)FTP 服务器以获取批处理作业所需的数据,或者您的应用程序必须同时支持多种不同的数据源。例如,您接收的数据文件可能不仅来自 Web,还来自 FTP 和其他来源。在调用 Spring Batch 之前,可能还需要对输入文件进行额外的转换。spring-doc.cadn.net.cn

因此,使用 Spring Integration 及其众多适配器来执行批处理作业将更为强大。例如,您可以使用文件入站通道适配器来监控文件系统中的某个目录,并在输入文件到达时立即启动批处理作业。此外,您还可以创建 Spring Integration 流程,利用多种不同的适配器,仅通过配置即可轻松地从多个来源同时为您的批处理作业摄入数据。使用 Spring Integration 实现所有这些场景非常简单,因为它支持对JobOperator进行解耦的、事件驱动的执行。spring-doc.cadn.net.cn

Spring Batch Integration 提供了 JobLaunchingMessageHandler 类,您可以使用它来启动批处理作业。 JobLaunchingMessageHandler 的输入由 Spring Integration 消息提供,该消息的有效负载类型为 JobLaunchRequest。此类是对要启动的 Job 以及启动批处理作业所需的 JobParameters 的包装器。spring-doc.cadn.net.cn

下图展示了启动批量作业所需的典型 Spring Integration 消息流。 EIP(企业集成模式)网站 提供了消息图标的完整概述及其描述。spring-doc.cadn.net.cn

Launch Batch Job
图 1. 启动批处理作业

将文件转换为 JobLaunchRequest

以下示例将文件转换为 JobLaunchRequestspring-doc.cadn.net.cn

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

import java.io.File;

public class FileMessageToJobRequest {
    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
            message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}

作业执行响应

当批处理作业正在执行时,将返回一个JobExecution实例。您可以使用此实例来确定执行状态。如果JobExecution能够成功创建,则无论实际执行是否成功,它总是会被返回。spring-doc.cadn.net.cn

JobExecution 实例的确切返回行为取决于所提供的 TaskExecutor。如果使用的是(单线程)synchronous TaskExecutor 实现,则仅在作业完成after才会返回JobExecution响应。当使用asynchronous TaskExecutor时,JobExecution实例会立即返回。随后,您可以获取JobExecution实例的id(通过JobExecution.getJobInstanceId()),并使用JobExplorer查询JobRepository以获取作业的最新状态。更多信息请参阅 查询存储库spring-doc.cadn.net.cn

Spring Batch 集成配置

考虑这样一种情况:某人需要创建一个文件 inbound-channel-adapter 来监听指定目录中的 CSV 文件,将它们传递给转换器(FileMessageToJobRequest),通过作业启动网关启动作业,并使用 logging-channel-adapter 记录 JobExecution 的输出。spring-doc.cadn.net.cn

以下示例展示了如何在 Java 中配置这种常见情况:spring-doc.cadn.net.cn

Java 配置
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
    fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);

    return jobLaunchingGateway;
}

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
    return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
                    filter(new SimplePatternFileListFilter("*.csv")),
            c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
            transform(fileMessageToJobRequest()).
            handle(jobLaunchingGateway).
            log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
            get();
}

以下示例展示了如何在 XML 中配置这种常见情况:spring-doc.cadn.net.cn

XML 配置
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>

<int-file:inbound-channel-adapter id="filePoller"
    channel="inboundFileChannel"
    directory="file:/tmp/myfiles/"
    filename-pattern="*.csv">
  <int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>

<int:transformer input-channel="inboundFileChannel"
    output-channel="outboundJobRequestChannel">
  <bean class="io.spring.sbi.FileMessageToJobRequest">
    <property name="job" ref="personJob"/>
    <property name="fileParameterName" value="input.file.name"/>
  </bean>
</int:transformer>

<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
    reply-channel="jobLaunchReplyChannel"/>

<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

示例 ItemReader 配置

现在我们已经轮询文件并启动了作业,接下来需要配置我们的 Spring Batch ItemReader(例如),使其使用在名为"input.file.name"的作业参数所定义的位置找到的文件,如下面的 Bean 配置所示:spring-doc.cadn.net.cn

以下 Java 示例展示了必要的 Bean 配置:spring-doc.cadn.net.cn

Java 配置
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
    FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
    flatFileItemReader.setResource(new FileSystemResource(resource));
...
    return flatFileItemReader;
}

以下 XML 示例展示了必要的 bean 配置:spring-doc.cadn.net.cn

XML 配置
<bean id="itemReader" class="org.springframework.batch.infrastructure.item.file.FlatFileItemReader"
    scope="step">
  <property name="resource" value="file://#{jobParameters['input.file.name']}"/>
    ...
</bean>

前述示例中的主要关注点是将值 #{jobParameters['input.file.name']} 注入为 Resource 属性的值,并将 ItemReader bean 设置为步骤作用域(step scope)。将 bean 设置为步骤作用域可利用延迟绑定支持,从而允许访问 jobParameters 变量。spring-doc.cadn.net.cn