Apache Beam ノートブックを使用した開発

JupyterLab ノートブックと Apache Beam インタラクティブ ランナーを使うと、「入力、評価、出力」ループ(REPL)のワークフローで反復するパイプラインの開発、パイプライン グラフの検査、個々の PCollection の解析ができます。これらの Apache Beam ノートブックは、AI Platform Notebooks で提供されています。これは、最新のデータ サイエンスおよび機械学習フレームワークがプリインストールされたノートブック仮想マシンをホストするマネージド サービスです。

このガイドでは、Apache Beam ノートブックを導入することで実現する機能を中心に取り上げますが、Apache Beam ノートブックのビルド方法については説明しません。Apache Beam の詳細については、Apache Beam プログラミング ガイドをご覧ください。

始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  4. Compute Engine, AI Platform Notebooks API を有効にします。

    API を有効にする

このガイドの完了後に作成したリソースを削除すれば、今後課金は発生しません。詳しくは、クリーンアップをご覧ください。

Apache Beam ノートブック インスタンスの起動

  1. Google Cloud Console のプロジェクト セレクタ ページで、Google Cloud プロジェクトを選択または作成します。
  2. サイドパネルで [Dataflow] に移動し、[ノートブック] をクリックします。
  3. ツールバーで [ New Instance] をクリックします。
  4. [Apache Beam] > [GPU なし] を選択します。
  5. (省略可)GPU でノートブックを実行する場合は、[Apache Beam] > [1 つの NVIDIA Tesla T4] を選択します。
  6. [新しいノートブック インスタンス] ページで、ノートブック VM のネットワークを選択し、[作成] をクリックします。
  7. (省略可)GPU を備えたノートブック インスタンスを作成する場合は、[新しいノートブック インスタンス] ページで、[作成する] をクリックする前に、[NVIDIA GPU ドライバを自動的にインストールする] オプションをチェックします。
  8. (省略可)カスタム ノートブック インスタンスを設定する場合は、[カスタマイズ] をクリックします。インスタンス プロパティのカスタマイズについての詳細は、特定のプロパティを使用して AI Platform Notebooks インスタンスを作成するをご覧ください。
  9. リンクがアクティブになったら、[JupyterLab を開く] をクリックします。AI Platform Notebooks により、新しい Apache Beam ノートブック インスタンスが作成されます。

依存関係のインストール(省略可)

Apache Beam ノートブックには、Apache Beam と Google Cloud コネクタの依存関係がすでにインストールされています。サードパーティのライブラリに依存するカスタム コネクタや PTransform がパイプラインに含まれている場合は、ノートブック インスタンスを作成した後にインストールできます。詳しくは、AI Platform Notebooks ドキュメントの依存関係のインストールをご覧ください。

Apache Beam ノートブックを使ってみる

AI Platform Notebooks インスタンスを開くと、[Examples] フォルダにサンプル ノートブックが表示されます。現在利用できるサンプルは、次のとおりです。

  • Word Count
  • Streaming Word Count
  • Streaming NYC Taxi Ride Data
  • Dataflow Word Count
  • Apache Beam で GPU を使用する
  • データを可視化する

Apache Beam の基礎知識を説明する追加のチュートリアルがチュートリアル フォルダにあります。現在利用できるサンプルは、次のとおりです。

  • 基本オペレーション
  • 要素ごとのオペレーション
  • 集計
  • Windows
  • 入出力操作
  • ストリーミング

これらのノートブックには、Apache Beam のコンセプトと API の使用方法の理解を助ける説明テキストとコメント付きのコードブロックが含まれています。 また、チュートリアルでは、学習した内容を実践で練習できます。

ノートブック インスタンスの作成

[File] > [New] > [Notebook] に移動し、Apache Beam 2.22 以降のカーネルを選択します。

Apache Beam はノートブック インスタンスにインストールされているため、ノートブックには interactive_runner モジュールと interactive_beam モジュールが含まれます。

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

ノートブックで他の Google API を使用している場合は、次の import ステートメントを追加します。

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

インタラクティブ オプションの設定

以下は、InteractiveRunner が無制限のソースからデータを記録するまでの時間を設定しています。この例では、期間は 10 分に設定されています。

ib.options.recording_duration = '10m'

また、recording_size_limit プロパティを使用して、無制限のソースの記録サイズ制限(バイト単位)を変更することもできます。

# Set the recording size limit to 1 GB.
ib.options.recording_size_limit = 1e9

その他のインタラクティブ オプションについては、interactive_beam.options クラスをご覧ください。

パイプラインの作成

InteractiveRunner オブジェクトを使用して、パイプラインを初期化します。

options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

p = beam.Pipeline(InteractiveRunner(), options=options)

データの読み取りと可視化

次の例は、指定された Pub/Sub トピックにサブスクリプションを作成し、そのサブスクリプションから読み取りを行う Apache Beam パイプラインを示しています。

words = p
    | "read" >> beam.io.ReadFromPubSub(topic="projects/pubsub-public-data/topics/shakespeare-kinglear")

パイプラインは、ソースからウィンドウごとに単語をカウントします。固定されたウィンドウを作成し、各ウィンドウの長さを 10 秒に設定します。

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

