常见问题解答

以下部分解答了关于 Dataflow 的一些常见问题。

一般问题

在哪里可以获得更多支持?

您可以访问 Google Cloud 支持,获取包括 Dataflow 支持服务在内的 Google Cloud 支持套餐。

如需研究您的问题或提交新问题,您可以使用 StackOverflow。提交问题时,请为您的问题使用 google-cloud-dataflow 标记。Google 工程人员会密切关注此类问题,并乐于为您解惑。

您还可以在 UserVoice 论坛上提交问题、功能需求、错误或缺陷报告以及其他反馈。

是否可以跨流水线实例共享数据?

我们并未提供可在流水线之间共享数据或处理上下文的 Dataflow 专用跨流水线通信机制。您可以使用 Cloud Storage 等持久性存储服务或 App Engine 等内存缓存,在流水线实例之间共享数据。

有没有在给定时间或按照给定间隔执行流水线的内置时间安排机制?

您可以通过以下方式自动执行流水线:

如何确定在我的环境中安装/运行的 Dataflow SDK 版本?

安装细节取决于您的开发环境。如果您使用的是 Maven,您可以在一个或多个本地 Maven 代码库中“安装”多个版本的 Dataflow SDK。

Java

如需确定指定流水线正在运行的 Dataflow SDK 版本,您可以在使用 DataflowPipelineRunnerBlockingDataflowPipelineRunner 运行时查看控制台输出。控制台将显示如下消息,其中包含 Dataflow SDK 的版本信息:

Python

如需确定指定流水线正在运行的 Dataflow SDK 版本,您可以在使用 DataflowRunner 运行时查看控制台输出。控制台将显示如下消息,其中包含 Dataflow SDK 的版本信息:

  INFO: Executing pipeline on the Dataflow Service, ...
  Dataflow SDK version: <version>

与您的 Cloud Dataflow 作业进行交互

我是否可以在流水线运行时访问作业的工作器机器(Compute Engine 虚拟机)?

您可以使用 Google Cloud Console 查看给定流水线的虚拟机实例。在该控制台中,您可以使用 SSH 访问每个实例。 但是,一旦您的作业完成或失败,Dataflow 服务便会自动关停并清理虚拟机实例。

在 Cloud Dataflow 监控界面中,为什么我看不到流式作业的预留 CPU 时间?

Dataflow 服务在作业完成后报告预留 CPU 时间。对于无界限作业,这意味着仅有作业取消或失败后,该服务才会报告预留 CPU 时间。

在 Cloud Dataflow 监控界面中,为什么没有提供最近更新的流处理作业的状态和水印信息?

更新操作会进行一些更改,这些更改需要几分钟才能传播到 Dataflow 监控界面。请试试在更新作业 5 分钟后刷新监控界面。

为什么我的自定义复合转换在 Dataflow 监控界面中显示为展开状态?

在流水线代码中,您可能按如下方式调用了复合转换:

result = transform.apply(input);

以这种方式调用的复合转换会忽略预期的嵌套,因此可能会在 Dataflow 监控界面中显示为展开状态。您的流水线在执行时,也可能会产生与固定的唯一名称有关的警告或错误。

如需避免这些问题,请确保使用建议的格式调用转换:

result = input.apply(transform);

为什么我在 Cloud Dataflow 监控界面中看不到之前曾看到过的进行中作业的信息了?

系统存在一个已知问题,可能会影响一些已持续运行一个月或更长时间的 Dataflow 作业。此类作业可能无法在 Dataflow 监控界面中加载,或者可能会显示过时的信息(即使该作业以前曾处于可见状态)。

使用 Dataflow 监控界面或 Dataflow 命令行界面时,您仍可以在作业列表中获取您作业的状态。但是,如果出现此问题,您将无法查看您作业的相关详情。

使用 Java 版 Apache Beam SDK 进行编程

我可以将附加(带外)数据传递到现有 ParDo 操作中吗?

可以。根据您的使用场景,有几种模式可供遵循:

  • 您可以将信息序列化为 DoFn 子类中的字段。
  • 匿名 DoFn 中的方法所引用的任何变量都将自动序列化。
  • 您可以在 DoFn.startBundle() 中计算数据。
  • 您可以通过 ParDo.withSideInputs 传入数据。

如需了解详情,请参阅 ParDo 文档(特别是有关“创建 DoFn”和“辅助输入”的章节)以及适用于 ParDo 的 Java 版 API 参考文档

Cloud Dataflow 中如何处理 Java 异常?

