パイプライン メッセージをログに記録する

Apache Beam SDK の組み込みロギング インフラストラクチャを使用して、パイプラインの実行中に情報をログに記録できます。Google Cloud Console を使用して、パイプラインの実行中と実行後のロギング情報をモニタリングできます。

パイプラインにログメッセージを追加する

Java: SDK 2.x

Apache Beam SDK for Java では、オープンソースの SLF4J(Simple Logging Facade for Java)ライブラリを通じてワーカー メッセージをロギングすることをおすすめします。Apache Beam SDK for Java は必要なロギング インフラストラクチャを実装しているため、Java コードで必要なのは、SLF4J API をインポートすることだけです。次に、Logger をインスタンス化して、パイプライン コード内でメッセージ ロギングを有効にします。

既存のコードやライブラリについては、Apache Beam SDK for Java によって追加のロギング インフラストラクチャが設定されます。これは、以下のような Java のロギング ライブラリによって生成されたログメッセージをキャプチャするために、ワーカーで実行しているときに発生します。

Python

Apache Beam SDK for Python には logging ライブラリ パッケージが用意されており、パイプラインのワーカーでコードを使用してログメッセージを出力できます。ライブラリ関数を使用するには、ライブラリをインポートする必要があります。

import logging

Java: SDK 1.x

ワーカー ログメッセージ コードの例

Java: SDK 2.x

