パイプライン ログを操作する

Apache Beam SDK の組み込みロギング インフラストラクチャを使用して、パイプラインの実行時に情報をロギングできます。Google Cloud コンソールを使用して、パイプラインの実行中と実行後のロギング情報をモニタリングできます。

パイプラインにログメッセージを追加する

Java

Apache Beam SDK for Java では、オープンソースの Simple Logging Facade for Java(SLF4J)ライブラリを通じてワーカー メッセージをロギングすることをおすすめします。Apache Beam SDK for Java は必要なロギング インフラストラクチャを実装しているため、Java コードで必要なのは、SLF4J API をインポートすることだけです。次に、Logger をインスタンス化して、パイプライン コード内でメッセージ ロギングを有効にします。

既存のコードやライブラリについては、Apache Beam SDK for Java によって追加のロギング インフラストラクチャが設定されます。Java 用の次のロギング ライブラリによって生成されたログメッセージがキャプチャされます。

Python

Apache Beam SDK for Python には logging ライブラリ パッケージが用意されており、パイプラインのワーカーでコードを使用してログメッセージを出力できます。ライブラリ関数を使用するには、ライブラリをインポートする必要があります。

import logging

Go

Apache Beam SDK for Go には log ライブラリ パッケージが用意されており、パイプラインのワーカーでコードを使用してログメッセージを出力できます。ライブラリ関数を使用するには、ライブラリをインポートする必要があります。

import "github.com/apache/beam/sdks/v2/go/pkg/beam/log"

ワーカー ログ メッセージ コードの例

Java

次の例では、Dataflow ロギングに SLF4J を使用します。Dataflow ロギング用の SLF4J の構成の詳細については、Java のヒントの記事をご覧ください。