您的流水线在处理数据时可能会抛出异常。其中一些错误是瞬态的(例如,暂时无法访问外部服务),但有些错误是永久性的(例如,因输入数据损坏或无法解析引发的错误,或者计算期间的 NULL 指针)。

Dataflow 会处理任意软件包中的元素,并会在针对该软件包中的任何元素抛出错误时重试整个软件包。 以批量模式运行时,含有失败项的软件包将重试 4 次。单个软件包失败 4 次后,流水线将完全失败。以流处理模式运行时,含有失败项的软件包将无限地重试,这可能会导致您的流水线永久性停滞。

Dataflow 监控界面会报告用户代码(例如 DoFn 实例)中的异常。如果使用 BlockingDataflowPipelineRunner 运行流水线,您还可在控制台或终端窗口中看到输出的错误消息。

考虑通过添加异常处理程序来防止代码中的错误。例如,如果您因为某些元素未能通过在 ParDo 中执行的一些自定义输入验证而要舍弃它们,请在 ParDo 中使用 try/catch 块,以便处理异常并舍弃这些元素。您可能还需要使用 Aggregator 跟踪错误计数。

使用 Python 版 Cloud Dataflow SDK 进行编程

如何处理 NameError

如果您在使用 Dataflow 服务执行流水线时收到 NameError,而在本地执行该流水线时(例如使用 DirectRunner)没有收到此错误,则表明您的 DoFn 可能正在使用全局命名空间中的值,而这些值不可用于 Dataflow 工作器。

默认情况下,在 Dataflow 作业序列化期间,系统不会保存在主会话中定义的全局导入项、函数和变量。例如,如果 DoFn 是在主文件中定义的,并且引用了全局命名空间中的导入项和函数,您可以将 --save_main_session 流水线选项设置为 True。这样一来,系统就会通过 Pickle 模块将全局命名空间的状态序列化并载入 Dataflow 工作器。

请注意,如果您的全局命名空间中的对象无法序列化,则会出现 pickle 错误。如果此错误与 Python 发行版中本应提供的模块有关,您可以在使用该模块的位置本地导入该模块来解决此问题。

例如,您不应使用以下命令:

import re
…
def myfunc():
  # use re module

而应使用以下命令:

def myfunc():
  import re
  # use re module

或者,如果您的 DoFn 涵盖多个文件,建议使用其他方法来封装工作流和管理依赖项

流水线 I/O

TextIO 来源和接收器是否支持压缩文件(例如 GZip)?

可以。Dataflow Java 可以读取使用 gzipbzip2 压缩的文件。如需了解更多信息,请参阅 TextIO 文档

我可以使用正则表达式来定位来源为 TextIO 的特定文件吗?

Dataflow 支持常规通配符模式;glob 表达式可以出现在文件路径中的任何位置。但是,Dataflow 不支持递归通配符 (**)。

TextIO 输入来源是否支持 JSON?

但是,如果想让 Cloud Dataflow 服务并行处理输入和输出,您必须使用换行符来分隔来源数据。

为什么动态工作负载再平衡没有随同自定义来源一起激活?

动态工作负载再平衡功能使用自定义来源 getProgress() 方法的返回值来激活。getProgress() 的默认实现返回的是 null。为了确保自动扩缩功能可以激活,请务必使自定义来源重写 getProgress() 并返回适当的值。

如何访问属于其他 Google Cloud Platform 项目(即不是当前使用的 Cloud Dataflow 所属的项目)的 BigQuery 数据集或 Pub/Sub 主题或订阅?

请参阅 Dataflow 的安全和权限指南,了解如何访问其他 Google Cloud 项目(不同于当前使用 Dataflow 所处理的项目)中的 BigQuery 或 Pub/Sub 数据。

使用 BigQuery 连接器时为什么会收到“rateLimitExceeded”错误,我该怎么办?

如果短时间内发送过多 API 请求,则 BigQuery 适用短期配额限制。您的 Dataflow 流水线可能会暂时超出此类配额。发生这种情况时,您从 Dataflow 流水线到 BigQuery 的 API 请求可能会失败,可能会导致工作器日志出现 rateLimitExceeded 错误。请注意,Dataflow 会重试此类失败,因此您可以放心地忽略这些错误。如果您确信流水线由于 rateLimitExceeded 错误而受到影响,请与 Google Cloud 支持团队联系。

我正在使用 BigQuery 连接器,通过流式插入将数据写入 BigQuery,我的写入吞吐量低于预期。如何补救这个问题?