Apache Beam WordCount サンプルは、処理されたテキストの行に “love” という単語が見つかった場合にログメッセージを出力するように変更できます。以下の例では、追加されるコードは太字で示されています(わかりやすくするために、前後のコードも含めています)。

 package org.apache.beam.examples;
 // Import SLF4J packages.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ...
 public class WordCount {
   ...
   static class ExtractWordsFn extends DoFn<String, String> {
     // Instantiate Logger.
     // Suggestion: As shown, specify the class name of the containing class
     // (WordCount).
     private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
     ...
     @ProcessElement
     public void processElement(ProcessContext c) {
       ...
       // Output each word encountered into the output PCollection.
       for (String word : words) {
         if (!word.isEmpty()) {
           c.output(word);
         }
         // Log INFO messages when the word "love" is found.
         if(word.toLowerCase().equals("love")) {
           LOG.info("Found " + word.toLowerCase());
         }
       }
     }
   }
 ... // Remaining WordCount example code ...

Python

Apache Beam の wordcount.py サンプルは、処理されたテキストの行に “love” という単語が見つかった場合にログメッセージを出力するように変更できます。

# import Python logging module.
import logging

class ExtractWordsFn(beam.DoFn):
  def process(self, element):
    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      yield word

      if word.lower() == 'love':
        # Log using the root logger at info or higher levels
        logging.info('Found : %s', word.lower())

# Remaining WordCount example code ...

Java: SDK 1.x

Java: SDK 2.x

変更された WordCount パイプラインは、デフォルトの DirectRunner を使用してローカルで実行され、出力はローカル ファイル(--output=./local-wordcounts)に送信されます。コンソールの出力には、追加されたログメッセージが表示されます。

INFO: Executing pipeline using the DirectRunner.
...
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
...
INFO: Pipeline execution complete.

デフォルトでは、INFO 以上がマークされたログ行のみが Cloud Logging に送信されます。この動作を変更する場合は、パイプラインのワーカーログ レベルを設定するをご覧ください。

Python

変更された WordCount パイプラインは、デフォルトの DirectRunner を使用してローカルで実行され、出力はローカル ファイル(--output=./local-wordcounts)に送信されます。コンソールの出力には、追加されたログメッセージが表示されます。

INFO:root:Found : love
INFO:root:Found : love
INFO:root:Found : love

デフォルトでは、INFO 以上がマークされたログ行のみが Cloud Logging に送信されます。

Java: SDK 1.x

ロギングの上限と抑制

ワーカー ログメッセージの上限は、ワーカーごとに 30 秒あたり 15,000 メッセージに制限されています。この上限に達すると、ロギングが抑制されたことを示す 1 つのワーカー ログメッセージが追加されます。

Throttling logger worker. It used up its 30s quota for logs in only 12.345s
30 秒が経過するまで、それ以上のメッセージはログに記録されません。この上限は、Apache Beam SDK とユーザーコードで生成されるログメッセージによって共有されます。

パイプライン ログをモニタリングする

Dataflow サービスでパイプラインを実行すると、Dataflow Monitoring Interface を使用して、パイプラインによって出力されたログを表示できます。

Dataflow ワーカーログの例

変更された WordCount パイプラインは、次のオプションを使用してクラウドで実行できます。

Java: SDK 2.x

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--tempLocation=gs://<bucket-name>/temp
--stagingLocation=gs://<bucket-name>/binaries

Python

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

Java: SDK 1.x

ログの表示

WordCount クラウド パイプラインはブロック実行を使用するため、パイプラインの実行中にコンソール メッセージが出力されます。ジョブが開始すると、Cloud Console ページへのリンクがコンソールに出力され、続いてパイプラインのジョブ ID が出力されます。

INFO: To access the Dataflow monitoring console, please navigate to
https://console.developers.google.com/dataflow/job/2017-04-13_13_58_10-6217777367720337669
Submitted job: 2017-04-13_13_58_10-6217777367720337669

コンソール URL は、送信されたジョブの概要ページがある Dataflow Monitoring Interface を表示します。左側には動的実行グラフが表示され、右側には概要情報が表示されます。下のパネルの をクリックしてログパネルを展開します。

ログパネルにはデフォルトで、ジョブ全体のステータスを報告する [ジョブのログ] が表示されます。[情報] [ログのフィルタ] をクリックすると、ログパネルに表示されるメッセージをフィルタできます。

グラフのパイプライン ステップを選択すると、ビューは、コードによって生成された [ステップのログ] と、パイプライン ステップで実行中の生成されたコードに変更されます。

[ジョブのログ] に戻るには、グラフの外側をクリックするか右側のパネルの [ステップを選択解除] ボタンを使用して、ステップの選択を解除します。

ログパネルから外部リンクボタンをクリックすると、Logging に移動し、さまざまなログタイプを選択するメニューが表示されます。

Logging には、パイプラインのその他のインフラストラクチャ ログも含まれます。ログを調べる方法の詳細については、ログ エクスプローラ ガイドをご覧ください。

ここでは、[Logging] ページで表示できるさまざまなログタイプの概要を示します。

  • job-message ログには、Dataflow のさまざまなコンポーネントが生成するジョブレベルのメッセージが記録されます。たとえば、自動スケーリング構成、ワーカーの起動やシャットダウン、ジョブステップの進捗状況、ジョブエラーなどが記録されます。ユーザーコードのクラッシュに起因するワーカーレベルのエラーや、ワーカーログに記録されたエラーも job-message ログに記録されます。
  • worker ログは Dataflow ワーカーによって生成されます。worker は、パイプライン処理の大部分(データへの ParDo の適用など)を実行します。worker のログには、コードと Dataflow によって記録されたメッセージが含まれます。
  • worker-startup ログは、ほとんどの Dataflow ジョブを表し、起動プロセスに関連するメッセージをキャプチャできます。起動プロセスには、Cloud Storage からのジョブの jar のダウンロードと、それに続くワーカーの起動が含まれます。ワーカーの起動に問題がある場合は、ここを調べることをおすすめします。
  • shuffler ログには、並列パイプライン オペレーションの結果を統合する、ワーカーからのメッセージが含まれます。
  • dockerkubelet のログには、Dataflow ワーカーで使用される、これらの一般公開されたテクノロジーに関連するメッセージが含まれます。

パイプラインのワーカーログ レベルを設定する

Java: SDK 2.x

Apache Beam SDK for Java によってワーカーに設定されるデフォルトの SLF4J ログレベルは INFO です。INFO 以上(INFOWARNERROR)のすべてのログメッセージが出力されます。別のデフォルト ログレベルを設定して、より低い SLF4J ログレベル(TRACE または DEBUG)をサポートできます。また、コード内の異なるクラス パッケージに異なるログレベルを設定することもできます。

コマンドラインまたはプログラムでワーカーログ レベルを設定できるように、2 つのパイプライン オプションが用意されています。

  • --defaultWorkerLogLevel=<level>: このオプションを使用して、指定したデフォルト レベルですべてのロガーを設定します。たとえば、次のコマンドライン オプションは、デフォルトの Dataflow INFO ログレベルをオーバーライドして、DEBUG に設定します。
    --defaultWorkerLogLevel=DEBUG
  • --workerLogLevelOverrides={"<package or class>":"<level>"}: このオプションを使用して、指定したパッケージまたはクラスのログレベルを設定します。たとえば、com.google.cloud.dataflow パッケージのデフォルトのパイプライン ログレベルをオーバーライドし、TRACE に設定するには、次のようにします。
    --workerLogLevelOverrides={"com.google.cloud.dataflow":"TRACE"}
    または、com.google.cloud.Foo クラスのデフォルトのパイプライン ログレベルをオーバーライドし、DEBUG に設定するには、次のようにします。

    --workerLogLevelOverrides={"com.google.cloud.Foo":"DEBUG"} JSON マップを提供することで、複数のオーバーライドを実行できます。
    --workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...}