データがウィンドウ処理されると、ウィンドウごとに単語をカウントします。

windowed_words_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

show() メソッドは、作成された PCollection をノートブックに可視化します。

ib.show(windowed_word_counts, include_window_info=True)

PCollection をテーブル形式で可視化する show メソッド。

nduration の 2 つのオプション パラメータを設定すると、show() から結果セットを絞り込むことができます。n を設定すると、結果セットが最大で n の要素数(20 など)を表示するように制限されます。n が設定されていない場合、デフォルトの動作として、ソースの記録が終了するまで最新の要素が一覧表示されます。duration を設定すると、結果セットはソース記録の開始から指定した秒数分のデータに制限されます。duration が設定されていない場合、デフォルトの動作として、記録が終了するまですべての要素が一覧表示されます。

両方のパラメータを設定した場合、いずれかがしきい値に達すると show() が停止します。次の例では、show() は記録されたソースからの最初の 30 秒に相当するデータに基づいて、計算された最大 20 個の要素を返します。

ib.show(windowed_word_counts, include_window_info=True, n=20, duration=30)

データの可視化を表示するには、visualize_data=Trueshow() メソッドに渡します。可視化には複数のフィルタを適用できます。次の可視化では、ラベルと軸でフィルタリングできます。

フィルタできる多様な UI 要素のセットとして PCollection を可視化する show メソッド。

Apache Beam ノートブックの可視化では、Pandas DataFrame も役立ちます。次の例では、最初に単語を小文字に変換してから、各単語の頻度を計算します。

windowed_lower_word_counts = (windowed_words
   | beam.Map(lambda word: word.lower())
   | "count" >> beam.combiners.Count.PerElement())

collect() メソッドにより、Pandas DataFrame の出力を取得できます。

ib.collect(windowed_lower_word_counts, include_window_info=True)

Pandas DataFrame 内の PCollection を表す collect メソッド。

パイプラインの記録ステータスを理解する

可視化だけでなく、describe を呼び出すことで、ノートブック インスタンス内の 1 つまたはすべてのパイプラインの記録ステータスを調べることもできます。

# Return the recording status of a specific pipeline. Leave the parameter list empty to return
# the recording status of all pipelines.
ib.recordings.describe(p)

describe() メソッドは、次の詳細情報を提供します。

  • ディスク上のパイプラインのすべての記録の合計サイズ(バイト単位)
  • バックグラウンドの記録ジョブの開始時間(Unix エポックからの秒数)
  • バックグラウンドの記録ジョブの現在のパイプライン ステータス
  • パイプラインの Python 変数

ノートブックに作成されたパイプラインから Dataflow ジョブを起動

  1. (省略可)ノートブックを使用して Dataflow ジョブを実行する前に、カーネルを再起動し、すべてのセルを再実行して出力を確認します。この手順を省略すると、ノートブックの隠れた状態がパイプライン オブジェクトのジョブグラフに影響を及ぼす可能性があります。
  2. Dataflow API を有効にします
  3. 次の import ステートメントを追加します。

    from apache_beam.runners import DataflowRunner
    
  4. パイプライン オプションを渡します。

    # Set up Apache Beam pipeline options.
    options = pipeline_options.PipelineOptions()
    
    # Set the project to the default project in your current Google Cloud
    # environment.
    _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    
    # Set the Google Cloud region to run Dataflow.
    options.view_as(GoogleCloudOptions).region = 'us-central1'
    
    # Choose a Cloud Storage location.
    dataflow_gcs_location = 'gs://<change me>/dataflow'
    
    # Set the staging location. This location is used to stage the
    # Dataflow pipeline and SDK binary.
    options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
    
    # Set the temporary location. This location is used to store temporary files
    # or intermediate results before outputting to the sink.
    options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
    
    # Set the SDK location. This is used by Dataflow to locate the
    # SDK needed to run the pipeline.
    options.view_as(pipeline_options.SetupOptions).sdk_location = (
        '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' %
        beam.version.__version__)
    

    パラメータ値は調整できます。たとえば、region の値を us-central1 から変更できます。

  5. DataflowRunner を使用してパイプラインを実行します。これにより、Dataflow サービスでジョブが実行されます。

    runner = DataflowRunner()
    runner.run_pipeline(p, options=options)
    

    p は、パイプラインの作成のパイプライン オブジェクトです。

インタラクティブ ノートブックでこの変換を実行する方法の例については、ノートブック インスタンスの Dataflow Word Count ノートブックを参照してください。

あるいは、ノートブックを実行可能スクリプトとしてエクスポートし、生成された .py ファイルを前の手順で変更してから、Dataflow サービスにパイプラインをデプロイすることもできます。

ノートブックの保存

作成したノートブックは、実行中のノートブック インスタンスにローカルに保存されます。開発中にノートブック インスタンスをリセットまたはシャットダウンすると、これらの新しいノートブックは削除されます。ノートブックを後で使用できるようにするには、ワークステーションにローカルにダウンロードするか、GitHub に保存するか、別のファイル形式にエクスポートします。

クリーンアップ

Apache Beam ノートブック インスタンスの使用が終了したら、ノートブック インスタンスをシャットダウンして、Google Cloud で作成したリソースをクリーンアップします。