Spring Batch 介绍

在企业域环境中针对关键环境进行商业操作的时候,有许多应用程序需要进行批量处理。这些业务运营包括:

  • 无需用户交互即可最有效地处理大量信息的自动化复杂处理。这些操作通常包括基于时间的事件 (例如,月末统计计算,通知或者消息通知)。

  • 在非常大的数据集中重复处理复杂业务规则的定期应用(例如,保险利益确定或费率调整)。

  • 整合从内部或者外部系统中收到的信息,这些信息通常要求格式,校验和事务范式处理到记录系统中。 批处理通常被用来针对企业每天产生超过亿万级别的数据量。

Spring Batch是一个轻量级的综合性批处理框架,可用于开发企业信息系统中那些至关重要的数据批量处理业务。 Spring Batch构建了人们期望的Spring Framework 特性(生产环境,基于 POJO 的开发和易于使用), 同时让开发者很容易的访问和使用企业级服务。Spring Batch 不是一个自动调度运行框架。在市面上已经有了很多企 业级和开源的自动运行框架(例如 Quartz,Tivoli, Control-M 等)。 Spring Batch 被设计与计划任务和调度程序一同协作完成任务,而不是替换调度程序。

Spring Batch 提供了可重用的功能,这些功能被用来对大量数据和记录进行处理,包括有日志/跟踪(logging/tracing), 事务管理(transaction management),作业处理状态(job processing statistics),作业重启(job restart), 作业跳过(job skip)和资源管理(resource management)。 此外还提供了许多高级服务和特性, 使之能够通过优化(optimization ) 和分片技术(partitioning techniques) 来实现极高容量和性能的批处理作业。 Spring Batch 能够简单(例如,将文件读入到数据库中或者运行一个存储过程)和复杂(例如,在数据库之间对海量数据进行移动或转换等) 情况下都能很好的工作。 可以在框架高度扩展的方式下执行大批量的作业来处理海量的信息。

背景

在开源项目及其相关社区把大部分注意力集中在基于 web 和 微服务体系框架时框架中时,基于 Java 的批处理框架却无人问津, 尽管在企业 IT 环境中一直都有这种批处理的需求。但因为缺乏一个标准的、可重用的批处理框架导致在企业客户的 IT 系统中 存在着很多一次编写,一次使用的版本,以及很多不同的内部解决方案。

SpringSource (现被命名为 Pivotal) 和 Accenture(埃森哲)致力于通过合作来改善这种状况。 埃森哲在实现批处理架构上有着丰富的产业实践经验,SpringSource 有深入的技术开发积累, 背靠 Spring 框架提供的编程模型,意味着两者能够结合成为默契且强大的合作伙伴,创造出高质量的、市场认可的企业级 Java 解决方案, 填补这一重要的行业空白。两家公司都与许多通过开发基于Spring的批处理架构解决方案解决类似问题的客户合作。 这提供了一些有用的额外细节和实际约束,有助于确保解决方案可以应用于客户提出的现实问题。

埃森哲为Spring Batch项目贡献了以前专有的批处理体系结构框架,以及提供支持,增强功能和现有功能集的提交者资源。 埃森哲的贡献基于几十年来在使用最新几代平台构建批量架构方面的经验:COBOL / Mainframe,C / Unix以及现在的Java / Anywhere。

埃森哲与SpringSource之间的合作旨在促进软件处理方法,框架和工具的标准化, 在创建批处理应用程序时,企业用户可以始终如一地利用这些方法,框架和工具。希望为其企业IT环境提供标准的, 经过验证的解决方案的公司和政府机构可以从 Spring Batch 中受益。

使用场景

一般的典型批处理程序:

  • 从数据库,文件或队列中读取大量记录。

  • 以某种方式处理数据。

  • 以修改的形式写回数据。

Spring Batch自动执行此基本批处理迭代,提供处理类似事务的功能,通常在脱机环境中处理,无需任何用户交互。 批处理作业是大多数 IT 项目的一部分,Spring Batch 是唯一提供强大的企业级解决方案的开源框架。

业务场景

  • 周期提交批处理任务

  • 同时批处理进程:并非处理一个任务

  • 分阶段的企业消息驱动处理

  • 高并发批处理

  • 失败后的手动或定时重启

  • 按顺序处理任务依赖(使用工作流驱动的批处理插件)

  • 部分处理:跳过记录(例如,回滚)

  • 全批次事务:因为可能有小数据量的批处理或存在存储过程/脚本中

