重复

RepeatTemplate

批处理涉及重复性操作,既可以作为一种简单的优化手段,也可以作为作业的一部分。为了对重复操作进行策略化和通用化,并提供一个相当于迭代器框架的功能,Spring Batch 提供了 RepeatOperations 接口。RepeatOperations 接口的定义如下:spring-doc.cadn.net.cn

public interface RepeatOperations {

    RepeatStatus iterate(RepeatCallback callback) throws RepeatException;

}

回调是一个接口,如下定义所示,它允许您插入一些需要重复执行业务逻辑:spring-doc.cadn.net.cn

public interface RepeatCallback {

    RepeatStatus doInIteration(RepeatContext context) throws Exception;

}

回调会重复执行,直到实现逻辑确定迭代应当结束。这些接口中的返回值是一个枚举值,可以是 RepeatStatus.CONTINUABLERepeatStatus.FINISHEDRepeatStatus 枚举用于向重复操作的调用方传达是否仍有剩余工作。一般而言,RepeatOperations 的实现应当检查 RepeatStatus,并将其作为决定是否结束迭代的依据。任何希望向调用方信号表明已无剩余工作的回调,可以返回 RepeatStatus.FINISHEDspring-doc.cadn.net.cn

RepeatOperations 的最简单通用实现是 RepeatTemplatespring-doc.cadn.net.cn

RepeatTemplate template = new RepeatTemplate();

template.setCompletionPolicy(new SimpleCompletionPolicy(2));

template.iterate(new RepeatCallback() {

    public RepeatStatus doInIteration(RepeatContext context) {
        // Do stuff in batch...
        return RepeatStatus.CONTINUABLE;
    }

});

在前面的示例中,我们返回 RepeatStatus.CONTINUABLE,表示还有更多工作待完成。回调也可以返回 RepeatStatus.FINISHED,向调用者表明已无剩余工作。某些迭代可以根据回调内部执行工作的固有逻辑来终止;而另一些迭代(就回调而言)实际上是无限循环,其完成决策则委托给外部策略,正如前面示例所示。spring-doc.cadn.net.cn

重复上下文

RepeatCallback 的方法参数是一个 RepeatContext。许多回调会忽略上下文。然而,如有需要,您可以将其用作属性包,以在迭代期间存储临时数据。在 iterate 方法返回后,上下文将不再存在。spring-doc.cadn.net.cn

如果正在进行嵌套迭代,则 RepeatContext 具有父上下文。父上下文偶尔可用于存储需要在对 iterate 的调用之间共享的数据。例如,如果您想要统计迭代中某个事件发生的次数,并在后续调用中记住该计数,就会用到此功能。spring-doc.cadn.net.cn

重复状态

RepeatStatus 是 Spring Batch 用于指示处理是否已完成的一个枚举。它有两个可能的RepeatStatus值:spring-doc.cadn.net.cn

表 1. RepeatStatus 属性

spring-doc.cadn.net.cn

描述spring-doc.cadn.net.cn

CONTINUABLEspring-doc.cadn.net.cn

还有更多工作要做。spring-doc.cadn.net.cn

FINISHEDspring-doc.cadn.net.cn

不应再发生重复。spring-doc.cadn.net.cn

您可以使用 RepeatStatus 中的 and() 方法,通过逻辑与操作将 RepeatStatus 值组合起来。此举的效果是对可继续标志执行逻辑与运算。换句话说,如果任一状态为 FINISHED,则结果为 FINISHEDspring-doc.cadn.net.cn

完成策略

RepeatTemplate 内部,iterate 方法中循环的终止由 CompletionPolicy 决定,它同时也是 RepeatContext 的工厂。RepeatTemplate 负责使用当前策略创建 RepeatContext,并在迭代的每个阶段将其传递给 RepeatCallback。当回调完成其 doInIteration 后,RepeatTemplate 必须调用 CompletionPolicy 以请求其更新状态(该状态将存储在 RepeatContext 中)。然后,它会询问策略迭代是否已完成。spring-doc.cadn.net.cn

Spring Batch 提供了一些简单的通用CompletionPolicy实现。 SimpleCompletionPolicy允许执行固定次数(同时 code>2可随时强制提前完成)。spring-doc.cadn.net.cn

用户可能需要实现自己的完成策略,以应对更复杂的决策场景。例如,一个批处理窗口策略需要防止在线系统使用时执行批处理作业,这就要求自定义策略。spring-doc.cadn.net.cn

异常处理

如果在 RepeatCallback 内部抛出异常,RepeatTemplate 将咨询 ExceptionHandler,由它决定是否重新抛出该异常。spring-doc.cadn.net.cn

以下列表展示了 ExceptionHandler 接口定义:spring-doc.cadn.net.cn

public interface ExceptionHandler {

    void handleException(RepeatContext context, Throwable throwable)
        throws Throwable;

}

