使用 Dataflow 将 Avro 记录流式传输到 BigQuery

本教程介绍如何通过自动生成表架构并转换输入元素来使用 Dataflow 将 Avro SpecificRecord 对象存储在 BigQuery 中。本教程还演示了如何使用 Avro 生成的类在 Dataflow 流水线中的工作器之间具体化或传输中间数据。

Apache Avro 是一个序列化系统,它依赖架构来设计数据结构。由于架构在读取或写入 Avro 数据时始终存在,因此序列化速度快且体积小。这种性能优势使其成为在系统间传递消息的主流选择,这些系统包括通过消息代理将事件发送到分析系统的应用。您可以使用 Avro 架构来管理 BigQuery 数据仓库架构。将 Avro 架构转换为 BigQuery 表结构需要自定义代码,本教程对此进行了演示。

本教程适用于有兴趣使用 Avro 架构来管理 BigQuery 数据仓库架构的开发者和架构师。本教程假定您熟悉 Avro 和 Java。

下图展示了本教程的概要架构。

管理 BigQuery 数据仓库架构的 Avro 架构的体系结构。

本教程使用一个简单的订单处理系统,并按以下步骤演示此架构模式:

  • 当客户进行购买时,在线应用生成事件。
  • 订单对象包含一个唯一标识符、所购商品的列表和时间戳。
  • Dataflow 流水线从 Pub/Sub 主题读取 OrderDetails SpecificRecord Avro 消息。
  • Dataflow 流水线将记录作为 Avro 文件写入 Cloud Storage。
  • OrderDetails 类自动生成相应的 BigQuery 架构。
  • OrderDetails 对象通过通用转换函数写入到 BigQuery 中。

目标

  • 使用 Dataflow 从 Pub/Sub 数据流中提取 JSON 字符串。
  • 将 JSON 对象转换为 Avro 生成的类的对象。
  • 从 Avro 架构生成 BigQuery 表架构。
  • 将 Avro 记录写入 Cloud Storage 中的文件。
  • 将 Avro 记录写入 BigQuery。

费用

本教程使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用量来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

准备工作

  1. 登录您的 Google Cloud 帐号。如果您是 Google Cloud 新手,请创建一个帐号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到“项目选择器”

  3. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  4. 启用 BigQuery, Cloud Storage, and Dataflow API。

    启用 API

  5. 在 Cloud Console 中,激活 Cloud Shell。

    激活 Cloud Shell

    Cloud Shell 会话随即会在 Cloud Console 的底部启动,并显示命令行提示符。Cloud Shell 是一个已安装 Cloud SDK 的 Shell 环境,其中包括 gcloud 命令行工具以及已为当前项目设置的值。该会话可能需要几秒钟时间来完成初始化。

设置环境

  1. 在 Cloud Shell 中,克隆源代码库:

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/bigquery-ingest-avro-dataflow-sample.git
    
  2. 生成 Avro 类:

    cd $HOME/bigquery-ingest-avro-dataflow-sample/BeamAvro
    mvn generate-sources
    

    此命令使用 orderdetails.avsc 文件生成 OrderDetails 类和 OrderItems 类。OrderDetails 类具有一个唯一标识符、时间戳和 OrderItems 列表。OrderItems 类具有一个唯一标识符、名称和价格。Avro 架构将传播到 BigQuery 表,表中包含订单的行具有 OrderItem 类型的一组记录。如需了解详情,请参阅指定嵌套和重复的列

  3. 打开 env.sh 文件:

    cd $HOME/bigquery-ingest-avro-dataflow-sample
    nano env.sh
    
  4. env.sh 文件包含可用于本教程的预设默认值,但您可以根据自己的环境修改这些文件。

    # pubsub topic
    MY_TOPIC="avro-records"
    
    # Cloud Storage Bucket
    MY_BUCKET="$GOOGLE_CLOUD_PROJECT""_avro_beam"
    
    # Avro file Cloud Storage output path
    AVRO_OUT="$MY_BUCKET/""out/"
    
    # Region for Cloud Pub/Sub and Cloud Dataflow
    REGION="us-central1"
    
    # Region for BigQuery
    BQ_REGION="US"
    
    # BigQuery dataset name
    BQ_DATASET="sales"
    
    # BigQuery table name
    `BQ_TABLE=`"`orders`"
    
    # Maximum number of Dataflow workers
    NUM_WORKERS=1
    

    替换以下内容:

    • avro-records:Pub/Sub 主题的名称。
    • $GOOGLE_CLOUD_PROJECT"_avro_beam:由 Cloud 项目 ID 生成的 Cloud Storage 存储分区的名称。
    • $MY_BUCKET/""out/":包含 Avro 输出的 Cloud Storage 存储分区的路径。
    • us-central1:用于 Pub/Sub 和 Dataflow 的区域。如需详细了解区域,请参阅地理和区域
    • US:BigQuery 的区域。如需详细了解位置,请参阅数据集位置
    • sales:BigQuery 数据集名称。
    • orders:BigQuery 表名称。
    • 1:Dataflow 工作器数量上限。
  5. 设置环境变量:

     . ./env.sh
    

创建资源

  1. 在 Cloud Shell 中,创建 Pub/Sub 主题:

    gcloud pubsub topics create $MY_TOPIC
    
  2. 创建 Cloud Storage 存储分区,请运行以下命令:

    gsutil mb -l $REGION -c regional gs://$MY_BUCKET
    

    Cloud Storage 存储分区会备份应用生成的原始事件。该存储分区还可用作在 Dataproc 上运行的 Spark 和 Hadoop 作业进行离线分析和验证的备用源。

  3. 创建 BigQuery 数据集:

    bq --location=$BQ_REGION mk --dataset $GOOGLE_CLOUD_PROJECT:$BQ_DATASET

    BigQuery 数据集包含一个区域中的表和视图,或者包含一个具有多个区域的地理位置。如需了解详情,请参阅创建数据集

启动 Beam Dataflow 应用

  1. 在 Cloud Shell 中,在 Dataflow 运行程序上部署并运行流水线:

    cd $HOME/bigquery-ingest-avro-dataflow-sample/BeamAvro
    mvn compile exec:java \
        -Dexec.mainClass=com.google.cloud.solutions.beamavro.AvroToBigQuery \
        -Dexec.cleanupDaemonThreads=false \
        -Dexec.args=" \
        --project=$GOOGLE_CLOUD_PROJECT \
        --runner=DataflowRunner \
        --stagingLocation=gs://$MY_BUCKET/stage/ \
        --tempLocation=gs://$MY_BUCKET/temp/ \
        --inputPath=projects/$GOOGLE_CLOUD_PROJECT/topics/$MY_TOPIC \
        --workerMachineType=n1-standard-1 \
        --maxNumWorkers=$NUM_WORKERS \
        --region=$REGION \
        --dataset=$BQ_DATASET \
        --bqTable=$BQ_TABLE \
        --outputPath=$AVRO_OUT"
    

    输出中包含您的应用 ID。记下您的应用 ID,因为需要在本教程后面的部分用到它。

  2. 在 Cloud Console 中,转到 Dataflow。

    转到 Dataflow

  3. 要查看流水线状态,请点击您的应用 ID。流水线状态以图表形式显示。

    流水线状态图。

查看代码

AvroToBigQuery.java 文件中,必需参数的流水线选项通过命令行参数进行传递。流式模式选项也已启用。BigQuery 表架构通过 Avro 类架构生成,稍后由 BigQuery IO 类使用:

TableSchema ts = BigQueryAvroUtils.getTableSchema(OrderDetails.SCHEMA$);

AvroUtils 类遍历 Avro 架构对象中的字段,并以递归方式生成相应的 TableFieldSchema 对象。然后,对象被封装在 TableSchema 对象中并返回。

对于 Avro 输入格式,对象通过 Pub/Sub 读取。如果输入格式为 JSON,则事件将被读取并转换为 Avro 对象。

