项目处理

ItemReader 和 ItemWriter 接口对于其特定任务都非常有用,但如果您想在写入之前插入业务逻辑该怎么办?读取和写入的一个选项是使用组合模式:创建一个包含另一个 ItemWriterItemWriter,或创建一个包含另一个 ItemReaderItemReader。以下代码展示了一个示例:spring-doc.cadn.net.cn

public class CompositeItemWriter<T> implements ItemWriter<T> {

    ItemWriter<T> itemWriter;

    public CompositeItemWriter(ItemWriter<T> itemWriter) {
        this.itemWriter = itemWriter;
    }

    public void write(Chunk<? extends T> items) throws Exception {
        //Add business logic here
       itemWriter.write(items);
    }

    public void setDelegate(ItemWriter<T> itemWriter){
        this.itemWriter = itemWriter;
    }
}

上述类包含另一个 ItemWriter,在提供了一些业务逻辑之后,它将委托给该对象。这种模式也可以轻松地用于 ItemReader,例如基于主 ItemReader 提供的输入获取更多参考数据。如果您需要自行控制对 write 的调用,这也非常有用。然而,如果您只想在实际写入之前“转换”传入的待写入项,则无需自行 write。您可以直接修改该项。针对此场景,Spring Batch 提供了 ItemProcessor 接口,如下面的接口定义所示:spring-doc.cadn.net.cn

public interface ItemProcessor<I, O> {

    O process(I item) throws Exception;
}

