Eventarc を使用した BigQuery 処理パイプラインの構築

コレクションでコンテンツを整理 必要に応じて、コンテンツの保存と分類を行います。

このチュートリアルでは、Eventarc を使用して、一般公開 BigQuery データセットに対するクエリのスケジューリング、データに基づいたグラフの生成、メールによるグラフのリンクの共有を行う処理パイプラインを構築する方法について説明します。

目標

このチュートリアルでは、未認証アクセスを許可し、Eventarc を使用してイベントを受信する 3 つの Cloud Run サービスをビルドしてデプロイします。

  1. Query Runner - Cloud Scheduler ジョブが Pub/Sub トピックにメッセージを公開するとトリガーされます。このサービスは、BigQuery API を使用して一般公開の COVID-19 データセットからデータを取得し、結果を新しい BigQuery テーブルに保存します。
  2. Chart Creator - Query Runner サービスが Pub/Sub トピックにメッセージを公開するとトリガーされます。このサービスは、Python プロット ライブラリ(Matplotlib)を使用してグラフを生成し、Cloud Storage バケットに保存します。
  3. Notifier - Chart Creator サービスが Cloud Storage バケットにグラフを保存すると監査ログによってトリガーされます。このサービスは、メールサービス(SendGrid)を使用して、グラフのリンクをメールアドレスに送信します。

次の図は、アーキテクチャの概要を示しています。

BigQuery 処理パイプライン

費用

このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

組織が Google Cloud 環境に制約を適用すると、このドキュメントの一部の手順が正しく機能しない場合があります。その場合、パブリック IP アドレスやサービス アカウント キーを作成するようなタスクを完了できない場合があります。制約に関するエラーを返すリクエストを行う場合は、制約がある Google Cloud 環境でアプリケーションを開発する方法をご覧ください。

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  4. Cloud Build, Cloud Logging, Cloud Run, Cloud Scheduler, Container Registry, Eventarc, Pub/Sub API を有効にします。

    API を有効にする

  5. Google Cloud CLI をインストールして初期化します。
  6. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  7. Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  8. Cloud Build, Cloud Logging, Cloud Run, Cloud Scheduler, Container Registry, Eventarc, Pub/Sub API を有効にします。

    API を有効にする

  9. Google Cloud CLI をインストールして初期化します。
  10. gcloud コンポーネントを更新します。
    gcloud components update
  11. 自分のアカウントでログインします。
    gcloud auth login
  12. [Google Cloud Storage] を選択し、管理読み取りデータ読み取りデータ書き込みの各ログタイプを有効にします。

    Cloud Audit Logs に移動

  13. Compute Engine サービス アカウントに eventarc.eventReceiver ロールを付与します。

    export PROJECT_NUMBER="$(gcloud projects describe $(gcloud config get-value project) --format='value(projectNumber)')"
    
    gcloud projects add-iam-policy-binding $(gcloud config get-value project) \
        --member=serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
        --role='roles/eventarc.eventReceiver'
    

  14. 2021 年 4 月 8 日以前に Pub/Sub サービス アカウントを有効にした場合は、Pub/Sub サービス アカウントに iam.serviceAccountTokenCreator ロールを付与します。

    gcloud projects add-iam-policy-binding $(gcloud config get-value project) \
        --member="serviceAccount:service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"\
        --role='roles/iam.serviceAccountTokenCreator'
    

  15. このチュートリアルで使用するデフォルトを設定します。
    export REGION=REGION
    gcloud config set run/region ${REGION}
    gcloud config set run/platform managed
    gcloud config set eventarc/location ${REGION}
    

    REGION は、サポートされている Eventarc のロケーションに置き換えます。

  16. git ソースコードの管理ツールをダウンロードしてインストールします。

SendGrid API キーの作成

SendGrid はクラウドベースのメール プロバイダで、メールサーバーを維持せずにメールを送信できます。

  1. SendGrid にログインし、[Settings] > [API Keys] の順に移動します。
  2. [Create API Key] をクリックします。
  3. キーの権限を選択します。メールを送信するには、キーに少なくとも Mail send(メール送信)の権限が必要です。
  4. [Save] をクリックして、キーを作成します。
  5. SendGrid によって、新しいキーが生成されます。これは、キーの唯一のコピーであるため、後で使用できるようにキーをコピーして保存してください。

Cloud Storage バケットの作成

グラフを保存する一意の Cloud Storage バケットを作成します。バケットとグラフが Cloud Run サービスと同じリージョンで一般公開されていることを確認します。

  export BUCKET="$(gcloud config get-value core/project)-charts"
  gsutil mb -l $(gcloud config get-value run/region) gs://${BUCKET}
  gsutil uniformbucketlevelaccess set on gs://${BUCKET}
  gsutil iam ch allUsers:objectViewer gs://${BUCKET}
  

