使用 Eventarc 构建 BigQuery 处理流水线


本教程介绍如何使用 Eventarc 构建处理流水线,该流水线安排对公共 BigQuery 数据集的查询,根据数据生成图表,并通过电子邮件分享图表链接。

目标

在本教程中,您将构建和部署三个 Cloud Run 服务,这些服务允许未经身份验证的访问并使用 Eventarc 接收事件:

  1. Query Runner - 当 Cloud Scheduler 作业向 Pub/Sub 主题发布消息时触发,此服务使用 BigQuery API 从公共 COVID-19 数据集中检索数据,并将结果保存到新的 BigQuery 表中。
  2. Chart Creator - 在 Query Runner 服务向 Pub/Sub 主题发布消息时触发,此服务使用 Python 绘图库 Matplotlib 生成图表,并将图表保存到 Cloud Storage 存储桶。
  3. Notifier - 当 Chart Creator 服务将图表存储在 Cloud Storage 存储桶中时,由审核日志触发,此服务使用电子邮件服务 SendGrid,将图表的链接发送到某个电子邮件地址。

下图展示了高级别的基础架构:

BigQuery 处理流水线

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

您的组织定义的安全限制条件可能会导致您无法完成以下步骤。如需了解相关问题排查信息,请参阅在受限的 Google Cloud 环境中开发应用

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Artifact Registry, Cloud Build, Cloud Logging, Cloud Run, Cloud Scheduler, Eventarc, and Pub/Sub APIs:

    gcloud services enable artifactregistry.googleapis.com cloudbuild.googleapis.com cloudscheduler.googleapis.com eventarc.googleapis.com logging.googleapis.com pubsub.googleapis.com run.googleapis.com
  7. Install the Google Cloud CLI.
  8. To initialize the gcloud CLI, run the following command:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Make sure that billing is enabled for your Google Cloud project.

  11. Enable the Artifact Registry, Cloud Build, Cloud Logging, Cloud Run, Cloud Scheduler, Eventarc, and Pub/Sub APIs:

    gcloud services enable artifactregistry.googleapis.com cloudbuild.googleapis.com cloudscheduler.googleapis.com eventarc.googleapis.com logging.googleapis.com pubsub.googleapis.com run.googleapis.com
  12. 对于 Cloud Storage,为 ADMIN_READDATA_WRITEDATA_READ 数据访问类型启用审核日志记录。

    1. 读取与您的 Google Cloud 项目、文件夹或组织关联的 Identity and Access Management (IAM) 政策,并将其存储在临时文件中:
      gcloud projects get-iam-policy PROJECT_ID > /tmp/policy.yaml
    2. 在文本编辑器中,打开 /tmp/policy.yaml,然后仅在 auditConfigs 部分中添加或更改审核日志配置

      
        auditConfigs:
        - auditLogConfigs:
          - logType: ADMIN_READ
          - logType: DATA_WRITE
          - logType: DATA_READ
          service: storage.googleapis.com
        bindings:
        - members:
        [...]
        etag: BwW_bHKTV5U=
        version: 1
    3. 写入新的 IAM 政策:

      gcloud projects set-iam-policy PROJECT_ID /tmp/policy.yaml

      如果上述命令报告与其他更改发生冲突,请重复以上步骤(从读取 IAM 政策开始)。如需了解详情,请参阅使用 API 配置数据访问审核日志

  13. eventarc.eventReceiver 角色授予 Compute Engine 服务账号:

    export PROJECT_NUMBER="$(gcloud projects describe $(gcloud config get-value project) --format='value(projectNumber)')"
    
    gcloud projects add-iam-policy-binding $(gcloud config get-value project) \
        --member=serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com \
        --role='roles/eventarc.eventReceiver'

  14. 如果您在 2021 年 4 月 8 日或之前启用了 Pub/Sub 服务账号,请将 iam.serviceAccountTokenCreator 角色授予 Pub/Sub 服务账号:

    gcloud projects add-iam-policy-binding $(gcloud config get-value project) \
        --member="serviceAccount:service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"\
        --role='roles/iam.serviceAccountTokenCreator'

  15. 设置本教程中使用的默认值:
    export REGION=REGION
    gcloud config set run/region ${REGION}
    gcloud config set run/platform managed
    gcloud config set eventarc/location ${REGION}

    REGION 替换为您选择的受支持的 Eventarc 位置

创建 SendGrid API 密钥

SendGrid 是云端电子邮件服务提供商,让您无需维护电子邮件服务器即可发送电子邮件。

  1. 登录到 SendGrid 并转到 Settings > API Keys
  2. 点击 Create API Key
  3. 为该密钥选择权限。该密钥必须至少具有 Mail Send 权限才能发送电子邮件。
  4. 为密钥命名,然后点击保存以创建密钥。
  5. SendGrid 会生成一个新的密钥。这是该密钥的唯一副本,因此请务必复制并保存该密钥以供日后使用。

创建 Artifact Registry 标准制品库

创建 Artifact Registry 标准制品库以存储您的 Docker 容器映像:

