使用 Dataflow 执行 ETL 以从关系型数据库转到 BigQuery

本教程演示了如何使用 Dataflow 从联机事务处理 (OLTP) 关系型数据库中提取、转换和加载 (ETL) 数据到 BigQuery 以进行分析。

本教程适用于对 BigQuery 的分析查询功能和 Dataflow 的批处理功能感兴趣的数据库管理员、运维专业人员和云架构师。

OLTP 数据库通常是关系型数据库,用于为电子商务网站、软件即服务 (SaaS) 应用或游戏等存储信息和流程事务。OLTP 数据库通常针对需要 ACID 属性的事务进行了优化:原子性、一致性、隔离性及耐用性,并且通常具有高度规范化的架构。相比之下,数据仓库往往经过优化以用于数据检索和分析,而不是事务,且通常具有反规范化的架构。通常,对 OLTP 数据库中的数据进行反规范化会使其更适用于 BigQuery 中的分析。

目标

本教程展示了两种执行 ETL 以将规范化 RDBMS 数据转换为反规范化 BigQuery 数据的方法:

  • 使用 BigQuery 来加载和转换数据。使用此方法可将少量数据一次性加载到 BigQuery 中进行分析。在对大型或多个数据集使用自动化之前,您也可以使用此方法对数据集进行原型设计。
  • 使用 Dataflow 来加载、转换和清理数据。使用此方法可以加载大量数据、从多个数据源加载数据,或者以增量方式或自动加载数据。

费用

本教程使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用量来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

准备工作

  1. 登录您的 Google 帐号。

    如果您还没有 Google 帐号,请注册新帐号

  2. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到项目选择器页面

  3. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  4. 启用 Compute Engine 和 Dataflow API。

    启用 API

  5. 安装并初始化 Cloud SDK

使用 MusicBrainz 数据集

本教程依赖于 MusicBrainz 数据库中表的 JSON 快照,该数据库基于 PostgreSQL 构建,包含有关所有 MusicBrainz 音乐的信息。MusicBrainz 架构包含的一些元素如下:

  • 音乐人
  • 发行组
  • 发行版本
  • 录制内容
  • 作品
  • 标签
  • 这些实体之间的许多关系

MusicBrainz 架构包括三个相关的表:artistrecordingartist_credit_nameartist_credit 表示参与某个录制内容表演的音乐人,而 artist_credit_name 表中的行通过 artist_credit 值将录制内容与其对应的音乐人关联。

本教程提供已经提取为 JSON 格式的 PostgreSQL 表。如需自行执行此步骤,您可以使用以下示例代码:

pg_cmd="\\copy (select row_to_json(r) from (select * from artist) r ) to
exported_artist.json"
psql -w -h $host -U $user -d $db -c $pg_cmd
sed -i -e 's/\\\\/\\/g' exported_artist.json # clean up extra '\' characters

方法 1:使用 BigQuery 执行 ETL

使用此方法可将少量数据一次性加载到 BigQuery 中进行分析。在对大型或多个数据集使用自动化之前,您也可以使用此方法对数据集进行原型设计。

创建 BigQuery 数据集

下图说明了创建 BigQuery 数据集的步骤。

创建 BigQuery 数据集的步骤。

您分别将各个 MusicBrainz 表加载到 BigQuery 中,然后联接加载的表,使每一行都包含您想要的数据关联。您将联接结果存储在新的 BigQuery 表格中。 然后,您可以删除加载的原始表。

  1. 在 Cloud Console 中打开 BigQuery。

    打开 BigQuery

  2. 资源下,点击您的项目的名称。

  3. 在左侧导航中,点击 + 添加数据

  4. 创建数据集对话框中,完成以下步骤:

    1. 数据集 ID 字段中,输入 musicbrainz
    2. 数据位置保留为默认
  5. 点击创建数据集

导入 MusicBrainz 表

