Python を使用して Dataflow パイプラインを作成する

このクイックスタートでは、Apache Beam SDK for Python を使用して、パイプラインを定義するプログラムを作成する方法について説明します。次に、直接ローカル ランナーまたはクラウドベースのランナー(Dataflow など)を使用してパイプラインを実行します。WordCount パイプラインの概要については、Apache Beam で WordCount を使用する方法の動画をご覧ください。


このタスクを Google Cloud コンソールで直接行う際の順を追ったガイダンスについては、「ガイドを表示」をクリックしてください。

ガイドを表示


始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Google Cloud CLI をインストールします。
  3. gcloud CLI を初期化するには:

    gcloud init
  4. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  5. Google Cloud プロジェクトで課金が有効になっていることを確認します

  6. Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager API を有効にします。

    gcloud services enable dataflowcompute_componentloggingstorage_componentstorage_apibigquerypubsubdatastore.googleapis.comcloudresourcemanager.googleapis.com
  7. Google アカウントのローカル認証情報を作成します。

    gcloud auth application-default login
  8. Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • EMAIL_ADDRESS は実際のメールアドレスに置き換えます。
    • ROLE は、個々のロールに置き換えます。
  9. Google Cloud CLI をインストールします。
  10. gcloud CLI を初期化するには:

    gcloud init
  11. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  12. Google Cloud プロジェクトで課金が有効になっていることを確認します

  13. Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager API を有効にします。

    gcloud services enable dataflowcompute_componentloggingstorage_componentstorage_apibigquerypubsubdatastore.googleapis.comcloudresourcemanager.googleapis.com
  14. Google アカウントのローカル認証情報を作成します。

    gcloud auth application-default login
  15. Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • EMAIL_ADDRESS は実際のメールアドレスに置き換えます。
    • ROLE は、個々のロールに置き換えます。
  16. Compute Engine のデフォルト サービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • PROJECT_NUMBER は、使用するプロジェクト番号に置き換えます。プロジェクト番号を確認するには、プロジェクトを特定するに記載されている手順を行うか、gcloud projects describe コマンドを使用します。
    • SERVICE_ACCOUNT_ROLE は、個々のロールに置き換えます。
  17. Cloud Storage バケットを作成し、次のように構成します。
    • ストレージ クラスを設定します。 S(Standard)
    • ストレージ ロケーションを次のように設定します。 US(米国)
    • BUCKET_NAME を 一意のバケット名に置き換えます。バケットの名前空間は世界中の誰でも閲覧可能なため、機密情報をバケット名に含めないようにしてください。
    gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
  18. Google Cloud プロジェクト ID と Cloud Storage バケット名をコピーします。これらの値は、このドキュメントの後の部分で必要になります。

環境の設定

このセクションでは、コマンド プロンプトを使用して独立した Python 仮想環境を設定し、venv を使用してパイプライン プロジェクトを実行します。このプロセスでは、1 つのプロジェクトの依存関係を他のプロジェクトの依存関係から分離できます。

すぐに利用できるコマンド プロンプトがない場合は、Cloud Shell を使用できます。Cloud Shell には Python 3 用のパッケージ マネージャーがすでにインストールされているので、仮想環境の作成にスキップできます。

Python をインストールして仮想環境を作成する手順は次のとおりです。

  1. システムで Python 3 と pip が実行されていることを確認します。
    python --version
    python -m pip --version
    
  2. 必要であれば、Python 3 をインストールして Python 仮想環境を設定します。手順については、Python 開発環境の設定ページで「Python のインストール」と「venv の設定」のセクションをご覧ください。Python 3.10 以降を使用している場合は、Dataflow Runner v2 を有効にする必要もあります。Runner v1 を使用するには、Python 3.9 以前を使用します。

このクイックスタートが完了したら、deactivate を実行して仮想環境を無効にできます。

Apache Beam SDK を入手する

Apache Beam SDK は、データ パイプライン用のオープンソースのプログラミング モデルです。Apache Beam プログラムでパイプラインを定義し、パイプラインを実行するランナー(Dataflow など)を選択します。

Apache Beam SDK をダウンロードしてインストールする手順は次のとおりです。

  1. 前のセクションで作成した Python 仮想環境にいることを確認します。プロンプトが <env_name> で始まっていることを確認します。env_name は仮想環境の名前です。
  2. Python wheel パッケージ標準をインストールします。
    pip install wheel
    
  3. Apache Beam SDK for Python の最新バージョンをインストールします。
  4. pip install 'apache-beam[gcp]'

    Microsoft Windows の場合は、次のコマンドを使用します。

    pip install apache-beam[gcp]

    接続によってはインストールに時間がかかることがあります。

パイプラインをローカルで実行する

ローカルでパイプラインがどのように実行されるかを確認するには、apache_beam パッケージに含まれている wordcount サンプル用に作成されている Python モジュールを使用します。

wordcount パイプラインの例では、次のことを行います。

  1. テキスト ファイルを入力として使用します。

    このテキスト ファイルは、リソース名 gs://dataflow-samples/shakespeare/kinglear.txt で Cloud Storage バケットに配置されます。

  2. 各行を単語に解析します。
  3. トークン化された単語の出現頻度をカウントします。

