Eventarc を使用した BigQuery 処理パイプラインを構築する


このチュートリアルでは、Eventarc を使用して、一般公開 BigQuery データセットに対するクエリのスケジューリング、データに基づいたグラフの生成、メールによるグラフのリンクの共有を行う処理パイプラインの構築方法について説明します。

目標

このチュートリアルでは、未認証アクセスを許可し、Eventarc を使用してイベントを受信する 3 つの Cloud Run サービスをビルドしてデプロイします。

  1. Query Runner - Cloud Scheduler ジョブが Pub/Sub トピックにメッセージを公開するとトリガーされます。このサービスは、BigQuery API を使用して一般公開の COVID-19 データセットからデータを取得し、結果を新しい BigQuery テーブルに保存します。
  2. Chart Creator - Query Runner サービスが Pub/Sub トピックにメッセージを公開するとトリガーされます。このサービスは、Python プロット ライブラリ(Matplotlib)を使用してグラフを生成し、Cloud Storage バケットに保存します。
  3. Notifier - Chart Creator サービスが Cloud Storage バケットにグラフを保存すると監査ログによってトリガーされます。このサービスは、メールサービス(SendGrid)を使用して、グラフのリンクをメールアドレスに送信します。

次の図は、アーキテクチャの概要を示しています。

BigQuery 処理パイプライン

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

組織で定義されているセキュリティの制約により、次の手順を完了できない場合があります。トラブルシューティング情報については、制約のある Google Cloud 環境でアプリケーションを開発するをご覧ください。

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Artifact Registry, Cloud Build, Cloud Logging, Cloud Run, Cloud Scheduler, Eventarc, and Pub/Sub APIs:

    gcloud services enable artifactregistry.googleapis.com cloudbuild.googleapis.com cloudscheduler.googleapis.com eventarc.googleapis.com logging.googleapis.com pubsub.googleapis.com run.googleapis.com
  7. Install the Google Cloud CLI.
  8. To initialize the gcloud CLI, run the following command:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Make sure that billing is enabled for your Google Cloud project.

  11. Enable the Artifact Registry, Cloud Build, Cloud Logging, Cloud Run, Cloud Scheduler, Eventarc, and Pub/Sub APIs:

    gcloud services enable artifactregistry.googleapis.com cloudbuild.googleapis.com cloudscheduler.googleapis.com eventarc.googleapis.com logging.googleapis.com pubsub.googleapis.com run.googleapis.com
  12. Cloud Storage の場合は、ADMIN_READDATA_WRITEDATA_READ のデータアクセス タイプの監査ロギングを有効にします。

    1. Google Cloud プロジェクト、フォルダ、または組織に関連付けられた Identity and Access Management(IAM)ポリシーを読み取り、一時ファイルに保存します。
      gcloud projects get-iam-policy PROJECT_ID > /tmp/policy.yaml
    2. テキスト エディタで /tmp/policy.yaml を開き、auditConfigs セクションで監査ログの構成のみを追加または変更します。

      
        auditConfigs:
        - auditLogConfigs:
          - logType: ADMIN_READ
          - logType: DATA_WRITE
          - logType: DATA_READ
          service: storage.googleapis.com
        bindings:
        - members:
        [...]
        etag: BwW_bHKTV5U=
        version: 1
    3. 新しい IAM ポリシーを作成します。

      gcloud projects set-iam-policy PROJECT_ID /tmp/policy.yaml

      上記のコマンドで別の変更との競合が報告された場合は、IAM ポリシーの読み取りからやり直してください。詳細については、API でデータアクセス監査ログを構成するをご覧ください。

  13. Compute Engine サービス アカウントに eventarc.eventReceiver ロールを付与します。

    export PROJECT_NUMBER="$(gcloud projects describe $(gcloud config get-value project) --format='value(projectNumber)')"
    
    gcloud projects add-iam-policy-binding $(gcloud config get-value project) \
        --member=serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
        --role='roles/eventarc.eventReceiver'

  14. 2021 年 4 月 8 日以前に Pub/Sub サービス アカウントを有効にした場合は、Pub/Sub サービス アカウントに iam.serviceAccountTokenCreator ロールを付与します。

    gcloud projects add-iam-policy-binding $(gcloud config get-value project) \
        --member="serviceAccount:service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"\
        --role='roles/iam.serviceAccountTokenCreator'

  15. このチュートリアルで使用するデフォルトを設定します。
    export REGION=REGION
    gcloud config set run/region ${REGION}
    gcloud config set run/platform managed
    gcloud config set eventarc/location ${REGION}

    REGION は、サポートされている Eventarc のロケーションに置き換えます。