技术目标

  • 批量的开发者使用 Spring 的编程模型:开发者能够更加专注于业务逻辑,让框架来解决基础的功能

  • 在基础架构、批处理执行环境、批处理应用之间有明确的划分

  • 以接口形式提供通用的核心服务,以便所有项目都能使用

  • 提供简单的默认实现,以实现核心执行接口的“开箱即用”

  • 通过在所有层中对 Spring 框架进行平衡配置,能够实现更加容易的配置,自定义和扩展服务。

  • 所有存在的核心服务应该能够很容易的在不同系统架构层进行影响的情况进行替换或扩展。

  • 提供一个简单的部署模块,使用 Maven 来进行编译的 JARs 架构,并与应用完全分离。

Spring Batch 体系结构

Spring Batch 设计的时候充分考虑了可扩展性和各类最终用户。 下图显示了 Spring Batch 的架构层次示意图,这种架构层次为最终用户开发者提供了很好的扩展性与易用性。

Figure 1.1: 批量层级体系结构
Figure 1. Spring 批量层级体系结构

这个层级体系结构高亮显示了 Spring Batch 的 3 个主要组件:应用(Application),核心(Core)和 基础架构(Infrastructure)。 应用层包含了所有的批量作业和开发者使用 Spring Batch 写的所有自定义代码。批量核心层包含了所有运行和控制批量作业所需必要的运行时类。 同时还包括了有 JobLauncher, Job, 和 Step。应用层和核心层都构建在基础架构层之上。

基础架构层包含了有 读(readers)和 写(writers )以及服务(services)。例如有针对服务使用, RetryTemplate。 基础架构层的这些东西,这些能够被应用层开发(readers 和 writers, 例如 ItemReaderItemWriter)和批量核心框架(例如,retry,这个是核心层自己的库)所使用。

简单的来说,基础架构层为应用层和批量核心层提供了所需要的的基础内容,是整个 Spring Batch 的基础。我们针对 Spring Batch 的开发绝大部分情况是在应用层完成的。

一般批量处理的原则和使用指引

下面是一些关键的指导原则,可以在构批量处理解决方案可以参考。

  • 请记住,通常批量处理体系结构将会影响在线应用的体系结构,同时反过来也是一样的。 在你为批量任务和在线应用进行设计架构和环境的时候请尽可能的使用公共的模块。

  • 越简单越好,尽量在一个单独的批量应用中构建简单的批量处理,并避免复杂的逻辑结构。

  • 尽量的保持存储的数据和进程存储在同一个地方(换句话说就是尽量将数据保存到你程序运行的地方)。

  • 最小化系统资源的使用,尤其针对 I/O。尽量在内存中执行尽可能多的操作。

  • 检查应用的 I/O(分析 SQL 语句)来避免不必要的的物理 I/O 使用。特别是以下四个常见的缺陷(flaws)需要避免:

    • 在数据可以只读一次就可以缓存起来的情况下,针对每一个事务都来读取数据

    • 多次读取/查询同一事务中已经读取过的数据

    • 产生不必要的表格或者索引扫描

    • 在 SQL 查询中不指定 WHERE 查询的值

  • 在批量运行的时候不要将一件事情重复 2 次。例如,如果你需要针对你需要报表的数据汇总,请在处理每一条记录时使用增量来存储, 尽可能不要再去遍历一次同样的数据。

  • 为批量进程在开始的时候就分配足够的内存,以避免在运行的时候再次分配内存。

  • 总是将数据完整性假定为最坏情况。对数据进行适当的检查和数据校验以保持数据完整性(integrity)。

  • 可能的话,请实现内部校验(checksums )。例如,针对文本文件,应该有一条结尾记录, 这个记录将会说明文件中的总记录数和关键字段的集合(aggregate)。

  • 尽可能早地在模拟生产环境下使用真实的数据量,以便于进行计划和执行压力测试。

  • 在大数据量的批量中,数据备份可能会非常复杂和充满挑战,尤其是你的系统要求不间断(24 - 7)运行的系统。 数据库备份通常在设计时就考虑好了,但是文件备份也应该提升到同样的重要程度。如果系统依赖于文本文件, 文件备份程序不仅要正确设置和形成文档,还要定期进行测试。

