Eventarc を使用して Cloud Run for Anthos の BigQuery 処理パイプラインを作成する


このチュートリアルでは、Eventarc を使用して、一般公開 BigQuery データセットに対するクエリのスケジューリング、データに基づいたグラフの生成、メールによるグラフのリンクの共有を行う処理パイプラインを構築する方法について説明します。

目標

このチュートリアルでは、Google Kubernetes Engine(GKE)クラスタで実行され、Eventarc を使用してイベントを受信する 3 つの Cloud Run for Anthos サービスをビルドしてデプロイします。

  1. Query Runner - Cloud Scheduler ジョブが Pub/Sub トピックにメッセージを公開するとトリガーされます。このサービスは、BigQuery API を使用して一般公開の COVID-19 データセットからデータを取得し、結果を新しい BigQuery テーブルに保存します。
  2. Chart Creator - Query Runner サービスが Pub/Sub トピックにメッセージを公開するとトリガーされます。このサービスは、Python プロット ライブラリ(Matplotlib)を使用してグラフを生成し、Cloud Storage バケットに保存します。
  3. Notifier - Chart Creator サービスが Cloud Storage バケットにグラフを保存すると監査ログによってトリガーされます。このサービスは、メールサービス(SendGrid)を使用して、グラフのリンクをメールアドレスに送信します。

次の図は、アーキテクチャの概要を示しています。

BigQuery 処理パイプライン

料金

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud CLI をインストールします。
  3. gcloud CLI を初期化するには:

    gcloud init
  4. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  5. Google Cloud プロジェクトで課金が有効になっていることを確認します

  6. Artifact Registry, Cloud Build, Cloud Logging, Cloud Run for Anthos, Cloud Scheduler, Eventarc, GKE, Pub/Sub, and Resource Manager API を有効にします。

    gcloud services enable artifactregistry.googleapis.comcloudbuild.googleapis.comcloudresourcemanager.googleapis.comcloudscheduler.googleapis.comcontainer.googleapis.comeventarc.googleapis.compubsub.googleapis.comrun.googleapis.comlogging.googleapis.com
  7. Google Cloud CLI をインストールします。
  8. gcloud CLI を初期化するには:

    gcloud init
  9. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  10. Google Cloud プロジェクトで課金が有効になっていることを確認します

  11. Artifact Registry, Cloud Build, Cloud Logging, Cloud Run for Anthos, Cloud Scheduler, Eventarc, GKE, Pub/Sub, and Resource Manager API を有効にします。

    gcloud services enable artifactregistry.googleapis.comcloudbuild.googleapis.comcloudresourcemanager.googleapis.comcloudscheduler.googleapis.comcontainer.googleapis.comeventarc.googleapis.compubsub.googleapis.comrun.googleapis.comlogging.googleapis.com
  12. Cloud Storage の場合は、ADMIN_READDATA_WRITEDATA_READ のデータアクセス タイプの監査ロギングを有効にします。

    1. Google Cloud プロジェクト、フォルダ、または組織に関連付けられた Identity and Access Management(IAM)ポリシーを読み取り、一時ファイルに保存します。
      gcloud projects get-iam-policy PROJECT_ID > /tmp/policy.yaml
    2. テキスト エディタで /tmp/policy.yaml を開き、auditConfigs セクションで監査ログの構成のみを追加または変更します。

      
        auditConfigs:
        - auditLogConfigs:
          - logType: ADMIN_READ
          - logType: DATA_WRITE
          - logType: DATA_READ
          service: storage.googleapis.com
        bindings:
        - members:
        [...]
        etag: BwW_bHKTV5U=
        version: 1
    3. 新しい IAM ポリシーを作成します。

      gcloud projects set-iam-policy PROJECT_ID /tmp/policy.yaml

      上記のコマンドで別の変更との競合が報告された場合は、IAM ポリシーの読み取りからやり直してください。詳細については、API でデータアクセス監査ログを構成するをご覧ください。

  13. このチュートリアルで使用するデフォルトを設定します。
    CLUSTER_NAME=events-cluster
    CLUSTER_LOCATION=us-central1
    PROJECT_ID=PROJECT_ID
    gcloud config set project $PROJECT_ID
    gcloud config set run/region $CLUSTER_LOCATION
    gcloud config set run/cluster $CLUSTER_NAME
    gcloud config set run/cluster_location $CLUSTER_LOCATION
    gcloud config set run/platform gke
    gcloud config set eventarc/location $CLUSTER_LOCATION
    

    PROJECT_ID は、実際のプロジェクト ID に置き換えます。

