数据库
与大多数企业应用程序样式一样,数据库是 批。 然而,batch 与其他应用程序样式不同,因为系统必须使用的数据集的绝对大小。如果 SQL 语句返回 100 万行,则结果集可能会将所有返回的结果保存在内存中,直到读取所有行。Spring Batch 为此问题提供了两种类型的解决方案:
基于游标ItemReader
实现
使用数据库游标通常是大多数批处理开发人员的默认方法,因为它是数据库对“流式传输”关系数据问题的解决方案。 这 JavaResultSet
class 本质上是一种面向对象的机制,用于作 光标。 一个ResultSet
维护当前数据行的光标。 叫next
在ResultSet
将此游标移动到下一行。基于游标的 Spring BatchItemReader
实现在初始化时打开一个游标,并将游标向前移动一行,以便每次调用read
,返回可用于处理的映射对象。 这close
然后调用方法以确保释放所有资源。Spring 核心JdbcTemplate
通过使用回调模式来完全映射中的所有行来解决这个问题ResultSet
并关闭,然后再将控制权返回给方法调用方。但是,在批处理中,这必须等到步骤完成。下图显示了基于光标的通用图ItemReader
工程。 请注意,虽然示例使用 SQL(因为 SQL 广为人知),但任何技术都可以实现基本的 方法。