对于每个 MusicBrainz 表,请执行以下步骤将表添加到您创建的数据集中:

  1. 在 Cloud Console 中,点击数据集名称,然后点击 +创建表
  2. 创建表对话框中,完成以下步骤,然后点击创建表

    1. 来源下的基于以下数据创建表下拉列表中,选择 Google Cloud Storage
    2. 从 Cloud Storage 存储分区选择文件字段中,输入数据文件的网址:gs://solutions-public-assets/bqetl/artist.json
    3. 针对文件格式,选择 JSON(以换行符分隔)
    4. 表名称中,输入表名称 artist
    5. 表类型中,使原生表处于选中状态。
    6. 架构部分下方,点击以开启以文本形式修改
    7. 下载 artist 架构文件
    8. 架构部分的内容替换为您下载的架构文件的内容。

    使用所下载 JSON 文件中的更新架构创建表对话框。

  3. 等待加载作业完成。如需监控作业,请点击作业记录

    加载完成后,新表将显示在数据集下。

  4. artist_credit_name 表重复步骤 1 - 3,并进行以下更改:

  5. recording 表重复步骤 1 - 3,并进行以下更改:

手动对数据进行反规范化

为了对数据进行反规范化,请将数据联接到一个新的 BigQuery 表格中,该表中每位音乐人的每一条录制内容各占一行,并且包括您希望保留用于分析的选定元数据。

  1. 在 Cloud Console 中,复制以下查询并将其粘贴到查询编辑器中:

    SELECT artist.id, artist.gid as artist_gid,
           artist.name as artist_name, artist.area,
           recording.name as recording_name, recording.length,
           recording.gid as recording_gid, recording.video
      FROM `[PROJECT_ID].[DATASET].artist` as artist
          INNER JOIN `[PROJECT_ID].[DATASET].artist_credit_name` AS artist_credit_name
               ON artist.id = artist_credit_name.artist
          INNER JOIN `[PROJECT_ID].[DATASET].recording` AS recording
               ON artist_credit_name.artist_credit = recording.artist_credit
    

    [DATASET] 替换为您之前创建的数据集的名称(例如 musicbrainz),并将 [PROJECT_ID] 替换为您的 Google Cloud 项目 ID。

  2. 点击更多下拉列表,然后选择查询设置

  3. 查询设置卡中,执行以下操作:

    1. 选中为查询结果设置目标表复选框。
    2. 表名称中输入 recordings_by_artists_manual.
    3. 对于目标表的写入设置,点击覆盖表
    4. 选中允许大型结果(无大小限制)复选框。
    5. 作业优先级保留为默认的互动
    6. SQL 方言保留为默认的标准
    7. 点击保存
  4. 点击运行

    查询完成后,查询结果数据在新创建的 BigQuery 表格中按照每位音乐人的歌曲进行组织。

    目标表的查询设置。

方法 2:使用 Dataflow 执行 ETL 以转到 BigQuery

在本教程的这一部分中,您将使用示例程序通过 Dataflow 流水线将数据加载到 BigQuery 中,而不是使用 BigQuery 界面。然后使用 Dataflow 编程模型对数据进行反规范化和清理,以便加载到 BigQuery 中。

开始之前,请查看相关概念和示例代码。

查看概念

虽然本例中的数据规模很小并且可以使用 BigQuery 界面快速上传,但在本教程中,您也可以使用 Dataflow 执行 ETL。当您执行大规模联接时,使用 Dataflow 而非 BigQuery 界面来执行 ETL 以转到 BigQuery,大规模联接是指处理约有 500-5000 列、超过 10 TB 的数据以完成以下目标:

  • 您希望在数据加载到 BigQuery 之后便进行清理或转换,而不是存储数据并在存储之后进行联接。因此,使用此方法还具有较低的存储要求,因为数据只会以联接和转换状态存储在 BigQuery 中。
  • 您计划进行自定义数据清理(不能只通过 SQL 来实现)。
  • 您计划在加载过程中将该数据与 OLTP 外部的数据(例如日志或远程访问的数据)组合在一起。
  • 您计划使用持续集成或持续部署 (CI/CD) 来实现数据加载逻辑的测试和部署自动化。
  • 您期望随着时间的推移逐步迭代和增强/改进 ETL 流程。
  • 您计划以增量方式添加数据,而不是执行一次性 ETL。

