7 月 18 ~ 20 日に開催される Apache Beam コミュニティの Beam Summit 2022 にぜひご参加ください。Beam について理解を深め、専門知識を共有してください。
クイックスタート: Java を使用して Dataflow パイプラインを作成する

Java を使用して Dataflow パイプラインを作成する

このドキュメントでは、Google Cloud プロジェクトの設定方法、Apache Beam SDK for Java を使用して Maven プロジェクトを作成する方法、Dataflow サービスでサンプル パイプラインを実行する方法について説明します。このパイプラインは、Cloud Storage からテキスト ファイルを読み取り、ファイル内の一意の単語数をカウントし、単語数を Cloud Storage に書き込みます。


このタスクを Cloud コンソールで直接行う際の順を追ったガイダンスについては、[ガイドを表示] をクリックしてください。

ガイドを表示


以降のセクションでは、[ガイドを表示] をクリックした場合と同じ手順について説明します。

始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  4. Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager API を有効にします。

    API を有効にする

  5. サービス アカウントを作成します。

    1. Cloud Console で [サービス アカウントの作成] ページに移動します。

      [サービス アカウントの作成] に移動
    2. プロジェクトを選択します。
    3. [サービス アカウント名] フィールドに名前を入力します。Cloud Console は、この名前に基づいて [サービス アカウント ID] フィールドに入力します。

      [サービス アカウントの説明] フィールドに説明を入力します。例: Service account for quickstart

    4. [作成して続行] をクリックします。
    5. プロジェクトへのアクセス権限を付与するには、サービス アカウントに次のロールを付与します。[プロジェクト] > [オーナー]

      [ロールを選択] リストでロールを選択します。

      ロールを追加するには、[別のロールを追加] をクリックして各ロールを追加します。

    6. [続行] をクリックします。
    7. [完了] をクリックして、サービス アカウントの作成を完了します。

      ブラウザ ウィンドウは閉じないでください。次のステップでこれを使用します。

  6. サービス アカウント キーを作成します。

    1. Cloud Console で、作成したサービス アカウントのメールアドレスをクリックします。
    2. [キー] をクリックします。
    3. [鍵を追加]、[新しい鍵を作成] の順にクリックします。
    4. [作成] をクリックします。JSON キーファイルがパソコンにダウンロードされます。
    5. [閉じる] をクリックします。
  7. 環境変数 GOOGLE_APPLICATION_CREDENTIALS を、サービス アカウント キーが含まれる JSON ファイルのパスに設定します。 この変数は現在のシェル セッションにのみ適用されるため、新しいセッションを開く場合は、変数を再度設定します。

  8. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  9. Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  10. Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager API を有効にします。

    API を有効にする

  11. サービス アカウントを作成します。

    1. Cloud Console で [サービス アカウントの作成] ページに移動します。

      [サービス アカウントの作成] に移動
    2. プロジェクトを選択します。
    3. [サービス アカウント名] フィールドに名前を入力します。Cloud Console は、この名前に基づいて [サービス アカウント ID] フィールドに入力します。

      [サービス アカウントの説明] フィールドに説明を入力します。例: Service account for quickstart

    4. [作成して続行] をクリックします。
    5. プロジェクトへのアクセス権限を付与するには、サービス アカウントに次のロールを付与します。[プロジェクト] > [オーナー]

      [ロールを選択] リストでロールを選択します。

      ロールを追加するには、[別のロールを追加] をクリックして各ロールを追加します。

    6. [続行] をクリックします。
    7. [完了] をクリックして、サービス アカウントの作成を完了します。

      ブラウザ ウィンドウは閉じないでください。次のステップでこれを使用します。

  12. サービス アカウント キーを作成します。

    1. Cloud Console で、作成したサービス アカウントのメールアドレスをクリックします。
    2. [キー] をクリックします。
    3. [鍵を追加]、[新しい鍵を作成] の順にクリックします。
    4. [作成] をクリックします。JSON キーファイルがパソコンにダウンロードされます。
    5. [閉じる] をクリックします。
  13. 環境変数 GOOGLE_APPLICATION_CREDENTIALS を、サービス アカウント キーが含まれる JSON ファイルのパスに設定します。 この変数は現在のシェル セッションにのみ適用されるため、新しいセッションを開く場合は、変数を再度設定します。

  14. Cloud Storage バケットを作成します。
    1. In the Cloud console, go to the Cloud Storage Browser page.

      Go to Browser

    2. Click Create bucket.
    3. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
      • For Name your bucket, enter a unique bucket name. Don't include sensitive information in the bucket name, because the bucket namespace is global and publicly visible.
      • For Choose where to store your data, do the following:
        • Select a Location type option.
        • Select a Location option.
      • For Choose a default storage class for your data, select the following: Standard.
      • For Choose how to control access to objects, select an Access control option.
      • For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
    4. Click Create.
  15. 次のものをコピーします。これらは以後のセクションで使用されます。
    • Cloud Storage バケット名。
    • Google Cloud プロジェクト ID。ID を調べる方法については、プロジェクトの識別をご覧ください。
  16. Java Development Kit(JDK)バージョン 11 をダウンロードしてインストールします(Dataflow は引き続きバージョン 8 をサポートします)。JAVA_HOME 環境変数が設定され、ご使用の JDK インストールをポイントしていることを確認します。
  17. ご使用のオペレーティング システムに対応した Maven のインストール ガイドに沿って、Apache Maven をダウンロードし、インストールします。

