使用数据抽样来观察流水线数据

通过数据抽样,您可以在 Dataflow 流水线的每个步骤中观察数据。此信息可显示正在运行或已完成的作业的实际输入和输出,从而帮助您调试流水线问题。

数据抽样的用途包括:

  • 在开发过程中,查看整个流水线中生成的元素。

  • 如果流水线抛出异常,请查看与该异常相关的元素。

  • 调试时,查看转换的输出,以确保输出正确无误。

  • 无需检查流水线代码,即可了解流水线的行为。

  • 之后在作业完成后查看抽样的元素,或比较抽样数据与先前运行的数据。

概览

Dataflow 可以通过以下方式对流水线数据进行抽样:

  • 定期抽样。使用此类型的抽样时,Dataflow 会在作业运行时收集样本。您可以使用抽样数据来检查您的流水线是否按预期处理元素,并诊断热键或输出错误等运行时问题。如需了解详情,请参阅本文档中的使用定期数据采样

  • 异常抽样。使用此类型的采样时,如果流水线抛出异常,Dataflow 会收集示例。您可以使用这些示例查看发生异常时正在处理的数据。异常采样默认处于启用状态,并且可以停用。如需了解详情,请参阅本文档中的使用例外采样

Dataflow 会将抽样元素写入由 temp_location 流水线选项指定的 Cloud Storage 路径。您可以在 Google Cloud 控制台中查看抽样数据,也可以检查 Cloud Storage 中的原始数据文件。这些文件会一直保留在 Cloud Storage 中,直到您将其删除。

数据抽样由 Dataflow 工作器执行。抽样是尽力而为的。如果发生暂时性错误,则系统可能会丢弃样本。

要求

如需使用数据抽样,您必须启用 Runner v2。如需了解详情,请参阅启用 Dataflow Runner v2

要在 Google Cloud 控制台中查看抽样数据,您需要以下 Identity and Access Management 权限

  • storage.buckets.get
  • storage.objects.get
  • storage.objects.list

周期性抽样需要使用以下 Apache Beam SDK:

  • Apache Beam Java SDK 2.47.0 或更高版本
  • Apache Beam Python SDK 2.46.0 或更高版本
  • Apache Beam Go SDK 2.53.0 或更高版本

异常抽样需要使用以下 Apache Beam SDK:

  • Apache Beam Java SDK 2.51.0 或更高版本
  • Apache Beam Python SDK 2.51.0 或更高版本
  • Apache Beam Go SDK 不支持异常抽样。

从这些 SDK 开始,Dataflow 会默认为所有作业启用异常采样。

使用定期数据采样

本部分介绍了如何在作业运行时持续抽取流水线数据。

启用定期数据抽样

周期性抽样默认处于停用状态。如需启用此功能,请设置以下流水线选项:

Java

--experiments=enable_data_sampling

Python

--experiments=enable_data_sampling

Go

--experiments=enable_data_sampling

您可以通过编程方式或使用命令行设置此选项。如需了解详情,请参阅设置实验性流水线选项

运行 Dataflow 模板时,请使用 additional-experiments 标志启用数据抽样:

--additional-experiments=enable_data_sampling

启用周期性抽样后,Dataflow 会从作业图中的每个 PCollection 收集样本。采样率大约为每 30 秒采样一次。

定期数据抽样可能会带来显著的性能开销,具体取决于数据量。因此,我们建议您仅在测试期间启用定期抽样功能,并在生产工作负载中将其停用。

查看抽样数据

要在 Google Cloud 控制台中查看抽样数据,请执行以下步骤:

  1. 在 Google Cloud 控制台中,前往 Dataflow 作业页面。

    进入“作业”

  2. 选择一个作业。

  3. 点击底部面板上的 ,以展开日志面板。

  4. 点击数据抽样标签页。

  5. 步骤字段中,选择一个流水线步骤。您还可以在作业图中选择步骤。

  6. 集合字段中,选择 PCollection

如果 Dataflow 已收集该 PCollection 的样本,则抽样数据将显示在标签页中。对于每个示例,该标签页都会显示创建日期和输出元素。输出元素是集合元素的序列化表示形式,包括元素数据、时间戳以及窗口和窗格信息。

