数据库

像大多数企业应用程序风格一样,数据库是批处理的核心存储机制。然而,由于系统必须处理的数据集规模巨大,批处理与其他应用程序风格有所不同。如果一条 SQL 语句返回 100 万行数据,结果集可能会在读取所有行之前将所有返回的结果保存在内存中。Spring Batch 为此问题提供了两种解决方案:spring-doc.cadn.net.cn

基于游标的ItemReader实现

使用数据库游标通常是大多数批处理开发人员的默认方法, 因为它是数据库针对“流式”关系数据问题的解决方案。Java ResultSet 类本质上是一种用于操作游标的面向对象机制。一个 ResultSet 维护指向当前数据行的游标。在 next 上调用 ResultSet 会将此光标移动到下一行。Spring Batch 基于游标的 ItemReader 实现会在初始化时打开一个游标,并在每次调用 read 时将游标向前移动一行,返回一个可用于处理的映射对象。随后调用close方法,以确保释放所有资源。Spring 核心 JdbcTemplate 通过使用回调模式解决了这个问题,它能够完整映射 ResultSet 中的所有行,并在将控制权返回给方法调用者之前将其关闭。然而,在批处理中,这必须等到步骤完成。下图展示了基于游标的 ItemReader 如何工作的通用示意图。请注意,虽然示例使用了 SQL(因为 SQL 广为人知),但任何技术都可以实现这一基本方法。spring-doc.cadn.net.cn

Cursor Example
图 1. 游标示例

此示例说明了基本模式。给定一个名为 'FOO' 的表,该表包含三列: IDNAMEBAR,选择所有 ID 大于 1 但小于 7 的行。这会将游标的起始位置(第 1 行)定位到 ID 为 2 的行。该行的结果应是一个完全映射的 Foo 对象。再次调用 read() 会将游标移动到下一行,即 ID 为 3 的 Foo。这些读取的结果会在每次 read 之后写出,从而允许对象被垃圾回收(假设没有实例变量维持对它们的引用)。spring-doc.cadn.net.cn

JdbcCursorItemReader

JdbcCursorItemReader 是基于游标技术的 JDBC 实现。它直接操作 ResultSet,并需要针对从 DataSource 获取的连接执行 SQL 语句。以下数据库模式用作示例:spring-doc.cadn.net.cn

CREATE TABLE CUSTOMER (
   ID BIGINT IDENTITY PRIMARY KEY,
   NAME VARCHAR(45),
   CREDIT FLOAT
);

许多人更喜欢为每一行使用一个领域对象,因此以下示例使用 RowMapper 接口的实现来映射 CustomerCredit 对象:spring-doc.cadn.net.cn

public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {

    public static final String ID_COLUMN = "id";
    public static final String NAME_COLUMN = "name";
    public static final String CREDIT_COLUMN = "credit";

    public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
        CustomerCredit customerCredit = new CustomerCredit();

        customerCredit.setId(rs.getInt(ID_COLUMN));
        customerCredit.setName(rs.getString(NAME_COLUMN));
        customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));

        return customerCredit;
    }
}

由于 JdbcCursorItemReaderJdbcTemplate 共享关键接口,因此通过 JdbcTemplate 读取此数据的示例有助于将其与 ItemReader 进行对比。就本示例而言,假设 CUSTOMER 数据库中包含 1,000 行数据。第一个示例使用了 JdbcTemplatespring-doc.cadn.net.cn

//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
                                          new CustomerCreditRowMapper());

运行上述代码片段后,customerCredits 列表包含 1,000 个CustomerCredit对象。在查询方法中,从DataSource获取一个连接,针对该连接执行提供的 SQL,并为ResultSet中的每一行调用mapRow方法。将此方法与以下示例中展示的JdbcCursorItemReader的方法进行对比:spring-doc.cadn.net.cn

JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
    customerCredit = itemReader.read();
    counter++;
}
itemReader.close();

运行上述代码片段后,计数器等于 1,000。如果上面的代码将返回的 customerCredit 放入一个列表中,结果将与 JdbcTemplate 示例完全相同。然而,ItemReader 的最大优势在于它支持以“流式”方式处理项。read 方法只需调用一次,项可由 ItemWriter 写出,然后通过 read 获取下一项。这使得项的读取和写入可以分“块”进行,并定期提交,这正是高性能批处理的核心所在。此外,它还易于配置并注入到 Spring Batch 的 Step 中。spring-doc.cadn.net.cn

以下示例展示了如何在 Java 中将一个 ItemReader 注入到 Step 中:spring-doc.cadn.net.cn

Java 配置
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
	return new JdbcCursorItemReaderBuilder<CustomerCredit>()
			.dataSource(this.dataSource)
			.name("creditReader")
			.sql("select ID, NAME, CREDIT from CUSTOMER")
			.rowMapper(new CustomerCreditRowMapper())
			.build();

}

以下示例展示了如何在 XML 中将 ItemReader 注入到 Step 中:spring-doc.cadn.net.cn

XML 配置
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

additionalProperties

由于在 Java 中打开游标有多种不同的选项,因此 JdbcCursorItemReader 上可以设置许多属性,如下表所述:spring-doc.cadn.net.cn

表 1. JdbcCursorItemReader 属性

忽略警告spring-doc.cadn.net.cn

确定是否记录 SQLWarnings 或抛出异常。 默认值为 true(表示记录警告)。spring-doc.cadn.net.cn

获取大小spring-doc.cadn.net.cn

为 JDBC 驱动程序提供一个提示,指明当ItemReader使用的ResultSet对象需要更多行时,应从数据库获取的行数。默认情况下,不提供任何提示。spring-doc.cadn.net.cn

最大行数spring-doc.cadn.net.cn

设置底层 ResultSet 在任意时刻可容纳的最大行数限制。spring-doc.cadn.net.cn

查询超时spring-doc.cadn.net.cn

设置驱动程序等待 Statement 对象运行的秒数。如果超过限制,将抛出 DataAccessException。(详细信息请咨询您的驱动程序提供商文档)。spring-doc.cadn.net.cn

验证光标位置spring-doc.cadn.net.cn

由于由 ItemReader 持有的相同 ResultSet 被传递给了 RowMapper,因此用户可能会自行调用 ResultSet.next(),这可能导致读取器内部计数出现问题。将此值设置为 true 后,如果在 RowMapper 调用之后游标位置与调用之前不一致,则会抛出异常。spring-doc.cadn.net.cn

保存状态spring-doc.cadn.net.cn

指示是否应将读取器的状态保存在由ItemStream#update(ExecutionContext)提供的ExecutionContext中。默认值为truespring-doc.cadn.net.cn

driverSupportsAbsolutespring-doc.cadn.net.cn

指示 JDBC 驱动程序是否支持在 ResultSet 上设置绝对行。对于支持 ResultSet.absolute() 的 JDBC 驱动程序,建议将其设置为 true,因为这可能会提高性能,尤其是在处理大型数据集时步骤失败的情况下。默认值为 falsespring-doc.cadn.net.cn

setUseSharedExtendedConnectionspring-doc.cadn.net.cn

指示用于游标的连接是否应由所有其他处理过程共用,从而共享同一事务。如果将此值设置为 false,则游标将使用其自己的连接打开,并且不参与为其余步骤处理启动的任何事务。如果将此标志设置为 true,则必须将 DataSource 包装在 ExtendedConnectionDataSourceProxy 中,以防止连接在每次提交后被关闭和释放。当将此选项设置为 true 时,用于打开游标的语句将同时使用 'READ_ONLY' 和 'HOLD_CURSORS_OVER_COMMIT' 选项创建。这允许在步骤处理中发生的事务开始和提交期间保持游标处于打开状态。要使用此功能,您需要一个支持该功能的数据库以及支持 JDBC 3.0 或更高版本的 JDBC 驱动程序。默认值为 falsespring-doc.cadn.net.cn

StoredProcedureItemReader

有时需要通过存储过程获取游标数据。StoredProcedureItemReader 的工作方式与 JdbcCursorItemReader 类似,不同之处在于,它不是执行查询来获取游标,而是执行一个返回游标的存储过程。该存储过程可以通过三种不同的方式返回游标:spring-doc.cadn.net.cn

以下 Java 示例配置使用了与前面示例相同的“客户信用”示例:spring-doc.cadn.net.cn

Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());

	return reader;
}

以下 XML 示例配置使用了与前面示例相同的“客户信用”示例:spring-doc.cadn.net.cn

XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

前面的示例依赖于存储过程提供 ResultSet 作为返回结果(即前面提到的选项 1)。spring-doc.cadn.net.cn

如果存储过程返回了 ref-cursor(选项 2),那么我们需要提供作为返回值的输出参数 ref-cursor 的位置。spring-doc.cadn.net.cn

以下示例展示了如何在 Java 中处理第一个参数为 ref-cursor 的情况:spring-doc.cadn.net.cn

Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());
	reader.setRefCursorPosition(1);

	return reader;
}

以下示例展示了如何在 XML 中处理第一个参数为 ref-cursor 的情况:spring-doc.cadn.net.cn

XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="refCursorPosition" value="1"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

如果游标是从存储函数返回的(选项 3),我们需要将属性"function"设置为 true。其默认值为 falsespring-doc.cadn.net.cn

以下示例展示了在 Java 中将属性设置为 truespring-doc.cadn.net.cn

Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());
	reader.setFunction(true);

	return reader;
}

以下示例展示了在 XML 中将属性设置为 truespring-doc.cadn.net.cn

XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="function" value="true"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

在所有这些情况下,我们需要定义一个 RowMapper 和一个 DataSource,以及实际的过程名称。spring-doc.cadn.net.cn

如果存储过程或函数需要参数,则必须使用 parameters 属性进行声明和设置。以下示例针对 Oracle 数据库,声明了三个参数。第一个是 out 参数,用于返回游标(ref-cursor);第二个和第三个是输入参数,其值类型为 INTEGERspring-doc.cadn.net.cn

以下示例展示了如何在 Java 中使用参数:spring-doc.cadn.net.cn

Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	List<SqlParameter> parameters = new ArrayList<>();
	parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
	parameters.add(new SqlParameter("amount", Types.INTEGER);
	parameters.add(new SqlParameter("custId", Types.INTEGER);

	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("spring.cursor_func");
	reader.setParameters(parameters);
	reader.setRefCursorPosition(1);
	reader.setRowMapper(rowMapper());
	reader.setPreparedStatementSetter(parameterSetter());

	return reader;
}

以下示例展示了如何在 XML 中使用参数:spring-doc.cadn.net.cn

XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="spring.cursor_func"/>
    <property name="parameters">
        <list>
            <bean class="org.springframework.jdbc.core.SqlOutParameter">
                <constructor-arg index="0" value="newid"/>
                <constructor-arg index="1">
                    <util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
                </constructor-arg>
            </bean>
            <bean class="org.springframework.jdbc.core.SqlParameter">
                <constructor-arg index="0" value="amount"/>
                <constructor-arg index="1">
                    <util:constant static-field="java.sql.Types.INTEGER"/>
                </constructor-arg>
            </bean>
            <bean class="org.springframework.jdbc.core.SqlParameter">
                <constructor-arg index="0" value="custid"/>
                <constructor-arg index="1">
                    <util:constant static-field="java.sql.Types.INTEGER"/>
                </constructor-arg>
            </bean>
        </list>
    </property>
    <property name="refCursorPosition" value="1"/>
    <property name="rowMapper" ref="rowMapper"/>
    <property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>

除了参数声明外,我们还需要指定一个 PreparedStatementSetter 实现,用于为调用设置参数值。其工作方式与上述的 JdbcCursorItemReader 相同。其他属性 中列出的所有附加属性同样适用于 StoredProcedureItemReaderspring-doc.cadn.net.cn

分页ItemReader实现

使用数据库游标的替代方案是运行多个查询,其中每个查询获取结果的一部分。我们将这一部分称为一页。每个查询必须指定起始行号以及我们希望在页面中返回的行数。spring-doc.cadn.net.cn

JdbcPagingItemReader

分页 ItemReader 的一种实现是 JdbcPagingItemReaderJdbcPagingItemReader 需要一个 PagingQueryProvider,负责提供用于检索构成页面的行的 SQL 查询。由于每个数据库都有其自己的分页支持策略,因此我们需要为每种支持的数据库类型使用不同的 PagingQueryProvider。此外,还有 SqlPagingQueryProviderFactoryBean,它可以自动检测正在使用的数据库并确定合适的 PagingQueryProvider 实现。这简化了配置,是推荐的最佳实践。spring-doc.cadn.net.cn

SqlPagingQueryProviderFactoryBean 要求您指定一个 select 子句和一个 from 子句。您还可以提供一个可选的 where 子句。这些子句以及必需的 sortKey 将用于构建 SQL 语句。spring-doc.cadn.net.cn

sortKey 上设置唯一键约束非常重要,以确保在执行之间不会丢失任何数据。

读取器打开后,每次调用 read 会返回一个条目,其基本方式与其他任何 ItemReader 相同。当需要更多行时,分页会在后台自动进行。spring-doc.cadn.net.cn

以下 Java 示例配置使用了与之前基于游标的 ItemReaders 类似的“客户信用”示例:spring-doc.cadn.net.cn

Java 配置
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
	Map<String, Object> parameterValues = new HashMap<>();
	parameterValues.put("status", "NEW");

	return new JdbcPagingItemReaderBuilder<CustomerCredit>()
           				.name("creditReader")
           				.dataSource(dataSource)
           				.queryProvider(queryProvider)
           				.parameterValues(parameterValues)
           				.rowMapper(customerCreditMapper())
           				.pageSize(1000)
           				.build();
}

@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
	SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

	provider.setSelectClause("select id, name, credit");
	provider.setFromClause("from customer");
	provider.setWhereClause("where status=:status");
	provider.setSortKey("id");

	return provider;
}

以下 XML 示例配置使用了与之前基于游标的 ItemReaders 类似的“客户信用”示例:spring-doc.cadn.net.cn

XML 配置
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="queryProvider">
        <bean class="org.spr...SqlPagingQueryProviderFactoryBean">
            <property name="selectClause" value="select id, name, credit"/>
            <property name="fromClause" value="from customer"/>
            <property name="whereClause" value="where status=:status"/>
            <property name="sortKey" value="id"/>
        </bean>
    </property>
    <property name="parameterValues">
        <map>
            <entry key="status" value="NEW"/>
        </map>
    </property>
    <property name="pageSize" value="1000"/>
    <property name="rowMapper" ref="customerMapper"/>
</bean>

此配置的 ItemReader 使用必须指定的 RowMapper 返回 CustomerCredit 个对象。'pageSize' 属性决定每次查询运行时从数据库读取的实体数量。spring-doc.cadn.net.cn

'parameterValues' 属性可用于为查询指定 Map 个参数值。如果在 where 子句中使用命名参数,则每个条目的键应与命名参数的名称匹配。如果使用传统的 '?' 占位符,则每个条目的键应为占位符的编号,从 1 开始。spring-doc.cadn.net.cn

JpaPagingItemReader

另一种分页 ItemReader 的实现方式是 JpaPagingItemReader。JPA 没有类似于 Hibernate StatelessSession 的概念,因此我们必须使用 JPA 规范提供的其他功能。由于 JPA 支持分页,因此在将 JPA 用于批处理时,这是一个自然的选择。在读取每一页后,实体将变为分离状态,并清除持久化上下文,以便在该页处理完成后允许这些实体被垃圾回收。spring-doc.cadn.net.cn