以下是示例程序创建的数据流水线图表:

使用 BigQuery 的数据流水线。

在示例代码中,许多流水线步骤被分组或封装到简便方法中,给定描述性名称,并重复使用。在以上图表中,重复使用的步骤由虚线框起。

查看流水线代码

此代码创建一个执行以下步骤的流水线:

  1. 将您要在联接中使用的每个表加载到字符串 PCollection 集合中。每个元素包含表中以 JSON 表示的一行。

    public static PCollection<String> loadText(Pipeline p, String name) {
      BQETLOptions options = (BQETLOptions) p.getOptions();
      String loadingBucket = options.getLoadingBucketURL();
      String objectToLoad = storedObjectName(loadingBucket, name);
      return p.apply(name, TextIO.read().from(objectToLoad));
    }
  2. 将这些 JSON 字符串转换为 MusicBrainzDataObject 对象的表示形式,然后通过其中一个列值(如主键或外键)组织对象表示形式。

    public static PCollection<KV<Long, MusicBrainzDataObject>> loadTableFromText(PCollection<String> text, String name, String keyName) {
      final String namespacedKeyname = name + "_" + keyName;
      return text.apply("load " + name,
                        MapElements
                          .into(new TypeDescriptor<KV<Long, MusicBrainzDataObject>>() {})
                          .via( (String input) -> {
                                MusicBrainzDataObject datum = JSONReader.readObject(name, input);
                                Long key = (Long) datum.getColumnValue(namespacedKeyname);
                                return KV.of(key, datum);
                                })
             );
    }
  3. 根据共同音乐人联接列表。artist_credit_name 将音乐人团队与其录制内容进行关联,并包含音乐人外键。artist_credit_name 表作为键值 KV 对象列表进行加载。K 成员为音乐人。

    PCollection<MusicBrainzDataObject> artistCredits =
        MusicBrainzTransforms.innerJoin("artists with artist credits", artists, artistCreditName);
  4. 使用 MusicBrainzTransforms.innerJoin() 方法联接列表。

    public static PCollection<MusicBrainzDataObject>
                         innerJoin(String name,
                                   PCollection<KV<Long, MusicBrainzDataObject>> table1,
                                   PCollection<KV<Long, MusicBrainzDataObject>> table2) {
      final TupleTag<MusicBrainzDataObject> t1 = new TupleTag<MusicBrainzDataObject>(){};
      final TupleTag<MusicBrainzDataObject> t2 = new TupleTag<MusicBrainzDataObject>(){};
      PCollection<KV<Long, CoGbkResult>> joinedResult = group(name, table1, table2, t1, t2);
    1. 按要联接的键成员对 KV 对象的集合进行分组。这会生成带有长键(artist.id 列值)的 KV 对象的 PCollection,并生成 CoGbkResult(代表根据键结果合并分组)。CoGbkResult 对象是对象列表的元组,其具有第一和第二个 PCollections 集合的共同键值。在运行 group 方法中的 CoGroupByKey 操作之前,该元组通过为每个 PCollection 配制的 Tuple 标记可以进行寻址。
    2. 将每个对象匹配合并到表示联接结果的 MusicBrainzDataObject 对象中。

          PCollection<List<MusicBrainzDataObject>> mergedResult =
              joinedResult.apply("merge join results",
                           MapElements
                         .into(new TypeDescriptor<List<MusicBrainzDataObject>>() {})
                         .via( ( KV<Long, CoGbkResult> group ) -> {
                             List<MusicBrainzDataObject> result = new ArrayList<MusicBrainzDataObject>();
                             Iterable<MusicBrainzDataObject> leftObjects = group.getValue().getAll(t1);
                             Iterable<MusicBrainzDataObject> rightObjects = group.getValue().getAll(t2);
                             leftObjects.forEach((MusicBrainzDataObject l) -> {
                               rightObjects.forEach((MusicBrainzDataObject r) -> {
                                 result.add(l.duplicate().merge(r));
                               });
                             });
                             return result;
                           }
                         )
      );
    3. 将集合重新组织为 KV 对象列表以开始下一次联接。此时,K 值为 artist_credit 列,用于联接录制内容表。

      PCollection<KV<Long,MusicBrainzDataObject>> artistCreditNamesByArtistCredit =  MusicBrainzTransforms.by("artist_credit_name_artist_credit", artistCredits);
    4. 将结果与按 MusicBrainzDataObject 组织的已加载的录制内容集合进行联接,获得 artist_credit.id 对象的最终生成集合。

      PCollection<MusicBrainzDataObject> artistRecordings = MusicBrainzTransforms.innerJoin("joined recordings",
         artistCreditNamesByArtistCredit, recordingsByArtistCredit);
    5. 将生成的 MusicBrainzDataObjects 对象映射到 TableRows

      PCollection<TableRow> tableRows = MusicBrainzTransforms.transformToTableRows(artistRecordings, bqTableSchema);
    6. 将生成的 TableRows 写入 BigQuery。

      tableRows.apply(
           "Write to BigQuery",
           BigQueryIO.writeTableRows()
          .to(BQETLOptions.getBigQueryTablename())
          .withSchema(bqTableSchema)
          .withCustomGcsTempLocation(StaticValueProvider.of(BQETLOptions.getTempLocation() ))
          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

如需详细了解 Dataflow 流水线编程机制,请参阅以下有关编程模型的主题:

在查看完代码执行的步骤后,您可以运行流水线。

运行流水线代码

  1. 在 Cloud Console 中,打开 Cloud Shell。

    打开 Cloud Shell

  2. 设置项目的环境变量:

    export PROJECT_ID=[PROJECT_ID]
    export ZONE=[CHOOSE_AN_APPROPRIATE_ZONE]
    

    [PROJECT_ID] 替换为您的 Google Cloud 项目 ID,并将 [CHOOSE_AN_APPROPRIATE_ZONE] 替换为 Google Cloud 地区

  3. 设置流水线脚本使用的环境变量:

    export DESTINATION_TABLE=recordings_by_artists_dataflow
    export STAGING_BUCKET=${PROJECT_ID}-etl-staging-bucket
    export DATASET=musicbrainz
    export SERVICE_ACCOUNT=project-owner
    
  4. 确保 gcloud 正在使用您在本教程开头部分创建或选择的项目:

    gcloud config set project $PROJECT_ID
    
  5. 创建一个服务帐号以运行流水线:

    gcloud iam service-accounts create ${SERVICE_ACCOUNT} \
        --display-name "Project Owner Account"
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
        --member serviceAccount:${SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com \
        --role roles/owner
    gcloud iam service-accounts keys create \
        ~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json \
        --iam-account ${SERVICE_ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com
    

    此命令将下载包含服务帐号密钥的 JSON 文件。 请将此文件存储在安全的位置。

  6. 将环境变量 GOOGLE_APPLICATION_CREDENTIALS 设置为包含服务帐号密钥的 JSON 文件的文件路径:

    export GOOGLE_APPLICATION_CREDENTIALS=~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json
    
  7. 克隆包含 Dataflow 代码的代码库:

    git clone https://github.com/GoogleCloudPlatform/bigquery-etl-dataflow-sample.git
    
  8. 切换到示例目录:

    cd bigquery-etl-dataflow-sample
    
  9. 在 Cloud Storage 中创建一个暂存存储分区,因为 Dataflow 作业需要使用 Cloud Storage 中的存储分区来暂存运行流水线所用到的二进制文件。

    gsutil mb gs://$STAGING_BUCKET
    
  10. [STAGING_BUCKET_NAME] 的对象生命周期设置为 dataflow-staging-policy.json 文件中的对象生命周期:

    gsutil lifecycle set dataflow-staging-policy.json gs://$STAGING_BUCKET
    
  11. 运行 Dataflow 作业:

    ./run.sh simple
    
  12. 如需查看流水线的进度,请在 Cloud Console 中转到 Dataflow 页面。

    转到 Dataflow 页面

    作业的状态显示在状态列中。状态为成功表示作业已完成。

  13. (可选)如需查看作业图表并详细了解步骤,请点击作业名称(例如 etl-into-bigquery-bqetlsimple)。

  14. 在 Cloud Console 中,转到 BigQuery 页面。

    转到 BigQuery 页面

    确保您的 Google Cloud 项目处于选中状态。

  15. 如需在新表上运行查询,请在查询编辑器窗格中输入以下内容:

    SELECT artist_name, artist_gender, artist_area, recording_name, recording_length
    FROM musicbrainz.recordings_by_artists_dataflow
    WHERE artist_area is NOT NULL
    AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    更新了查询编辑器以便查询新表。

清理数据

接下来,请对 Dataflow 流水线稍做更改,以便您可以加载对照表并将其作为辅助输入进行处理,如下图所示。

更新了 Dataflow 流水线以支持辅助输入。

当您查询生成的 BigQuery 表格时,如果不手动查找 MusicBrainz 数据库的 area 表中的区域数字 ID,则很难推测音乐人来自哪里。这会使得分析查询结果变得复杂。

同样,音乐人性别也显示为 ID,但整个 MusicBrainz 性别表仅包含三行。为了解决此问题,您可以在 Dataflow 流水线中添加一个步骤,以使用 MusicBrainz areagender 表将 ID 映射到其正确的标签。

artist_areaartist_gender 表包含的行数明显少于音乐人或录制内容数据表。后续表中的元素数量分别受到地理区域数量或性别数量的限制。

因此,查找步骤会使用称为辅助输入的 Dataflow 功能。

辅助输入作为按行分隔的 JSON 的表导出进行加载,并且可通过一个步骤用来对表数据进行反规范化。

查看用于向流水线添加辅助输入的代码

在运行流水线之前,请查看代码以便深入了解新步骤。

BQETLSimple.java 文件中,查看被注释掉的行。这些行在以下步骤中未取消注释。

//PCollection<KV<Long,MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p,"artist","id",
//        MusicBrainzTransforms.lookup("area", "id", "name", "area", "begin_area"),
//        MusicBrainzTransforms.lookup("gender","id","name","gender"));

PCollection<KV<Long, MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p, "artist", "id");

此代码演示了如何使用辅助输入进行数据清理。MusicBrainzTransforms 类为使用辅助输入将外键值映射到标签提供了更多便利。MusicBrainzTransforms 库提供了一种创建内部查找类的方法。该查找类的作用是描述每个对照表以及用标签和可变长度参数替换的字段。keyKey 是包含查找键的列名,valueKey 是包含对应标签的列名。

public static LookupDescription lookup(String objectName, String keyKey, String valueKey, String... destinationKeys) {
  return new LookupDescription(objectName, keyKey, valueKey, destinationKeys);
}

每个辅助输入都作为单个映射对象进行加载,用于查找 ID 的对应标签。

首先,对照表的 JSON 最初加载到具有空命名空间的 MusicBrainzDataObjects,然后转换为从 Key 列值到 Value 列值的映射。

public static PCollectionView<Map<Long, String>> loadMapFromText(PCollection<String> text, String name, String keyKey, String valueKey) {
  // column/Key names are namespaced in MusicBrainzDataObject
  String keyKeyName = name + "_" + keyKey;
  String valueKeyName = name + "_" + valueKey;

  PCollection<KV<Long, String>> entries = text.apply(
        "sideInput_" + name,
        MapElements
          .into(new TypeDescriptor<KV<Long, String>>() {})
          .via((String input) -> {
                 MusicBrainzDataObject object = JSONReader.readObject(name, input);
                 Long key = (Long) object.getColumnValue(keyKeyName);

                 String value = (String) object.getColumnValue(valueKeyName);
                 return KV.of(key, value);
               })
        );

  return entries.apply(View.<Long, String>asMap());
}

所有这些 Map 对象都通过其 destinationKey(该键将替换为查找值)的值加入一个 Map

List<SimpleEntry<ArrayList<String>, PCollectionView<Map<Long, String>>>> mapSideInputs = new ArrayList<SimpleEntry<ArrayList<String>, PCollectionView<Map<Long, String>>>>();

for (LookupDescription mapper : mappers) {
  PCollectionView<Map<Long, String>> mapView = loadMap(text.getPipeline(), mapper.objectName, mapper.keyKey, mapper.valueKey);
  List<String> destKeyList =
      mapper.destinationKeys.stream()
                            .map( destinationKey -> name + "_" + destinationKey )
                            .collect(Collectors.toList());

    mapSideInputs.add(new SimpleEntry(destKeyList, mapView));

}

然后,在从 JSON 转换音乐人对象时,destinationKey 的值(以数字开头)将替换为其标签。

Map<Long, String> sideInputMap = c.sideInput(mapping.getValue());

List<String> keyList = mapping.getKey();

keyList.forEach( ( String key ) -> {
  Long id = (Long) result.getColumnValue(key);
  if (id != null) {
    String label = (String) sideInputMap.get(id);
    if (label == null) {
      label = "" + id;
    }
    result.replace(key, label);

如需修改 BQETLSimple.java,并使用查询来解码 artist_areaartist_gender 字段的数据,请完成以下步骤:

  1. 对程序流稍做更改:

    1. 取消注释那些使用查询加载音乐人数据的行。
    2. 注释掉不使用查询加载音乐人数据的 loadTable 方法调用。
    //PCollection<KV<Long,MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p,"artist","id",
    //        MusicBrainzTransforms.lookup("area", "id", "name", "area", "begin_area"),
    //        MusicBrainzTransforms.lookup("gender","id","name","gender"));
    
    PCollection<KV<Long, MusicBrainzDataObject>> artists = MusicBrainzTransforms.loadTable(p, "artist", "id");
  2. 注释掉相应的 int 字段并取消注释相应的 string 字段,将 TableFieldSchemasartist_areaartist_gender 的数据类型从 int 更改为 string

    /*Switch these two lines when using mapping table for artist_area */
    //        .stringField("artist_area")
            .intField("artist_area")
    /*Switch these two lines when using mapping table for artist_gender */
    //        .stringField("artist_gender")
            .intField("artist_gender")
    /*Switch these two lines when using mapping table for artist_begin_area */
            .intField("artist_begin_area")
    //      .stringField("artist_begin_area")
  3. 如需重新运行流水线代码,请完成以下步骤:

    1. 设置项目的环境变量:

      export PROJECT_ID=[PROJECT_ID]
      export ZONE=[CHOOSE_AN_APPROPRIATE_ZONE]
      
    2. 确保已设置环境:

      export DESTINATION_TABLE=recordings_by_artists_dataflow_sideinputs
      export STAGING_BUCKET=${PROJECT_ID}-etl-staging-bucket
      export DATASET=musicbrainz
      export SERVICE_ACCOUNT=project-owner
      
    3. 将环境变量 GOOGLE_APPLICATION_CREDENTIALS 设置为包含服务帐号密钥的 JSON 文件的文件路径。

      export GOOGLE_APPLICATION_CREDENTIALS=~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json
      
    4. 运行流水线以嵌套音乐人行中的录制内容行:

      ./run.sh simple
      
  4. 执行包含 artist_areaartist_gender 的相同查询:

    SELECT artist_name, artist_gender, artist_area, recording_name, recording_length
    FROM musicbrainz.recordings_by_artists_dataflow_sideinputs
    WHERE artist_area IS NOT NULL
    AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    在输出中,artist_areaartist_gender 现在已解码:

    由“artist_area”和“artist_gender”解码的输出。

优化 BigQuery 架构

在本教程的最后一部分,您将运行一个流水线,该流水线会使用嵌套字段生成更优化的表架构。

请花点时间查看用于生成该表这一优化版本的代码。

下图展示了一个稍有不同的 Dataflow 流水线,它将音乐人的录制内容嵌套在每个音乐人行中,而不是创建重复的音乐人行。

用于将音乐人的录制内容嵌套在每个音乐人行中的 Dataflow 流水线。

数据的当前表示形式呈展平状态。也就是说,每个记录的录制内容对应一行,其中包括来自 BigQuery 架构的所有音乐人的元数据,以及所有录制内容和 artist_credit_name 元数据。这种展平化表示形式至少有两个缺点:

  • 它会为每个记入音乐人的录制内容重复 artist 元数据,这会增加所需的存储空间。
  • 当您将数据导出为 JSON 时,它会导出重复该数据的数组,而不是具有嵌套录制内容数据的音乐人,但后者可能才是您想要的结果。

在没有任何性能损失且不使用额外存储空间的情况下,您可以通过对 Dataflow 流水线进行一些更改,将录制内容存储为每个音乐人记录中的重复字段,而不是每行存储一个录制内容。

流水线不是通过 artist_credit_name.artist 将录制内容与音乐人信息进行联接,而是在音乐人对象中创建嵌套的录制内容列表。

public static PCollection<MusicBrainzDataObject> nest(PCollection<KV<Long, MusicBrainzDataObject>> parent,
                                                      PCollection<KV<Long, MusicBrainzDataObject>> child,
                                                      String nestingKey) {
  final TupleTag<MusicBrainzDataObject> parentTag = new TupleTag<MusicBrainzDataObject>(){};
  final TupleTag<MusicBrainzDataObject> childTag = new TupleTag<MusicBrainzDataObject>(){};

  PCollection<KV<Long, CoGbkResult>> joinedResult = group("nest " + nestingKey, parent, child, parentTag, childTag);
  return joinedResult.apply("merge join results " + nestingKey,
                            MapElements
                             .into(new TypeDescriptor<MusicBrainzDataObject>() {})
                             .via((KV<Long, CoGbkResult> group) -> {
                                MusicBrainzDataObject parentObject = group.getValue().getOnly(parentTag);
                                Iterable<MusicBrainzDataObject> children = group.getValue().getAll(childTag);
                                List<MusicBrainzDataObject> childList = new ArrayList<MusicBrainzDataObject>();
                                children.forEach(childList::add);
                                parentObject = parentObject.duplicate();
                                parentObject.addColumnValue("recordings", childList);
                                return parentObject;
                              })
                         );
}

TableRow 在 BigQuery API 中存在大小限制,因此代码将给定记录的嵌套录制内容数量限制为 1000 个元素。如果给定的音乐人拥有超过 1000 个录制内容,则代码将复制该行(包括 artist 元数据),并继续将录制内容数据嵌套在复制行中。

private static List<TableRow> toTableRows(MusicBrainzDataObject mbdo, Map<String, Object> serializableSchema) {
  TableRow row = new TableRow();
  List<TableRow> result = new ArrayList<TableRow>();
  Map<String, List<MusicBrainzDataObject>> nestedLists = new HashMap<String, List<MusicBrainzDataObject>>();
  Set<String> keySet = serializableSchema.keySet();
  /*
   *  construct a row object without the nested objects
   */
  int maxListSize = 0;
  for (String key : keySet) {
    Object value = serializableSchema.get(key);
    Object fieldValue = mbdo.getColumnValue(key);
    if (fieldValue != null) {
      if (value instanceof Map) {
        List<MusicBrainzDataObject> list = (List<MusicBrainzDataObject>) fieldValue;
        if (list.size() > maxListSize) {
          maxListSize = list.size();
        }
        nestedLists.put(key, list);
      } else {
        row.set(key, fieldValue);
      }

    }
  }
  /*
   * add the nested objects but break up the nested objects across duplicate rows if nesting limit exceeded
   */
  TableRow parent = row.clone();
  Set<String> listFields = nestedLists.keySet();
  for (int i = 0; i < maxListSize; i++) {
    parent = (parent == null ? row.clone() : parent);
    final TableRow parentRow = parent;
    nestedLists.forEach((String key, List<MusicBrainzDataObject> nestedList) -> {
      if (nestedList.size() > 0) {
        if (parentRow.get(key) == null) {
          parentRow.set(key, new ArrayList<TableRow>());
        }
        List<TableRow> childRows = (List<TableRow>) parentRow.get(key);
        childRows.add(toChildRow(nestedList.remove(0), (Map<String, Object>) serializableSchema.get(key)));
      }
    });
    if ((i > 0) && (i % BIGQUERY_NESTING_LIMIT == 0)) {
      result.add(parent);
      parent = null;
    }
  }
  if (parent != null) {
    result.add(parent);
  }
  return result;
}

下图显示了流水线的来源、转换和接收器。

具有来源、转换和接收器的优化流水线。

大多数情况下,步骤名称在代码中作为 apply 方法调用的一部分提供。

若要创建此优化流水线,请完成以下步骤:

  1. 在 Cloud Shell 中,确保为流水线脚本设置了环境:

    export PROJECT_ID=[PROJECT_ID]
    export ZONE=[CHOOSE_AN_APPROPRIATE_ZONE]
    export DESTINATION_TABLE=recordings_by_artists_dataflow_nested
    export DATASET=musicbrainz
    export STAGING_BUCKET=${PROJECT_ID}-etl-staging-bucket
    export SERVICE_ACCOUNT=project-owner
    
  2. 将环境变量 GOOGLE_APPLICATION_CREDENTIALS 设置为包含服务帐号密钥的 JSON 文件的文件路径:

    export GOOGLE_APPLICATION_CREDENTIALS=~/${PROJECT_ID}-${SERVICE_ACCOUNT}.json
    
  3. 运行流水线以嵌套音乐人行中的录制内容行:

    ./run.sh nested
    
  4. 从嵌套在 BigQuery 的表中查询字段:

    SELECT artist_name, artist_gender, artist_area, artist_recordings
    FROM musicbrainz.recordings_by_artists_dataflow_nested
    WHERE artist_area IS NOT NULL
    AND artist_gender IS NOT NULL
    LIMIT 1000;
    

    嵌套表的查询结果。

  5. 运行查询以从 STRUCT 提取值并使用这些值来过滤结果:

    SELECT artist_name,
           artist_gender,
           artist_area,
           ARRAY(SELECT artist_credit_name_name
                   FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS artist_credit_name_name,
           ARRAY(SELECT recording_name
                   FROM UNNEST(recordings_by_artists_dataflow_nested.artist_recordings)) AS recording_name
     FROM musicbrainz.recordings_by_artists_dataflow_nested,
          UNNEST(recordings_by_artists_dataflow_nested.artist_recordings) AS artist_recordings_struct
    WHERE artist_recordings_struct.recording_name LIKE "%Justin%"
    LIMIT 1000;
    

    查询以过滤结果。

清理

为避免因本教程中使用的资源导致您的 Google Cloud 帐号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除项目

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

逐个删除资源

请按照相应步骤逐个删除资源,而非删除整个项目。

删除 Cloud Storage 存储分区

  1. 在 Cloud Console 中,转到 Cloud Storage 浏览器页面。

    转到“Cloud Storage 浏览器”页面

  2. 点击要删除的存储分区对应的复选框。
  3. 如需删除存储分区,请点击删除

删除 BigQuery 数据集

  1. 打开 BigQuery 网页界面。

    打开 BIGQUERY

  2. 选择您在本教程中创建的 BigQuery 数据集。

  3. 点击删除

后续步骤