SendGrid API キーを作成する

SendGrid はクラウドベースのメール プロバイダで、メールサーバーを維持せずにメールを送信できます。

  1. SendGrid にログインし、[Settings] > [API Keys] の順に移動します。
  2. [Create API Key] をクリックします。
  3. キーの権限を選択します。メールを送信するには、キーに少なくとも Mail send(メール送信)の権限が必要です。
  4. [Save] をクリックして、キーを作成します。
  5. SendGrid によって、新しいキーが生成されます。これは、キーの唯一のコピーであるため、後で使用できるようにキーをコピーして保存してください。

Cloud Run for Anthos 用の GKE クラスタを作成する

クラスタを作成して Workload Identity を有効にし、GKE 内で実行されているアプリケーションから Google Cloud サービスにアクセスできるようにします。Eventarc を使用してイベントを転送するには、Workload Identity も必要です。

  1. CloudRunHttpLoadBalancingHorizontalPodAutoscaling アドオンを有効にして Cloud Run for Anthos 用の GKE クラスタを作成します。

    gcloud beta container clusters create $CLUSTER_NAME \
        --addons=HttpLoadBalancing,HorizontalPodAutoscaling,CloudRun \
        --machine-type=n1-standard-4 \
        --enable-autoscaling --min-nodes=2 --max-nodes=10 \
        --no-issue-client-certificate --num-nodes=2  \
        --logging=SYSTEM,WORKLOAD \
        --monitoring=SYSTEM \
        --scopes=cloud-platform,logging-write,monitoring-write,pubsub \
        --zone us-central1 \
        --release-channel=rapid \
        --workload-pool=$PROJECT_ID.svc.id.goog
    
  2. クラスタの作成が完了するまで数分待ちます。処理中に、無視しても問題のない警告が表示されることがあります。クラスタが作成されると、出力は次のようになります。

    Creating cluster ...done.
    Created [https://container.googleapis.com/v1beta1/projects/my-project/zones/us-central1/clusters/events-cluster].
    
  3. Docker コンテナ イメージを保存する Artifact Registry 標準リポジトリを作成します。

    gcloud artifacts repositories create REPOSITORY \
        --repository-format=docker \
        --location=$CLUSTER_LOCATION

    REPOSITORY は、リポジトリの一意の名前に置き換えます。

GKE サービス アカウントを構成する

デフォルトのコンピューティング サービス アカウントとして機能する GKE サービス アカウントを構成します。

  1. サービス アカウント間に Identity and Access Management(IAM)バインディングを作成します。

    PROJECT_NUMBER="$(gcloud projects describe $(gcloud config get-value project) --format='value(projectNumber)')"
    
    gcloud iam service-accounts add-iam-policy-binding \
        --role roles/iam.workloadIdentityUser \
        --member "serviceAccount:$PROJECT_ID.svc.id.goog[default/default]" \
        $PROJECT_NUMBER-compute@developer.gserviceaccount.com
    
  2. コンピューティング サービス アカウントのメールアドレスを使用して、iam.gke.io/gcp-service-account アノテーションを GKE サービス アカウントに追加します。

    kubectl annotate serviceaccount \
        --namespace default \
        default \
        iam.gke.io/gcp-service-account=$PROJECT_NUMBER-compute@developer.gserviceaccount.com
    

GKE の宛先を有効にする

Eventarc が GKE クラスタ内のリソースを管理できるようにするには、GKE の宛先を有効にし、Eventarc サービス アカウントを必要なロールにバインドします。

  1. Eventarc 用に GKE の宛先を有効にします。

    gcloud eventarc gke-destinations init
    
  2. 必要なロールをバインドするよう求められたら、「y」と入力します。

    次のロールがバインドされます。

    • roles/compute.viewer
    • roles/container.developer
    • roles/iam.serviceAccountAdmin

サービス アカウントを作成してアクセスロールをバインドする

Eventarc トリガーを作成する前に、ユーザー管理のサービス アカウントを設定して特定のロールを付与し、Eventarc が Pub/Sub イベントを転送できるようにします。

  1. TRIGGER_GSA という名前のサービス アカウントを作成します。

    TRIGGER_GSA=eventarc-bigquery-triggers
    gcloud iam service-accounts create $TRIGGER_GSA
  2. サービス アカウントに pubsub.subscribermonitoring.metricWritereventarc.eventReceiver のロールを付与します。

    PROJECT_ID=$(gcloud config get-value project)
    
    gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
        --role "roles/pubsub.subscriber"
    
    gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
        --role "roles/monitoring.metricWriter"
    
    gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
        --role "roles/eventarc.eventReceiver"
    

Cloud Storage バケットを作成する

グラフを保存する Cloud Storage バケットを作成します。バケットとグラフが Cloud Run for Anthos サービスと同じリージョンで一般公開されていることを確認します。

export BUCKET="$(gcloud config get-value core/project)-charts"
gsutil mb -l $(gcloud config get-value run/region) gs://${BUCKET}
gsutil uniformbucketlevelaccess set on gs://${BUCKET}
gsutil iam ch allUsers:objectViewer gs://${BUCKET}

リポジトリのクローンを作成する

GitHub リポジトリのクローンを作成します。

git clone https://github.com/GoogleCloudPlatform/eventarc-samples
cd eventarc-samples/processing-pipelines

Notifier サービスをデプロイする

bigquery/notifier/python ディレクトリから、Cloud Run for Anthos サービスをデプロイします。このサービスは、Chart Creator イベントを受信し、SendGrid を使用して、生成されたグラフへのリンクをメールで送信します。

  1. コンテナ イメージをビルドして push します。

    pushd bigquery/notifier/python
    export SERVICE_NAME=notifier
    docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 .
    docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
    popd
    
  2. コンテナ イメージを Cloud Run for Anthos にデプロイし、メールの送信先アドレスと SendGrid API キーを渡します。

    export TO_EMAILS=EMAIL_ADDRESS
    export SENDGRID_API_KEY=YOUR_SENDGRID_API_KEY
    gcloud run deploy ${SERVICE_NAME} \
        --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars TO_EMAILS=${TO_EMAILS},SENDGRID_API_KEY=${SENDGRID_API_KEY},BUCKET=${BUCKET}
    

    次のように置き換えます。

    • EMAIL_ADDRESS: 生成されたグラフのリンクを送信するメールアドレス
    • YOUR_SENDGRID_API_KEY: 前にメモした SendGrid API キー

サービス URL が表示されたら、デプロイは完了しています。

Notifier サービスのトリガーを作成する

Cloud Run for Anthos にデプロイされた Notifier サービスの Eventarc トリガーは、methodName が storage.objects.create の Cloud Storage 監査ログをフィルタします。

  1. トリガーを作成します。

    gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
        --destination-gke-cluster=$CLUSTER_NAME \
        --destination-gke-location=$CLUSTER_LOCATION \
        --destination-gke-namespace=default \
        --destination-gke-service=$SERVICE_NAME \
        --destination-gke-path=/ \
        --event-filters="type=google.cloud.audit.log.v1.written" \
        --event-filters="serviceName=storage.googleapis.com" \
        --event-filters="methodName=storage.objects.create" \
        --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com
    

    これにより、trigger-notifier-gke というトリガーが作成されます。

Chart Creator サービスをデプロイする

Query Runner イベントを受信して、BigQuery テーブルから特定の国のデータを取得し、そのデータから Matplotlib を使用してグラフを生成する Cloud Run for Anthos サービスを bigquery/chart-creator/python ディレクトリからデプロイします。グラフが Cloud Storage バケットにアップロードされます。

  1. コンテナ イメージをビルドして push します。

    pushd bigquery/chart-creator/python
    export SERVICE_NAME=chart-creator
    docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 .
    docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
    popd
    
  2. コンテナ イメージを Cloud Run for Anthos にデプロイし、BUCKET を渡します。

    gcloud run deploy ${SERVICE_NAME} \
        --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars BUCKET=${BUCKET}
    

サービス URL が表示されたら、デプロイは完了しています。

Chart Creator サービスのトリガーを作成する

Cloud Run for Anthos にデプロイされた Chart Creator サービスの Eventarc トリガーは、Pub/Sub トピックに公開されたメッセージをフィルタします。

  1. トリガーを作成します。

    gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
        --destination-gke-cluster=$CLUSTER_NAME \
        --destination-gke-location=$CLUSTER_LOCATION \
        --destination-gke-namespace=default \
        --destination-gke-service=$SERVICE_NAME \
        --destination-gke-path=/ \
        --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
        --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com
    

    これにより、trigger-chart-creator-gke というトリガーが作成されます。

  2. Pub/Sub トピックの環境変数を設定します。

    export TOPIC_QUERY_COMPLETED=$(basename $(gcloud eventarc triggers describe trigger-${SERVICE_NAME}-gke --format='value(transport.pubsub.topic)'))
    

Query Runner サービスをデプロイする

Cloud Scheduler イベントを受信し、一般公開の COVID-19 データセットからデータを取得して、結果を新しい BigQuery テーブルに保存する Cloud Run for Anthos サービスを processing-pipelines ディレクトリからデプロイします。

  1. コンテナ イメージをビルドして push します。

    export SERVICE_NAME=query-runner
    docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 -f Dockerfile .
    docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
    
  2. コンテナ イメージを Cloud Run for Anthos にデプロイし、PROJECT_IDTOPIC_QUERY_COMPLETED を渡します。

    gcloud run deploy ${SERVICE_NAME} \
        --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars PROJECT_ID=$(gcloud config get-value project),TOPIC_ID=${TOPIC_QUERY_COMPLETED}
    

サービス URL が表示されたら、デプロイは完了しています。

Query Runner サービスのトリガーを作成する

Cloud Run for Anthos にデプロイされた Query Runner サービスの Eventarc トリガーは、Pub/Sub トピックに公開されたメッセージをフィルタします。

  1. トリガーを作成します。

    gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
        --destination-gke-cluster=$CLUSTER_NAME \
        --destination-gke-location=$CLUSTER_LOCATION \
        --destination-gke-namespace=default \
        --destination-gke-service=$SERVICE_NAME \
        --destination-gke-path=/ \
        --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
        --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com
    

    これにより、trigger-query-runner-gke というトリガーが作成されます。

  2. Pub/Sub トピックの環境変数を設定します。

    export TOPIC_QUERY_SCHEDULED=$(gcloud eventarc triggers describe trigger-${SERVICE_NAME}-gke --format='value(transport.pubsub.topic)')
    

ジョブのスケジュール設定

処理パイプラインは、2 つの Cloud Scheduler ジョブによってトリガーされます。

  1. Cloud Scheduler に必要な App Engine アプリを作成し、適切なロケーションeurope-west など)を指定します。

    export APP_ENGINE_LOCATION=LOCATION
    gcloud app create --region=${APP_ENGINE_LOCATION}
    
  2. 1 日に 1 回 Pub/Sub トピックに公開する Cloud Scheduler ジョブを 2 つ作成します。

    gcloud scheduler jobs create pubsub cre-scheduler-uk \
        --schedule="0 16 * * *" \
        --topic=${TOPIC_QUERY_SCHEDULED} \
        --message-body="United Kingdom"
    
    gcloud scheduler jobs create pubsub cre-scheduler-cy \
        --schedule="0 17 * * *" \
        --topic=${TOPIC_QUERY_SCHEDULED} \
        --message-body="Cyprus"
    

    スケジュールは unix-cron 形式で指定します。たとえば、0 16 * * * はジョブが毎日午後 4 時(UTC)に実行されることを意味します。

