项读取和写入
ItemReaders 和 ItemWriters
所有批处理都可以用最简单的形式来描述为大量读取数据,执行某种类型的计算或转换,并写入结果 外。 Spring Batch 提供了三个关键接口来帮助执行批量读写:ItemReader
,ItemProcessor
和ItemWriter
.
ItemReader
虽然是一个简单的概念,但ItemReader
是提供来自许多数据的手段不同类型的输入。最一般的例子包括:
-
平面文件:平面文件项目读取器从平面文件中读取数据行,通常描述具有由文件中的固定位置定义或分隔的数据字段的记录通过一些特殊字符(例如逗号)。
-
XML:XML
ItemReaders
独立于用于解析的技术处理 XML,映射和验证对象。输入数据允许验证 XML 文件针对 XSD 模式。 -
数据库:访问数据库资源以返回结果集,这些结果集可以映射到对象进行处理。默认 SQL
ItemReader
实现调用RowMapper
要返回对象,请在需要重新启动时跟踪当前行,存储基本statistics,并提供一些稍后解释的事务增强功能。
还有更多的可能性,但我们本章将重点放在基本的可能性上。 一个 所有可用的完整列表ItemReader
实现可以在附录 A 中找到。
ItemReader
是泛型input作的基本接口,如以下接口定义所示:
public interface ItemReader<T> {
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
这read
方法定义了ItemReader
. 调用它返回一个项目或null
如果没有更多项目。项目可能表示文件中的一行、数据库中的一行或 XML 文件中的元素。通常预计这些被映射到可用的域对象(例如Trade
,Foo
或其他),但合同中没有要求这样做。
预计ItemReader
接口仅转发。但是,如果底层资源是事务性的(例如 JMS 队列),则调用read
可能会在回滚方案中的后续调用中返回相同的逻辑项。 是的 还值得注意的是,缺乏要处理的项目ItemReader
不会导致抛出异常。例如,数据库ItemReader
配置了返回 0 个结果的查询返回null
在第一次调用read
.
ItemWriter
ItemWriter
在功能上类似于ItemReader
但具有逆运算。资源仍然需要定位、打开和关闭,但它们的不同之处在于ItemWriter
写出,而不是读入。对于数据库或队列,这些作可以是插入、更新或发送。序列化的格式输出特定于每个批处理作业。
与ItemReader
,ItemWriter
是一个相当通用的接口,如以下接口定义所示:
public interface ItemWriter<T> {
void write(List<? extends T> items) throws Exception;
}
与read
上ItemReader
,write
提供基本契约ItemWriter
. 它 尝试写出传入的项目列表,只要它处于打开状态。因为它是通常期望项目被“批处理”成一个块,然后输出,因此接口接受项目列表,而不是项目本身。写出列表后,可以在从写入返回之前执行任何可能需要的刷新 方法。 例如,如果写入 Hibernate DAO,则可以进行多次写入调用,每个项目一个。然后,编写器可以调用flush
在休眠会话之前 返回。
ItemStream
双ItemReaders
和ItemWriters
很好地服务于他们的个人目的,但有一个它们之间的共同问题需要另一个接口。一般来说,作为批处理作业范围的一部分,读取器和写入器需要打开、关闭,并且需要一种持久化状态的机制。 这ItemStream
接口可以达到这个目的,如以下示例所示:
public interface ItemStream {
void open(ExecutionContext executionContext) throws ItemStreamException;
void update(ExecutionContext executionContext) throws ItemStreamException;
void close() throws ItemStreamException;
}
在描述每种方法之前,我们应该提到ExecutionContext
. 客户ItemReader
也实现ItemStream
应该调用open
在调用任何read
,以打开任何资源(例如文件)或获取连接。类似的
限制适用于ItemWriter
实现ItemStream
.如中所述
第 2 章,如果在ExecutionContext
,它可以用来启动
这ItemReader
或ItemWriter
在初始状态以外的位置。相反close
用于确保在打开期间分配的任何资源都被安全释放。update
主要是为了确保当前持有的任何状态都被加载到
提供的ExecutionContext
.在提交之前调用此方法,以确保
当前状态在提交之前保留在数据库中。
在特殊情况下,客户端的ItemStream
是一个Step
(来自Spring
Batch Core),一个ExecutionContext
为每个 StepExecution 创建,以允许用户
存储特定执行的状态,并期望在
一样JobInstance
重新启动。对于熟悉 Quartz 的人来说,语义
与Quartz非常相似JobDataMap
.
委托模式和向步骤注册
请注意,CompositeItemWriter
是委托模式的一个示例,它是
常见于 Spring Batch。委托本身可能会实现回调接口,
如StepListener
.如果它们与 Spring 结合使用,并且它们是否与 Spring 结合使用
Batch Core 作为Step
在Job
,那么他们几乎肯定需要
使用Step
.直接
连接到Step
如果它实现了ItemStream
或StepListener
接口。但是,由于委托不为所知Step
,
它们需要作为侦听器或流(或两者(如果适用)注入。
以下示例演示如何在 XML 中将委托作为流注入:
<job id="ioSampleJob">
<step name="step1">
<tasklet>
<chunk reader="fooReader" processor="fooProcessor" writer="compositeItemWriter"
commit-interval="2">
<streams>
<stream ref="barWriter" />
</streams>
</chunk>
</tasklet>
</step>
</job>
<bean id="compositeItemWriter" class="...CustomCompositeItemWriter">
<property name="delegate" ref="barWriter" />
</bean>
<bean id="barWriter" class="...BarWriter" />
以下示例演示如何在 XML 中将委托作为流注入:
@Bean
public Job ioSampleJob() {
return this.jobBuilderFactory.get("ioSampleJob")
.start(step1())
.build();
}
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(fooReader())
.processor(fooProcessor())
.writer(compositeItemWriter())
.stream(barWriter())
.build();
}
@Bean
public CustomCompositeItemWriter compositeItemWriter() {
CustomCompositeItemWriter writer = new CustomCompositeItemWriter();
writer.setDelegate(barWriter());
return writer;
}
@Bean
public BarWriter barWriter() {
return new BarWriter();
}
平面文件
交换批量数据的最常见机制之一一直是平面 文件。 与 XML 不同,XML 有一个商定的标准来定义它的结构方式(XSD),任何阅读平面文件的人都必须提前确切地了解文件的确切方式 结构。 通常,所有平面文件分为两种类型:分隔文件和固定长度文件。分隔文件是字段用分隔符(例如逗号)分隔的文件。固定长度文件具有设定长度的字段。
这FieldSet
在 Spring Batch 中处理平面文件时,无论是用于输入还是输出,最重要的类之一是FieldSet
. 许多架构和库包含用于帮助您从文件中读入的抽象,但它们通常返回一个String
或数组String
对象。 这真的只让你成功了一半 那里。 一个FieldSet
是 Spring Batch 的抽象,用于启用字段绑定文件资源。它允许开发人员以与他们将处理数据库输入的方式。 一个FieldSet
在概念上类似于 JDBCResultSet
.一个FieldSet
只需要一个参数:aString
Tokens数组。或者,您还可以配置字段的名称,以便字段可以通过索引或名称访问,如模式所示ResultSet
,如下所示 例:
String[] tokens = new String[]{"foo", "1", "true"};
FieldSet fs = new DefaultFieldSet(tokens);
String name = fs.readString(0);
int value = fs.readInt(1);
boolean booleanValue = fs.readBoolean(2);
上还有更多选项FieldSet
接口,例如Date
长BigDecimal
,依此类推。最大的优点是FieldSet
是它提供了平面文件输入的一致解析。而不是每个批处理作业的解析方式不同潜在的意外方式,它可以是一致的,无论是在处理由format 异常引起的错误时,还是在进行简单的数据转换时。
FlatFileItemReader
平面文件是最多包含二维(表格)数据的任何类型的文件。在 Spring Batch 框架中读取平面文件是由名为FlatFileItemReader
,它提供了读取和解析平面的基本功能 文件。 两个最重要的必需依赖项FlatFileItemReader
是Resource
和LineMapper
. 这LineMapper
界面将在下文中进行更多探讨 部分。 资源属性表示 Spring CoreResource
. 文档 解释如何创建这种类型的 bean 可以在 Spring框架,第 5 章中找到。参考资料。因此,本指南不会详细介绍 创建Resource
对象,除了显示以下简单示例之外:
Resource resource = new FileSystemResource("resources/trades.csv");
在复杂的批处理环境中,目录结构通常由企业应用程序集成 (EAI) 管理基础设施,其中建立了用于移动文件的外部接口的放置区从 FTP 位置到批处理位置,反之亦然。文件移动实用程序超出了 Spring Batch 架构的范围,但对于批处理作业流来说并不罕见,将文件移动实用程序作为作业流中的步骤包含在内。批处理架构只需要知道如何找到要处理的文件。Spring 批处理从这个起点开始将数据输入管道的过程。但是,Spring Integration 提供了许多这些类型的服务。
中的其他属性FlatFileItemReader
允许您进一步指定数据如何解释,如下表所述:
属性 | 类型 | 描述 |
---|---|---|
评论 |
字符串[] |
指定指示注释行的行前缀。 |
编码 |
字符串 |
指定要使用的文本编码。默认值为 |
线映射器 |
|
转换一个 |
行跳过 |
整数 |
文件顶部要忽略的行数。 |
记录分隔符策略 |
记录分隔符策略 |
用于确定行尾的位置并执行诸如在带引号的字符串内时继续执行行结尾之类的事情。 |
资源 |
|
要从中读取的资源。 |
skippedLinesCallback |
行回调处理程序 |
传递原始行内容的接口
文件中要跳过的行。如果 |
严格 |
布尔 |
在严格模式下,读取器会在 |
LineMapper
与RowMapper
,它采用低级结构,例如ResultSet
并返回
一Object
,平面文件处理需要相同的构造来转换String
线
变成一个Object
,如以下接口定义所示:
public interface LineMapper<T> {
T mapLine(String line, int lineNumber) throws Exception;
}
基本合同是,给定当前行和它所在的行号
关联,映射器应返回生成的域对象。这类似于RowMapper
,因为每行都与其行号相关联,就像ResultSet
与其行号相关联。这允许将行号绑定到
生成的域对象,用于身份比较或提供更多信息的日志记录。然而
与RowMapper
这LineMapper
给出了一条原始线,如上所述,该线仅
让你成功了一半。该行必须标记为FieldSet
,然后可以是
映射到对象,如本文档后面所述。
LineTokenizer
用于将一行输入转换为FieldSet
是必要的,因为那里有
可以是多种格式的平面文件数据,需要转换为FieldSet
.在
Spring Batch,这个接口是LineTokenizer
:
public interface LineTokenizer {
FieldSet tokenize(String line);
}
的合同LineTokenizer
是这样的,给定一行输入(理论上String
可以包含多行),一个FieldSet
表示线是
返回。这FieldSet
然后可以传递给FieldSetMapper
.Spring Batch 包含
以下LineTokenizer
实现:
-
DelimitedLineTokenizer
:用于记录中字段由 定界符。最常见的分隔符是逗号,但经常使用管道或分号 也。 -
FixedLengthTokenizer
:用于记录中每个字段都是“固定的 宽度“。必须为每种记录类型定义每个字段的宽度。 -
PatternMatchingCompositeLineTokenizer
:确定哪个LineTokenizer
在列表中 标记器应该通过检查模式来在特定行上使用。
FieldSetMapper
这FieldSetMapper
interface 定义了一个方法,mapFieldSet
,它需要一个FieldSet
对象并将其内容映射到对象。这个对象可以是一个自定义 DTO,一个
domain 对象或数组,具体取决于作业的需要。这FieldSetMapper
是
与LineTokenizer
从资源转换一行数据
转换为所需类型的对象,如以下接口定义所示:
public interface FieldSetMapper<T> {
T mapFieldSet(FieldSet fieldSet) throws BindException;
}
使用的模式与RowMapper
使用者JdbcTemplate
.
DefaultLineMapper
现在已经定义了用于读取平面文件的基本接口,它变成了 明确需要三个基本步骤:
-
从文件中读取一行。
-
将
String
行到LineTokenizer#tokenize()
检索FieldSet
. -
将
FieldSet
从标记化返回到FieldSetMapper
,返回 结果来自ItemReader#read()
方法。
上面描述的两个接口代表两个独立的任务:将一条线转换为FieldSet
并映射一个FieldSet
到域对象。因为输入的LineTokenizer
匹配LineMapper
(一行),以及FieldSetMapper
匹配LineMapper
,默认实现
同时使用LineTokenizer
和FieldSetMapper
被提供。这DefaultLineMapper
,
,表示大多数用户需要的行为:
public class DefaultLineMapper<T> implements LineMapper<>, InitializingBean {
private LineTokenizer tokenizer;
private FieldSetMapper<T> fieldSetMapper;
public T mapLine(String line, int lineNumber) throws Exception {
return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
}
public void setLineTokenizer(LineTokenizer tokenizer) {
this.tokenizer = tokenizer;
}
public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper) {
this.fieldSetMapper = fieldSetMapper;
}
}
上述功能是在默认实现中提供的,而不是构建的 进入阅读器本身(就像在以前版本的框架中所做的那样)以允许用户 在控制解析过程方面具有更大的灵活性,尤其是在访问原始 需要线。
简单分隔文件读取示例
以下示例演示了如何使用实际域方案读取平面文件。 此特定批处理作业从以下文件中读取足球运动员:
ID,lastName,firstName,position,birthYear,debutYear "AbduKa00,Abdul-Jabbar,Karim,rb,1974,1996", "AbduRa00,Abdullah,Rabih,rb,1975,1999", "AberWa00,Abercrombie,Walter,rb,1959,1982", "AbraDa00,Abramowicz,Danny,wr,1945,1967", "AdamBo00,Adams,Bob,te,1946,1969", "AdamCh00,Adams,Charlie,wr,1979,2003"
此文件的内容映射到以下内容Player
domain 对象:
public class Player implements Serializable {
private String ID;
private String lastName;
private String firstName;
private String position;
private int birthYear;
private int debutYear;
public String toString() {
return "PLAYER:ID=" + ID + ",Last Name=" + lastName +
",First Name=" + firstName + ",Position=" + position +
",Birth Year=" + birthYear + ",DebutYear=" +
debutYear;
}
// setters and getters...
}
要映射一个FieldSet
变成一个Player
对象,一个FieldSetMapper
返回玩家需求
待定义,如以下示例所示:
protected static class PlayerFieldSetMapper implements FieldSetMapper<Player> {
public Player mapFieldSet(FieldSet fieldSet) {
Player player = new Player();
player.setID(fieldSet.readString(0));
player.setLastName(fieldSet.readString(1));
player.setFirstName(fieldSet.readString(2));
player.setPosition(fieldSet.readString(3));
player.setBirthYear(fieldSet.readInt(4));
player.setDebutYear(fieldSet.readInt(5));
return player;
}
}
然后可以通过正确构造FlatFileItemReader
并调用read
,如以下示例所示:
FlatFileItemReader<Player> itemReader = new FlatFileItemReader<>();
itemReader.setResource(new FileSystemResource("resources/players.csv"));
DefaultLineMapper<Player> lineMapper = new DefaultLineMapper<>();
//DelimitedLineTokenizer defaults to comma as its delimiter
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
lineMapper.setFieldSetMapper(new PlayerFieldSetMapper());
itemReader.setLineMapper(lineMapper);
itemReader.open(new ExecutionContext());
Player player = itemReader.read();
每次调用read
返回一个新的Player
对象。当文件的末尾是
达到null
被返回。
按名称映射字段
两者都允许一项附加功能DelimitedLineTokenizer
和FixedLengthTokenizer
并且在功能上类似于
JDBC 公司ResultSet
.字段的名称可以注入到其中任何一个LineTokenizer
实现来增加映射函数的可读性。
首先,将平面文件中所有字段的列名注入到分词器中,
如以下示例所示:
tokenizer.setNames(new String[] {"ID", "lastName", "firstName", "position", "birthYear", "debutYear"});
一个FieldSetMapper
可以按如下方式使用此信息:
public class PlayerMapper implements FieldSetMapper<Player> {
public Player mapFieldSet(FieldSet fs) {
if (fs == null) {
return null;
}
Player player = new Player();
player.setID(fs.readString("ID"));
player.setLastName(fs.readString("lastName"));
player.setFirstName(fs.readString("firstName"));
player.setPosition(fs.readString("position"));
player.setDebutYear(fs.readInt("debutYear"));
player.setBirthYear(fs.readInt("birthYear"));
return player;
}
}
自动将 FieldSet 映射到域对象
对于许多人来说,必须编写特定的FieldSetMapper
和写作一样繁琐
一个特定的RowMapper
对于一个JdbcTemplate
.Spring Batch 通过提供
一个FieldSetMapper
通过将字段名称与 setter 匹配来自动映射字段
使用 JavaBean 规范在对象上。
再次使用足球的例子,BeanWrapperFieldSetMapper
配置如下所示
XML 中的以下代码段:
<bean id="fieldSetMapper"
class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
<property name="prototypeBeanName" value="player" />
</bean>
<bean id="player"
class="org.springframework.batch.sample.domain.Player"
scope="prototype" />
再次使用足球的例子,BeanWrapperFieldSetMapper
配置如下所示
Java 中的以下代码片段:
@Bean
public FieldSetMapper fieldSetMapper() {
BeanWrapperFieldSetMapper fieldSetMapper = new BeanWrapperFieldSetMapper();
fieldSetMapper.setPrototypeBeanName("player");
return fieldSetMapper;
}
@Bean
@Scope("prototype")
public Player player() {
return new Player();
}
对于FieldSet
,映射器会在新的
实例Player
对象(因此,prototype scope 是必需的)
与 Spring 容器查找与属性名称匹配的 setter 的方式相同。每个可用
字段中的FieldSet
被映射,结果Player
对象,没有
需要代码。
固定长度文件格式
到目前为止,只详细讨论了分隔文件。但是,它们代表 只有一半的文件阅读图片。许多使用平面文件的组织使用 长度格式。下面是一个固定长度文件示例:
UK21341EAH4121131.11customer1 UK21341EAH4221232.11customer2 UK21341EAH4321333.11customer3 UK21341EAH4421434.11customer4 UK21341EAH4521535.11customer5
虽然这看起来像一个大字段,但它实际上代表 4 个不同的字段:
-
ISIN:所订购商品的唯一标识符 - 12 个字符长。
-
数量:订购商品的编号 - 3 个字符长。
-
价格:物品价格 - 5 个字符长。
-
客户:订购商品的客户的 ID - 9 个字符。
配置FixedLengthLineTokenizer
,则必须提供这些长度中的每一个
以范围的形式。
以下示例演示如何定义FixedLengthLineTokenizer
在
XML:
<bean id="fixedLengthLineTokenizer"
class="org.springframework.batch.item.file.transform.FixedLengthTokenizer">
<property name="names" value="ISIN,Quantity,Price,Customer" />
<property name="columns" value="1-12, 13-15, 16-20, 21-29" />
</bean>
因为FixedLengthLineTokenizer
使用相同的LineTokenizer
interface 作为
前面讨论过,它返回相同的FieldSet
就好像使用了分隔符一样。这
允许使用相同的方法来处理其输出,例如使用BeanWrapperFieldSetMapper
.
支持前面的范围语法需要专门的属性编辑器 |
以下示例演示如何定义FixedLengthLineTokenizer
在
Java:
@Bean
public FixedLengthTokenizer fixedLengthTokenizer() {
FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();
tokenizer.setNames("ISIN", "Quantity", "Price", "Customer");
tokenizer.setColumns(new Range(1, 12),
new Range(13, 15),
new Range(16, 20),
new Range(21, 29));
return tokenizer;
}
因为FixedLengthLineTokenizer
使用相同的LineTokenizer
interface 作为
上面讨论过,它返回相同的FieldSet
就好像使用了分隔符一样。这
让在处理其输出时使用相同的方法,例如使用BeanWrapperFieldSetMapper
.
单个文件中的多个记录类型
到目前为止,所有文件读取示例都做出了一个关键假设 为了简单起见:文件中的所有记录都具有相同的格式。但是,这可能 情况并非总是如此。文件可能具有不同的记录,这是很常见的 需要以不同的方式标记并映射到不同对象的格式。这 以下文件摘录说明了这一点:
USER;Smith;Peter;;T;20014539;F LINEA;1044391041ABC037.49G201XX1383.12H LINEB;2134776319DEF422.99M005LI
在这个文件中,我们有三种类型的记录,“USER”、“LINEA”和“LINEB”。“USER”行
对应于User
对象。“LINEA”和“LINEB”都对应于Line
对象
尽管“LINEA”比“LINEB”具有更多信息。
这ItemReader
单独读取每一行,但我们必须指定不同的LineTokenizer
和FieldSetMapper
对象,以便ItemWriter
接收
正确的项目。这PatternMatchingCompositeLineMapper
通过允许地图使这一切变得容易
的模式到LineTokenizers
和模式设置为FieldSetMappers
待配置。
以下示例演示如何定义FixedLengthLineTokenizer
在
XML:
<bean id="orderFileLineMapper"
class="org.spr...PatternMatchingCompositeLineMapper">
<property name="tokenizers">
<map>
<entry key="USER*" value-ref="userTokenizer" />
<entry key="LINEA*" value-ref="lineATokenizer" />
<entry key="LINEB*" value-ref="lineBTokenizer" />
</map>
</property>
<property name="fieldSetMappers">
<map>
<entry key="USER*" value-ref="userFieldSetMapper" />
<entry key="LINE*" value-ref="lineFieldSetMapper" />
</map>
</property>
</bean>
@Bean
public PatternMatchingCompositeLineMapper orderFileLineMapper() {
PatternMatchingCompositeLineMapper lineMapper =
new PatternMatchingCompositeLineMapper();
Map<String, LineTokenizer> tokenizers = new HashMap<>(3);
tokenizers.put("USER*", userTokenizer());
tokenizers.put("LINEA*", lineATokenizer());
tokenizers.put("LINEB*", lineBTokenizer());
lineMapper.setTokenizers(tokenizers);
Map<String, FieldSetMapper> mappers = new HashMap<>(2);
mappers.put("USER*", userFieldSetMapper());
mappers.put("LINE*", lineFieldSetMapper());
lineMapper.setFieldSetMappers(mappers);
return lineMapper;
}
在此示例中,“LINEA”和“LINEB”具有单独的LineTokenizer
实例,但它们都使用
一样FieldSetMapper
.
这PatternMatchingCompositeLineMapper
使用PatternMatcher#match
方法
以便为每行选择正确的委托。这PatternMatcher
允许
两个具有特殊含义的通配符:问号 (“?”) 恰好匹配一个
字符,而星号(“*”)匹配零个或多个字符。请注意,在
在配置之前,所有模式都以星号结尾,使它们有效地
前缀。这PatternMatcher
始终匹配最具体的模式
可能,无论配置中的顺序如何。因此,如果“LINE*”和“LINEA*”是
两者都列为模式,“LINEA”将匹配模式“LINEA*”,而“LINEB”将匹配
图案“LINE*”。此外,单个星号 (“*”) 可以通过匹配
任何其他模式不匹配的任何线。
以下示例显示了如何匹配 XML 中任何其他模式都不匹配的行:
<entry key="*" value-ref="defaultLineTokenizer" />
以下示例显示了如何匹配 Java 中任何其他模式都不匹配的行:
...
tokenizers.put("*", defaultLineTokenizer());
...
还有一个PatternMatchingCompositeLineTokenizer
可用于标记化
独自。
平面文件包含每条跨越多行的记录也很常见。自
处理这种情况,需要更复杂的策略。演示这一点
常见的模式可以在multiLineRecords
样本。
平面文件中的异常处理
在许多情况下,标记行可能会导致引发异常。多
平面文件不完美,包含格式不正确的记录。许多用户选择
在记录问题、原始行和行时跳过这些错误行
数。稍后可以手动检查这些日志,也可以由其他批处理作业检查。为此
原因,Spring Batch 提供了一个异常层次结构来处理解析异常:FlatFileParseException
和FlatFileFormatException
.FlatFileParseException
是
由FlatFileItemReader
当尝试读取
文件。FlatFileFormatException
是由LineTokenizer
接口,并指示在标记化时遇到的更具体的错误。
IncorrectTokenCountException
双DelimitedLineTokenizer
和FixedLengthLineTokenizer
有能力指定
可用于创建FieldSet
.但是,如果列数
names 与标记行时找到的列数不匹配,则FieldSet
无法创建,并且IncorrectTokenCountException
被抛出,其中包含
遇到的Tokens数和预期数,如以下示例所示:
tokenizer.setNames(new String[] {"A", "B", "C", "D"});
try {
tokenizer.tokenize("a,b,c");
}
catch (IncorrectTokenCountException e) {
assertEquals(4, e.getExpectedCount());
assertEquals(3, e.getActualCount());
}
因为分词器配置了 4 个列名,但在
文件中,一个IncorrectTokenCountException
被扔了。
IncorrectLineLengthException
以固定长度格式格式化的文件在解析时有额外的要求 因为,与分隔格式不同,每列都必须严格遵守其预定义 宽度。如果总线长度不等于此列的最宽值,则 异常,如以下示例所示:
tokenizer.setColumns(new Range[] { new Range(1, 5),
new Range(6, 10),
new Range(11, 15) });
try {
tokenizer.tokenize("12345");
fail("Expected IncorrectLineLengthException");
}
catch (IncorrectLineLengthException ex) {
assertEquals(15, ex.getExpectedLength());
assertEquals(5, ex.getActualLength());
}
上述分词器的配置范围为:1-5、6-10 和 11-15。因此
线路总长度为15。但是,在前面的示例中,长度为 5
被传入,导致IncorrectLineLengthException
被扔掉。抛出一个
异常,而不是仅映射第一列,允许处理
行更早地失败,并且包含的信息比它失败时包含的信息更多,而它失败时
尝试在第 2 列中读取FieldSetMapper
.但是,在某些情况下,
线的长度并不总是恒定的。因此,行长度的验证可以
通过“strict”属性关闭,如以下示例所示:
tokenizer.setColumns(new Range[] { new Range(1, 5), new Range(6, 10) });
tokenizer.setStrict(false);
FieldSet tokens = tokenizer.tokenize("12345");
assertEquals("12345", tokens.readString(0));
assertEquals("", tokens.readString(1));
前面的示例与前面的示例几乎相同,只是tokenizer.setStrict(false)
被叫来。此设置告诉分词器不要强制执行
标记行时的行长度。一个FieldSet
现在已正确创建,并且
返回。但是,它仅包含其余值的空标记。
FlatFileItemWriter
写入平面文件与从文件读入具有相同的问题和问题 必须克服。步骤必须能够以 交易方式。
LineAggregator
就像LineTokenizer
接口是将项目转换为String
,文件编写必须有一种将多个字段聚合为单个字符串的方法
用于写入文件。在 Spring Batch 中,这是LineAggregator
,显示在
以下接口定义:
public interface LineAggregator<T> {
public String aggregate(T item);
}
这LineAggregator
是逻辑上的相反LineTokenizer
.LineTokenizer
需要一个String
并返回一个FieldSet
而LineAggregator
采用item
并返回一个String
.
PassThroughLineAggregator
最基本的实现LineAggregator
接口是PassThroughLineAggregator
,它假设对象已经是一个字符串,或者
它的字符串表示形式可以写入,如以下代码所示:
public class PassThroughLineAggregator<T> implements LineAggregator<T> {
public String aggregate(T item) {
return item.toString();
}
}
如果直接控制创建字符串,则上述实现很有用
必需的,但FlatFileItemWriter
,例如事务和重启
支持,是必要的。
简化的文件写入示例
现在LineAggregator
接口及其最基本的实现,PassThroughLineAggregator
,已经定义了,写作的基本流程可以
解释:
-
要写入的对象被传递给
LineAggregator
为了获得String
. -
返回的
String
写入配置的文件。
以下摘录自FlatFileItemWriter
在代码中表达了这一点:
public void write(T item) throws Exception {
write(lineAggregator.aggregate(item) + LINE_SEPARATOR);
}
在 XML 中,一个简单的配置示例可能如下所示:
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" value="file:target/test-outputs/output.txt" />
<property name="lineAggregator">
<bean class="org.spr...PassThroughLineAggregator"/>
</property>
</bean>
在 Java 中,一个简单的配置示例可能如下所示:
@Bean
public FlatFileItemWriter itemWriter() {
return new FlatFileItemWriterBuilder<Foo>()
.name("itemWriter")
.resource(new FileSystemResource("target/test-outputs/output.txt"))
.lineAggregator(new PassThroughLineAggregator<>())
.build();
}
FieldExtractor
前面的示例可能对写入文件的最基本用途有用。
但是,大多数用户FlatFileItemWriter
有一个需要的域对象
写出来,因此必须转换为一行。在文件阅读中,以下是
必填:
-
从文件中读取一行。
-
将行传递到
LineTokenizer#tokenize()
方法,以便检索FieldSet
. -
将
FieldSet
从标记化返回到FieldSetMapper
,返回 结果来自ItemReader#read()
方法。
文件写入有类似但相反的步骤:
-
将要写入的项目传递给编写器。
-
将项目上的字段转换为数组。
-
将生成的数组聚合成一行。
因为框架无法知道对象中的哪些字段需要
被写出来,一个FieldExtractor
必须编写才能完成将
item 放入数组中,如以下接口定义所示:
public interface FieldExtractor<T> {
Object[] extract(T item);
}
的实现FieldExtractor
接口应该从字段创建一个数组
提供的对象,然后可以在
元素或作为固定宽度线的一部分。
PassThroughFieldExtractor
在许多情况下,集合(例如数组)Collection
或FieldSet
,
需要写出来。从这些集合类型之一中“提取”数组非常
简单。为此,请将集合转换为数组。因此,PassThroughFieldExtractor
应在此方案中使用。应该注意的是,如果
传入的对象不是集合类型,则PassThroughFieldExtractor
返回一个仅包含要提取的项的数组。
BeanWrapperFieldExtractor
与BeanWrapperFieldSetMapper
在文件读取部分中描述的是
通常最好配置如何将域对象转换为对象数组,而不是
而不是自己编写转换。这BeanWrapperFieldExtractor
提供
功能,如以下示例所示:
BeanWrapperFieldExtractor<Name> extractor = new BeanWrapperFieldExtractor<>();
extractor.setNames(new String[] { "first", "last", "born" });
String first = "Alan";
String last = "Turing";
int born = 1912;
Name n = new Name(first, last, born);
Object[] values = extractor.extract(n);
assertEquals(first, values[0]);
assertEquals(last, values[1]);
assertEquals(born, values[2]);
此提取器实现只有一个必需属性:要
地图。就像BeanWrapperFieldSetMapper
需要字段名称来映射FieldSet
到所提供对象上的 setter,则BeanWrapperFieldExtractor
需要名称
映射到用于创建对象数组的 getter。值得注意的是,的顺序
名称确定数组中字段的顺序。
分隔文件写入示例
最基本的平面文件格式是所有字段都用分隔符分隔的格式。
这可以使用DelimitedLineAggregator
.以下示例将
输出一个简单的域对象,该对象表示客户帐户的信用额度:
public class CustomerCredit {
private int id;
private String name;
private BigDecimal credit;
//getters and setters removed for clarity
}
由于正在使用域对象,因此FieldExtractor
必须提供接口以及要使用的分隔符。
以下示例演示如何使用FieldExtractor
在 XML 中使用分隔符:
<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator">
<bean class="org.spr...DelimitedLineAggregator">
<property name="delimiter" value=","/>
<property name="fieldExtractor">
<bean class="org.spr...BeanWrapperFieldExtractor">
<property name="names" value="name,credit"/>
</bean>
</property>
</bean>
</property>
</bean>
以下示例演示如何使用FieldExtractor
在 Java 中使用分隔符:
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[] {"name", "credit"});
fieldExtractor.afterPropertiesSet();
DelimitedLineAggregator<CustomerCredit> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(",");
lineAggregator.setFieldExtractor(fieldExtractor);
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.lineAggregator(lineAggregator)
.build();
}
在前面的示例中,BeanWrapperFieldExtractor
前面描述的
章节用于将名称和信用字段转换为CustomerCredit
变成一个对象
数组,然后在每个字段之间用逗号写出。
也可以使用FlatFileItemWriterBuilder.DelimitedBuilder
自 自动创建BeanWrapperFieldExtractor
和DelimitedLineAggregator
如以下示例所示:
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.delimited()
.delimiter("|")
.names(new String[] {"name", "credit"})
.build();
}
固定宽度文件写入示例
分隔并不是唯一类型的平面文件格式。许多人更喜欢使用设定的宽度每列在字段之间进行划分,这通常称为“固定宽度”。Spring Batch 在文件写入中支持这一点,使用FormatterLineAggregator
.
使用相同的CustomerCredit
domain 对象,可以将其配置为
在 XML 中如下:
<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator">
<bean class="org.spr...FormatterLineAggregator">
<property name="fieldExtractor">
<bean class="org.spr...BeanWrapperFieldExtractor">
<property name="names" value="name,credit" />
</bean>
</property>
<property name="format" value="%-9s%-2.0f" />
</bean>
</property>
</bean>
使用相同的CustomerCredit
domain 对象,可以将其配置为
在 Java 中如下:
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[] {"name", "credit"});
fieldExtractor.afterPropertiesSet();
FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
lineAggregator.setFormat("%-9s%-2.0f");
lineAggregator.setFieldExtractor(fieldExtractor);
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.lineAggregator(lineAggregator)
.build();
}
前面的大多数示例应该看起来很熟悉。但是,格式的值 属性是新的。
以下示例显示了 XML 中的 format 属性:
<property name="format" value="%-9s%-2.0f" />
以下示例显示了 Java 中的 format 属性:
...
FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
lineAggregator.setFormat("%-9s%-2.0f");
...
底层实现是使用相同的Formatter
作为 Java 5 的一部分添加。爪哇Formatter
基于printf
C 编程的功能
语言。有关如何配置格式化程序的大多数详细信息,请访问
格式化程序的 Javadoc。
也可以使用FlatFileItemWriterBuilder.FormattedBuilder
自 自动创建BeanWrapperFieldExtractor
和FormatterLineAggregator
如以下示例所示:
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.formatted()
.format("%-9s%-2.0f")
.names(new String[] {"name", "credit"})
.build();
}
处理文件创建
FlatFileItemReader
与文件资源的关系非常简单。当读者
初始化时,它会打开文件(如果存在),如果不存在,则抛出异常。
文件编写并不是那么简单。乍一看,这似乎是一个类似的
简单的合约应该存在于FlatFileItemWriter
:如果文件已经
存在,抛出异常,如果没有,则创建它并开始写入。然而
可能会重新启动Job
可能会导致问题。在正常重启场景中,
合约被反转:如果文件存在,则从最后一个已知的 Good 开始写入它
position,如果没有,则抛出异常。但是,如果文件名
因为这份工作总是一样的?在这种情况下,如果文件
存在,除非是重启。由于这种可能性,FlatFileItemWriter
包含属性shouldDeleteIfExists
.将此属性设置为 true 会导致
打开写入器时要删除的具有相同名称的现有文件。
XML 项读取器和写入器
Spring Batch 提供了用于读取 XML 记录和 将它们映射到 Java 对象以及将 Java 对象写入 XML 记录。
流式处理 XML 的约束
StAX API 用于 I/O,因为其他标准 XML 解析 API 不适合批处理 处理要求(DOM 将整个输入立即加载到内存中,SAX 控件 通过允许用户仅提供回调来解析过程)。 |
我们需要考虑 XML 输入和输出在 Spring Batch 中的工作原理。首先,有一个
与文件读取和写入不同但在 Spring Batch 中常见的概念很少
XML 处理。使用 XML 处理时,而不是记录行 (FieldSet
实例)需要
要标记化,假设 XML 资源是“片段”的集合
对应于单个记录,如下图所示:

在上述场景中,“trade”标签被定义为“根元素”。万事 “<贸易>”和“</贸易>”之间被视为一个“片段”。弹簧批次 使用对象/XML 映射 (OXM) 将片段绑定到对象。但是,Spring Batch 不是 绑定到任何特定的 XML 绑定技术。典型的用途是委托给 Spring OXM,这 为最流行的 OXM 技术提供统一的抽象。对 Spring OXM 是可选的,您可以选择实现 Spring Batch 特定的接口 如果需要的话。与 OXM 支持的技术的关系显示在 下图:

通过对 OXM 的介绍以及如何使用 XML 片段来表示记录,我们 现在可以更仔细地检查读者和作者。
StaxEventItemReader
这StaxEventItemReader
配置为处理
来自 XML 输入流的记录。首先,考虑以下一组 XML 记录,其中
这StaxEventItemReader
可以处理:
<?xml version="1.0" encoding="UTF-8"?>
<records>
<trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0001</isin>
<quantity>5</quantity>
<price>11.39</price>
<customer>Customer1</customer>
</trade>
<trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0002</isin>
<quantity>2</quantity>
<price>72.99</price>
<customer>Customer2c</customer>
</trade>
<trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0003</isin>
<quantity>9</quantity>
<price>99.99</price>
<customer>Customer3</customer>
</trade>
</records>
为了能够处理 XML 记录,需要满足以下条件:
-
根元素名称:构成 要映射的对象。示例配置通过贸易值演示了这一点。
-
资源:表示要读取的文件的 Spring 资源。
-
Unmarshaller
:Spring OXM 提供的用于映射 XML 的解组工具 fragment 添加到对象。
以下示例显示如何定义StaxEventItemReader
与根一起使用
名为trade
,资源data/iosample/input/input.xml
,以及一个 unmarshaller
叫tradeMarshaller
在 XML 中:
<bean id="itemReader" class="org.springframework.batch.item.xml.StaxEventItemReader">
<property name="fragmentRootElementName" value="trade" />
<property name="resource" value="org/springframework/batch/item/xml/domain/trades.xml" />
<property name="unmarshaller" ref="tradeMarshaller" />
</bean>
以下示例显示如何定义StaxEventItemReader
与根一起使用
名为trade
,资源data/iosample/input/input.xml
,以及一个 unmarshaller
叫tradeMarshaller
在 Java 中:
@Bean
public StaxEventItemReader itemReader() {
return new StaxEventItemReaderBuilder<Trade>()
.name("itemReader")
.resource(new FileSystemResource("org/springframework/batch/item/xml/domain/trades.xml"))
.addFragmentRootElements("trade")
.unmarshaller(tradeMarshaller())
.build();
}
请注意,在此示例中,我们选择使用XStreamMarshaller
,它接受
作为映射传入的别名,第一个键和值是片段的名称
(即根元素)和要绑定的对象类型。然后,类似于FieldSet
这
映射到对象类型内字段的其他元素的名称描述为
映射中的键/值对。在配置文件中,我们可以使用 Spring 配置
实用程序来描述所需的别名。
以下示例演示如何在 XML 中描述别名:
<bean id="tradeMarshaller"
class="org.springframework.oxm.xstream.XStreamMarshaller">
<property name="aliases">
<util:map id="aliases">
<entry key="trade"
value="org.springframework.batch.sample.domain.trade.Trade" />
<entry key="price" value="java.math.BigDecimal" />
<entry key="isin" value="java.lang.String" />
<entry key="customer" value="java.lang.String" />
<entry key="quantity" value="java.lang.Long" />
</util:map>
</property>
</bean>
以下示例显示了如何在 Java 中描述别名:
@Bean
public XStreamMarshaller tradeMarshaller() {
Map<String, Class> aliases = new HashMap<>();
aliases.put("trade", Trade.class);
aliases.put("price", BigDecimal.class);
aliases.put("isin", String.class);
aliases.put("customer", String.class);
aliases.put("quantity", Long.class);
XStreamMarshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);
return marshaller;
}
输入时,读取器读取 XML 资源,直到它识别出新片段
即将开始。默认情况下,读取器会匹配元素名称以识别新的
fragment 即将开始。读者从
fragment 并将文档传递给反序列化器(通常是 Spring
OXM的Unmarshaller
) 将 XML 映射到 Java 对象。
总之,此过程类似于以下 Java 代码,它使用 注入 由 Spring 配置提供:
StaxEventItemReader<Trade> xmlStaxEventItemReader = new StaxEventItemReader<>();
Resource resource = new ByteArrayResource(xmlResource.getBytes());
Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
XStreamMarshaller unmarshaller = new XStreamMarshaller();
unmarshaller.setAliases(aliases);
xmlStaxEventItemReader.setUnmarshaller(unmarshaller);
xmlStaxEventItemReader.setResource(resource);
xmlStaxEventItemReader.setFragmentRootElementName("trade");
xmlStaxEventItemReader.open(new ExecutionContext());
boolean hasNext = true;
Trade trade = null;
while (hasNext) {
trade = xmlStaxEventItemReader.read();
if (trade == null) {
hasNext = false;
}
else {
System.out.println(trade);
}
}
StaxEventItemWriter
输出与输入对称。这StaxEventItemWriter
需要一个Resource
一个
marshaller,以及一个rootTagName
.Java 对象被传递给封送程序(通常是
standard Spring OXM Marshaller),它写入Resource
通过使用自定义事件
过滤StartDocument
和EndDocument
为每个
OXM 工具的片段。
以下 XML 示例使用MarshallingEventWriterSerializer
:
<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
<property name="resource" ref="outputResource" />
<property name="marshaller" ref="tradeMarshaller" />
<property name="rootTagName" value="trade" />
<property name="overwriteOutput" value="true" />
</bean>
以下 Java 示例使用MarshallingEventWriterSerializer
:
@Bean
public StaxEventItemWriter itemWriter(Resource outputResource) {
return new StaxEventItemWriterBuilder<Trade>()
.name("tradesWriter")
.marshaller(tradeMarshaller())
.resource(outputResource)
.rootTagName("trade")
.overwriteOutput(true)
.build();
}
前面的配置设置了三个必需的属性,并将可选的overwriteOutput=true
attrbute,在本章前面提到,用于指定是否
可以覆盖现有文件。
以下 XML 示例使用与读取示例中使用的封送处理程序相同的封送处理程序 本章前面显示:
<bean id="customerCreditMarshaller"
class="org.springframework.oxm.xstream.XStreamMarshaller">
<property name="aliases">
<util:map id="aliases">
<entry key="customer"
value="org.springframework.batch.sample.domain.trade.Trade" />
<entry key="price" value="java.math.BigDecimal" />
<entry key="isin" value="java.lang.String" />
<entry key="customer" value="java.lang.String" />
<entry key="quantity" value="java.lang.Long" />
</util:map>
</property>
</bean>
以下 Java 示例使用与读取示例中使用的封送程序相同的封送程序 本章前面显示:
@Bean
public XStreamMarshaller customerCreditMarshaller() {
XStreamMarshaller marshaller = new XStreamMarshaller();
Map<String, Class> aliases = new HashMap<>();
aliases.put("trade", Trade.class);
aliases.put("price", BigDecimal.class);
aliases.put("isin", String.class);
aliases.put("customer", String.class);
aliases.put("quantity", Long.class);
marshaller.setAliases(aliases);
return marshaller;
}
用 Java 示例进行总结,以下代码说明了所有要点 讨论,演示了所需属性的编程设置:
FileSystemResource resource = new FileSystemResource("data/outputFile.xml")
Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
Marshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);
StaxEventItemWriter staxItemWriter =
new StaxEventItemWriterBuilder<Trade>()
.name("tradesWriter")
.marshaller(marshaller)
.resource(resource)
.rootTagName("trade")
.overwriteOutput(true)
.build();
staxItemWriter.afterPropertiesSet();
ExecutionContext executionContext = new ExecutionContext();
staxItemWriter.open(executionContext);
Trade trade = new Trade();
trade.setPrice(11.39);
trade.setIsin("XYZ0001");
trade.setQuantity(5L);
trade.setCustomer("Customer1");
staxItemWriter.write(trade);
JSON 项读取器和写入器
Spring Batch 支持以下格式的读取和写入 JSON 资源:
[
{
"isin": "123",
"quantity": 1,
"price": 1.2,
"customer": "foo"
},
{
"isin": "456",
"quantity": 2,
"price": 1.4,
"customer": "bar"
}
]
假设 JSON 资源是对应于 单个项目。Spring Batch 不绑定到任何特定的 JSON 库。
JsonItemReader
这JsonItemReader
将 JSON 解析和绑定委托给org.springframework.batch.item.json.JsonObjectReader
接口。这个接口
旨在通过使用流式处理 API 读取 JSON 对象来实现
以块状。目前提供了两种实现:
为了能够处理 JSON 记录,需要满足以下条件:
-
Resource
:表示要读取的 JSON 文件的 Spring 资源。 -
JsonObjectReader
:用于解析 JSON 对象并将其绑定到项目的 JSON 对象读取器
以下示例显示如何定义JsonItemReader
与
上一个 JSON 资源org/springframework/batch/item/json/trades.json
和JsonObjectReader
基于Jackson:
@Bean
public JsonItemReader<Trade> jsonItemReader() {
return new JsonItemReaderBuilder<Trade>()
.jsonObjectReader(new JacksonJsonObjectReader<>(Trade.class))
.resource(new ClassPathResource("trades.json"))
.name("tradeJsonItemReader")
.build();
}
JsonFileItemWriter
这JsonFileItemWriter
将项的封送处理委托给org.springframework.batch.item.json.JsonObjectMarshaller
接口。合同
这个接口是获取一个对象并将其编组到 JSONString
.
目前提供了两种实现:
为了能够写入 JSON 记录,需要满足以下条件:
-
Resource
: 弹簧Resource
表示要写入的 JSON 文件 -
JsonObjectMarshaller
:一个 JSON 对象封送程序,用于将对象封送为 JSON 格式
以下示例显示如何定义JsonFileItemWriter
:
@Bean
public JsonFileItemWriter<Trade> jsonFileItemWriter() {
return new JsonFileItemWriterBuilder<Trade>()
.jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
.resource(new ClassPathResource("trades.json"))
.name("tradeJsonFileItemWriter")
.build();
}
多文件输入
在单个文件中处理多个文件是一种常见的要求Step
.假设
文件都具有相同的格式,则MultiResourceItemReader
支持这种类型的
XML 和平面文件处理的输入。考虑目录中的以下文件:
file-1.txt file-2.txt ignored.txt
file-1.txt和file-2.txt的格式相同,出于业务原因,应为
一起处理。这MultiResourceItemReader
可用于通过以下方式读取两个文件
使用通配符。
以下示例显示了如何读取 XML 中带有通配符的文件:
<bean id="multiResourceReader" class="org.spr...MultiResourceItemReader">
<property name="resources" value="classpath:data/input/file-*.txt" />
<property name="delegate" ref="flatFileItemReader" />
</bean>
以下示例显示了如何在 Java 中读取带有通配符的文件:
@Bean
public MultiResourceItemReader multiResourceReader() {
return new MultiResourceItemReaderBuilder<Foo>()
.delegate(flatFileItemReader())
.resources(resources())
.build();
}
引用的委托是一个简单的FlatFileItemReader
.上面的配置如下:
来自两个文件的输入,处理回滚和重启方案。应该注意的是,
与任何ItemReader
,添加额外的输入(在本例中为文件)可能会导致潜在的
重新启动时出现问题。建议批处理作业与自己的个人一起工作
目录,直到成功完成。
输入资源的排序方式为MultiResourceItemReader#setComparator(Comparator) 以确保在重启方案中的作业运行之间保留资源排序。 |
数据库
与大多数企业应用程序样式一样,数据库是 批。但是,由于 batch 的绝对大小,批次与其他应用程序样式不同 系统必须使用的数据集。如果 SQL 语句返回 100 万行,则 结果集可能会将所有返回的结果保存在内存中,直到读取所有行。 Spring Batch 针对此问题提供了两种类型的解决方案:
基于游标ItemReader
实现
使用数据库游标通常是大多数批处理开发人员的默认方法,
因为它是数据库对“流式传输”关系数据问题的解决方案。这
JavaResultSet
class 本质上是一种面向对象的机制,用于作 光标。 一个ResultSet
维护当前数据行的光标。 叫next
在ResultSet
将此游标移动到下一行。基于游标的 Spring BatchItemReader
实现在初始化时打开一个游标,并将游标向前移动一行,以便每次调用read
,返回可用于处理的映射对象。 这close
然后调用方法以确保释放所有资源。Spring 核心JdbcTemplate
通过使用回调模式来完全映射中的所有行来解决这个问题ResultSet
并关闭,然后再将控制权返回给方法调用方。但是,在批处理中,这必须等到步骤完成。下图显示了基于光标的通用图ItemReader
工程。 请注意,虽然示例使用 SQL(因为 SQL 广为人知),但任何技术都可以实现基本的 方法。