一个常见的用例是统计给定类型的异常数量,并在达到限制时失败。为此,Spring Batch 提供了 SimpleLimitExceptionHandler 和一个稍微更灵活的 RethrowOnThresholdExceptionHandlerSimpleLimitExceptionHandler 具有一个限制属性以及一个应与当前异常进行比较的异常类型。所提供类型的所有子类也会被计入。在达到限制之前,给定类型的异常将被忽略,之后则会重新抛出。其他类型的异常则始终会被重新抛出。spring-doc.cadn.net.cn

SimpleLimitExceptionHandler 的一个重要可选属性是名为 useParent 的布尔标志。默认情况下,其值为 false,因此该限制仅在当前 RepeatContext 中生效。当设置为 true 时,该限制将在嵌套迭代中的兄弟上下文之间保持(例如步骤内的一组块)。spring-doc.cadn.net.cn

监听器

通常,能够在多次迭代中接收针对横切关注点的额外回调是非常有用的。为此,Spring Batch 提供了RepeatListener接口。RepeatTemplate允许用户注册RepeatListener实现,并在迭代过程中(如果可用)通过RepeatContextRepeatStatus提供回调。spring-doc.cadn.net.cn

RepeatListener 接口的定义如下:spring-doc.cadn.net.cn

public interface RepeatListener {
    void before(RepeatContext context);
    void after(RepeatContext context, RepeatStatus result);
    void open(RepeatContext context);
    void onError(RepeatContext context, Throwable e);
    void close(RepeatContext context);
}

openclose 回调分别在整個迭代之前和之後執行。beforeafteronError 則應用於單個的 RepeatCallback 調用。spring-doc.cadn.net.cn

请注意,当存在多个监听器时,它们位于一个列表中,因此具有顺序。在这种情况下,openbefore 按相同顺序被调用,而 afteronErrorclose 则按相反顺序被调用。spring-doc.cadn.net.cn

并行处理

RepeatOperations 的实现并不局限于顺序执行回调。非常重要的是,某些实现能够并行执行其回调。为此,Spring Batch 提供了TaskExecutorRepeatTemplate,它使用 Spring 的TaskExecutor策略来运行RepeatCallback。默认情况下会使用SynchronousTaskExecutor,这会导致整个迭代在同一个线程中执行(与普通的RepeatTemplate相同)。spring-doc.cadn.net.cn

声明式迭代

有时,存在一些业务处理逻辑,您明确希望在每次发生时都重复执行。典型的例子是消息管道的优化。 如果一批消息频繁到达,那么批量处理它们比为每条消息单独承担事务开销更高效。Spring Batch 为此提供了一种 AOP 拦截器,它将方法调用包装在 RepeatOperations 对象中。RepeatOperationsInterceptor 会执行被拦截的方法,并根据所提供的 RepeatTemplate 中的 CompletionPolicy 进行重复执行。spring-doc.cadn.net.cn

以下示例使用 Java 配置来重复调用名为 processMessage 的服务方法(有关如何配置 AOP 拦截器的更多详细信息,请参阅 Spring 用户指南):spring-doc.cadn.net.cn

@Bean
public MyService myService() {
	ProxyFactory factory = new ProxyFactory(RepeatOperations.class.getClassLoader());
	factory.setInterfaces(MyService.class);
	factory.setTarget(new MyService());

	MyService service = (MyService) factory.getProxy();
	JdkRegexpMethodPointcut pointcut = new JdkRegexpMethodPointcut();
	pointcut.setPatterns(".*processMessage.*");

	RepeatOperationsInterceptor interceptor = new RepeatOperationsInterceptor();

	((Advised) service).addAdvisor(new DefaultPointcutAdvisor(pointcut, interceptor));

	return service;
}

以下示例展示了使用 Spring AOP 命名空间进行的声明式迭代,该迭代会重复调用名为 processMessage 的方法(有关如何配置 AOP 拦截器的更多详细信息,请参阅 Spring 用户指南):spring-doc.cadn.net.cn

<aop:config>
    <aop:pointcut id="transactional"
        expression="execution(* com..*Service.processMessage(..))" />
    <aop:advisor pointcut-ref="transactional"
        advice-ref="retryAdvice" order="-1"/>
</aop:config>

<bean id="retryAdvice" class="org.spr...RepeatOperationsInterceptor"/>

前面的示例在拦截器中使用了默认的 RepeatTemplate。若要更改策略、监听器及其他细节,您可以将一个 RepeatTemplate 实例注入到拦截器中。spring-doc.cadn.net.cn

如果被拦截的方法返回 void,拦截器将始终返回 RepeatStatus.CONTINUABLE(因此,如果 CompletionPolicy 没有有限的终止点,则存在无限循环的风险)。否则,它将返回 RepeatStatus.CONTINUABLE,直到被拦截方法的返回值为 null。此时,它将返回 RepeatStatus.FINISHED。因此,目标方法内的业务逻辑可以通过返回 null 或抛出一个由所提供 RepeatTemplate 中的 ExceptionHandler 重新抛出的异常,来表明不再有工作需要执行。spring-doc.cadn.net.cn