拦截 Step 执行

正如 Job 一样,在 Step 的执行过程中有许多事件点,用户可能需要在这些时刻执行某些功能。例如,若要写入一个需要页脚的平面文件,则必须在 Step 完成后通知 ItemWriter,以便写入页脚。这可以通过多种 Step 作用域的监听器之一来实现。spring-doc.cadn.net.cn

您可以通过 listeners 元素,将任何实现了 StepListener 扩展接口之一的类(但不能是该接口本身,因为它是空的)应用到一个步骤中。 listeners 元素在 step、tasklet 或 chunk 声明内部是有效的。我们建议您在其功能适用的层级声明监听器,或者,如果它具有多种功能(例如 StepExecutionListenerItemReadListener),则在其适用的最细粒度层级进行声明。spring-doc.cadn.net.cn

以下示例展示了在 Java 中应用于块(chunk)级别的监听器:spring-doc.cadn.net.cn

Java 配置
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10).transactionManager(transactionManager)
				.reader(reader())
				.writer(writer())
				.listener(chunkListener())
				.build();
}

以下示例展示了在 XML 中应用于块(chunk)级别的监听器:spring-doc.cadn.net.cn

XML 配置
<step id="step1">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"/>
        <listeners>
            <listener ref="chunkListener"/>
        </listeners>
    </tasklet>
</step>

一个实现了 StepListener 接口之一的 ItemReaderItemWriterItemProcessor,如果使用了命名空间 <step> 元素或其中一个 *StepFactoryBean 工厂,将自动注册到 Step。这仅适用于直接注入到 Step 中的组件。如果监听器嵌套在另一个组件内部,则需要显式注册它(如之前在使用 Step 注册 ItemStream中所述)。spring-doc.cadn.net.cn

除了 StepListener 接口之外,还提供了注解来解决相同的问题。普通的 Java 对象可以拥有带有这些注解的方法,随后它们会被转换为相应的 StepListener 类型。此外,为分块组件的自定义实现添加注解也很常见,例如 ItemReaderItemWriterTasklet。这些注解既会被用于 <listener/> 元素的 XML 解析器分析,也会通过构建器中的 listener 方法进行注册,因此您只需使用 XML 命名空间或构建器将监听器注册到步骤中即可。spring-doc.cadn.net.cn

StepExecutionListener

StepExecutionListener 代表用于 Step 执行的最通用监听器。它允许在 Step 启动之前以及在其结束时(无论是正常结束还是失败)接收通知,如下例所示:spring-doc.cadn.net.cn

public interface StepExecutionListener extends StepListener {

    void beforeStep(StepExecution stepExecution);

    ExitStatus afterStep(StepExecution stepExecution);

}

afterStep 的返回类型为 ExitStatus,以便让监听器有机会修改在 Step 完成时返回的退出代码。spring-doc.cadn.net.cn

与此接口对应的注解是:spring-doc.cadn.net.cn

ChunkListener

“块”(chunk)定义为在事务范围内处理的项目。在每个提交间隔提交事务时,即提交一个块。您可以使用 ChunkListener 在块开始处理之前、块成功完成之后或失败时执行逻辑,如下面的接口定义所示:spring-doc.cadn.net.cn

public interface ChunkListener<I, O> extends StepListener {

    void beforeChunk(Chunk<I> chunk);
    void afterChunk(Chunk<O> chunk);
    void afterChunkError(Exception exception, Chunk<O> chunk);

}

beforeChunk 方法在读取一批项后、事务启动之后但在处理开始之前被调用。相反,afterChunk 方法在该批项写入之后、但在事务提交或回滚之前被调用。spring-doc.cadn.net.cn

ChunkListener 监听器接口不会在并发步骤中被调用。

与此接口对应的注解是:spring-doc.cadn.net.cn

ChunkListener 的设计目的不是抛出受检异常。错误必须在实现中处理,否则步骤将终止。spring-doc.cadn.net.cn

ItemReadListener

在之前讨论跳过逻辑时,曾提到记录被跳过的数据以便后续处理可能是有益的。对于读取错误的情况,可以通过 ItemReaderListener 来实现,如下面的接口定义所示:spring-doc.cadn.net.cn

public interface ItemReadListener<T> extends StepListener {

    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);

}

在每次对 ItemReader 调用 read 方法之前,会先调用 beforeRead 方法。afterRead 方法会在每次成功读取后被调用,并传入已读取的项。如果读取过程中发生错误,则会调用 onReadError 方法。遇到的异常将被提供以便记录日志。spring-doc.cadn.net.cn

与此接口对应的注解是:spring-doc.cadn.net.cn

ItemProcessListener

ItemReadListener 一样,可以“监听”项的处理过程,如下接口定义所示:spring-doc.cadn.net.cn

public interface ItemProcessListener<T, S> extends StepListener {

    void beforeProcess(T item);
    void afterProcess(T item, S result);
    void onProcessError(T item, Exception e);

}

ItemProcessor上,beforeProcess方法会在process之前被调用,并接收待处理的项目。afterProcess方法在项目成功处理后被调用。如果处理过程中发生错误,则会调用onProcessError方法。此时将提供所遇到的异常以及尝试处理的项目,以便进行日志记录。spring-doc.cadn.net.cn

与此接口对应的注解是:spring-doc.cadn.net.cn

ItemWriteListener

您可以使用 ItemWriteListener“监听”条目的写入,如下接口定义所示:spring-doc.cadn.net.cn

public interface ItemWriteListener<S> extends StepListener {

    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);

}

beforeWrite 方法在 ItemWriter 上的 write 之前被调用,并接收已写入的项列表。afterWrite 方法在项成功写入之后、但在提交与该块处理相关联的事务之前被调用。如果写入过程中发生错误,则会调用 onWriteError 方法。 系统将提供所遇到的异常以及尝试写入的项,以便进行日志记录。spring-doc.cadn.net.cn

与此接口对应的注解是:spring-doc.cadn.net.cn

SkipListener

ItemReadListenerItemProcessListenerItemWriteListener 都提供了错误通知机制,但没有任何一个会告知您某条记录实际上已被跳过。例如,即使某个项目经过重试并最终成功,onWriteError 仍会被调用。因此,需要一个单独的接口来跟踪被跳过的项目,如下面的接口定义所示:spring-doc.cadn.net.cn

public interface SkipListener<T,S> extends StepListener {

    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);

}

onSkipInRead 在读取过程中跳过某个项时会被调用。需要注意的是,回滚可能导致同一个项被多次注册为已跳过。 onSkipInWrite 在写入过程中跳过某个项时会被调用。由于该项已成功读取(且未被跳过),因此还会将该项本身作为参数提供。spring-doc.cadn.net.cn

与此接口对应的注解是:spring-doc.cadn.net.cn

跳过监听器与事务

SkipListener 最常见的用例之一是记录被跳过的项,以便另一个批处理过程甚至人工过程可以用于评估并修复导致跳过的问题。由于在许多情况下原始事务可能会被回滚,Spring Batch 提供了两项保证:spring-doc.cadn.net.cn

  • 适当的 skip 方法(取决于错误发生的时间)每个项目仅调用一次。spring-doc.cadn.net.cn

  • SkipListener 总是在事务提交之前被调用。这是为了确保监听器调用的任何事务性资源不会因为 ItemWriter 内部的失败而回滚。spring-doc.cadn.net.cn