批量处理策略

为了帮助设计和实现批量处理系统,基本的批量应用是通过块和模式来构建的, 同时也应该能够为程序开发人员和设计人员提供结构的样例和基础的批量处理程序。

当你开始设计一个批量作业任务的时候,商业逻辑应该被拆分一系列的步骤,而这些步骤又是可以通过下面的标准构件块来实现的:

  • 转换应用程序(Conversion Applications): 针对每一个从外部系统导出或者提供的各种类型的文件, 我们都需要创建一个转换应用程序来讲这些类型的文件和数据转换为处理所需要的标准格式。 这个类型的批量应用程序可以是正规转换工具模块中的一部分,也可以是整个的转换工具模块(请查看:基本的批量服务(Basic Batch Services))。

  • 校验应用程序(Validation Applications): 校验应用程序能够保证所有的输入和输出记录都是正确和一致的。 校验通常是基于头和尾进行校验的,校验码和校验算法通常是针对记录的交叉验证。

  • 提取应用(Extract Applications): 这个应用程序通常被用来从数据库或者文本文件中读取一系列的记录, 并对记录的选择通常是基于预先确定的规则,然后将这些记录输出到输出文件中。

  • 提取/更新应用(Extract/Update Applications): 这个应用程序通常被用来从数据库或者文本文件中读取记录, 并将每一条读取的输入记录更新到数据库或者输出数据库中。

  • 处理和更新应用(Processing and Updating Applications): 这种程序对从提取或验证程序 传过来的输入事务记录进行处理。 这处理通常包括有读取数据库并且获得需要处理的数据,为输出处理更新数据库或创建记录。

  • 输出和格式化应用(Output/Format Applications): 一个应用通过读取一个输入文件, 对输入文件的结构重新格式化为需要的标准格式,然后创建一个打印的输出文件,或将数据传输到其他的程序或者系统中。

更多的,一个基本的应用外壳应该也能够被针对商业逻辑来提供,这个外壳通常不能通过上面介绍的这些标准模块来完成。

另外的一个主要的构建块,每一个引用通常可以使用下面的一个或者多个标准工具步骤,例如:

  • 分类(Sort): 一个程序可以读取输入文件后生成一个输出文件,在这个输出文件中可以对记录进行重新排序, 重新排序的是根据给定记录的关键字段进行重新排序的。分类通常使用标准的系统工具来执行。

  • 拆分(Split):一个程序可以读取输入文件后,根据需要的字段值,将输入的文件拆分为多个文件进行输出。拆分通常使用标准的系统工具来执行。

  • 合并(Merge):一个程序可以读取多个输入文件,然后将多个输入文件进行合并处理后生成为一个单一的输出文件。 合并可以自定义或者由参数驱动的(parameter-driven)系统实用程序来执行。

批量处理应用程序可以通过下面的输入数据类型来进行分类:

  • 数据库驱动应用程序(Database-driven applications)可以通过从数据库中获得的行或值来进行驱动。

  • 文件驱动应用程序(File-driven applications) 可以通过从文件中获得的数据来进行驱动。

  • 消息驱动应用程序(Message-driven applications) 可以通过从消息队列中获得的数据来进行驱动。

所有批量处理系统的处理基础都是策略(strategy)。对处理策略进行选择产生影响的因素包括有:预估批量处理需要处理的数据量, 在线并发量,和另外一个批量处理系统的在线并发量, 可用的批量处理时间窗口(很多企业都希望系统是能够不间断运行的,基本上来说批量处理可能没有处理时间窗口)。

针对批量处理的标准处理选项包括有(按实现复杂度的递增顺序):

  • 在一个批处理窗口中执行常规离线批处理

  • 并发批量 / 在线处理

  • 并发处理很多不同的批量处理或者有很多批量作业在同一时间运行

  • 分区(Partitioning),就是在同一时间有很多示例在运行相同的批量作业

  • 混合上面的一些需求

上面的一些选项或者所有选项能够被商业的任务调度所支持。

