使用 Eventarc 为 Knative serving 构建 BigQuery 处理流水线


本教程介绍如何使用 Eventarc 构建处理流水线,该流水线安排对公共 BigQuery 数据集的查询,根据数据生成图表,并通过电子邮件分享图表链接。

目标

在本教程中,您将构建和部署三项在 Google Kubernetes Engine (GKE) 集群中运行并使用 Eventarc 接收事件的 Knative serving 服务:

  1. Query Runner - 当 Cloud Scheduler 作业向 Pub/Sub 主题发布消息时触发,此服务使用 BigQuery API 从公共 COVID-19 数据集中检索数据,并将结果保存到新的 BigQuery 表中。
  2. Chart Creator - 在 Query Runner 服务向 Pub/Sub 主题发布消息时触发,此服务使用 Python 绘图库 Matplotlib 生成图表,并将图表保存到 Cloud Storage 存储桶。
  3. 通知程序 - 当图表创建程序服务将图表存储在 Cloud Storage 存储桶中时,由审核日志触发;此服务使用电子邮件服务 SendGrid 将图表的链接发送到某个电子邮件地址。

下图展示了高级别的基础架构:

BigQuery 处理流水线

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Artifact Registry, Cloud Build, Cloud Logging, Cloud Scheduler, Eventarc, GKE, Pub/Sub, and Resource Manager APIs:

    gcloud services enable artifactregistry.googleapis.com cloudbuild.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com container.googleapis.com eventarc.googleapis.com pubsub.googleapis.com run.googleapis.com logging.googleapis.com
  7. Install the Google Cloud CLI.
  8. To initialize the gcloud CLI, run the following command:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Make sure that billing is enabled for your Google Cloud project.

  11. Enable the Artifact Registry, Cloud Build, Cloud Logging, Cloud Scheduler, Eventarc, GKE, Pub/Sub, and Resource Manager APIs:

    gcloud services enable artifactregistry.googleapis.com cloudbuild.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com container.googleapis.com eventarc.googleapis.com pubsub.googleapis.com run.googleapis.com logging.googleapis.com
  12. 对于 Cloud Storage,为 ADMIN_READDATA_WRITEDATA_READ 数据访问类型启用审核日志记录。

    1. 读取与您的 Google Cloud 项目、文件夹或组织关联的 Identity and Access Management (IAM) 政策,并将其存储在临时文件中:
      gcloud projects get-iam-policy PROJECT_ID > /tmp/policy.yaml
    2. 在文本编辑器中,打开 /tmp/policy.yaml,然后仅在 auditConfigs 部分中添加或更改审核日志配置

      
        auditConfigs:
        - auditLogConfigs:
          - logType: ADMIN_READ
          - logType: DATA_WRITE
          - logType: DATA_READ
          service: storage.googleapis.com
        bindings:
        - members:
        [...]
        etag: BwW_bHKTV5U=
        version: 1
    3. 写入新的 IAM 政策:

      gcloud projects set-iam-policy PROJECT_ID /tmp/policy.yaml

      如果上述命令报告与其他更改发生冲突,请重复以上步骤(从读取 IAM 政策开始)。如需了解详情,请参阅使用 API 配置数据访问审核日志

  13. 设置本教程中使用的默认值:
    CLUSTER_NAME=events-cluster
    CLUSTER_LOCATION=us-central1
    PROJECT_ID=PROJECT_ID
    gcloud config set project $PROJECT_ID
    gcloud config set run/region $CLUSTER_LOCATION
    gcloud config set run/cluster $CLUSTER_NAME
    gcloud config set run/cluster_location $CLUSTER_LOCATION
    gcloud config set run/platform gke
    gcloud config set eventarc/location $CLUSTER_LOCATION

    PROJECT_ID 替换为您的项目 ID。

创建 SendGrid API 密钥

SendGrid 是云端电子邮件服务提供商,让您无需维护电子邮件服务器即可发送电子邮件。

  1. 登录到 SendGrid 并转到 Settings > API Keys
  2. 点击 Create API Key
  3. 为该密钥选择权限。该密钥必须至少具有 Mail Send 权限才能发送电子邮件。
  4. 点击保存以创建密钥。
  5. SendGrid 会生成一个新的密钥。这是该密钥的唯一副本,因此请务必复制并保存该密钥以供日后使用。

创建 GKE 集群

