Dataflow を使用して Pub/Sub Lite メッセージをストリーミングする
独自のデータ処理プログラムを作成して実行する代わりに、Dataflow を Apache Beam の Pub/Sub Lite I/O コネクタとともに使用することもできます。Dataflow は、ストリーミング(リアルタイム)モードのデータとバッチモードのデータを同等の信頼性と明瞭度で変換、活用するフルマネージド サービスです。Dataflow は、Apache Beam SDK を使用して開発されたプログラムを確実に実行します。Apache Beam SDK は、強力なステートフル処理抽象化と、他のストリーミング システムとバッチシステムへの I/O コネクタで構成された拡張可能なセットです。
このクイックスタートでは、以下を行う Apache Beam パイプラインを作成する方法を示します。
- Pub/Sub Lite からメッセージを読み取る
- タイムスタンプごとにメッセージをウィンドウ処理(グループ化)する
- Cloud Storage にメッセージを書き込む
また、次の方法についても説明します。
- Dataflow で実行するため、パイプラインを送信する
- パイプラインから Dataflow フレックス テンプレートを作成する
このチュートリアルでは Maven が必要ですが、サンプル プロジェクトを Maven から Gradle に変換することもできます。詳細については、省略可: Maven から Gradle に変換するをご覧ください。
始める前に
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:
gcloud services enable pubsublite.googleapis.com
dataflow.googleapis.com storage-api.googleapis.com logging.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:
gcloud services enable pubsublite.googleapis.com
dataflow.googleapis.com storage-api.googleapis.com logging.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
Pub/Sub Lite プロジェクトを設定する
Cloud Storage バケット、プロジェクト、Dataflow リージョンの変数を作成します。Cloud Storage のバケット名は、グローバルに一意にする必要があります。Dataflow リージョンは、ジョブを実行できる有効なリージョンである必要があります。リージョンとロケーションの詳細については、Dataflow のロケーションをご覧ください。
export PROJECT_ID=$(gcloud config get-value project)
export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
export BUCKET=BUCKET_NAME
export DATAFLOW_REGION=DATAFLOW_REGION
このプロジェクトが所有する Cloud Storage バケットを作成します。
gcloud storage buckets create gs://$BUCKET
Pub/Sub Lite ゾーン Lite のトピックとサブスクリプションを作成する
ゾーン Lite Pub/Sub Lite トピックと Lite サブスクリプションを作成します。
Lite ロケーションの場合は、サポートされている Pub/Sub Lite のロケーションを選択します。また、リージョンのゾーンも指定する必要があります。例: us-central1-a
。
export TOPIC=LITE_TOPIC_ID
export SUBSCRIPTION=LITE_SUBSCRIPTION_ID
export LITE_LOCATION=LITE_LOCATION
gcloud pubsub lite-topics create $TOPIC \ --location=$LITE_LOCATION \ --partitions=1 \ --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \ --location=$LITE_LOCATION \ --topic=$TOPIC \ --starting-offset=beginning
Dataflow にメッセージをストリーミングする
クイックスタート サンプルコードをダウンロードする
クイックスタート リポジトリのクローンを作成し、サンプルコード ディレクトリに移動します。
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics
サンプルコード
このサンプルコードでは、Dataflow を使用して次のことを行います。
- 制限なしソースとして Pub/Sub Lite サブスクリプションからメッセージを読み取る。
- 固定時間ウィンドウとデフォルト トリガーを使用して、パブリッシュ タイムスタンプに基づいてメッセージをグループ化します。
グループ化したメッセージを Cloud Storage のファイルに書き込む。
Java
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Java の設定手順を実施してください。
Dataflow パイプラインを開始する
Dataflow でパイプラインを開始するには、次のコマンドを実行します。
mvn compile exec:java \
-Dexec.mainClass=examples.PubsubliteToGcs \
-Dexec.args=" \
--subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
--output=gs://$BUCKET/samples/output \
--windowSize=1 \
--project=$PROJECT_ID \
--region=$DATAFLOW_REGION \
--tempLocation=gs://$BUCKET/temp \
--runner=DataflowRunner \
--serviceAccount=$SERVICE_ACCOUNT"
上記のコマンドは Dataflow ジョブを起動します。コンソール出力のリンクをたどって、Dataflow モニタリング コンソールでジョブにアクセスします。
ジョブの進行状況を確認する
Dataflow コンソールでジョブの進行状況を確認します。
[ジョブの詳細] ビューを開いて、次の情報を確認します。
- ジョブグラフ
- 実行の詳細
- ジョブの指標
Lite トピックにメッセージを公開します。
gcloud pubsub lite-topics publish $TOPIC \
--location=$LITE_LOCATION \
--message="Hello World!"
ワーカーログにメッセージが表示されるまで、数分間かかる場合があります。
次のコマンドを使用して、Cloud Storage に書き込まれたファイルを確認します。
gcloud storage ls "gs://$BUCKET/samples/"
出力は次のようになります。
gs://$BUCKET/samples/output-19:41-19:42-0-of-1
gs://$BUCKET/samples/output-19:47-19:48-0-of-1
gs://$BUCKET/samples/output-19:48-19:49-0-of-1
次のコマンドを使用して、ファイルの内容を確認します。
gcloud storage cat "gs://$BUCKET/samples/your-filename"
省略可: Dataflow テンプレートを作成する
必要に応じて、パイプラインに基づいたカスタム Dataflow フレックス テンプレートを作成できます。Dataflow テンプレートを使用すると、完全な Java 開発環境をセットアップしなくても、Google Cloud コンソールやコマンドラインから異なる入力パラメータを使用してジョブを実行できます。
パイプラインのすべての依存関係を含むファット JAR を作成します。コマンドの実行後、
target/pubsublite-streaming-bundled-1.0.jar
が表示されます。mvn clean package -DskipTests=true
テンプレート ファイルとテンプレート コンテナ イメージの名前と場所を指定します。
export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
カスタム Flex テンプレートを作成します。この例では、ジョブの実行に必要な仕様を含む
metadata.json
ファイルが指定されています。gcloud dataflow flex-template build $TEMPLATE_PATH \ --image-gcr-path $TEMPLATE_IMAGE \ --sdk-language "JAVA" \ --flex-template-base-image "JAVA11" \ --metadata-file "metadata.json" \ --jar "target/pubsublite-streaming-bundled-1.0.jar" \ --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
カスタム Flex テンプレートを使用してジョブを実行します。
Console
ジョブ名を入力します。
Dataflow リージョンを入力します。
カスタム テンプレートを選択します。
テンプレート パスを入力します。
必要なパラメータを入力します。
[ジョブを実行] をクリックします。
gcloud
gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
--template-file-gcs-location $TEMPLATE_PATH \
--parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
--parameters output="gs://$BUCKET/samples/template-output" \
--parameters windowSize=1 \
--region $DATAFLOW_REGION \
--serviceAccount=$SERVICE_ACCOUNT
クリーンアップ
このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、Google Cloud プロジェクトとそのリソースをまとめて削除してください。
Dataflow コンソールで、ジョブを停止します。パイプラインは、ドレインするのではなくキャンセルします。
トピックとサブスクリプションを削除します。
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
パイプラインによって作成されたファイルを削除します。
gcloud storage rm "gs://$BUCKET/samples/*" --recursive --continue-on-error
gcloud storage rm "gs://$BUCKET/temp/*" --recursive --continue-on-error
テンプレート イメージとテンプレート ファイルが存在する場合は、それらを削除します。
gcloud container images delete $TEMPLATE_IMAGE
gcloud storage rm $TEMPLATE_PATH
Cloud Storage バケットを削除します。
gcloud storage rm gs://$BUCKET --recursive
- サービス アカウントを削除します。
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
次のステップ
Dataflow フレックス テンプレートの構成の詳細を確認する。
Dataflow ストリーミング パイプラインを理解する。