Go を使用して Dataflow パイプラインを作成する
このページでは、Apache Beam SDK for Go を使用して、パイプラインを定義するプログラムを作成します。次に、パイプラインをローカルと Dataflow サービスに対して実行します。WordCount パイプラインの概要については、Apache Beam で WordCount を使用する方法の動画をご覧ください。
始める前に
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Compute Engine のデフォルト サービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
PROJECT_ID
は、実際のプロジェクト ID に置き換えます。PROJECT_NUMBER
は、使用するプロジェクト番号に置き換えます。プロジェクト番号を確認するには、プロジェクトを特定するに記載されている手順を行うか、gcloud projects describe
コマンドを使用します。SERVICE_ACCOUNT_ROLE
は、個々のロールに置き換えます。
-
Create a Cloud Storage bucket and configure it as follows:
-
Set the storage class to
S
(Standard)。 -
ストレージ ロケーションを次のように設定します。
US
(米国)。 -
BUCKET_NAME
は、 一意のバケット名に置き換えます。バケットの名前空間は世界中の誰でも閲覧可能です。機密情報をバケット名に含めないようにしてください。 - Google Cloud プロジェクト ID と Cloud Storage バケット名をコピーします。これらの値は、このクイックスタートの後の部分で必要になります。
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
Set the storage class to
開発環境を設定する
Apache Beam SDK は、データ パイプライン用のオープンソースのプログラミング モデルです。Apache Beam プログラムでパイプラインを定義し、パイプラインを実行するランナー(Dataflow など)を選択します。
Apache Beam SDK for Go を使用する場合は、最新バージョンの Go を使用することをおすすめします。Go の最新バージョンがインストールされていない場合は、Go のダウンロードとインストール ガイドに従って、ご使用のオペレーティング システムに適した Go をダウンロードしてインストールしてください。
インストールした Go のバージョンを確認するには、ローカル ターミナルで次のコマンドを実行します。
go version
Beam のワードカウント サンプルを実行する
Apache Beam SDK for Go には、wordcount
パイプライン サンプルが含まれています。wordcount
では、次のことを行います。
- テキスト ファイルを入力として読み込みます。デフォルトでは、Cloud Storage バケットにあるリソース名
gs://dataflow-samples/shakespeare/kinglear.txt
のテキスト ファイルを読み取ります。 - 各行を単語に解析します。
- トークン化された単語の出現頻度をカウントします。
ローカルマシンで最新バージョンの Beam wordcount
サンプルを実行するには、次のコマンドを使用します。読み込むファイルを input
フラグに指定し、頻度カウント出力のファイル名を output
フラグに指定します。
go run github.com/apache/beam/sdks/v2/go/examples/wordcount@latest \
--input gs://dataflow-samples/shakespeare/kinglear.txt \
--output outputs
パイプラインが完了したら、出力結果を表示します。
more outputs*
終了するには、q キーを押します。
パイプライン コードを変更する
Beam wordcount
パイプラインは、大文字の単語と小文字の単語を区別します。次の手順では、独自の Go モジュールを作成し、大文字と小文字を区別しないように wordcount
パイプラインを変更して Dataflow で実行します。
Go モジュールを作成する
パイプライン コードの変更手順は次のとおりです。
任意の場所に Go モジュールのディレクトリを作成します。
mkdir wordcount
cd wordcount
Go モジュールを作成します。この例では、モジュールパスとして
example/dataflow
を使用しています。go mod init example/dataflow
Apache Beam GitHub リポジトリから最新の
wordcount
コードをダウンロードします。作成したwordcount
ディレクトリにこのファイルを配置します。Linux 以外のオペレーティング システムを使用している場合は、Go
unix
パッケージを入手する必要があります。このパッケージは、Dataflow サービスでパイプラインを実行するために必要です。go get -u golang.org/x/sys/unix
go.mod
ファイルがモジュールのソースコードと一致していることを確認します。go mod tidy
変更されていないパイプラインを実行する
変更されていない wordcount
パイプラインがローカルで実行されることを確認します。
ターミナルからローカルでパイプラインを構築して実行します。
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
出力結果を表示します。
more outputs*
終了するには、q キーを押します。
パイプライン コードを変更する
大文字と小文字を区別しないようにパイプラインを変更するために、すべての単語に strings.ToLower
関数を適用するようにコードを変更します。
任意のエディタで、
wordcount.go
ファイルを開きます。init
ブロックを調べます(わかりやすくするためにコメントは削除しています)。func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() }
strings.ToLower
関数を登録するための新しい行を追加します。func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() register.Function1x1(strings.ToLower) }
CountWords
関数を調べます。func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Count the number of times each word occurs. return stats.Count(s, col) }
単語を小文字で表記するには、すべての単語に
strings.ToLower
を適用する ParDo を追加します。func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Map all letters to lowercase. lowercaseWords := beam.ParDo(s, strings.ToLower, col) // Count the number of times each word occurs. return stats.Count(s, lowercaseWords) }
ファイルを保存します。
更新されたパイプラインをローカルで実行する
更新された wordcount
パイプラインをローカルで実行し、出力が変更されていることを確認します。
変更された
wordcount
パイプラインを構築して実行します。go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
変更されたパイプラインの出力結果を表示します。すべての単語が小文字になっているはずです。
more outputs*
終了するには、q キーを押します。
Dataflow サービスでパイプラインを実行する
更新した wordcount
のサンプルを Dataflow サービスで実行するには、次のコマンドを使用します。
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://BUCKET_NAME/results/outputs \
--runner dataflow \
--project PROJECT_ID \
--region DATAFLOW_REGION \
--staging_location gs://BUCKET_NAME/binaries/
次のように置き換えます。
BUCKET_NAME
: Cloud Storage バケット名。PROJECT_ID
: Google Cloud プロジェクト ID。DATAFLOW_REGION
: Dataflow ジョブをデプロイするリージョン。例:europe-west1
使用可能なロケーションのリストについては、Dataflow のロケーションをご覧ください。--region
フラグは、メタデータ サーバー、ローカル クライアント、または環境変数に設定されているデフォルト リージョンをオーバーライドします。
結果を表示する
Google Cloud コンソールで Dataflow ジョブのリストを表示できます。Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。
[ジョブ] ページに wordcount
ジョブの詳細が表示されます。最初にステータスが「実行中」のジョブが表示され、その次に「完了」のジョブが表示されます。
Dataflow を使用してパイプラインを実行すると、結果が Cloud Storage バケットに保存されます。Google Cloud コンソールまたはローカル ターミナルを使用して、出力結果を表示します。
コンソール
Google Cloud コンソールで結果を表示するには、Cloud Storage の [バケット] ページに移動します。
プロジェクト内のバケットのリストで、前に作成したストレージ バケットをクリックします。ジョブによって作成された出力ファイルが results
ディレクトリに表示されます。
ターミナル
ターミナルまたは Cloud Shell を使用して結果を表示します。
出力ファイルを一覧表示するには、
gcloud storage ls
コマンドを使用します。gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
BUCKET_NAME
は、指定された出力用の Cloud Storage バケットの名前に置き換えます。出力ファイルの結果を表示するには、
gcloud storage cat
コマンドを使用します。gcloud storage cat gs://BUCKET_NAME/results/outputs*
クリーンアップ
このページで使用したリソースに対して Google Cloud アカウントで課金されないようにするには、Google Cloud プロジェクトとそのリソースを削除します。
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete, and then follow the instructions.
プロジェクトを残しておく場合は、Compute Engine のデフォルトのサービス アカウントに付与したロールを取り消します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke