处理 Bigtable 变更数据流


本教程介绍如何将数据流水线部署到 Dataflow,以获取源自 Bigtable 表的更改流的实时数据库更改流。流水线的输出会写入 Cloud Storage 上的一系列文件中。

本教程提供了一个聆听音乐的应用的示例数据集。在本教程中,您将跟踪聆听过的歌曲,然后排出一段时间内最常听的前五首歌曲。

本教程适用于熟悉代码编写以及将数据流水线部署到 Google Cloud 的技术用户。

目标

本教程介绍了如何执行以下操作:

  • 创建启用了变更数据流的 Bigtable 表。
  • 在 Dataflow 上部署流水线,以转换和输出变更数据流。
  • 查看数据流水线的结果。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 安装 Google Cloud CLI。
  3. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  4. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  7. 安装 Google Cloud CLI。
  8. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  9. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  10. 确保您的 Google Cloud 项目已启用结算功能

  11. Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  12. 更新并安装 cbt CLI。
    gcloud components update
    gcloud components install cbt
    

准备环境

获取代码

克隆包含示例代码的代码库。如果您之前已下载此代码库,请拉取以获取最新版本。

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

创建存储桶

  • 创建 Cloud Storage 存储分区:
    gcloud storage buckets create gs://BUCKET_NAME
    BUCKET_NAME 替换为符合存储桶名称要求的存储桶名称。
  • 创建 Bigtable 实例

    您可以在本教程中使用现有实例,也可以在您附近的区域使用默认配置创建实例

    创建表

    示例应用跟踪用户聆听的歌曲并将聆听事件存储在 Bigtable 中。创建一个启用了变更数据流的表,该表具有一个列族 (cf) 和一个列(歌曲),并使用用户 ID 作为行键。

    创建表。

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    替换以下内容:

    • PROJECT_ID:您正在使用的项目的 ID
    • BIGTABLE_INSTANCE_ID:要包含新表的实例的 ID

    启动流水线

    此流水线通过执行以下操作来转换变更数据流:

    1. 读取变更数据流
    2. 获取歌曲名称
    3. 将歌曲聆听事件分组到 N 秒窗口中
    4. 统计排名前 5 的歌曲
    5. 输出结果

    运行流水线。

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    BIGTABLE_REGION 替换为 Bigtable 实例所在区域的 ID,例如 us-east5

    了解流水线

    流水线中的以下代码段可帮助您了解正在运行的代码。

    读取变更数据流

    此示例中的代码使用特定 Bigtable 实例和表的参数配置源数据流。

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    正在获取歌曲名称

    聆听歌曲时,歌曲名称会写入列族 cf 和列限定符 song,因此代码会从变更数据流变更中提取值并将其输出到流水线的下一步。

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    统计前五首歌曲

    您可以使用内置的 Beam 函数 CountTop.of 来获取当前时间段内最常听的前五首歌曲。

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    输出结果

    此流水线将结果写入标准输出和文件中。对于这些文件,它会将写入内容划分为包含 10 个元素或 1 分钟片段的组。

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    查看流水线

    1. 在 Google Cloud 控制台中,转到 Dataflow 页面。

      进入 Dataflow

    2. 点击名称以 song-rank 开头的作业。

    3. 在屏幕底部,点击显示以打开日志面板。

    4. 点击工作器日志以监控变更数据流的输出日志。

    流式写入

    使用 cbt CLI 将各种用户聆听的歌曲次数写入 song-rank 表。其设计目的是在几分钟内写入数据,以模拟一段时间内的歌曲聆听流式传输情况。

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    查看输出

    读取 Cloud Storage 上的输出,以查看最热门的歌曲。

    gsutil cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    输出示例:

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
    

    清理

    为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

    删除项目

      删除 Google Cloud 项目:

      gcloud projects delete PROJECT_ID

    删除各个资源

    1. 删除存储桶和文件。

      gcloud storage rm --recursive gs://BUCKET_NAME/
      
    2. 在表上停用变更数据流

      gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \
      --clear-change-stream-retention-period
      
    3. 删除 song-rank 表。

      cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
      
    4. 停止变更数据流流水线。

      1. 列出作业以获取作业 ID。

        gcloud dataflow jobs list --region=BIGTABLE_REGION
        
      2. 取消作业。

        gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
        

        JOB_ID 替换为上一条命令之后显示的作业 ID。

    后续步骤