次のサンプル プログラムでは、パイプライン ログオプションに、コマンドラインからオーバーライドできるデフォルト値を設定します。

 PipelineOptions options = ...
 DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
 // Overrides the default log level on the worker to emit logs at TRACE or higher.
 loggingOptions.setDefaultWorkerLogLevel(Level.TRACE);
 // Overrides the Foo class and "com.google.cloud.dataflow" package to emit logs at WARN or higher.
 loggingOptions.setWorkerLogLevelOverrides(
     WorkerLogLevelOverride.forClass(Foo.class, Level.WARN),
     WorkerLogLevelOverride.forPackage(Package.getPackage("com.google.cloud.dataflow"), Level.WARN));

Python

この機能は、Apache Beam SDK for Python ではまだ使用できません。

Java: SDK 1.x

起動された BigQuery ジョブのログを表示する

Dataflow パイプラインで BigQuery を使用すると、BigQuery ジョブが起動し、データの読み込みやエクスポートなど、ユーザーに代わってさまざまなアクションを実行できます。トラブルシューティングとモニタリングを行う場合、Dataflow モニタリング UI には、これらの BigQuery ジョブに関する追加情報が [ログ] パネルに表示されます。

[ログ] パネルに表示される BigQuery ジョブの情報は BigQuery システム テーブルに保存され、そこから読み込まれるので、基盤となる BigQuery テーブルに対してクエリが実行されると課金が発生します。

プロジェクトの設定

BigQuery ジョブの情報を表示するには、パイプラインで Apache Beam 2.24.0 以降を使用する必要があります。ただし、リリースされるまでは、メインブランチからビルドされた Apache Beam SDK の開発バージョンを使用する必要があります。

Java: SDK 2.x

  1. プロジェクトの pom.xml ファイルに次のプロファイルを追加します。

    <profiles>
      <!-- Additional profiles listed here. -->
      <profile>
        <id>snapshot</id>
        <repositories>
          <repository>
            <id>apache.snapshots</id>
            <url>https://repository.apache.org/content/repositories/snapshots</url>
          </repository>
        </repositories>
      </profile>
    </profiles>
    
  2. プロジェクトをテストまたは実行する場合は、プロファイル オプションを pom.xml にリストされている id 値に設定し、beam.version プロパティを 2.24.0-SNAPSHOT 以降に設定します。次に例を示します。

    mvn test -Psnapshot -Dbeam.version=2.24.0-SNAPSHOT
    

    他のスナップショット値については、スナップショット インデックスをご覧ください。

Python

  1. GitHub にログインします。

  2. 結果リストに移動し、Apache Beam Python SDK の構築が正常に完了していることを確認します。

  3. メイン(マスター)ブランチからビルドされた最近完了したジョブをクリックします。

  4. サイドパネルで、[List files on Google Cloud Storage Bucket] をクリックします。

  5. メインパネルで、[List files on Google Cloud Storage Bucket] を展開します。

  6. ファイルリストから .zip ファイルを、Python プロジェクトを実行するローカルマシンまたは場所にダウンロードします。

    Cloud Storage バケット名は beam-wheels-staging であるため、ダウンロード URL を作成するときはこれを含める必要があります。次に例を示します。

    gsutil cp gs://beam-wheels-staging/master/02bf081d0e86f16395af415cebee2812620aff4b-207975627/apache-beam-2.25.0.dev0.zip <var>SAVE_TO_LOCATION</var>
    
  7. ダウンロードした zip ファイルをインストールします。

    pip install apache-beam-2.25.0.dev0.zip
    
  8. Apache Beam パイプラインを実行する場合は、--sdk_location フラグを渡して SDK zip ファイルを参照します。

    --sdk_location=apache-beam-2.25.0.dev0.zip
    

Java: SDK 1.x

BigQuery ジョブの詳細の表示

BigQuery ジョブを一覧表示するには、[BigQuery ジョブ] タブを開いて、BigQuery ジョブのロケーションを選択します。次に、[BigQuery ジョブの読み込み] をクリックしてダイアログを確定します。クエリが完了すると、ジョブリストが表示されます。

BigQuery ジョブ情報テーブル内の [BigQuery ジョブの読み込み] ボタン

ジョブ ID、タイプ、期間など、各ジョブの基本情報が提供されます。

現在のパイプライン ジョブの実行中に実施された BigQuery ジョブを示すテーブル。

特定のジョブの詳細情報については、[詳細] 列の [コマンドライン] をクリックします。

コマンドラインのモーダル ウィンドウで bq jobs describe コマンドをコピーし、ローカルまたは Cloud Shell で実行します。

gcloud alpha bq jobs describe BIGQUERY_JOB_ID

bq jobs describe コマンドは、JobStatistics を出力します。これにより、低速または停滞している BigQuery ジョブの診断時に役立つ追加の詳細情報が提供されます。

または、SQL クエリで BigQueryIO を使用すると、クエリジョブが発行されます。[詳細] 列の [クエリを表示] をクリックして、ジョブで使用される SQL クエリを表示します。