Java を使用して Dataflow パイプラインを作成する
このドキュメントでは、Google Cloud プロジェクトの設定方法、Apache Beam SDK for Java を使用して Maven プロジェクトを作成する方法、Dataflow サービスでサンプル パイプラインを実行する方法について説明します。このパイプラインは、Cloud Storage からテキスト ファイルを読み取り、ファイル内の一意の単語数をカウントし、単語数を Cloud Storage に書き込みます。
このタスクを Cloud コンソールで直接行う際の順を追ったガイダンスについては、[ガイドを表示] をクリックしてください。
以降のセクションでは、[ガイドを表示] をクリックした場合と同じ手順について説明します。
始める前に
- Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
-
Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager API を有効にします。
-
サービス アカウントを作成します。
-
Cloud Console で [サービス アカウントの作成] ページに移動します。
[サービス アカウントの作成] に移動 - プロジェクトを選択します。
-
[サービス アカウント名] フィールドに名前を入力します。Cloud Console は、この名前に基づいて [サービス アカウント ID] フィールドに入力します。
[サービス アカウントの説明] フィールドに説明を入力します。例:
Service account for quickstart
- [作成して続行] をクリックします。
-
プロジェクトへのアクセス権限を付与するには、サービス アカウントに次のロールを付与します。[プロジェクト] > [オーナー]
[ロールを選択] リストでロールを選択します。
ロールを追加するには、[
別のロールを追加] をクリックして各ロールを追加します。 - [続行] をクリックします。
-
[完了] をクリックして、サービス アカウントの作成を完了します。
ブラウザ ウィンドウは閉じないでください。次のステップでこれを使用します。
-
-
サービス アカウント キーを作成します。
- Cloud Console で、作成したサービス アカウントのメールアドレスをクリックします。
- [キー] をクリックします。
- [鍵を追加]、[新しい鍵を作成] の順にクリックします。
- [作成] をクリックします。JSON キーファイルがパソコンにダウンロードされます。
- [閉じる] をクリックします。
-
環境変数
GOOGLE_APPLICATION_CREDENTIALS
を、サービス アカウント キーが含まれる JSON ファイルのパスに設定します。 この変数は現在のシェル セッションにのみ適用されるため、新しいセッションを開く場合は、変数を再度設定します。 -
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
-
Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager API を有効にします。
-
サービス アカウントを作成します。
-
Cloud Console で [サービス アカウントの作成] ページに移動します。
[サービス アカウントの作成] に移動 - プロジェクトを選択します。
-
[サービス アカウント名] フィールドに名前を入力します。Cloud Console は、この名前に基づいて [サービス アカウント ID] フィールドに入力します。
[サービス アカウントの説明] フィールドに説明を入力します。例:
Service account for quickstart
- [作成して続行] をクリックします。
-
プロジェクトへのアクセス権限を付与するには、サービス アカウントに次のロールを付与します。[プロジェクト] > [オーナー]
[ロールを選択] リストでロールを選択します。
ロールを追加するには、[
別のロールを追加] をクリックして各ロールを追加します。 - [続行] をクリックします。
-
[完了] をクリックして、サービス アカウントの作成を完了します。
ブラウザ ウィンドウは閉じないでください。次のステップでこれを使用します。
-
-
サービス アカウント キーを作成します。
- Cloud Console で、作成したサービス アカウントのメールアドレスをクリックします。
- [キー] をクリックします。
- [鍵を追加]、[新しい鍵を作成] の順にクリックします。
- [作成] をクリックします。JSON キーファイルがパソコンにダウンロードされます。
- [閉じる] をクリックします。
-
環境変数
GOOGLE_APPLICATION_CREDENTIALS
を、サービス アカウント キーが含まれる JSON ファイルのパスに設定します。 この変数は現在のシェル セッションにのみ適用されるため、新しいセッションを開く場合は、変数を再度設定します。 - Cloud Storage バケットを作成します。
- In the Cloud console, go to the Cloud Storage Browser page.
- Click Create bucket.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
- For Name your bucket, enter a unique bucket name. Don't include sensitive information in the bucket name, because the bucket namespace is global and publicly visible.
-
For Choose where to store your data, do the following:
- Select a Location type option.
- Select a Location option.
- For Choose a default storage class for your data, select the following: Standard.
- For Choose how to control access to objects, select an Access control option.
- For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
- Click Create.
- 次のものをコピーします。これらは以後のセクションで使用されます。
- Cloud Storage バケット名。
- Google Cloud プロジェクト ID。ID を調べる方法については、プロジェクトの識別をご覧ください。
- Java Development Kit(JDK)バージョン 11 をダウンロードしてインストールします(Dataflow は引き続きバージョン 8 をサポートします)。
JAVA_HOME
環境変数が設定され、ご使用の JDK インストールをポイントしていることを確認します。 - ご使用のオペレーティング システムに対応した Maven のインストール ガイドに沿って、Apache Maven をダウンロードし、インストールします。
パイプライン コードの取得
Apache Beam SDK は、データ処理パイプライン用のオープンソースのプログラミング モデルです。Apache Beam プログラムでこのようなパイプラインを定義し、パイプラインを実行する Dataflow などのランナーを選択できます。
- シェルまたはターミナルで、Maven Archetype Plugin を使用して、Apache Beam SDK の
WordCount
の例を含む Maven プロジェクトをコンピュータ上に作成します。mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=2.39.0 \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
このコマンドにより、現在のディレクトリの下に
word-count-beam
という新しいディレクトリが作成されます。word-count-beam
ディレクトリには、シンプルなpom.xml
ファイルと、テキスト ファイル内の単語数をカウントする一連のサンプル パイプラインが含まれています。 word-count-beam
ディレクトリにpom.xml
ファイルが含まれていることを確認します。Linux または macOS
cd word-count-beam/ ls
次のような出力が表示されます。
pom.xml src
Windows
cd word-count-beam/ dir
次のような出力が表示されます。
pom.xml src
- Maven プロジェクトにサンプル パイプラインが含まれていることを確認します。
Linux または macOS
ls src/main/java/org/apache/beam/examples/
次のような出力が表示されます。
DebuggingWordCount.java WindowedWordCount.java common MinimalWordCount.java WordCount.java
Windows
dir src/main/java/org/apache/beam/examples/
次のような出力が表示されます。
DebuggingWordCount.java WindowedWordCount.java common MinimalWordCount.java WordCount.java
これらの例で使用されている Apache Beam のコンセプトの詳細については、Apache Beam WordCount の例をご覧ください。次のセクションでは、
WordCount.java
を使用します。
パイプラインをローカルで実行する
- シェルまたはターミナルで、
word-count-beam
ディレクトリからWordCount
パイプラインをローカルで実行します。mvn compile exec:java \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--output=counts"
出力ファイルには接頭辞
counts
があり、word-count-beam
ディレクトリに書き込まれます。出力ファイルには、入力テキストの一意の単語と、各単語の出現回数が含まれます。
Dataflow サービスでパイプラインを実行する
- シェルまたはターミナルで、
word-count-beam
ディレクトリから Dataflow サービスに対してWordCount
パイプラインをビルドして実行します。mvn -Pdataflow-runner compile exec:java \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--project=PROJECT_ID \ --gcpTempLocation=gs://BUCKET_NAME/temp/ \ --output=gs://BUCKET_NAME/output \ --runner=DataflowRunner \ --region=REGION"
次のように置き換えます。
PROJECT_ID
: 実際の Cloud プロジェクトの IDBUCKET_NAME
: Cloud Storage バケットの名前REGION
: Dataflow リージョン エンドポイント(us-central1
など)
結果を表示する
- Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。
ジョブに移動[ジョブ] ページには、ステータスなどの使用可能なジョブの詳細がすべて表示されます。wordcount ジョブの [ステータス] は最初は [Running] で、その後 [Succeeded] に更新されます。
- Cloud コンソールの Cloud Storage ブラウザページに移動します。
[ブラウザ] に移動[ブラウザ] ページに、プロジェクト内のすべてのストレージ バケットが一覧表示されます。
- 作成したストレージ バケットをクリックします。
[バケットの詳細] ページに、Dataflow ジョブで作成された出力ファイルとステージング ファイルが表示されます。
クリーンアップ
このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の手順を行います。
プロジェクトの削除
課金を停止する最も簡単な方法は、クイックスタート用に作成した Google Cloud プロジェクトを削除することです。
- Cloud Console で [リソースの管理] ページに移動します。
- プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
- ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。
個々のリソースの削除
このクイックスタートで使用した Google Cloud プロジェクトを残しておく場合は、個々のリソースを削除します。
- Cloud コンソールの Cloud Storage ブラウザページに移動します。
- 削除するバケットのチェックボックスをクリックします。
- バケットを削除するには、 [削除] をクリックして、指示に沿って操作します。