gcloud artifacts repositories create REPOSITORY \
    --repository-format=docker \
    --location=$REGION

REPOSITORY 替换为制品库的唯一名称。

创建 Cloud Storage 存储桶

创建一个唯一的 Cloud Storage 存储桶以保存图表。确保存储桶和图表是公开提供的,并且与 Cloud Run 服务位于同一区域:

export BUCKET="$(gcloud config get-value core/project)-charts"
gsutil mb -l $(gcloud config get-value run/region) gs://${BUCKET}
gsutil uniformbucketlevelaccess set on gs://${BUCKET}
gsutil iam ch allUsers:objectViewer gs://${BUCKET}

部署 Notifier 服务

部署一个 Cloud Run 服务,该服务接收 Chart Creator 事件并使用 SendGrid 通过电子邮件发送生成的图表的链接。

  1. 克隆 GitHub 代码库并切换到 notifier/python 目录:

    git clone https://github.com/GoogleCloudPlatform/eventarc-samples
    cd eventarc-samples/processing-pipelines/bigquery/notifier/python/
  2. 构建并推送容器映像:

    export SERVICE_NAME=notifier
    docker build -t $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 .
    docker push $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
  3. 将容器映像部署到 Cloud Run,并传入电子邮件发送到的地址以及 SendGrid API 密钥:

    export TO_EMAILS=EMAIL_ADDRESS
    export SENDGRID_API_KEY=YOUR_SENDGRID_API_KEY
    gcloud run deploy ${SERVICE_NAME} \
        --image $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars TO_EMAILS=${TO_EMAILS},SENDGRID_API_KEY=${SENDGRID_API_KEY},BUCKET=${BUCKET} \
        --allow-unauthenticated

    替换以下内容:

    • EMAIL_ADDRESS 替换为生成的图表的链接发送到的电子邮件地址
    • YOUR_SENDGRID_API_KEY 替换为您之前记下的 SendGrid API 密钥

当您看到服务网址时,表示部署完成。

为 Notifier 服务创建触发器

部署在 Cloud Run 上的 Notifier 服务的 Eventarc 触发器过滤 methodNamestorage.objects.create 的 Cloud Storage 审核日志。

  1. 创建触发器:

    gcloud eventarc triggers create trigger-${SERVICE_NAME} \
        --destination-run-service=${SERVICE_NAME} \
        --destination-run-region=${REGION} \
        --event-filters="type=google.cloud.audit.log.v1.written" \
        --event-filters="serviceName=storage.googleapis.com" \
        --event-filters="methodName=storage.objects.create" \
        --service-account=${PROJECT_NUMBER}-compute@developer.gserviceaccount.com

    这将创建一个名为 trigger-notifier 的触发器。

部署 Chart Creator 服务

部署一个 Cloud Run 服务,该服务接收 Query Runner 事件,从 BigQuery 表检索特定国家/地区的数据,然后使用 Matplotlib 根据数据生成图表。图表将上传到 Cloud Storage 存储桶。

  1. 切换到 chart-creator/python 目录:

    cd ../../chart-creator/python
  2. 构建并推送容器映像:

    export SERVICE_NAME=chart-creator
    docker build -t $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 .
    docker push $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
  3. 将容器映像部署到 Cloud Run,传入 BUCKET

    gcloud run deploy ${SERVICE_NAME} \
        --image $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars BUCKET=${BUCKET} \
        --allow-unauthenticated

当您看到服务网址时,表示部署完成。

为 Chart Creator 服务创建触发器

部署在 Cloud Run 上的 Chart Creator 服务的 Eventarc 触发器过滤发布到某个 Pub/Sub 主题的消息。

  1. 创建触发器:

    gcloud eventarc triggers create trigger-${SERVICE_NAME} \
        --destination-run-service=${SERVICE_NAME} \
        --destination-run-region=${REGION} \
        --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished"

    这将创建一个名为 trigger-chart-creator 的触发器。

  2. 设置 Pub/Sub 主题环境变量。

    export TOPIC_QUERY_COMPLETED=$(basename $(gcloud eventarc triggers describe trigger-${SERVICE_NAME} --format='value(transport.pubsub.topic)'))

部署 Query Runner 服务

部署一个 Cloud Run 服务,该服务接收 Cloud Scheduler 事件,从公共 COVID-19 数据集中检索数据,并将结果保存到新的 BigQuery 表中。

  1. 切换到 processing-pipelines 目录:

    cd ../../..
  2. 构建并推送容器映像:

    export SERVICE_NAME=query-runner
    docker build -t $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 -f Dockerfile .
    docker push $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
  3. 将容器映像部署到 Cloud Run,传入 PROJECT_IDTOPIC_QUERY_COMPLETED

    gcloud run deploy ${SERVICE_NAME} \
        --image $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars PROJECT_ID=$(gcloud config get-value project),TOPIC_ID=${TOPIC_QUERY_COMPLETED} \
        --allow-unauthenticated

当您看到服务网址时,表示部署完成。

