使用 Eventarc 构建 BigQuery 处理流水线

本教程介绍如何使用 Eventarc 构建处理流水线,该流水线安排对公共 BigQuery 数据集的查询,根据数据生成图表,并通过电子邮件分享图表链接。

目标

在本教程中,您将构建和部署三个 Cloud Run 服务,这些服务允许未经身份验证的访问并使用 Eventarc 接收事件:

  1. Query Runner - 当 Cloud Scheduler 作业向 Pub/Sub 主题发布消息时触发,此服务使用 BigQuery API 从公共 COVID-19 数据集中检索数据,并将结果保存到新的 BigQuery 表中。
  2. Chart Creator - 在 Query Runner 服务向 Pub/Sub 主题发布消息时触发,此服务使用 Python 绘图库 Matplotlib 生成图表,并将图表保存到 Cloud Storage 存储分区。
  3. Notifier - 当 Chart Creator 服务将图表存储在 Cloud Storage 存储分区中时,由审核日志触发,此服务使用电子邮件服务 SendGrid,将图表的链接发送到某个电子邮件地址。

下图展示了高级别的基础架构:

BigQuery 处理流水线

费用

本教程使用 Google Cloud 的以下收费组件:

请使用价格计算器根据您的预计用量来估算费用。

Google Cloud 新用户可能有资格申请免费试用

准备工作

  1. 登录您的 Google Cloud 帐号。如果您是 Google Cloud 新手,请创建一个帐号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到“项目选择器”

  3. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  4. 启用 Cloud Build, Cloud Logging, Cloud Run, Cloud Scheduler, Container Registry, Eventarc, Pub/Sub API。

    启用 API

  5. 安装并初始化 Cloud SDK。
  6. 更新 gcloud 组件:
    gcloud components update
  7. 使用您的帐号登录:
    gcloud auth login
  8. 选择 Google Cloud Storage 并启用管理员读取数据读取数据写入日志类型: 转到 Cloud Audit Logs
  9. eventarc.eventReceiver 角色授予 Compute Engine 服务帐号:
    gcloud projects add-iam-policy-binding $(gcloud config get-value project) \
        --member="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com" \
        --role='roles/eventarc.eventReceiver'
    
  10. 如果您在 2021 年 4 月 8 日或之前启用了 Pub/Sub 服务帐号,请将 iam.serviceAccountTokenCreator 角色授予 Pub/Sub 服务帐号:
    gcloud projects add-iam-policy-binding $(gcloud config get-value project) \
        --member="serviceAccount:service-${PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"\
        --role='roles/iam.serviceAccountTokenCreator'
    
  11. 设置本教程中使用的默认值:
    export REGION=REGION
    gcloud config set run/region ${REGION}
    gcloud config set run/platform managed
    gcloud config set eventarc/location ${REGION}
    
    REGION 替换为您选择的受支持的 Eventarc 位置
  12. 下载并安装 Git 源代码管理工具。

创建 SendGrid API 密钥

SendGrid 是云端电子邮件服务提供商,让您无需维护电子邮件服务器即可发送电子邮件。

  1. 登录到 SendGrid 并转到 Settings > API Keys
  2. 点击 Create API Key
  3. 为该密钥选择权限。该密钥必须至少具有 Mail Send 权限才能发送电子邮件。
  4. 点击保存以创建密钥。
  5. SendGrid 会生成一个新的密钥。这是该密钥的唯一副本,因此请务必复制并保存该密钥以供日后使用。

创建 Cloud Storage 存储分区

创建一个唯一的 Cloud Storage 存储分区以保存图表。确保存储分区和图表是公开提供的,并且与 Cloud Run 服务位于同一区域:

  export BUCKET="$(gcloud config get-value core/project)-charts"
  gsutil mb -l $(gcloud config get-value run/region) gs://${BUCKET}
  gsutil uniformbucketlevelaccess set on gs://${BUCKET}
  gsutil iam ch allUsers:objectViewer gs://${BUCKET}
  

部署 Notifier 服务