在下面的部分,我们将会针对上面的处理选项来对细节进行更多的说明。需要特别注意的是, 批量处理程序使用提交和锁定策略将会根据批量处理的不同而有所不同。作为最佳实践,在线锁策略应该使用相同的原则。 因此,在设计批处理整体架构时不能简单地拍脑袋决定,需要进行详细的分析和论证。

锁定策略可以仅仅使用常见的数据库锁或者你也可以在系统架构中使用其他的自定义锁定服务。 这个锁服务将会跟踪数据库的锁(例如在一个专用的数据库表(db-table)中存储必要的信息),然后在应用程序请求数据库操作时授予权限或拒绝。 重试逻辑应该也需要在系统架构中实现,以避免批量作业中的因资源锁定而导致批量任务被终止。

1. 批量处理作业窗口中的常规处理 针对运行在一个单独批处理窗口中的简单批量处理,更新的数据对在线用户或其他批处理来说并没有实时性要求, 也没有并发问题,在批处理运行完成后执行单次提交即可。

大多数情况下,一种更健壮的方法会更合适。要记住的是,批处理系统会随着时间的流逝而增长,包括复杂度和需要处理的数据量。 如果没有合适的锁定策略,系统仍然依赖于一个单一的提交点,则修改批处理程序会是一件痛苦的事情。 因此,即使是最简单的批处理系统, 也应该为重启-恢复(restart-recovery)选项考虑提交逻辑。针对下面的情况,批量处理就更加复杂了。

2. 并发批量 / 在线处理 批处理程序处理的数据如果会同时被在线用户实时更新,就不应该锁定在线用户需要的所有任何数据(不管是数据库还是文件), 即使只需要锁定几秒钟的时间。同时还应该每处理一批事务就提交一次数据库。这减少了其他程序不可用的数据数据量,也压缩了数据不可用的时间。

另一个可以使用的方案就是使用逻辑行基本的锁定实现来替代物理锁定。 通过使用乐观锁(Optimistic Locking )或悲观锁(Pessimistic Locking)模式。

  • 乐观锁假设记录争用的可能性很低。这通常意味着并发批处理和在线处理所使用的每个数据表中都有一个时间戳列。 当程序读取一行进行处理时,同时也获得对应的时间戳。当程序处理完该行以后尝试更新时,在 update 操作的 WHERE 子句中使用原来的时间戳作为条件。 如果时间戳相匹配,则数据和时间戳都更新成功。如果时间戳不匹配,这表明在本程序上次获取和此次更新这段时间内已经有另一个程序修改了同一条记录,因此更新不会被执行。

  • 悲观锁定策略假设记录争用的可能性很高,因此在检索时需要获得一个物理锁或逻辑锁。有一种悲观逻辑锁在数据表中使用一个专用的 lock-column 列。 当程序想要为更新目的而获取一行时,它在 lock column 上设置一个标志。如果为某一行设置了标志位,其他程序在试图获取同一行时将会逻辑上获取失败。 当设置标志的程序更新该行时,它也同时清除标志位,允许其他程序获取该行。 请注意,在初步获取和初次设置标志位这段时间内必须维护数据的完整性,比如使用数据库锁(例如,SELECT FOR UPDATE)。 还请注意,这种方法和物理锁都有相同的缺点,除了它在构建一个超时机制时比较容易管理。比如记录而用户去吃午餐了,则超时时间到了以后锁会被自动释放。

这些模式并不一定适用于批处理,但他们可以被用在并发批处理和在线处理的情况下(例如,数据库不支持行级锁)。作为一般规则,乐观锁更适合于在线应用, 而悲观锁更适合于批处理应用。只要使用了逻辑锁,那么所有访问逻辑锁保护的数据的程序都必须采用同样的方案。

请注意:这两种解决方案都只锁定(address locking)单条记录。但很多情况下我们需要锁定一组相关的记录。如果使用物理锁,你必须非常小心地管理这些以避免潜在的死锁。 如果使用逻辑锁,通常最好的解决办法是创建一个逻辑锁管理器,使管理器能理解你想要保护的逻辑记录分组(groups),并确保连贯和没有死锁(non-deadlocking)。 这种逻辑锁管理器通常使用其私有的表来进行锁管理、争用报告、超时机制 等等。