以下示例展示了抽样元素。

Java

TimestampedValueInGlobalWindow{value=KV{way, [21]},
timestamp=294247-01-09T04:00:54.775Z, pane=PaneInfo{isFirst=true, isLast=true,
timing=ON_TIME, index=0, onTimeIndex=0}}

Python

(('THE', 1), MIN_TIMESTAMP, (GloblWindow,), PaneInfo(first: True, last: True,
timing: UNKNOWN, index: 0, nonspeculative_index: 0))

Go

KV<THE,1> [@1708122738999:[[*]]:{3 true true 0 0}]

下图显示了抽样数据在 Google Cloud 控制台中的显示方式。

Google Cloud 控制台中的抽样数据

使用例外采样

如果您的流水线抛出未处理的异常,您可以同时查看异常和与该异常相关的输入元素。当您使用受支持的 Apache Beam SDK 时,异常采样默认处于启用状态。

查看异常

如需查看例外情况,请执行以下步骤:

  1. 在 Google Cloud 控制台中,前往 Dataflow 作业页面。

    进入“作业”

  2. 选择一个作业。

  3. 如需展开日志面板,请点击日志面板上的 切换面板

  4. 点击数据抽样标签页。

  5. 步骤字段中,选择一个流水线步骤。您还可以在作业图中选择步骤。

  6. 集合字段中,选择 PCollection

    Exception 列包含异常详情。异常没有输出元素。相反,输出元素列包含消息 Failed to process input element: INPUT_ELEMENT,其中 INPUT_ELEMENT 是相关的输入元素。

  7. 如需在新窗口中查看输入示例和异常详情,请点击 在新窗口中打开

下图显示了异常在 Google Cloud 控制台中的显示方式。

Google Cloud 控制台中的抽样异常

停用例外情况采样

如需停用异常抽样,请设置以下流水线选项:

Java

--experiments=disable_always_on_exception_sampling

Python

--experiments=disable_always_on_exception_sampling

您可以通过编程方式或使用命令行设置此选项。如需了解详情,请参阅设置实验性流水线选项

运行 Dataflow 模板时,请使用 additional-experiments 标志停用异常抽样:

--additional-experiments=disable_always_on_exception_sampling

安全注意事项

Dataflow 将抽样数据写入您创建和管理的 Cloud Storage 存储桶。使用 Cloud Storage 的安全功能来保护数据的安全。具体而言,请考虑采取以下额外安全措施:

您还可以混淆 PCollection 数据类型中的各个字段,使原始值不会显示在抽样数据中:

  • Python:替换 __repr____str__ 方法。
  • Java:替换 toString 方法。

但是,您无法对 I/O 连接器的输入和输出进行混淆处理,除非您修改连接器源代码以执行此操作。

结算

Dataflow 执行数据抽样时,您需要为 Cloud Storage 数据存储以及 Cloud Storage 上的读写操作付费。如需了解详情,请参阅 Cloud Storage 价格

每个 Dataflow 工作器批量写入样本,每个批次产生一项读取操作和一项写入操作。

问题排查

本部分介绍使用数据抽样时的常见问题。

权限错误

如果您无权查看示例,Google Cloud 控制台会显示以下错误:

You don't have permission to view a data sample.

如需解决此错误,请检查您是否拥有所需的 IAM 权限。如果错误仍然存在,则可能是您受到了 IAM 拒绝政策的影响。

我没有看到任何示例

如果您没有看到任何示例,请检查以下各项:

  1. 通过设置 enable_data_sampling 选项,确保启用数据抽样。请参阅启用数据抽样
  2. 确保您使用的是 Runner v2
  3. 确保工作器已启动。只有在工作器启动之后,抽样才会开始。
  4. 确保作业和工作器处于正常状态。
  5. 仔细检查项目的 Cloud Storage 配额。如果您超出 Cloud Storage 配额限制,Dataflow 将无法写入示例数据。
  6. 数据采样无法从可迭代对象中采样。无法提供这类数据流的示例。

后续步骤