パイプラインの実行

  1. すべてのトリガーが正常に作成されたことを確認します。

    gcloud eventarc triggers list
    

    出力例を以下に示します。

    NAME                       TYPE                                            DESTINATION         ACTIVE  LOCATION
    trigger-chart-creator-gke  google.cloud.pubsub.topic.v1.messagePublished   GKE:chart-creator   Yes     us-central1
    trigger-notifier-gke       google.cloud.audit.log.v1.written               GKE:notifier        Yes     us-central1
    trigger-query-runner-gke   google.cloud.pubsub.topic.v1.messagePublished   GKE:query-runner    Yes     us-central1
    
  2. Cloud Scheduler ジョブ ID を取得します。

    gcloud scheduler jobs list
    

    出力は次のようになります。

    ID                LOCATION      SCHEDULE (TZ)         TARGET_TYPE  STATE
    cre-scheduler-cy  us-central1   0 17 * * * (Etc/UTC)  Pub/Sub      ENABLED
    cre-scheduler-uk  us-central1   0 16 * * * (Etc/UTC)  Pub/Sub      ENABLED
    
  3. ジョブは毎日午後 4 時から午後 5 時に実行されるようにスケジュールされていますが、Cloud Scheduler ジョブを手動で実行することもできます。

    gcloud scheduler jobs run cre-scheduler-cy
    gcloud scheduler jobs run cre-scheduler-uk
    
  4. 数分後、Cloud Storage バケットに 2 つのグラフが作成されていることを確認します。

    gsutil ls gs://${BUCKET}
    

    出力は次のようになります。

    gs://PROJECT_ID-charts/chart-cyprus.png
    gs://PROJECT_ID-charts/chart-unitedkingdom.png
    