パイプライン コードの取得

Apache Beam SDK は、データ処理パイプライン用のオープンソースのプログラミング モデルです。Apache Beam プログラムでこのようなパイプラインを定義し、パイプラインを実行する Dataflow などのランナーを選択できます。

  1. シェルまたはターミナルで、Maven Archetype Plugin を使用して、Apache Beam SDK の WordCount の例を含む Maven プロジェクトをコンピュータ上に作成します。
    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=2.39.0 \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    

    このコマンドにより、現在のディレクトリの下に word-count-beam という新しいディレクトリが作成されます。word-count-beam ディレクトリには、シンプルな pom.xml ファイルと、テキスト ファイル内の単語数をカウントする一連のサンプル パイプラインが含まれています。

  2. word-count-beam ディレクトリに pom.xml ファイルが含まれていることを確認します。

    Linux または macOS

    cd word-count-beam/
    ls

    次のような出力が表示されます。

    pom.xml   src

    Windows

    cd word-count-beam/
    dir

    次のような出力が表示されます。

    pom.xml   src
  3. Maven プロジェクトにサンプル パイプラインが含まれていることを確認します。

    Linux または macOS

    ls src/main/java/org/apache/beam/examples/

    次のような出力が表示されます。

    DebuggingWordCount.java   WindowedWordCount.java   common
    MinimalWordCount.java   WordCount.java

    Windows

    dir src/main/java/org/apache/beam/examples/

    次のような出力が表示されます。

    DebuggingWordCount.java   WindowedWordCount.java   common
    MinimalWordCount.java   WordCount.java

これらの例で使用されている Apache Beam のコンセプトの詳細については、Apache Beam WordCount の例をご覧ください。次のセクションでは、 WordCount.java を使用します。

パイプラインをローカルで実行する

  • シェルまたはターミナルで、word-count-beam ディレクトリから WordCount パイプラインをローカルで実行します。
    mvn compile exec:java \
        -Dexec.mainClass=org.apache.beam.examples.WordCount \
        -Dexec.args="--output=counts"
    

    出力ファイルには接頭辞 counts があり、word-count-beam ディレクトリに書き込まれます。出力ファイルには、入力テキストの一意の単語と、各単語の出現回数が含まれます。

Dataflow サービスでパイプラインを実行する

  • シェルまたはターミナルで、word-count-beam ディレクトリから Dataflow サービスに対して WordCount パイプラインをビルドして実行します。
    mvn -Pdataflow-runner compile exec:java \
        -Dexec.mainClass=org.apache.beam.examples.WordCount \
        -Dexec.args="--project=PROJECT_ID \
        --gcpTempLocation=gs://BUCKET_NAME/temp/ \
        --output=gs://BUCKET_NAME/output \
        --runner=DataflowRunner \
        --region=REGION"
    

    次のように置き換えます。

結果を表示する

  1. Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。
    ジョブに移動

    [ジョブ] ページには、ステータスなどの使用可能なジョブの詳細がすべて表示されます。wordcount ジョブの [ステータス] は最初は [Running] で、その後 [Succeeded] に更新されます。

  2. Cloud コンソールの Cloud Storage ブラウザページに移動します。
    [ブラウザ] に移動

    [ブラウザ] ページに、プロジェクト内のすべてのストレージ バケットが一覧表示されます。

  3. 作成したストレージ バケットをクリックします。

    [バケットの詳細] ページに、Dataflow ジョブで作成された出力ファイルとステージング ファイルが表示されます。

クリーンアップ

このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の手順を行います。

プロジェクトの削除

課金を停止する最も簡単な方法は、クイックスタート用に作成した Google Cloud プロジェクトを削除することです。

  1. Cloud Console で [リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

個々のリソースの削除

このクイックスタートで使用した Google Cloud プロジェクトを残しておく場合は、個々のリソースを削除します。

  1. Cloud コンソールの Cloud Storage ブラウザページに移動します。

    ブラウザに移動

  2. 削除するバケットのチェックボックスをクリックします。
  3. バケットを削除するには、 [削除] をクリックして、指示に沿って操作します。

次のステップ