部署一个 Cloud Run 服务,该服务接收 Chart Creator 事件并使用 SendGrid 通过电子邮件发送生成的图表的链接。

  1. 克隆 GitHub 代码库并切换到 notifier/python 目录:

    git clone https://github.com/GoogleCloudPlatform/eventarc-samples
    cd eventarc-samples/processing-pipelines/bigquery/notifier/python/
    
  2. 构建并推送容器映像:

    export SERVICE_NAME=notifier
    docker build -t gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1 .
    docker push gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1
    
  3. 将容器映像部署到 Cloud Run,并传入电子邮件发送到的地址以及 SendGrid API 密钥:

    export TO_EMAILS=EMAIL_ADDRESS
    export SENDGRID_API_KEY=YOUR_SENDGRID_API_KEY
    gcloud run deploy ${SERVICE_NAME} \
        --image gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1 \
        --update-env-vars TO_EMAILS=${TO_EMAILS},SENDGRID_API_KEY=${SENDGRID_API_KEY},BUCKET=${BUCKET} \
        --allow-unauthenticated
    

    请替换以下内容:

    • EMAIL_ADDRESS 替换为生成的图表的链接发送到的电子邮件地址
    • YOUR_SENDGRID_API_KEY 替换为您之前记下的 SendGrid API 密钥

当您看到服务网址时,表示部署完成。

为 Notifier 服务创建触发器

部署在 Cloud Run 上的 Notifier 服务的 Eventarc 触发器过滤 methodName 为 storage.objects.create 的 Cloud Storage 审核日志。

  1. 创建触发器:

    gcloud eventarc triggers create trigger-${SERVICE_NAME} \
        --destination-run-service=${SERVICE_NAME} \
        --destination-run-region=${REGION} \
        --event-filters="type=google.cloud.audit.log.v1.written" \
        --event-filters="serviceName=storage.googleapis.com" \
        --event-filters="methodName=storage.objects.create" \
        --service-account=${PROJECT_NUMBER}-compute@developer.gserviceaccount.com
    
    

    这将创建一个名为 trigger-notifier 的触发器。

部署 Chart Creator 服务

部署一个 Cloud Run 服务,该服务接收 Query Runner 事件,从 BigQuery 表检索特定国家/地区的数据,然后使用 Matplotlib 根据数据生成图表。图表将上传到 Cloud Storage 存储分区。

  1. 切换到 chart-creator/python 目录:

    cd ../../chart-creator/python
    
  2. 构建并推送容器映像:

    export SERVICE_NAME=chart-creator
    docker build -t gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1 .
    docker push gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1
    
  3. 将容器映像部署到 Cloud Run,传入 BUCKET

    gcloud run deploy ${SERVICE_NAME} \
      --image gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1 \
      --update-env-vars BUCKET=${BUCKET} \
      --allow-unauthenticated
    

当您看到服务网址时,表示部署完成。

为 Chart Creator 服务创建触发器

部署在 Cloud Run 上的 Chart Creator 服务的 Eventarc 触发器过滤发布到某个 Pub/Sub 主题的消息。

  1. 创建触发器:

    gcloud eventarc triggers create trigger-${SERVICE_NAME} \
      --destination-run-service=${SERVICE_NAME} \
      --destination-run-region=${REGION} \
      --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished"
    
    

    这将创建一个名为 trigger-chart-creator 的触发器。

  2. 设置 Pub/Sub 主题环境变量。

    export TOPIC_QUERY_COMPLETED=$(basename $(gcloud eventarc triggers describe trigger-${SERVICE_NAME} --format='value(transport.pubsub.topic)'))
    

部署 Query Runner 服务

部署一个 Cloud Run 服务,该服务接收 Cloud Scheduler 事件,从公共 COVID-19 数据集中检索数据,并将结果保存到新的 BigQuery 表中。

  1. 切换到 processing-pipelines 目录:

    cd ../../..
    
  2. 构建并推送容器映像:

    export SERVICE_NAME=query-runner
    docker build -t gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1 -f bigquery/${SERVICE_NAME}/csharp/Dockerfile .
    docker push gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1
    
  3. 将容器映像部署到 Cloud Run,传入 PROJECT_IDTOPIC_QUERY_COMPLETED

    gcloud run deploy ${SERVICE_NAME} \
      --image gcr.io/$(gcloud config get-value project)/${SERVICE_NAME}:v1 \
      --update-env-vars PROJECT_ID=$(gcloud config get-value project),TOPIC_ID=${TOPIC_QUERY_COMPLETED} \
      --allow-unauthenticated
    

当您看到服务网址时,表示部署完成。

