Java を使用して Dataflow パイプラインを作成する

このドキュメントでは、Google Cloud プロジェクトの設定、Apache Beam SDK for Java で構築されたサンプル パイプラインの作成、Dataflow サービスでサンプル パイプラインの実行方法について説明します。このパイプラインは、Cloud Storage からテキスト ファイルを読み取り、ファイル内の一意の単語数をカウントし、単語数を Cloud Storage に書き込みます。WordCount パイプラインの概要については、Apache Beam で WordCount を使用する方法の動画をご覧ください。

このチュートリアルでは Maven が必要ですが、サンプル プロジェクトを Maven から Gradle に変換することもできます。詳細については、省略可: Maven から Gradle に変換するをご覧ください。


このタスクを Google Cloud コンソールで直接行う際の順を追ったガイダンスについては、「ガイドを表示」をクリックしてください。

ガイドを表示


始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud CLI をインストールします。
  3. gcloud CLI を初期化するには:

    gcloud init
  4. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  5. Google Cloud プロジェクトで課金が有効になっていることを確認します

  6. Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager API を有効にします。

    gcloud services enable dataflowcompute_componentloggingstorage_componentstorage_apibigquerypubsubdatastore.googleapis.comcloudresourcemanager.googleapis.com
  7. Google アカウントのローカル認証情報を作成します。

    gcloud auth application-default login
  8. Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • EMAIL_ADDRESS は実際のメールアドレスに置き換えます。
    • ROLE は、個々のロールに置き換えます。
  9. Google Cloud CLI をインストールします。
  10. gcloud CLI を初期化するには:

    gcloud init
  11. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  12. Google Cloud プロジェクトで課金が有効になっていることを確認します

  13. Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager API を有効にします。

    gcloud services enable dataflowcompute_componentloggingstorage_componentstorage_apibigquerypubsubdatastore.googleapis.comcloudresourcemanager.googleapis.com
  14. Google アカウントのローカル認証情報を作成します。

    gcloud auth application-default login
  15. Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • EMAIL_ADDRESS は実際のメールアドレスに置き換えます。
    • ROLE は、個々のロールに置き換えます。
  16. Compute Engine のデフォルト サービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • PROJECT_NUMBER は、使用するプロジェクト番号に置き換えます。プロジェクト番号を確認するには、プロジェクトを特定するに記載されている手順を行うか、gcloud projects describe コマンドを使用します。
    • SERVICE_ACCOUNT_ROLE は、個々のロールに置き換えます。
  17. Cloud Storage バケットを作成し、次のように構成します。
    • ストレージ クラスを設定します。 S(Standard)
    • ストレージ ロケーションを次のように設定します。 US(米国)
    • BUCKET_NAME を 一意のバケット名に置き換えます。バケットの名前空間は世界中の誰でも閲覧可能なため、機密情報をバケット名に含めないようにしてください。
    gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
  18. 次のものをコピーします。これらは以後のセクションで使用されます。
    • Cloud Storage バケット名。
    • Google Cloud プロジェクト ID。ID を調べる方法については、プロジェクトの識別をご覧ください。
  19. Java Development Kit(JDK)バージョン 11 をダウンロードしてインストールします(Dataflow は引き続きバージョン 8 をサポートします)。JAVA_HOME 環境変数が設定され、ご使用の JDK インストールをポイントしていることを確認します。
  20. ご使用のオペレーティング システムに対応した Maven のインストール ガイドに沿って、Apache Maven をダウンロードし、インストールします。

パイプライン コードの取得

Apache Beam SDK は、データ処理パイプライン用のオープンソースのプログラミング モデルです。Apache Beam プログラムでこのようなパイプラインを定義し、パイプラインを実行する Dataflow などのランナーを選択できます。

  1. シェルまたはターミナルで、Maven Archetype Plugin を使用して、Apache Beam SDK の WordCount の例を含む Maven プロジェクトをコンピュータ上に作成します。
    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=2.55.1 \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    

    このコマンドにより、現在のディレクトリの下に word-count-beam という新しいディレクトリが作成されます。word-count-beam ディレクトリには、シンプルな pom.xml ファイルと、テキスト ファイル内の単語数をカウントする一連のサンプル パイプラインが含まれています。

  2. word-count-beam ディレクトリに pom.xml ファイルが含まれていることを確認します。

    Linux または macOS

    cd word-count-beam/
    ls

    次のような出力が表示されます。

    pom.xml   src

    Windows

    cd word-count-beam/
    dir

    次のような出力が表示されます。

    pom.xml   src
  3. Maven プロジェクトにサンプル パイプラインが含まれていることを確認します。

    Linux または macOS

    ls src/main/java/org/apache/beam/examples/

    次のような出力が表示されます。

    DebuggingWordCount.java   WindowedWordCount.java   common
    MinimalWordCount.java   WordCount.java

    Windows

    dir src/main/java/org/apache/beam/examples/

    次のような出力が表示されます。

    DebuggingWordCount.java   WindowedWordCount.java   common
    MinimalWordCount.java   WordCount.java

これらの例で使用されている Apache Beam のコンセプトの詳細については、Apache Beam WordCount の例をご覧ください。次のセクションでは、 WordCount.java を使用します。

パイプラインをローカルで実行する

  • シェルまたはターミナルで、word-count-beam ディレクトリから WordCount パイプラインをローカルで実行します。
    mvn compile exec:java \
        -Dexec.mainClass=org.apache.beam.examples.WordCount \
        -Dexec.args="--output=counts"
    

    出力ファイルには接頭辞 counts があり、word-count-beam ディレクトリに書き込まれます。出力ファイルには、入力テキストの一意の単語と、各単語の出現回数が含まれます。

Dataflow サービスでパイプラインを実行する

  • シェルまたはターミナルで、word-count-beam ディレクトリから Dataflow サービスに対して WordCount パイプラインをビルドして実行します。
    mvn -Pdataflow-runner compile exec:java \
        -Dexec.mainClass=org.apache.beam.examples.WordCount \
        -Dexec.args="--project=PROJECT_ID \
        --gcpTempLocation=gs://BUCKET_NAME/temp/ \
        --output=gs://BUCKET_NAME/output \
        --runner=DataflowRunner \
        --region=REGION"
    

    次のように置き換えます。

    • PROJECT_ID: 実際の Google Cloud プロジェクト ID
    • BUCKET_NAME: Cloud Storage バケットの名前
    • REGION: Dataflow リージョンus-central1 など)

結果を表示する

  1. Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。

    [ジョブ] に移動

    [ジョブ] ページには、ステータスなどの使用可能なジョブの詳細がすべて表示されます。wordcount ジョブの [ステータス] は最初は [Running] で、その後 [Succeeded] に更新されます。

  2. Google Cloud コンソールで、Cloud Storage の [バケット] ページに移動します。

    [バケット] に移動

    [バケット] ページには、プロジェクト内のすべてのストレージ バケットが一覧表示されます。

  3. 作成したストレージ バケットをクリックします。

    [バケットの詳細] ページに、Dataflow ジョブで作成された出力ファイルとステージング ファイルが表示されます。

クリーンアップ

このページで使用したリソースに対して Google Cloud アカウントで課金されないようにするには、Google Cloud プロジェクトとそのリソースを削除します。

プロジェクトの削除

課金を停止する最も簡単な方法は、クイックスタート用に作成した Google Cloud プロジェクトを削除することです。

  1. Google Cloud コンソールで、[リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

個々のリソースの削除

このクイックスタートで使用した Google Cloud プロジェクトを残しておく場合は、個々のリソースを削除します。

  1. Google Cloud コンソールで、Cloud Storage の [バケット] ページに移動します。

    [バケット] に移動

  2. 削除するバケットのチェックボックスをクリックします。
  3. バケットを削除するには、 [削除] をクリックして、指示に沿って操作します。
  4. Compute Engine のデフォルト サービス アカウントに付与したロールを取り消します。次の IAM ロールごとに次のコマンドを 1 回実行します。

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. 作成した認証情報を取り消して、ローカル認証情報ファイルを削除します。

    gcloud auth application-default revoke
  6. (省略可)gcloud CLI から認証情報を取り消します。

    gcloud auth revoke

次のステップ