SendGrid API キーを作成する

SendGrid はクラウドベースのメール プロバイダで、メールサーバーを維持せずにメールを送信できます。

  1. SendGrid にログインし、[Settings] > [API Keys] の順に移動します。
  2. [Create API Key] をクリックします。
  3. キーの権限を選択します。メールを送信するには、キーに少なくとも Mail send(メール送信)の権限が必要です。
  4. キーに名前を付け、キーを作成するには [保存] をクリックします。
  5. SendGrid によって、新しいキーが生成されます。これは、キーの唯一のコピーであるため、後で使用できるようにキーをコピーして保存してください。

Artifact Registry 標準リポジトリを作成する

Docker コンテナ イメージを保存する Artifact Registry 標準リポジトリを作成します。

gcloud artifacts repositories create REPOSITORY \
    --repository-format=docker \
    --location=$REGION

REPOSITORY は、リポジトリの一意の名前に置き換えます。

Cloud Storage バケットを作成する

グラフを保存する一意の Cloud Storage バケットを作成します。バケットとグラフが Cloud Run サービスと同じリージョンで一般公開されていることを確認します。

export BUCKET="$(gcloud config get-value core/project)-charts"
gcloud storage buckets create gs://${BUCKET} --location=$(gcloud config get-value run/region) --uniform-bucket-level-access
gcloud storage buckets add-iam-policy-binding gs://${BUCKET} --member=allUsers --role=roles/storage.objectViewer

Notifier サービスのデプロイ

Chart Creator イベントを受信し、SendGrid でグラフのリンクをメールで送信する Cloud Run サービスをデプロイします。

  1. GitHub リポジトリのクローンを作成して、notifier/python ディレクトリに移動します。

    git clone https://github.com/GoogleCloudPlatform/eventarc-samples
    cd eventarc-samples/processing-pipelines/bigquery/notifier/python/
  2. コンテナ イメージをビルドして push します。

    export SERVICE_NAME=notifier
    docker build -t $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 .
    docker push $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
  3. コンテナ イメージを Cloud Run にデプロイし、メールの送信先アドレスと SendGrid API キーを渡します。

    export TO_EMAILS=EMAIL_ADDRESS
    export SENDGRID_API_KEY=YOUR_SENDGRID_API_KEY
    gcloud run deploy ${SERVICE_NAME} \
        --image $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars TO_EMAILS=${TO_EMAILS},SENDGRID_API_KEY=${SENDGRID_API_KEY},BUCKET=${BUCKET} \
        --allow-unauthenticated

    次のように置き換えます。

    • EMAIL_ADDRESS は、生成されたグラフのリンクを送信するメールアドレスで置き換えます。
    • YOUR_SENDGRID_API_KEY は、前にメモした SendGrid API キーで置き換えます。

サービス URL が表示されたら、デプロイは完了しています。

Notifier サービスのトリガーの作成

Cloud Run にデプロイされた Notifier サービスの Eventarc トリガーは、methodNamestorage.objects.create の Cloud Storage 監査ログをフィルタリングします。

  1. トリガーを作成します。

    gcloud eventarc triggers create trigger-${SERVICE_NAME} \
        --destination-run-service=${SERVICE_NAME} \
        --destination-run-region=${REGION} \
        --event-filters="type=google.cloud.audit.log.v1.written" \
        --event-filters="serviceName=storage.googleapis.com" \
        --event-filters="methodName=storage.objects.create" \
        --service-account=${PROJECT_NUMBER}-compute@developer.gserviceaccount.com

    これにより、trigger-notifier というトリガーが作成されます。

