创建自定义 ItemReader 和 ItemWriters

到目前为止,本章已经讨论了 Spring 中读写的基本契约Batch 以及一些常见的实现。然而,这些都是相当泛型的,并且有许多潜在的场景可能不是开箱即用 实现。 本节通过使用一个简单的示例演示如何创建自定义ItemReaderItemWriter正确执行并执行他们的合同。 这ItemReader还实现ItemStream,以说明如何使读取器或writer 可重启。spring-doc.cadn.net.cn

习惯ItemReader示例

出于本示例的目的,我们创建一个简单的ItemReader实现从提供的列表中读取。我们首先实现最基本的ItemReaderread方法,如下代码所示:spring-doc.cadn.net.cn

public class CustomItemReader<T> implements ItemReader<T> {

    List<T> items;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
       NonTransientResourceException, ParseException {

        if (!items.isEmpty()) {
            return items.remove(0);
        }
        return null;
    }
}

前面的类获取一个项目列表,一次返回一个项目,从列表中删除每个。当列表为空时,它返回null,从而满足最基本的要求ItemReader,如以下测试代码所示:spring-doc.cadn.net.cn

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");

ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());

使ItemReader可重启

最后一个挑战是使ItemReaderrestartable 的 restartable 中。目前,如果处理被interrupted 并再次开始,则ItemReader必须从头开始。 这是 实际上在许多情况下都有效,但有时最好是批处理作业从中断的地方重新启动。关键判别因素通常是读取器是有状态的还是无状态的。无状态读取器不需要担心可重启性,但有状态读取器必须尝试在重启时重构其最后的已知状态。出于这个原因,我们建议您尽可能保持自定义读取器无状态,这样您就不必担心关于可重启性。spring-doc.cadn.net.cn

如果您确实需要存储状态,则ItemStream应使用接口:spring-doc.cadn.net.cn

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {

    List<T> items;
    int currentIndex = 0;
    private static final String CURRENT_INDEX = "current.index";

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
        ParseException, NonTransientResourceException {

        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }

        return null;
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (executionContext.containsKey(CURRENT_INDEX)) {
            currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
        }
        else {
            currentIndex = 0;
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
    }

    public void close() throws ItemStreamException {}
}

每次调用ItemStream update方法,则ItemReader存储在提供的ExecutionContext键为 'current.index'。当ItemStream open方法调用时,ExecutionContext检查它是否包含具有该键的条目。如果找到该键,则当前索引将移动到该位置。这是一个相当微不足道的示例,但它仍然符合一般合同:spring-doc.cadn.net.cn

ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);

((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());

ItemReaders具有更复杂的重启逻辑。 这JdbcCursorItemReader,例如,将最后处理的行的行 ID 存储在 光标。spring-doc.cadn.net.cn

还值得注意的是,在ExecutionContext不应该是 琐碎。 那是因为同样的ExecutionContext用于所有ItemStreams在 一个Step. 在大多数情况下,只需在键前面加上类名就足够了以保证唯一性。然而,在极少数情况下,两个相同类型的ItemStream在同一步骤中使用(如果需要两个文件输出),则需要一个更独特的名称。出于这个原因,许多 Spring BatchItemReaderItemWriter实现有一个setName()属性,允许此键名称被覆盖。spring-doc.cadn.net.cn

习惯ItemWriter示例

实现自定义ItemWriter在许多方面与ItemReader例 但差异足够大,足以保证自己的例子。但是,添加restartability 本质上是相同的,因此本示例中不涉及它。与ItemReader例如,一个List用于使示例尽可能简单 可能:spring-doc.cadn.net.cn

public class CustomItemWriter<T> implements ItemWriter<T> {

    List<T> output = TransactionAwareProxyFactory.createTransactionalList();

    public void write(Chunk<? extends T> items) throws Exception {
        output.addAll(items);
    }

    public List<T> getOutput() {
        return output;
    }
}

使ItemWriter可重启

要使ItemWriterrestartable 时,我们将遵循与ItemReader,添加并实现ItemStream接口来同步执行上下文。在示例中,我们可能必须计算处理的项目数并将其添加为页脚记录。如果我们需要这样做,我们可以实现ItemStream在我们的ItemWriter以便计数器从执行中重新构成上下文,如果流被重新打开。spring-doc.cadn.net.cn

在许多现实情况下,自定义ItemWriters还委托给另一个写入器,该写入器本身是可重启的(例如,在写入文件时),否则它会写入事务资源,因此不需要可重启,因为它是无状态的。当您有一个有状态的写入器时,您可能应该确保实现ItemStream如 以及ItemWriter. 还要记住,作者的客户端需要意识到 这ItemStream,因此您可能需要在配置中将其注册为流。spring-doc.cadn.net.cn