private static PCollection<OrderDetails> getInputCollection(
    Pipeline pipeline, String inputPath, FORMAT format) {
  if (format == FORMAT.JSON) {
    // Transform JSON to Avro
    return pipeline
        .apply("Read JSON from PubSub", PubsubIO.readStrings().fromTopic(inputPath))
        .apply("To binary", ParDo.of(new JSONToAvro()));
  } else {
    // Read Avro
    return pipeline.apply(
        "Read Avro from PubSub", PubsubIO.readAvros(OrderDetails.class).fromTopic(inputPath));
  }
}

流水线产生分支。Write to Cloud Storage 转换是一种复合转换,它会在 10 秒的时间范围内收集记录,然后使用 AvroIO 编写器将这些数据写入 Cloud Storage 上的 Avro 文件:

ods.apply(
    "Write to GCS",
    new AvroWriter()
        .withOutputPath(options.getOutputPath())
        .withRecordType(OrderDetails.class));

Write to BigQuery 转换将记录写入 BigQuery 表:

ods.apply(
    "Write to BigQuery",
    BigQueryIO.write()
        .to(bqStr)
        .withSchema(ts)
        .withWriteDisposition(WRITE_APPEND)
        .withCreateDisposition(CREATE_IF_NEEDED)
        .withFormatFunction(TABLE_ROW_PARSER));

BigQueryIO 转换使用 TABLE_ROW_PARSER 方法将 Avro 对象转换为 TableRow 对象,从而将其写入 BigQuery。解析器调用 BigQueryAvroUtils 类中的 convertSpecificRecordToTableRow 方法,该类基于 Apache Beam 项目中的测试类method 以递归方式解析 Avro 字段,并将其添加到 TableRow 对象。

private static TableRow convertSpecificRecordToTableRow(
    SpecificRecord record, List<TableFieldSchema> fields) {
  TableRow row = new TableRow();
  for (TableFieldSchema subSchema : fields) {
    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the name field
    // is required, so it may not be null.
    Field field = record.getSchema().getField(subSchema.getName());
    if (field == null || field.name() == null) {
      continue;
    }
    Object convertedValue = getTypedCellValue(field.schema(), subSchema, record.get(field.pos()));
    if (convertedValue != null) {
      // To match the JSON files exported by BigQuery, do not include null values in the output.
      row.set(field.name(), convertedValue);
    }
  }

  return row;
}

下表显示了 BigQuery 数据类型与 Avro 数据类型之间的映射。请注意 DateTimestamp 等类型,它们由字段的逻辑类型标识。

BigQuery Avro
STRING STRING
GEOGRAPHY STRING
BYTES BYTES
INTEGER INT
FLOAT FLOAT
FLOAT64 DOUBLE
NUMERIC BYTES
BOOLEAN BOOLEAN
INT64 LONG
TIMESTAMP LONG
DATE INT
DATETIME STRING
TIME LONG
STRUCT RECORD
REPEATED FIELD ARRAY

在 BigQuery 中查看结果

要测试流水线,请启动 gen.py 脚本。此脚本模拟订单事件的生成,并将其推送到 Pub/Sub 主题。

  1. 在 Cloud Shell 中,切换到示例事件生成器脚本目录并运行该脚本:

    cd $HOME/bigquery-ingest-avro-dataflow-sample/generator
    python3 -m venv env
    . ./env/bin/activate
    pip install -r requirements.txt
    python3 gen.py -p $GOOGLE_CLOUD_PROJECT -t $MY_TOPIC -n 100 -f avro
    
  2. 在 Cloud Console 中,转到 BigQuery。

    转到 BigQuery

  3. 要查看表架构,请点击 sales 数据集,然后选择 orders 表。如果您修改了 env.sh 中的默认环境变量,则数据集和表名称可能有所不同。

    orders 表的表架构。

  4. 要查看一些示例数据,请在查询编辑器中运行查询:

    SELECT * FROM sales.orders LIMIT 5
    

    示例数据的查询结果。

    系统会从 Avro 记录自动生成 BigQuery 表架构,并且数据会自动转换为 BigQuery 表结构。

清除数据

删除项目

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

逐个删除资源

  1. 按照以下说明停止 Dataflow 作业

  2. 删除 Cloud Storage 存储分区:

    gsutil rm -r gs://$MY_BUCKET
    

后续步骤