Dataflow を使用して Pub/Sub からメッセージをストリーミングする
Dataflow は、ストリーミング(リアルタイム)モードのデータとバッチモードのデータを同等の信頼性と表現力で変換、活用するフルマネージド サービスです。Apache Beam SDK を使用して、簡素化されたパイプライン開発環境を提供します。Apache Beam SDK は、ウィンドウ処理とセッション分析のプリミティブが豊富に用意されているだけでなく、ソースとシンクのコネクタからなるエコシステムも提供しています。このクイックスタートでは、Dataflow を使用して次の操作を行う方法を説明します。
- Pub/Sub トピックにパブリッシュされたメッセージを読む
- タイムスタンプごとにメッセージをウィンドウ処理(グループ化)する
- Cloud Storage にメッセージを書き込む
このクイックスタートでは、Java と Python で Dataflow を使用する方法について説明します。SQL もサポートされます。このクイックスタートは、ご利用を開始していただくにあたり一時的な認証情報を提供する Google Cloud Skills Boost チュートリアルとしても掲載しています。
カスタムデータ処理を行う予定がない場合は、UI ベースの Dataflow テンプレートを使用することもできます。
始める前に
- Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
- Google Cloud CLI をインストールします。
-
gcloud CLI を初期化するには:
gcloud init
-
Google Cloud プロジェクトを作成または選択します。
-
Cloud プロジェクトを作成します。
gcloud projects create PROJECT_ID
-
作成した Cloud プロジェクトを選択します。
gcloud config set project PROJECT_ID
-
-
Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
-
Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler API を有効にします。
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
認証を設定します。
-
サービス アカウントを作成します。
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
SERVICE_ACCOUNT_NAME
をサービス アカウントの名前に置き換えます。 -
サービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
次のように置き換えます。
SERVICE_ACCOUNT_NAME
: サービス アカウントの名前PROJECT_ID
: サービス アカウントを作成したプロジェクト IDROLE
: 付与するロール
-
サービス アカウントのロールを使用して、そのサービス アカウントを他のリソースに関連付けることができるロールを Google アカウントに付与します。
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
次のように置き換えます。
SERVICE_ACCOUNT_NAME
: サービス アカウントの名前PROJECT_ID
: サービス アカウントを作成したプロジェクト IDUSER_EMAIL
: Google アカウントのメールアドレス
-
- Google Cloud CLI をインストールします。
-
gcloud CLI を初期化するには:
gcloud init
-
Google Cloud プロジェクトを作成または選択します。
-
Cloud プロジェクトを作成します。
gcloud projects create PROJECT_ID
-
作成した Cloud プロジェクトを選択します。
gcloud config set project PROJECT_ID
-
-
Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
-
Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler API を有効にします。
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
認証を設定します。
-
サービス アカウントを作成します。
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
SERVICE_ACCOUNT_NAME
をサービス アカウントの名前に置き換えます。 -
サービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
次のように置き換えます。
SERVICE_ACCOUNT_NAME
: サービス アカウントの名前PROJECT_ID
: サービス アカウントを作成したプロジェクト IDROLE
: 付与するロール
-
サービス アカウントのロールを使用して、そのサービス アカウントを他のリソースに関連付けることができるロールを Google アカウントに付与します。
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
次のように置き換えます。
SERVICE_ACCOUNT_NAME
: サービス アカウントの名前PROJECT_ID
: サービス アカウントを作成したプロジェクト IDUSER_EMAIL
: Google アカウントのメールアドレス
-
-
Google アカウントの認証情報を作成します。
gcloud auth application-default login
Pub/Sub プロジェクトを設定する
-
バケット、プロジェクト、およびリージョンの変数を作成します。 Cloud Storage のバケット名は、グローバルに一意である必要がある。 このクイックスタートでコマンドを実行する場所に近接した Dataflow リージョンを選択します。
REGION
変数の値は有効なリージョン名にする必要があります。リージョンとロケーションの詳細については、Dataflow のロケーションをご覧ください。BUCKET_NAME=BUCKET_NAME PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=TOPIC_ID REGION=DATAFLOW_REGION SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
-
このプロジェクトが所有する Cloud Storage バケットを作成します。
gsutil mb gs://$BUCKET_NAME
-
このプロジェクトで Pub/Sub トピックを作成します。
gcloud pubsub topics create $TOPIC_ID
-
このプロジェクトで Cloud Scheduler ジョブを作成します。このジョブは、1 分間隔で Cloud Pub/Sub トピックにメッセージをパブリッシュします。
App Engine アプリがプロジェクトに存在しない場合は、この手順で作成されます。
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
ジョブを開始します。
gcloud scheduler jobs run publisher-job --location=$REGION
-
次のコマンドを使用して、クイックスタート リポジトリのクローンを作成し、サンプルコード ディレクトリに移動します。
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
Pub/Sub から Cloud Storage へのメッセージのストリーミング
コードサンプル
このサンプルコードでは、Dataflow を使用して次のことを行います。
- Pub/Sub メッセージを読み取ります。
- パブリッシュ タイムスタンプにより、固定サイズの間隔でメッセージをウィンドウ処理(グループ化)します。
各ウィンドウのメッセージを Cloud Storage のファイルに書き込みます。
Java
Python
パイプラインを開始する
パイプラインを開始するには、次のコマンドを実行します。
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2 \ --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp \ --service_account_email=$SERVICE_ACCOUNT
上記のコマンドがローカルで実行され、クラウドで実行される Dataflow ジョブを起動します。コマンドが JOB_MESSAGE_DETAILED: Workers
have started successfully
を返したら、Ctrl+C
を使用してローカル プログラムを終了します。
ジョブとパイプラインの進行状況の確認
ジョブの進行状況は Dataflow コンソールで確認できます。
[ジョブの詳細] ビューを開いて、次の情報を確認します。
- ジョブの構成
- ジョブのログ
- ステージ指標
Cloud Storage に出力ファイルが表示されるまで数分間かかる場合があります。
または、以下のコマンドラインを使用して、書き出されたファイルを確認します。
gsutil ls gs://${BUCKET_NAME}/samples/
出力は次のようになります。
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
クリーンアップ
このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースと一緒に Cloud プロジェクトを削除します。
Cloud Scheduler ジョブを削除します。
gcloud scheduler jobs delete publisher-job --location=$REGION
Dataflow コンソールで、ジョブを停止します。パイプラインをドレインせずにキャンセルします。
トピックを削除します。
gcloud pubsub topics delete $TOPIC_ID
パイプラインによって作成されたファイルを削除します。
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*" gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
Cloud Storage バケットを削除します。
gsutil rb gs://${BUCKET_NAME}
- サービス アカウントを削除します。
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
作成した認証情報を取り消して、ローカル認証情報ファイルを削除します。
gcloud auth application-default revoke
-
(省略可)gcloud CLI から認証情報を取り消します。
gcloud auth revoke
次のステップ
カスタム タイムスタンプで Pub/Sub メッセージをウィンドウ処理する場合は、タイムスタンプを Pub/Sub メッセージ内の属性として指定し、そのカスタム タイムスタンプを PubsubIO の
withTimestampAttribute
で使用できます。Google のストリーミング用に設計されたオープンソースの Dataflow テンプレートをご覧ください。
Dataflow と Pub/Sub の統合方法の詳細を確認します。
Pub/Sub から読み取り、Dataflow Flex テンプレートを使用して BigQuery に書き込むこちらのチュートリアルをご覧ください。
ウィンドウ処理の詳細については、Apache Beam モバイルゲーム パイプラインの例をご覧ください。