此示例说明了基本模式。给定一个“FOO”表,它有三列:ID
,NAME
和BAR
,选择 ID 大于 1 但小于 7 的所有行。这
将光标的开头(第 1 行)放在 ID 2 上。此行的结果应为
完全映射Foo
对象。叫read()
再次将光标移动到下一行,
这是Foo
ID 为 3。这些读取的结果在每次读取后都会写出read
,允许对对象进行垃圾回收(假设没有实例变量
维护对它们的引用)。
JdbcCursorItemReader
JdbcCursorItemReader
是基于游标的技术的 JDBC 实现。它有效
直接与ResultSet
并且需要针对连接运行 SQL 语句
从DataSource
.以下数据库模式用作示例:
CREATE TABLE CUSTOMER (
ID BIGINT IDENTITY PRIMARY KEY,
NAME VARCHAR(45),
CREDIT FLOAT
);
许多人更喜欢为每一行使用一个域对象,因此以下示例使用
实现RowMapper
接口映射CustomerCredit
对象:
public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {
public static final String ID_COLUMN = "id";
public static final String NAME_COLUMN = "name";
public static final String CREDIT_COLUMN = "credit";
public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
CustomerCredit customerCredit = new CustomerCredit();
customerCredit.setId(rs.getInt(ID_COLUMN));
customerCredit.setName(rs.getString(NAME_COLUMN));
customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));
return customerCredit;
}
}
因为JdbcCursorItemReader
与JdbcTemplate
,它对
请参阅如何使用JdbcTemplate
,以对比它
使用ItemReader
.出于此示例的目的,假设
这CUSTOMER
数据库。第一个示例使用JdbcTemplate
:
//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
new CustomerCreditRowMapper());
运行前面的代码片段后,customerCredits
列表包含 1,000 个CustomerCredit
对象。在查询方法中,从DataSource
,提供的 SQL 将针对它运行,并且mapRow
方法被调用
中的每一行ResultSet
.将此与JdbcCursorItemReader
,如以下示例所示:
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
运行前面的代码片段后,计数器等于 1,000。如果上面的代码有
将返回的customerCredit
到列表中,结果将完全是
与JdbcTemplate
例。然而,最大的优势ItemReader
是它允许项目被“流式传输”。这read
方法可以调用一次,则该项
可以由ItemWriter
,然后可以通过以下方式获得下一个项目read
.这允许在“块”中完成项目读取和写入并提交
定期,这是高性能批处理的本质。此外,它
易于配置为注入 Spring BatchStep
.
以下示例演示如何注入ItemReader
变成一个Step
在 XML 中:
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
以下示例演示如何注入ItemReader
变成一个Step
在 Java 中:
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
return new JdbcCursorItemReaderBuilder<CustomerCredit>()
.dataSource(this.dataSource)
.name("creditReader")
.sql("select ID, NAME, CREDIT from CUSTOMER")
.rowMapper(new CustomerCreditRowMapper())
.build();
}
其他属性
因为在 Java 中打开光标有很多不同的选项,所以有很多
属性JdbcCursorItemReader
可以设置,如下所述
桌子:
忽略警告 |
确定是否记录 SQLWarnings 或导致异常。
默认值为 |
fetch大小 |
向 JDBC 驱动程序提供有关应获取的行数的提示
当需要更多行时,从数据库中 |
最大行数 |
设置基础的最大行数限制 |
queryTimeout |
设置驱动程序等待 |
verifyCursorPosition |
因为同样 |
saveState |
指示是否应将读取器的状态保存在 |
driverSupports绝对 |
指示 JDBC 驱动程序是否支持
在 |
setUseSharedExtendedConnection(设置使用共享扩展连接) |
指示连接是否
用于游标的 应由所有其他处理使用,从而共享相同的
交易。如果将其设置为 |
HibernateCursorItemReader
就像正常的 Spring 用户就是否使用 ORM 做出重要决定一样
解决方案,这会影响它们是否使用JdbcTemplate
或HibernateTemplate
,Spring Batch 用户有相同的选项。HibernateCursorItemReader
是游标技术的 Hibernate 实现。
Hibernate 的批量使用一直存在相当大的争议。这主要是因为
Hibernate 最初是为了支持在线应用程序样式而开发的。然而,那
并不意味着它不能用于批处理。最简单的解决方法
这个问题是使用StatelessSession
而不是标准会议。这会删除
Hibernate 使用的所有缓存和脏检查都可能导致
批处理方案。有关无状态和正常之间差异的更多信息
Hibernate 会话,请参阅特定 Hibernate 版本的文档。这HibernateCursorItemReader
允许您声明 HQL 语句并传入SessionFactory
,它将每次调用传回一个项目以在同一基本中读取
时尚作为JdbcCursorItemReader
.以下示例配置使用相同的
'customer credit' 示例作为 JDBC 读取器:
HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
itemReader.setQueryString("from CustomerCredit");
//For simplicity sake, assume sessionFactory already obtained.
itemReader.setSessionFactory(sessionFactory);
itemReader.setUseStatelessSession(true);
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
这配置了ItemReader
返回CustomerCredit
以完全相同的方式对象
如JdbcCursorItemReader
,假设休眠映射文件已
为Customer
桌子。'useStatelessSession' 属性默认为
设置为 true,但已在此处添加以引起人们对打开或关闭它的能力的注意。
还值得注意的是,底层游标的获取大小可以使用setFetchSize
财产。与JdbcCursorItemReader
,配置为
简单。
以下示例显示了如何注入 HibernateItemReader
在 XML 中:
<bean id="itemReader"
class="org.springframework.batch.item.database.HibernateCursorItemReader">
<property name="sessionFactory" ref="sessionFactory" />
<property name="queryString" value="from CustomerCredit" />
</bean>
以下示例显示了如何注入 HibernateItemReader
在 Java 中:
@Bean
public HibernateCursorItemReader itemReader(SessionFactory sessionFactory) {
return new HibernateCursorItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.sessionFactory(sessionFactory)
.queryString("from CustomerCredit")
.build();
}
StoredProcedureItemReader
有时需要使用存储过程获取游标数据。这StoredProcedureItemReader
工作方式类似于JdbcCursorItemReader
,但相反,相反
运行查询以获取游标时,它会运行返回游标的存储过程。
存储过程可以通过三种不同的方式返回游标:
-
作为返回的
ResultSet
(由 SQL Server、Sybase、DB2、Derby 和 MySQL 使用)。 -
作为作为 out 参数返回的 ref-cursor(由 Oracle 和 PostgreSQL 使用)。
-
作为存储函数调用的返回值。
以下 XML 示例配置使用与前面相同的“客户信用”示例 例子:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
以下 Java 示例配置使用与 早期示例:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
return reader;
}
前面的示例依赖于存储过程来提供ResultSet
作为
返回的结果(前面的选项 1)。
如果存储过程返回了ref-cursor
(选项 2),那么我们需要提供
返回的 out 参数的位置ref-cursor
.
以下示例显示如何使用第一个参数作为 ref-cursor XML:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
以下示例显示如何使用第一个参数作为 ref-cursor Java:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setRefCursorPosition(1);
return reader;
}
如果游标是从存储函数返回的(选项 3),我们需要将
属性 “function” 设置为true
.它默认为false
.
以下示例显示属性true
在 XML 中:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="function" value="true"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
以下示例显示属性true
在 Java 中:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setFunction(true);
return reader;
}
在所有这些情况下,我们都需要定义一个RowMapper
以及DataSource
和
实际过程名称。
如果存储过程或函数接受参数,则必须声明它们并
使用parameters
财产。以下示例对于 Oracle,声明了三个
参数。第一个是out
返回 ref-cursor 的参数,以及
第二个和第三个在参数中,该参数采用类型INTEGER
.
以下示例演示如何使用 XML 中的参数:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="spring.cursor_func"/>
<property name="parameters">
<list>
<bean class="org.springframework.jdbc.core.SqlOutParameter">
<constructor-arg index="0" value="newid"/>
<constructor-arg index="1">
<util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="amount"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="custid"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
</list>
</property>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper" ref="rowMapper"/>
<property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>
以下示例显示了如何在 Java 中使用参数:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
List<SqlParameter> parameters = new ArrayList<>();
parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
parameters.add(new SqlParameter("amount", Types.INTEGER);
parameters.add(new SqlParameter("custId", Types.INTEGER);
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("spring.cursor_func");
reader.setParameters(parameters);
reader.setRefCursorPosition(1);
reader.setRowMapper(rowMapper());
reader.setPreparedStatementSetter(parameterSetter());
return reader;
}
除了参数声明之外,我们还需要指定一个PreparedStatementSetter
设置调用参数值的实现。这与
这JdbcCursorItemReader
以上。其他属性中列出的所有附加属性都适用于StoredProcedureItemReader
也。
寻呼ItemReader
实现
使用数据库游标的替代方法是运行多个查询,其中每个查询 获取部分结果。我们将这一部分称为页面。每个查询都必须 指定起始行号和我们希望在页面中返回的行数。
JdbcPagingItemReader
分页的一个实现ItemReader
是JdbcPagingItemReader
. 这JdbcPagingItemReader
需要一个PagingQueryProvider
负责提供 SQL
用于检索构成页面的行的查询。由于每个数据库都有自己的
策略来提供分页支持,我们需要使用不同的PagingQueryProvider
对于每种受支持的数据库类型。还有SqlPagingQueryProviderFactoryBean
自动检测正在使用的数据库并确定适当的PagingQueryProvider
实现。这简化了配置,并且是
推荐的最佳做法。
这SqlPagingQueryProviderFactoryBean
要求您指定select
子句和from
第。您还可以提供可选的where
第。这些条款和
必填sortKey
用于构建 SQL 语句。
对sortKey 以保证
执行之间不会丢失任何数据。 |
打开读取器后,它每次调用都会将一个项目传递给read
在同一个
基本时尚与其他时尚一样ItemReader
.分页发生在幕后,当
需要额外的行。
以下 XML 示例配置使用与
基于游标ItemReaders
之前显示:
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="queryProvider">
<bean class="org.spr...SqlPagingQueryProviderFactoryBean">
<property name="selectClause" value="select id, name, credit"/>
<property name="fromClause" value="from customer"/>
<property name="whereClause" value="where status=:status"/>
<property name="sortKey" value="id"/>
</bean>
</property>
<property name="parameterValues">
<map>
<entry key="status" value="NEW"/>
</map>
</property>
<property name="pageSize" value="1000"/>
<property name="rowMapper" ref="customerMapper"/>
</bean>
以下 Java 示例配置使用与
基于游标ItemReaders
之前显示:
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", "NEW");
return new JdbcPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(customerCreditMapper())
.pageSize(1000)
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setSelectClause("select id, name, credit");
provider.setFromClause("from customer");
provider.setWhereClause("where status=:status");
provider.setSortKey("id");
return provider;
}
这配置了ItemReader
返回CustomerCredit
对象使用RowMapper
,
必须指定。“pageSize”属性确定读取的实体数
来自数据库的每个查询运行。
'parameterValues' 属性可用于指定Map
的参数值
查询。如果您在where
子句中,每个条目的键应该
匹配命名参数的名称。如果您使用传统的“?” 占位符,则
每个条目的键应该是占位符的编号,从 1 开始。
JpaPagingItemReader
分页的另一种实现ItemReader
是JpaPagingItemReader
.JPA 确实如此
没有类似于 Hibernate 的概念StatelessSession
,所以我们必须使用其他
JPA 规范提供的功能。由于 JPA 支持分页,因此这是自然的
在使用 JPA 进行批处理时的选择。阅读每一页后,
实体将分离,并且持久性上下文被清除,以允许实体
处理页面后进行垃圾回收。
这JpaPagingItemReader
允许您声明 JPQL 语句并传入EntityManagerFactory
. 然后,它每次调用都会传回一个项目以相同的基本方式读取与任何其他方式一样ItemReader
. 当需要额外的实体时,分页会在幕后进行。
以下 XML 示例配置使用与前面显示的JDBC 读取器相同的“客户信用”示例:
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
<property name="entityManagerFactory" ref="entityManagerFactory"/>
<property name="queryString" value="select c from CustomerCredit c"/>
<property name="pageSize" value="1000"/>
</bean>
以下 Java 示例配置使用与 JDBC 读取器前面显示:
@Bean
public JpaPagingItemReader itemReader() {
return new JpaPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.entityManagerFactory(entityManagerFactory())
.queryString("select c from CustomerCredit c")
.pageSize(1000)
.build();
}
这配置了ItemReader
返回CustomerCredit
对象,其方式与
描述为JdbcPagingItemReader
上面,假设CustomerCredit
object 具有
更正 JPA 注释或 ORM 映射文件。'pageSize' 属性确定
每次查询执行从数据库读取的实体数。
数据库项编写器
虽然平面文件和 XML 文件都有特定的ItemWriter
实例,没有完全等效的
在数据库世界中。这是因为事务提供了所有需要的功能。ItemWriter
实现对于文件来说是必要的,因为它们必须像事务性一样运行,
跟踪书面物品并在适当的时间冲洗或清理。
数据库不需要此功能,因为写入已经包含在
交易。用户可以创建自己的 DAO,实现ItemWriter
interface 或
使用自定义中的一个ItemWriter
这是为一般处理问题而写的。也
方式,它们应该可以毫无问题地工作。需要注意的一件事是性能
以及通过批处理输出提供的错误处理功能。这是最
使用 Hibernate 作为ItemWriter
但在使用时可能会遇到同样的问题
JDBC 批处理模式。批处理数据库输出没有任何固有缺陷,假设我们
小心刷新,数据中没有错误。但是,在
书写可能会引起混乱,因为无法知道是哪一个项目导致了
例外情况,或者即使任何单个项目负责,如
下图:

如果在写入之前缓冲了项目,则在缓冲区之前不会抛出任何错误
在提交之前刷新。例如,假设每个块写入 20 个项目,
第 15 项抛出一个DataIntegrityViolationException
.至于Step
涉及,所有 20 项都写成功,因为没有办法知道
错误发生,直到它们被实际写入。一次Session#flush()
调用,则
buffer 被清空并命中异常。此时,没有任何Step
可以做。必须回滚事务。通常,此异常可能会导致
要跳过的项目(取决于跳过/重试策略),然后不写入
再。但是,在批处理方案中,无法知道是哪个项目导致了
问题。发生故障时,正在写入整个缓冲区。唯一的方法
解决此问题是在每个项目之后刷新,如下图所示:

这是一个常见的用例,尤其是在使用 Hibernate 时,以及
的实现ItemWriter
是在每次调用时刷新write()
.这样做可以
对于要可靠地跳过的项目,Spring Batch 在内部处理
调用的粒度ItemWriter
错误后。
重用现有服务
批处理系统通常与其他应用程序样式结合使用。最
Common 是一个在线系统,但它也可能支持集成甚至厚客户端
应用程序,通过移动每个应用程序样式使用的必要批量数据。为此
原因,许多用户希望重用现有的 DAO 或
他们的批处理作业。Spring 容器本身通过允许任何
必要的类。但是,在某些情况下,现有服务
需要充当ItemReader
或ItemWriter
,要么满足
另一个 Spring Batch 类,或者因为它确实是主要的ItemReader
一步。是的
为每个需要包装的服务编写一个适配器类相当简单,但是
因为它是一个非常普遍的问题,Spring Batch 提供了实现:ItemReaderAdapter
和ItemWriterAdapter
.这两个类都实现了标准 Spring
方法,并且设置起来相当简单。
以下 XML 示例使用ItemReaderAdapter
:
<bean id="itemReader" class="org.springframework.batch.item.adapter.ItemReaderAdapter">
<property name="targetObject" ref="fooService" />
<property name="targetMethod" value="generateFoo" />
</bean>
<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />
以下 Java 示例使用ItemReaderAdapter
:
@Bean
public ItemReaderAdapter itemReader() {
ItemReaderAdapter reader = new ItemReaderAdapter();
reader.setTargetObject(fooService());
reader.setTargetMethod("generateFoo");
return reader;
}
@Bean
public FooService fooService() {
return new FooService();
}
需要注意的重要一点是,合同的targetMethod
必须相同
作为合同read
:用尽时返回null
.否则,它返回一个Object
.任何其他事情都会阻止框架知道何时应该结束处理,
导致无限循环或错误失败,具体取决于实现
的ItemWriter
.
以下 XML 示例使用ItemWriterAdapter
:
<bean id="itemWriter" class="org.springframework.batch.item.adapter.ItemWriterAdapter">
<property name="targetObject" ref="fooService" />
<property name="targetMethod" value="processFoo" />
</bean>
<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />
以下 Java 示例使用ItemWriterAdapter
:
@Bean
public ItemWriterAdapter itemWriter() {
ItemWriterAdapter writer = new ItemWriterAdapter();
writer.setTargetObject(fooService());
writer.setTargetMethod("processFoo");
return writer;
}
@Bean
public FooService fooService() {
return new FooService();
}
防止状态持久性
默认情况下,所有ItemReader
和ItemWriter
实现存储其当前
状态在ExecutionContext
在提交之前。然而,这可能并不总是如此
期望的行为。例如,许多开发人员选择将他们的数据库读取器
“rerunnable”,使用进程指示器。将一列添加到输入数据中,以
指示是否已处理。当读取特定记录时(或
写入)处理后的标志从false
自true
.然后,SQL 语句可以
在where
子句,例如where PROCESSED_IND = false
,
从而确保在重新启动时仅返回未处理的记录。在
这种情况下,最好不要存储任何状态,比如当前行号、
因为它在重新启动时无关紧要。因此,所有读者和写入器都包括
'saveState' 属性。
以下 Bean 定义显示了如何防止 XML 中的状态持久性:
<bean id="playerSummarizationSource" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource" />
<property name="rowMapper">
<bean class="org.springframework.batch.sample.PlayerSummaryMapper" />
</property>
<property name="saveState" value="false" />
<property name="sql">
<value>
SELECT games.player_id, games.year_no, SUM(COMPLETES),
SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),
SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),
SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)
from games, players where players.player_id =
games.player_id group by games.player_id, games.year_no
</value>
</property>
</bean>
以下 bean 定义显示了如何防止 Java 中的状态持久性:
@Bean
public JdbcCursorItemReader playerSummarizationSource(DataSource dataSource) {
return new JdbcCursorItemReaderBuilder<PlayerSummary>()
.dataSource(dataSource)
.rowMapper(new PlayerSummaryMapper())
.saveState(false)
.sql("SELECT games.player_id, games.year_no, SUM(COMPLETES),"
+ "SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),"
+ "SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),"
+ "SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)"
+ "from games, players where players.player_id ="
+ "games.player_id group by games.player_id, games.year_no")
.build();
}
这ItemReader
上面配置的不会在ExecutionContext
为
它参与的任何处决。
创建自定义 ItemReader 和 ItemWriters
至此,本章已经讨论了 Spring 中读写的基本契约
Batch 和一些常见的实现。然而,这些都是公平的
通用的,并且有许多潜在的场景可能不是开箱即用
实现。本节通过使用一个简单的示例演示如何创建自定义ItemReader
和ItemWriter
正确执行并执行他们的合同。这ItemReader
还实现ItemStream
,以说明如何制作读者或
writer 可重启。
习惯ItemReader
示例
出于本示例的目的,我们创建一个简单的ItemReader
实现
从提供的列表中读取。我们从实现最基本的契约开始ItemReader
这read
方法,如下代码所示:
public class CustomItemReader<T> implements ItemReader<T> {
List<T> items;
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
NonTransientResourceException, ParseException {
if (!items.isEmpty()) {
return items.remove(0);
}
return null;
}
}
前面的类获取一个项目列表,并一次返回一个项目,删除每个项目
从列表中。当列表为空时,它返回null
,从而满足最基本的
要求ItemReader
,如以下测试代码所示:
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());
使ItemReader
可重启
最后一个挑战是使ItemReader
可重启。目前,如果处理是
中断并重新开始,则ItemReader
必须从头开始。这是
实际上在许多情况下都有效,但有时批处理作业更可取
从中断的地方重新启动。关键判别因素通常是读者是否是有状态的
或无状态。无状态读取器不需要担心可重启性,而是
有状态的 One 必须尝试在重新启动时重建其最后一个已知状态。出于这个原因,
我们建议您尽可能保持自定义读取器无状态,这样您就不必担心
关于可重启性。
如果您确实需要存储状态,则ItemStream
应使用接口:
public class CustomItemReader<T> implements ItemReader<T>, ItemStream {
List<T> items;
int currentIndex = 0;
private static final String CURRENT_INDEX = "current.index";
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
ParseException, NonTransientResourceException {
if (currentIndex < items.size()) {
return items.get(currentIndex++);
}
return null;
}
public void open(ExecutionContext executionContext) throws ItemStreamException {
if (executionContext.containsKey(CURRENT_INDEX)) {
currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
}
else {
currentIndex = 0;
}
}
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
}
public void close() throws ItemStreamException {}
}
每次调用ItemStream
update
方法,则ItemReader
存储在提供的ExecutionContext
键为 'current.index'。当ItemStream
open
方法调用时,ExecutionContext
检查是否
包含具有该键的条目。如果找到该键,则将当前索引移动到
那个位置。这是一个相当微不足道的例子,但它仍然符合总约:
ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);
((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());
最ItemReaders
具有更复杂的重启逻辑。这JdbcCursorItemReader
,例如,将最后处理的行的行 ID 存储在
光标。
还值得注意的是,在ExecutionContext
不应该是
琐碎。那是因为同样ExecutionContext
用于所有ItemStreams
在
一个Step
.在大多数情况下,只需在键前面加上类名就足够了
以保证唯一性。但是,在极少数情况下,两个相同类型的ItemStream
在同一步骤中使用(如果需要两个文件
output),需要一个更独特的名称。出于这个原因,许多 Spring BatchItemReader
和ItemWriter
实现有一个setName()
属性,允许
键名称被覆盖。
习惯ItemWriter
示例
实现自定义ItemWriter
在许多方面与ItemReader
例
上面,但在足够多的方面有所不同,足以保证自己的例子。但是,添加
可重启性本质上是相同的,因此本示例中不涉及。与ItemReader
例如,一个List
用于使示例尽可能简单
可能:
public class CustomItemWriter<T> implements ItemWriter<T> {
List<T> output = TransactionAwareProxyFactory.createTransactionalList();
public void write(List<? extends T> items) throws Exception {
output.addAll(items);
}
public List<T> getOutput() {
return output;
}
}
使ItemWriter
可重启
要使ItemWriter
restartable 时,我们将遵循与ItemReader
,添加并实现ItemStream
接口来同步
执行上下文。在示例中,我们可能必须计算处理的项目数
并将其添加为页脚记录。如果我们需要这样做,我们可以实现ItemStream
在我们的ItemWriter
因此,计数器从执行中重新构成
上下文(如果重新打开了流)。
在许多现实情况下,自定义ItemWriters
还委托给另一个作者本身
是可重新启动的(例如,写入文件时),否则它会写入
事务资源,因此不需要可重启,因为它是无状态的。
当您有一个有状态的编写器时,您可能应该确保实现ItemStream
如
以及ItemWriter
.还要记住,作者的客户端需要意识到
这ItemStream
,因此您可能需要在配置中将其注册为流。
项读取器和写入器实现
在本节中,我们将向您介绍尚未了解过的读者和作家 在前面的部分中讨论过。
装饰
在某些情况下,用户需要将专用行为附加到预先存在的ItemReader
.Spring Batch 提供了一些开箱即用的装饰器,可以添加
附加行为对您的ItemReader
和ItemWriter
实现。
Spring Batch 包括以下装饰器:
SynchronizedItemStreamReader
使用ItemReader
不是线程安全的,Spring Batch 提供了SynchronizedItemStreamReader
decorator,可用于使ItemReader
线程安全。Spring Batch 提供了一个SynchronizedItemStreamReaderBuilder
构造
的实例SynchronizedItemStreamReader
.
SingleItemPeekableItemReader
Spring Batch 包括一个装饰器,该装饰器将 peek 方法添加到ItemReader
.这个窥视
方法允许用户向前查看一个项目。重复调用速览返回相同的
item,这是从read
方法。Spring Batch 提供了一个SingleItemPeekableItemReaderBuilder
构造SingleItemPeekableItemReader
.
SingleItemPeekableItemReader 的 peek 方法不是线程安全的,因为它不会 可以在多个线程中接受速览。只有一条线程偷看 将在下一次调用中读取该项目。 |
SynchronizedItemStreamWriter
使用ItemWriter
不是线程安全的,Spring Batch 提供了SynchronizedItemStreamWriter
decorator,可用于使ItemWriter
线程安全。Spring Batch 提供了一个SynchronizedItemStreamWriterBuilder
构造
的实例SynchronizedItemStreamWriter
.
MultiResourceItemWriter
这MultiResourceItemWriter
包装一个ResourceAwareItemWriterItemStream
并创建一个新的
output 资源,当当前资源中写入的项目计数超过itemCountLimitPerResource
.Spring Batch 提供了一个MultiResourceItemWriterBuilder
自
构造一个MultiResourceItemWriter
.
消息读者和写入者
Spring Batch 为常用消息传递系统提供以下读取器和写入器:
AmqpItemReader
这AmqpItemReader
是一个ItemReader
使用AmqpTemplate
接收或转换
来自交流的消息。Spring Batch 提供了一个AmqpItemReaderBuilder
构造
的实例AmqpItemReader
.
AmqpItemWriter
这AmqpItemWriter
是一个ItemWriter
使用AmqpTemplate
将消息发送到
AMQP 交易所。如果名称未在
提供的AmqpTemplate
.Spring Batch 提供了一个AmqpItemWriterBuilder
自
构造一个AmqpItemWriter
.
JmsItemReader
这JmsItemReader
是一个ItemReader
对于使用JmsTemplate
.模板
应该有一个默认目标,用于为read()
方法。Spring Batch 提供了一个JmsItemReaderBuilder
构造JmsItemReader
.
JmsItemWriter
这JmsItemWriter
是一个ItemWriter
对于使用JmsTemplate
.模板
应该有一个默认的目的地,用于将项目发送到write(List)
.Spring
Batch 提供了一个JmsItemWriterBuilder
构造JmsItemWriter
.
数据库读取器
Spring Batch 提供以下数据库读取器:
Neo4jItemReader
这Neo4jItemReader
是一个ItemReader
从图形数据库 Neo4j 中读取对象通过使用分页技术。Spring Batch 提供了一个Neo4jItemReaderBuilder
自
构造一个Neo4jItemReader
.
MongoItemReader
这MongoItemReader
是一个ItemReader
使用
分页技术。Spring Batch 提供了一个MongoItemReaderBuilder
构造一个
实例MongoItemReader
.
HibernateCursorItemReader
这HibernateCursorItemReader
是一个ItemStreamReader
用于读取数据库记录
构建在 Hibernate 之上。它执行 HQL 查询,然后在初始化时迭代
在结果集上作为read()
方法,依次返回一个对象
对应于当前行。Spring Batch 提供了一个HibernateCursorItemReaderBuilder
构造HibernateCursorItemReader
.
数据库编写者
Spring Batch 提供以下数据库编写器:
Neo4jItemWriter
这Neo4jItemWriter
是一个ItemWriter
写入 Neo4j 数据库的实现。
Spring Batch 提供了一个Neo4jItemWriterBuilder
构造Neo4jItemWriter
.
MongoItemWriter
这MongoItemWriter
是一个ItemWriter
写入 MongoDB 存储的实现
使用 Spring Data 的MongoOperations
.Spring Batch 提供了一个MongoItemWriterBuilder
构造MongoItemWriter
.
RepositoryItemWriter
这RepositoryItemWriter
是一个ItemWriter
包装器CrudRepository
从Spring开始
数据。Spring Batch 提供了一个RepositoryItemWriterBuilder
构造
这RepositoryItemWriter
.
HibernateItemWriter
这HibernateItemWriter
是一个ItemWriter
使用 Hibernate 会话来保存或
更新不属于当前 Hibernate 会话的实体。Spring Batch 提供
一个HibernateItemWriterBuilder
构造HibernateItemWriter
.
JdbcBatchItemWriter
这JdbcBatchItemWriter
是一个ItemWriter
使用NamedParameterJdbcTemplate
为提供的所有项目执行一批语句。
Spring Batch 提供了一个JdbcBatchItemWriterBuilder
构造JdbcBatchItemWriter
.
专业读者
Spring Batch 提供以下专业阅读器:
LdifReader
这LdifReader
从Resource
,
解析它们,并返回一个LdapAttribute
对象read
执行。弹簧批次
提供一个LdifReaderBuilder
构造LdifReader
.
专业作家
Spring Batch 提供以下专业编写器: