配置和运行作业
配置和运行作业
在域部分,整体 讨论了架构设计,使用下图作为 指导:

虽然Job
对象可能看起来很简单
容器中,有许多配置选项,其中
开发人员必须知道。此外,还有许多考虑因素
如何Job
将运行及其元数据将如何运行
存储在该运行期间。本章将解释各种配置
选项和运行时问题Job
.
配置作业
有多种实现Job
接口。然而
构建者抽象化了配置上的差异。
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob")
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.build();
}
一个Job
(通常任何Step
在其中)需要一个JobRepository
.这
配置JobRepository
通过BatchConfigurer
.
上面的示例说明了Job
由三个组成Step
实例。工作相关
构建器还可以包含有助于并行化的其他元素(Split
),
声明式流控制 (Decision
)和流定义的外部化(Flow
).
无论您使用 Java 还是 XML,都有多种Job
接口。但是,命名空间抽象化了配置中的差异。它有
只有三个必需的依赖项:名称、JobRepository
,以及Step
实例。
<job id="footballJob">
<step id="playerload" parent="s1" next="gameLoad"/>
<step id="gameLoad" parent="s2" next="playerSummarization"/>
<step id="playerSummarization" parent="s3"/>
</job>
此处的示例使用父 Bean 定义来创建步骤。 有关内联声明特定步骤详细信息的更多选项,请参阅步骤配置部分。XML 命名空间 默认引用 id 为“jobRepository”的存储库,其中 是一个合理的默认值。但是,可以显式覆盖此作:
<job id="footballJob" job-repository="specialRepository">
<step id="playerload" parent="s1" next="gameLoad"/>
<step id="gameLoad" parent="s3" next="playerSummarization"/>
<step id="playerSummarization" parent="s3"/>
</job>
除了步骤之外,作业配置还可以包含有助于
并行化 (<split>
)、声明性流控制 (<decision>
)和外部化
流定义 (<flow/>
).
可重启性
执行批处理作业时的一个关键问题涉及Job
当它是
重新 启动。启动Job
如果JobExecution
已经存在,用于特定的JobInstance
.理想情况下,所有作业都应该能够启动
从他们离开的地方开始,但在某些情况下这是不可能的。是的
完全由开发人员来确保新的JobInstance
创建于此
场景。但是,Spring Batch 确实提供了一些帮助。如果Job
永远不应该
restarted,但应始终作为新JobInstance
,则
restartable 属性可以设置为“false”。
以下示例演示如何设置restartable
字段设置为false
在 XML 中:
<job id="footballJob" restartable="false">
...
</job>
以下示例演示如何设置restartable
字段设置为false
在 Java 中:
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob")
.preventRestart()
...
.build();
}
换句话说,将 restartable 设置为 false 意味着“这个Job
不支持重新启动“。重新启动Job
那不是
restartable 会导致JobRestartException
自
被扔掉。
Job job = new SimpleJob();
job.setRestartable(false);
JobParameters jobParameters = new JobParameters();
JobExecution firstExecution = jobRepository.createJobExecution(job, jobParameters);
jobRepository.saveOrUpdate(firstExecution);
try {
jobRepository.createJobExecution(job, jobParameters);
fail();
}
catch (JobRestartException e) {
// expected
}
这个 JUnit 代码片段展示了如何尝试创建JobExecution
第一次对于不可重启的
作业不会造成任何问题。然而,第二个
尝试将抛出一个JobRestartException
.
拦截作业执行
在执行过程中
工作,收到各种通知可能会有用
事件,以便可以执行自定义代码。这SimpleJob
通过调用JobListener
在适当的时候:
public interface JobExecutionListener {
void beforeJob(JobExecution jobExecution);
void afterJob(JobExecution jobExecution);
}
JobListeners
可以添加到SimpleJob
通过在作业上设置侦听器。
以下示例显示了如何将侦听器元素添加到 XML 作业定义中:
<job id="footballJob">
<step id="playerload" parent="s1" next="gameLoad"/>
<step id="gameLoad" parent="s2" next="playerSummarization"/>
<step id="playerSummarization" parent="s3"/>
<listeners>
<listener ref="sampleListener"/>
</listeners>
</job>
以下示例显示了如何将侦听器方法添加到 Java 作业定义中:
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob")
.listener(sampleListener())
...
.build();
}
应该注意的是,afterJob
方法是否成功或
失败Job
.如果需要确定成功或失败,可以获得
从JobExecution
如下:
public void afterJob(JobExecution jobExecution){
if (jobExecution.getStatus() == BatchStatus.COMPLETED ) {
//job success
}
else if (jobExecution.getStatus() == BatchStatus.FAILED) {
//job failure
}
}
与此接口对应的注释是:
-
@BeforeJob
-
@AfterJob
从父作业继承
如果一组作业共享相似,但不是
相同,配置,那么定义“父”可能会有所帮助Job
混凝土从中
作业可以继承属性。与class相似
Java 中的继承,“子”Job
将结合
其元素和属性与父级的元素和属性。
在下面的示例中,“baseJob”是一个抽象Job
仅定义列表的定义
听众。这Job
“job1”是一个具体的
从“baseJob”继承监听器列表并合并的定义
它与自己的侦听器列表一起生成一个Job
有两个听众和一个Step
, “step1”。
<job id="baseJob" abstract="true">
<listeners>
<listener ref="listenerOne"/>
<listeners>
</job>
<job id="job1" parent="baseJob">
<step id="step1" parent="standaloneStep"/>
<listeners merge="true">
<listener ref="listenerTwo"/>
<listeners>
</job>
有关更多详细信息,请参阅从父步骤继承部分。
作业参数验证器
在 XML 命名空间中声明的作业或使用AbstractJob
可以选择为
运行。例如,当您需要断言作业时,这很有用
以所有必需参数开头。有一个DefaultJobParametersValidator
可用于约束组合
简单的强制参数和可选参数,以及更复杂的参数
constraints 您可以自己实现接口。
验证器的配置通过子项的 XML 命名空间支持 元素,如以下示例所示:
<job id="job1" parent="baseJob3">
<step id="step1" parent="standaloneStep"/>
<validator ref="parametersValidator"/>
</job>
验证器可以指定为引用(如前面所示)或嵌套 bean bean 命名空间中的定义。
验证器的配置通过 java 构建器支持,如 以下示例:
@Bean
public Job job1() {
return this.jobBuilderFactory.get("job1")
.validator(parametersValidator())
...
.build();
}
Java 配置
Spring 3 带来了通过 java 而不是 XML 配置应用程序的能力。截至
Spring Batch 2.2.0,可以使用相同的 java 配置配置批处理作业。
基于 java 的配置有两个组件:@EnableBatchProcessing
注释和两个构建器。
这@EnableBatchProcessing
其工作@Enable方式与
Spring家族。在这种情况下,@EnableBatchProcessing
提供基本配置
构建批处理作业。在此基本配置中,一个StepScope
是
除了许多可供自动连接的 bean 之外,还创建了:
-
JobRepository
:bean 名称“jobRepository” -
JobLauncher
:bean 名称“jobLauncher” -
JobRegistry
:bean 名称“jobRegistry” -
PlatformTransactionManager
:bean 名称“transactionManager” -
JobBuilderFactory
:bean 名称“jobBuilders” -
StepBuilderFactory
:bean 名称 “stepBuilders”
此配置的核心接口是BatchConfigurer
.默认值
实现提供了上面提到的 bean,并且需要一个DataSource
作为豆子
在要提供的上下文中。此数据源由 JobRepository 使用。
您可以自定义这些 bean 中的任何一个
通过创建BatchConfigurer
接口。
通常,将DefaultBatchConfigurer
(如果BatchConfigurer
未找到),覆盖所需的 getter 就足够了。
但是,可能需要从头开始实施自己的。以下内容
示例显示了如何提供自定义事务管理器:
@Bean
public BatchConfigurer batchConfigurer(DataSource dataSource) {
return new DefaultBatchConfigurer(dataSource) {
@Override
public PlatformTransactionManager getTransactionManager() {
return new MyTransactionManager();
}
};
}
只有一个配置类需要具有 |
基本配置到位后,用户可以使用提供的构建器工厂来
配置作业。以下示例显示了配置了JobBuilderFactory
和StepBuilderFactory
:
@Configuration
@EnableBatchProcessing
@Import(DataSourceConfiguration.class)
public class AppConfig {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public Job job(@Qualifier("step1") Step step1, @Qualifier("step2") Step step2) {
return jobs.get("myJob").start(step1).next(step2).build();
}
@Bean
protected Step step1(ItemReader<Person> reader,
ItemProcessor<Person, Person> processor,
ItemWriter<Person> writer) {
return steps.get("step1")
.<Person, Person> chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
protected Step step2(Tasklet tasklet) {
return steps.get("step2")
.tasklet(tasklet)
.build();
}
}
配置 JobRepository
使用时@EnableBatchProcessing
一个JobRepository
开箱即用。
本节介绍如何配置您自己的。
如前所述,该JobRepository
用于各种持久化的基本 CRUD作
domain 对象,例如JobExecution
和StepExecution
.许多主要都需要它
框架功能,例如JobLauncher
,Job
和Step
.
批处理命名空间抽象化了JobRepository
实现及其合作者。不过,还是有一些
可用的配置选项,如以下示例所示:
<job-repository id="jobRepository"
data-source="dataSource"
transaction-manager="transactionManager"
isolation-level-for-create="SERIALIZABLE"
table-prefix="BATCH_"
max-varchar-length="1000"/>
除了id
.如果他们是
未设置,则将使用上面显示的默认值。上面显示了它们以提高认识
目的。这max-varchar-length
默认为 2500,这是 long 的长度VARCHAR
示例架构中的列
脚本。
使用 java 配置时,一个JobRepository
为您提供。基于 JDBC 的
如果DataSource
提供,则Map
如果没有,则基于一个。然而
您可以自定义JobRepository
通过实现BatchConfigurer
接口。
...
// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource);
factory.setTransactionManager(transactionManager);
factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
factory.setTablePrefix("BATCH_");
factory.setMaxVarCharLength(1000);
return factory.getObject();
}
...
除了
dataSource 和 transactionManager。如果未设置,则上面显示的默认值
将被使用。上面显示了它们,以提高认识。这
max varchar 长度默认为 2500,即
长的长度VARCHAR
示例架构脚本中的列
JobRepository 的事务配置
如果命名空间或提供的FactoryBean
,事务建议是围绕存储库自动创建的。这是为了确保批处理元数据包括故障后重新启动所需的状态被正确保留。如果存储库方法不是 事务。 中的隔离级别create*
method 属性被单独指定以确保在启动作业时,如果两个进程尝试同时启动同一个作业,则只有一个成功。该方法是SERIALIZABLE
,这是相当激进的。READ_COMMITTED
将像 井。READ_UNCOMMITTED
如果两个进程不太可能在此中发生冲突,那就没问题了 道路。 但是,由于调用create*
方法很短,不太可能SERIALIZED
导致问题,只要数据库平台支持它。但是,这个可以被覆盖。
以下示例演示如何覆盖 XML 中的隔离级别:
<job-repository id="jobRepository"
isolation-level-for-create="REPEATABLE_READ" />
以下示例显示了如何在 Java 中覆盖隔离级别:
// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource);
factory.setTransactionManager(transactionManager);
factory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
return factory.getObject();
}
如果不使用命名空间或工厂 bean,那么配置 使用 AOP 的存储库的事务行为。
以下示例演示如何配置存储库的事务行为 在 XML 中:
<aop:config>
<aop:advisor
pointcut="execution(* org.springframework.batch.core..*Repository+.*(..))"/>
<advice-ref="txAdvice" />
</aop:config>
<tx:advice id="txAdvice" transaction-manager="transactionManager">
<tx:attributes>
<tx:method name="*" />
</tx:attributes>
</tx:advice>
前面的片段几乎可以按原样使用,几乎没有任何更改。还要记住 包括适当的命名空间声明,并确保 spring-tx 和 spring-aop (或整个 Spring)都在类路径上。
以下示例演示如何配置存储库的事务行为 在 Java 中:
@Bean
public TransactionProxyFactoryBean baseProxy() {
TransactionProxyFactoryBean transactionProxyFactoryBean = new TransactionProxyFactoryBean();
Properties transactionAttributes = new Properties();
transactionAttributes.setProperty("*", "PROPAGATION_REQUIRED");
transactionProxyFactoryBean.setTransactionAttributes(transactionAttributes);
transactionProxyFactoryBean.setTarget(jobRepository());
transactionProxyFactoryBean.setTransactionManager(transactionManager());
return transactionProxyFactoryBean;
}
更改表前缀
另一个可修改的属性JobRepository
是元数据的表前缀
表。默认情况下,它们都以BATCH_
.BATCH_JOB_EXECUTION
和BATCH_STEP_EXECUTION
是两个例子。但是,有潜在的原因需要修改它
前缀。如果模式名称需要在表名称前面加上,或者如果有多个
在同一架构中需要一组元数据表,则表前缀需要
被更改:
以下示例显示了如何更改 XML 中的表前缀:
<job-repository id="jobRepository"
table-prefix="SYSTEM.TEST_" />
以下示例显示了如何在 Java 中更改表前缀:
// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource);
factory.setTransactionManager(transactionManager);
factory.setTablePrefix("SYSTEM.TEST_");
return factory.getObject();
}
鉴于上述更改,对元数据表的每个查询都以SYSTEM.TEST_
.BATCH_JOB_EXECUTION
称为 SYSTEM。TEST_JOB_EXECUTION
.
只有表前缀是可配置的。表和列名称不是。 |
内存存储库
在某些情况下,您可能不希望将域对象持久化到
数据库。原因之一可能是速度;在每个提交点存储域对象需要额外的时间
时间。另一个原因可能是您只是不需要为特定
工作。出于这个原因,Spring batch 提供了一个内存中Map
作业版本
存储 库。
以下示例显示了包含MapJobRepositoryFactoryBean
在 XML 中:
<bean id="jobRepository"
class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="transactionManager"/>
</bean>
以下示例显示了包含MapJobRepositoryFactoryBean
在 Java 中:
// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean();
factory.setTransactionManager(transactionManager);
return factory.getObject();
}
请注意,内存中的存储库是易失性的,因此不允许在 JVM 之间重新启动
实例。它也不能保证具有相同参数的两个作业实例
同时启动,不适合在多线程 Job 中使用,也不适合在本地
分区Step
.因此,无论在需要什么地方,都可以使用存储库的数据库版本
特征。
但是,它确实需要定义事务管理器,因为有回滚
语义,并且由于业务逻辑可能仍是
事务性(例如 RDBMS 访问)。出于测试目的,许多人发现ResourcelessTransactionManager
有用。
这
一旦您将嵌入式数据源定义为应用程序上下文中的 bean,就应该选择它
如果您使用 |
存储库中的非标准数据库类型
如果您使用的数据库平台不在受支持的平台列表中,则
如果 SQL 变体足够接近,则可以使用受支持的类型之一。去做
这,您可以使用原始的JobRepositoryFactoryBean
而不是命名空间快捷方式和
使用它将数据库类型设置为最接近的匹配项。
以下示例演示如何使用JobRepositoryFactoryBean
设置数据库类型
到 XML 中最接近的匹配项:
<bean id="jobRepository" class="org...JobRepositoryFactoryBean">
<property name="databaseType" value="db2"/>
<property name="dataSource" ref="dataSource"/>
</bean>
以下示例演示如何使用JobRepositoryFactoryBean
设置数据库类型
到 Java 中最接近的匹配项:
// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource);
factory.setDatabaseType("db2");
factory.setTransactionManager(transactionManager);
return factory.getObject();
}
(这JobRepositoryFactoryBean
尝试
自动检测数据库类型DataSource
如果未指定。平台之间的主要区别是
主要由递增主键的策略来解释,所以
通常可能需要覆盖incrementerFactory
以及(使用标准之一
Spring Framework 中的实现)。
如果即使这样也不起作用,或者您没有使用 RDBMS,那么
唯一的选项可能是实现各种Dao
接口,其中SimpleJobRepository
取决于
打开并以正常的弹簧方式手动连接一个。
配置 JobLauncher
使用时@EnableBatchProcessing
一个JobRegistry
开箱即用。
本节介绍如何配置您自己的。
最基本的实现JobLauncher
接口是SimpleJobLauncher
.
它唯一必需的依赖项是JobRepository
,以获得执行。
以下示例显示了SimpleJobLauncher
在 XML 中:
<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
以下示例显示了SimpleJobLauncher
在 Java 中:
...
// This would reside in your BatchConfigurer implementation
@Override
protected JobLauncher createJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
...
一旦获得 JobExecution,它就会传递给execute 的Job
,最终返回JobExecution
到调用方,如下图所示:如下图所示:

该序列很简单,从调度程序启动时效果很好。 然而 尝试从 HTTP 请求启动时会出现问题。在这种情况下,启动需要异步完成,以便SimpleJobLauncher
立即返回到其
访客。这是因为保持 HTTP 请求打开对于
长时间运行的进程(如批处理)所需的时间量。下图显示
示例序列:

这SimpleJobLauncher
可以通过配置TaskExecutor
.
以下 XML 示例显示了SimpleJobLauncher
配置为立即返回:
<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
<property name="taskExecutor">
<bean class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
</property>
</bean>
以下 Java 示例显示了SimpleJobLauncher
配置为立即返回:
@Bean
public JobLauncher jobLauncher() {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository());
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
弹簧的任何实现TaskExecutor
接口可用于控制作业的异步方式
执行。
运行作业
启动批处理作业至少需要两件事:Job
要启动,并JobLauncher
.两者都可以包含在同一个
上下文或不同的上下文。例如,如果从
命令行,将为每个 Job 实例化一个新的 JVM,因此每个 Job
作业将有自己的JobLauncher
.但是,如果
在HttpRequest
,通常会有一个JobLauncher
,为异步作业配置
launching,则多个请求将调用以启动其作业。
从命令行运行作业
对于想要从企业运行作业的用户调度程序,命令行是主要界面。这是因为大多数调度程序(Quartz 除外,除非使用NativeJob)直接与作系统进程一起工作,主要由 shell 脚本启动。有很多方法除了 shell 脚本之外,还可以启动 Java 进程,例如 Perl、Ruby 或甚至“构建工具”,例如 ant 或 Maven。然而,由于大多数人熟悉 shell 脚本,因此本示例将重点介绍它们。
The CommandLineJobRunner
因为启动作业的脚本必须启动一个 Java虚拟机,所以需要有一个类,有一个 main 方法来充当作为主要入口点。Spring Batch 提供了一个实现它正用于这个目的:CommandLineJobRunner
. 重要的是要注意这只是引导应用程序的一种方法,但有启动 Java 进程的方法有很多种,而这个类绝不应该是被视为确定的。 这CommandLineJobRunner
执行四项任务:
-
加载适当的
ApplicationContext
-
将命令行参数解析为
JobParameters
-
根据参数找到合适的作业
-
使用
JobLauncher
在 应用程序上下文来启动作业。
所有这些任务都仅使用参数来完成 通过了。以下是必需的参数:
作业路径 |
将用于
创建一个 |
作业名称 |
要运行的作业的名称。 |
这些参数必须先传入路径,其次传入名称。所有参数 当这些被认为是作业参数后,将转换为 JobParameters 对象, 并且必须采用“name=value”的格式。
以下示例显示了作为作业参数传递给 XML 中定义的作业的日期:
<bash$ java CommandLineJobRunner endOfDayJob.xml endOfDay schedule.date(date)=2007/05/05
以下示例显示了作为作业参数传递给Java中定义的作业的日期:
<bash$ java CommandLineJobRunner io.spring.EndOfDayJobConfiguration endOfDay schedule.date(date)=2007/05/05
默认情况下, 在以下示例中,
可以使用自定义 |
在大多数情况下,您希望使用清单在 jar 中声明您的主类,但是,
为简单起见,直接使用该类。此示例使用相同的“EndOfDay”
domainLanguageOfBatch 中的示例。第一个
参数是 'endOfDayJob.xml',它是包含Job
.第二个参数 'endOfDay' 表示作业名称。最后一个论点
'schedule.date(date)=2007/05/05',转换为 JobParameters 对象。
以下示例显示了endOfDay
在 XML 中:
<job id="endOfDay">
<step id="step1" parent="simpleStep" />
</job>
<!-- Launcher details removed for clarity -->
<beans:bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher" />
在大多数情况下,您希望使用清单在 jar 中声明您的主类,但是,
为简单起见,直接使用该类。此示例使用相同的“EndOfDay”
domainLanguageOfBatch 中的示例。第一个
参数是'io.spring.EndOfDayJobConfiguration',这是完全限定的类名
到包含作业的配置类。第二个参数 'endOfDay' 表示
作业名称。最后一个参数 'schedule.date(date)=2007/05/05' 被转换为JobParameters
对象。java 配置的示例如下:
以下示例显示了endOfDay
在 Java 中:
@Configuration
@EnableBatchProcessing
public class EndOfDayJobConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job endOfDay() {
return this.jobBuilderFactory.get("endOfDay")
.start(step1())
.build();
}
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.tasklet((contribution, chunkContext) -> null)
.build();
}
}
前面的示例过于简单化,因为对一般来说,在 Spring Batch 中运行批处理作业,但它用于显示两个主要的要求CommandLineJobRunner
:Job
和JobLauncher
.
退出代码
从命令行启动批处理作业时,企业
经常使用调度程序。大多数调度程序都相当愚蠢,只能工作
在过程层面。这意味着他们只知道一些
作系统进程,例如他们正在调用的 shell 脚本。
在这种情况下,与调度程序通信的唯一方法
关于作业的成功或失败是通过返回代码。一个
返回代码是进程返回给调度程序的数字
这表示运行的结果。在最简单的情况下:0 是
成功,1 是失败。但是,可能还有更复杂的
场景:如果作业 A 返回 4 个踢球,则启动作业 B,如果返回 5 个踢球
下班 C.这种类型的行为是在调度程序级别配置的,
但重要的是,像 Spring Batch 这样的处理框架
提供一种返回“退出代码”的数字表示的方法
对于特定的批处理作业。在 Spring Batch 中,这是封装的
在ExitStatus
,其中涵盖了更多
详情见第 5 章。为了讨论退出代码,该
唯一需要知道的重要事情是ExitStatus
具有 Exit Code 属性,该属性为
由框架(或开发人员)设置,并作为JobExecution
从JobLauncher
.这CommandLineJobRunner
转换此字符串值
使用ExitCodeMapper
接口:
public interface ExitCodeMapper {
public int intValue(String exitCode);
}
基本契约ExitCodeMapper
是,给定一个字符串出口
code,将返回一个数字表示。默认值
作业运行器使用的实现是SimpleJvmExitCodeMapper
返回 0 表示完成,1 表示通用错误,2 表示任何作业
运行器错误,例如无法找到Job
在提供的上下文中。如果还有的话
复杂于上述 3 个值,则自定义
实现ExitCodeMapper
接口
必须提供。因为CommandLineJobRunner
是创建
一ApplicationContext
,因此不能
'wired together',任何需要覆盖的值都必须是
自动接线。这意味着,如果实现ExitCodeMapper
在BeanFactory
,
创建上下文后,它将注入到运行器中。都
需要这样做才能提供您自己的ExitCodeMapper
是声明实现
作为根级 Bean,并确保它是ApplicationContext
由
跑步者。
从 Web 容器内运行作业
从历史上看,批处理作业等脱机处理一直
如上所述,从命令行启动。但是,有
在许多情况下,从HttpRequest
是
更好的选择。许多此类用例包括报告、临时作业
运行和 Web 应用程序支持。因为根据定义,批处理作业
长期运行,最重要的关注点是确保推出
异步作业:

本例中的控制器是 Spring MVC 控制器。更多
有关 Spring MVC 的信息,请访问:https://docs.spring.io/spring/docs/current/spring-framework-reference/web.html#mvc。
控制器启动一个Job
使用JobLauncher
已配置为异步启动,该
立即返回一个JobExecution
.这Job
可能仍会运行,但是,这个
非阻塞行为允许控制器立即返回,这
在处理HttpRequest
.一
示例如下:
@Controller
public class JobLauncherController {
@Autowired
JobLauncher jobLauncher;
@Autowired
Job job;
@RequestMapping("/jobLauncher.html")
public void handle() throws Exception{
jobLauncher.run(job, new JobParameters());
}
}
高级元数据使用
到目前为止,无论是JobLauncher
和JobRepository
接口已被
讨论。它们一起代表简单的工作启动,并且基本
批处理域对象的 CRUD作:

一个JobLauncher
使用JobRepository
创建新的JobExecution
对象并运行它们。Job
和Step
实现
以后使用相同的JobRepository
对于基本更新
在作业运行期间执行相同的执行。
基本作对于简单的场景来说已经足够了,但在大批量中
具有数百个批处理作业和复杂调度的环境
要求,需要对元数据进行更高级的访问:

这JobExplorer
和JobOperator
接口,将讨论
下面,添加用于查询和控制元数据的附加功能
数据。
查询存储库
在任何高级功能之前,最基本的需求是能够
查询存储库中的现有执行。此功能是
由JobExplorer
接口:
public interface JobExplorer {
List<JobInstance> getJobInstances(String jobName, int start, int count);
JobExecution getJobExecution(Long executionId);
StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);
JobInstance getJobInstance(Long instanceId);
List<JobExecution> getJobExecutions(JobInstance jobInstance);
Set<JobExecution> findRunningJobExecutions(String jobName);
}
从上面的方法签名中可以明显看出,JobExplorer
是
这JobRepository
,并且,就像JobRepository
,可以使用
工厂豆:
以下示例演示如何配置JobExplorer
在 XML 中:
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
p:dataSource-ref="dataSource" />
以下示例演示如何配置JobExplorer
在 Java 中:
...
// This would reside in your BatchConfigurer implementation
@Override
public JobExplorer getJobExplorer() throws Exception {
JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
factoryBean.setDataSource(this.dataSource);
return factoryBean.getObject();
}
...
在本章前面,我们注意到表前缀
的JobRepository
可以修改以允许不同的版本或模式。因为
这JobExplorer
适用于相同的表,它也需要设置前缀的能力。
以下示例演示如何为JobExplorer
在 XML 中:
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
p:tablePrefix="SYSTEM."/>
以下示例演示如何为JobExplorer
在 Java 中:
...
// This would reside in your BatchConfigurer implementation
@Override
public JobExplorer getJobExplorer() throws Exception {
JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
factoryBean.setDataSource(this.dataSource);
factoryBean.setTablePrefix("SYSTEM.");
return factoryBean.getObject();
}
...
作业注册表
一个JobRegistry
(及其父接口JobLocator
) 不是强制性的,但可以是
如果您想跟踪上下文中可用的作业,则很有用。它也是
对于在创建作业时在应用程序上下文中集中收集作业非常有用
其他地方(例如,在子上下文中)。习惯JobRegistry
实现还可以用于作已注册作业的名称和其他属性。该框架仅提供一种实现,该实现基于一个简单的从作业名称映射到作业实例。
以下示例演示如何包含JobRegistry
对于用 XML 定义的作业:
<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />
以下示例演示如何包含JobRegistry
对于用 Java 定义的作业:
使用时@EnableBatchProcessing
一个JobRegistry
开箱即用。
如果要配置自己的:
...
// This is already provided via the @EnableBatchProcessing but can be customized via
// overriding the getter in the SimpleBatchConfiguration
@Override
@Bean
public JobRegistry jobRegistry() throws Exception {
return new MapJobRegistry();
}
...
有两种方法可以填充JobRegistry
自动: 使用
一个 Bean 后处理器,并使用注册器生命周期组件。这些
以下部分将介绍两种机制。
JobRegistryBean后处理器
这是一个 bean 后处理器,可以在创建作业时注册所有作业。
以下示例演示了如何包含JobRegistryBeanPostProcessor
为了工作
在 XML 中定义:
<bean id="jobRegistryBeanPostProcessor" class="org.spr...JobRegistryBeanPostProcessor">
<property name="jobRegistry" ref="jobRegistry"/>
</bean>
以下示例演示了如何包含JobRegistryBeanPostProcessor
为了工作
在 Java 中定义:
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
postProcessor.setJobRegistry(jobRegistry());
return postProcessor;
}
虽然不是绝对必要的,但 example 已被赋予一个 id,以便它可以包含在子 上下文(例如,作为父 Bean 定义)并导致创建的所有作业 那里也会自动注册。
AutomaticJobRegistrar
这是一个生命周期组件,用于创建子上下文并从这些上下文中注册作业
上下文创建时。这样做的一个好处是,虽然作业名称在
子上下文在注册表中仍然必须是全局唯一的,它们的依赖关系
可以有“自然”名称。因此,例如,您可以创建一组 XML 配置文件
每个都只有一个作业,但都有不同的定义ItemReader
使用
相同的 bean 名称,例如 “reader”。如果所有这些文件都导入到同一个上下文中,
读者定义会相互冲突并覆盖彼此,但自动
注册商避免了这一点。这使得集成贡献的作业变得更加容易
应用程序的单独模块。
以下示例演示了如何包含AutomaticJobRegistrar
对于已定义的作业
在 XML 中:
<bean class="org.spr...AutomaticJobRegistrar">
<property name="applicationContextFactories">
<bean class="org.spr...ClasspathXmlApplicationContextsFactoryBean">
<property name="resources" value="classpath*:/config/job*.xml" />
</bean>
</property>
<property name="jobLoader">
<bean class="org.spr...DefaultJobLoader">
<property name="jobRegistry" ref="jobRegistry" />
</bean>
</property>
</bean>
以下示例演示了如何包含AutomaticJobRegistrar
对于已定义的作业
在 Java 中:
@Bean
public AutomaticJobRegistrar registrar() {
AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();
registrar.setJobLoader(jobLoader());
registrar.setApplicationContextFactories(applicationContextFactories());
registrar.afterPropertiesSet();
return registrar;
}
注册商有两个强制性属性,一个是ApplicationContextFactory
(这里从
方便工厂豆),另一个是JobLoader
.这JobLoader
负责管理子上下文的生命周期,并且
在JobRegistry
.
这ApplicationContextFactory
是
负责创建子上下文和最常见的用法
将如上所述,使用ClassPathXmlApplicationContextFactory
.其中之一
这个工厂的特点是默认情况下它复制了一些
配置从父上下文向下到子上下文。所以对于
实例,您不必重新定义PropertyPlaceholderConfigurer
或 AOP
配置,如果它应该与
父母。
这AutomaticJobRegistrar
可用于
与JobRegistryBeanPostProcessor
如果需要(只要DefaultJobLoader
是
也使用)。例如,如果有作业,这可能是可取的
在主父上下文和子上下文中定义
地点。
作业运算符
如前所述,该JobRepository
提供对元数据的 CRUD作,并且JobExplorer
在
元数据。但是,这些作一起使用时最有用
执行常见的监视任务,例如停止、重新启动或
总结作业,就像批处理运算符通常所做的那样。弹簧批次
通过JobOperator
接口:
public interface JobOperator {
List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;
List<Long> getJobInstances(String jobName, int start, int count)
throws NoSuchJobException;
Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;
String getParameters(long executionId) throws NoSuchJobExecutionException;
Long start(String jobName, String parameters)
throws NoSuchJobException, JobInstanceAlreadyExistsException;
Long restart(long executionId)
throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
NoSuchJobException, JobRestartException;
Long startNextInstance(String jobName)
throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;
boolean stop(long executionId)
throws NoSuchJobExecutionException, JobExecutionNotRunningException;
String getSummary(long executionId) throws NoSuchJobExecutionException;
Map<Long, String> getStepExecutionSummaries(long executionId)
throws NoSuchJobExecutionException;
Set<String> getJobNames();
}
上述作表示来自许多不同接口的方法,例如JobLauncher
,JobRepository
,JobExplorer
和JobRegistry
.因此,
提供了JobOperator
,SimpleJobOperator
,有许多依赖项。
以下示例显示了SimpleJobOperator
在 XML 中:
<bean id="jobOperator" class="org.spr...SimpleJobOperator">
<property name="jobExplorer">
<bean class="org.spr...JobExplorerFactoryBean">
<property name="dataSource" ref="dataSource" />
</bean>
</property>
<property name="jobRepository" ref="jobRepository" />
<property name="jobRegistry" ref="jobRegistry" />
<property name="jobLauncher" ref="jobLauncher" />
</bean>
以下示例显示了SimpleJobOperator
在 Java 中:
/**
* All injected dependencies for this bean are provided by the @EnableBatchProcessing
* infrastructure out of the box.
*/
@Bean
public SimpleJobOperator jobOperator(JobExplorer jobExplorer,
JobRepository jobRepository,
JobRegistry jobRegistry,
JobLauncher jobLauncher) {
SimpleJobOperator jobOperator = new SimpleJobOperator();
jobOperator.setJobExplorer(jobExplorer);
jobOperator.setJobRepository(jobRepository);
jobOperator.setJobRegistry(jobRegistry);
jobOperator.setJobLauncher(jobLauncher);
return jobOperator;
}
如果您在作业存储库上设置表前缀,请不要忘记在作业资源管理器上也设置它。 |
JobParameters递增器
大多数方法JobOperator
是
不言自明,更详细的解释可以在界面的 javadoc 上找到。但是,startNextInstance
方法值得注意。这
方法将始终启动 Job 的新实例。
如果存在严重问题,这将非常有用JobExecution
和工作
需要从头开始。与JobLauncher
不过,这需要一个新的JobParameters
对象,该对象将触发新的JobInstance
如果参数与
任何先前的参数集,则startNextInstance
方法将使用JobParametersIncrementer
绑定到Job
强制Job
设置为
新实例:
public interface JobParametersIncrementer {
JobParameters getNext(JobParameters parameters);
}
合同JobParametersIncrementer
是
给定一个 JobParameters 对象,它将返回“下一个”JobParameters
对象,通过递增它可能包含的任何必要值。这
策略很有用,因为框架无法知道什么
更改JobParameters
让它成为“下一个”
实例。例如,如果JobParameters
是一个日期,下一个实例
应该创建,该值是否应该增加一天?或者一个
周(例如,如果作业是每周一次)?对于任何
有助于识别作业的数值,
如下图所示:
public class SampleIncrementer implements JobParametersIncrementer {
public JobParameters getNext(JobParameters parameters) {
if (parameters==null || parameters.isEmpty()) {
return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
}
long id = parameters.getLong("run.id",1L) + 1;
return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
}
}
在此示例中,键为“run.id”的值用于
区分JobInstances
.如果JobParameters
传入为 null,则可以是
假设Job
以前从未运行过
因此可以返回其初始状态。但是,如果没有,旧的
值被获取,递增 1,然后返回。
对于在 XML 中定义的作业,增量器可以与Job
通过
'incrementer' 属性,如下所示:
<job id="footballJob" incrementer="sampleIncrementer">
...
</job>
对于用 Java 定义的作业,可以通过incrementer
方法,如下所示:
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob")
.incrementer(sampleIncrementer())
...
.build();
}
停止作业
最常见的用例之一JobOperator
正在优雅地停止
工作:
Set<Long> executions = jobOperator.getRunningExecutions("sampleJob");
jobOperator.stop(executions.iterator().next());
关闭不是立即的,因为没有办法强制
立即关闭,特别是如果执行当前处于
框架无法控制的开发人员代码,例如
商业服务。但是,一旦控制权返回给
框架,它将设置当前StepExecution
自BatchStatus.STOPPED
,保存它,然后执行相同的作
对于JobExecution
在完成之前。
中止工作
作业执行,即FAILED
可以
restarted (如果Job
是可重启的)。状态为ABANDONED
框架不会重新启动。
这ABANDONED
status 也用于步骤
执行,以在重新启动的作业执行中将其标记为可跳过:如果
作业正在执行,遇到已标记的步骤ABANDONED
在上一个失败的作业执行中,它将进入下一步(由作业流定义和步骤执行退出状态确定)。
如果进程终止 (kill -9
或服务器
failure),作业当然不是正在运行,而是JobRepository
有
无从知道,因为在过程结束之前没有人告诉它。你
必须手动告诉它你知道执行失败了
或应被视为已中止(将其状态更改为FAILED
或ABANDONED
).这是
一个业务决策,并且没有办法将其自动化。将
status 设置为FAILED
仅当它是可重启的,并且你知道重启数据有效时。