wordcount パイプラインをローカルでステージングする手順は次のとおりです。

  1. ローカル ターミナルから、wordcount の例を実行します。
    python -m apache_beam.examples.wordcount \
      --output outputs
  2. パイプラインの出力を表示します。
    more outputs*
  3. 終了するには、q キーを押します。
パイプラインをローカルで実行すると、Apache Beam プログラムのテストとデバッグを行うことができます。Apache Beam GitHubwordcount.py ソースコードを表示できます。

Dataflow サービスでパイプラインを実行する

このセクションでは、Dataflow サービスの apache_beam パッケージから wordcount サンプル パイプラインを実行します。この例では、--runner のパラメータとして DataflowRunner を指定しています。
  • パイプラインを実行します。
    python -m apache_beam.examples.wordcount \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://BUCKET_NAME/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://BUCKET_NAME/tmp/

    次のように置き換えます。

    • DATAFLOW_REGION: Dataflow ジョブをデプロイするリージョン(例: europe-west1

      --region フラグは、メタデータ サーバー、ローカル クライアント、または環境変数に設定されているデフォルト リージョンをオーバーライドします。

    • BUCKET_NAME: 先ほどコピーした Cloud Storage バケット名
    • PROJECT_ID: 先ほどコピーした Google Cloud プロジェクト ID

結果を表示する

Dataflow を使用してパイプラインを実行すると、結果が Cloud Storage バケットに保存されます。このセクションでは、Google Cloud コンソールまたはローカル ターミナルを使用して、パイプラインの実行状況を確認します。

Google Cloud コンソール

Google Cloud コンソールで結果を表示する手順は次のとおりです。

  1. Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。

    [ジョブ] に移動

    [ジョブ] ページには wordcount ジョブの詳細が表示されます。最初にステータスが「実行中」のジョブが表示され、その次に「完了」のジョブが表示されます。

  2. Cloud Storage バケットのページに移動します。

    [バケット] に移動

  3. プロジェクト内のバケットのリストで、前に作成したストレージ バケットをクリックします。

    wordcount ディレクトリに、ジョブによって作成された出力ファイルが表示されます。

ローカル ターミナル

ターミナルまたは Cloud Shell を使用して結果を表示します。

  1. 出力ファイルを一覧表示するには、gcloud storage ls コマンドを使用します。
    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
  2. BUCKET_NAME は、パイプライン プログラムで使用する Cloud Storage バケットの名前で置き換えます。

  3. 出力ファイルの結果を表示するには、gcloud storage cat コマンドを使用します。
    gcloud storage cat gs://BUCKET_NAME/results/outputs*

パイプライン コードを変更する

前の例の wordcount パイプラインでは、大文字の単語と小文字の単語が区別されました。次の手順では、wordcount パイプラインで大文字と小文字が区別されないように、パイプラインを変更します。
  1. ローカルマシンで、Apache Beam GitHub リポジトリから最新の wordcount コードをダウンロードします。
  2. ローカル ターミナルから、パイプラインを実行します。
    python wordcount.py --output outputs
  3. 結果を表示します。
    more outputs*
  4. 終了するには、q キーを押します。
  5. 任意のエディタで、wordcount.py ファイルを開きます。
  6. run 関数内で、パイプラインのステップを調べます。
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

    split の後、文字列として単語に分割されます。

  7. 文字列を小文字に変換するには、split の後の行を変更します。
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'lowercase' >> beam.Map(str.lower)
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum)) 
    この変更により、str.lower 関数がすべての単語にマッピングされます。この行は beam.Map(lambda word: str.lower(word)) と同じです。
  8. ファイルを保存して、変更した wordcount ジョブを実行します。
    python wordcount.py --output outputs
  9. 変更されたパイプラインの結果を表示します。
    more outputs*
  10. 終了するには、q キーを押します。
  11. 変更したパイプラインを Dataflow サービスで実行します。
    python wordcount.py \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://BUCKET_NAME/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://BUCKET_NAME/tmp/

    次のように置き換えます。

    • DATAFLOW_REGION: Dataflow ジョブをデプロイするリージョン
    • BUCKET_NAME: Cloud Storage バケット名
    • PROJECT_ID: Google Cloud プロジェクト ID

クリーンアップ

このページで使用したリソースに対して Google Cloud アカウントで課金されないようにするには、Google Cloud プロジェクトとそのリソースを削除します。

  1. Google Cloud コンソールで、Cloud Storage の [バケット] ページに移動します。

    [バケット] に移動

  2. 削除するバケットのチェックボックスをクリックします。
  3. バケットを削除するには、 [削除] をクリックして、指示に沿って操作します。
  4. プロジェクトを残しておく場合は、Compute Engine のデフォルトのサービス アカウントに付与したロールを取り消します。次の IAM ロールごとに次のコマンドを 1 回実行します。

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. 作成した認証情報を取り消して、ローカル認証情報ファイルを削除します。

    gcloud auth application-default revoke
  6. (省略可)gcloud CLI から認証情報を取り消します。

    gcloud auth revoke

次のステップ