创建自定义ItemReader和物品编写者
到目前为止,本章讨论了春季阅读和写作的基本契约
批处理以及一些常见的实现方式。不过,这些都比较容易
通用的,且有许多潜在情况可能不被开箱即用产品覆盖
实现。本节通过一个简单的示例展示了如何创建自定义ItemReader和ItemWriter正确执行和执行合同。这ItemReader其他实现ItemStream,以说明如何制作读者或
作者可重启。
习惯ItemReader示例
为了这个例子,我们创建一个简单的ItemReader实现
从提供的列表中阅读。我们首先实现最基本的合同ItemReader这读如下代码所示:
public class CustomItemReader<T> implements ItemReader<T> {
List<T> items;
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
NonTransientResourceException, ParseException {
if (!items.isEmpty()) {
return items.remove(0);
}
return null;
}
}
前一类会逐项返回,逐项移除
从名单上。当列表为空时,返回零,因此满足最基本的
要求ItemReader如以下测试代码所示:
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());
制作ItemReader可重启
最后一个挑战是制作ItemReader可重启。目前,如果处理是
被打断后又重新开始,ItemReader必须从头开始。这是
在很多情况下确实有效,但有时批处理作业更合适
从中断处重新开始。关键的判别因素通常是读者是否具备状态
或者无国籍。无状态读卡器无需担心可重启性,但
有状态的1必须尝试在重启时重构其最后已知状态。因此,
我们建议尽可能保持自定义阅读器无状态,这样你无需担心
关于可重启性。
如果你确实需要存储状态,那么ItemStream应采用以下接口:
public class CustomItemReader<T> implements ItemReader<T>, ItemStream {
List<T> items;
int currentIndex = 0;
private static final String CURRENT_INDEX = "current.index";
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
ParseException, NonTransientResourceException {
if (currentIndex < items.size()) {
return items.get(currentIndex++);
}
return null;
}
public void open(ExecutionContext executionContext) throws ItemStreamException {
if (executionContext.containsKey(CURRENT_INDEX)) {
currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
}
else {
currentIndex = 0;
}
}
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
}
public void close() throws ItemStreamException {}
}
每次通话时ItemStream 更新方法,即当前的索引ItemReader存储在执行上下文键为“current.index”。当ItemStream 打开方法称为,执行上下文检查是否
包含该密钥的条目。如果找到了密钥,则当前索引被移动到
那个地点。这是一个相当简单的例子,但它仍然符合通用合同规定:
ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);
((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());
最ItemReader要有更复杂的重启逻辑。这JdbcCursorItemReader例如,存储了最后处理行的行ID,
光标。
还值得注意的是,所使用的密钥在执行上下文不应该
琐碎。这是因为执行上下文用于所有ItemStreams在
一个步.在大多数情况下,只需在键前加上类名就足够了
以确保独特性。然而,在极少数情况下,两份相同类型的ItemStream在同一步骤中使用(如果需要两个文件
输出),需要一个更独特的名称。因此,许多春季学员ItemReader和ItemWriter实现具有setName()使得
关键字被覆盖。
习惯ItemWriter示例
实现自定义ItemWriter在许多方面与ItemReader例
但有足够多的不同,值得单独举例说明。然而,补充道
可重启性本质上相同,因此本例未涵盖。与ItemReader示例,a列表为了保持示例的简单性,使用
可能:
public class CustomItemWriter<T> implements ItemWriter<T> {
List<T> output = TransactionAwareProxyFactory.createTransactionalList();
public void write(Chunk<? extends T> items) throws Exception {
output.addAll(items);
}
public List<T> getOutput() {
return output;
}
}
制作ItemWriter可重启
要使ItemWriter可重启,我们会遵循与ItemReader,添加并实现ItemStream用于同步
执行背景。在这个例子中,我们可能需要统计处理的物品数量
然后把它加到脚注记录。如果需要,我们可以实施ItemStream在我们的ItemWriter以重新构成处决的反对
如果直播被重新开放,上下文。
在许多现实情况下,是定制的物品撰稿人同时也将该写作委托给另一位作者
是可重启的(例如,写入文件时),否则它会写入
事务资源和SO无需可重启,因为它是无状态的。
当你有一个有条理的写手时,你应该确保实施ItemStream如
如ItemWriter.还要记住,作者的客户需要了解
这ItemStream,所以你可能需要在配置中将其注册为流。