Notifier サービスのデプロイ

Chart Creator イベントを受信し、SendGrid でグラフのリンクをメールで送信する Cloud Run サービスをデプロイします。

  1. GitHub リポジトリのクローンを作成して、notifier/python ディレクトリに移動します。

    git clone https://github.com/GoogleCloudPlatform/eventarc-samples
    cd eventarc-samples/processing-pipelines/bigquery/notifier/python/
    
  2. コンテナ イメージをビルドして push します。

    export SERVICE_NAME=notifier
    docker build -t gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1 .
    docker push gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1
    
  3. コンテナ イメージを Cloud Run にデプロイし、メールの送信先アドレスと SendGrid API キーを渡します。

    export TO_EMAILS=EMAIL_ADDRESS
    export SENDGRID_API_KEY=YOUR_SENDGRID_API_KEY
    gcloud run deploy ${SERVICE_NAME} \
        --image gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1 \
        --update-env-vars TO_EMAILS=${TO_EMAILS},SENDGRID_API_KEY=${SENDGRID_API_KEY},BUCKET=${BUCKET} \
        --allow-unauthenticated
    

    次のように置き換えます。

    • EMAIL_ADDRESS は、生成されたグラフのリンクを送信するメールアドレスで置き換えます。
    • YOUR_SENDGRID_API_KEY は、前にメモした SendGrid API キーで置き換えます。

サービス URL が表示されたら、デプロイは完了しています。

Notifier サービスのトリガーの作成

Cloud Run にデプロイされた Notifier サービスの Eventarc トリガーは、methodName が storage.objects.create の Cloud Storage 監査ログをフィルタリングします。

  1. トリガーを作成します。

    gcloud eventarc triggers create trigger-${SERVICE_NAME} \
        --destination-run-service=${SERVICE_NAME} \
        --destination-run-region=${REGION} \
        --event-filters="type=google.cloud.audit.log.v1.written" \
        --event-filters="serviceName=storage.googleapis.com" \
        --event-filters="methodName=storage.objects.create" \
        --service-account=${PROJECT_NUMBER}-compute@developer.gserviceaccount.com
    
    

    これにより、trigger-notifier というトリガーが作成されます。

Chart Creator サービスのデプロイ

Query Runner イベントを受け取り、特定の国の BigQuery テーブルからデータを取得し、Matplotlib を使用してそのデータからグラフを生成する Cloud Run サービスをデプロイします。グラフが Cloud Storage バケットにアップロードされます。

  1. chart-creator/python ディレクトリに移動します。

    cd ../../chart-creator/python
    
  2. コンテナ イメージをビルドして push します。

    export SERVICE_NAME=chart-creator
    docker build -t gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1 .
    docker push gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1
    
  3. コンテナ イメージを Cloud Run にデプロイし、BUCKET を渡します。

    gcloud run deploy ${SERVICE_NAME} \
      --image gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1 \
      --update-env-vars BUCKET=${BUCKET} \
      --allow-unauthenticated
    

サービス URL が表示されたら、デプロイは完了しています。

Chart Creator サービスのトリガーの作成

Cloud Run にデプロイされた Chart Creator サービスの Eventarc トリガーは、Pub/Sub トピックに公開されたメッセージをフィルタします。

  1. トリガーを作成します。

    gcloud eventarc triggers create trigger-${SERVICE_NAME} \
      --destination-run-service=${SERVICE_NAME} \
      --destination-run-region=${REGION} \
      --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished"
    
    

    これにより、trigger-chart-creator というトリガーが作成されます。

  2. Pub/Sub トピックの環境変数を設定します。

    export TOPIC_QUERY_COMPLETED=$(basename $(gcloud eventarc triggers describe trigger-${SERVICE_NAME} --format='value(transport.pubsub.topic)'))
    

Query Runner サービスのデプロイ

Cloud Scheduler イベントを受け取り、一般公開の COVID-19 データセットからデータを取得し、結果を新しい BigQuery テーブルに保存する Cloud Run サービスをデプロイします。

  1. processing-pipelines ディレクトリに移動します。

    cd ../../..
    
  2. コンテナ イメージをビルドして push します。

    export SERVICE_NAME=query-runner
    docker build -t gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1 -f bigquery/${SERVICE_NAME}/csharp/Dockerfile .
    docker push gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1
    
  3. コンテナ イメージを Cloud Run にデプロイし、PROJECT_IDTOPIC_QUERY_COMPLETED を渡します。

    gcloud run deploy ${SERVICE_NAME} \
      --image gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1 \
      --update-env-vars PROJECT_ID=$(gcloud config get-value project),TOPIC_ID=${TOPIC_QUERY_COMPLETED} \
      --allow-unauthenticated
    

