JupyterLab ノートブックと Apache Beam インタラクティブ ランナーを使うと、「入力、評価、出力」ループ(REPL)のワークフローで反復するパイプラインの開発、パイプライン グラフの検査、個々の PCollection の解析ができます。これらの Apache Beam ノートブックは、AI Platform Notebooks で提供されています。これは、最新のデータ サイエンスおよび機械学習フレームワークがプリインストールされたノートブック仮想マシンをホストするマネージド サービスです。
このガイドでは、Apache Beam ノートブックを導入することで実現する機能を中心に取り上げますが、Apache Beam ノートブックのビルド方法については説明しません。Apache Beam の詳細については、Apache Beam プログラミング ガイドをご覧ください。
始める前に
- Google アカウントにログインします。
Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する。
- Compute Engine API を有効にします。
このガイドの完了後に作成したリソースを削除すれば、今後課金は発生しません。詳しくは、クリーンアップをご覧ください。
Apache Beam ノートブック インスタンスの起動
- Google Cloud Console のプロジェクト セレクタ ページで、Google Cloud プロジェクトを選択または作成します。
- サイドパネルで [Dataflow] に移動し、[ノートブック] をクリックします。
- ツールバーで [ New Instance] をクリックします。
- [Apache Beam] を選択します。
- [新しいノートブック インスタンス] ページで、ノートブック VM のネットワークを選択し、[作成] をクリックします。
- (省略可)カスタム ノートブック インスタンスを設定する場合は、[カスタマイズ] をクリックします。インスタンス プロパティのカスタマイズについての詳細は、特定のプロパティを使用して AI Platform Notebooks インスタンスを作成するをご覧ください。
- リンクがアクティブになったら、[JupyterLab を開く] をクリックします。AI Platform Notebooks により、新しい Apache Beam ノートブック インスタンスが作成されます。
依存関係のインストール(省略可)
Apache Beam ノートブックには、Apache Beam と Google Cloud コネクタの依存関係がすでにインストールされています。サードパーティのライブラリに依存するカスタム コネクタや PTransform がパイプラインに含まれている場合は、ノートブック インスタンスを作成した後にインストールできます。詳しくは、AI Platform Notebooks ドキュメントの依存関係のインストールをご覧ください。
Apache Beam ノートブックを使ってみる
AI Platform Notebooks インスタンスを開くと、[Examples] フォルダにサンプル ノートブックが表示されます。現在利用できるサンプルは、次のとおりです。
- Word Count
- Streaming Word Count
- Streaming NYC Taxi Ride Data
- Dataflow Word Count
これらのノートブックには、Apache Beam のコンセプトと API の使用方法の理解を助ける説明テキストとコメント付きのコードブロックが含まれています。
ノートブック インスタンスの作成
[File] > [New] > [Notebook] に移動し、Apache Beam 2.22 以降のカーネルを選択します。
Apache Beam はノートブック インスタンスにインストールされているため、ノートブックには interactive_runner
モジュールと interactive_beam
モジュールが含まれます。
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
ノートブックで他の Google API を使用している場合は、次の import ステートメントを追加します。
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
インタラクティブ オプションの設定
以下は、InteractiveRunner が制限のないソースからデータを記録する時間を設定しています。この例では、期間は 10 分に設定されています。
ib.options.recording_duration = '10m'
また、recording_size_limit
プロパティを使用して、制限のないソースの記録サイズ制限(バイト単位)を変更することもできます。
# Set the recording size limit to 1 GB.
ib.options.recording_size_limit = 1e9
その他のインタラクティブ オプションについては、interactive_beam.options クラスをご覧ください。
パイプラインの作成
InteractiveRunner
オブジェクトを使用して、パイプラインを初期化します。
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
p = beam.Pipeline(InteractiveRunner(), options=options)
データの読み取りと可視化
次の例は、指定された Pub/Sub トピックへのサブスクリプションを作成し、サブスクリプションからの読み取りを行う Apache Beam パイプラインを示しています。
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
パイプラインは、ソースからウィンドウごとに単語をカウントします。固定されたウィンドウを作成し、各ウィンドウの長さを 10 秒に設定します。
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
データがウィンドウ処理されると、ウィンドウごとに単語をカウントします。
windowed_words_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
show()
メソッドは、作成された PCollection をノートブックに可視化します。
ib.show(windowed_word_counts, include_window_info=True)
n
と duration
の 2 つのオプション パラメータを設定すると、show()
から結果のセットを絞り込むことができます。n
を設定すると、結果のセットは最大で n
の要素数(20 など)を表示するように制限されます。n
が設定されていない場合、デフォルトの動作として、ソースの記録が終了するまで最新の要素が一覧表示されます。duration
を設定すると、ソースの記録の開始から始まり、結果のセットは、指定された秒数に相当するデータに制限されます。duration
が設定されていない場合、デフォルトでは記録が終了するまですべての要素をリストします。
両方のパラメータを設定した場合、いずれかがしきい値に達すると show()
が停止します。次の例では、show()
は記録されたソースからの最初の 30 秒に相当するデータに基づいて、計算された最大 20 個の要素を返します。
ib.show(windowed_word_counts, include_window_info=True, n=20, duration=30)
データの可視化を表示するには、visualize_data=True
を show()
メソッドに渡します。可視化には複数のフィルタを適用できます。次の可視化では、ラベルと軸でフィルタリングできます。
Apache Beam ノートブックの可視化では、Pandas DataFrame も役立ちます。次の例では、最初に単語を小文字に変換してから、各単語の頻度を計算します。
windowed_lower_word_counts = (windowed_words
| beam.Map(lambda word: word.lower())
| "count" >> beam.combiners.Count.PerElement())
collect()
メソッドにより、Pandas DataFrame の出力を取得できます。
ib.collect(windowed_lower_word_counts, include_window_info=True)
パイプラインの記録ステータスを理解する
可視化だけでなく、describe を呼び出すことで、ノートブック インスタンス内の 1 つまたはすべてのパイプラインの記録ステータスを調べることもできます。
# Return the recording status of a specific pipeline. Leave the parameter list empty to return
# the recording status of all pipelines.
ib.recordings.describe(p)
describe()
メソッドは、次の詳細情報を提供します。
- ディスク上のパイプラインのすべての記録の合計サイズ(バイト単位)
- バックグラウンドの記録ジョブの開始時刻(Unix エポックからの秒数)
- バックグラウンドの記録ジョブの現在のパイプライン ステータス
- パイプラインの Python 変数
ノートブックに作成されたパイプラインから Dataflow ジョブを起動
- (省略可)ノートブックを使用して Dataflow ジョブを実行する前に、カーネルを再起動し、すべてのセルを再実行して出力を確認します。この手順を省略すると、ノートブックの隠れた状態がパイプライン オブジェクトのジョブグラフに影響を及ぼす可能性があります。
- Dataflow API を有効にします。
次の import ステートメントを追加します。
from apache_beam.runners import DataflowRunner
パイプライン オプションを渡します。
# Set up Apache Beam pipeline options. options = pipeline_options.PipelineOptions() # Set the project to the default project in your current Google Cloud # environment. _, options.view_as(GoogleCloudOptions).project = google.auth.default() # Set the Google Cloud region to run Dataflow. options.view_as(GoogleCloudOptions).region = 'us-central1' # Choose a Cloud Storage location. dataflow_gcs_location = 'gs://<change me>/dataflow' # Set the staging location. This location is used to stage the # Dataflow pipeline and SDK binary. options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location # Set the temporary location. This location is used to store temporary files # or intermediate results before outputting to the sink. options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location # Set the SDK location. This is used by Dataflow to locate the # SDK needed to run the pipeline. options.view_as(pipeline_options.SetupOptions).sdk_location = ( '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' % beam.version.__version__)
パラメータ値は調整できます。たとえば、
region
の値をus-central1
から変更できます。DataflowRunner
を使用してパイプラインを実行します。これにより、Dataflow サービスでジョブが実行されます。runner = DataflowRunner() runner.run_pipeline(p, options=options)
p
は、パイプラインの作成のパイプライン オブジェクトです。
インタラクティブ ノートブックでこの変換を実行する方法の例については、ノートブック インスタンスの Dataflow Word Count ノートブックを参照してください。
あるいは、ノートブックを実行可能スクリプトとしてエクスポートし、生成された .py
ファイルを前の手順で変更してから、Dataflow サービスにパイプラインをデプロイすることもできます。
ノートブックの保存
作成したノートブックは、実行中のノートブック インスタンスにローカルに保存されます。開発中にノートブック インスタンスをリセットまたはシャットダウンすると、これらの新しいノートブックは削除されます。ノートブックを後で使用できるようにするには、ワークステーションにローカルにダウンロードするか、GitHub に保存するか、別のファイル形式にエクスポートします。
クリーンアップ
Apache Beam ノートブック インスタンスの使用が終了したら、ノートブック インスタンスをシャットダウンして、Google Cloud で作成したリソースをクリーンアップします。