パイプライン メッセージをログに記録する

Cloud Dataflow SDK の組み込みロギング インフラストラクチャを使用して、パイプラインの実行中に情報を記録できます。Google Cloud Platform Console を使用して、パイプラインの実行中と実行後のロギング情報をモニタリングできます。

パイプラインにログメッセージを追加する

Java: SDK 2.x

Apache Beam SDK for Java では、オープンソースの SLF4J(Simple Logging Facade for Java)ライブラリを通じてワーカー メッセージをロギングすることをおすすめします。Apache Beam SDK for Java は必要なロギング インフラストラクチャを実装しているため、Java コードで必要なのは、SLF4J API をインポートすることだけです。次に、Logger をインスタンス化して、パイプライン コード内でメッセージ ロギングを有効にします。

既存のコードやライブラリについては、Apache Beam SDK for Java によって追加のロギング インフラストラクチャが設定されます。これは、以下のような Java のロギング ライブラリによって生成されたログメッセージをキャプチャするために、ワーカーで実行しているときに発生します。

Python

Apache Beam SDK for Python には logging ライブラリ パッケージが用意されており、パイプラインのワーカーでコードを使用してログメッセージを出力できます。ライブラリ関数を使用するには、ライブラリをインポートする必要があります。

import logging

Java: SDK 1.x

Apache Beam SDK for Java では、オープンソースの SLF4J(Simple Logging Facade for Java)ライブラリを通じてワーカー メッセージをロギングすることをおすすめします。Apache Beam SDK for Java は必要なロギング インフラストラクチャを実装しているため、Java コードで必要なのは、SLF4J API をインポートすることだけです。次に、Logger をインスタンス化して、パイプライン コード内でメッセージ ロギングを有効にします。

既存のコードやライブラリについては、Apache Beam SDK for Java によって追加のロギング インフラストラクチャが設定されます。これは、以下のような Java のロギング ライブラリによって生成されたログメッセージをキャプチャするために、ワーカーで実行しているときに発生します。

ワーカー ログメッセージ コードの例

Java: SDK 2.x