一个ItemProcessor很简单。给定一个对象,对其进行转换并返回另一个对象。 提供的对象可能与原对象类型相同,也可能不同。关键在于在此过程中可以应用业务逻辑,而该逻辑的创建完全由开发者决定。一个ItemProcessor可以直接接入到一个步骤中。例如,假设一个 ItemReader提供类型为Foo的类,并且在写入之前需要将其转换为类型Bar。以下示例展示了一个执行该转换的ItemProcessorspring-doc.cadn.net.cn

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class FooProcessor implements ItemProcessor<Foo, Bar> {
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarWriter implements ItemWriter<Bar> {
    public void write(Chunk<? extends Bar> bars) throws Exception {
        //write bars
    }
}

在前面的示例中,有一个名为 Foo 的类、一个名为 Bar 的类,以及一个遵循 ItemProcessor 接口的名为 FooProcessor 的类。此转换很简单,但此处也可以执行任何类型的转换。BarWriter 会写入 Bar 对象,如果提供了任何其他类型,则会抛出异常。类似地,如果提供的不是 FooFooProcessor 也会抛出异常。然后,如以下示例所示,可以将 FooProcessor 注入到 Step 中:spring-doc.cadn.net.cn

Java 配置
@Bean
public Job ioSampleJob(JobRepository jobRepository, Step step1) {
	return new JobBuilder("ioSampleJob", jobRepository)
				.start(step1)
				.build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<Foo, Bar>chunk(2).transactionManager(transactionManager)
				.reader(fooReader())
				.processor(fooProcessor())
				.writer(barWriter())
				.build();
}
XML 配置
<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="fooProcessor" writer="barWriter"
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>

ItemProcessorItemReaderItemWriter 的区别在于,对于 Step 而言,ItemProcessor 是可选的。spring-doc.cadn.net.cn

链式 ItemProcessors

执行单个转换在许多场景中很有用,但如果您想将多个 ItemProcessor 实现“链接”在一起该怎么办?您可以使用前面提到的组合模式来实现。要更新之前的单个转换示例,Foo 被转换为 Bar,然后 Bar 又被转换为 Foobar 并写入输出,如下例所示:spring-doc.cadn.net.cn

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class Foobar {
    public Foobar(Bar bar) {}
}

public class FooProcessor implements ItemProcessor<Foo, Bar> {
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarProcessor implements ItemProcessor<Bar, Foobar> {
    public Foobar process(Bar bar) throws Exception {
        return new Foobar(bar);
    }
}

public class FoobarWriter implements ItemWriter<Foobar>{
    public void write(Chunk<? extends Foobar> items) throws Exception {
        //write items
    }
}

一个 FooProcessor 和一个 BarProcessor 可以“链接”在一起,得到结果Foobar,如下例所示:spring-doc.cadn.net.cn

CompositeItemProcessor<Foo,Foobar> compositeProcessor =
                                      new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
itemProcessors.add(new FooProcessor());
itemProcessors.add(new BarProcessor());
compositeProcessor.setDelegates(itemProcessors);

与前面的示例一样,您可以将复合处理器配置到 Stepspring-doc.cadn.net.cn

Java 配置
@Bean
public Job ioSampleJob(JobRepository jobRepository, Step step1) {
	return new JobBuilder("ioSampleJob", jobRepository)
				.start(step1)
				.build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<Foo, Foobar>chunk(2).transactionManager(transactionManager)
				.reader(fooReader())
				.processor(compositeProcessor())
				.writer(foobarWriter())
				.build();
}

@Bean
public CompositeItemProcessor compositeProcessor() {
	List<ItemProcessor> delegates = new ArrayList<>(2);
	delegates.add(new FooProcessor());
	delegates.add(new BarProcessor());

	CompositeItemProcessor processor = new CompositeItemProcessor();

	processor.setDelegates(delegates);

	return processor;
}
XML 配置
<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="compositeItemProcessor" writer="foobarWriter"
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>

<bean id="compositeItemProcessor"
      class="org.springframework.batch.infrastructure.item.support.CompositeItemProcessor">
    <property name="delegates">
        <list>
            <bean class="..FooProcessor" />
            <bean class="..BarProcessor" />
        </list>
    </property>
</bean>

过滤记录

项处理器的一个典型用途是在将记录传递给ItemWriter之前将其过滤掉。过滤是一种不同于跳过(skip)的操作。跳过表示某条记录无效,而过滤则表示该记录不应被写入。spring-doc.cadn.net.cn

例如,考虑一个批处理作业,它读取一个包含三种不同类型记录的文件:要插入的记录、要更新的记录和要删除的记录。如果系统不支持记录删除,我们就不希望将任何可删除的记录发送到 ItemWriter。然而,由于这些记录实际上并不是错误记录,我们希望将它们过滤掉,而不是跳过它们。因此,ItemWriter 将只接收可插入和可更新的记录。spring-doc.cadn.net.cn

要过滤一条记录,您可以从 ItemProcessor 返回 null。框架会检测到结果为 null,从而避免将该项添加到交付给 ItemWriter 的记录列表中。从 ItemProcessor 抛出的异常将导致跳过该记录。spring-doc.cadn.net.cn

验证输入

ItemReaders 和 ItemWriters 章节讨论了多种解析输入的方法。每个主要实现如果格式不正确,都会抛出异常。” FixedLengthTokenizer 如果缺少一定范围的数据,则会抛出异常。同样地, 尝试访问 RowMapperFieldSetMapper 中不存在的索引, 或其格式与预期不符时,将导致抛出异常。所有这些类型的异常都会在 read 返回之前被抛出。然而,它们并未解决返回项是否有效的问题。例如,如果其中一个字段是年龄,则不能为负数。它可能会正确解析,因为它存在且是一个数字,但不会引发异常。由于已经存在大量的验证框架,Spring Batch 并不试图再提供另一个。相反,它提供了一个名为 Validator 的简单接口,您可以由任意数量的框架来实现该接口,如下面的接口定义所示:spring-doc.cadn.net.cn

public interface Validator<T> {

    void validate(T value) throws ValidationException;

}

约定是:如果对象无效,validate 方法将抛出异常;如果对象有效,则正常返回。Spring Batch 提供了一个ValidatingItemProcessor,如下面的 Bean 定义所示:spring-doc.cadn.net.cn

Java 配置
@Bean
public ValidatingItemProcessor itemProcessor() {
	ValidatingItemProcessor processor = new ValidatingItemProcessor();

	processor.setValidator(validator());

	return processor;
}

@Bean
public SpringValidator validator() {
	SpringValidator validator = new SpringValidator();

	validator.setValidator(new TradeValidator());

	return validator;
}
XML 配置
<bean class="org.springframework.batch.infrastructure.item.validator.ValidatingItemProcessor">
    <property name="validator" ref="validator" />
</bean>

<bean id="validator" class="org.springframework.batch.infrastructure.item.validator.SpringValidator">
	<property name="validator">
		<bean class="org.springframework.batch.samples.domain.trade.internal.validator.TradeValidator"/>
	</property>
</bean>

您还可以使用 BeanValidatingItemProcessor 来验证带有 Bean Validation API (JSR-303) 注解的项。例如,考虑以下类型 Personspring-doc.cadn.net.cn

class Person {

    @NotEmpty
    private String name;

    public Person(String name) {
     this.name = name;
    }

    public String getName() {
     return name;
    }

    public void setName(String name) {
     this.name = name;
    }

}

您可以通过在应用程序上下文中声明一个 BeanValidatingItemProcessor Bean,并将其注册为面向块步骤中的处理器来验证项:spring-doc.cadn.net.cn

@Bean
public BeanValidatingItemProcessor<Person> beanValidatingItemProcessor() throws Exception {
    BeanValidatingItemProcessor<Person> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
    beanValidatingItemProcessor.setFilter(true);

    return beanValidatingItemProcessor;
}

容错性

当块(chunk)回滚时,读取过程中缓存的项目可能会被重新处理。如果步骤配置为容错(通常通过使用跳过或重试处理),那么任何使用的 ItemProcessor 都应以幂等的方式实现。通常这意味着对输入项目在执行 ItemProcessor 时不进行任何更改,仅更新作为结果的那个实例。spring-doc.cadn.net.cn