3. 并行处理 并行处理允许多个批量处理运行(run)/任务(job)同时并行地运行。以使批量处理总运行时间降到最低。 如果多个任务不使用相同的文件、数据表、索引空间时,批量处理这些不算什么问题。如果确实存在共享和竞争,那么这个服务就应该使用分区数据来实现。 另一种选择是使用控制表来构建一个架构模块以维护他们之间的相互依赖关系。控制表应该为每个共享资源分配一行记录,不管这些资源是否被某个程序所使用。 执行并行作业的批处理架构或程序随后将查询这个控制表,以确定是否可以访问所需的资源。

如果解决了数据访问的问题,并行处理就可以通过使用额外的线程来并行实现。在传统的大型主机环境中,并行作业类上通常被用来确保所有进程都有充足的 CPU 时间。 无论如何,解决方案必须足够强劲,以确保所有正在运行的进程都有足够的运行处理时间。

并行处理的其他关键问题还包括负载平衡以及一般系统资源的可用性(如文件、数据库缓冲池等)。请注意,控制表本身也可能很容易变成一个至关重要的资源(有可能发生严重竞争)。

4. 分区 分区技术允许多版本的大型批处理程序并发地(concurrently)运行。这样做的目的是减少超长批处理作业过程所需的时间。 可以成功分区的过程主要是那些可以拆分的输入文件 和/或 主要的数据库表被分区以允许程序使用不同的数据来运行。

此外,被分区的过程必须设计为只处理分配给他的数据集。分区架构与数据库设计和数据库分区策略是密切相关的。 请注意,数据库分区并不一定指数据库需要在物理上实现分区,尽管在大多数情况下这是明智的。 下面的图片展示了分区的方法:

Figure 1.2: 分区处理
Figure 2. 分区处理

系统架构应该足够灵活,以允许动态配置分区的数量。自动控制和用户配置都应该纳入考虑范围。 自动配置可以根据参数来决定,例如输入文件大小 和/或 输入记录的数量。

4.1 分区方案 下面列出了一些可能的分区方案,至于具体选择哪种分区方案,要根据具体情况来确定:

1. 固定和均衡拆分记录集

这涉及到将输入的记录集合分解成均衡的部分(例如,拆分为 10 份,这样每部分是整个数据集的十分之一)。 每个拆分的部分稍后由一个批处理/提取程序实例来处理。

为了使用这种方案,需要在预处理时候就将记录集进行拆分。拆分的结果有一个最大值和最小值的位置,这两个值可以用作限制每个 批处理/提取程序处理部分的输入。

预处理可能有一个很大的开销,因为它必须计算并确定的每部分数据集的边界。

2. 通过关键字段(Key Column)拆分

这涉及到将输入记录按照某个关键字段来拆分,比如一个地区代码(location code),并将每个键分配给一个批处理实例。为了达到这个目标,也可以使用列值:

  • 通过分区表来指派给一个批量处理实例。

  • 通过值的一部分(例如 0000-0999,1000-1999等)分配给批处理实例。

在选项 1 下,添加新值意味着手动重新配置批处理/提取以确保将新值添加到特定实例。

在选项 2 下,这可确保通过批处理作业的实例覆盖所有值。但是,一个实例处理的值的数量取决于列值的分布(0000-0999范围内可能有大量位置,1000-1999范围内可能很少)。 在此选项下,数据范围应设计为考虑分区。

在这两个选项下,无法实现记录到批处理实例的最佳均匀分布。没有使用批处理实例数的动态配置。

3. 通过视图(Views)

这种方法基本上是根据键列来分解,但不同的是在数据库级进行分解。它涉及到将记录集分解成视图。这些视图将被批处理程序的各个实例在处理时使用。分解将通过数据分组来完成。

使用这个方法时,批处理的每个实例都必须为其配置一个特定的视图(而非主表)。当然,对于新添加的数据,这个新的数据分组必须被包含在某个视图中。 也没有自动配置功能,实例数量的变化将导致视图需要进行相应的改变。

4. 附加的处理识别器

这涉及到输入表一个附加的新列,它充当一个指示器。在预处理阶段,所有指示器都被标志为未处理。在批处理程序获取记录阶段,只会读取被标记为未处理的记录, 一旦他们被读取(并加锁),它们就被标记为正在处理状态。当记录处理完成,指示器将被更新为完成或错误。 批处理程序的多个实例不需要改变就可以开始,因为附加列确保每条纪录只被处理一次。 在“完成时,指标被标记为完成”的顺序中的一两句话。

使用该选项时,表上的I/O会动态地增长。在批量更新的程序中,这种影响被降低了,因为写操作是必定要进行的。

5. 提取表到无格式文件

这包括将表中的数据提取到一个文件中。然后可以将这个文件拆分成多个部分,作为批处理实例的输入。

使用这个选项时,将数据提取到文件中,并将文件拆分的额外开销,有可能抵消多分区处理(multi-partitioning)的效果。可以通过改变文件分割脚本来实现动态配置。

6. 使用哈希列(Hashing Column)

这个计划需要在数据库表中增加一个哈希列(key/index)来检索驱动(driver)记录。这个哈希列将有一个指示器来确定将由批处理程序的哪个实例处理某个特定的行。 例如,如果启动了三个批处理实例,那么 “A” 指示器将标记某行由实例 1 来处理,“B”将标记着将由实例 2 来处理,“C”将标记着将由实例 3 来处理,以此类推。

稍后用于检索记录的过程(procedure)程序,将有一个额外的 WHERE 子句来选择以一个特定指标标记的所有行。 这个表的插入(insert)需要附加的标记字段,默认值将是其中的某一个实例(例如“A”)。

一个简单的批处理程序将被用来更新不同实例之间的重新分配负载的指标。当添加足够多的新行时,这个批处理会被运行(在任何时间,除了在批处理窗口中)。

批处理应用程序的其他实例只需要像上面这样的批处理程序运行着以重新分配指标,以决定新实例的数量。

4.2 数据库和应用设计原则

如果一个支持多分区(multi-partitioned)的应用程序架构,基于数据库采用关键列(key column)分区方法拆成的多个表,则应该包含一个中心分区仓库来存储分区参数。 这种方式提供了灵活性,并保证了可维护性。这个中心仓库通常只由单个表组成,叫做分区表。

存储在分区表中的信息应该是是静态的,并且只能由 DBA 维护。每个多分区程序对应的单个分区有一行记录,组成这个表。 这个表应该包含这些列:程序 ID 编号,分区编号(分区的逻辑ID),一个分区对应的关键列(key column)的最小值,分区对应的关键列的最大值。

在程序启动时,应用程序架构(Control Processing Tasklet, 控制处理微线程)应该将程序 id 和分区号传递给该程序。 这些变量被用于读取分区表,来确定应用程序应该处理的数据范围(如果使用关键列的话)。另外分区号必须在整个处理过程中用来:

  • 为了使合并程序正常工作,需要将分区号添加到输出文件/数据库更新

  • 向框架的错误处理程序报告正常处理批处理日志和执行期间发生的所有错误

4.3 死锁最小化

当程序并行或分区运行时,会导致数据库资源的争用,还可能会发生死锁(Deadlocks)。其中的关键是数据库设计团队在进行数据库设计时必须考虑尽可能消除潜在的竞争情况。

还要确保设计数据库表的索引时考虑到性能以及死锁预防。

死锁或热点往往发生在管理或架构表上,如日志表、控制表、锁表(lock tables)。这些影响也应该纳入考虑。为了确定架构可能的瓶颈,一个真实的压力测试是至关重要的。

要最小化数据冲突的影响,架构应该提供一些服务,如附加到数据库或遇到死锁时的 等待-重试(wait-and-retry)间隔时间。 这意味着要有一个内置的机制来处理数据库返回码,而不是立即引发错误处理,需要等待一个预定的时间并重试执行数据库操作。

4.4 参数处理和校验

对程序开发人员来说,分区架构应该相对透明。框架以分区模式运行时应该执行的相关任务包括:

  • 在程序启动之前获取分区参数

  • 在程序启动之前验证分区参数

  • 在启动时将参数传递给应用程序

验证(validation)要包含必要的检查,以确保:

  • 应用程序已经足够涵盖整个数据的分区

  • 在各个分区之间没有遗漏断代(gaps)

如果数据库是分区的,可能需要一些额外的验证来保证单个分区不会跨越数据库的片区。

体系架构应该考虑整合分区(partitions).包括以下关键问题:

  • 在进入下一个任务步骤之前是否所有的分区都必须完成?

  • 如果一个分区 Job 中止了要怎么处理?