Eventarc を使用して Knative serving 用の BigQuery 処理パイプラインを構築する


このチュートリアルでは、Eventarc を使用して、一般公開 BigQuery データセットに対するクエリのスケジューリング、データに基づいたグラフの生成、メールによるグラフのリンクの共有を行う処理パイプラインの構築方法について説明します。

目標

このチュートリアルでは、Google Kubernetes Engine(GKE)クラスタで実行され、Eventarc を使用してイベントを受信する 3 つの Knative serving サービスをビルドしてデプロイします。

  1. Query Runner - Cloud Scheduler ジョブが Pub/Sub トピックにメッセージを公開するとトリガーされます。このサービスは、BigQuery API を使用して一般公開の COVID-19 データセットからデータを取得し、結果を新しい BigQuery テーブルに保存します。
  2. Chart Creator - Query Runner サービスが Pub/Sub トピックにメッセージを公開するとトリガーされます。このサービスは、Python プロット ライブラリ(Matplotlib)を使用してグラフを生成し、Cloud Storage バケットに保存します。
  3. Notifier - Chart Creator サービスが Cloud Storage バケットにグラフを保存すると監査ログによってトリガーされます。このサービスは、メールサービス(SendGrid)を使用して、グラフのリンクをメールアドレスに送信します。

次の図は、アーキテクチャの概要を示しています。

BigQuery 処理パイプライン

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Artifact Registry, Cloud Build, Cloud Logging, Cloud Scheduler, Eventarc, GKE, Pub/Sub, and Resource Manager APIs:

    gcloud services enable artifactregistry.googleapis.com cloudbuild.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com container.googleapis.com eventarc.googleapis.com pubsub.googleapis.com run.googleapis.com logging.googleapis.com
  7. Install the Google Cloud CLI.
  8. To initialize the gcloud CLI, run the following command:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Make sure that billing is enabled for your Google Cloud project.

  11. Enable the Artifact Registry, Cloud Build, Cloud Logging, Cloud Scheduler, Eventarc, GKE, Pub/Sub, and Resource Manager APIs:

    gcloud services enable artifactregistry.googleapis.com cloudbuild.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com container.googleapis.com eventarc.googleapis.com pubsub.googleapis.com run.googleapis.com logging.googleapis.com
  12. Cloud Storage の場合は、ADMIN_READDATA_WRITEDATA_READ のデータアクセス タイプの監査ロギングを有効にします。

    1. Google Cloud プロジェクト、フォルダ、または組織に関連付けられた Identity and Access Management(IAM)ポリシーを読み取り、一時ファイルに保存します。
      gcloud projects get-iam-policy PROJECT_ID > /tmp/policy.yaml
    2. テキスト エディタで /tmp/policy.yaml を開き、auditConfigs セクションで監査ログの構成のみを追加または変更します。

      
        auditConfigs:
        - auditLogConfigs:
          - logType: ADMIN_READ
          - logType: DATA_WRITE
          - logType: DATA_READ
          service: storage.googleapis.com
        bindings:
        - members:
        [...]
        etag: BwW_bHKTV5U=
        version: 1
    3. 新しい IAM ポリシーを作成します。

      gcloud projects set-iam-policy PROJECT_ID /tmp/policy.yaml

      上記のコマンドで別の変更との競合が報告された場合は、IAM ポリシーの読み取りからやり直してください。詳細については、API でデータアクセス監査ログを構成するをご覧ください。

  13. このチュートリアルで使用するデフォルトを設定します。
    CLUSTER_NAME=events-cluster
    CLUSTER_LOCATION=us-central1
    PROJECT_ID=PROJECT_ID
    gcloud config set project $PROJECT_ID
    gcloud config set run/region $CLUSTER_LOCATION
    gcloud config set run/cluster $CLUSTER_NAME
    gcloud config set run/cluster_location $CLUSTER_LOCATION
    gcloud config set run/platform gke
    gcloud config set eventarc/location $CLUSTER_LOCATION

    PROJECT_ID は、実際のプロジェクト ID に置き換えます。

SendGrid API キーを作成する

SendGrid はクラウドベースのメール プロバイダで、メールサーバーを維持せずにメールを送信できます。

  1. SendGrid にログインし、[Settings] > [API Keys] の順に移動します。
  2. [Create API Key] をクリックします。
  3. キーの権限を選択します。メールを送信するには、キーに少なくとも Mail send(メール送信)の権限が必要です。
  4. [Save] をクリックして、キーを作成します。
  5. SendGrid によって、新しいキーが生成されます。これは、キーの唯一のコピーであるため、後で使用できるようにキーをコピーして保存してください。

GKE クラスタを作成する

Workload Identity Federation for GKE を有効にしてクラスタを作成し、GKE 内で実行されているアプリケーションから Google Cloud サービスにアクセスできるようにします。Eventarc を使用してイベントを転送するには、Workload Identity Federation for GKE も必要です。

  1. CloudRunHttpLoadBalancingHorizontalPodAutoscaling の各アドオンを有効にした Knative serving 用の GKE クラスタを作成します。

    gcloud beta container clusters create $CLUSTER_NAME \
        --addons=HttpLoadBalancing,HorizontalPodAutoscaling,CloudRun \
        --machine-type=n1-standard-4 \
        --enable-autoscaling --min-nodes=2 --max-nodes=10 \
        --no-issue-client-certificate --num-nodes=2  \
        --logging=SYSTEM,WORKLOAD \
        --monitoring=SYSTEM \
        --scopes=cloud-platform,logging-write,monitoring-write,pubsub \
        --zone us-central1 \
        --release-channel=rapid \
        --workload-pool=$PROJECT_ID.svc.id.goog
    
  2. クラスタの作成が完了するまで数分待ちます。処理中に、無視しても問題のない警告が表示されることがあります。クラスタが作成されると、出力は次のようになります。

    Creating cluster ...done.
    Created [https://container.googleapis.com/v1beta1/projects/my-project/zones/us-central1/clusters/events-cluster].
    
  3. Docker コンテナ イメージを保存する Artifact Registry 標準リポジトリを作成します。

    gcloud artifacts repositories create REPOSITORY \
        --repository-format=docker \
        --location=$CLUSTER_LOCATION

    REPOSITORY は、リポジトリの一意の名前に置き換えます。

GKE サービス アカウントを構成する

デフォルトのコンピューティング サービス アカウントとして機能する GKE サービス アカウントを構成します。

  1. サービス アカウント間に Identity and Access Management(IAM)バインディングを作成します。

    PROJECT_NUMBER="$(gcloud projects describe $(gcloud config get-value project) --format='value(projectNumber)')"
    
    gcloud iam service-accounts add-iam-policy-binding \
        --role roles/iam.workloadIdentityUser \
        --member "serviceAccount:$PROJECT_ID.svc.id.goog[default/default]" \
        $PROJECT_NUMBER-compute@developer.gserviceaccount.com
  2. コンピューティング サービス アカウントのメールアドレスを使用して、iam.gke.io/gcp-service-account アノテーションを GKE サービス アカウントに追加します。

    kubectl annotate serviceaccount \
        --namespace default \
        default \
        iam.gke.io/gcp-service-account=$PROJECT_NUMBER-compute@developer.gserviceaccount.com

GKE の宛先を有効にする

Eventarc が GKE クラスタ内のリソースを管理できるようにするには、GKE の宛先を有効にし、Eventarc サービス アカウントを必要なロールにバインドします。

  1. Eventarc 用に GKE の宛先を有効にします。

    gcloud eventarc gke-destinations init
  2. 必要なロールをバインドするよう求められたら、「y」と入力します。

    次のロールがバインドされます。

    • roles/compute.viewer
    • roles/container.developer
    • roles/iam.serviceAccountAdmin

サービス アカウントを作成してアクセスロールをバインドする

Eventarc トリガーを作成する前に、ユーザー管理のサービス アカウントを設定して特定のロールを付与し、Eventarc が Pub/Sub イベントを転送できるようにします。

  1. TRIGGER_GSA という名前のサービス アカウントを作成します。

    TRIGGER_GSA=eventarc-bigquery-triggers
    gcloud iam service-accounts create $TRIGGER_GSA
  2. サービス アカウントに pubsub.subscribermonitoring.metricWritereventarc.eventReceiver のロールを付与します。

    PROJECT_ID=$(gcloud config get-value project)
    
    gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
        --role "roles/pubsub.subscriber"
    
    gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
        --role "roles/monitoring.metricWriter"
    
    gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
        --role "roles/eventarc.eventReceiver"

Cloud Storage バケットを作成する

グラフを保存する Cloud Storage バケットを作成します。バケットとグラフが GKE サービスと同じリージョンで一般公開されていることを確認します。

export BUCKET="$(gcloud config get-value core/project)-charts"
gcloud storage buckets create gs://${BUCKET} --location=$(gcloud config get-value run/region) --uniform-bucket-level-access
gcloud storage buckets add-iam-policy-binding gs://${BUCKET} --member=allUsers --role=roles/storage.objectViewer

リポジトリのクローンを作成する

GitHub リポジトリのクローンを作成します。

git clone https://github.com/GoogleCloudPlatform/eventarc-samples
cd eventarc-samples/processing-pipelines

Notifier サービスをデプロイする

bigquery/notifier/python ディレクトリから、Knative serving サービスをデプロイします。このサービスは、Chart Creator イベントを受信し、SendGrid を使用して、生成されたグラフへのリンクをメールで送信します。

  1. コンテナ イメージをビルドして push します。

    pushd bigquery/notifier/python
    export SERVICE_NAME=notifier
    docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 .
    docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
    popd
  2. コンテナ イメージを Knative serving にデプロイし、メールの送信先アドレスと SendGrid API キーを渡します。

    export TO_EMAILS=EMAIL_ADDRESS
    export SENDGRID_API_KEY=YOUR_SENDGRID_API_KEY
    gcloud run deploy ${SERVICE_NAME} \
        --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars TO_EMAILS=${TO_EMAILS},SENDGRID_API_KEY=${SENDGRID_API_KEY},BUCKET=${BUCKET}

    次のように置き換えます。

    • EMAIL_ADDRESS: 生成されたグラフのリンクを送信するメールアドレス
    • YOUR_SENDGRID_API_KEY: 前にメモした SendGrid API キー

サービス URL が表示されたら、デプロイは完了しています。

Notifier サービスのトリガーを作成する

Knative serving にデプロイされた Notifier サービスの Eventarc トリガーで、methodName が storage.objects.create の Cloud Storage 監査ログをフィルタします。

  1. トリガーを作成します。

    gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
        --destination-gke-cluster=$CLUSTER_NAME \
        --destination-gke-location=$CLUSTER_LOCATION \
        --destination-gke-namespace=default \
        --destination-gke-service=$SERVICE_NAME \
        --destination-gke-path=/ \
        --event-filters="type=google.cloud.audit.log.v1.written" \
        --event-filters="serviceName=storage.googleapis.com" \
        --event-filters="methodName=storage.objects.create" \
        --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com

    これにより、trigger-notifier-gke というトリガーが作成されます。

Chart Creator サービスをデプロイする

bigquery/chart-creator/python ディレクトリで、Query Runner イベントを受信し、特定の国の BigQuery テーブルからデータを取得して、そのデータから Matplotlib を使用してグラフを生成する Knative serving サービスをデプロイします。グラフが Cloud Storage バケットにアップロードされます。

  1. コンテナ イメージをビルドして push します。

    pushd bigquery/chart-creator/python
    export SERVICE_NAME=chart-creator
    docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 .
    docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
    popd
  2. BUCKET を渡してコンテナ イメージを Knative serving にデプロイします。

    gcloud run deploy ${SERVICE_NAME} \
        --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars BUCKET=${BUCKET}

サービス URL が表示されたら、デプロイは完了しています。

Chart Creator サービスのトリガーを作成する

Knative serving にデプロイされた Chart Creator サービスの Eventarc トリガーで、Pub/Sub トピックに公開されたメッセージをフィルタします。

  1. トリガーを作成します。

    gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
        --destination-gke-cluster=$CLUSTER_NAME \
        --destination-gke-location=$CLUSTER_LOCATION \
        --destination-gke-namespace=default \
        --destination-gke-service=$SERVICE_NAME \
        --destination-gke-path=/ \
        --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
        --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com

    これにより、trigger-chart-creator-gke というトリガーが作成されます。

  2. Pub/Sub トピックの環境変数を設定します。

    export TOPIC_QUERY_COMPLETED=$(basename $(gcloud eventarc triggers describe trigger-${SERVICE_NAME}-gke --format='value(transport.pubsub.topic)'))

Query Runner サービスをデプロイする

processing-pipelines ディレクトリで、Cloud Scheduler イベントを受信し、一般公開の COVID-19 データセットからデータを取得して、結果を新しい BigQuery テーブルに保存する Knative serving サービスをデプロイします。

  1. コンテナ イメージをビルドして push します。

    export SERVICE_NAME=query-runner
    docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 -f Dockerfile .
    docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
  2. PROJECT_IDTOPIC_QUERY_COMPLETED を渡してコンテナ イメージを Knative serving にデプロイします。

    gcloud run deploy ${SERVICE_NAME} \
        --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars PROJECT_ID=$(gcloud config get-value project),TOPIC_ID=${TOPIC_QUERY_COMPLETED}

サービス URL が表示されたら、デプロイは完了しています。

Query Runner サービスのトリガーを作成する

Knative serving にデプロイされた Query Runner サービスの Eventarc トリガーで、Pub/Sub トピックに公開されたメッセージをフィルタします。

  1. トリガーを作成します。

    gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
        --destination-gke-cluster=$CLUSTER_NAME \
        --destination-gke-location=$CLUSTER_LOCATION \
        --destination-gke-namespace=default \
        --destination-gke-service=$SERVICE_NAME \
        --destination-gke-path=/ \
        --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
        --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com

    これにより、trigger-query-runner-gke というトリガーが作成されます。

  2. Pub/Sub トピックの環境変数を設定します。

    export TOPIC_QUERY_SCHEDULED=$(gcloud eventarc triggers describe trigger-${SERVICE_NAME}-gke --format='value(transport.pubsub.topic)')

ジョブのスケジュール設定

処理パイプラインは、2 つの Cloud Scheduler ジョブによってトリガーされます。

  1. Cloud Scheduler に必要な App Engine アプリを作成し、適切なロケーションeurope-west など)を指定します。

    export APP_ENGINE_LOCATION=LOCATION
    gcloud app create --region=${APP_ENGINE_LOCATION}
  2. 1 日に 1 回 Pub/Sub トピックに公開する Cloud Scheduler ジョブを 2 つ作成します。

    gcloud scheduler jobs create pubsub cre-scheduler-uk \
        --schedule="0 16 * * *" \
        --topic=${TOPIC_QUERY_SCHEDULED} \
        --message-body="United Kingdom"
    gcloud scheduler jobs create pubsub cre-scheduler-cy \
        --schedule="0 17 * * *" \
        --topic=${TOPIC_QUERY_SCHEDULED} \
        --message-body="Cyprus"

    スケジュールは unix-cron 形式で指定します。たとえば、0 16 * * * はジョブが毎日午後 4 時(UTC)に実行されることを意味します。

パイプラインの実行

  1. すべてのトリガーが正常に作成されたことを確認します。

    gcloud eventarc triggers list

    出力は次のようになります。

    NAME                       TYPE                                            DESTINATION         ACTIVE  LOCATION
    trigger-chart-creator-gke  google.cloud.pubsub.topic.v1.messagePublished   GKE:chart-creator   Yes     us-central1
    trigger-notifier-gke       google.cloud.audit.log.v1.written               GKE:notifier        Yes     us-central1
    trigger-query-runner-gke   google.cloud.pubsub.topic.v1.messagePublished   GKE:query-runner    Yes     us-central1
    
  2. Cloud Scheduler ジョブ ID を取得します。

    gcloud scheduler jobs list

    出力は次のようになります。

    ID                LOCATION      SCHEDULE (TZ)         TARGET_TYPE  STATE
    cre-scheduler-cy  us-central1   0 17 * * * (Etc/UTC)  Pub/Sub      ENABLED
    cre-scheduler-uk  us-central1   0 16 * * * (Etc/UTC)  Pub/Sub      ENABLED
    
  3. ジョブは毎日午後 4 時から午後 5 時に実行されるようにスケジュールされていますが、Cloud Scheduler ジョブを手動で実行することもできます。

    gcloud scheduler jobs run cre-scheduler-cy
    gcloud scheduler jobs run cre-scheduler-uk
  4. 数分後、Cloud Storage バケットに 2 つのグラフが作成されていることを確認します。

    gcloud storage ls gs://${BUCKET}

    出力は次のようになります。

    gs://PROJECT_ID-charts/chart-cyprus.png
    gs://PROJECT_ID-charts/chart-unitedkingdom.png
    

これで、グラフのリンクを含むメールが 2 通届いたはずです。

クリーンアップ

このチュートリアル用に新規プロジェクトを作成した場合は、そのプロジェクトを削除します。既存のプロジェクトを使用していて、このチュートリアルで行った変更を追加せずに残す場合は、チュートリアル用に作成したリソースを削除します

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

チュートリアル リソースの削除

  1. このチュートリアルでデプロイした Knative serving サービスを削除します。

    gcloud run services delete SERVICE_NAME

    SERVICE_NAME は、選択したサービス名です。

    Knative serving サービスは、Google Cloud コンソールから削除することもできます。

  2. このチュートリアルで作成した Eventarc トリガーをすべて削除します。

    gcloud eventarc triggers delete TRIGGER_NAME
    

    TRIGGER_NAME は実際のトリガー名に置き換えます。

  3. チュートリアルの設定で追加した Google Cloud CLI のデフォルト構成を削除します。

    gcloud config unset project
    gcloud config unset run/cluster
    gcloud config unset run/cluster_location
    gcloud config unset run/platform
    gcloud config unset eventarc/location
    gcloud config unset compute/zone
  4. Artifact Registry からイメージを削除します。

    gcloud artifacts docker images delete $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/notifier:v1
    gcloud artifacts docker images delete $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/chart-creator:v1
    gcloud artifacts docker images delete $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/query-runner:v1
  5. バケット内のすべてのオブジェクトと一緒にバケットを削除します。

    gcloud storage rm --recursive gs://${BUCKET}/
  6. Cloud Scheduler ジョブを削除します。

    gcloud scheduler jobs delete cre-scheduler-cy
    gcloud scheduler jobs delete cre-scheduler-uk

次のステップ