创建一个启用了 Workload Identity Federation for GKE 的集群,以便它可以从 GKE 中运行的应用访问 Google Cloud 服务。使用 Eventarc 转发事件也需要 Workload Identity Federation for GKE。

  1. 为启用了 CloudRunHttpLoadBalancingHorizontalPodAutoscaling 插件的 Knative serving 创建 GKE 集群:

    gcloud beta container clusters create $CLUSTER_NAME \
        --addons=HttpLoadBalancing,HorizontalPodAutoscaling,CloudRun \
        --machine-type=n1-standard-4 \
        --enable-autoscaling --min-nodes=2 --max-nodes=10 \
        --no-issue-client-certificate --num-nodes=2  \
        --logging=SYSTEM,WORKLOAD \
        --monitoring=SYSTEM \
        --scopes=cloud-platform,logging-write,monitoring-write,pubsub \
        --zone us-central1 \
        --release-channel=rapid \
        --workload-pool=$PROJECT_ID.svc.id.goog
    
  2. 等待几分钟,让创建集群的过程完成。在该过程中,您可能会看到一些可以放心忽略的警告。创建集群后,输出类似于以下内容:

    Creating cluster ...done.
    Created [https://container.googleapis.com/v1beta1/projects/my-project/zones/us-central1/clusters/events-cluster].
    
  3. 创建 Artifact Registry 标准制品库以存储您的 Docker 容器映像:

    gcloud artifacts repositories create REPOSITORY \
        --repository-format=docker \
        --location=$CLUSTER_LOCATION

    REPOSITORY 替换为制品库的唯一名称。

配置 GKE 服务账号

配置 GKE 服务账号以充当默认计算服务账号。

  1. 在服务账号之间创建 Identity and Access Management (IAM) 绑定:

    PROJECT_NUMBER="$(gcloud projects describe $(gcloud config get-value project) --format='value(projectNumber)')"
    
    gcloud iam service-accounts add-iam-policy-binding \
        --role roles/iam.workloadIdentityUser \
        --member "serviceAccount:$PROJECT_ID.svc.id.goog[default/default]" \
        $PROJECT_NUMBER-compute@developer.gserviceaccount.com
  2. 使用计算服务账号的电子邮件地址将 iam.gke.io/gcp-service-account 注解添加到 GKE 服务账号:

    kubectl annotate serviceaccount \
        --namespace default \
        default \
        iam.gke.io/gcp-service-account=$PROJECT_NUMBER-compute@developer.gserviceaccount.com

启用 GKE 目标

如需允许 Eventarc 管理 GKE 集群中的资源,请启用 GKE 目标,并将 Eventarc 服务账号绑定到所需的角色。

  1. 为 Eventarc 启用 GKE 目标:

    gcloud eventarc gke-destinations init
  2. 在系统提示绑定所需角色时,输入 y

    绑定以下角色:

    • roles/compute.viewer
    • roles/container.developer
    • roles/iam.serviceAccountAdmin

创建服务账号并绑定访问角色

在创建 Eventarc 触发器之前,请先设置用户管理的服务账号,并为其授予特定的角色,以便 Eventarc 可以转发 Pub/Sub 事件。

  1. 创建名为 TRIGGER_GSA 的服务账号:

    TRIGGER_GSA=eventarc-bigquery-triggers
    gcloud iam service-accounts create $TRIGGER_GSA
  2. 向该服务账号授予 pubsub.subscribermonitoring.metricWritereventarc.eventReceiver 角色:

    PROJECT_ID=$(gcloud config get-value project)
    
    gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
        --role "roles/pubsub.subscriber"
    
    gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
        --role "roles/monitoring.metricWriter"
    
    gcloud projects add-iam-policy-binding $PROJECT_ID \
        --member "serviceAccount:$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com" \
        --role "roles/eventarc.eventReceiver"

创建 Cloud Storage 存储桶

创建一个 Cloud Storage 存储桶以保存图表。确保存储桶和图表是公开提供的,并且与 GKE 服务位于同一区域:

export BUCKET="$(gcloud config get-value core/project)-charts"
gcloud storage buckets create gs://${BUCKET} --location=$(gcloud config get-value run/region) --uniform-bucket-level-access
gcloud storage buckets add-iam-policy-binding gs://${BUCKET} --member=allUsers --role=roles/storage.objectViewer

克隆存储库

克隆 GitHub 代码库。

git clone https://github.com/GoogleCloudPlatform/eventarc-samples
cd eventarc-samples/processing-pipelines

部署通知程序服务

bigquery/notifier/python 目录中,部署一项 Knative serving 服务,用于接收图表创建程序事件并使用 SendGrid 通过电子邮件发送所生成图表的链接。

  1. 构建并推送容器映像:

    pushd bigquery/notifier/python
    export SERVICE_NAME=notifier
    docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 .
    docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
    popd
  2. 将容器映像部署到 Knative serving,并传入电子邮件发送到的地址以及 SendGrid API 密钥:

    export TO_EMAILS=EMAIL_ADDRESS
    export SENDGRID_API_KEY=YOUR_SENDGRID_API_KEY
    gcloud run deploy ${SERVICE_NAME} \
        --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars TO_EMAILS=${TO_EMAILS},SENDGRID_API_KEY=${SENDGRID_API_KEY},BUCKET=${BUCKET}

    替换以下内容:

    • EMAIL_ADDRESS:所生成图表的链接发送到的电子邮件地址
    • YOUR_SENDGRID_API_KEY:您之前记下的 SendGrid API 密钥

当您看到服务网址时,表示部署完成。

为通知程序服务创建触发器

部署在 Knative serving 上的通知程序服务的 Eventarc 触发器过滤 methodName 为 storage.objects.create 的 Cloud Storage 审核日志。

  1. 创建触发器:

    gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
        --destination-gke-cluster=$CLUSTER_NAME \
        --destination-gke-location=$CLUSTER_LOCATION \
        --destination-gke-namespace=default \
        --destination-gke-service=$SERVICE_NAME \
        --destination-gke-path=/ \
        --event-filters="type=google.cloud.audit.log.v1.written" \
        --event-filters="serviceName=storage.googleapis.com" \
        --event-filters="methodName=storage.objects.create" \
        --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com

    这将创建一个名为 trigger-notifier-gke 的触发器。

部署图表创建程序服务

bigquery/chart-creator/python 目录中,部署一项 Knative serving 服务,用于接收查询运行程序事件,从 BigQuery 表中检索特定国家/地区的数据,然后使用 Matplotlib 根据数据生成图表。图表将上传到 Cloud Storage 存储桶。

  1. 构建并推送容器映像:

    pushd bigquery/chart-creator/python
    export SERVICE_NAME=chart-creator
    docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 .
    docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
    popd
  2. 将容器映像部署到 Knative serving,并传入 BUCKET

    gcloud run deploy ${SERVICE_NAME} \
        --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars BUCKET=${BUCKET}

当您看到服务网址时,表示部署完成。

为图表创建程序服务创建触发器

部署在 Knative serving 上的图表创建程序服务的 Eventarc 触发器过滤发布到某个 Pub/Sub 主题的消息。

  1. 创建触发器:

    gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
        --destination-gke-cluster=$CLUSTER_NAME \
        --destination-gke-location=$CLUSTER_LOCATION \
        --destination-gke-namespace=default \
        --destination-gke-service=$SERVICE_NAME \
        --destination-gke-path=/ \
        --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
        --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com

    这将创建一个名为 trigger-chart-creator-gke 的触发器。

  2. 设置 Pub/Sub 主题环境变量。

    export TOPIC_QUERY_COMPLETED=$(basename $(gcloud eventarc triggers describe trigger-${SERVICE_NAME}-gke --format='value(transport.pubsub.topic)'))

部署查询运行程序服务

processing-pipelines 目录中,部署一项 Knative serving 服务,用于接收 Cloud Scheduler 事件,从公共 COVID-19 数据集中检索数据,并将结果保存在新的 BigQuery 表中。

  1. 构建并推送容器映像:

    export SERVICE_NAME=query-runner
    docker build -t $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 -f Dockerfile .
    docker push $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1
  2. 将容器映像部署到 Knative serving,并传入 PROJECT_IDTOPIC_QUERY_COMPLETED

    gcloud run deploy ${SERVICE_NAME} \
        --image $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/${SERVICE_NAME}:v1 \
        --update-env-vars PROJECT_ID=$(gcloud config get-value project),TOPIC_ID=${TOPIC_QUERY_COMPLETED}

当您看到服务网址时,表示部署完成。

为查询运行程序服务创建触发器

部署在 Knative serving 上的查询运行程序服务的 Eventarc 触发器过滤发布到某个 Pub/Sub 主题的消息。

  1. 创建触发器:

    gcloud eventarc triggers create trigger-${SERVICE_NAME}-gke \
        --destination-gke-cluster=$CLUSTER_NAME \
        --destination-gke-location=$CLUSTER_LOCATION \
        --destination-gke-namespace=default \
        --destination-gke-service=$SERVICE_NAME \
        --destination-gke-path=/ \
        --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
        --service-account=$TRIGGER_GSA@$PROJECT_ID.iam.gserviceaccount.com

    这将创建一个名为 trigger-query-runner-gke 的触发器。

  2. 为 Pub/Sub 主题设置环境变量。

    export TOPIC_QUERY_SCHEDULED=$(gcloud eventarc triggers describe trigger-${SERVICE_NAME}-gke --format='value(transport.pubsub.topic)')

安排作业

处理流水线由两个 Cloud Scheduler 作业触发。

  1. 创建 Cloud Scheduler 需要的 App Engine 应用,并指定适当的位置(例如 europe-west):

    export APP_ENGINE_LOCATION=LOCATION
    gcloud app create --region=${APP_ENGINE_LOCATION}
  2. 创建两个 Cloud Scheduler 作业,这些作业每天发布到 Pub/Sub 主题一次:

    gcloud scheduler jobs create pubsub cre-scheduler-uk \
        --schedule="0 16 * * *" \
        --topic=${TOPIC_QUERY_SCHEDULED} \
        --message-body="United Kingdom"
    gcloud scheduler jobs create pubsub cre-scheduler-cy \
        --schedule="0 17 * * *" \
        --topic=${TOPIC_QUERY_SCHEDULED} \
        --message-body="Cyprus"

    时间表以 unix-cron 格式指定。例如,0 16 * * * 表示作业在世界协调时间 (UTC) 每天 16:00(下午 4 点)运行。

运行流水线

  1. 确认所有触发器已成功创建:

    gcloud eventarc triggers list

    输出应类似如下所示:

    NAME                       TYPE                                            DESTINATION         ACTIVE  LOCATION
    trigger-chart-creator-gke  google.cloud.pubsub.topic.v1.messagePublished   GKE:chart-creator   Yes     us-central1
    trigger-notifier-gke       google.cloud.audit.log.v1.written               GKE:notifier        Yes     us-central1
    trigger-query-runner-gke   google.cloud.pubsub.topic.v1.messagePublished   GKE:query-runner    Yes     us-central1
    
  2. 检索 Cloud Scheduler 作业 ID:

    gcloud scheduler jobs list

    输出应类似如下所示:

    ID                LOCATION      SCHEDULE (TZ)         TARGET_TYPE  STATE
    cre-scheduler-cy  us-central1   0 17 * * * (Etc/UTC)  Pub/Sub      ENABLED
    cre-scheduler-uk  us-central1   0 16 * * * (Etc/UTC)  Pub/Sub      ENABLED
    
  3. 这些作业安排在每天下午 4 点和 5 点运行,但您也可以手动运行 Cloud Scheduler 作业:

    gcloud scheduler jobs run cre-scheduler-cy
    gcloud scheduler jobs run cre-scheduler-uk
  4. 几分钟后,确认 Cloud Storage 存储桶中有两个图表:

    gcloud storage ls gs://${BUCKET}

    输出应类似如下所示:

    gs://PROJECT_ID-charts/chart-cyprus.png
    gs://PROJECT_ID-charts/chart-unitedkingdom.png
    

恭喜!您应该还会收到两封电子邮件,其中包含图表链接。

清理

如果您为本教程创建了一个新项目,请删除项目。如果您使用的是现有项目,希望保留此项目且不保留本教程中添加的任何更改,请删除为教程创建的资源

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

删除教程资源

  1. 删除您在本教程中部署的所有 Knative serving 服务:

    gcloud run services delete SERVICE_NAME

    其中,SERVICE_NAME 是您选择的服务名称。

    您还可以从 Google Cloud 控制台中删除 Knative serving 服务。

  2. 删除您在本教程中创建的所有 Eventarc 触发器:

    gcloud eventarc triggers delete TRIGGER_NAME
    

    TRIGGER_NAME 替换为您的触发器的名称。

  3. 移除您在教程设置过程中添加的任何 Google Cloud CLI 默认配置。

    gcloud config unset project
    gcloud config unset run/cluster
    gcloud config unset run/cluster_location
    gcloud config unset run/platform
    gcloud config unset eventarc/location
    gcloud config unset compute/zone
  4. 从 Artifact Registry 中删除映像。

    gcloud artifacts docker images delete $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/notifier:v1
    gcloud artifacts docker images delete $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/chart-creator:v1
    gcloud artifacts docker images delete $CLUSTER_LOCATION-docker.pkg.dev/$(gcloud config get-value project)/REPOSITORY/query-runner:v1
  5. 删除存储桶以及存储桶中的所有对象:

    gcloud storage rm --recursive gs://${BUCKET}/
  6. 删除 Cloud Scheduler 作业:

    gcloud scheduler jobs delete cre-scheduler-cy
    gcloud scheduler jobs delete cre-scheduler-uk

后续步骤