Apache Beam WordCount サンプルは、処理されたテキストの行に “love” という単語が見つかった場合にログ メッセージを出力するように変更できます。以下の例では、追加されるコードは太字で示されています(わかりやすくするために、前後のコードも含めています)。

 package org.apache.beam.examples;
 // Import SLF4J packages.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ...
 public class WordCount {
   ...
   static class ExtractWordsFn extends DoFn<String, String> {
     // Instantiate Logger.
     // Suggestion: As shown, specify the class name of the containing class
     // (WordCount).
     private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
     ...
     @ProcessElement
     public void processElement(ProcessContext c) {
       ...
       // Output each word encountered into the output PCollection.
       for (String word : words) {
         if (!word.isEmpty()) {
           c.output(word);
         }
         // Log INFO messages when the word "love" is found.
         if(word.toLowerCase().equals("love")) {
           LOG.info("Found " + word.toLowerCase());
         }
       }
     }
   }
 ... // Remaining WordCount example code ...

Python

Apache Beam の wordcount.py サンプルは、処理されたテキストの行に “love” という単語が見つかった場合にログ メッセージを出力するように変更できます。

# import Python logging module.
import logging

class ExtractWordsFn(beam.DoFn):
  def process(self, element):
    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      yield word

      if word.lower() == 'love':
        # Log using the root logger at info or higher levels
        logging.info('Found : %s', word.lower())

# Remaining WordCount example code ...

Go

Apache Beam の wordcount.go サンプルは、処理されたテキストの行に “love” という単語が見つかった場合にログ メッセージを出力するように変更できます。

func (f *extractFn) ProcessElement(ctx context.Context, line string, emit func(string)) {
    for _, word := range wordRE.FindAllString(line, -1) {
        // increment the counter for small words if length of words is
        // less than small_word_length
        if strings.ToLower(word) == "love" {
            log.Infof(ctx, "Found : %s", strings.ToLower(word))
        }

        emit(word)
    }
}

// Remaining Wordcount example

Java

変更された WordCount パイプラインは、デフォルトの DirectRunner を使用してローカルで実行され、出力はローカル ファイル(--output=./local-wordcounts)に送信されます。コンソールの出力には、追加されたログメッセージが表示されます。

INFO: Executing pipeline using the DirectRunner.
...
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
...
INFO: Pipeline execution complete.

デフォルトでは、INFO 以上がマークされたログ行のみが Cloud Logging に送信されます。この動作を変更する場合は、パイプライン ワーカーのログレベルの設定をご覧ください。

Python

変更された WordCount パイプラインは、デフォルトの DirectRunner を使用してローカルで実行され、出力はローカル ファイル(--output=./local-wordcounts)に送信されます。コンソールの出力には、追加されたログメッセージが表示されます。

INFO:root:Found : love
INFO:root:Found : love
INFO:root:Found : love

デフォルトでは、INFO 以上がマークされたログ行のみが Cloud Logging に送信されます。

Go

変更された WordCount パイプラインは、デフォルトの DirectRunner を使用してローカルで実行され、出力はローカル ファイル(--output=./local-wordcounts)に送信されます。コンソールの出力には、追加されたログメッセージが表示されます。

2022/05/26 11:36:44 Found : love
2022/05/26 11:36:44 Found : love
2022/05/26 11:36:44 Found : love

デフォルトでは、INFO 以上がマークされたログ行のみが Cloud Logging に送信されます。

ログの量を制御する

また、パイプラインのログレベルを変更して、生成されるログの量を減らすこともできます。一部またはすべての Dataflow ログの取り込みを続行しない場合は、Logging の除外を追加して Dataflow ログを除外します。次に、BigQuery、Cloud Storage、Pub/Sub などの別の宛先にログをエクスポートします。詳細については、Dataflow ログ取り込みの制御をご覧ください。

ロギングの上限と抑制

ワーカー ログメッセージの上限は、ワーカーごとに 30 秒あたり 15,000 メッセージに制限されています。この上限に達すると、ロギングが抑制されたことを示す 1 つのワーカー ログ メッセージが追加されます。

Throttling logger worker. It used up its 30s quota for logs in only 12.345s
30 秒が経過するまで、それ以上のメッセージはログに記録されません。この上限は、Apache Beam SDK とユーザーコードで生成されるログメッセージによって共有されます。

ログの保存と保持

オペレーション ログは、_Default ログバケットに保存されます。Logging API サービス名は dataflow.googleapis.com です。Cloud Logging で使用される Google Cloud のモニタリング対象リソースタイプとサービスの詳細については、モニタリング対象リソースとサービスをご覧ください。

Logging でログエントリが保持される期間の詳細については、割り当てと上限: ログの保持期間で保持情報をご覧ください。

オペレーション ログの表示方法については、パイプライン ログをモニタリングおよび表示するをご覧ください。

パイプライン ログのモニタリングと表示

Dataflow サービスでパイプラインを実行すると、Dataflow モニタリング インターフェースを使用して、パイプラインによって出力されたログを表示できます。

Dataflow ワーカーログの例

変更された WordCount パイプラインは、次のオプションを使用してクラウドで実行できます。

Java

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--tempLocation=gs://<bucket-name>/temp
--stagingLocation=gs://<bucket-name>/binaries

Python

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

Go

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

ログの表示

WordCount クラウド パイプラインはブロック実行を使用するため、パイプラインの実行中にコンソール メッセージが出力されます。ジョブが開始すると、Google Cloud コンソール ページへのリンクがコンソールに出力され、続いてパイプラインのジョブ ID が出力されます。

INFO: To access the Dataflow monitoring console, please navigate to
https://console.developers.google.com/dataflow/job/2017-04-13_13_58_10-6217777367720337669
Submitted job: 2017-04-13_13_58_10-6217777367720337669

コンソール URL は、送信されたジョブの概要ページがある Dataflow モニタリング インターフェースを表示します。左側には動的実行グラフが表示され、右側には概要情報が表示されます。下のパネルの をクリックしてログパネルを展開します。

ログパネルにはデフォルトで、ジョブ全体のステータスを報告する [ジョブのログ] が表示されます。[情報] [ログのフィルタ] をクリックすると、ログパネルに表示されるメッセージをフィルタできます。

グラフのパイプライン ステップを選択すると、ビューは、コードによって生成された [ステップのログ] と、パイプライン ステップで実行中の生成されたコードに変更されます。

[ジョブのログ] に戻るには、グラフの外側をクリックするか右側のパネルの [ステップを選択解除] ボタンを使用して、ステップの選択を解除します。

ログパネルから外部リンクボタンをクリックすると、Logging に移動し、さまざまなログタイプを選択するメニューが表示されます。

Logging には、パイプラインのその他のインフラストラクチャ ログも含まれます。ログを調べる方法の詳細については、ログ エクスプローラ ガイドをご覧ください。

ログタイプ

ここでは、[Logging] ページで表示できるさまざまなログタイプの概要を示します。

  • job-message ログには、Dataflow のさまざまなコンポーネントが生成するジョブレベルのメッセージが記録されます。たとえば、自動スケーリング構成、ワーカーの起動やシャットダウン、ジョブステップの進捗状況、ジョブエラーなどが記録されます。ユーザーコードのクラッシュに起因するワーカーレベルのエラーや、ワーカーログに記録されたエラーも job-message ログに記録されます。
  • worker ログは Dataflow ワーカーによって生成されます。worker は、パイプライン処理の大部分(データへの ParDo の適用など)を実行します。worker のログには、コードと Dataflow によって記録されたメッセージが含まれます。
  • worker-startup ログは、ほとんどの Dataflow ジョブを表し、起動プロセスに関連するメッセージをキャプチャできます。起動プロセスでは、Cloud Storage からジョブの jar をダウンロードし、ワーカーを起動します。ワーカーの起動に問題がある場合は、ここを調べることをおすすめします。
  • shuffler ログには、並列パイプライン オペレーションの結果を統合する、ワーカーからのメッセージが含まれます。
  • dockerkubelet のログには、Dataflow ワーカーで使用される、これらの一般公開されたテクノロジーに関連するメッセージが含まれます。
  • nvidia-mps ログには、NVIDIA Multi-Process Service(MPS)オペレーションに関するメッセージが含まれます。

パイプラインのワーカーログ レベルを設定する

Java

Apache Beam SDK for Java によってワーカーに設定されるデフォルトの SLF4J ロギングレベルは INFO です。INFO 以上(INFOWARNERROR)のすべてのログメッセージが出力されます。別のデフォルト ログレベルを設定して、より低い SLF4J ログレベル(TRACE または DEBUG)をサポートできます。また、コード内の異なるクラス パッケージに異なるログレベルを設定することもできます。

コマンドラインまたはプログラムでワーカーログ レベルを設定できるように、次のパイプライン オプションが用意されています。

  • --defaultSdkHarnessLogLevel=<level>: このオプションを使用して、指定したデフォルト レベルですべてのロガーを設定します。たとえば、次のコマンドライン オプションは、デフォルトの Dataflow INFO ログレベルをオーバーライドして、DEBUG に設定します。
    --defaultSdkHarnessLogLevel=DEBUG
  • --sdkHarnessLogLevelOverrides={"<package or class>":"<level>"}: このオプションを使用して、指定したパッケージまたはクラスのログレベルを設定します。たとえば、org.apache.beam.runners.dataflow パッケージのデフォルトのパイプライン ログレベルをオーバーライドして、TRACE に設定するには、次のようにします。
    --sdkHarnessLogLevelOverrides='{"org.apache.beam.runners.dataflow":"TRACE"}'
    複数のオーバーライドを行うには、次のように JSON マップを指定します。
    --sdkHarnessLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...}
  • defaultSdkHarnessLogLevelsdkHarnessLogLevelOverrides のパイプライン オプションは、Runner v2 を使用しない Apache Beam SDK バージョン 2.50.0 以前を使用するパイプラインではサポートされていません。 その場合、--defaultWorkerLogLevel=<level>--workerLogLevelOverrides={"<package or class>":"<level>"} のパイプライン オプションを使用します。複数のオーバーライドを行うには、次のように JSON マップを指定します。
    --workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...}

