设计 Dataflow 流水线工作流

流水线开发涉及不同的阶段和任务,例如代码开发、测试和交付到生产环境。本文介绍了相关信息。

  • 有关持续集成和持续交付 (CI/CD) 的注意事项,以支持自动构建、测试流水线,以及将流水线部署到不同环境。
  • Dataflow 功能可优化生产环境中的性能和资源利用率。
  • 用于更新生产环境中的流式传输流水线的方法和观察点。
  • 提高生产环境中流水线可靠性的最佳做法。

持续集成

持续集成 (CI) 要求开发者经常将代码合并到共享代码库中,这对于频繁更改的应用(例如频繁更新的网站)会很有用。虽然数据流水线通常不会像其他类型的应用那样频繁更改,但 CI 做法可为流水线开发提供许多优势。例如,测试自动化可在出现缺陷时快速提供反馈,还可以降低回归进入代码库的可能性。

测试自动化是 CI 的重要部分。将测试自动化与适当的测试覆盖范围结合使用时,测试自动化会在每次提交代码时运行测试套件。您的 CI 服务器可以与构建自动化工具(如 Maven)协同工作,将您的测试套件作为 CI 流水线的一个或多个步骤执行。您可以将成功通过单元测试和集成测试的代码打包到启动流水线的部署工件中。此构建称为“传递构建”。

通过传递构建创建的部署工件的数量和类型可能会有所不同,具体取决于流水线的启动方式。使用 Apache Beam Java SDK,您可以将流水线代码打包成自动执行 JAR 文件。然后,您可以将 JAR 文件存储在部署环境项目(例如预生产或生产 Google Cloud 项目)中托管的存储桶中。如果您使用经典模板(一种模板化执行),则部署工件包括 JSON 模板文件、流水线的 JAR 文件以及可选元数据模板。然后,您可以使用持续交付将工件部署到不同的部署环境中,如下一部分所述。

持续交付和部署

持续交付 (CD) 将部署工件复制到一个或多个已准备好手动启动的部署环境。通常,将 CI 服务器构建的工件部署到一个或多个生产环境中以运行端到端测试。如果所有端到端测试都成功通过,则将更新生产环境。

对于批量流水线,持续部署可以直接将流水线作为新的 Dataflow 作业启动。或者,其他系统可以根据需要使用工件来启动批量作业。例如,您可以使用 Cloud Composer 在工作流中运行批量作业,也可以使用 Cloud Scheduler 安排批量作业。

流式传输流水线的部署会比批量流水线更复杂,因此会更难以使用持续部署实现自动化。例如,您可能需要确定如何替换或更新现有流式传输流水线。如果您无法更新流水线,或者选择不更新流水线,则可以使用其他方法(例如协调多个 Dataflow 作业),以最大限度减少或防止业务中断。

CI/CD 所需的身份和角色

您的 CI/CD 流水线与不同系统互动以构建、测试和部署流水线。例如,您的流水线需要访问源代码库。要启用这些互动,请确保您的流水线具有相应的身份和角色。以下流水线活动可能还需要您的流水线拥有特定身份和角色。

使用 Google Cloud 等外部服务进行集成测试

当您使用 Direct Runner 运行临时测试或系统集成测试时,您的流水线将使用 Google Cloud CLI 凭据以使用 Google Cloud 数据源和接收器,或者其使用 GOOGLE_APPLICATION_CREDENTIALS 环境变量提供的凭据。确保用于为流水线访问的 Google Cloud 资源获取凭据的服务账号具有足够的角色和权限

将工件部署到不同的部署环境

尽可能为每个环境使用唯一凭据(实际上适用于每个项目),并相应地限制对资源的访问权限。

为每个项目使用唯一的服务账号,以读取部署工件并将其写入存储分区。部署过程可能需要暂存多个工件,具体取决于流水线是否使用模板。例如,创建并暂存 Dataflow 模板需要权限才能将启动流水线所需的部署工件(例如流水线的模板文件)写入 Cloud Storage 存储桶。

将流水线部署到不同的部署环境

尽可能为每个项目使用唯一服务账号来访问和管理项目中的 Google Cloud 资源,包括访问 Dataflow 本身。

您用于创建和管理 Dataflow 作业的服务账号需要具有足够的 IAM 权限才能进行作业管理。如需了解详情,请参阅“Dataflow 安全性和权限”页面中的Dataflow 服务账号部分。

