Dataflow は、ストリーミング(リアルタイム)モードのデータとバッチモードのデータを同等の信頼性と表現力で変換、活用するフルマネージド サービスです。Apache Beam SDK を使用して、簡素化されたパイプライン開発環境を提供します。Apache Beam SDK は、ウィンドウ処理とセッション分析のプリミティブが豊富に用意されているだけでなく、ソースとシンクのコネクタからなるエコシステムも提供しています。このクイックスタートでは、Dataflow を使用して次の操作を行う方法を説明します。
- Pub/Sub トピックにパブリッシュされたメッセージを読む
- タイムスタンプごとにメッセージをウィンドウ処理(グループ化)する
- Cloud Storage にメッセージを書き込む
このクイックスタートでは、Java と Python で Dataflow を使用する方法について説明します。SQL もサポートされます。
カスタムデータ処理を行う予定がない場合は、UI ベースの Dataflow テンプレートを使用することもできます。
始める前に
- Cloud SDK のインストールと初期化の手順に沿って操作します。
- プロジェクトの課金を有効にします。
このクイックスタートを完了するには、次の API を有効にする必要があります。Compute Engine、Google Cloud のオペレーション スイート、Cloud Storage、Cloud Storage JSON、Pub/Sub、Cloud Scheduler、Resource Manager、App Engine です。
API がコンソールに表示されるまで数分かかることがあります。
サービス アカウント キーを作成します。
- [サービス アカウント] リストから [新しいサービス アカウント] を選択します。
- [サービス アカウント名] フィールドに名前を入力します。
- [役割] リストで、[プロジェクト] > [オーナー] を選択します。
- [作成] をクリックします。
キーがブラウザのデフォルトのダウンロード フォルダに送信されます。
サービス アカウント キーを指すように
GOOGLE_APPLICATION_CREDENTIALS
環境変数を設定します。export GOOGLE_APPLICATION_CREDENTIALS=path/to/my/credentials.json
バケット、プロジェクト、リージョンの変数を作成します。Cloud Storage バケット名は、グローバルに一意である必要があります。 このクイックスタートでコマンドを実行する場所に近い Dataflow リージョンを選択します。
BUCKET_NAME=BUCKET_NAME PROJECT_NAME=$(gcloud config get-value project) REGION=DATAFLOW_REGION
このプロジェクトが所有する Cloud Storage バケットを作成します。
gsutil mb gs://$BUCKET_NAME
このプロジェクトで Pub/Sub トピックを作成します。
gcloud pubsub topics create cron-topic
このプロジェクトで Cloud Scheduler ジョブを作成します。このジョブは、1 分間隔で Cloud Pub/Sub トピックにメッセージをパブリッシュします。
App Engine アプリがプロジェクトに存在しない場合、この手順で作成されます。
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=cron-topic --message-body="Hello!"
ジョブを開始します。
gcloud scheduler jobs run publisher-job
次のコマンドを使用して、クイックスタート リポジトリのクローンを作成し、サンプルコード ディレクトリに移動します。
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
Pub/Sub から Cloud Storage へのメッセージのストリーミング
コードサンプル
このサンプルコードでは、Dataflow を使用して次のことを行います。
- Pub/Sub メッセージを読み取ります。
- パブリッシュ タイムスタンプにより、固定サイズの間隔でメッセージをウィンドウ処理(グループ化)します。
各ウィンドウのメッセージを Cloud Storage のファイルに書き込みます。
Java
Python
パイプラインの開始
パイプラインを開始するには、次のコマンドを実行します。
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_NAME \ --region=$REGION \ --inputTopic=projects/$PROJECT_NAME/topics/cron-topic \ --output=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --windowSize=2"
Python
python PubSubToGCS.py \ --project=$PROJECT_NAME \ --region=$REGION \ --input_topic=projects/$PROJECT_NAME/topics/cron-topic \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --temp_location=gs://$BUCKET_NAME/temp
上記のコマンドはローカルで実行され、クラウドで実行される Dataflow ジョブを起動します。コマンドが JOB_MESSAGE_DETAILED: Workers
have started successfully
を返したら、Ctrl+C
を使用してローカル プログラムを終了します。
ジョブとパイプラインの進行状況の確認
ジョブの進行状況は Dataflow コンソールで確認できます。
[ジョブの詳細] ビューを開いて、次の情報を確認します。
- ジョブの構成
- ジョブのログ
- ステージ指標
Cloud Storage に出力ファイルが表示されるまで数分間かかる場合があります。
または、以下のコマンドラインを使用して、書き出されたファイルを確認します。
gsutil ls gs://${BUCKET_NAME}/samples/
出力は次のようになります。
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32 gs://{$BUCKET_NAME}/samples/output-22:32-22:34 gs://{$BUCKET_NAME}/samples/output-22:34-22:36 gs://{$BUCKET_NAME}/samples/output-22:36-22:38
クリーンアップ
Cloud Scheduler ジョブを削除します。
gcloud scheduler jobs delete publisher-job
Dataflow コンソールで、ジョブを停止します。パイプラインをドレインせずにキャンセルします。
トピックを削除します。
gcloud pubsub topics delete cron-topic
パイプラインによって作成されたファイルを削除します。
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*" gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
Cloud Storage バケットを削除します。
gsutil rb gs://${BUCKET_NAME}
次のステップ
カスタム タイムスタンプで Pub/Sub メッセージをウィンドウ処理する場合は、タイムスタンプを Pub/Sub メッセージ内の属性として指定し、そのカスタム タイムスタンプを PubsubIO の
withTimestampAttribute
で使用できます。Google のストリーミング用に設計されたオープンソースの Dataflow テンプレートをご覧ください。
Dataflow と Pub/Sub の統合方法の詳細を確認します。
ウィンドウ処理の詳細については、Apache Beam モバイルゲーム パイプラインの例をご覧ください。