次のサンプル プログラムでは、パイプライン ログオプションに、コマンドラインからオーバーライドできるデフォルト値を設定します。

 PipelineOptions options = ...
 SdkHarnessOptions loggingOptions = options.as(SdkHarnessOptions.class);
 // Overrides the default log level on the worker to emit logs at TRACE or higher.
 loggingOptions.setDefaultSdkHarnessLogLevel(LogLevel.TRACE);
 // Overrides the Foo class and "org.apache.beam.runners.dataflow" package to emit logs at WARN or higher.
 loggingOptions.getSdkHarnessLogLevelOverrides()
     .addOverrideForClass(Foo.class, LogLevel.WARN)
     .addOverrideForPackage(Package.getPackage("org.apache.beam.runners.dataflow"), LogLevel.WARN);

Python

Apache Beam SDK for Python によってワーカーに設定されるデフォルトのロギングレベルは INFO です。INFO 以上(INFOWARNINGERRORCRITICAL)のすべてのログメッセージが出力されます。別のデフォルトのログレベルを設定して、より低いロギングレベル(DEBUG)をサポートできます。また、コード内のモジュールごとに異なるログレベルを設定することもできます。

コマンドラインまたはプログラムでワーカーログ レベルを設定できるように、2 つのパイプライン オプションが用意されています。

  • --default_sdk_harness_log_level=<level>: このオプションを使用して、指定したデフォルト レベルですべてのロガーを設定します。たとえば、次のコマンドライン オプションは、デフォルトの Dataflow INFO ログレベルをオーバーライドして、DEBUG に設定します。
    --default_sdk_harness_log_level=DEBUG
  • --sdk_harness_log_level_overrides={\"<module>\":\"<level>\"}: このオプションを使用して、指定したモジュールのロギングレベルを設定します。たとえば、apache_beam.runners.dataflow モジュールのデフォルトのパイプライン ログレベルをオーバーライドして、DEBUG に設定するには、次のようにします。
    --sdk_harness_log_level_overrides={\"apache_beam.runners.dataflow\":\"DEBUG\"}
    複数のオーバーライドを行うには、次のように JSON マップを指定します。
    --sdk_harness_log_level_overrides={\"<module>\":\"<level>\",\"<module>\":\"<level>\",...}

次の例では、WorkerOptions クラスを使用して、コマンドラインからオーバーライドできるパイプライン ロギング オプションをプログラムで設定しています。

  from apache_beam.options.pipeline_options import PipelineOptions, WorkerOptions

  pipeline_args = [
    '--project=PROJECT_NAME',
    '--job_name=JOB_NAME',
    '--staging_location=gs://STORAGE_BUCKET/staging/',
    '--temp_location=gs://STORAGE_BUCKET/tmp/',
    '--region=DATAFLOW_REGION',
    '--runner=DataflowRunner'
  ]

  pipeline_options = PipelineOptions(pipeline_args)
  worker_options = pipeline_options.view_as(WorkerOptions)
  worker_options.default_sdk_harness_log_level = 'WARNING'

  # Note: In Apache Beam SDK 2.42.0 and earlier versions, use ['{"apache_beam.runners.dataflow":"WARNING"}']
  worker_options.sdk_harness_log_level_overrides = {"apache_beam.runners.dataflow":"WARNING"}

  # Pass in pipeline options during pipeline creation.
  with beam.Pipeline(options=pipeline_options) as pipeline:

次のように置き換えます。

  • PROJECT_NAME: プロジェクトの名前
  • JOB_NAME: ジョブの名前
  • STORAGE_BUCKET: Cloud Storage 名
  • DATAFLOW_REGION: Dataflow ジョブをデプロイするリージョン

    --region フラグは、メタデータ サーバー、ローカル クライアント、または環境変数に設定されているデフォルト リージョンをオーバーライドします。

Go

この機能は、Apache Beam SDK for Go では使用できません。

起動された BigQuery ジョブのログを表示する

Dataflow パイプラインで BigQuery を使用すると、BigQuery ジョブが起動し、ユーザーに代わってさまざまなアクションが実行されます。これらのアクションには、データの読み込み、エクスポートなどが含まれます。トラブルシューティングとモニタリングを行う場合、Dataflow モニタリング インターフェースには、これらの BigQuery ジョブに関する追加情報が [ログ] パネルに表示されます。

[ログ] パネルに表示される BigQuery ジョブの情報は BigQuery システム テーブルに保存され、そこから読み込まれます。基盤となる BigQuery テーブルに対してクエリが実行されると課金が発生します。

BigQuery ジョブの詳細を表示する

BigQuery ジョブの情報を表示するには、パイプラインで Apache Beam 2.24.0 以降を使用する必要があります。

BigQuery ジョブを一覧表示するには、[BigQuery ジョブ] タブを開いて、BigQuery ジョブのロケーションを選択します。次に、[BigQuery ジョブの読み込み] をクリックしてダイアログを確定します。クエリが完了すると、ジョブリストが表示されます。

BigQuery ジョブ情報テーブル内の [BigQuery ジョブの読み込み] ボタン

ジョブ ID、タイプ、期間など、各ジョブの基本情報が提供されます。

現在のパイプライン ジョブの実行中に実施された BigQuery ジョブを示すテーブル。

特定のジョブの詳細情報については、[詳細] 列の [コマンドライン] をクリックします。

コマンドラインのモーダル ウィンドウで bq jobs describe コマンドをコピーし、ローカルまたは Cloud Shell で実行します。

gcloud alpha bq jobs describe BIGQUERY_JOB_ID

bq jobs describe コマンドは、JobStatistics を出力します。これにより、低速または停滞している BigQuery ジョブの診断時に役立つ追加の詳細情報が提供されます。

また、SQL クエリで BigQueryIO を使用すると、クエリジョブが発行されます。ジョブで使用される SQL クエリを表示するには、[詳細] 列で [クエリを表示] をクリックします。