此吞吐量可能由于您的流水线超出可用的 BigQuery 流式插入配额。在这种情况下,您应该会在 Dataflow 工作器日志中看到来自 BigQuery 的配额相关的错误消息(查找 quotaExceeded 错误)。如果您看到此类错误,请考虑在使用 Java 版 Apache Beam SDK 时设置 BigQuery 接收器选项 ignoreInsertIds(),或者在使用适用于 Python 的 Apache Beam SDK 时改用 ignore_insert_ids 选项,以便自动适合 1 GB/秒/项目的 BigQuery 流式插入吞吐量。如需详细了解与自动删除重复消息相关的注意事项,请参阅 BigQuery 文档。如需将 BigQuery 流式插入配额增加到 1 GB/秒以上,您需要通过 Cloud Console 提交请求

如果您在工作器日志中没有看到与配额相关的错误,则问题在于,默认捆绑或批处理相关参数无法提供流水线的扩缩能力。有多种 Dataflow BigQuery 连接器相关的配置,您可以考虑调整这些配置,以在使用流式插入功能写入 BigQuery 时实现预期性能。例如,对于 Java 版 Apache Beam SDK,调整 numStreamingKeys 以匹配最大工作器数量,并考虑增加 insertBundleParallelism 以将 BigQuery 连接器配置为使用更多并行线程来写入 BigQuery。如需获取 Java 版 Apache Beam SDK 中提供的配置,请参阅 BigQueryPipelineOptions;如需获取 Python 版 Apache Beam SDK 中提供的配置,请参阅 WriteToBigQuery 转换

流式插入

如何在流式模式下运行管道?

执行流水线时,您可以在命令行中设置 --streaming 标志。您还可以在构建流水线时以编程方式设置流式模式。

流处理模式支持哪些数据源和接收器?

您可以从 Pub/Sub 读取流数据,也可以将流式数据写入 Pub/Sub 或 BigQuery。

流处理模式目前有哪些限制?

Dataflow 的流处理模式具有以下限制:

  • 流式模式尚不支持批量来源。
  • 对 Dataflow 服务的自动扩缩功能的支持目前处于 Beta 版阶段。

从 Pub/Sub 读取数据的流处理流水线似乎正在变慢,我该怎么做?

您的项目可能没有足够的 Pub/Sub 配额。 您可以通过检查是否出现 429 (Rate limit exceeded) 客户端错误,判断您的项目是否配额不足:

  1. 转到 Google Cloud Console
  2. 在左侧菜单中,选择 API 和服务
  3. 搜索框中,搜索 Cloud Pub/Sub
  4. 点击用量标签页。
  5. 检查响应代码并查找 (4xx) 客户端错误代码。

当我使用更大的工作器池更新自己的流水线时,为什么我的流处理作业没有相应地扩充资源?

Java

对于不使用 Streaming Engine 的流式作业,扩容后的资源量不能超出原始作业开始时分配的原始工作器数量以及永久性磁盘资源量。当您更新 Dataflow 作业并在新作业中指定数量更多的工作器时,您所能指定的工作器数量上限为您为原始作业指定的 --maxNumWorkers 值。

Python

对于不使用 Streaming Engine 的流式作业,扩容后的资源量不能超出原始作业开始时分配的原始工作器数量以及永久性磁盘资源量。当您更新 Dataflow 作业并在新作业中指定数量更多的工作器时,您所能指定的工作器数量上限为您为原始作业指定的 --max_num_workers 值。

流式自动扩缩

如果我希望使用固定数量的工作器,应该怎么办?

默认情况下不启用流式自动扩缩功能,如需启用,您需要选择开启。当前选项的语义没有改变,因此,如果您希望继续使用固定数量的工作器,不需要执行任何操作。

我担心自动扩缩功能会增加费用。我该如何限制此功能?

Java

您可以指定 --maxNumWorkers,以此限制用于处理作业的扩缩范围。

Python

您可以指定 --max_num_workers,以此限制用于处理作业的扩缩范围。

流式自动扩缩流水线的扩缩范围是怎样的?

Java

对于不使用 Streaming Engine 的流式自动扩缩作业,Dataflow 服务会为每个工作器分配 1 至 15 个永久性磁盘。这意味着,流式自动扩缩流水线使用的工作器数量下限为 N/15,其中 N 为 --maxNumWorkers 的值。

对于使用 Streaming Engine 的流式自动扩缩作业,工作器数量下限为 1 个。