Chart Creator サービスのデプロイ

Query Runner イベントを受け取り、特定の国の BigQuery テーブルからデータを取得し、Matplotlib を使用してそのデータからグラフを生成する Cloud Run サービスをデプロイします。グラフが Cloud Storage バケットにアップロードされます。

  1. chart-creator/python ディレクトリに移動します。

    cd ../../chart-creator/python
  2. コンテナ イメージをビルドして push します。

    export SERVICE_NAME=chart-creator
    docker build -t $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 .
    docker push $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
  3. コンテナ イメージを Cloud Run にデプロイし、BUCKET を渡します。

    gcloud run deploy ${SERVICE_NAME} \
        --image $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars BUCKET=${BUCKET} \
        --allow-unauthenticated

サービス URL が表示されたら、デプロイは完了しています。

Chart Creator サービスのトリガーの作成

Cloud Run にデプロイされた Chart Creator サービスの Eventarc トリガーは、Pub/Sub トピックに公開されたメッセージをフィルタします。

  1. トリガーを作成します。

    gcloud eventarc triggers create trigger-${SERVICE_NAME} \
        --destination-run-service=${SERVICE_NAME} \
        --destination-run-region=${REGION} \
        --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished"

    これにより、trigger-chart-creator というトリガーが作成されます。

  2. Pub/Sub トピックの環境変数を設定します。

    export TOPIC_QUERY_COMPLETED=$(basename $(gcloud eventarc triggers describe trigger-${SERVICE_NAME} --format='value(transport.pubsub.topic)'))

Query Runner サービスのデプロイ

Cloud Scheduler イベントを受け取り、一般公開の COVID-19 データセットからデータを取得し、結果を新しい BigQuery テーブルに保存する Cloud Run サービスをデプロイします。

  1. processing-pipelines ディレクトリに移動します。

    cd ../../..
  2. コンテナ イメージをビルドして push します。

    export SERVICE_NAME=query-runner
    docker build -t $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 -f Dockerfile .
    docker push $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
  3. コンテナ イメージを Cloud Run にデプロイし、PROJECT_IDTOPIC_QUERY_COMPLETED を渡します。

    gcloud run deploy ${SERVICE_NAME} \
        --image $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars PROJECT_ID=$(gcloud config get-value project),TOPIC_ID=${TOPIC_QUERY_COMPLETED} \
        --allow-unauthenticated

サービス URL が表示されたら、デプロイは完了しています。

Query Runner サービスのトリガーの作成

Cloud Run にデプロイされた Query Runner サービスの Eventarc トリガーは Pub/Sub トピックに公開されたメッセージをフィルタします。

  1. トリガーを作成します。

    gcloud eventarc triggers create trigger-${SERVICE_NAME} \
        --destination-run-service=${SERVICE_NAME} \
        --destination-run-region=${REGION} \
        --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished"

    これにより、trigger-query-runner というトリガーが作成されます。

  2. Pub/Sub トピックの環境変数を設定します。

    export TOPIC_QUERY_SCHEDULED=$(gcloud eventarc triggers describe trigger-${SERVICE_NAME} --format='value(transport.pubsub.topic)')

ジョブのスケジュール設定

処理パイプラインは、2 つの Cloud Scheduler ジョブによってトリガーされます。

  1. Cloud Scheduler に必要な App Engine アプリを作成し、適切なロケーションを指定します。

    export APP_ENGINE_LOCATION=LOCATION
    gcloud app create --region=${APP_ENGINE_LOCATION}
  2. 1 日に 1 回 Pub/Sub トピックに公開する Cloud Scheduler ジョブを 2 つ作成します。

    gcloud scheduler jobs create pubsub cre-scheduler-uk \
        --schedule="0 16 * * *" \
        --topic=${TOPIC_QUERY_SCHEDULED} \
        --message-body="United Kingdom"
    gcloud scheduler jobs create pubsub cre-scheduler-cy \
        --schedule="0 17 * * *" \
        --topic=${TOPIC_QUERY_SCHEDULED} \
        --message-body="Cyprus"

    スケジュールは unix-cron 形式で指定します。たとえば、0 16 * * * はジョブが毎日午後 4 時(UTC)に実行されることを意味します。

