常见批处理模式
一些批处理作业可以完全由 Spring Batch 中的现成组件组装而成。
例如,ItemReader 和 ItemWriter 的实现可配置为覆盖广泛的场景。然而,在大多数情况下,仍需编写自定义代码。应用程序开发者的主要 API 入口点是 Tasklet、ItemReader、ItemWriter 以及各种监听器接口。大多数简单的批处理作业可以使用来自 Spring Batch ItemReader 的现成输入,但通常情况下,处理和写入环节存在自定义需求,要求开发者实现 ItemWriter 或 ItemProcessor。
在本章中,我们提供了一些自定义业务逻辑中常见模式的示例。
这些示例主要展示了监听器接口。需要注意的是,如果合适,
ItemReader 或 ItemWriter 也可以实现监听器接口。
记录项处理和失败
一个常见的用例是需要对步骤中的错误进行逐项特殊处理,
例如记录到专用通道或将记录插入数据库。面向块(chunk-oriented)的 Step(由步骤工厂 Bean 创建)
允许用户通过为 read 上的错误实现一个简单的 ItemReadListener,以及为 write 上的错误实现一个 ItemWriteListener 来满足此用例。
以下代码片段展示了一个同时记录读取和写入失败的监听器:
public class ItemFailureLoggerListener extends ItemListenerSupport {
private static Log logger = LogFactory.getLog("item.error");
public void onReadError(Exception ex) {
logger.error("Encountered error on read", e);
}
public void onWriteError(Exception ex, List<? extends Object> items) {
logger.error("Encountered error on write", ex);
}
}
实现此监听器后,必须将其注册到步骤中。
-
Java
-
XML
以下示例展示了如何使用 Java 步骤注册监听器:
@Bean
public Step simpleStep(JobRepository jobRepository) {
return new StepBuilder("simpleStep", jobRepository)
...
.listener(new ItemFailureLoggerListener())
.build();
}
以下示例展示了如何在 XML 中为步骤注册监听器:
<step id="simpleStep">
...
<listeners>
<listener>
<bean class="org.example...ItemFailureLoggerListener"/>
</listener>
</listeners>
</step>
如果您的监听器在 onError() 方法中执行任何操作,则该操作必须位于一个将被回滚的事务中。如果您需要在 onError() 方法中使用事务性资源(例如数据库),请考虑为该方法的传播属性添加声明式事务(详见 Spring 核心参考指南),并将其传播属性值设置为 REQUIRES_NEW。 |
因业务原因手动停止作业
Spring Batch 通过 JobOperator 接口提供了一个 stop() 方法,但这实际上是供操作员使用的,而非应用程序程序员。有时,从业务逻辑内部停止作业执行会更方便或更合理。
最简单的做法是抛出一个 RuntimeException(既不会无限重试也不会被跳过)。例如,可以使用自定义异常类型,如下例所示:
public class PoisonPillItemProcessor<T> implements ItemProcessor<T, T> {
@Override
public T process(T item) throws Exception {
if (isPoisonPill(item)) {
throw new PoisonPillException("Poison pill detected: " + item);
}
return item;
}
}
另一种阻止步骤执行的简单方法是从ItemReader返回null,如下例所示:
public class EarlyCompletionItemReader implements ItemReader<T> {
private ItemReader<T> delegate;
public void setDelegate(ItemReader<T> delegate) { ... }
public T read() throws Exception {
T item = delegate.read();
if (isEndItem(item)) {
return null; // end the step here
}
return item;
}
}
前面的示例实际上依赖于这样一个事实:CompletionPolicy 策略存在一个默认实现,当待处理项为 null 时,该实现会信号表示批次已完成。更复杂的完成策略可以被实现,并通过 SimpleStepFactoryBean 注入到 Step 中。
-
Java
-
XML
以下示例展示了如何在 Java 中将完成策略注入到步骤中:
@Bean
public Step simpleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("simpleStep", jobRepository)
.<String, String>chunk(new SpecialCompletionPolicy(), transactionManager)
.reader(reader())
.writer(writer())
.build();
}
以下示例展示了如何在 XML 中将完成策略注入到步骤中:
<step id="simpleStep">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="10"
chunk-completion-policy="completionPolicy"/>
</tasklet>
</step>
<bean id="completionPolicy" class="org.example...SpecialCompletionPolicy"/>
另一种方法是在 StepExecution 中设置一个标志,该标志由框架中在项处理之间执行的 Step 实现进行检查。要实现此替代方案,我们需要访问当前的 StepExecution,这可以通过实现 StepListener 并将其注册到 Step 来实现。以下示例展示了一个设置该标志的监听器:
public class CustomItemWriter extends ItemListenerSupport implements StepListener {
private StepExecution stepExecution;
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
public void afterRead(Object item) {
if (isPoisonPill(item)) {
stepExecution.setTerminateOnly();
}
}
}
当标志被设置时,默认行为是该步骤抛出一个
JobInterruptedException。此行为可以通过
StepInterruptionPolicy进行控制。然而,唯一的选择是抛出或不抛出异常,
因此这始终是作业的非正常结束。
添加页脚记录
通常,在写入平面文件时,必须在所有处理完成后,将一条“页脚”记录追加到文件末尾。这可以通过 Spring Batch 提供的 FlatFileFooterCallback 接口来实现。FlatFileFooterCallback(及其对应的 FlatFileHeaderCallback)是 FlatFileItemWriter 的可选属性,可以添加到项写入器中。
-
Java
-
XML
以下示例展示了如何在 Java 中使用FlatFileHeaderCallback和FlatFileFooterCallback:
@Bean
public FlatFileItemWriter<String> itemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder<String>()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.headerCallback(headerCallback())
.footerCallback(footerCallback())
.build();
}
以下示例展示了如何在 XML 中使用 FlatFileHeaderCallback 和 FlatFileFooterCallback:
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator" ref="lineAggregator"/>
<property name="headerCallback" ref="headerCallback" />
<property name="footerCallback" ref="footerCallback" />
</bean>
页脚回调接口只有一个方法,当需要写入页脚时会调用该方法,如下面的接口定义所示:
public interface FlatFileFooterCallback {
void writeFooter(Writer writer) throws IOException;
}
编写摘要页脚
涉及页脚记录的一个常见需求是在输出过程中聚合信息,并将这些信息附加到文件末尾。此页脚通常用作文件的摘要或提供校验和。
例如,如果批处理作业正在向平面文件写入 Trade 条记录,并且要求将所有 Trades 的总金额放在页脚中,则可以使用以下 ItemWriter 实现:
public class TradeItemWriter implements ItemWriter<Trade>,
FlatFileFooterCallback {
private ItemWriter<Trade> delegate;
private BigDecimal totalAmount = BigDecimal.ZERO;
public void write(Chunk<? extends Trade> items) throws Exception {
BigDecimal chunkTotal = BigDecimal.ZERO;
for (Trade trade : items) {
chunkTotal = chunkTotal.add(trade.getAmount());
}
delegate.write(items);
// After successfully writing all items
totalAmount = totalAmount.add(chunkTotal);
}
public void writeFooter(Writer writer) throws IOException {
writer.write("Total Amount Processed: " + totalAmount);
}
public void setDelegate(ItemWriter delegate) {...}
}
此 TradeItemWriter 存储一个 totalAmount 值,该值会随着写入的每个 Trade 项通过 amount 递增。在处理完最后一个 Trade 后,框架会调用 writeFooter,该方法将 totalAmount 写入文件。请注意,write 方法使用了一个临时变量 chunkTotal,用于存储当前块中所有 Trade 金额的总和。这样做的目的是确保如果在 write 方法中发生跳过操作,totalAmount 的值保持不变。只有在 write 方法执行完毕且确认未抛出任何异常后,才会更新 totalAmount。
为了使 writeFooter 方法被调用,TradeItemWriter(它实现了 FlatFileFooterCallback)必须作为 footerCallback 被装配到 FlatFileItemWriter 中。
-
Java
-
XML
以下示例展示了如何在 Java 中装配 TradeItemWriter:
@Bean
public TradeItemWriter tradeItemWriter() {
TradeItemWriter itemWriter = new TradeItemWriter();
itemWriter.setDelegate(flatFileItemWriter(null));
return itemWriter;
}
@Bean
public FlatFileItemWriter<String> flatFileItemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder<String>()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.footerCallback(tradeItemWriter())
.build();
}
以下示例展示了如何在 XML 中装配 TradeItemWriter:
<bean id="tradeItemWriter" class="..TradeItemWriter">
<property name="delegate" ref="flatFileItemWriter" />
</bean>
<bean id="flatFileItemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator" ref="lineAggregator"/>
<property name="footerCallback" ref="tradeItemWriter" />
</bean>
到目前为止,TradeItemWriter 的编写方式仅在 Step 不可重启的情况下才能正常工作。这是因为该类是有状态的(因为它存储了 totalAmount),但 totalAmount 并未持久化到数据库中。因此,在发生重启时无法检索它。为了使该类可重启,应实现 ItemStream 接口以及 open 和 update 方法,如下例所示:
public void open(ExecutionContext executionContext) {
if (executionContext.containsKey("total.amount") {
totalAmount = (BigDecimal) executionContext.get("total.amount");
}
}
public void update(ExecutionContext executionContext) {
executionContext.put("total.amount", totalAmount);
}
update 方法在对象持久化到数据库之前,将totalAmount的最新版本存储到ExecutionContext中。open 方法从ExecutionContext检索任何现有的totalAmount,并将其用作处理的起点,使得TradeItemWriter能够在上次运行Step中断的位置重新启动并继续执行。
基于驱动查询的 ItemReader
在关于读取器和写入器的章节中,讨论了使用分页进行数据库输入。许多数据库提供商(例如 DB2)具有极为悲观的锁定策略,如果正在读取的表也需要被在线应用程序的其他部分使用,则可能会引发问题。此外,在来自某些提供商的数据库上,针对极大数据集打开游标也可能导致问题。因此,许多项目更倾向于使用“驱动查询”方法来读取数据。如下图所示,该方法通过迭代键(而非需要返回的整个对象)来实现:
如您所见,前图中展示的示例使用了与基于游标的示例相同的 'FOO' 表。然而,SQL 语句并未选择整行数据,而仅选择了 ID。因此,从 read 返回的不是一个 FOO 对象,而是一个 Integer。随后可以使用该数字查询“详细信息”,从而获取一个完整的 Foo 对象,如下图所示:
应使用 ItemProcessor 将从驱动查询中获取的键转换为完整的 Foo 对象。可以使用现有的 DAO 根据该键查询完整对象。
多行记录
虽然对于平面文件而言,通常每条记录都限制在单行内,但文件中出现跨越多行且具有多种格式的记录也很常见。以下文件摘录展示了此类排列的一个示例:
HEA;0013100345;2007-02-15 NCU;Smith;Peter;;T;20014539;F BAD;;Oak Street 31/A;;Small Town;00235;IL;US FOT;2;2;267.34
以'HEA'开头的行和以'FOT'开头的行之间的所有内容被视为一条记录。为了正确处理这种情况,必须考虑以下几点:
-
ItemReader不能一次只读取一条记录,而必须将多行记录的每一行作为一个整体进行读取,以便完整地传递给ItemWriter。 -
每种行类型可能需要不同的分词方式。
由于单条记录跨越多行,且我们可能不知道具体有多少行,因此 ItemReader 必须确保始终读取完整的记录。为此,应实现一个自定义的 ItemReader 作为 FlatFileItemReader 的包装器。
-
Java
-
XML
以下示例展示了如何在 Java 中实现自定义的 ItemReader:
@Bean
public MultiLineTradeItemReader itemReader() {
MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader();
itemReader.setDelegate(flatFileItemReader());
return itemReader;
}
@Bean
public FlatFileItemReader flatFileItemReader() {
FlatFileItemReader<Trade> reader = new FlatFileItemReaderBuilder<>()
.name("flatFileItemReader")
.resource(new ClassPathResource("data/iosample/input/multiLine.txt"))
.lineTokenizer(orderFileTokenizer())
.fieldSetMapper(orderFieldSetMapper())
.build();
return reader;
}
以下示例展示了如何在 XML 中实现自定义的 ItemReader:
<bean id="itemReader" class="org.spr...MultiLineTradeItemReader">
<property name="delegate">
<bean class="org.springframework.batch.infrastructure.item.file.FlatFileItemReader">
<property name="resource" value="data/iosample/input/multiLine.txt" />
<property name="lineMapper">
<bean class="org.spr...DefaultLineMapper">
<property name="lineTokenizer" ref="orderFileTokenizer"/>
<property name="fieldSetMapper" ref="orderFieldSetMapper"/>
</bean>
</property>
</bean>
</property>
</bean>
为了确保每一行都能被正确分词,这对于固定长度输入尤为重要,可以在委托的 FlatFileItemReader 上使用 PatternMatchingCompositeLineTokenizer。有关更多详细信息,请参阅 读者与写入者章节中的 FlatFileItemReader。随后,委托读取器会使用 PassThroughFieldSetMapper 将每一行的 FieldSet 返回给外层的 ItemReader。
-
Java
-
XML
以下示例展示了如何在 Java 中确保每一行都被正确分词:
@Bean
public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
PatternMatchingCompositeLineTokenizer tokenizer =
new PatternMatchingCompositeLineTokenizer();
Map<String, LineTokenizer> tokenizers = new HashMap<>(4);
tokenizers.put("HEA*", headerRecordTokenizer());
tokenizers.put("FOT*", footerRecordTokenizer());
tokenizers.put("NCU*", customerLineTokenizer());
tokenizers.put("BAD*", billingAddressLineTokenizer());
tokenizer.setTokenizers(tokenizers);
return tokenizer;
}
以下示例展示了如何确保 XML 中的每一行都被正确标记化:
<bean id="orderFileTokenizer" class="org.spr...PatternMatchingCompositeLineTokenizer">
<property name="tokenizers">
<map>
<entry key="HEA*" value-ref="headerRecordTokenizer" />
<entry key="FOT*" value-ref="footerRecordTokenizer" />
<entry key="NCU*" value-ref="customerLineTokenizer" />
<entry key="BAD*" value-ref="billingAddressLineTokenizer" />
</map>
</property>
</bean>
此包装器必须能够识别记录的结尾,以便它可以持续在其委托对象上调用 read(),直到到达结尾。对于读取的每一行,包装器应构建要返回的项。一旦到达脚注,该项即可返回并交付给 ItemProcessor 和 ItemWriter,如下例所示:
private FlatFileItemReader<FieldSet> delegate;
public Trade read() throws Exception {
Trade t = null;
for (FieldSet line = null; (line = this.delegate.read()) != null;) {
String prefix = line.readString(0);
if (prefix.equals("HEA")) {
t = new Trade(); // Record must start with header
}
else if (prefix.equals("NCU")) {
Assert.notNull(t, "No header was found.");
t.setLast(line.readString(1));
t.setFirst(line.readString(2));
...
}
else if (prefix.equals("BAD")) {
Assert.notNull(t, "No header was found.");
t.setCity(line.readString(4));
t.setState(line.readString(6));
...
}
else if (prefix.equals("FOT")) {
return t; // Record must end with footer
}
}
Assert.isNull(t, "No 'END' was found.");
return null;
}
执行系统命令
许多批处理作业需要在作业内部调用外部命令。 此类进程可以由调度器单独启动,但将失去关于运行的通用元数据的优势。此外,多步骤作业也需要被拆分为多个作业。
由于这种需求非常普遍,Spring Batch 提供了一个用于调用系统命令的 Tasklet 实现。
-
Java
-
XML
以下示例展示了如何在 Java 中调用外部命令:
@Bean
public SystemCommandTasklet tasklet() {
SystemCommandTasklet tasklet = new SystemCommandTasklet();
tasklet.setCommand("echo hello");
tasklet.setTimeout(5000);
return tasklet;
}
以下示例展示了如何在 XML 中调用外部命令:
<bean class="org.springframework.batch.core.step.tasklet.SystemCommandTasklet">
<property name="command" value="echo hello" />
<!-- 5 second timeout for the command to complete -->
<property name="timeout" value="5000" />
</bean>
处理未找到输入时的步骤完成
在许多批处理场景中,在数据库或文件中未找到要处理的行并不属于异常情况。Step 仅表示未找到任何工作,并以读取 0 条项完成。Spring Batch 开箱即用的所有 ItemReader 实现默认都采用此方法。如果即使存在输入也未写入任何内容(这通常发生在文件命名错误或其他类似问题时),这可能会导致一些困惑。因此,应检查元数据本身,以确定框架找到了多少需要处理的工作。然而,如果未找到输入被视为异常情况怎么办?在这种情况下,以编程方式检查元数据中是否有未处理的项并触发失败是最佳解决方案。由于这是一种常见用例,Spring Batch 提供了一个具有此确切功能的监听器,如 NoWorkFoundStepExecutionListener 的类定义所示:
public class NoWorkFoundStepExecutionListener implements StepExecutionListener {
public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getReadCount() == 0) {
return ExitStatus.FAILED;
}
return null;
}
}
前述的 StepExecutionListener 会在 'afterStep' 阶段检查 StepExecution 的 readCount 属性,以确定是否未读取任何项。如果是这种情况,则返回退出代码 FAILED,表示 Step 应该失败。否则,返回 null,这不会影响 Step 的状态。
将数据传递到后续步骤
在不同步骤之间传递信息通常非常有用。这可以通过ExecutionContext来实现。需要注意的是,存在两种ExecutionContexts:一种位于Step级别,另一种位于Job级别。StepExecutionContext仅在当前步骤执行期间有效,而JobExecutionContext则在整个Job过程中持续存在。另一方面,每当Step提交一个数据块时,StepExecutionContext就会被更新;而JobExecutionContext仅在每个Step结束时才进行更新。
这种分离的后果是,所有数据必须放置在 Step
ExecutionContext 中,同时 Step 正在执行。这样做可确保在 Step 运行时数据被正确存储。如果数据被存储到 Job ExecutionContext,
那么它在 Step 执行期间将不会被持久化。如果 Step 失败,这些数据将会丢失。
public class SavingItemWriter implements ItemWriter<Object> {
private StepExecution stepExecution;
public void write(Chunk<? extends Object> items) throws Exception {
// ...
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("someKey", someObject);
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}
为了使数据对未来的Steps可用,必须在步骤完成后将其“提升”到Job
ExecutionContext。Spring Batch 为此提供了
ExecutionContextPromotionListener。必须使用ExecutionContext中与需要提升的数据相关的键来配置该监听器。此外,还可以(可选地)配置一个退出代码模式列表,以指定在哪些情况下应执行提升操作(默认为COMPLETED)。与所有监听器一样,它必须在Step上进行注册。
-
Java
-
XML
以下示例展示了如何在 Java 中将步骤提升为 Job ExecutionContext:
@Bean
public Job job1(JobRepository jobRepository, Step step1, Step step2) {
return new JobBuilder("job1", jobRepository)
.start(step1)
.next(step2)
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10).transactionManager(transactionManager)
.reader(reader())
.writer(savingWriter())
.listener(promotionListener())
.build();
}
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] {"someKey"});
return listener;
}
以下示例展示了如何在 XML 中将步骤提升为 Job ExecutionContext:
<job id="job1">
<step id="step1">
<tasklet>
<chunk reader="reader" writer="savingWriter" commit-interval="10"/>
</tasklet>
<listeners>
<listener ref="promotionListener"/>
</listeners>
</step>
<step id="step2">
...
</step>
</job>
<beans:bean id="promotionListener" class="org.spr....ExecutionContextPromotionListener">
<beans:property name="keys">
<list>
<value>someKey</value>
</list>
</beans:property>
</beans:bean>
最后,必须从 Job ExecutionContext 中检索保存的值,如下例所示:
public class RetrievingItemWriter implements ItemWriter<Object> {
private Object someObject;
public void write(Chunk<? extends Object> items) throws Exception {
// ...
}
@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
ExecutionContext jobContext = jobExecution.getExecutionContext();
this.someObject = jobContext.get("someKey");
}
}