Apache Beam WordCount サンプルは、処理されたテキストの行に “love” という単語が見つかった場合にログメッセージを出力するように変更できます。以下の例では、追加されるコードは太字で示されています(わかりやすくするために、前後のコードも含めています)。

 package org.apache.beam.examples;
 // Import SLF4J packages.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ...
 public class WordCount {
   ...
   static class ExtractWordsFn extends DoFn<String, String> {
     // Instantiate Logger.
     // Suggestion: As shown, specify the class name of the containing class
     // (WordCount).
     private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
     ...
     @ProcessElement
     public void processElement(ProcessContext c) {
       ...
       // Output each word encountered into the output PCollection.
       for (String word : words) {
         if (!word.isEmpty()) {
           c.output(word);
         }
         // Log INFO messages when the word "love" is found.
         if(word.toLowerCase().equals("love")) {
           LOG.info("Found " + word.toLowerCase());
         }
       }
     }
   }
 ... // Remaining WordCount example code ...

Python

Apache Beam の wordcount.py サンプルは、処理されたテキストの行に “love” という単語が見つかった場合にログメッセージを出力するように変更できます。

# import Python logging module.
import logging

class ExtractWordsFn(beam.DoFn):

  def process(self, element):
    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      yield word

      if word.lower() == 'love':
        # Log using the root logger at info or higher levels
        logging.info('Found : %s', word.lower())

# Remaining WordCount example code ...

Java: SDK 1.x

WordCount サンプルは、処理されたテキストの行に “love” という単語が見つかった場合にログメッセージを出力するように変更できます。以下の例では、追加されるコードは太字で示されています(わかりやすくするために、前後のコードも含めています)。

 package com.google.cloud.dataflow.examples;
 // Import SLF4J packages.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ...
 public class WordCount {
   ...
   static class ExtractWordsFn extends DoFn<String, String> {
     // Instantiate Logger.
     // Suggestion: As shown, specify the class name of the containing class
     // (WordCount).
     private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
     ...
     @Override
     public void processElement(ProcessContext c) {
       ...
       // Output each word encountered into the output PCollection.
       for (String word : words) {
         if (!word.isEmpty()) {
           c.output(word);
         }
         // Log INFO messages when the word "love" is found.
         if(word.toLowerCase().equals("love")) {
           LOG.info("Found " + word.toLowerCase());
         }
       }
     }
   }
 ... // Remaining WordCount example code ...

Java: SDK 2.x

変更された WordCount パイプラインは、デフォルトの DirectRunner を使用してローカルで実行され、出力はローカル ファイル(--output=./local-wordcounts)に送信されます。コンソールの出力には、追加されたログメッセージが表示されます。

INFO: Executing pipeline using the DirectRunner.
...
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
...
INFO: Pipeline execution complete.

デフォルトでは、INFO 以上がマークされたログ行のみが Cloud Logging に送信されます。この動作を変更する場合は、パイプラインのワーカーログ レベルを設定するをご覧ください。

Python

変更された WordCount パイプラインは、デフォルトの DirectRunner を使用してローカルで実行され、出力はローカル ファイル(--output=./local-wordcounts)に送信されます。コンソールの出力には、追加されたログメッセージが表示されます。

INFO:root:Found : love
INFO:root:Found : love
INFO:root:Found : love

デフォルトでは、INFO 以上がマークされたログ行のみが Cloud Logging に送信されます。

Java: SDK 1.x

変更された WordCount パイプラインは、デフォルトの DirectPipelineRunner を使用してローカルで実行され、出力はローカル ファイル(--output=./local-wordcounts)に送信されます。コンソールの出力には、追加されたログメッセージが表示されます。

INFO: Executing pipeline using the DirectPipelineRunner.
...
Feb 11, 2015 1:13:22 PM com.google.cloud.dataflow.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM com.google.cloud.dataflow.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM com.google.cloud.dataflow.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
...
INFO: Pipeline execution complete.

デフォルトでは、INFO 以上がマークされたログ行のみが Cloud Logging に送信されます。この動作を変更する場合は、パイプラインのワーカーログ レベルを設定するをご覧ください。

パイプライン ログをモニタリングする

Cloud Dataflow サービスでパイプラインを実行すると、Cloud Dataflow Monitoring Interface を使用して、パイプラインによって出力されたログを表示できます。

Cloud Dataflow ワーカーログの例

変更された WordCount パイプラインは、次のオプションを使用してクラウドで実行できます。

Java: SDK 2.x

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--tempLocation=gs://<bucket-name>/temp
--stagingLocation=gs://<bucket-name>/binaries

Python

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

Java: SDK 1.x

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=BlockingDataflowPipelineRunner
--stagingLocation=gs://<bucket-name>/binaries

ジョブの概要とステータスを表示する

WordCount クラウド パイプラインはブロック実行を使用するため、パイプラインの実行中にコンソール メッセージが出力されます。ジョブが開始すると、GCP Console ページへのリンクがコンソールに出力され、続いてパイプラインのジョブ ID が出力されます。

INFO: To access the Cloud Dataflow monitoring console, please navigate to
https://console.developers.google.com/dataflow/job/2017-04-13_13_58_10-6217777367720337669
Submitted job: 2017-04-13_13_58_10-6217777367720337669

コンソール URL は、送信されたジョブの概要ページがある Cloud Dataflow Monitoring Interface を表示します。左側には動的実行グラフが表示され、右側には概要情報が表示されます。

[ログ] ボタンをクリックすると下部のログパネルが開き、デフォルトではジョブの全体的なステータスを報告する [ジョブのログ] メッセージが表示されます。最小重要度セレクタを使用して、ジョブの処理状況とステータス メッセージをフィルタできます。

グラフのパイプライン ステップを選択すると、ビューは、コードによって生成された [ステップのログ] と、パイプライン ステップで実行中の生成されたコードに変更されます。

[ジョブのログ] に戻るには、グラフの外側をクリックするか右側のパネルの [閉じる] ボタンを使用して、ステップの選択を解除します。

ログの表示

Cloud Dataflow Monitoring Interface の [ステップのログ] には、最新のログメッセージのみが表示されます。ログペインの右側にある [Stackdriver] リンクをクリックすると、Stackdriver Logging でパイプライン ステップのすべてのステップのログを表示できます。

Logging には、パイプラインのその他のインフラストラクチャ ログも含まれます。[ジョブのログ] から外部リンクボタンをクリックすると、Cloud Logging に移動し、さまざまなログタイプを選択するメニューが表示されます。

ここでは、[モニタリング] → [ログ] ページで表示できるさまざまなログタイプの概要を示します。

  • worker は、Cloud Dataflow ワーカーによって生成されます。worker は、パイプライン処理の大部分(データへの ParDo の適用など)を実行します。worker のログには、コードと Cloud Dataflow によって記録されたメッセージが含まれます。
  • worker-startup ログは、ほとんどの Cloud Dataflow ジョブを表し、起動プロセスに関連するメッセージをキャプチャできます。起動プロセスには、Cloud Storage からのジョブの jar のダウンロードと、それに続くワーカーの起動が含まれます。ワーカーの起動に問題がある場合は、ここを調べることをおすすめします。
  • shuffler ログには、並列パイプライン オペレーションの結果を統合する、ワーカーからのメッセージが含まれます。
  • dockerkubelet のログには、Cloud Dataflow ワーカーで使用される、これらの一般公開されたテクノロジーに関連するメッセージが含まれます。

パイプラインのワーカーログ レベルを設定する

Java: SDK 2.x

Apache Beam SDK for Java によってワーカーに設定されるデフォルトの SLF4J ログレベルは INFO です。INFO 以上(INFOWARNERROR)のすべてのログメッセージが出力されます。別のデフォルト ログレベルを設定して、より低い SLF4J ログレベル(TRACE または DEBUG)をサポートしたり、コード内の異なるクラス パッケージに異なるログレベルを設定したりできます。

コマンドラインまたはプログラムでワーカーログ レベルを設定できるように、2 つのパイプライン オプションが用意されています。

  • --defaultWorkerLogLevel=<level>: このオプションを使用して、指定したデフォルト レベルですべてのロガーを設定します。たとえば、次のコマンドライン オプションは、デフォルトの Cloud Dataflow INFO ログレベルをオーバーライドし、DEBUG に設定します。
    --defaultWorkerLogLevel=DEBUG
  • --workerLogLevelOverrides={"<package or class>":"<level>"}: このオプションを使用して、指定したパッケージまたはクラスのログレベルを設定します。たとえば、com.google.cloud.dataflow パッケージのデフォルトのパイプライン ログレベルをオーバーライドし、TRACE に設定するには、次のようにします。
    --workerLogLevelOverrides={"com.google.cloud.dataflow":"TRACE"}
    または、com.google.cloud.Foo クラスのデフォルトのパイプライン ログレベルをオーバーライドし、DEBUG に設定するには、次のようにします。
    --workerLogLevelOverrides={"com.google.cloud.Foo":"DEBUG"}
    JSON マップを提供することで、複数のオーバーライドを実行できます。
    --workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...}

次のサンプル プログラムでは、パイプライン ログオプションに、コマンドラインからオーバーライドできるデフォルト値を設定します。

 PipelineOptions options = ...
 DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
 // Overrides the default log level on the worker to emit logs at TRACE or higher.
 loggingOptions.setDefaultWorkerLogLevel(Level.TRACE);
 // Overrides the Foo class and "com.google.cloud.dataflow" package to emit logs at WARN or higher.
 loggingOptions.setWorkerLogLevelOverrides(
     WorkerLogLevelOverride.forClass(Foo.class, Level.WARN),
     WorkerLogLevelOverride.forPackage(Package.getPackage("com.google.cloud.dataflow"), Level.WARN));

Python

この機能は、Apache Beam SDK for Python ではまだ使用できません。

Java: SDK 1.x

Apache Beam SDK for Java によってワーカーに設定されるデフォルトの SLF4J ログレベルは INFO です。INFO 以上(INFOWARNERROR)のすべてのログメッセージが出力されます。別のデフォルト ログレベルを設定して、より低い SLF4J ログレベル(TRACE または DEBUG)をサポートしたり、コード内の異なるクラス パッケージに異なるログレベルを設定したりできます。

コマンドラインまたはプログラムでワーカーログ レベルを設定できるように、2 つのパイプライン オプションが用意されています。

  • --defaultWorkerLogLevel=<level>: このオプションを使用して、指定したデフォルト レベルですべてのロガーを設定します。たとえば、次のコマンドライン オプションでは、デフォルトの INFO ログレベルがオーバーライドされ、DEBUG に設定されます。
    --defaultWorkerLogLevel=DEBUG
  • --workerLogLevelOverrides={"<package or class>":"<level>"}: このオプションを使用して、指定したパッケージまたはクラスのログレベルを設定します。たとえば、com.google.cloud.dataflow パッケージのデフォルトのパイプライン ログレベルをオーバーライドし、TRACE に設定するには、次のようにします。
    --workerLogLevelOverrides={"com.google.cloud.dataflow":"TRACE"}
    または、com.google.cloud.Foo クラスのデフォルトのパイプライン ログレベルをオーバーライドし、DEBUG に設定するには、次のようにします。
    --workerLogLevelOverrides={"com.google.cloud.Foo":"DEBUG"}
    JSON マップを提供することで、複数のオーバーライドを実行できます。
    --workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...}

次のサンプル プログラムでは、パイプライン ログオプションに、コマンドラインからオーバーライドできるデフォルト値を設定します。

 PipelineOptions options = ...
 DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
 // Overrides the default log level on the worker to emit logs at TRACE or higher.
 loggingOptions.setDefaultWorkerLogLevel(Level.TRACE);
 // Overrides the Foo class and "com.google.cloud.dataflow" package to emit logs at WARN or higher.
 loggingOptions.setWorkerLogLevelOverrides(
     WorkerLogLevelOverride.forClass(Foo.class, Level.WARN),
     WorkerLogLevelOverride.forPackage(Package.getPackage("com.google.cloud.dataflow"), Level.WARN));

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。