パイプラインの実行

  1. まず、すべてのトリガーが正常に作成されたことを確認します。

    gcloud eventarc triggers list

    出力は次のようになります。

    NAME: trigger-chart-creator
    TYPE: google.cloud.pubsub.topic.v1.messagePublished
    DESTINATION: Cloud Run service: chart-creator
    ACTIVE: Yes
    LOCATION: us-central1
    
    NAME: trigger-notifier
    TYPE: google.cloud.audit.log.v1.written
    DESTINATION: Cloud Run service: notifier
    ACTIVE: Yes
    LOCATION: us-central1
    
    NAME: trigger-query-runner
    TYPE: google.cloud.pubsub.topic.v1.messagePublished
    DESTINATION: Cloud Run service: query-runner
    ACTIVE: Yes
    LOCATION: us-central1
    
  2. Cloud Scheduler ジョブ ID を取得します。

    gcloud scheduler jobs list

    出力は次のようになります。

    ID                LOCATION      SCHEDULE (TZ)         TARGET_TYPE  STATE
    cre-scheduler-cy  us-central1   0 17 * * * (Etc/UTC)  Pub/Sub      ENABLED
    cre-scheduler-uk  us-central1   0 16 * * * (Etc/UTC)  Pub/Sub      ENABLED
    
  3. ジョブは毎日午後 4 時から午後 5 時に実行されるようにスケジュールされていますが、Cloud Scheduler ジョブを手動で実行することもできます。

    gcloud scheduler jobs run cre-scheduler-cy
    gcloud scheduler jobs run cre-scheduler-uk
  4. 数分後、Cloud Storage バケットに 2 つのグラフが作成されていることを確認します。

    gcloud storage ls gs://${BUCKET}

    出力は次のようになります。

    gs://BUCKET/chart-cyprus.png
    gs://BUCKET/chart-unitedkingdom.png
    

これで、グラフのリンクを含むメールが 2 通届いたはずです。

クリーンアップ

このチュートリアル用に新規プロジェクトを作成した場合は、そのプロジェクトを削除します。既存のプロジェクトを使用していて、このチュートリアルで行った変更を追加せずに残す場合は、チュートリアル用に作成したリソースを削除します

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

チュートリアル リソースの削除

  1. このチュートリアルでデプロイした Cloud Run サービスを削除します。

    gcloud run services delete SERVICE_NAME

    SERVICE_NAME は、選択したサービス名です。

    Cloud Run サービスは Google Cloud コンソールから削除することもできます。

  2. チュートリアルの設定で追加した Google Cloud CLI のデフォルト構成を削除します。

    gcloud config unset project
    gcloud config unset run/region
    gcloud config unset run/platform
    gcloud config unset eventarc/location
  3. このチュートリアルで作成した Eventarc トリガーをすべて削除します。

     gcloud eventarc triggers delete TRIGGER_NAME
     
    TRIGGER_NAME は、実際のトリガー名に置き換えます。

  4. Artifact Registry からイメージを削除します。

    gcloud artifacts docker images delete $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/notifier:v1
    gcloud artifacts docker images delete $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/chart-creator:v1
    gcloud artifacts docker images delete $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/query-runner:v1
  5. バケット内のすべてのオブジェクトと一緒にバケットを削除します。

    gcloud storage rm --recursive gs://${BUCKET}/
  6. Cloud Scheduler ジョブを削除します。

    gcloud scheduler jobs delete cre-scheduler-cy
    gcloud scheduler jobs delete cre-scheduler-uk

次のステップ