使用可选的 Cloud Run 函数触发器将变更流式传输到 Pub/Sub


本教程介绍了如何使用 Bigtable change streams to Pub/Sub 模板,包括如何设置主题并配置模板。您可以视需要使用所选的编程语言创建由事件流触发的 Cloud Run 函数。

本教程适用于熟悉 Bigtable、编写代码和事件流服务的技术用户。

目标

本教程介绍了如何执行以下操作:

  • 创建启用了变更数据流的 Bigtable 表。
  • 创建一个具有 Bigtable 变更数据流架构的 Pub/Sub 主题。
  • 使用模板将 Bigtable 变更数据流部署到 Dataflow 上的 Pub/Sub 流水线。
  • 直接在 Pub/Sub 中或在 Cloud Run 函数日志中查看事件流。

费用

在本文档中,您将使用 Google Cloud的以下收费组件:

您可使用价格计算器,根据您的预计使用情况生成费用估算。

新 Google Cloud 用户可能有资格申请免费试用

完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理

准备工作

    Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.

    In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

    Verify that billing is enabled for your Google Cloud project.

    Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, Pub/Sub, Cloud Run functions, and Cloud Storage APIs.

    Enable the APIs

    In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

    Verify that billing is enabled for your Google Cloud project.

    Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, Pub/Sub, Cloud Run functions, and Cloud Storage APIs.

    Enable the APIs

    In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  1. 更新并安装 cbt CLI。
    gcloud components update
    gcloud components install cbt
  2. 创建 Pub/Sub 主题

    1. 在 Google Cloud 控制台中,前往 Pub/Sub 主题页面。

      打开“主题”

    2. 点击创建主题

    3. 将 ID 设置为 bigtable-change-stream-topic

    4. 选择使用架构

    5. 选择 Pub/Sub 架构下拉列表中,点击创建新的架构。系统会打开一个新标签页,您可以在其中定义架构。

      1. 将架构 ID 设置为 bigtable-change-stream-schema
      2. 将架构类型设置为 Avro
      3. 粘贴以下内容作为架构定义。如需详细了解该架构,请参阅模板文档页面
        {
            "name" : "ChangelogEntryMessage",
            "type" : "record",
            "namespace" : "com.google.cloud.teleport.bigtable",
            "fields" : [
              { "name" : "rowKey", "type" : "bytes"},
              {
                "name" : "modType",
                "type" : {
                  "name": "ModType",
                  "type": "enum",
                  "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]}
              },
              { "name": "isGC", "type": "boolean" },
              { "name": "tieBreaker", "type": "int"},
              { "name": "columnFamily", "type": "string"},
              { "name": "commitTimestamp", "type" : "long"},
              { "name" : "sourceInstance", "type" : "string"},
              { "name" : "sourceCluster", "type" : "string"},
              { "name" : "sourceTable", "type" : "string"},
              { "name": "column", "type" : ["null", "bytes"]},
              { "name": "timestamp", "type" : ["null", "long"]},
              { "name": "timestampFrom", "type" : ["null", "long"]},
              { "name": "timestampTo", "type" : ["null", "long"]},
              { "name" : "value", "type" : ["null", "bytes"]}
          ]
        }
      
      1. 点击创建以创建架构。
    6. 关闭创建架构标签页,刷新架构列表,然后选择新定义的架构。

    7. 点击创建以创建主题。

    可选:创建 Cloud Run 函数

    您可能希望使用 Cloud Run 函数处理 Pub/Sub 数据流。

    1. bigtable-change-stream-topic 主题的详细信息页面上,点击触发 Cloud Functions 函数
    2. 函数名称字段中,输入名称 bt-ps-tutorial-function
    3. 源代码部分中,点击运行时下拉列表,然后选择所需的运行时和编程语言。系统会生成 hello world,用于输出传入的变更数据流。如需详细了解如何编写 Cloud Run functions 函数,请参阅相关文档。
    4. 其他所有字段均使用默认值。
    5. 点击部署函数

    创建启用了变更数据流的表

    1. 在 Google Cloud 控制台中,前往 Bigtable 实例页面。

      转到实例

    2. 点击您在本教程中使用的实例的 ID。

      如果您没有可用实例,请在您附近的区域中使用默认配置创建实例

    3. 在左侧导航窗格中,点击

    4. 点击创建表

    5. 将表格命名为 change-streams-pubsub-tutorial

    6. 添加一个名为 cf 的列族。

    7. 选择启用变更数据流

    8. 点击创建

    初始化数据流水线以捕获变更数据流

    1. 在 Bigtable 页面上,找到表 change-streams-pubsub-tutorial
    2. 变更数据流列中,点击连接
    3. 在对话框中,选择 Pub/Sub
    4. 点击创建 Dataflow 作业
    5. 在 Dataflow 创建作业页面上,将输出 Pub/Sub 主题名称设置为:bigtable-change-stream-topic
    6. 将 Bigtable 应用配置文件 ID 设置为 default
    7. 点击运行作业
    8. 等待作业状态为正在启动正在运行,然后再继续。作业排队后,大约需要 5 分钟才能完成。

    向 Bigtable 写入一些数据

    1. 在 Cloud Shell 中,向 Bigtable 写入几行,以便变更日志可以将一些数据写入 Pub/Sub 数据流。只要在创建作业后写入数据,系统就会显示变更。您不必等待作业状态变为 running

      cbt -instance=BIGTABLE_INSTANCE_ID -project=YOUR_PROJECT_ID \
          set change-streams-pubsub-tutorial user123 cf:col1=abc
      cbt -instance=BIGTABLE_INSTANCE_ID -project=YOUR_PROJECT_ID \
          set change-streams-pubsub-tutorial user546 cf:col1=def
      cbt -instance=BIGTABLE_INSTANCE_ID -project=YOUR_PROJECT_ID \
          set change-streams-pubsub-tutorial user789 cf:col1=ghi
      

    在 Pub/Sub 中查看变更日志

    1. 在 Google Cloud 控制台中,前往 Pub/Sub 订阅页面。

      前往订阅页面

    2. 点击为主题 bigtable-change-stream-topic 自动创建的订阅。该订阅应命名为 bigtable-change-stream-topic-sub

    3. 前往消息标签页。

    4. 点击拉取

    5. 探索消息列表并查看您写入的数据。

      Pub/Sub 中的变更日志消息

    可选:在 Cloud Run functions 日志中查看变更

    如果您创建了 Cloud Run functions 函数,则可以在日志中查看变更。

    1. 在 Google Cloud 控制台中,前往 Cloud Run functions

      前往 Cloud Run functions

    2. 点击函数 bt-ps-tutorial-function

    3. 前往日志标签页。

    4. 确保将严重级别至少设置为信息,以便您可以查看日志。

    5. 探索日志并查看您写入的数据。

    输出类似于以下内容:

    Pub/Sub message: {"rowKey":"user789","modType":"SET_CELL","isGC":false,"tieBreaker":0,"columnFamily":"cf","commitTimestamp":1695653833064548,"sourceInstance":"YOUR-INSTANCE","sourceCluster":"YOUR-INSTANCE-c1","sourceTable":"change-streams-pubsub-tutorial","column":{"bytes":"col1"},"timestamp":{"long":1695653832278000},"timestampFrom":null,"timestampTo":null,"value":{"bytes":"ghi"}}
    

    清理

    为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

    删除 Bigtable 表

    1. 在 Google Cloud 控制台中,前往 Bigtable 实例页面。

      转到实例

    2. 点击您在本教程中使用的实例的 ID。

    3. 在左侧导航窗格中,点击

    4. 找到 change-streams-pubsub-tutorial 表。

    5. 点击修改

    6. 清除启用变更数据流

    7. 点击保存

    8. 打开该表的溢出菜单。

    9. 点击删除,然后输入表名称进行确认。

    停止变更数据流流水线

    1. 在 Google Cloud 控制台中,前往 Dataflow 作业页面。

      打开“作业”

    2. 从作业列表中选择您的流处理作业。

    3. 在导航中,点击停止

    4. 停止作业对话框中,取消流水线,然后点击停止作业

    删除 Pub/Sub 主题和订阅

    1. 在 Google Cloud 控制台中,前往 Pub/Sub 主题页面。

      打开“主题”

    2. 选择 bigtable-change-stream-topic 主题。

    3. 点击删除并确认。

    4. 点击边栏中的订阅

    5. 选择 bigtable-change-stream-topic-sub 订阅。

    6. 点击删除并确认。

    删除 Cloud Run functions 函数

    1. 在 Google Cloud 控制台中,前往 Cloud Run functions

      前往 Cloud Run functions

    2. 选择 bt-ps-tutorial-function 函数。

    3. 点击删除并确认。

    后续步骤