您用于运行 Dataflow 作业的工作器服务账号需要具有在作业运行时管理 Compute Engine 资源以及管理 Apache Beam 流水线与 Dataflow 服务之间的交互的权限。如需了解详情,请参阅“Dataflow 安全性和权限”页面中的工作器服务账号部分。

如需为作业指定用户管理的工作器服务账号,请使用 --serviceAccount 流水线选项。如果您在创建作业时未指定工作器服务账号,则 Dataflow 会尝试使用 Compute Engine 默认服务账号。我们建议您为生产环境指定用户管理的工作器服务账号,因为 Compute Engine 默认服务账号通常拥有一组比 Dataflow 作业所需权限更广泛的权限。

在生产场景中,我们建议您为 Dataflow 作业管理和工作器服务账号使用单独的服务账号,与使用单个服务账号相比,这样可以提高安全性。例如,用于创建 Dataflow 作业的服务账号可能不需要访问数据源和接收器,也不需要使用流水线所使用的其他资源。在此场景中,用于运行 Dataflow 作业的工作器服务账号具有使用流水线资源的权限。用于创建作业的其他服务账号具有管理(包括创建)Dataflow 作业的权限。

CI/CD 流水线示例

下图提供了与数据流水线 CI/CD 的工具无关常规视图。此外,上图还显示了开发任务、部署环境和流水线运行程序之间的关系。

CI/CD 流水线的各个阶段。

下图显示了以下阶段:

  • 代码开发:代码开发期间,开发者使用 Direct Runner 在本地运行流水线代码。此外,开发者使用沙盒环境通过 Dataflow Runner 执行临时流水线。

    在典型的 CI/CD 流水线中,当开发者对源代码控制系统进行更改(例如将新代码推送到代码库)时,将触发持续集成流程。

  • 构建和测试:持续集成流程会编译流水线代码,然后使用 Direct Runner 执行单元测试转换集成测试。还可以运行可选的系统集成测试,包括使用外部源和接收器通过小型测试数据集进行的集成测试。

    如果测试成功,CI 流程会将启动流水线所需的部署工件(可能包括 JAR 文件、Docker 映像和模板元数据)存储到可供持续交付流程访问的位置。根据已生成的部署工件的类型,您可以使用 Cloud Storage 和 Artifact Registry 存储不同的工件类型。

  • 交付和部署:持续交付流程会将部署工件复制到预生产环境,或为该环境提供这些工件。开发者可以使用 Dataflow Runner 手动运行端到端测试,也可以使用持续部署自动启动测试。通常,对批量流水线启用持续部署方法比流式传输流水线更简单,因为批量流水线不会持续运行,并且将其替换为新版本会更简单。

    更新流式传输流水线的过程可能很简单,也可能很复杂,而您应该在预生产环境中测试更新。各版本的更新过程并非始终一致。例如,流水线可能会发生更改,导致无法再进行就地更新。因此,有时使用持续部署自动执行流水线更新可能比较困难。

如果所有端到端测试均通过,您可以复制部署工件或将其提供给生产环境,作为持续交付流程的一部分。如果新流水线更新或替换现有流式传输流水线,则使用在预生产环境中测试的过程来部署新流水线。

非模板化作业与模板作业执行

您可以直接在开发环境中使用 Apache Beam SDK 创建 Dataflow 作业。这种类型的作业称为非模板化作业。虽然这种方法对开发者来说很方便,但您可能更希望将开发和运行流水线的任务分离。如需进行此类分离,您可以使用 Dataflow 模板,可让您暂存流水线并将流水线作为独立任务运行。暂存模板后,其他用户(包括非开发者)可以使用 Google Cloud CLI、Google Cloud 控制台或 Dataflow REST API 从模板执行作业。

Dataflow 提供以下作业模板类型

  • 经典模板:开发者使用 Apache Beam SDK 运行流水线代码,并将 JSON 序列化执行图保存为模板。Apache Beam SDK 会将模板文件以及流水线代码所需的任何依赖项暂存至 Cloud Storage 位置。
  • Flex 模板:开发者使用 Google Cloud CLI 将流水线打包为 Docker 映像,该映像随后存储在 Artifact Registry 中。系统还会自动生成 Flex 模板规范文件并将其存储在用户指定的 Cloud Storage 位置中。Flex 模板规范文件包含描述如何运行模板的元数据,例如流水线参数。

除了链接文档中说明的 Flex 模板功能外,Flex 模板在模板管理方面,较之经典模板更具优势。

  • 通过经典模板,可将多个工件(例如 JAR 文件)存储在 Cloud Storage 暂存位置中,但经典模板并未提供任何功能来管理这些工件。相比之下,Flex 模板会封装在 Docker 映像中。此映像会将流水线所需的所有依赖项(除 Flex 模板规范之外)打包到一个由 Artifact Registry 管理的部署工件中。
  • 您可以将 Docker 映像管理功能用于 Flex 模板。例如,您可以通过向 Artifact Registry 授予拉取(及可选的推送)权限安全地共享 Flex 模板,并将 Docker 映像标记用于不同版本的 Flex 模板。

开发者可以使用经典模板和 Flex 模板在与拥有注册表的项目不同的项目和用于托管模板资源的存储桶中启动作业,或者仅在您使用时启动存储桶经典模板。如果您需要将多个客户的数据处理隔离到不同的项目和流水线作业中,则此功能会非常有用。通过 Flex 模板,您可以进一步指定启动流水线时使用的不同 Docker 映像版本。这样,当您日后更新模板时,便可从多个项目逐步替换批处理流水线或流处理流水线。

用于优化资源使用情况的 Dataflow 功能

Dataflow 提供了以下特定于运行程序的功能来优化资源用量,从而提升性能并降低费用:

  • Streaming Engine:Streaming Engine 会将流处理流水线的执行从虚拟机工作器中移出并移入专用服务。其优势包括提高自动扩缩响应能力、减少使用的工作器虚拟机资源以及自动服务更新,而且无需重新部署。在某些情况下,您还可以通过使用至少一次处理来减少资源使用量,以适应可以容忍重复的情况。建议为流处理流水线启用 Streaming Engine。当您使用最新版本的 Apache Beam Java SDK 或 Python SDK 时,该功能默认处于启用状态。
  • Dataflow Shuffle:Dataflow Shuffle 会将批处理流水线的 shuffle 操作从虚拟机工作器中移出并移入专用服务。其优势包括提高大多数批量流水线的执行速度,减少工作器虚拟机的资源消耗量,提高自动扩缩响应能力,以及提高容错能力。建议为批处理流水线启用 Dataflow Shuffle。默认情况下,该功能使用 Apache Beam Java SDK 和最新的 Python SDK 启用。
  • 灵活资源调度 (FlexRS):通过使用高级调度技术、Dataflow Shuffle 服务以及抢占式虚拟机和常规虚拟机的组合,FlexRS 可以降低批处理费用。

在生产环境中更新流式处理流水线

请参阅升级流式处理流水线

Dataflow 作业的生命周期

Dataflow 作业会经历一个以各种作业状态表示的生命周期。如需运行 Dataflow 作业,请将其提交到一个区域。然后,作业会路由到该区域的一个可用区中可用的 Dataflow 后端。在 Dataflow 分配后端之前,它会验证您是否有足够的配额和权限来运行作业。如果这些预检检查完成且已分配后端,则作业会进入 JOB_STATE_PENDING 状态。对于 FlexRS 作业,后端分配可能会延迟到未来的时间,并且这些作业进入 JOB_STATE_QUEUED 状态。

分配的后端会获取要运行的作业,并尝试在 Google Cloud 项目中启动 Dataflow 工作器。为工作器虚拟机选择的可用区取决于多个因素。对于使用 Dataflow Shuffle 的批量作业,该服务还会尝试确保 Dataflow 后端和工作器虚拟机位于同一可用区,以避免跨可用区流量。

启动 Dataflow 工作器后,它们会从 Dataflow 后端请求工作。后端负责将工作拆分成可在工作器中分配、可同时载入的区块(称为软件包)。如果工作器无法处理现有工作,并且启用了自动扩缩,则后端会启动更多工作器来处理工作。同样,如果启动的工作器多于需求,则系统会关停部分工作器。

Dataflow 工作器启动后,Dataflow 后端充当控制平面来编排作业的执行。在处理期间,作业的数据平面会执行重排操作,例如 GroupByKeyCoGroupByKeyCombine。作业将以下一个数据层面实施用于重排操作:

  • 数据层面在 Dataflow 工作器上运行,而重排数据存储在永久性磁盘上。
  • 数据层面作为服务运行,并从工作器虚拟机进行外部化。此实现有两个变体,您可以在创建作业时指定:Dataflow Shuffle(用于批处理流水线)和 Streaming Engine(用于流处理流水线)。与基于工作器的重排相比,基于服务的重排可显著提高数据重排操作的性能和可扩缩性。