此示例说明了基本模式。给定一个“FOO”表,该表有三列:ID
,NAME
和BAR
,选择 ID 大于 1 但小于 7 的所有行。 这 将光标的开头(第 1 行)放在 ID 2 上。此行的结果应为完全映射Foo
对象。 叫read()
再次将光标移动到下一行,这是Foo
ID 为 3。这些读取的结果在每次读取后都会写出read
,允许对对象进行垃圾回收(假设没有实例变量维护对它们的引用)。
JdbcCursorItemReader
JdbcCursorItemReader
是基于游标的技术的 JDBC 实现。它工作直接与ResultSet
并要求 SQL 语句针对连接运行从DataSource
. 以下数据库模式用作示例:
CREATE TABLE CUSTOMER (
ID BIGINT IDENTITY PRIMARY KEY,
NAME VARCHAR(45),
CREDIT FLOAT
);
许多人更喜欢为每一行使用一个域对象,因此以下示例使用实现RowMapper
接口映射CustomerCredit
对象:
public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {
public static final String ID_COLUMN = "id";
public static final String NAME_COLUMN = "name";
public static final String CREDIT_COLUMN = "credit";
public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
CustomerCredit customerCredit = new CustomerCredit();
customerCredit.setId(rs.getInt(ID_COLUMN));
customerCredit.setName(rs.getString(NAME_COLUMN));
customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));
return customerCredit;
}
}
因为JdbcCursorItemReader
与JdbcTemplate
,它对查看如何使用JdbcTemplate
,以便对比它与ItemReader
. 出于此示例的目的,假设 这CUSTOMER
数据库。 第一个示例使用JdbcTemplate
:
//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
new CustomerCreditRowMapper());
运行前面的代码片段后,customerCredits
列表包含 1,000 个CustomerCredit
对象。 在查询方法中,从DataSource
,提供的 SQL 将针对它运行,并且mapRow
方法被调用中的每一行ResultSet
. 将此与JdbcCursorItemReader
,如以下示例所示:
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
运行前面的代码片段后,计数器等于 1,000。如果上面的代码有将返回的customerCredit
到列表中,结果将完全是与JdbcTemplate
例。 然而,最大的优势ItemReader
是它允许项目被“流式传输”。 这read
方法可以调用一次,则该项可以由一个ItemWriter
,然后可以通过以下方式获得下一个项目read
. 这允许以“块”形式完成项目读取和写入并提交定期,这是高性能批处理的本质。此外,它很容易配置为注入到 Spring Batch 中Step
.
-
Java
-
XML
以下示例演示如何注入ItemReader
变成一个Step
在 Java 中:
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
return new JdbcCursorItemReaderBuilder<CustomerCredit>()
.dataSource(this.dataSource)
.name("creditReader")
.sql("select ID, NAME, CREDIT from CUSTOMER")
.rowMapper(new CustomerCreditRowMapper())
.build();
}
以下示例演示如何注入ItemReader
变成一个Step
在 XML 中:
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
其他属性
因为在 Java 中打开光标有很多不同的选项,所以有很多
属性JdbcCursorItemReader
可以设置,如下所述
桌子:
忽略警告 |
确定是否记录 SQLWarnings 或导致异常。
默认值为 |
fetch大小 |
向 JDBC 驱动程序提供有关应获取的行数的提示
当需要更多行时,从数据库中 |
最大行数 |
设置基础的最大行数限制 |
queryTimeout |
设置驱动程序等待 |
verifyCursorPosition |
因为同样 |
saveState |
指示是否应将读取器的状态保存在 |
driverSupports绝对 |
指示 JDBC 驱动程序是否支持
在 |
setUseSharedExtendedConnection(设置使用共享扩展连接) |
指示连接是否
用于游标的 应由所有其他处理使用,从而共享相同的
交易。如果将其设置为 |
StoredProcedureItemReader
有时需要使用存储过程获取游标数据。 这StoredProcedureItemReader
工作方式类似于JdbcCursorItemReader
,但相反,它运行查询以获取游标,而是运行返回游标的存储过程。存储过程可以通过三种不同的方式返回游标:
-
作为返回的
ResultSet
(由 SQL Server、Sybase、DB2、Derby 和 MySQL 使用)。 -
作为作为 out 参数返回的 ref-cursor(由 Oracle 和 PostgreSQL 使用)。
-
作为存储函数调用的返回值。
-
Java
-
XML
以下 Java 示例配置使用与前面的示例相同的“客户信用”示例:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
return reader;
}
以下 XML 示例配置使用与前面相同的“客户信用”示例 例子:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
前面的示例依赖于存储过程来提供ResultSet
作为
返回的结果(前面的选项 1)。
如果存储过程返回了ref-cursor
(选项 2),那么我们需要提供
返回的 out 参数的位置ref-cursor
.
-
Java
-
XML
以下示例显示如何使用第一个参数作为 ref-cursor Java:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setRefCursorPosition(1);
return reader;
}
以下示例显示如何使用第一个参数作为 ref-cursor XML:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
如果游标是从存储函数返回的(选项 3),我们需要将
属性 “function” 设置为true
.它默认为false
.
-
Java
-
XML
以下示例显示属性true
在 Java 中:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setFunction(true);
return reader;
}
以下示例显示属性true
在 XML 中:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="function" value="true"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
在所有这些情况下,我们都需要定义一个RowMapper
以及DataSource
和
实际过程名称。
如果存储过程或函数接受参数,则必须声明它们并
使用parameters
财产。以下示例对于 Oracle,声明了三个
参数。第一个是out
返回 ref-cursor 的参数,以及
第二个和第三个在参数中,该参数采用类型INTEGER
.
-
Java
-
XML
以下示例显示了如何在 Java 中使用参数:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
List<SqlParameter> parameters = new ArrayList<>();
parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
parameters.add(new SqlParameter("amount", Types.INTEGER);
parameters.add(new SqlParameter("custId", Types.INTEGER);
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("spring.cursor_func");
reader.setParameters(parameters);
reader.setRefCursorPosition(1);
reader.setRowMapper(rowMapper());
reader.setPreparedStatementSetter(parameterSetter());
return reader;
}
以下示例演示如何使用 XML 中的参数:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="spring.cursor_func"/>
<property name="parameters">
<list>
<bean class="org.springframework.jdbc.core.SqlOutParameter">
<constructor-arg index="0" value="newid"/>
<constructor-arg index="1">
<util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="amount"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="custid"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
</list>
</property>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper" ref="rowMapper"/>
<property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>
除了参数声明之外,我们还需要指定一个PreparedStatementSetter
设置调用参数值的实现。这与
这JdbcCursorItemReader
以上。其他属性中列出的所有附加属性都适用于StoredProcedureItemReader
也。
寻呼ItemReader
实现
使用数据库游标的另一种方法是运行多个查询,其中每个查询获取结果的一部分。我们将此部分称为页面。每个查询必须指定起始行号和我们希望在页面中返回的行数。
JdbcPagingItemReader
分页的一个实现ItemReader
是JdbcPagingItemReader
. 这JdbcPagingItemReader
需要一个PagingQueryProvider
负责提供 SQL用于检索构成页面的行的查询。由于每个数据库都有自己的策略来提供分页支持,因此我们需要使用不同的PagingQueryProvider
对于每种受支持的数据库类型。还有SqlPagingQueryProviderFactoryBean
自动检测正在使用的数据库并确定适当的PagingQueryProvider
实现。 这简化了配置,并且是推荐的最佳实践。
这SqlPagingQueryProviderFactoryBean
要求您指定select
子句和from
第。 您还可以提供可选的where
第。 这些条款和 必填sortKey
用于构建 SQL 语句。
对sortKey 以保证执行之间不会丢失任何数据。 |
打开读取器后,它每次调用都会将一个项目传递给read
在相同的基本时尚与其他时尚一样ItemReader
. 分页发生在幕后需要额外的行。
-
Java
-
XML
以下 Java 示例配置使用与
基于游标ItemReaders
之前显示:
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", "NEW");
return new JdbcPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(customerCreditMapper())
.pageSize(1000)
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setSelectClause("select id, name, credit");
provider.setFromClause("from customer");
provider.setWhereClause("where status=:status");
provider.setSortKey("id");
return provider;
}
以下 XML 示例配置使用与
基于游标ItemReaders
之前显示:
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="queryProvider">
<bean class="org.spr...SqlPagingQueryProviderFactoryBean">
<property name="selectClause" value="select id, name, credit"/>
<property name="fromClause" value="from customer"/>
<property name="whereClause" value="where status=:status"/>
<property name="sortKey" value="id"/>
</bean>
</property>
<property name="parameterValues">
<map>
<entry key="status" value="NEW"/>
</map>
</property>
<property name="pageSize" value="1000"/>
<property name="rowMapper" ref="customerMapper"/>
</bean>
这配置了ItemReader
返回CustomerCredit
对象使用RowMapper
,
必须指定。“pageSize”属性确定读取的实体数
来自数据库的每个查询运行。
'parameterValues' 属性可用于指定Map
的参数值
查询。如果您在where
子句中,每个条目的键应该
匹配命名参数的名称。如果您使用传统的“?” 占位符,则
每个条目的键应该是占位符的编号,从 1 开始。
JpaPagingItemReader
分页的另一种实现ItemReader
是JpaPagingItemReader
.JPA 确实如此
没有类似于 Hibernate 的概念StatelessSession
,所以我们必须使用其他
JPA 规范提供的功能。由于 JPA 支持分页,因此这是自然的
在使用 JPA 进行批处理时的选择。阅读每一页后,
实体将分离,并且持久性上下文被清除,以允许实体
处理页面后进行垃圾回收。
这JpaPagingItemReader
允许您声明 JPQL 语句并传入EntityManagerFactory
. 然后,它每次调用都会传回一个项目以相同的基本方式读取与任何其他方式一样ItemReader
. 当需要额外的实体时,分页会在幕后进行。
-
Java
-
XML
以下 Java 示例配置使用与 JDBC 读取器前面显示:
@Bean
public JpaPagingItemReader itemReader() {
return new JpaPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.entityManagerFactory(entityManagerFactory())
.queryString("select c from CustomerCredit c")
.pageSize(1000)
.build();
}
以下 XML 示例配置使用与前面显示的JDBC 读取器相同的“客户信用”示例:
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
<property name="entityManagerFactory" ref="entityManagerFactory"/>
<property name="queryString" value="select c from CustomerCredit c"/>
<property name="pageSize" value="1000"/>
</bean>
这配置了ItemReader
返回CustomerCredit
对象,其方式与描述的JdbcPagingItemReader
上面,假设CustomerCredit
对象具有正确的 JPA 注释或 ORM 映射文件。'pageSize'属性确定每次查询执行从数据库读取的实体数。
数据库项编写器
虽然平面文件和 XML 文件都有特定的ItemWriter
实例,没有完全等效的在数据库世界中。这是因为事务提供了所有需要的功能。ItemWriter
实现对于文件来说是必要的,因为它们必须像事务性一样运行,跟踪写入的项目并在适当的时间刷新或清除。数据库不需要此功能,因为写入已经包含在 交易。 用户可以创建自己的 DAO,实现ItemWriter
interface 或使用自定义ItemWriter
这是为一般处理问题而编写的。 也 方式,它们应该可以毫无问题地工作。需要注意的一件事是性能以及通过批处理输出提供的错误处理功能。这是最使用休眠作为ItemWriter
但在使用时可能会遇到同样的问题
JDBC 批处理模式。批处理数据库输出没有任何固有缺陷,假设我们
小心刷新,数据中没有错误。但是,在
书写可能会引起混乱,因为无法知道是哪一个项目导致了
例外情况,或者即使任何单个项目负责,如
下图:

如果在写入之前缓冲了项目,则在缓冲区之前不会抛出任何错误
在提交之前刷新。例如,假设每个块写入 20 个项目,
第 15 项抛出一个DataIntegrityViolationException
.至于Step
涉及,所有 20 项都写成功,因为没有办法知道
错误发生,直到它们被实际写入。一次Session#flush()
调用,则
buffer 被清空并命中异常。此时,没有任何Step
可以做。必须回滚事务。通常,此异常可能会导致
要跳过的项目(取决于跳过/重试策略),然后不写入
再。但是,在批处理方案中,无法知道是哪个项目导致了
问题。发生故障时,正在写入整个缓冲区。唯一的方法
解决此问题是在每个项目之后刷新,如下图所示:

这是一个常见的用例,尤其是在使用 Hibernate 时,以及的实现ItemWriter
是在每次调用时刷新write()
. 这样做可以可靠地跳过项目,Spring Batch 在内部处理对ItemWriter
错误后。