Go を使用して Dataflow パイプラインを作成する

このページでは、Apache Beam SDK for Go を使用して、パイプラインを定義するプログラムを作成します。次に、パイプラインをローカルと Dataflow サービスに対して実行します。WordCount パイプラインの概要については、Apache Beam で WordCount を使用する方法の動画をご覧ください。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Compute Engine のデフォルト サービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • PROJECT_NUMBER は、使用するプロジェクト番号に置き換えます。プロジェクト番号を確認するには、プロジェクトを特定するに記載されている手順を行うか、gcloud projects describe コマンドを使用します。
    • SERVICE_ACCOUNT_ROLE は、個々のロールに置き換えます。
  17. Create a Cloud Storage bucket and configure it as follows:
    • Set the storage class to S(Standard)。
    • ストレージ ロケーションを次のように設定します。 US(米国)。
    • BUCKET_NAME は、 一意のバケット名に置き換えます。バケットの名前空間は世界中の誰でも閲覧可能です。機密情報をバケット名に含めないようにしてください。
    • gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
    • Google Cloud プロジェクト ID と Cloud Storage バケット名をコピーします。これらの値は、このクイックスタートの後の部分で必要になります。

開発環境を設定する

Apache Beam SDK は、データ パイプライン用のオープンソースのプログラミング モデルです。Apache Beam プログラムでパイプラインを定義し、パイプラインを実行するランナー(Dataflow など)を選択します。

Apache Beam SDK for Go を使用する場合は、最新バージョンの Go を使用することをおすすめします。Go の最新バージョンがインストールされていない場合は、Go のダウンロードとインストール ガイドに従って、ご使用のオペレーティング システムに適した Go をダウンロードしてインストールしてください。

インストールした Go のバージョンを確認するには、ローカル ターミナルで次のコマンドを実行します。

go version

Beam のワードカウント サンプルを実行する

Apache Beam SDK for Go には、wordcount パイプライン サンプルが含まれています。wordcount では、次のことを行います。

  1. テキスト ファイルを入力として読み込みます。デフォルトでは、Cloud Storage バケットにあるリソース名 gs://dataflow-samples/shakespeare/kinglear.txt のテキスト ファイルを読み取ります。
  2. 各行を単語に解析します。
  3. トークン化された単語の出現頻度をカウントします。

ローカルマシンで最新バージョンの Beam wordcount サンプルを実行するには、次のコマンドを使用します。読み込むファイルを input フラグに指定し、頻度カウント出力のファイル名を output フラグに指定します。

go run github.com/apache/beam/sdks/v2/go/examples/wordcount@latest \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output outputs

パイプラインが完了したら、出力結果を表示します。

more outputs*

終了するには、q キーを押します。

パイプライン コードを変更する

Beam wordcount パイプラインは、大文字の単語と小文字の単語を区別します。次の手順では、独自の Go モジュールを作成し、大文字と小文字を区別しないように wordcount パイプラインを変更して Dataflow で実行します。

Go モジュールを作成する

パイプライン コードの変更手順は次のとおりです。

  1. 任意の場所に Go モジュールのディレクトリを作成します。

    mkdir wordcount
    cd wordcount
    
  2. Go モジュールを作成します。この例では、モジュールパスとして example/dataflow を使用しています。

    go mod init example/dataflow
    
  3. Apache Beam GitHub リポジトリから最新の wordcount コードをダウンロードします。作成した wordcount ディレクトリにこのファイルを配置します。

  4. Linux 以外のオペレーティング システムを使用している場合は、Go unix パッケージを入手する必要があります。このパッケージは、Dataflow サービスでパイプラインを実行するために必要です。

    go get -u golang.org/x/sys/unix
    
  5. go.mod ファイルがモジュールのソースコードと一致していることを確認します。

    go mod tidy
    

変更されていないパイプラインを実行する

変更されていない wordcount パイプラインがローカルで実行されることを確認します。

  1. ターミナルからローカルでパイプラインを構築して実行します。

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
    
  2. 出力結果を表示します。

     more outputs*
    
  3. 終了するには、q キーを押します。

