创建自定义 ItemReader 和 ItemWriters
到目前为止,本章已经讨论了 Spring 中读写的基本契约Batch 以及一些常见的实现。然而,这些都是相当泛型的,并且有许多潜在的场景可能不是开箱即用 实现。 本节通过使用一个简单的示例演示如何创建自定义ItemReader
和ItemWriter
正确执行并执行他们的合同。 这ItemReader
还实现ItemStream
,以说明如何使读取器或writer 可重启。
习惯ItemReader
示例
出于本示例的目的,我们创建一个简单的ItemReader
实现从提供的列表中读取。我们首先实现最基本的ItemReader
这read
方法,如下代码所示:
public class CustomItemReader<T> implements ItemReader<T> {
List<T> items;
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
NonTransientResourceException, ParseException {
if (!items.isEmpty()) {
return items.remove(0);
}
return null;
}
}
前面的类获取一个项目列表,一次返回一个项目,从列表中删除每个。当列表为空时,它返回null
,从而满足最基本的要求ItemReader
,如以下测试代码所示:
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());
使ItemReader
可重启
最后一个挑战是使ItemReader
restartable 的 restartable 中。目前,如果处理被interrupted 并再次开始,则ItemReader
必须从头开始。 这是 实际上在许多情况下都有效,但有时最好是批处理作业从中断的地方重新启动。关键判别因素通常是读取器是有状态的还是无状态的。无状态读取器不需要担心可重启性,但有状态读取器必须尝试在重启时重构其最后的已知状态。出于这个原因,我们建议您尽可能保持自定义读取器无状态,这样您就不必担心关于可重启性。
如果您确实需要存储状态,则ItemStream
应使用接口:
public class CustomItemReader<T> implements ItemReader<T>, ItemStream {
List<T> items;
int currentIndex = 0;
private static final String CURRENT_INDEX = "current.index";
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
ParseException, NonTransientResourceException {
if (currentIndex < items.size()) {
return items.get(currentIndex++);
}
return null;
}
public void open(ExecutionContext executionContext) throws ItemStreamException {
if (executionContext.containsKey(CURRENT_INDEX)) {
currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
}
else {
currentIndex = 0;
}
}
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
}
public void close() throws ItemStreamException {}
}
每次调用ItemStream
update
方法,则ItemReader
存储在提供的ExecutionContext
键为 'current.index'。当ItemStream
open
方法调用时,ExecutionContext
检查它是否包含具有该键的条目。如果找到该键,则当前索引将移动到该位置。这是一个相当微不足道的示例,但它仍然符合一般合同:
ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);
((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());
最ItemReaders
具有更复杂的重启逻辑。 这JdbcCursorItemReader
,例如,将最后处理的行的行 ID 存储在 光标。
还值得注意的是,在ExecutionContext
不应该是 琐碎。 那是因为同样的ExecutionContext
用于所有ItemStreams
在 一个Step
. 在大多数情况下,只需在键前面加上类名就足够了以保证唯一性。然而,在极少数情况下,两个相同类型的ItemStream
在同一步骤中使用(如果需要两个文件输出),则需要一个更独特的名称。出于这个原因,许多 Spring BatchItemReader
和ItemWriter
实现有一个setName()
属性,允许此键名称被覆盖。
习惯ItemWriter
示例
实现自定义ItemWriter
在许多方面与ItemReader
例 但差异足够大,足以保证自己的例子。但是,添加restartability 本质上是相同的,因此本示例中不涉及它。与ItemReader
例如,一个List
用于使示例尽可能简单 可能:
public class CustomItemWriter<T> implements ItemWriter<T> {
List<T> output = TransactionAwareProxyFactory.createTransactionalList();
public void write(Chunk<? extends T> items) throws Exception {
output.addAll(items);
}
public List<T> getOutput() {
return output;
}
}
使ItemWriter
可重启
要使ItemWriter
restartable 时,我们将遵循与ItemReader
,添加并实现ItemStream
接口来同步执行上下文。在示例中,我们可能必须计算处理的项目数并将其添加为页脚记录。如果我们需要这样做,我们可以实现ItemStream
在我们的ItemWriter
以便计数器从执行中重新构成上下文,如果流被重新打开。
在许多现实情况下,自定义ItemWriters
还委托给另一个写入器,该写入器本身是可重启的(例如,在写入文件时),否则它会写入事务资源,因此不需要可重启,因为它是无状态的。当您有一个有状态的写入器时,您可能应该确保实现ItemStream
如 以及ItemWriter
. 还要记住,作者的客户端需要意识到 这ItemStream
,因此您可能需要在配置中将其注册为流。