使用流水线日志

在运行流水线时,您可以使用 Apache Beam SDK 的内置日志记录基础架构来记录信息。使用 Google Cloud 控制台可以在流水线运行期间或之后监控日志记录信息。

将日志消息添加到流水线

Java

Java 版 Apache Beam SDK 建议您通过开源 Java 简易日志记录门面 (SLF4J) 库记录工作器消息。Java 版 Apache Beam SDK 会实现必需的日志记录基础架构,因此 Java 代码只需导入 SLF4J API。然后实例化日志记录器,即可在流水线代码中启用消息日志记录功能。

对于现有代码和/或库,Java 版 Apache Beam SDK 会设置额外的日志记录基础架构。系统会捕获由以下 Java 版日志记录库生成的日志消息:

Python

Python 版 Apache Beam SDK 提供 logging 库软件包,可支持流水线工作器输出日志消息。要使用库函数,您必须导入相应的库:

import logging

Go

Go 版 Apache Beam SDK 提供 log 库软件包,可支持流水线工作器输出日志消息。要使用库函数,您必须导入相应的库:

import "github.com/apache/beam/sdks/v2/go/pkg/beam/log"

工作器日志消息代码示例

Java

以下示例使用 SLF4J 来进行 Dataflow 日志记录。如需详细了解如何配置 SLF4J 来进行 Dataflow 日志记录,请参阅 Java 提示文章。

您可对 Apache Beam WordCount 示例进行修改,令其在处理的文本行中发现“love”一词时输出一条日志消息。在以下示例中,添加的代码以粗体表示(包含了环境代码,用于提供上下文)。

 package org.apache.beam.examples;
 // Import SLF4J packages.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ...
 public class WordCount {
   ...
   static class ExtractWordsFn extends DoFn<String, String> {
     // Instantiate Logger.
     // Suggestion: As shown, specify the class name of the containing class
     // (WordCount).
     private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
     ...
     @ProcessElement
     public void processElement(ProcessContext c) {
       ...
       // Output each word encountered into the output PCollection.
       for (String word : words) {
         if (!word.isEmpty()) {
           c.output(word);
         }
         // Log INFO messages when the word "love" is found.
         if(word.toLowerCase().equals("love")) {
           LOG.info("Found " + word.toLowerCase());
         }
       }
     }
   }
 ... // Remaining WordCount example code ...

Python

可以对 Apache Beam wordcount.py 示例进行修改,以在处理的文本行中发现“love”一词时输出日志消息。

# import Python logging module.
import logging

class ExtractWordsFn(beam.DoFn):
  def process(self, element):
    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      yield word

      if word.lower() == 'love':
        # Log using the root logger at info or higher levels
        logging.info('Found : %s', word.lower())

# Remaining WordCount example code ...

Go

可以对 Apache Beam wordcount.go 示例进行修改,以在处理的文本行中发现“love”一词时输出日志消息。

func (f *extractFn) ProcessElement(ctx context.Context, line string, emit func(string)) {
    for _, word := range wordRE.FindAllString(line, -1) {
        // increment the counter for small words if length of words is
        // less than small_word_length
        if strings.ToLower(word) == "love" {
            log.Infof(ctx, "Found : %s", strings.ToLower(word))
        }

        emit(word)
    }
}

// Remaining Wordcount example

Java

如果使用默认的 DirectRunner 在本地运行修改后的 WordCount 流水线,将输出发送至本地文件 (--output=./local-wordcounts),则控制台输出将包含增加的日志消息:

INFO: Executing pipeline using the DirectRunner.
...
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
...
INFO: Pipeline execution complete.

默认情况下,只有标记为 INFO 及更高级别的日志行将发送至 Cloud Logging。如果要更改此行为,请参阅设置流水线工作器日志级别

Python

如果使用默认的 DirectRunner 在本地运行修改后的 WordCount 流水线,将输出发送至本地文件 (--output=./local-wordcounts),则控制台输出将包含增加的日志消息:

INFO:root:Found : love
INFO:root:Found : love
INFO:root:Found : love

默认情况下,只有标记为 INFO 及更高级别的日志行将发送至 Cloud Logging。

Go

如果使用默认的 DirectRunner 在本地运行修改后的 WordCount 流水线,将输出发送至本地文件 (--output=./local-wordcounts),则控制台输出将包含增加的日志消息:

2022/05/26 11:36:44 Found : love
2022/05/26 11:36:44 Found : love
2022/05/26 11:36:44 Found : love

默认情况下,只有标记为 INFO 及更高级别的日志行将发送至 Cloud Logging。

控制日志量

您还可以通过更改流水线日志级别来减少生成的日志量。如果您不想继续注入部分或全部 Dataflow 日志,请添加 Logging 排除项以排除 Dataflow 日志。然后,将日志导出到其他目标位置,例如 BigQuery、Cloud Storage 或 Pub/Sub。如需了解详情,请参阅控制 Dataflow 日志注入

日志记录限制

每个工作器每 30 秒最多只能发送 15000 条工作器日志消息。如果达到此限制,系统会添加一条工作器日志消息,表示日志记录受到限制:

Throttling logger worker. It used up its 30s quota for logs in only 12.345s
在 30 秒间隔结束之前,系统不会再记录更多消息。此限制包括 Apache Beam SDK 和用户代码生成的日志消息。

日志存储和保留

操作日志存储在 _Default 日志存储桶中。 Logging API 服务名称为 dataflow.googleapis.com。如需详细了解 Cloud Logging 中使用的 Google Cloud 受监控的资源类型和服务,请参阅受监控的资源和服务

如需详细了解 Logging 将保留日志条目多长时间,请参阅“配额和限制:日志保留期限”中的保留期限信息。

如需了解如何查看操作日志,请参阅监控和查看流水线日志

监控和查看流水线日志

当您在 Dataflow 服务上运行流水线时,可以使用 Dataflow 监控界面查看流水线发出的日志。

Dataflow 工作器日志示例

修改后的 WordCount 流水线可以使用下列选项在云端运行:

Java

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--tempLocation=gs://<bucket-name>/temp
--stagingLocation=gs://<bucket-name>/binaries

Python

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

Go

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

查看日志

WordCount 云端流水线使用阻塞式执行方式,因此流水线执行期间会输出 Console 消息。作业开始后,流水线将向控制台输出一个指向 Google Cloud 控制台页面的链接,后跟流水线作业 ID:

INFO: To access the Dataflow monitoring console, please navigate to
https://console.developers.google.com/dataflow/job/2017-04-13_13_58_10-6217777367720337669
Submitted job: 2017-04-13_13_58_10-6217777367720337669

控制台网址指向 Dataflow 监控界面,其中显示已提交作业的摘要页面。页面左侧显示一个动态执行图,右侧显示摘要信息。点击底部面板上的 ,以展开日志面板。

日志面板默认显示报告作业的整体状态的作业日志。您可以通过点击信息过滤日志来过滤日志面板中显示的消息。

选择图表中的流水线步骤可将视图更改为由您的代码生成的步骤日志,以及在流水线步骤中运行的生成的代码。

如需返回作业日志,请点击图表外部区域或使用右侧面板中的取消选择步骤按钮来清除步骤。

点击日志面板中的外部链接按钮即可转至 Logging,其中包含可选择不同日志类型的菜单。

Logging 还包含流水线的其他基础架构日志。如需详细了解如何浏览日志,请参阅日志浏览器指南。

日志类型

下面汇总了可从 Logging 页面查看的不同日志类型:

  • job-message 日志包含 Dataflow 的各个组件生成的作业级消息。相关示例包括自动扩缩配置、工作器启动或关停的时间、作业步骤进度以及作业错误。源自崩溃用户代码以及 worker 日志中出现的工作器级错误也会记录到 job-message 日志中。
  • worker 日志由 Dataflow 工作器生成。工作器负责完成大多数流水线工作(例如将 ParDo 应用到数据)。Worker 日志包含由您的代码和 Dataflow 记录的消息。
  • worker-startup 日志存在于大多数 Dataflow 作业中,可捕获与启动过程相关的消息。启动过程包括从 Cloud Storage 下载作业的 jar 文件,然后启动工作器。如果启动工作器时出现问题,则建议查看此日志。
  • shuffler 日志中包含的消息来自负责整合并行流水线操作结果的工作器。
  • dockerkubelet 日志包含与在 Dataflow 工作器上使用的这些公开技术相关的消息。
  • nvidia-mps 日志包含有关 NVIDIA 多进程服务 (MPS) 操作的消息。

设置流水线工作器日志级别

Java

Java 版 Apache Beam SDK 在工作器上设置的默认 SLF4J 日志记录级别为 INFO,因此级别为 INFO 或更高级别(INFOWARNERROR)的所有日志消息都将发出。您可以设置其他默认日志级别,以支持较低的 SLF4J 日志记录级别(TRACEDEBUG),也可以为代码中不同的软件包或类设置不同的日志级别。

提供了以下流水线选项,可让您通过命令行或以编程方式设置工作器日志级别:

  • --defaultSdkHarnessLogLevel=<level>:用于为所有日志记录器设置指定的默认级别。例如,以下命令行选项将替换默认的 Dataflow INFO 日志级别,并将其设置为 DEBUG
    --defaultSdkHarnessLogLevel=DEBUG
  • --sdkHarnessLogLevelOverrides={"<package or class>":"<level>"}:使用此选项可为指定的软件包或类设置日志记录级别。例如,如需替换 org.apache.beam.runners.dataflow 包的默认流水线日志级别,并将其设置为 TRACE,请使用以下选项:
    --sdkHarnessLogLevelOverrides='{"org.apache.beam.runners.dataflow":"TRACE"}'
    如需进行多项替换,请提供 JSON 映射:
    (--sdkHarnessLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...})。
  • 如果流水线使用 Apache Beam SDK 2.50.0 及更低版本,但没有 Runner v2,则不支持 defaultSdkHarnessLogLevelsdkHarnessLogLevelOverrides 流水线选项。在这种情况下,请使用 --defaultWorkerLogLevel=<level>--workerLogLevelOverrides={"<package or class>":"<level>"} 流水线选项。如需进行多项替换,请提供 JSON 映射:
    (--workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...})