Dataflow 会平衡各工作器之间的永久性磁盘数量。例如,如果您的流水线在稳定状态下需要 3 到 4 个工作器,您可以设置 --maxNumWorkers=15。流水线会自动在 1 至 15 个工作器之间进行扩缩;使用 1、2、3、4、5、8 或 15 个工作器时,每个工作器所需的永久性磁盘数量分别是 15、8、5、4、3、2 或 1。

--maxNumWorkers 的上限为 1000。

Python

对于不使用 Streaming Engine 的流式自动扩缩作业,Dataflow 服务会为每个工作器分配 1 至 15 个永久性磁盘。这意味着,流式自动扩缩流水线使用的工作器数量下限为 N/15,其中 N 为 --max_num_workers 的值。

对于使用 Streaming Engine 的流式自动扩缩作业,工作器数量下限为 1 个。

Dataflow 会平衡各工作器之间的永久性磁盘数量。例如,如果您的流水线在稳定状态下需要 3 到 4 个工作器,您可以设置 --max_num_workers=15。流水线会自动在 1 至 15 个工作器之间进行扩缩;使用 1、2、3、4、5、8 或 15 个工作器时,每个工作器所需的永久性磁盘数量分别是 15、8、5、4、3、2 或 1。

--max_num_workers 的上限为 1000。

自动扩缩功能可能使用的工作器数量上限是多少?

Java

Dataflow 运行时使用的工作器数量不超过您项目的 Compute Engine 实例计数配额或 maxNumWorkers 值(以较小者为准)。

Python

Dataflow 运行时使用的工作器数量不超过您项目的 Compute Engine 实例计数配额或 max_num_workers 值(以较小者为准)。

我可以在流处理流水线上关闭自动扩缩功能吗?

Java

可以。只需设置 --autoscalingAlgorithm=NONE。请使用固定的集群规格更新流水线(如手动扩缩文档中所述),其中 numWorkers 要在扩缩范围内。

Python

可以。只需设置 --autoscaling_algorithm=NONE。请使用固定的集群规格更新流水线(如手动扩缩文档中所述),其中 num_workers 要在扩缩范围内。

我可以更改流处理流水线的扩缩范围吗?

Java

可以,但不能以更新* 方式来执行此操作。您必须使用取消排空方式来停止流水线,然后再重新部署流水线,并为其指定所需的新 maxNumWorkers

Python

可以,但不能以更新* 方式来执行此操作。您必须使用取消排空方式来停止流水线,然后再重新部署流水线,并为其指定所需的新 max_num_workers

设置 Google Cloud Platform 项目以使用 Cloud Dataflow

如何确定我当前使用 Cloud Dataflow 处理的项目是否拥有我想要读取或写入数据的 Cloud Storage 存储分区?

如需确定您的 Google Cloud 项目是否拥有特定 Cloud Storage 存储分区,您可以使用以下控制台命令:

gsutil acl get gs://<your-bucket>

该命令会输出如下所示的 JSON 字符串:

[
  {
    "entity": "project-owners-123456789",
    "projectTeam": {
      "projectNumber": "123456789",
      "team": "owners"
    },
    "role": "OWNER"
  },
  ....
]

相关条目是“角色”为所有者的条目。关联的 projectNumber 会指出该存储分区属于哪个项目。如果该项目编号与您的项目编号不匹配,您需要执行以下任一操作:

  • 创建您的项目所拥有的新存储分区。
  • 为相应的帐号授予对此存储分区的访问权限。

如何创建属于我的 Cloud Dataflow 项目的新存储分区?

如需在您正在使用 Dataflow 的 Google Cloud 项目中创建新的存储分区,您可以使用以下控制台命令:

gsutil mb -p <Project to own the bucket> <bucket-name>

如何将属于其他项目的存储分区设置为可供我当前使用 Cloud Dataflow 所处理的 Google Cloud Platform 项目进行读取或写入操作?

请参阅 Dataflow 的安全和权限指南,了解 Dataflow 流水线如何才能访问其他 Google Cloud 项目所拥有的 Google Cloud 资源。

尝试运行 Cloud Dataflow 作业时,我看到一条错误消息:“需要为您的项目启用某些 Cloud API,Cloud Dataflow 才能运行此作业。”我该怎么做?

如需运行 Dataflow 作业,您必须在项目中启用以下 Google Cloud API:

  • Compute Engine API (Compute Engine)
  • Cloud Logging API
  • Cloud Storage
  • Cloud Storage JSON API
  • BigQuery API
  • Pub/Sub
  • Datastore API

如需了解详细说明,请参阅有关启用 Google Cloud API 的“使用入门”部分