パイプライン コードを変更する

大文字と小文字を区別しないようにパイプラインを変更するために、すべての単語に strings.ToLower 関数を適用するようにコードを変更します。

  1. 任意のエディタで、wordcount.go ファイルを開きます。

  2. init ブロックを調べます(わかりやすくするためにコメントは削除しています)。

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
     }
    
  3. strings.ToLower 関数を登録するための新しい行を追加します。

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
       register.Function1x1(strings.ToLower)
     }
    
  4. CountWords 関数を調べます。

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Count the number of times each word occurs.
       return stats.Count(s, col)
     }
    
  5. 単語を小文字で表記するには、すべての単語に strings.ToLower を適用する ParDo を追加します。

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Map all letters to lowercase.
       lowercaseWords := beam.ParDo(s, strings.ToLower, col)
    
       // Count the number of times each word occurs.
       return stats.Count(s, lowercaseWords)
     }
    
  6. ファイルを保存します。

更新されたパイプラインをローカルで実行する

更新された wordcount パイプラインをローカルで実行し、出力が変更されていることを確認します。

  1. 変更された wordcount パイプラインを構築して実行します。

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
    
  2. 変更されたパイプラインの出力結果を表示します。すべての単語が小文字になっているはずです。

     more outputs*
    
  3. 終了するには、q キーを押します。

Dataflow サービスでパイプラインを実行する

更新した wordcount のサンプルを Dataflow サービスで実行するには、次のコマンドを使用します。

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://BUCKET_NAME/results/outputs \
    --runner dataflow \
    --project PROJECT_ID \
    --region DATAFLOW_REGION \
    --staging_location gs://BUCKET_NAME/binaries/

次のように置き換えます。

  • BUCKET_NAME: Cloud Storage バケット名。

  • PROJECT_ID: Google Cloud プロジェクト ID。

  • DATAFLOW_REGION: Dataflow ジョブをデプロイするリージョン。例: europe-west1使用可能なロケーションのリストについては、Dataflow のロケーションをご覧ください。--region フラグは、メタデータ サーバー、ローカル クライアント、または環境変数に設定されているデフォルト リージョンをオーバーライドします。

結果を表示する

Google Cloud コンソールで Dataflow ジョブのリストを表示できます。Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。

[ジョブ] に移動

[ジョブ] ページに wordcount ジョブの詳細が表示されます。最初にステータスが「実行中」のジョブが表示され、その次に「完了」のジョブが表示されます。

Dataflow を使用してパイプラインを実行すると、結果が Cloud Storage バケットに保存されます。Google Cloud コンソールまたはローカル ターミナルを使用して、出力結果を表示します。

コンソール

Google Cloud コンソールで結果を表示するには、Cloud Storage の [バケット] ページに移動します。

[バケット] に移動

プロジェクト内のバケットのリストで、前に作成したストレージ バケットをクリックします。ジョブによって作成された出力ファイルが results ディレクトリに表示されます。

ターミナル

ターミナルまたは Cloud Shell を使用して結果を表示します。

  1. 出力ファイルを一覧表示するには、gcloud storage ls コマンドを使用します。

    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
    

    BUCKET_NAME は、指定された出力用の Cloud Storage バケットの名前に置き換えます。

  2. 出力ファイルの結果を表示するには、gcloud storage cat コマンドを使用します。

    gcloud storage cat gs://BUCKET_NAME/results/outputs*
    

クリーンアップ

このページで使用したリソースに対して Google Cloud アカウントで課金されないようにするには、Google Cloud プロジェクトとそのリソースを削除します。

  1. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  2. Click the checkbox for the bucket that you want to delete.
  3. To delete the bucket, click Delete, and then follow the instructions.
  4. プロジェクトを残しておく場合は、Compute Engine のデフォルトのサービス アカウントに付与したロールを取り消します。次の IAM ロールごとに次のコマンドを 1 回実行します。

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  6. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

次のステップ