以下示例以编程方式设置流水线日志记录选项,其默认值可通过命令行进行替换:

 PipelineOptions options = ...
 SdkHarnessOptions loggingOptions = options.as(SdkHarnessOptions.class);
 // Overrides the default log level on the worker to emit logs at TRACE or higher.
 loggingOptions.setDefaultSdkHarnessLogLevel(LogLevel.TRACE);
 // Overrides the Foo class and "org.apache.beam.runners.dataflow" package to emit logs at WARN or higher.
 loggingOptions.getSdkHarnessLogLevelOverrides()
     .addOverrideForClass(Foo.class, LogLevel.WARN)
     .addOverrideForPackage(Package.getPackage("org.apache.beam.runners.dataflow"), LogLevel.WARN);

Python

Python 版 Apache Beam SDK 在工作器上设置的默认日志记录级别为 INFO。这意味着,INFO 级别或更高级别(即 INFOWARNINGERRORCRITICAL)的所有日志消息都会发出。您可以设置其他默认日志级别,以支持较低的日志记录级别 (DEBUG),也可以为代码中不同的模块设置不同的日志级别。

系统提供了两个流水线选项,支持通过命令行或以编程方式设置工作器日志级别:

  • --default_sdk_harness_log_level=<level>:用于为所有日志记录器设置指定的默认级别。例如,以下命令行选项会替换默认的 Dataflow 日志级别(即 INFO),并将其设置为 DEBUG
    --default_sdk_harness_log_level=DEBUG
  • --sdk_harness_log_level_overrides={\"<module>\":\"<level>\"}:使用此选项可为指定的模块设置日志记录级别。例如,如需替换 apache_beam.runners.dataflow 模块的默认流水线日志级别,并将其设置为 DEBUG,请使用以下选项:
    --sdk_harness_log_level_overrides={\"apache_beam.runners.dataflow\":\"DEBUG\"}
    如需进行多项替换,请提供 JSON 映射:
    (--sdk_harness_log_level_overrides={\"<module>\":\"<level>\",\"<module>\":\"<level>\",...})。

以下示例使用 WorkerOptions 类以编程方式设置可通过命令行替换的流水线日志记录选项:

  from apache_beam.options.pipeline_options import PipelineOptions, WorkerOptions

  pipeline_args = [
    '--project=PROJECT_NAME',
    '--job_name=JOB_NAME',
    '--staging_location=gs://STORAGE_BUCKET/staging/',
    '--temp_location=gs://STORAGE_BUCKET/tmp/',
    '--region=DATAFLOW_REGION',
    '--runner=DataflowRunner'
  ]

  pipeline_options = PipelineOptions(pipeline_args)
  worker_options = pipeline_options.view_as(WorkerOptions)
  worker_options.default_sdk_harness_log_level = 'WARNING'

  # Note: In Apache Beam SDK 2.42.0 and earlier versions, use ['{"apache_beam.runners.dataflow":"WARNING"}']
  worker_options.sdk_harness_log_level_overrides = {"apache_beam.runners.dataflow":"WARNING"}

  # Pass in pipeline options during pipeline creation.
  with beam.Pipeline(options=pipeline_options) as pipeline:

请替换以下内容:

  • PROJECT_NAME:项目的名称
  • JOB_NAME:作业的名称
  • STORAGE_BUCKET:Cloud Storage 名称
  • DATAFLOW_REGION:要在其中部署 Dataflow 作业的区域

    --region 标志会替换元数据服务器、本地客户端或环境变量中设置的默认区域。

Go

Go 版 Apache Beam SDK 未提供此功能。

查看已发布 BigQuery 作业的日志

在 Dataflow 流水线中使用 BigQuery 时,系统会启动 BigQuery 作业,以代表您执行各种操作。这些操作可能包括加载数据、导出数据等。为了进行问题排查和监控,Dataflow 监控界面在日志面板中提供了有关这些 BigQuery 作业的详细信息。

日志面板中显示的 BigQuery 作业信息是从 BigQuery 系统表中存储和加载的,因此查询底层 BigQuery 表时会产生结算费用

查看 BigQuery 作业详情

要查看 BigQuery 作业信息,您的流水线必须使用 Apache Beam 2.24.0 或更高版本。

如需列出 BigQuery 作业,请打开 BigQuery 作业标签页并选择 BigQuery 作业的位置。接下来,点击加载 BigQuery 作业并确认对话框。查询完成后,系统会显示作业列表。

BigQuery 作业信息表中的“加载 BigQuery 作业”按钮

提供了有关每个作业的基本信息,包括作业 ID、类型、时长等。

一个表显示了在当前流水线作业执行期间运行的 BigQuery 作业。

如需详细了解特定作业,请点击更多信息列中的命令行

在命令行的模态窗口中,复制 bq jobs describe 命令,然后在本地或 Cloud Shell 中运行它。

gcloud alpha bq jobs describe BIGQUERY_JOB_ID

bq jobs describe 命令会输出 JobStatistics,可在诊断慢速或卡住的 BigQuery 作业时提供更多有用详情。

或者,当您将 BigQueryIO 与 SQL 查询结合使用时,系统会发起查询作业。如需查看作业使用的 SQL 查询,请点击更多信息列中的查看查询