これで、グラフのリンクを含むメールが 2 通届いたはずです。

クリーンアップ

このチュートリアル用に新規プロジェクトを作成した場合は、そのプロジェクトを削除します。既存のプロジェクトを使用していて、このチュートリアルで行った変更を追加せずに残す場合は、チュートリアル用に作成したリソースを削除します

    Google Cloud プロジェクトを削除します。

    gcloud projects delete PROJECT_ID

チュートリアル リソースを削除する

  1. このチュートリアルでデプロイした Cloud Run for Anthos サービスをすべて削除します。

    gcloud run services delete SERVICE_NAME

    SERVICE_NAME は、選択したサービス名です。

    Google Cloud Console から Cloud Run for Anthos のサービスを削除することもできます。

  2. このチュートリアルで作成した Eventarc トリガーをすべて削除します。

    gcloud eventarc triggers delete TRIGGER_NAME
    

    TRIGGER_NAME は実際のトリガー名に置き換えます。

  3. チュートリアルの設定で追加した Google Cloud CLI のデフォルト構成を削除します。

    gcloud config unset project
    gcloud config unset run/cluster
    gcloud config unset run/cluster_location
    gcloud config unset run/platform
    gcloud config unset eventarc/location
    gcloud config unset compute/zone
    
  4. Artifact Registry からイメージを削除します。

    gcloud artifacts docker images delete $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/notifier:v1
    gcloud artifacts docker images delete $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/chart-creator:v1
    gcloud artifacts docker images delete $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/query-runner:v1
    
  5. バケット内のすべてのオブジェクトと一緒にバケットを削除します。

    gcloud storage rm --recursive gs://${BUCKET}/
  6. Cloud Scheduler ジョブを削除します。

    gcloud scheduler jobs delete cre-scheduler-cy
    gcloud scheduler jobs delete cre-scheduler-uk
    

次のステップ