本文档简要介绍了流水线部署,并重点介绍了您可以对已部署的流水线执行的一些操作。
运行流水线
创建和测试 Apache Beam 流水线后,请运行流水线。您可以在本地运行流水线,以便测试和调试 Apache Beam 流水线;也可以在 Dataflow 上运行流水线,这是一个可用于运行 Apache Beam 流水线的数据处理系统。
在本地运行
在本地运行流水线。
Java
以下示例代码摘自快速入门,展示了如何在本地运行 WordCount 流水线。如需了解详情,请参阅如何在本地运行 Java 流水线。
在终端中,运行以下命令:
mvn compile exec:java \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--output=counts"
Python
以下示例代码摘自快速入门,展示了如何在本地运行 WordCount 流水线。如需了解详情,请参阅如何在本地运行 Python 流水线。
在终端中,运行以下命令:
python -m apache_beam.examples.wordcount \ --output outputs
Go
以下示例代码摘自快速入门,展示了如何在本地运行 WordCount 流水线。如需了解详情,请参阅如何在本地运行 Go 流水线。
在终端中,运行以下命令:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
了解如何在您的机器上使用直接运行程序在本地运行流水线。
在 Dataflow 上运行
在 Dataflow 上运行流水线。
Java
以下示例代码摘自快速入门,展示了如何在 Dataflow 上运行 WordCount 流水线。如需了解详情,请参阅如何在 Dataflow 上运行 Java 流水线。
在终端中,运行以下命令(从 word-count-beam
目录):
mvn -Pdataflow-runner compile exec:java \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--project=PROJECT_ID \ --gcpTempLocation=gs://BUCKET_NAME/temp/ \ --output=gs://BUCKET_NAME/output \ --runner=DataflowRunner \ --region=REGION"
请替换以下内容:
PROJECT_ID
:您的 Google Cloud 项目 IDBUCKET_NAME
:Cloud Storage 存储桶的名称REGION
:Dataflow 区域,例如us-central1
Python
以下示例代码摘自快速入门,展示了如何在 Dataflow 上运行 WordCount 流水线。如需了解详情,请参阅如何在 Dataflow 上运行 Python 流水线。
在终端中,运行以下命令:
python -m apache_beam.examples.wordcount \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://STORAGE_BUCKET/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://STORAGE_BUCKET/tmp/
替换以下内容:
Go
以下示例代码摘自快速入门,展示了如何在 Dataflow 上运行 WordCount 流水线。如需了解详情,请参阅如何在 Dataflow 上运行 Go 流水线。
在终端中,运行以下命令:
posix-terminal go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://STORAGE_BUCKET/results/outputs \ --runner dataflow \ --project PROJECT_ID \ --region DATAFLOW_REGION \ --staging_location gs://STORAGE_BUCKET/binaries/
请替换以下内容:
STORAGE_BUCKET
:Cloud Storage 存储桶名称。PROJECT_ID
:Google Cloud 项目 ID。DATAFLOW_REGION
:要在其中部署 Dataflow 作业的区域。例如europe-west1
。 如需查看可用位置的列表,请参阅 Dataflow 位置。请注意,--region
标志会替换元数据服务器、本地客户端或环境变量中设置的默认区域。
了解如何使用 Dataflow 运行程序在 Dataflow 服务上运行流水线。
当您在 Dataflow 上运行流水线时,Dataflow 会将您的 Apache Beam 流水线代码转换为 Dataflow 作业。Dataflow 会为您全面管理 Google Cloud 服务(例如 Compute Engine 和 Cloud Storage)以运行 Dataflow 作业,并自动启动和删除必要的资源。您可以在流水线生命周期中详细了解 Dataflow 如何将 Apache Beam 代码转换为 Dataflow 作业。
流水线验证
当您在 Dataflow 上运行流水线时,Dataflow 会在作业启动之前对流水线执行验证测试。当验证测试发现流水线存在问题时,Dataflow 会使作业提交提前失败。在作业日志中,Dataflow 包含以下内容的消息。每条消息还包含验证发现结果的详细信息以及解决问题的说明。
The preflight pipeline validation failed for job JOB_ID.
运行哪些验证测试取决于您的 Dataflow 作业使用的资源和服务。
- 如果为您的项目启用了 Service Usage API,则流水线验证测试会检查是否启用了运行 Dataflow 作业所需的服务。
- 如果您的项目启用了 Cloud Resource Manager API,则流水线验证测试会检查您是否有运行 Dataflow 作业所需的项目级配置。
如需详细了解如何启用服务,请参阅启用和停用服务。
如需了解如何解决在流水线验证期间发现的权限问题,请参阅流水线验证失败。
如果要覆盖流水线验证并启动存在验证错误的作业,请使用以下流水线服务选项:
Java
--dataflowServiceOptions=enable_preflight_validation=false
Python
--dataflow_service_options=enable_preflight_validation=false
Go
--dataflow_service_options=enable_preflight_validation=false
设置流水线选项
您可以在 Apache Beam 流水线代码中设置流水线选项,以此控制 Dataflow 在运行作业时的某些行为。例如,您可以使用流水线选项来设置流水线是在工作器虚拟机、Dataflow 服务后端还是本地运行。
管理流水线依赖项
许多 Apache Beam 流水线都可以使用默认的 Dataflow 运行时环境运行。但是,某些数据处理使用场景使用其他库或类更有优势。在这些情况下,您可能需要管理流水线依赖项。如需详细了解如何管理依赖项,请参阅在 Dataflow 中管理流水线依赖项。
监控作业
Dataflow 可让您通过 Dataflow 监控界面和 Dataflow 命令行界面等工具了解作业的运行情况。
访问工作器虚拟机
您可以使用 Google Cloud 控制台查看给定流水线的虚拟机实例。在该控制台中,您可以使用 SSH 访问每个实例。 但是,在您的作业完成或失败后,Dataflow 服务会自动关停并清理虚拟机实例。
作业优化
除了管理 Google Cloud 资源以外,Dataflow 还会为您自动执行和优化分布式并行处理的诸多方面。
并行处理和分布
Dataflow 会自动对您的数据进行分区,并将您的工作器代码分布到多个 Compute Engine 实例进行并行处理。如需了解详情,请参阅并行处理和分布。
融合和组合优化
Dataflow 会使用您的流水线代码创建一个执行图来表示流水线的 PCollection
和转换,并优化此执行图以实现最高效的性能和资源利用率。Dataflow 还会自动优化可能产生高额费用的操作,例如数据聚合。如需了解详情,请参阅融合优化和组合优化。
自动调节功能
Dataflow 服务包含多项可动态调整资源分配和数据分区的功能。这些功能有助于 Dataflow 尽可能快速、高效地执行作业。这些功能包括:
Streaming Engine
默认情况下,Dataflow 流水线运行程序完全在工作器虚拟机上执行流处理流水线的各步骤,并会消耗工作器 CPU、内存和 Persistent Disk 存储空间。Dataflow 的 Streaming Engine 会将流水线执行从工作器虚拟机中移出并移入 Dataflow 服务后端。如需了解详情,请参阅 Streaming Engine。
Dataflow Flexible Resource Scheduling
Dataflow FlexRS 通过使用高级调度技术、Dataflow Shuffle 服务以及抢占式虚拟机 (VM) 实例和常规虚拟机的组合,可以降低批处理费用。通过并行运行抢占式虚拟机和常规虚拟机,Dataflow 可在系统事件期间 Compute Engine 停止抢占式虚拟机实例的情况下改善用户体验。当 Compute Engine 抢占了抢占式虚拟机时,FlexRS 有助于确保流水线继续运行并且您不会丢失之前的工作成果。如需详细了解 FlexRS,请参阅在 Dataflow 中使用灵活资源调度服务。
Dataflow 安全强化型虚拟机
从 2022 年 6 月 1 日开始,Dataflow 服务对所有工作器使用安全强化型虚拟机。如需详细了解安全强化型虚拟机功能,请参阅安全强化型虚拟机。