Java を使用して Dataflow パイプラインを作成する
このドキュメントでは、Google Cloud プロジェクトの設定、Apache Beam SDK for Java で構築されたサンプル パイプラインの作成、Dataflow サービスでサンプル パイプラインの実行方法について説明します。このパイプラインは、Cloud Storage からテキスト ファイルを読み取り、ファイル内の一意の単語数をカウントし、単語数を Cloud Storage に書き込みます。WordCount パイプラインの概要については、Apache Beam で WordCount を使用する方法の動画をご覧ください。
このチュートリアルでは Maven が必要ですが、サンプル プロジェクトを Maven から Gradle に変換することもできます。詳細については、省略可: Maven から Gradle に変換するをご覧ください。
このタスクを Google Cloud コンソールで直接行う際の順を追ったガイダンスについては、「ガイドを表示」をクリックしてください。
始める前に
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Compute Engine のデフォルト サービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
PROJECT_ID
は、実際のプロジェクト ID に置き換えます。PROJECT_NUMBER
は、使用するプロジェクト番号に置き換えます。プロジェクト番号を確認するには、プロジェクトを特定するに記載されている手順を行うか、gcloud projects describe
コマンドを使用します。SERVICE_ACCOUNT_ROLE
は、個々のロールに置き換えます。
-
Create a Cloud Storage bucket and configure it as follows:
-
Set the storage class to
S
(Standard)。 -
ストレージ ロケーションを次のように設定します。
US
(米国)。 -
BUCKET_NAME
は、 一意のバケット名に置き換えます。バケットの名前空間は世界中の誰でも閲覧可能です。機密情報をバケット名に含めないようにしてください。 - 次のものをコピーします。これらは以後のセクションで使用されます。
- Cloud Storage バケット名。
- Google Cloud プロジェクト ID。ID を調べる方法については、プロジェクトの識別をご覧ください。
- Java Development Kit(JDK)バージョン 11 をダウンロードしてインストールします(Dataflow は引き続きバージョン 8 をサポートします)。
JAVA_HOME
環境変数が設定され、ご使用の JDK インストールをポイントしていることを確認します。 - ご使用のオペレーティング システムに対応した Maven のインストール ガイドに沿って、Apache Maven をダウンロードし、インストールします。
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
Set the storage class to
パイプライン コードの取得
Apache Beam SDK は、データ処理パイプライン用のオープンソースのプログラミング モデルです。Apache Beam プログラムでこのようなパイプラインを定義し、パイプラインを実行する Dataflow などのランナーを選択できます。
- シェルまたはターミナルで、Maven Archetype Plugin を使用して、Apache Beam SDK の
WordCount
の例を含む Maven プロジェクトをコンピュータ上に作成します。mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=2.60.0 \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
このコマンドにより、現在のディレクトリの下に
word-count-beam
という新しいディレクトリが作成されます。word-count-beam
ディレクトリには、シンプルなpom.xml
ファイルと、テキスト ファイル内の単語数をカウントする一連のサンプル パイプラインが含まれています。 word-count-beam
ディレクトリにpom.xml
ファイルが含まれていることを確認します。Linux または macOS
cd word-count-beam/ ls
次のような出力が表示されます。
pom.xml src
Windows
cd word-count-beam/ dir
次のような出力が表示されます。
pom.xml src
- Maven プロジェクトにサンプル パイプラインが含まれていることを確認します。
Linux または macOS
ls src/main/java/org/apache/beam/examples/
次のような出力が表示されます。
DebuggingWordCount.java WindowedWordCount.java common MinimalWordCount.java WordCount.java
Windows
dir src/main/java/org/apache/beam/examples/
次のような出力が表示されます。
DebuggingWordCount.java WindowedWordCount.java common MinimalWordCount.java WordCount.java
これらの例で使用されている Apache Beam のコンセプトの詳細については、Apache Beam WordCount の例をご覧ください。次のセクションでは、
WordCount.java
を使用します。
パイプラインをローカルで実行する
- シェルまたはターミナルで、
word-count-beam
ディレクトリからWordCount
パイプラインをローカルで実行します。mvn compile exec:java \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--output=counts"
出力ファイルには接頭辞
counts
があり、word-count-beam
ディレクトリに書き込まれます。出力ファイルには、入力テキストの一意の単語と、各単語の出現回数が含まれます。
Dataflow サービスでパイプラインを実行する
- シェルまたはターミナルで、
word-count-beam
ディレクトリから Dataflow サービスに対してWordCount
パイプラインをビルドして実行します。mvn -Pdataflow-runner compile exec:java \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--project=PROJECT_ID \ --gcpTempLocation=gs://BUCKET_NAME/temp/ \ --output=gs://BUCKET_NAME/output \ --runner=DataflowRunner \ --region=REGION"
次のように置き換えます。
PROJECT_ID
: 実際の Google Cloud プロジェクト IDBUCKET_NAME
: Cloud Storage バケットの名前REGION
: Dataflow リージョン(us-central1
など)
結果を表示する
Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。
[ジョブ] に移動[ジョブ] ページには、ステータスなどの使用可能なジョブの詳細がすべて表示されます。wordcount ジョブの [ステータス] は最初は [Running] で、その後 [Succeeded] に更新されます。
Google Cloud コンソールで、Cloud Storage の [バケット] ページに移動します。
[バケット] に移動[バケット] ページでは、プロジェクト内のすべてのストレージ バケットを一覧参照できます。
- 作成したストレージ バケットをクリックします。
[バケットの詳細] ページに、Dataflow ジョブで作成された出力ファイルとステージング ファイルが表示されます。
クリーンアップ
このページで使用したリソースに対して Google Cloud アカウントで課金されないようにするには、Google Cloud プロジェクトとそのリソースを削除します。
プロジェクトの削除
課金を停止する最も簡単な方法は、クイックスタート用に作成した Google Cloud プロジェクトを削除することです。
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
個々のリソースの削除
このクイックスタートで使用した Google Cloud プロジェクトを残しておく場合は、個々のリソースを削除します。
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete, and then follow the instructions.
Compute Engine のデフォルト サービス アカウントに付与したロールを取り消します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke