常见批次模式
某些批处理作业可以在Spring Batch中完全由现成组件组装完成。例如,ItemReader和ItemWriter实现可以配置为覆盖广泛的场景。然而,在大多数情况下,自定义代码必须是 写。 应用程序开发者的主要API入口是任务这ItemReader这ItemWriter,以及各种监听器接口。最简单批次
作业可以使用Spring Batch的现成输入ItemReader,但通常是
如果在处理和编写过程中存在需要开发者的自定义问题
实现ItemWriter或ItemProcessor.
本章中,我们将提供一些自定义业务逻辑中常见的模式示例。
这些示例主要以监听者界面为特色。需要注意的是,一个ItemReader或ItemWriter如果合适的话,也可以实现监听器接口。
日志项目处理与故障
一个常见的用例是需要逐项处理错误的步骤,
也许是登录到特殊通道,或者将记录插入数据库。一个
块导向步(由 Step Factory 豆子创建)允许用户实现这种使用方式
带有简单项目阅读听者对于 上的错误读以及ItemWriteListener为
错误写.以下代码片段展示了一个记录读和记录的监听器
以及写入失败:
public class ItemFailureLoggerListener extends ItemListenerSupport {
private static Log logger = LogFactory.getLog("item.error");
public void onReadError(Exception ex) {
logger.error("Encountered error on read", e);
}
public void onWriteError(Exception ex, List<? extends Object> items) {
logger.error("Encountered error on write", ex);
}
}
实现该监听器后,必须对其进行步注册。
-
Java
-
XML
以下示例展示了如何用步进 Java 注册监听器:
@Bean
public Step simpleStep(JobRepository jobRepository) {
return new StepBuilder("simpleStep", jobRepository)
...
.listener(new ItemFailureLoggerListener())
.build();
}
以下示例展示了如何用XML步骤注册监听器:
<step id="simpleStep">
...
<listeners>
<listener>
<bean class="org.example...ItemFailureLoggerListener"/>
</listener>
</listeners>
</step>
如果你的听众在onError()方法,必须在里面
这笔交易将被回滚。如果你需要使用事务型
资源,例如数据库,位于onError()方法,考虑添加声明式
交易对该方法进行(详情请参见 Spring Core 参考指南),并赋予其
传播属性 一个值REQUIRES_NEW. |
因业务原因手动停止工作
Spring Batch提供了停止()通过作业操作员界面,但这是
实际上是供操作员使用,而非应用程序开发者。有时候,确实如此
更方便或更合理地从业务内部停止工作执行
逻辑。
最简单的做法是抛出一个运行异常(这场审判既没有被重审也没有
无限期,也未被跳过)。例如,可以使用自定义异常类型,如图所示
以下示例:
public class PoisonPillItemProcessor<T> implements ItemProcessor<T, T> {
@Override
public T process(T item) throws Exception {
if (isPoisonPill(item)) {
throw new PoisonPillException("Poison pill detected: " + item);
}
return item;
}
}
另一个简单的方法是返回零来自ItemReader如下例所示:
public class EarlyCompletionItemReader implements ItemReader<T> {
private ItemReader<T> delegate;
public void setDelegate(ItemReader<T> delegate) { ... }
public T read() throws Exception {
T item = delegate.read();
if (isEndItem(item)) {
return null; // end the step here
}
return item;
}
}
之前的例子实际上依赖于默认实现的事实
关于完成政策该策略在该项目被完成时发出信号
处理后零.可以实施更复杂的完成策略,
注射到中步通过简单步工厂豆.
-
Java
-
XML
以下示例展示了如何在 Java 中的步骤中注入完成策略:
@Bean
public Step simpleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("simpleStep", jobRepository)
.<String, String>chunk(new SpecialCompletionPolicy(), transactionManager)
.reader(reader())
.writer(writer())
.build();
}
以下示例展示了如何在XML中的步骤中注入完成策略:
<step id="simpleStep">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="10"
chunk-completion-policy="completionPolicy"/>
</tasklet>
</step>
<bean id="completionPolicy" class="org.example...SpecialCompletionPolicy"/>
另一种方法是在步执行,该检查由步在项目处理之间的框架中实现。实现这一点
否则,我们需要获得当前的步执行,这可以通过以下方式实现
实现StepListener(听音器)并用步.以下示例
显示一个设置旗帜的监听器:
public class CustomItemWriter extends ItemListenerSupport implements StepListener {
private StepExecution stepExecution;
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
public void afterRead(Object item) {
if (isPoisonPill(item)) {
stepExecution.setTerminateOnly();
}
}
}
当 标志被设置时,默认行为是该步抛出JobInterruptedException.这种行为可以通过步中断政策.然而,唯一的选择是抛出或不抛出例外,
所以这总是工作不正常的结局。
添加页脚记录
通常,在写入平面文件时,必须在
文件,在所有处理完成后。这可以通过平板文件脚注回调由 Spring Batch 提供接口。这平板文件脚注回调(以及它的对应词,FlatFileHeader回调)是 的可选性质平板文件ItemWriter并且可以添加到题目编写器中。
-
Java
-
XML
以下示例展示了如何使用FlatFileHeader回调以及平板文件脚注回调在爪哇语中:
@Bean
public FlatFileItemWriter<String> itemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder<String>()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.headerCallback(headerCallback())
.footerCallback(footerCallback())
.build();
}
以下示例展示了如何使用FlatFileHeader回调以及平板文件脚注回调以XML形式表示:
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator" ref="lineAggregator"/>
<property name="headerCallback" ref="headerCallback" />
<property name="footerCallback" ref="footerCallback" />
</bean>
页脚回调接口只有一种方法,当页脚必须 如下接口定义所示:
public interface FlatFileFooterCallback {
void writeFooter(Writer writer) throws IOException;
}
撰写摘要页脚
涉及页脚记录的一个常见要求是在 输出过程,并将这些信息附加到文件末尾。这个基础经常 作为文件的摘要或校验和。
例如,如果批处理作业正在写入贸易记录映射到一个平面文件,并且有
要求所有行业放在页脚中,然后
以后ItemWriter实现方式可包括:
public class TradeItemWriter implements ItemWriter<Trade>,
FlatFileFooterCallback {
private ItemWriter<Trade> delegate;
private BigDecimal totalAmount = BigDecimal.ZERO;
public void write(Chunk<? extends Trade> items) throws Exception {
BigDecimal chunkTotal = BigDecimal.ZERO;
for (Trade trade : items) {
chunkTotal = chunkTotal.add(trade.getAmount());
}
delegate.write(items);
// After successfully writing all items
totalAmount = totalAmount.add(chunkTotal);
}
public void writeFooter(Writer writer) throws IOException {
writer.write("Total Amount Processed: " + totalAmount);
}
public void setDelegate(ItemWriter delegate) {...}
}
这交易物品作家商店总金额当量从每个人贸易已写成。在最后一次之后贸易是处理的,框架调用writeFooter,这使得总金额进了档案。注意写方法
使用一个临时变量,块总计,存储 的总和贸易分块中的金额。这样做是为了确保,如果发生跳跃写方法,总金额保持不变。它仅在写方法,一旦我们保证没有抛出异常,我们会更新总金额.
为了writeFooter该方法被调用为交易物品作家(即
实现平板文件脚注回调)必须接入平板文件ItemWriter作为页脚回调.
-
Java
-
XML
以下示例展示了如何接线交易物品作家在爪哇语中:
@Bean
public TradeItemWriter tradeItemWriter() {
TradeItemWriter itemWriter = new TradeItemWriter();
itemWriter.setDelegate(flatFileItemWriter(null));
return itemWriter;
}
@Bean
public FlatFileItemWriter<String> flatFileItemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder<String>()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.footerCallback(tradeItemWriter())
.build();
}
以下示例展示了如何接线交易物品作家以XML形式表示:
<bean id="tradeItemWriter" class="..TradeItemWriter">
<property name="delegate" ref="flatFileItemWriter" />
</bean>
<bean id="flatFileItemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator" ref="lineAggregator"/>
<property name="footerCallback" ref="tradeItemWriter" />
</bean>
这种方式交易物品作家目前为止,只有当 函数才正确
这步不可重启。这是因为该类是有状态的(因为它存储总金额),但总金额不会持久化到数据库。因此,它
在重启时无法取回。为了使该类可重启,
这ItemStream接口应与方法一起实现打开和更新如下例所示:
public void open(ExecutionContext executionContext) {
if (executionContext.containsKey("total.amount") {
totalAmount = (BigDecimal) executionContext.get("total.amount");
}
}
public void update(ExecutionContext executionContext) {
executionContext.put("total.amount", totalAmount);
}
更新方法存储的是 的最新版本总金额前往执行上下文就在该对象被持久化到数据库之前。开放法
检索任何存在的总金额来自执行上下文并用它作为
处理的起点,允许交易物品作家在重启时接手
上次结束了步被运行了。
驱动基于查询的ItemReaders
在关于读者与写作者的章节中,数据库输入使用 讨论了呼叫。许多数据库厂商,如DB2,持极其悲观的态度 如果被读取的表还需要 使用,可能会引发问题的锁定策略 在线申请的其他部分。此外,光标会极度打开 大型数据集可能会在某些厂商的数据库上引发问题。因此,许多 项目更倾向于采用“驱动查询”方法来读取数据。这种方法有效 通过遍历密钥,而非整个需要返回的对象,作为 下图说明:
如你所见,前图示例使用了与之前相同的“FOO”表
用于基于光标的示例。然而,并非选择整行,而是仅选中
ID 是在 SQL 语句中选择的。所以,不是呜呜物品被归还
从读一整数被归还。该号码随后可用于查询
“细节”,即完整的福对象,如下图所示:
一ItemProcessor应用于转换从驱动查询获得的密钥
变成了完整的福对象。可以使用现有的DAO查询基于对象的完整内容
在钥匙上。
多线唱片
而平放文件通常每个记录都局限于一个 文件中常见的记录可能跨越多行,包含多个 格式。以下文件摘录展示了此类安排的一个示例:
HEA;0013100345;2007-02-15 NCU;Smith;Peter;;T;20014539;F BAD;;Oak Street 31/A;;Small Town;00235;IL;US FOT;2;2;267.34
从以“HEA”开头到以“FOT”开头的行之间的所有字都是 这被视为一张唱片。为了 正确处理这种情况:
-
与其一次读取一条记录,不如说
ItemReader必须阅读 多行记录作为一组,以便可以传递给ItemWriter完整。 -
每种行类型可能需要不同的分牌化方式。
因为一条记录跨越多行,而且我们可能不知道有多少行
有,有的ItemReader必须小心,务必阅读整条记录。为了
做这个,习俗ItemReader应作为FlatFileItemReader.
-
Java
-
XML
以下示例展示了如何实现自定义ItemReader在爪哇语中:
@Bean
public MultiLineTradeItemReader itemReader() {
MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader();
itemReader.setDelegate(flatFileItemReader());
return itemReader;
}
@Bean
public FlatFileItemReader flatFileItemReader() {
FlatFileItemReader<Trade> reader = new FlatFileItemReaderBuilder<>()
.name("flatFileItemReader")
.resource(new ClassPathResource("data/iosample/input/multiLine.txt"))
.lineTokenizer(orderFileTokenizer())
.fieldSetMapper(orderFieldSetMapper())
.build();
return reader;
}
以下示例展示了如何实现自定义ItemReader以XML形式表示:
<bean id="itemReader" class="org.spr...MultiLineTradeItemReader">
<property name="delegate">
<bean class="org.springframework.batch.infrastructure.item.file.FlatFileItemReader">
<property name="resource" value="data/iosample/input/multiLine.txt" />
<property name="lineMapper">
<bean class="org.spr...DefaultLineMapper">
<property name="lineTokenizer" ref="orderFileTokenizer"/>
<property name="fieldSetMapper" ref="orderFieldSetMapper"/>
</bean>
</property>
</bean>
</property>
</bean>
确保每条线都正确标记,这尤其重要于
固定长度输入,即模式匹配复合线分词器可用于
委托FlatFileItemReader.看FlatFileItemReader在读者和
作家章节更多细节请阅读。委托读者随后使用一个PassThroughFieldSetMapper以交付野外集每一条线回到包裹处ItemReader.
-
Java
-
XML
以下示例展示了如何确保每行在 Java 中正确标记:
@Bean
public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
PatternMatchingCompositeLineTokenizer tokenizer =
new PatternMatchingCompositeLineTokenizer();
Map<String, LineTokenizer> tokenizers = new HashMap<>(4);
tokenizers.put("HEA*", headerRecordTokenizer());
tokenizers.put("FOT*", footerRecordTokenizer());
tokenizers.put("NCU*", customerLineTokenizer());
tokenizers.put("BAD*", billingAddressLineTokenizer());
tokenizer.setTokenizers(tokenizers);
return tokenizer;
}
以下示例展示了如何确保每行在XML中正确标记:
<bean id="orderFileTokenizer" class="org.spr...PatternMatchingCompositeLineTokenizer">
<property name="tokenizers">
<map>
<entry key="HEA*" value-ref="headerRecordTokenizer" />
<entry key="FOT*" value-ref="footerRecordTokenizer" />
<entry key="NCU*" value-ref="customerLineTokenizer" />
<entry key="BAD*" value-ref="billingAddressLineTokenizer" />
</map>
</property>
</bean>
这个包装器必须能够识别记录的结尾,以便持续地
叫read()直到最后一路到最后。对于每行读取的行,,
包装纸应该会叠加待退回的物品。一旦到达页脚,物品就可以
将被退回交付给ItemProcessor和ItemWriter,如
以下示例:
private FlatFileItemReader<FieldSet> delegate;
public Trade read() throws Exception {
Trade t = null;
for (FieldSet line = null; (line = this.delegate.read()) != null;) {
String prefix = line.readString(0);
if (prefix.equals("HEA")) {
t = new Trade(); // Record must start with header
}
else if (prefix.equals("NCU")) {
Assert.notNull(t, "No header was found.");
t.setLast(line.readString(1));
t.setFirst(line.readString(2));
...
}
else if (prefix.equals("BAD")) {
Assert.notNull(t, "No header was found.");
t.setCity(line.readString(4));
t.setState(line.readString(6));
...
}
else if (prefix.equals("FOT")) {
return t; // Record must end with footer
}
}
Assert.isNull(t, "No 'END' was found.");
return null;
}
执行系统命令
许多批处理作业要求在批处理作业内部调用外部命令。 调度者可以单独启动这样的过程,但 关于这次运行的通用元数据将丢失。此外,多步骤的工作也会 还需要分成多个工作。
由于需求如此普遍,Spring Batch提供了任务实现
呼叫系统命令。
-
Java
-
XML
以下示例展示了如何在 Java 中调用外部命令:
@Bean
public SystemCommandTasklet tasklet() {
SystemCommandTasklet tasklet = new SystemCommandTasklet();
tasklet.setCommand("echo hello");
tasklet.setTimeout(5000);
return tasklet;
}
以下示例展示了如何在 XML 中调用外部命令:
<bean class="org.springframework.batch.core.step.tasklet.SystemCommandTasklet">
<property name="command" value="echo hello" />
<!-- 5 second timeout for the command to complete -->
<property name="timeout" value="5000" />
</bean>
在未找到输入时处理步骤完成
在许多批处理场景中,找不到数据库或文件中的行进行处理并不是
特殊。这步简单地认为没有找到功,且完备时为0
物品已读。所有ItemReader春季开箱即用的实现
批量默认采用这种方法。如果没有写明,可能会导致一些混淆
即使有输入(通常发生在文件命名错误或类似情况时)
问题由此产生)。因此,应检查元数据本身以确定如何
框架发现了大量工作。然而,如果找不到输入
被认为是特别的吗?在这种情况下,程序性检查元数据中没有任何项目
加工后导致故障是最佳解决方案。因为这是一个常见的使用场景,
Spring Batch 提供了具备此功能的监听器,如所示
类定义NoWorkFoundStepExecutionListener:
public class NoWorkFoundStepExecutionListener implements StepExecutionListener {
public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getReadCount() == 0) {
return ExitStatus.FAILED;
}
return null;
}
}
前述StepExecutionListener检查阅读计数的属性步执行在“后步”阶段,以确定是否没有读取任何项目。如果有的话
是情况,是出口代码失败返回,表示步应该会失败。
否则零返回,但不影响步.
数据传递给未来步骤
从一个步骤传递信息到另一个步骤通常很有用。这可以通过以下方式实现
这执行上下文.关键是有两个执行上下文:一步水平,一处工作水平。这步 执行上下文仅为
长到台阶,而工作 执行上下文整个地区工作.上
另一方面,步 执行上下文每次步提交
块状,而工作 执行上下文只在每个的结尾更新步.
这种分离的结果是所有数据都必须被放置在步
执行上下文而步正在执行。这样做确保数据为
在步运行。如果数据存储在工作 执行上下文,
则在 期间不被持久化步执行。如果步失败了,数据就丢失了。
public class SavingItemWriter implements ItemWriter<Object> {
private StepExecution stepExecution;
public void write(Chunk<? extends Object> items) throws Exception {
// ...
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("someKey", someObject);
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}
以便将数据提供给未来步骤,必须被“提升”为工作
执行上下文步骤完成后。Spring Batch提供了ExecutionContextPromotionListener为此目的。监听器必须配置
其中密钥与执行上下文这必须被推广。可以
此外,可选配置为晋升的退出代码模式列表
应该发生 (完成是默认的)。与所有听众一样,必须注册
在步.
-
Java
-
XML
以下示例展示了如何将一步提升到工作 执行上下文在爪哇语中:
@Bean
public Job job1(JobRepository jobRepository, Step step1, Step step2) {
return new JobBuilder("job1", jobRepository)
.start(step1)
.next(step2)
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10).transactionManager(transactionManager)
.reader(reader())
.writer(savingWriter())
.listener(promotionListener())
.build();
}
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] {"someKey"});
return listener;
}
以下示例展示了如何将一步提升到工作 执行上下文以XML形式表示:
<job id="job1">
<step id="step1">
<tasklet>
<chunk reader="reader" writer="savingWriter" commit-interval="10"/>
</tasklet>
<listeners>
<listener ref="promotionListener"/>
</listeners>
</step>
<step id="step2">
...
</step>
</job>
<beans:bean id="promotionListener" class="org.spr....ExecutionContextPromotionListener">
<beans:property name="keys">
<list>
<value>someKey</value>
</list>
</beans:property>
</beans:bean>
最后,必须从以下工作 执行上下文,如图所示
以下示例:
public class RetrievingItemWriter implements ItemWriter<Object> {
private Object someObject;
public void write(Chunk<? extends Object> items) throws Exception {
// ...
}
@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
ExecutionContext jobContext = jobExecution.getExecutionContext();
this.someObject = jobContext.get("someKey");
}
}