Apache Beam ノートブックを使用した開発

JupyterLab ノートブックと Apache Beam インタラクティブ ランナーを使うと、「入力、評価、出力」ループ(REPL)のワークフローで反復するパイプラインの開発、パイプライン グラフの検査、個々の PCollection の解析ができます。これらの Apache Beam ノートブックは、Vertex AI Workbench ユーザー管理ノートブックで提供されています。これは、最新のデータ サイエンスと機械学習フレームワークがプリインストールされたノートブック仮想マシンをホストするマネージド サービスです。

このガイドでは、Apache Beam ノートブックを導入することで実現する機能を中心に取り上げますが、Apache Beam ノートブックのビルド方法については説明しません。Apache Beam の詳細については、Apache Beam プログラミング ガイドをご覧ください。

始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  4. Compute Engine, Notebooks API を有効にします。

    API を有効にする

  5. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  6. Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  7. Compute Engine, Notebooks API を有効にします。

    API を有効にする

このガイドの完了後に作成したリソースを削除すれば、今後課金は発生しません。詳しくは、クリーンアップをご覧ください。

Apache Beam ノートブック インスタンスを起動する

  1. Google Cloud コンソールの [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
  2. サイドパネルで [Dataflow] に移動し、[ワークベンチ] をクリックします。
  3. ツールバーで [新しいインスタンス] をクリックします。
  4. [Apache Beam] > [Withoug GPUs] を選択します。
  5. (省略可)GPU 上でノートブックを実行する場合は、[Apache Beam] > [With 1 NVIDIA Tesla T4] を選択します。
  6. [新しいノートブック インスタンス] ページで、ノートブック VM のネットワークを選択し、[作成] をクリックします。
  7. (省略可)GPU を備えたノートブック インスタンスを作成する場合は、[新しいノートブック インスタンス] ページで、[作成] をクリックする前に、[NVIDIA GPU ドライバを自動的にインストールする] オプションをチェックします。
  8. (省略可)カスタム ノートブック インスタンスを設定する場合は、[カスタマイズ] をクリックします。インスタンス プロパティのカスタマイズについて詳しくは、特定のプロパティでユーザー管理のノートブック インスタンスを作成するをご覧ください。
  9. リンクがアクティブになったら、[JupyterLab を開く] をクリックします。Vertex AI Workbench が、新しい Apache Beam ノートブック インスタンスを作成します。

依存関係をインストールする(省略可)

Apache Beam ノートブックには、Apache Beam と Google Cloud コネクタの依存関係がすでにインストールされています。サードパーティのライブラリに依存するカスタム コネクタや PTransform がパイプラインに含まれている場合は、ノートブック インスタンスを作成した後にインストールできます。詳細については、ユーザー管理ノートブックのドキュメントに関する依存関係のインストールをご覧ください。

Apache Beam ノートブックを使ってみる

ユーザー管理のノートブック インスタンスを開くと、Examples フォルダにサンプル ノートブックが表示されます。現在利用できるサンプルは、次のとおりです。

  • Word Count
  • Streaming Word Count
  • Streaming NYC Taxi Ride Data
  • Dataflow Word Count
  • Apache Beam SQL in notebooks
  • Apache Beam で GPU を使用する
  • データを可視化する

Apache Beam の基礎知識を説明する追加のチュートリアルが [Tutorials] フォルダにあります。現在利用できるサンプルは、次のとおりです。

  • 基本オペレーション
  • 要素ごとのオペレーション
  • 集計
  • Windows
  • IO オペレーション
  • ストリーミング

これらのノートブックには、Apache Beam のコンセプトと API の使用方法の理解を助ける説明テキストとコメント付きのコードブロックが含まれています。また、チュートリアルでは、学習したコンセプトを実践で練習できます。

ノートブック インスタンスの作成

[File] > [New] > [Notebook] に移動し、Apache Beam 2.22 以降のカーネルを選択します。

Apache Beam はノートブック インスタンスにインストールされているため、ノートブックには interactive_runner モジュールと interactive_beam モジュールが含まれます。

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

ノートブックで他の Google API を使用している場合は、次の import ステートメントを追加します。

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

インタラクティブ オプションを設定する

以下は、InteractiveRunner が無制限のソースからデータを記録するまでの時間を設定しています。この例では、期間は 10 分に設定されています。

ib.options.recording_duration = '10m'

また、recording_size_limit プロパティを使用して、無制限のソースの記録サイズ制限(バイト単位)を変更することもできます。

# Set the recording size limit to 1 GB.
ib.options.recording_size_limit = 1e9

その他のインタラクティブ オプションについては、interactive_beam.options クラスをご覧ください。

パイプラインを作成する

InteractiveRunner オブジェクトを使用して、パイプラインを初期化します。

options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

p = beam.Pipeline(InteractiveRunner(), options=options)

データの読み取りと可視化

次の例は、指定された Pub/Sub トピックにサブスクリプションを作成し、そのサブスクリプションから読み取りを行う Apache Beam パイプラインを示しています。

words = p
    | "read" >> beam.io.ReadFromPubSub(topic="projects/pubsub-public-data/topics/shakespeare-kinglear")

パイプラインは、ソースからウィンドウごとに単語をカウントします。固定されたウィンドウを作成し、各ウィンドウの長さを 10 秒に設定します。

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

データがウィンドウ処理されると、ウィンドウごとに単語をカウントします。

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

show() メソッドは、作成された PCollection をノートブックに可視化します。

ib.show(windowed_word_counts, include_window_info=True)

PCollection をテーブル形式で可視化する show メソッド。

nduration の 2 つのオプション パラメータを設定すると、show() から結果セットを絞り込むことができます。n を設定すると、結果セットが最大で n の要素数(20 など)を表示するように制限されます。n が設定されていない場合、デフォルトの動作として、ソースの記録が終了するまで最新の要素が一覧表示されます。duration を設定すると、結果セットはソース記録の開始から指定した秒数分のデータに制限されます。duration が設定されていない場合、デフォルトの動作として、記録が終了するまですべての要素が一覧表示されます。

両方のパラメータを設定した場合、いずれかがしきい値に達すると show() が停止します。次の例では、show() は記録されたソースからの最初の 30 秒に相当するデータに基づいて、計算された最大 20 個の要素を返します。

ib.show(windowed_word_counts, include_window_info=True, n=20, duration=30)

データの可視化を表示するには、visualize_data=Trueshow() メソッドに渡します。可視化には複数のフィルタを適用できます。次の可視化では、ラベルと軸でフィルタリングできます。

フィルタできる多様な UI 要素のセットとして PCollection を可視化する show メソッド。

Apache Beam ノートブックの可視化では、Pandas DataFrame も役立ちます。次の例では、最初に単語を小文字に変換してから、各単語の頻度を計算します。

windowed_lower_word_counts = (windowed_words
   | beam.Map(lambda word: word.lower())
   | "count" >> beam.combiners.Count.PerElement())

collect() メソッドにより、Pandas DataFrame の出力を取得できます。

ib.collect(windowed_lower_word_counts, include_window_info=True)

Pandas DataFrame 内の PCollection を表す collect メソッド。

Interactive Beam インスペクタによるデータの可視化

show()collect() を絶え間なく呼び出すことで、PCollection のデータの取り込みを邪魔してしまう場合があります。特に出力が画面スペースの多くを占有し、ノートブックの操作が困難になることがあります。また、複数の PCollection を並べて比較し、変換が意図したとおりに機能するかどうかを検証することもできます。たとえば、1 つの PCollection が変換され、別の変換が生成される場合などです。これらのユースケースでは、Interactive Beam インスペクタのほうが便利です。

Interactive Beam インスペクタは、Apache Beam ノートブックにプリインストールされている JupyterLab 拡張機能 apache-beam-jupyterlab-sidepanel として提供されています。この拡張機能を使用すると、show()collect() を明示的に呼び出すことなく、各 PCollection に関連付けられたパイプラインとデータの状態をインタラクティブに検査できます。

インスペクタを開く方法は 3 つあります。

  • JupyterLab の上部メニューバーで [Interactive Beam] をクリックします。プルダウンで [Open Inspector] をクリックしてインスペクタを開きます。

    メニューからインスペクタを開く

  • ランチャー ページを使用します。ランチャー ページが開いていない場合は、[File] -> [New Launcher] の順にクリックして開きます。ランチャー ページで Interactive Beam を探して [Open Inspector] をクリックし、インスペクタを開きます。

    ランチャーからインスペクタを開く

  • コマンド パレットを使用します。JupyterLab の上部メニューバーで [View] -> [Activate Command Palette] の順にクリックします。ポップアップで Interactive Beam を検索して、拡張機能のすべてのオプションを一覧表示します。[Open Inspector] をクリックしてインスペクタを開きます。

    コマンド パレットでインスペクタを開く

インスペクタが開くときに:

  1. 開いているノートブックが 1 つだけの場合、インスペクタは自動的にそのノートブックに接続します。

  2. それ以外の場合は、ダイアログが表示され、カーネル(ノートブックが開いていない場合)または接続先のノートブック セッション(複数のノートブックが開いている場合)を選択できます。

    接続するノートブックの選択

開いたノートブックに複数のインスペクタを開き、ワークスペースでタブを自由にドラッグ&ドロップすることでインスペクタを並べ替えることができます。

2 つのインスペクタを開き、並べて配置

ノートブックでセルを実行すると、インスペクタ ページが自動的に更新されます。左側には、接続されたノートブックで定義されたパイプラインと PCollection が表示されます。PCollection は、所属するパイプラインごとに整理されます。ヘッダーのパイプラインをクリックすると、折りたたむことができます。

パイプラインと PCollection リスト内のアイテムをクリックすると、対応する可視化データがインスペクタの右側にレンダリングされます。

  • PCollection の場合、インスペクタは、追加のウィジェットを使用してデータをレンダリングします(データがまだ制限なしの PCollection に送信されている場合は動的にレンダリングします)。[APPLY] ボタンをクリックすると、可視化が調整されます。

    インスペクタ ページ

  • パイプラインである場合は、インスペクタにパイプラインのグラフが表示されます。

    インスペクタ ページ

匿名パイプラインが存在することがあります。これはアクセス可能な PCollection を含むパイプラインですが、メイン セッションによって参照されなくなります。次に例を示します。

p = beam.Pipeline()
pcoll = p | beam.Create([1, 2, 3])

p = beam.Pipeline()

上記の例では、空のパイプライン p と、1 つの PCollection pcoll を含む匿名パイプラインが作成されます。匿名パイプラインには、引き続き pcoll.pipeline を使用してアクセスできます。

左側のパイプラインと PCollection のリストを切り替えると、可視化に使用するスペースを広げることができます。左側リストの切り替え

パイプラインの記録ステータスについて

可視化だけでなく、describe を呼び出すことで、ノートブック インスタンス内の 1 つまたはすべてのパイプラインの記録ステータスを調べることもできます。

# Return the recording status of a specific pipeline. Leave the parameter list empty to return
# the recording status of all pipelines.
ib.recordings.describe(p)

describe() メソッドは、次の詳細情報を提供します。

  • ディスク上のパイプラインのすべての記録の合計サイズ(バイト単位)
  • バックグラウンドの記録ジョブの開始時間(Unix エポックからの秒数)
  • バックグラウンドの記録ジョブの現在のパイプライン ステータス
  • パイプラインの Python 変数

ノートブックに作成されたパイプラインから Dataflow ジョブを起動する

  1. (省略可)ノートブックを使用して Dataflow ジョブを実行する前に、カーネルを再起動し、すべてのセルを再実行して出力を確認します。この手順を省略すると、ノートブックの隠れた状態がパイプライン オブジェクトのジョブグラフに影響を及ぼす可能性があります。
  2. Dataflow API を有効にします
  3. 次の import ステートメントを追加します。

    from apache_beam.runners import DataflowRunner
    
  4. パイプライン オプションを渡します。

    # Set up Apache Beam pipeline options.
    options = pipeline_options.PipelineOptions()
    
    # Set the project to the default project in your current Google Cloud
    # environment.
    _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    
    # Set the Google Cloud region to run Dataflow.
    options.view_as(GoogleCloudOptions).region = 'us-central1'
    
    # Choose a Cloud Storage location.
    dataflow_gcs_location = 'gs://<change me>/dataflow'
    
    # Set the staging location. This location is used to stage the
    # Dataflow pipeline and SDK binary.
    options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
    
    # Set the temporary location. This location is used to store temporary files
    # or intermediate results before outputting to the sink.
    options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
    
    # If and only if you are using Apache Beam SDK built from source code, set
    # the SDK location. This is used by Dataflow to locate the SDK
    # needed to run the pipeline.
    options.view_as(pipeline_options.SetupOptions).sdk_location = (
        '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' %
        beam.version.__version__)
    

    パラメータ値は調整できます。たとえば、region の値を us-central1 から変更できます。

  5. DataflowRunner を使用してパイプラインを実行します。これにより、Dataflow サービスでジョブが実行されます。

    runner = DataflowRunner()
    runner.run_pipeline(p, options=options)
    

    p は、パイプラインの作成のパイプライン オブジェクトです。

インタラクティブ ノートブックでこの変換を実行する方法の例については、ノートブック インスタンスの Dataflow Word Count ノートブックを参照してください。

あるいは、ノートブックを実行可能スクリプトとしてエクスポートし、生成された .py ファイルを前の手順で変更してから、Dataflow サービスにパイプラインをデプロイすることもできます。

ノートブックを保存する

作成したノートブックは、実行中のノートブック インスタンスにローカルに保存されます。開発中にノートブック インスタンスをリセットまたはシャットダウンした場合、これらの新しいノートブックは、/home/jupyter ディレクトリの下に作成される限り保持されます。ただし、ノートブック インスタンスが削除されると、それらのノートブックも削除されます。

ノートブックを後で使用できるようにするには、ワークステーションにローカルにダウンロードするか、GitHub に保存するか、別のファイル形式にエクスポートします。

ノートブックを追加の永続ディスクに保存する

さまざまなノートブック インスタンスでノートブックやスクリプトなどの処理を保持する場合は、Persistent Disk に保存します。

  1. Persistent Disk を作成するか、アタッチします。手順に沿って ssh を使用してノートブック インスタンスの VM に接続し、開いている Cloud Shell でコマンドを発行します。

  2. Persistent Disk がマウントされているディレクトリ(/mnt/myDisk など)をメモします。

  3. ノートブック インスタンスの VM の詳細を編集し、Custom metadata に次のエントリを追加します: キー - container-custom-params、値 - -v /mnt/myDisk:/mnt/myDiskマウントされた PD をバインドするために必要な追加のメタデータ

  4. [保存] をクリックします。

  5. これらの変更を更新するには、ノートブック インスタンスをリセットします。ノートブック インスタンスをリセットする

  6. リセット後にリンクがアクティブになったら、[JupyterLab を開く] をクリックします。JupyterLab UI が利用可能になるまでに時間がかかることがあります。UI が表示されたら、ターミナルを開いてコマンド ls -al /mnt を実行します。/mnt/myDisk ディレクトリが表示されます。ボリュームのバインドの一覧表示

これで、作業を /mnt/myDisk ディレクトリに保存できるようになりました。ノートブック インスタンスが削除されても、Persistent Disk はプロジェクトに存在し続けます。この Persistent Disk は、他のノートブック インスタンスにアタッチできます。

クリーンアップ

Apache Beam ノートブック インスタンスの使用が終了したら、ノートブック インスタンスをシャットダウンして、Google Cloud で作成したリソースをクリーンアップします。

高度な機能

ノートブック管理クラスタ上のインタラクティブな FlinkRunner

ノートブックから本番環境サイズのデータをインタラクティブに操作するには、いくつかの汎用的なパイプライン オプションとともに FlinkRunner を使用することで、ノートブック セッションに対して長期的な Dataproc クラスタを管理し、Beam パイプラインを分散して実行するように指示できます。

前提条件

この機能を使用するには:

  • Dataproc API を有効にする必要があります。
  • ノートブック インスタンスを実行するサービス アカウントには、Dataproc の管理者または編集者のロールが必要です。
  • Beam バージョン 2.40.0 以上のノートブック カーネルを使用します。

構成

最小の設定:

# Set a Cloud Storage bucket to cache source recording and PCollections.
# By default, the cache is on the notebook instance itself, but that does not
# apply to the distributed execution scenario.
ib.options.cache_root = 'gs://<CHANGE_ME>/flink_on_dataproc'

# Define an InteractiveRunner that uses the FlinkRunner under the hood.
interactive_flink_runner = InteractiveRunner(underlying_runner=FlinkRunner())

options = PipelineOptions()
# Instruct the notebook that Google Cloud is used to run the FlinkRunner.
cloud_options = options.view_as(GoogleCloudOptions)
cloud_options.project = '<YOUR-PROJECT>'

用途

p = beam.Pipeline(interactive_flink_runner, options=options)
pcoll = p | 'read' >> ReadWords('gs://apache-beam-samples/shakespeare/kinglear.txt')
pcoll2 = ...
# The notebook session automatically starts and manages a cluster to execute
# your pipelines for execution with the FlinkRunner.
ib.show(pcoll)
# This reuses the existing cluster.
ib.collect(pcoll2)

# Describe the cluster running the pipeline.
# You can access the Flink Dashboard from the printed link.
ib.clusters.describe(p)

# Cleans up all long-lasting clusters managed by the notebook session.
ib.clusters.cleanup(force=True)

(オプション)明示的なプロビジョニング

その他のオプション

# Change this if the pipeline needs to be executed in a different region
# than the default 'us-central1'. For example, to set it to 'us-west1':
cloud_options.region = 'us-west1'

# Explicitly provision the notebook-managed cluster.
worker_options = options.view_as(WorkerOptions)
# Provision 3 workers to run the pipeline.
worker_options.num_workers=3
# Use the default subnetwork.
worker_options.subnetwork='default'
# Choose the machine type for the workers.
worker_options.machine_type='n2-standard-16'

# When working with non-official Beam releases, e.g. Beam built from source
# code, configure the environment to use a compatible released SDK container.
options.view_as(PortableOptions).environment_config = 'apache/beam_python3.7_sdk:2.40.0'

ノートブック管理クラスタ

  • デフォルトでは、パイプライン オプションが指定されていない場合、Interactive Beam は、FlinkRunner でパイプラインを実行するために、最後に使用されたクラスタを常に再利用します。
    • この動作を回避するには、たとえば、ノートブックでホストされていない FlinkRunner を使用して同じノートブック セッションで別のパイプラインを実行する場合は、ib.clusters.set_default_cluster(None) を実行します。
  • 既存の Dataproc クラスタにマッピングされたプロジェクト、リージョン、プロビジョニング構成を使用する新しいパイプラインをインスタンス化する場合、Dataflow はクラスタを再利用します(これは直近に使用されたクラスタではない場合があります)。
  • ただし、プロビジョニングの変更(クラスタのサイズ変更など)が行われるたびに、目的の変更を行うために新しいクラスタが作成されます。クラスタのサイズを変更する場合は、ib.clusters.cleanup(pipeline) を使用して不要なクラスタをクリーンアップすることで、クラウド リソースが不足しないようにします。
  • Flink master_url が指定されていて、これがノートブック セッションによって管理されているクラスタに属している場合、Dataflow はマネージド クラスタを再利用します。
    • master_url がノートブック セッションで不明な場合、ユーザーのセルフホスト型の FlinkRunner が必要であることを意味します。ノートブックは暗黙的に何も実行しません。

Beam SQL と beam_sql マジック

Beam SQL では、SQL ステートメントを使用して、制限付き PCollection と制限なし PCollection をクエリできます。Apache Beam ノートブックで作業している場合は、IPython カスタム マジック beam_sql を使用して、パイプラインの開発を高速化できます。

-h または --help オプションを使用して、beam_sql マジックの使用方法を確認できます。 beam_sql のヘルプを確認する

定数値から PCollection を作成できます。 定数値から PCollection を作成する

複数の PCollection を結合できます。 複数の PCollection を結合する

-r DataflowRunner または --runner DataflowRunner オプションを指定して Dataflow ジョブを起動できます。 Beam SQL を使用して Dataflow ジョブを起動する

詳細については、サンプル ノートブック Apache Beam SQL in notebooks をご覧ください。

JIT コンパイラと GPU を使用して高速化する

numbaGPU などのライブラリを使用して、Python コードと Apache Beam パイプラインを高速化できます。nvidia-tesla-t4 GPU で作成された Apache Beam ノートブック インスタンスで、numba.cuda.jit を使用して Python コードをコンパイルし、GPU で実行できます。必要に応じて、numba.jit または numba.njit を使用して Python コードをマシンコードにコンパイルし、CPU での実行を高速化することもできます。

GPU で処理される DoFn

class Sampler(beam.DoFn):
    def __init__(self, blocks=80, threads_per_block=64):
        # Uses only 1 cuda grid with below config.
        self.blocks = blocks
        self.threads_per_block = threads_per_block

    def setup(self):
        import numpy as np
        # An array on host as the prototype of arrays on GPU to
        # hold accumulated sub count of points in the circle.
        self.h_acc = np.zeros(
            self.threads_per_block * self.blocks, dtype=np.float32)

    def process(self, element: Tuple[int, int]):
        from numba import cuda
        from numba.cuda.random import create_xoroshiro128p_states
        from numba.cuda.random import xoroshiro128p_uniform_float32

        @cuda.jit
        def gpu_monte_carlo_pi_sampler(rng_states, sub_sample_size, acc):
            """Uses GPU to sample random values and accumulates the sub count
            of values within a circle of radius 1.
            """
            pos = cuda.grid(1)
            if pos < acc.shape[0]:
                sub_acc = 0
                for i in range(sub_sample_size):
                    x = xoroshiro128p_uniform_float32(rng_states, pos)
                    y = xoroshiro128p_uniform_float32(rng_states, pos)
                    if (x * x + y * y) <= 1.0:
                        sub_acc += 1
                acc[pos] = sub_acc

        rng_seed, sample_size = element
        d_acc = cuda.to_device(self.h_acc)
        sample_size_per_thread = sample_size // self.h_acc.shape[0]
        rng_states = create_xoroshiro128p_states(self.h_acc.shape[0], seed=rng_seed)
        gpu_monte_carlo_pi_sampler[self.blocks, self.threads_per_block](
            rng_states, sample_size_per_thread, d_acc)
        yield d_acc.copy_to_host()

GPU での実行: GPU で DoFn を実行する

詳細については、サンプル ノートブック Use GPUs with Apache Beam をご覧ください。