JpaPagingItemReader 允许您声明一个 JPQL 语句并传入一个EntityManagerFactory。随后,它以与任何其他ItemReader相同的基本方式,在每次调用读取时返回一项。当需要更多实体时,分页会在后台自动进行。spring-doc.cadn.net.cn

以下 Java 示例配置使用了与之前展示的 JDBC 读取器相同的“客户信用”示例:spring-doc.cadn.net.cn

Java 配置
@Bean
public JpaPagingItemReader itemReader() {
	return new JpaPagingItemReaderBuilder<CustomerCredit>()
           				.name("creditReader")
           				.entityManagerFactory(entityManagerFactory())
           				.queryString("select c from CustomerCredit c")
           				.pageSize(1000)
           				.build();
}

以下 XML 示例配置使用了与之前展示的 JDBC 读取器相同的“客户信用”示例:spring-doc.cadn.net.cn

XML 配置
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
    <property name="entityManagerFactory" ref="entityManagerFactory"/>
    <property name="queryString" value="select c from CustomerCredit c"/>
    <property name="pageSize" value="1000"/>
</bean>

此配置的 ItemReader 将以与上述 JdbcPagingItemReader 描述完全相同的方式返回 CustomerCredit 个对象,前提是 CustomerCredit 对象具有正确的 JPA 注解或 ORM 映射文件。'pageSize' 属性决定了每次查询执行时从数据库读取的实体数量。spring-doc.cadn.net.cn

数据库项写入器

虽然平面文件和 XML 文件都有一个特定的 ItemWriter 实例,但在数据库世界中并没有完全对应的等价物。这是因为事务提供了所有所需的功能。ItemWriter 个实现对于文件是必需的,因为它们必须表现得如同具有事务性,跟踪已写入的项,并在适当时机进行刷新或清空。数据库不需要此功能,因为写入操作已经包含在事务中。用户可以创建自己的 DAO,实现 ItemWriter 接口,或使用为通用处理需求编写的自定义 ItemWriter 中提供的 DAO。无论哪种方式,它们都应该能够正常工作,不会出现任何问题。需要注意的一件事是,通过批量输出所提供的性能和错误处理能力。这在使用 Hibernate 作为 ItemWriter 时最为常见,但在使用 JDBC 批处理模式时也可能出现相同的问题。批量处理数据库输出本身没有任何缺陷,前提是我们谨慎地执行刷新操作且数据中不存在错误。然而,写入过程中出现的任何错误都可能导致混淆,因为无法确定是哪个具体项引发了异常,甚至无法判断是否由某个具体项引起,如下图所示:spring-doc.cadn.net.cn

Error On Flush
图 2. 刷新时出错

如果在写入之前对项进行了缓冲,那么任何错误都不会被抛出,直到缓冲区在提交前被刷新。例如,假设每个块写入 20 个项,而第 15 个项抛出了一个 DataIntegrityViolationException。就 Step 而言,所有 20 个项都已成功写入,因为在实际写入之前无法知道发生了错误。一旦调用 Session#flush(),缓冲区将被清空并触发异常。此时,Step 已无能为力。事务必须回滚。通常,此异常可能导致该项被跳过(取决于跳过/重试策略),然后不再重新写入。然而,在批量处理场景中,无法确定是哪个项导致了问题。故障发生时,整个缓冲区正在被写入。解决此问题的唯一方法是在每个项之后进行刷新,如下图所示:spring-doc.cadn.net.cn

Error On Write
图 3. 写入错误

这是一个常见的使用场景,尤其是在使用 Hibernate 时。对于 ItemWriter 的实现,简单的指导原则是在每次调用 write() 时执行刷新操作。这样做可以可靠地跳过某些项,而 Spring Batch 会在内部处理错误发生后对 ItemWriter 调用的粒度控制。spring-doc.cadn.net.cn