サービス URL が表示されたら、デプロイは完了しています。

Query Runner サービスのトリガーの作成

Cloud Run にデプロイされた Query Runner サービスの Eventarc トリガーは Pub/Sub トピックに公開されたメッセージをフィルタします。

  1. トリガーを作成します。

    gcloud eventarc triggers create trigger-${SERVICE_NAME} \
      --destination-run-service=${SERVICE_NAME} \
      --destination-run-region=${REGION} \
      --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished"
    

    これにより、trigger-query-runner というトリガーが作成されます。

  2. Pub/Sub トピックの環境変数を設定します。

    export TOPIC_QUERY_SCHEDULED=$(gcloud eventarc triggers describe trigger-${SERVICE_NAME} --format='value(transport.pubsub.topic)')
    

ジョブのスケジュール設定

処理パイプラインは、2 つの Cloud Scheduler ジョブによってトリガーされます。

  1. Cloud Scheduler に必要な App Engine アプリを作成し、適切なロケーションを指定します。

    export APP_ENGINE_LOCATION=LOCATION
    gcloud app create --region=${APP_ENGINE_LOCATION}
    
  2. 1 日に 1 回 Pub/Sub トピックに公開する Cloud Scheduler ジョブを 2 つ作成します。

    gcloud scheduler jobs create pubsub cre-scheduler-uk \
      --schedule="0 16 * * *" \
      --topic=${TOPIC_QUERY_SCHEDULED} \
      --message-body="United Kingdom"
    
    gcloud scheduler jobs create pubsub cre-scheduler-cy \
      --schedule="0 17 * * *" \
      --topic=${TOPIC_QUERY_SCHEDULED} \
      --message-body="Cyprus"
    

    スケジュールは unix-cron 形式で指定します。たとえば、0 16 * * * はジョブが毎日午後 4 時(UTC)に実行されることを意味します。

パイプラインの実行

  1. まず、すべてのトリガーが正常に作成されたことを確認します。

    gcloud eventarc triggers list
    

    出力例を以下に示します。

    NAME                   TYPE                                           DESTINATION_RUN_SERVICE  DESTINATION_RUN_PATH  ACTIVE
    trigger-chart-creator  google.cloud.pubsub.topic.v1.messagePublished  chart-creator                                  Yes
    trigger-notifier       google.cloud.audit.log.v1.written              notifier                                       Yes
    trigger-query-runner   google.cloud.pubsub.topic.v1.messagePublished  query-runner                                   Yes
    
  2. Cloud Scheduler ジョブ ID を取得します。

    gcloud scheduler jobs list
    

    出力は次のようになります。

    ID                LOCATION      SCHEDULE (TZ)         TARGET_TYPE  STATE
    cre-scheduler-cy  us-central1   0 17 * * * (Etc/UTC)  Pub/Sub      ENABLED
    cre-scheduler-uk  us-central1   0 16 * * * (Etc/UTC)  Pub/Sub      ENABLED
    
  3. ジョブは毎日午後 4 時から午後 5 時に実行されるようにスケジュールされていますが、Cloud Scheduler ジョブを手動で実行することもできます。

    gcloud scheduler jobs run cre-scheduler-cy
    gcloud scheduler jobs run cre-scheduler-uk
    
  4. 数分後、Cloud Storage バケットに 2 つのグラフが作成されていることを確認します。

    gsutil ls gs://${BUCKET}
    

    出力は次のようになります。

    gs://BUCKET/chart-cyprus.png
    gs://BUCKET/chart-unitedkingdom.png
    

これで、グラフのリンクを含むメールが 2 通届いたはずです。

クリーンアップ

このチュートリアル用に新規プロジェクトを作成した場合は、そのプロジェクトを削除します。既存のプロジェクトを使用し、このチュートリアルで変更を加えずに残す場合は、チュートリアル用に作成したリソースを削除します。

プロジェクトの削除

課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

プロジェクトを削除するには:

  1. Google Cloud コンソールで、[リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

チュートリアル リソースの削除

  1. このチュートリアルでデプロイした Cloud Run サービスを削除します。

    gcloud run services delete SERVICE_NAME

    SERVICE_NAME は、選択したサービス名です。

    Cloud Run サービスは Google Cloud コンソールから削除することもできます。

  2. チュートリアルの設定時に追加した gcloud CLI のデフォルト構成を削除します。

    例:

    gcloud config unset run/region

    または

    gcloud config unset project

  3. このチュートリアルで作成した他の Google Cloud リソースを削除します。

    • Eventarc トリガーを削除します。
      gcloud eventarc triggers delete TRIGGER_NAME
      
      TRIGGER_NAME は実際のトリガー名に置き換えます。

次のステップ