Java を使用して Dataflow パイプラインを作成する
このドキュメントでは、Google Cloud プロジェクトの設定、Apache Beam SDK for Java で構築されたサンプル パイプラインの作成、Dataflow サービスでサンプル パイプラインの実行方法について説明します。このパイプラインは、Cloud Storage からテキスト ファイルを読み取り、ファイル内の一意の単語数をカウントし、単語数を Cloud Storage に書き込みます。WordCount パイプラインの概要については、Apache Beam で WordCount を使用する方法の動画をご覧ください。
このチュートリアルでは Maven が必要ですが、サンプル プロジェクトを Maven から Gradle に変換することもできます。詳細については、省略可: Maven から Gradle に変換するをご覧ください。
このタスクを Google Cloud コンソールで直接行う際の順を追ったガイダンスについては、「ガイドを表示」をクリックしてください。
始める前に
- Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
- Google Cloud CLI をインストールします。
-
gcloud CLI を初期化するには:
gcloud init
-
Google Cloud プロジェクトを作成または選択します。
-
Google Cloud プロジェクトを作成する
gcloud projects create PROJECT_ID
-
作成した Google Cloud プロジェクトを選択します。
gcloud config set project PROJECT_ID
-
-
Google Cloud プロジェクトの課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
-
Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager API を有効にします。
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Google アカウントの認証情報を作成します。
gcloud auth application-default login
-
Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
PROJECT_ID
は、実際のプロジェクト ID に置き換えます。EMAIL_ADDRESS
は実際のメールアドレスに置き換えます。ROLE
は、個々のロールに置き換えます。
- Google Cloud CLI をインストールします。
-
gcloud CLI を初期化するには:
gcloud init
-
Google Cloud プロジェクトを作成または選択します。
-
Google Cloud プロジェクトを作成する
gcloud projects create PROJECT_ID
-
作成した Google Cloud プロジェクトを選択します。
gcloud config set project PROJECT_ID
-
-
Google Cloud プロジェクトの課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
-
Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager API を有効にします。
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Google アカウントの認証情報を作成します。
gcloud auth application-default login
-
Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
PROJECT_ID
は、実際のプロジェクト ID に置き換えます。EMAIL_ADDRESS
は実際のメールアドレスに置き換えます。ROLE
は、個々のロールに置き換えます。
-
Cloud Storage バケットを作成し、次のように構成します。
-
ストレージ クラスを設定します。
S
(Standard) -
ストレージ ロケーションを次のように設定します。
US
(米国) -
BUCKET_NAME
を 一意のバケット名に置き換えます。バケットの名前空間は世界中の誰でも閲覧可能なため、機密情報をバケット名に含めないようにしてください。
gsutil mb -c STANDARD -l US gs://BUCKET_NAME
-
ストレージ クラスを設定します。
Compute Engine のデフォルト サービス アカウントにロールを付与します。IAM ロール
roles/dataflow.admin
、roles/dataflow.worker
、roles/storage.objectAdmin
ごとに次のコマンドを 1 回実行します。gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
PROJECT_ID
は、実際のプロジェクト ID に置き換えます。PROJECT_NUMBER
は、使用するプロジェクト番号に置き換えます。プロジェクト番号を確認するには、プロジェクトを特定するに記載されている手順を行うか、gcloud projects describe
コマンドを使用します。SERVICE_ACCOUNT_ROLE
は、個々のロールに置き換えます。
- 次のものをコピーします。これらは以後のセクションで使用されます。
- Cloud Storage バケット名。
- Google Cloud プロジェクト ID。ID を調べる方法については、プロジェクトの識別をご覧ください。
- Java Development Kit(JDK)バージョン 11 をダウンロードしてインストールします(Dataflow は引き続きバージョン 8 をサポートします)。
JAVA_HOME
環境変数が設定され、ご使用の JDK インストールをポイントしていることを確認します。 - ご使用のオペレーティング システムに対応した Maven のインストール ガイドに沿って、Apache Maven をダウンロードし、インストールします。
パイプライン コードの取得
Apache Beam SDK は、データ処理パイプライン用のオープンソースのプログラミング モデルです。Apache Beam プログラムでこのようなパイプラインを定義し、パイプラインを実行する Dataflow などのランナーを選択できます。
- シェルまたはターミナルで、Maven Archetype Plugin を使用して、Apache Beam SDK の
WordCount
の例を含む Maven プロジェクトをコンピュータ上に作成します。mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=2.47.0 \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
このコマンドにより、現在のディレクトリの下に
word-count-beam
という新しいディレクトリが作成されます。word-count-beam
ディレクトリには、シンプルなpom.xml
ファイルと、テキスト ファイル内の単語数をカウントする一連のサンプル パイプラインが含まれています。 word-count-beam
ディレクトリにpom.xml
ファイルが含まれていることを確認します。Linux または macOS
cd word-count-beam/ ls
次のような出力が表示されます。
pom.xml src
Windows
cd word-count-beam/ dir
次のような出力が表示されます。
pom.xml src
- Maven プロジェクトにサンプル パイプラインが含まれていることを確認します。
Linux または macOS
ls src/main/java/org/apache/beam/examples/
次のような出力が表示されます。
DebuggingWordCount.java WindowedWordCount.java common MinimalWordCount.java WordCount.java
Windows
dir src/main/java/org/apache/beam/examples/
次のような出力が表示されます。
DebuggingWordCount.java WindowedWordCount.java common MinimalWordCount.java WordCount.java
これらの例で使用されている Apache Beam のコンセプトの詳細については、Apache Beam WordCount の例をご覧ください。次のセクションでは、
WordCount.java
を使用します。
パイプラインをローカルで実行する
- シェルまたはターミナルで、
word-count-beam
ディレクトリからWordCount
パイプラインをローカルで実行します。mvn compile exec:java \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--output=counts"
出力ファイルには接頭辞
counts
があり、word-count-beam
ディレクトリに書き込まれます。出力ファイルには、入力テキストの一意の単語と、各単語の出現回数が含まれます。
Dataflow サービスでパイプラインを実行する
- シェルまたはターミナルで、
word-count-beam
ディレクトリから Dataflow サービスに対してWordCount
パイプラインをビルドして実行します。mvn -Pdataflow-runner compile exec:java \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--project=PROJECT_ID \ --gcpTempLocation=gs://BUCKET_NAME/temp/ \ --output=gs://BUCKET_NAME/output \ --runner=DataflowRunner \ --region=REGION"
次のように置き換えます。
PROJECT_ID
: 実際の Google Cloud プロジェクト IDBUCKET_NAME
: Cloud Storage バケットの名前REGION
: Dataflow リージョン エンドポイント(us-central1
など)
結果を表示する
- Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。
[ジョブ] に移動[ジョブ] ページには、ステータスなどの使用可能なジョブの詳細がすべて表示されます。wordcount ジョブの [ステータス] は最初は [Running] で、その後 [Succeeded] に更新されます。
- Google Cloud コンソールで、Cloud Storage の [ブラウザ] ページに移動します。
[ブラウザ] に移動[ブラウザ] ページに、プロジェクト内のすべてのストレージ バケットが一覧表示されます。
- 作成したストレージ バケットをクリックします。
[バケットの詳細] ページに、Dataflow ジョブで作成された出力ファイルとステージング ファイルが表示されます。
クリーンアップ
このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、Google Cloud プロジェクトとそのリソースをまとめて削除してください。
プロジェクトの削除
課金を停止する最も簡単な方法は、クイックスタート用に作成した Google Cloud プロジェクトを削除することです。
- Google Cloud コンソールで、[リソースの管理] ページに移動します。
- プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
- ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。
個々のリソースの削除
このクイックスタートで使用した Google Cloud プロジェクトを残しておく場合は、個々のリソースを削除します。
- Google Cloud コンソールで、Cloud Storage の [ブラウザ] ページに移動します。
- 削除するバケットのチェックボックスをクリックします。
- バケットを削除するには、 [削除] をクリックして、指示に沿って操作します。
-
作成した認証情報を取り消して、ローカル認証情報ファイルを削除します。
gcloud auth application-default revoke
-
(省略可)gcloud CLI から認証情報を取り消します。
gcloud auth revoke
Compute Engine のデフォルト サービス アカウントに付与したロールを取り消します。IAM ロール
roles/dataflow.admin
、roles/dataflow.worker
、roles/storage.objectAdmin
ごとに、次のコマンドを 1 回実行します。gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=ROLE