进入 JOB_STATE_RUNNING 状态的流处理作业在被取消排空之前会继续无限期地运行,除非发生作业故障。批量作业会在完成所有处理或发生不可恢复的错误时自动终止。根据作业停止方式,Dataflow 会将作业的状态设置为多种终端状态之一,包括 JOB_STATE_CANCELLEDJOB_STATE_DRAINEDJOB_STATE_DONE

流水线可靠性最佳做法

本部分介绍了使用 Dataflow 时可能发生的故障以及 Dataflow 作业的最佳实践。

遵循隔离原则

提高整体流水线可靠性的一般建议是遵循地区和区域背后的隔离原则。确保流水线不具有严重跨区域依赖性。如果您的流水线严重依赖于来自多个区域的服务,则其中任何一个区域的故障都会影响您的流水线。为了帮助避免此问题,请部署到多个地区以实现冗余和备份。

创建 Dataflow 快照

Dataflow 提供快照功能,该功能提供流水线状态的备份。您可以将流水线快照恢复到其他可用区或区域中的新流式传输 Dataflow 流水线。然后,您可以开始重新处理 Pub/Sub 或 Kafka 主题中的消息,从快照时间戳开始。如果您设置了流水线的常规快照,则可以最大限度地减少恢复时间目标 (RTO) 时间。

如需详细了解 Dataflow 快照,请参阅使用 Dataflow 快照

处理作业提交故障

您可以使用 Apache Beam SDK 提交非模板 Dataflow 作业。要提交作业,可使用 Dataflow Runner 运行流水线,这可在流水线选项中指定。Apache Beam SDK 在 Cloud Storage 中暂存文件,创建作业请求文件,并将该文件提交到 Dataflow。

或者,通过 Dataflow 模板创建的作业使用不同的提交方法,这些方法通常依赖于模板 API

您可能会看到 Dataflow 返回的不同错误,表明模板作业和非模板作业的作业故障。本部分讨论不同类型的作业提交故障,以及处理或缓解这些故障的最佳做法。

在瞬时失败后重试作业提交

如果作业因 Dataflow 服务问题而无法启动,请重试作业多次。重试可缓解暂时性服务问题。

通过指定工作器地区缓解区域性故障

Dataflow 提供地区可用性,并且可用于多个地区。当用户将作业提交到区域而未显式指定可用区时,Dataflow 会根据资源可用性将作业路由到指定区域中的可用区。

建议的作业放置选项是尽可能使用 --region 标志(而非 --zone 标志)指定工作器区域。这样,Dataflow 就可以自动为作业选择尽可能最佳的区域,为您的流水线提供额外的容错能力。指定显式可用区的作业没有此优势,并且如果此可用区内出现问题,则作业会失败。如果由于区域问题导致作业提交失败,您通常可以通过重试作业来解决问题,而无需显式指定区域。

通过将数据存储在多个地区中来缓解地区性故障

如果整个地区不可用,请在其他地区中尝试作业。当跨地区的作业失败时,请务必考虑数据的可用性。要防止单地区故障,而无需手动将数据复制到多个地区,请使用自动在多个地区中存储数据的 Google Cloud 资源。例如,对数据集或 Cloud Storage 双地区多地区存储桶使用 BigQuery 多地区位置。如果某个地区不可用,您可以在数据可用的其他地区中重新运行流水线。

如需查看将多地区服务与 Dataflow 搭配使用的示例,请参阅高可用性和地理冗余

处理正在运行的流水线中的故障

作业已提交且被接受后,该作业的唯一有效操作如下所示:

  • 对于批量作业则取消
  • 更新、排空或取消流式传输作业

提交作业后,您无法更改正在运行的作业的位置。如果您未使用 FlexRS,则作业通常会在提交后的几分钟内开始处理数据。(FlexRS 作业最多可能需要 6 小时才能开始处理数据处理。)

本部分讨论正在运行的作业的故障以及处理故障的最佳做法。

监控作业以确定并解决瞬时错误导致的问题

对于批量作业,将重试含失败项的软件包 4 次。当单个软件包失败 4 次时,Dataflow 会终止作业。这可解决许多瞬时问题。但是,如果持续失败,则通常会快速达到最大重试限制,从而使得作业快速失败。

对于监控和突发事件管理,请配置提醒规则来检测失败的作业。如果作业失败,请检查作业日志,以确定因失败工作项超过重试限制而导致的作业故障。