为 Query Runner 服务创建触发器

部署在 Cloud Run 上的 Query Runner 服务的 Eventarc 触发器过滤发布到某个 Pub/Sub 主题的消息。

  1. 创建触发器:

    gcloud eventarc triggers create trigger-${SERVICE_NAME} \
        --destination-run-service=${SERVICE_NAME} \
        --destination-run-region=${REGION} \
        --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished"

    这将创建一个名为 trigger-query-runner 的触发器。

  2. 为 Pub/Sub 主题设置环境变量。

    export TOPIC_QUERY_SCHEDULED=$(gcloud eventarc triggers describe trigger-${SERVICE_NAME} --format='value(transport.pubsub.topic)')

安排作业

处理流水线由两个 Cloud Scheduler 作业触发。

  1. 创建 Cloud Scheduler 需要的 App Engine 应用,并指定适当的位置

    export APP_ENGINE_LOCATION=LOCATION
    gcloud app create --region=${APP_ENGINE_LOCATION}
  2. 创建两个 Cloud Scheduler 作业,这些作业每天发布到 Pub/Sub 主题一次:

    gcloud scheduler jobs create pubsub cre-scheduler-uk \
        --schedule="0 16 * * *" \
        --topic=${TOPIC_QUERY_SCHEDULED} \
        --message-body="United Kingdom"
    gcloud scheduler jobs create pubsub cre-scheduler-cy \
        --schedule="0 17 * * *" \
        --topic=${TOPIC_QUERY_SCHEDULED} \
        --message-body="Cyprus"

    时间表以 unix-cron 格式指定。例如,0 16 * * * 表示作业在世界协调时间 (UTC) 每天 16:00(下午 4 点)运行。

运行流水线

  1. 首先,确认所有触发器已成功创建:

    gcloud eventarc triggers list

    输出应类似如下所示:

    NAME: trigger-chart-creator
    TYPE: google.cloud.pubsub.topic.v1.messagePublished
    DESTINATION: Cloud Run service: chart-creator
    ACTIVE: Yes
    LOCATION: us-central1
    
    NAME: trigger-notifier
    TYPE: google.cloud.audit.log.v1.written
    DESTINATION: Cloud Run service: notifier
    ACTIVE: Yes
    LOCATION: us-central1
    
    NAME: trigger-query-runner
    TYPE: google.cloud.pubsub.topic.v1.messagePublished
    DESTINATION: Cloud Run service: query-runner
    ACTIVE: Yes
    LOCATION: us-central1
    
  2. 检索 Cloud Scheduler 作业 ID:

    gcloud scheduler jobs list

    输出应类似如下所示:

    ID                LOCATION      SCHEDULE (TZ)         TARGET_TYPE  STATE
    cre-scheduler-cy  us-central1   0 17 * * * (Etc/UTC)  Pub/Sub      ENABLED
    cre-scheduler-uk  us-central1   0 16 * * * (Etc/UTC)  Pub/Sub      ENABLED
    
  3. 这些作业安排在每天下午 4 点和 5 点运行,但您也可以手动运行 Cloud Scheduler 作业:

    gcloud scheduler jobs run cre-scheduler-cy
    gcloud scheduler jobs run cre-scheduler-uk
  4. 几分钟后,确认 Cloud Storage 存储桶中有两个图表:

    gsutil ls gs://${BUCKET}

    输出应类似如下所示:

    gs://BUCKET/chart-cyprus.png
    gs://BUCKET/chart-unitedkingdom.png
    

恭喜!您应该还会收到两封电子邮件,其中包含图表链接。

清理

如果您为本教程创建了一个新项目,请删除项目。如果您使用的是现有项目,希望保留此项目且不保留本教程中添加的任何更改,请删除为教程创建的资源

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

删除教程资源

  1. 删除您在本教程中部署的所有 Cloud Run 服务:

    gcloud run services delete SERVICE_NAME

    其中,SERVICE_NAME 是您选择的服务名称。

    您还可以从 Google Cloud 控制台中删除 Cloud Run 服务。

  2. 移除您在教程设置过程中添加的任何 Google Cloud CLI 默认配置。

    gcloud config unset project
    gcloud config unset run/region
    gcloud config unset run/platform
    gcloud config unset eventarc/location
  3. 删除您在本教程中创建的所有 Eventarc 触发器:

     gcloud eventarc triggers delete TRIGGER_NAME
     
    TRIGGER_NAME 替换为您的触发器的名称。

  4. 从 Artifact Registry 中删除映像。

    gcloud artifacts docker images delete $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/notifier:v1
    gcloud artifacts docker images delete $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/chart-creator:v1
    gcloud artifacts docker images delete $REGION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/query-runner:v1
  5. 删除存储桶以及存储桶中的所有对象:

    gcloud storage rm --recursive gs://${BUCKET}/
  6. 删除 Cloud Scheduler 作业:

    gcloud scheduler jobs delete cre-scheduler-cy
    gcloud scheduler jobs delete cre-scheduler-uk

后续步骤