Go を使用して Dataflow パイプラインを作成する

このページでは、Apache Beam SDK for Go を使用して、パイプラインを定義するプログラムを作成します。次に、パイプラインをローカルと Dataflow サービスに対して実行します。WordCount パイプラインの概要については、Apache Beam で WordCount を使用する方法の動画をご覧ください。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Google Cloud CLI をインストールします。
  3. gcloud CLI を初期化するには:

    gcloud init
  4. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  5. Google Cloud プロジェクトで課金が有効になっていることを確認します

  6. Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager API を有効にします。

    gcloud services enable dataflowcompute_componentloggingstorage_componentstorage_apicloudresourcemanager.googleapis.com
  7. Google アカウントのローカル認証情報を作成します。

    gcloud auth application-default login
  8. Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • EMAIL_ADDRESS は実際のメールアドレスに置き換えます。
    • ROLE は、個々のロールに置き換えます。
  9. Google Cloud CLI をインストールします。
  10. gcloud CLI を初期化するには:

    gcloud init
  11. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  12. Google Cloud プロジェクトで課金が有効になっていることを確認します

  13. Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager API を有効にします。

    gcloud services enable dataflowcompute_componentloggingstorage_componentstorage_apicloudresourcemanager.googleapis.com
  14. Google アカウントのローカル認証情報を作成します。

    gcloud auth application-default login
  15. Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • EMAIL_ADDRESS は実際のメールアドレスに置き換えます。
    • ROLE は、個々のロールに置き換えます。
  16. Compute Engine のデフォルト サービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • PROJECT_NUMBER は、使用するプロジェクト番号に置き換えます。プロジェクト番号を確認するには、プロジェクトを特定するに記載されている手順を行うか、gcloud projects describe コマンドを使用します。
    • SERVICE_ACCOUNT_ROLE は、個々のロールに置き換えます。
  17. Cloud Storage バケットを作成し、次のように構成します。
    • ストレージ クラスを設定します。 S(Standard)
    • ストレージ ロケーションを次のように設定します。 US(米国)
    • BUCKET_NAME を 一意のバケット名に置き換えます。バケットの名前空間は世界中の誰でも閲覧可能なため、機密情報をバケット名に含めないようにしてください。
    gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
  18. Google Cloud プロジェクト ID と Cloud Storage バケット名をコピーします。これらの値は、このクイックスタートの後の部分で必要になります。

開発環境を設定する

Apache Beam SDK は、データ パイプライン用のオープンソースのプログラミング モデルです。Apache Beam プログラムでパイプラインを定義し、パイプラインを実行するランナー(Dataflow など)を選択します。

Apache Beam SDK for Go を使用する場合は、最新バージョンの Go を使用することをおすすめします。Go の最新バージョンがインストールされていない場合は、Go のダウンロードとインストール ガイドに従って、ご使用のオペレーティング システムに適した Go をダウンロードしてインストールしてください。

インストールした Go のバージョンを確認するには、ローカル ターミナルで次のコマンドを実行します。

go version

Beam のワードカウント サンプルを実行する

Apache Beam SDK for Go には、wordcount パイプライン サンプルが含まれています。wordcount では、次のことを行います。

  1. テキスト ファイルを入力として読み込みます。デフォルトでは、Cloud Storage バケットにあるリソース名 gs://dataflow-samples/shakespeare/kinglear.txt のテキスト ファイルを読み取ります。
  2. 各行を単語に解析します。
  3. トークン化された単語の出現頻度をカウントします。

ローカルマシンで最新バージョンの Beam wordcount サンプルを実行するには、次のコマンドを使用します。読み込むファイルを input フラグに指定し、頻度カウント出力のファイル名を output フラグに指定します。

go run github.com/apache/beam/sdks/v2/go/examples/wordcount@latest \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output outputs

パイプラインが完了したら、出力結果を表示します。

more outputs*

終了するには、q キーを押します。

パイプライン コードを変更する

Beam wordcount パイプラインは、大文字の単語と小文字の単語を区別します。次の手順では、独自の Go モジュールを作成し、大文字と小文字を区別しないように wordcount パイプラインを変更して Dataflow で実行します。

Go モジュールを作成する

パイプライン コードの変更手順は次のとおりです。

  1. 任意の場所に Go モジュールのディレクトリを作成します。

    mkdir wordcount
    cd wordcount
    
  2. Go モジュールを作成します。この例では、モジュールパスとして example/dataflow を使用しています。

    go mod init example/dataflow
    
  3. Apache Beam GitHub リポジトリから最新の wordcount コードをダウンロードします。作成した wordcount ディレクトリにこのファイルを配置します。

  4. Linux 以外のオペレーティング システムを使用している場合は、Go unix パッケージを入手する必要があります。このパッケージは、Dataflow サービスでパイプラインを実行するために必要です。

    go get -u golang.org/x/sys/unix
    
  5. go.mod ファイルがモジュールのソースコードと一致していることを確認します。

    go mod tidy
    