对于流处理作业,Dataflow 会无限期地重试失败的工作项。作业未终止。但作业可能会停滞,直至问题得到解决。创建监控政策以检测停滞的流水线的迹象,例如系统延迟时间增加和数据新鲜度下降。在流水线代码中实施错误日志记录,以帮助识别因工作项反复失败而导致的流水线停滞。

发生区域性中断时在其他区域重启作业

作业启动后,运行用户代码的 Dataflow 工作器受限于单个区域。如果发生可用区级服务中断,Dataflow 作业通常会受到影响,具体取决于服务中断范围。

对于只影响 Dataflow 后端的中断,代管式服务会自动将后端迁移到不同的区域,以便后端可以继续处理作业。如果作业使用 Dataflow Shuffle,则无法跨区域移动后端。如果发生 Dataflow 后端迁移,则可能会暂时停滞作业。

如果发生可用区级服务中断,则正在运行的作业可能会失败或停滞,直到可用区可用性恢复为止。如果某个地区长时间不可用,您应停止作业(对于批量作业则取消,而对于流式传输作业则排空),然后对其进行重启,以使 Dataflow 选择运行状况良好的新区域。

发生地区性中断时在其他地区重启批量作业

如果 Dataflow 作业运行所在的地区发生地区性中断,则作业可能会失败或停滞。对于批量作业,请在其他地区重启作业(如果可以)。务必确保您的数据在不同区域中可用。

使用高可用性或故障切换来缓解地区性中断

对于流处理作业,根据应用的容错和预算,您可以使用不同的方法来缓解故障。对于地区性中断,最简单且最具成本效益的选项是等待中断结束。但是,如果您的应用对延迟敏感,或者数据处理不得中断或应以最低延迟恢复,以下部分将讨论方法。

高可用性:对延迟时间敏感,不会丢失数据

如果您的应用无法容忍数据丢失,请在两个不同的地区中并行运行重复流水线,并让流水线使用相同的数据。相同的数据源需要在两个区域都可用。依赖于这些流水线输出的下游应用必须能够在这两个地区的输出之间切换。由于资源复制,相比其他选项,此选项会产生最高费用。如需查看部署示例,请参阅下一部分:高可用性和地理冗余

故障切换:对延迟时间敏感,可能会丢失数据

如果应用可以容忍潜在数据丢失,则将流式传输数据源设为在多个地区可用。例如,使用 Pub/Sub 会为同一主题维护两个独立订阅,每个区域一个。如果发生地区性中断,您可以在另一个地区中启动替换流水线,并让流水线使用备份订阅中的数据。

您应该将备份订阅重放到适当时间,以将数据丢失降至最低,同时又不牺牲延迟时间。下游应用必须了解如何切换到正在运行的流水线的输出(类似于高可用性选项)。此选项使用的资源比运行重复流水线少,因为仅复制数据。

高可用性和地理冗余

您可以并行运行多个流式传输流水线以实现高可用性数据处理。例如,您可以在不同的地区运行两个并行流式传输作业,从而为数据处理提供地理冗余和容错。

通过考虑数据源和接收器的地理可用性,您可以在可用性高的多区域配置中操作端到端流水线。下图展示了一个示例部署。

2 个地区流水线使用单独的订阅来从全球 Pub/Sub 主题读取。流水线写入单独的多地区 BigQuery 表,一个在美国,一个在欧洲。

下图展示了以下流程:

  1. Pub/Sub 在全球大多数区域运行,这使得该服务可以提供快速的全球数据访问权限,同时让您可控制消息的存储位置。Pub/Sub 会自动将发布的消息存储在距离订阅者最近的 Google Cloud 区域中,也可以使用消息存储政策将其配置为使用特定区域或一组区域

    随后,无论消息存储在何处,Pub/Sub 都会将消息传送给全世界的订阅者。Pub/Sub 客户端无需知道它们所连接的服务器位置,因为全球负载均衡机制会将流量定向到存储消息的最近 Google Cloud 数据中心。

  2. Dataflow 在特定 Google Cloud 地区中运行。通过在单独的 Google Cloud 地区中运行并行流水线,您可以将作业与影响单个地区的故障隔离。该图展示了同一流水线的两个实例,每个实例都在单独的 Google Cloud 地区中运行。

  3. BigQuery 提供地区和多地区数据集位置。选择多地区位置时,您的数据集将位于至少两个地理地区中。该图描述了两个独立流水线,每个流水线均写入单独的多地区数据集。