为 Query Runner 服务创建触发器

部署在 Cloud Run 上的 Query Runner 服务的 Eventarc 触发器过滤发布到某个 Pub/Sub 主题的消息。

  1. 创建触发器:

    gcloud eventarc triggers create trigger-${SERVICE_NAME} \
      --destination-run-service=${SERVICE_NAME} \
      --destination-run-region=${REGION} \
      --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished"
    

    这将创建一个名为 trigger-query-runner 的触发器。

  2. 为 Pub/Sub 主题设置环境变量。

    export TOPIC_QUERY_SCHEDULED=$(gcloud eventarc triggers describe trigger-${SERVICE_NAME} --format='value(transport.pubsub.topic)')
    

安排作业

处理流水线由两个 Cloud Scheduler 作业触发。

  1. 创建 Cloud Scheduler 需要的 App Engine 应用,并指定适当的位置

    export APP_ENGINE_LOCATION=LOCATION
    gcloud app create --region=${APP_ENGINE_LOCATION}
    
  2. 创建两个 Cloud Scheduler 作业,这些作业每天发布到 Pub/Sub 主题一次:

    gcloud scheduler jobs create pubsub cre-scheduler-uk \
      --schedule="0 16 * * *" \
      --topic=${TOPIC_QUERY_SCHEDULED} \
      --message-body="United Kingdom"
    
    gcloud scheduler jobs create pubsub cre-scheduler-cy \
      --schedule="0 17 * * *" \
      --topic=${TOPIC_QUERY_SCHEDULED} \
      --message-body="Cyprus"
    

    时间表以 unix-cron 格式指定。例如,0 16 * * * 表示作业在世界协调时间 (UTC) 每天 16:00(下午 4 点)运行。

运行流水线

  1. 首先,确认所有触发器已成功创建:

    gcloud eventarc triggers list
    

    输出应类似如下所示:

    NAME                   TYPE                                           DESTINATION_RUN_SERVICE  DESTINATION_RUN_PATH  ACTIVE
    trigger-chart-creator  google.cloud.pubsub.topic.v1.messagePublished  chart-creator                                  Yes
    trigger-notifier       google.cloud.audit.log.v1.written              notifier                                       Yes
    trigger-query-runner   google.cloud.pubsub.topic.v1.messagePublished  query-runner                                   Yes
    
  2. 检索 Cloud Scheduler 作业 ID:

    gcloud scheduler jobs list
    

    输出应类似如下所示:

    ID                LOCATION      SCHEDULE (TZ)         TARGET_TYPE  STATE
    cre-scheduler-cy  us-central1   0 17 * * * (Etc/UTC)  Pub/Sub      ENABLED
    cre-scheduler-uk  us-central1   0 16 * * * (Etc/UTC)  Pub/Sub      ENABLED
    
  3. 这些作业安排在每天下午 4 点和 5 点运行,但您也可以手动运行 Cloud Scheduler 作业:

    gcloud scheduler jobs run cre-scheduler-cy
    gcloud scheduler jobs run cre-scheduler-uk
    
  4. 几分钟后,确认 Cloud Storage 存储分区中有两个图表:

    gsutil ls gs://${BUCKET}
    

    输出应类似如下所示:

    gs://BUCKET/chart-cyprus.png
    gs://BUCKET/chart-unitedkingdom.png
    

恭喜!您应该还会收到两封电子邮件,其中包含图表链接。

清理

如果您为本教程创建了一个新项目,请删除项目。 如果您使用的是现有项目,希望保留此项目且不保留本教程中添加的任何更改,请删除为教程创建的资源

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

如需删除项目,请执行以下操作:

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除教程资源

  1. 删除您在本教程中部署的 Cloud Run 服务:

    gcloud run services delete SERVICE-NAME

    其中,SERVICE-NAME 是您选择的服务名称。

    您还可以从 Google Cloud Console 中删除 Cloud Run 服务。

  2. 移除您在教程设置过程中添加的 gcloud 默认配置。

    移除区域设置:

     gcloud config unset run/region
    
  3. 移除项目配置:

     gcloud config unset project
    
  4. 删除在本教程中创建的其他 Google Cloud 资源:

    • 删除触发器:
      gcloud eventarc triggers delete TRIGGER_NAME
      
      TRIGGER_NAME 替换为您的触发器的名称。

后续步骤