Eventarc を使用して Cloud Run for Anthos の BigQuery 処理パイプラインを作成する

コレクションでコンテンツを整理 必要に応じて、コンテンツの保存と分類を行います。

このチュートリアルでは、Eventarc を使用して、一般公開 BigQuery データセットに対するクエリのスケジューリング、データに基づいたグラフの生成、メールによるグラフのリンクの共有を行う処理パイプラインを構築する方法について説明します。

目標

このチュートリアルでは、Google Kubernetes Engine(GKE)クラスタで実行され、Eventarc を使用してイベントを受信する 3 つの Cloud Run for Anthos サービスをビルドしてデプロイします。

  1. Query Runner - Cloud Scheduler ジョブが Pub/Sub トピックにメッセージを公開するとトリガーされます。このサービスは、BigQuery API を使用して一般公開の COVID-19 データセットからデータを取得し、結果を新しい BigQuery テーブルに保存します。
  2. Chart Creator - Query Runner サービスが Pub/Sub トピックにメッセージを公開するとトリガーされます。このサービスは、Python プロット ライブラリ(Matplotlib)を使用してグラフを生成し、Cloud Storage バケットに保存します。
  3. Notifier - Chart Creator サービスが Cloud Storage バケットにグラフを保存すると監査ログによってトリガーされます。このサービスは、メールサービス(SendGrid)を使用して、グラフのリンクをメールアドレスに送信します。

次の図は、アーキテクチャの概要を示しています。

BigQuery 処理パイプライン

費用

このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  4. Cloud Build, Cloud Logging, Cloud Run for Anthos, Cloud Scheduler, Container Registry, Eventarc, GKE, Pub/Sub, and Resource Manager API を有効にします。

    API を有効にする

  5. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  6. Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  7. Cloud Build, Cloud Logging, Cloud Run for Anthos, Cloud Scheduler, Container Registry, Eventarc, GKE, Pub/Sub, and Resource Manager API を有効にします。

    API を有効にする

  8. Google Cloud CLI をインストールして初期化します。
  9. gcloud コンポーネントを更新します。
    gcloud components update
  10. 自分のアカウントでログインします。
    gcloud auth login
  11. Google Cloud Storage を選択して、管理読み取りデータ読み取りデータ書き込みの各ログタイプを有効にします。 Cloud Audit Logs に移動
  12. このチュートリアルで使用するデフォルトを設定します。
    CLUSTER_NAME=events-cluster
    CLUSTER_LOCATION=us-central1
    PROJECT_ID=PROJECT_ID
    
    gcloud config set project $PROJECT_ID
    gcloud config set run/region $CLUSTER_LOCATION
    gcloud config set run/cluster $CLUSTER_NAME
    gcloud config set run/cluster_location $CLUSTER_LOCATION
    gcloud config set run/platform gke
    gcloud config set eventarc/location $CLUSTER_LOCATION
    
    PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  13. Git ソースコード管理ツールをダウンロードしてインストールします。

SendGrid API キーの作成

SendGrid はクラウドベースのメール プロバイダで、メールサーバーを維持せずにメールを送信できます。

  1. SendGrid にログインし、[Settings] > [API Keys] の順に移動します。
  2. [Create API Key] をクリックします。
  3. キーの権限を選択します。メールを送信するには、キーに少なくとも Mail send(メール送信)の権限が必要です。
  4. [Save] をクリックして、キーを作成します。
  5. SendGrid によって、新しいキーが生成されます。これは、キーの唯一のコピーであるため、後で使用できるようにキーをコピーして保存してください。

Cloud Run for Anthos 用の GKE クラスタを作成する

クラスタを作成して Workload Identity を有効にし、GKE 内で実行されているアプリケーションから Google Cloud サービスにアクセスできるようにします。Eventarc を使用してイベントを転送するには、Workload Identity も必要です。

  1. CloudRunHttpLoadBalancingHorizontalPodAutoscaling アドオンを有効にして Cloud Run for Anthos 用の GKE クラスタを作成します。

    gcloud beta container clusters create $CLUSTER_NAME \
      --addons=HttpLoadBalancing,HorizontalPodAutoscaling,CloudRun \
      --machine-type=n1-standard-4 \
      --enable-autoscaling --min-nodes=2 --max-nodes=10 \
      --no-issue-client-certificate --num-nodes=2  \
      --logging=SYSTEM,WORKLOAD \
      --monitoring=SYSTEM \
      --scopes=cloud-platform,logging-write,monitoring-write,pubsub \
      --zone us-central1 \
      --release-channel=rapid \
      --workload-pool=$PROJECT_ID.svc.id.goog
    
  2. クラスタの作成が完了するまで数分待ちます。処理中に、無視しても問題のない警告が表示されることがあります。クラスタが作成されると、出力は次のようになります。

    Creating cluster ...done.
    Created [https://container.googleapis.com/v1beta1/projects/my-project/zones/us-central1/clusters/my-cluster].
    

GKE サービス アカウントを構成する

デフォルトのコンピューティング サービス アカウントとして機能する GKE サービス アカウントを構成します。

  1. サービス アカウント間に Identity and Access Management(IAM)バインディングを作成します。

    PROJECT_NUMBER="$(gcloud projects describe $(gcloud config get-value project) --format='value(projectNumber)')"
    
    gcloud iam service-accounts add-iam-policy-binding \
      --role roles/iam.workloadIdentityUser \
      --member "serviceAccount:$PROJECT_ID.svc.id.goog[default/default]" \
      $PROJECT_NUMBER-compute@developer.gserviceaccount.com
    
  2. コンピューティング サービス アカウントのメールアドレスを使用して、iam.gke.io/gcp-service-account アノテーションを GKE サービス アカウントに追加します。

    kubectl annotate serviceaccount \
      --namespace default \
      default \
      iam.gke.io/gcp-service-account=$PROJECT_NUMBER-compute@developer.gserviceaccount.com
    

GKE の宛先を有効にする

Eventarc が GKE クラスタ内のリソースを管理できるようにするには、GKE の宛先を有効にし、Eventarc サービス アカウントを必要なロールにバインドします。

  1. Eventarc 用に GKE の宛先を有効にします。

    gcloud eventarc gke-destinations init
    
  2. 必要なロールをバインドするよう求められたら、「y」と入力します。

    次のロールがバインドされます。

    • roles/compute.viewer
    • roles/container.developer
    • roles/iam.serviceAccountAdmin

サービス アカウントを作成してアクセスロールをバインドする

Eventarc トリガーを作成する前に、ユーザー管理のサービス アカウントを設定して特定のロールを付与し、Eventarc が Pub/Sub イベントを転送できるようにします。

  1. TRIGGER_GSA という名前のサービス アカウントを作成します。

    TRIGGER_GSA=eventarc-bigquery-triggers
    gcloud iam service-accounts create $TRIGGER_GSA
  2. サービス アカウントに pubsub.subscribermonitoring.metricWritereventarc.eventReceiver のロールを付与します。

    PROJECT_ID=$(gcloud config get-value project)
    
    gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
    --role "roles/pubsub.subscriber"
    
    gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
    --role "roles/monitoring.metricWriter"
    
    gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
    --role "roles/eventarc.eventReceiver"
    

Cloud Storage バケットを作成する

グラフを保存する Cloud Storage バケットを作成します。バケットとグラフが Cloud Run for Anthos サービスと同じリージョンで一般公開されていることを確認します。

  export BUCKET="$(gcloud config get-value core/project)-charts"
  gsutil mb -l $(gcloud config get-value run/region) gs://${BUCKET}
  gsutil uniformbucketlevelaccess set on gs://${BUCKET}
  gsutil iam ch allUsers:objectViewer gs://${BUCKET}
  

リポジトリのクローンを作成する

GitHub リポジトリのクローンを作成します。

  git clone https://github.com/GoogleCloudPlatform/eventarc-samples
  cd eventarc-samples/processing-pipelines

Notifier サービスをデプロイする

bigquery/notifier/python ディレクトリから、Cloud Run for Anthos サービスをデプロイします。このサービスは、Chart Creator イベントを受信し、SendGrid を使用して、生成されたグラフへのリンクをメールで送信します。

  1. コンテナ イメージをビルドして push します。

    pushd bigquery/notifier/python
    export SERVICE_NAME=notifier
    docker build -t gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1 .
    docker push gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1
    popd
    
  2. コンテナ イメージを Cloud Run for Anthos にデプロイし、メールの送信先アドレスと SendGrid API キーを渡します。

    export TO_EMAILS=EMAIL_ADDRESS
    export SENDGRID_API_KEY=YOUR_SENDGRID_API_KEY
    gcloud run deploy ${SERVICE_NAME} \
        --image gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1 \
        --update-env-vars TO_EMAILS=${TO_EMAILS},SENDGRID_API_KEY=${SENDGRID_API_KEY},BUCKET=${BUCKET}
    

    次のように置き換えます。

    • EMAIL_ADDRESS は、生成されたグラフのリンクを送信するメールアドレスで置き換えます。
    • YOUR_SENDGRID_API_KEY は、前にメモした SendGrid API キーで置き換えます。

サービス URL が表示されたら、デプロイは完了しています。

Notifier サービスのトリガーを作成する

Cloud Run for Anthos にデプロイされた Notifier サービスの Eventarc トリガーは、methodName が storage.objects.create の Cloud Storage 監査ログをフィルタリングします。

  1. トリガーを作成します。

    gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
        --destination-gke-cluster=$CLUSTER_NAME \
        --destination-gke-location=$CLUSTER_LOCATION \
        --destination-gke-namespace=default \
        --destination-gke-service=$SERVICE_NAME \
        --destination-gke-path=/ \
        --event-filters="type=google.cloud.audit.log.v1.written" \
        --event-filters="serviceName=storage.googleapis.com" \
        --event-filters="methodName=storage.objects.create" \
        --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com
    

    これにより、trigger-notifier-gke というトリガーが作成されます。

Chart Creator サービスをデプロイする

Query Runner イベントを受信して、BigQuery テーブルから特定の国のデータを取得し、そのデータから Matplotlib を使用してグラフを生成する Cloud Run for Anthos サービスを bigquery/chart-creator/python ディレクトリからデプロイします。グラフが Cloud Storage バケットにアップロードされます。

  1. コンテナ イメージをビルドして push します。

    pushd bigquery/chart-creator/python
    export SERVICE_NAME=chart-creator
    docker build -t gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1 .
    docker push gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1
    popd
    
  2. コンテナ イメージを Cloud Run for Anthos にデプロイし、BUCKET を渡します。

    gcloud run deploy ${SERVICE_NAME} \
      --image gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1 \
      --update-env-vars BUCKET=${BUCKET}
    

サービス URL が表示されたら、デプロイは完了しています。

Chart Creator サービスのトリガーを作成する

Cloud Run for Anthos にデプロイされた Chart Creator サービスの Eventarc トリガーは、Pub/Sub トピックに公開されたメッセージをフィルタします。

  1. トリガーを作成します。

    gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
      --destination-gke-cluster=$CLUSTER_NAME \
      --destination-gke-location=$CLUSTER_LOCATION \
      --destination-gke-namespace=default \
      --destination-gke-service=$SERVICE_NAME \
      --destination-gke-path=/ \
      --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
      --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com
    

    これにより、trigger-chart-creator-gke というトリガーが作成されます。

  2. Pub/Sub トピックの環境変数を設定します。

    export TOPIC_QUERY_COMPLETED=$(basename $(gcloud eventarc triggers describe trigger-${SERVICE_NAME}-gke --format='value(transport.pubsub.topic)'))
    

Query Runner サービスをデプロイする

Cloud Scheduler イベントを受信し、一般公開の COVID-19 データセットからデータを取得して、結果を新しい BigQuery テーブルに保存する Cloud Run for Anthos サービスを processing-pipelines ディレクトリからデプロイします。

  1. コンテナ イメージをビルドして push します。

    export SERVICE_NAME=query-runner
    docker build -t gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1 -f bigquery/${SERVICE_NAME}/csharp/Dockerfile .
    docker push gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1
    
  2. コンテナ イメージを Cloud Run for Anthos にデプロイし、PROJECT_IDTOPIC_QUERY_COMPLETED を渡します。

    gcloud run deploy ${SERVICE_NAME} \
      --image gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1 \
      --update-env-vars PROJECT_ID=$(gcloud config get-value project),TOPIC_ID=${TOPIC_QUERY_COMPLETED}
    

サービス URL が表示されたら、デプロイは完了しています。

Query Runner サービスのトリガーを作成する

Cloud Run for Anthos にデプロイされた Query Runner サービスの Eventarc トリガーは Pub/Sub トピックに公開されたメッセージをフィルタします。

  1. トリガーを作成します。

    gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
      --destination-gke-cluster=$CLUSTER_NAME \
      --destination-gke-location=$CLUSTER_LOCATION \
      --destination-gke-namespace=default \
      --destination-gke-service=$SERVICE_NAME \
      --destination-gke-path=/ \
      --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
      --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com
    

    これにより、trigger-query-runner-gke というトリガーが作成されます。

  2. Pub/Sub トピックの環境変数を設定します。

    export TOPIC_QUERY_SCHEDULED=$(gcloud eventarc triggers describe trigger-${SERVICE_NAME}-gke --format='value(transport.pubsub.topic)')
    

ジョブのスケジュール設定

処理パイプラインは、2 つの Cloud Scheduler ジョブによってトリガーされます。

  1. Cloud Scheduler に必要な App Engine アプリを作成し、適切なロケーションeurope-west など)を指定します。

    export APP_ENGINE_LOCATION=LOCATION
    gcloud app create --region=${APP_ENGINE_LOCATION}
    
  2. 1 日に 1 回 Pub/Sub トピックに公開する Cloud Scheduler ジョブを 2 つ作成します。

    gcloud scheduler jobs create pubsub cre-scheduler-uk \
      --schedule="0 16 * * *" \
      --topic=${TOPIC_QUERY_SCHEDULED} \
      --message-body="United Kingdom"
    
    gcloud scheduler jobs create pubsub cre-scheduler-cy \
      --schedule="0 17 * * *" \
      --topic=${TOPIC_QUERY_SCHEDULED} \
      --message-body="Cyprus"
    

    スケジュールは unix-cron 形式で指定します。たとえば、0 16 * * * はジョブが毎日午後 4 時(UTC)に実行されることを意味します。

パイプラインの実行

  1. すべてのトリガーが正常に作成されたことを確認します。

    gcloud eventarc triggers list
    

    出力例を以下に示します。

    NAME                       TYPE                                            DESTINATION_RUN_SERVICE  DESTINATION_RUN_PATH  ACTIVE
    trigger-chart-creator-gke  google.cloud.pubsub.topic.v1.messagePublished                                                  Yes
    trigger-notifier-gke       google.cloud.audit.log.v1.written                                                              Yes
    trigger-query-runner-gke   google.cloud.pubsub.topic.v1.messagePublished                                                  Yes
    
  2. Cloud Scheduler ジョブ ID を取得します。

    gcloud scheduler jobs list
    

    出力は次のようになります。

    ID                LOCATION      SCHEDULE (TZ)         TARGET_TYPE  STATE
    cre-scheduler-cy  us-central1   0 17 * * * (Etc/UTC)  Pub/Sub      ENABLED
    cre-scheduler-uk  us-central1   0 16 * * * (Etc/UTC)  Pub/Sub      ENABLED
    
  3. ジョブは毎日午後 4 時から午後 5 時に実行されるようにスケジュールされていますが、Cloud Scheduler ジョブを手動で実行することもできます。

    gcloud scheduler jobs run cre-scheduler-cy
    gcloud scheduler jobs run cre-scheduler-uk
    
  4. 数分後、Cloud Storage バケットに 2 つのグラフが作成されていることを確認します。

    gsutil ls gs://${BUCKET}
    

    出力は次のようになります。

    gs://BUCKET/chart-cyprus.png
    gs://BUCKET/chart-unitedkingdom.png
    

これで、グラフのリンクを含むメールが 2 通届いたはずです。

クリーンアップ

このチュートリアル用に新規プロジェクトを作成した場合は、そのプロジェクトを削除します。既存のプロジェクトを使用し、このチュートリアルで変更を加えずに残す場合は、チュートリアル用に作成したリソースを削除します。

プロジェクトの削除

課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

プロジェクトを削除するには:

  1. Google Cloud コンソールで、[リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

チュートリアル リソースの削除

  1. このチュートリアルでデプロイした Cloud Run for Anthos サービスをすべて削除します。

    gcloud run services delete SERVICE_NAME

    SERVICE_NAME は、選択したサービス名です。

    Google Cloud Console から Cloud Run for Anthos のサービスを削除することもできます。

  2. このチュートリアルで作成した Eventarc トリガーをすべて削除します。

    gcloud eventarc triggers delete TRIGGER_NAME
    

    TRIGGER_NAME は実際のトリガー名に置き換えます。

  3. チュートリアルを設定したときに追加した gcloud のデフォルト構成を削除します。

    gcloud config unset project
    gcloud config unset run/cluster
    gcloud config unset run/cluster_location
    gcloud config unset run/platform
    gcloud config unset eventarc/location
    gcloud config unset compute/zone
    

次のステップ