変更されていないパイプラインを実行する

変更されていない wordcount パイプラインがローカルで実行されることを確認します。

  1. ターミナルからローカルでパイプラインを構築して実行します。

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
    
  2. 出力結果を表示します。

     more outputs*
    
  3. 終了するには、q キーを押します。

パイプライン コードを変更する

大文字と小文字を区別しないようにパイプラインを変更するために、すべての単語に strings.ToLower 関数を適用するようにコードを変更します。

  1. 任意のエディタで、wordcount.go ファイルを開きます。

  2. init ブロックを調べます(わかりやすくするためにコメントは削除しています)。

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
     }
    
  3. strings.ToLower 関数を登録するための新しい行を追加します。

     func init() {
       register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
       register.Function2x1(formatFn)
       register.Emitter1[string]()
       register.Function1x1(strings.ToLower)
     }
    
  4. CountWords 関数を調べます。

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Count the number of times each word occurs.
       return stats.Count(s, col)
     }
    
  5. 単語を小文字で表記するには、すべての単語に strings.ToLower を適用する ParDo を追加します。

     func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
       s = s.Scope("CountWords")
    
       // Convert lines of text into individual words.
       col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)
    
       // Map all letters to lowercase.
       lowercaseWords := beam.ParDo(s, strings.ToLower, col)
    
       // Count the number of times each word occurs.
       return stats.Count(s, lowercaseWords)
     }
    
  6. ファイルを保存します。

更新されたパイプラインをローカルで実行する

更新された wordcount パイプラインをローカルで実行し、出力が変更されていることを確認します。

  1. 変更された wordcount パイプラインを構築して実行します。

     go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
         --output outputs
    
  2. 変更されたパイプラインの出力結果を表示します。すべての単語が小文字になっているはずです。

     more outputs*
    
  3. 終了するには、q キーを押します。

Dataflow サービスでパイプラインを実行する

更新した wordcount のサンプルを Dataflow サービスで実行するには、次のコマンドを使用します。

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://BUCKET_NAME/results/outputs \
    --runner dataflow \
    --project PROJECT_ID \
    --region DATAFLOW_REGION \
    --staging_location gs://BUCKET_NAME/binaries/

次のように置き換えます。

  • BUCKET_NAME: Cloud Storage バケット名。

  • PROJECT_ID: Google Cloud プロジェクト ID。

  • DATAFLOW_REGION: Dataflow ジョブをデプロイするリージョン。例: europe-west1使用可能なロケーションのリストについては、Dataflow のロケーションをご覧ください。--region フラグは、メタデータ サーバー、ローカル クライアント、または環境変数に設定されているデフォルト リージョンをオーバーライドします。

結果を表示する

Google Cloud コンソールで Dataflow ジョブのリストを表示できます。Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。

[ジョブ] に移動

[ジョブ] ページに wordcount ジョブの詳細が表示されます。最初にステータスが「実行中」のジョブが表示され、その次に「完了」のジョブが表示されます。

Dataflow を使用してパイプラインを実行すると、結果が Cloud Storage バケットに保存されます。Google Cloud コンソールまたはローカル ターミナルを使用して、出力結果を表示します。

コンソール

Google Cloud コンソールで結果を表示するには、Cloud Storage の [バケット] ページに移動します。

[バケット] に移動

プロジェクト内のバケットのリストで、前に作成したストレージ バケットをクリックします。ジョブによって作成された出力ファイルが results ディレクトリに表示されます。

ターミナル

ターミナルまたは Cloud Shell を使用して結果を表示します。

  1. 出力ファイルを一覧表示するには、gcloud storage ls コマンドを使用します。

    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
    

    BUCKET_NAME は、指定された出力用の Cloud Storage バケットの名前に置き換えます。

  2. 出力ファイルの結果を表示するには、gcloud storage cat コマンドを使用します。

    gcloud storage cat gs://BUCKET_NAME/results/outputs*
    

クリーンアップ

このページで使用したリソースに対して Google Cloud アカウントで課金されないようにするには、Google Cloud プロジェクトとそのリソースを削除します。

  1. Google Cloud コンソールで、Cloud Storage の [バケット] ページに移動します。

    [バケット] に移動

  2. 削除するバケットのチェックボックスをクリックします。
  3. バケットを削除するには、 [削除] をクリックして、指示に沿って操作します。
  4. プロジェクトを残しておく場合は、Compute Engine のデフォルトのサービス アカウントに付与したロールを取り消します。次の IAM ロールごとに次のコマンドを 1 回実行します。

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. 作成した認証情報を取り消して、ローカル認証情報ファイルを削除します。

    gcloud auth application-default revoke
  6. (省略可)gcloud CLI から認証情報を取り消します。

    gcloud auth revoke

次のステップ