使用模板处理数据

Dataplex 提供由 Dataflow 提供支持的模板,用于执行常见的数据处理任务,例如数据注入、处理和管理数据生命周期。本指南介绍如何配置和运行数据处理模板。

准备工作

Dataplex 模板由 Dataflow 提供支持。在使用模板之前,请先启用 Dataflow API。

启用 Dataflow API

Notes

  • 所有模板都支持常用的 Dataflow 流水线选项

  • Dataplex 使用数据流水线来调度由模板定义的任务。

  • 您只能在 Google Cloud 控制台 Dataplex 页面上查看通过 Dataplex 安排的任务。

模板:将原始数据转换为精选数据

Dataplex 文件格式转换模板将 Dataplex Cloud Storage 资产中的数据或以 CSV 或 JSON 格式存储的 Dataplex 实体列表转换为其他 Dataplex 资产中的 Parquet 或 Avro 格式数据。转换过程中会保留分区布局。它还支持压缩输出文件。

模板参数

参数 说明
inputAssetOrEntitiesList 包含输入文件的 Dataplex 资产或 Dataplex 实体。此参数应遵循以下格式:projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name>projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/entities/<entity1-name>,projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/entities/<entity 2 name>...
outputFileFormat Cloud Storage 中的输出文件格式。此参数应遵循以下格式:PARQUETAVRO
outputAsset 包含存储输出文件的 Cloud Storage 存储桶的 Dataplex 资源的名称。此参数应遵循以下格式:projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name>。您可以在 Google Cloud 控制台的 Dataplex 资产 Details 标签页中找到 outputAsset
outputFileCompression 可选:输出文件压缩。此参数的默认值为 SNAPPY。该参数的其他值可以是 UNCOMPRESSEDSNAPPYGZIPBZIP2PARQUET 文件不支持 BZIP2
writeDisposition 可选:指定在目标文件已存在时要执行的操作。此参数的默认值为 SKIP,表示仅处理目标目录中不存在的文件。该参数的其他值可以是 OVERWRITE(覆盖任何现有文件)或 FAIL(如果已经存在至少一个目标文件,则不处理任何内容并产生错误)。
updateDataplexMetadata

可选:是否更新新创建的实体的 Dataplex 元数据。此参数的默认值为 false

启用后,流水线会自动将架构从来源复制到目标 Dataplex 实体,并且系统不会针对这些实体运行自动化 Dataplex Discovery。如果源(原始)数据的架构由 Dataplex 管理,请使用此标志。

运行模板

控制台

  1. 在 Google Cloud 控制台中,转到 Dataplex 页面:

    进入 Dataplex

  2. 进入 Process 视图。

  3. 点击创建任务

  4. 转换为精选格式下,点击创建任务

  5. 选择一个 Dataplex 数据湖。

  6. 提供任务名称。

  7. 选择任务执行区域。

  8. 填写必填参数。

  9. 点击继续

gcloud

替换以下内容:

JOB_NAME: a job name of your choice
PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
INPUT_ASSET_OR_ENTITIES_LIST: path to your JDBC drivers
OUTPUT_FILE_FORMAT: your output file format in Cloud Storage
OUTPUT_ASSET: your Dataplex output asset ID

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--region=REGION_NAME \
--template-file-gcs-location=gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_File_Format_Conversion_Preview \
--parameters \
inputAssetOrEntitiesList=INPUT_ASSET_OR_ENTITIES_LIST,\
outputFileFormat=OUTPUT_FILE_FORMAT,\
outputAsset=OUTPUT_ASSET

REST API

替换以下内容:

PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
JOB_NAME: a job name of your choice
INPUT_ASSET_OR_ENTITIES_LIST: path to your JDBC drivers
OUTPUT_FILE_FORMAT: your output file format in Cloud Storage
OUTPUT_ASSET: your Dataplex output asset ID

提交 HTTP POST 请求:

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION_NAME/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
        "inputAssetOrEntitiesList": "INPUT_ASSET_OR_ENTITIES_LIST",
        "outputFileFormat": "OUTPUT_FILE_FORMAT",
        "outputAsset": "OUTPUT_ASSET",
    },
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_File_Format_Conversion_Preview",
 }
}

模板:从 BigQuery 资产到 Cloud Storage 资产的层级数据

Dataplex BigQuery to Cloud Storage 模板可将 Dataplex BigQuery 资源中的数据以与 Dataplex 兼容的布局和格式复制到 Dataplex Cloud Storage 资产中。您可以指定要复制的 BigQuery 数据集或 BigQuery 表列表。为了提高灵活性,该模板允许复制早于指定修改日期的数据,并可以选择在成功复制后从 BigQuery 中删除数据。

在将分区表从 BigQuery 复制到 Cloud Storage 时,模板会在 Cloud Storage 存储桶上创建 Hive 样式的分区。在写入 Cloud Storage 时,该模板通过将后缀 _pid 附加到现有分区列来创建新的分区键。如需以外部表的形式访问 BigQuery 中的数据,必须执行此操作。目前,BigQuery 不能使 Hive 样式的分区键与现有列相同。因此,当从 BigQuery 作为外部表查看时,复制的表将包含一个额外的分区键列。其余数据将按原样保留。

将分区表从 BigQuery 复制到 Cloud Storage 时:

  • 该模板会在 Cloud Storage 存储桶上创建 Hive 样式的分区。目前,BigQuery 不能使 Hive 样式的分区键与现有列相同。您可以使用选项 enforceSamePartitionKey 创建新的分区键,或者保留相同的分区键但重命名现有列。
  • 创建 BigQuery 表(以及 Dataproc Metastore 中的表)时,Dataplex Discovery 会将分区类型注册为 string。这可能会影响您现有的分区过滤条件。

在单次运行模板中可转换的表和分区数量存在限制,约为 300 个。具体数量取决于表名称的长度和其他因素。

模板参数

参数 说明
sourceBigQueryDataset 用于为其分层的 BigQuery 数据集。此参数应包含格式为 projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> 的 Dataplex 资产名称,或包含 projects/<name>/datasets/<dataset-id> 格式的 BigQuery 数据集 ID。
destinationStorageBucketAssetName 要将数据分层的 Cloud Storage 存储桶的 Dataplex 资产名称。此参数应遵循 projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> 格式。
tables 可选:要分层的 BigQuery 表的逗号分隔列表。如果未提供列表,则所有表都将分层。表应仅按名称指定(无项目/数据集前缀),且区分大小写。
exportDataModifiedBeforeDateTime 可选:使用此参数可移动早于此日期(及可选时间)的数据。对于分区表 BigQuery 表,请将上次修改时间早于此日期/时间的分区移动。对于非分区表,如果表的上次修改时间早于此日期/时间,则进行移动。如果未指定,请移动所有表/分区。默认情况下,日期/时间以默认时区进行解析,但支持可选后缀 Z+HH:mm。此参数应遵循 YYYY-MM-DDYYYY-MM-DDTHH:mm:ssYYYY-MM-DDTHH:mm:ss+03:00 格式。也支持相对日期/时间,此格式应遵循 -PnDTnHnMn.nS 格式(必须以 -P 开头,表示过去的时间)。
fileFormat 可选:Cloud Storage 中的输出文件格式。此参数的默认值为 PARQUET。该参数的另一个值可以是 AVRO
fileCompression 可选:输出文件压缩。此参数的默认值为 SNAPPY。该参数的其他值可以是 UNCOMPRESSEDSNAPPYGZIPBZIP2PARQUET 文件不支持 BZIP2
deleteSourceData 可选:成功导出后,是否从 BigQuery 中删除源数据。值可以是 truefalse。此参数的默认值为 false
partitionIdRegExp 可选:仅处理分区 ID 与此正则表达式匹配的分区。如果未提供值,则此参数默认处理所有值。
writeDisposition 可选:指定目标文件已存在时要执行的操作,这意味着一个或多个表/分区已预先分层。此参数的默认值为 SKIP,表示仅处理未预分层的表/分区。该参数的其他值可以是 OVERWRITE(覆盖任何现有文件)或 FAIL(如果已经存在至少一个目标文件,则不处理任何内容并产生错误)。
enforceSamePartitionKey

可选:是否强制执行同一分区键。由于 BigQuery 的限制,分区外部表中的分区键(位于文件路径中)不能与文件中的某一列同名。如果此参数为 true(即默认值),则目标文件的分区键将设置为原始分区列名称,并重命名文件中的列。如果为 false,则重命名分区键。

例如,如果原始表按名为 TSenforceSamePartitionKey=true 的列进行分区,则目标文件路径为 gs://<bucket>/TS=<partition ID>/<file>,并且该文件在文件中已重命名为 TS_pkey。这样,就可以对旧表或新表中的相同分区执行现有查询。

如果为 enforceSamePartitionKey=false,则目标文件路径为 gs://<bucket>/TS_pid=<partition ID>/<file>,但列名称会在文件中保留为 TS

updateDataplexMetadata

可选:是否更新新创建的实体的 Dataplex 元数据。此参数的默认值为 false

启用后,流水线会自动将架构从来源复制到目标 Dataplex 实体,并且系统不会针对这些实体运行自动化 Dataplex Discovery。如果您要管理 BigQuery 源表的架构,请使用此标志。

运行模板

控制台

  1. 在 Google Cloud 控制台中,转到 Dataplex 页面:

    进入 Dataplex

  2. 进入 Process 视图。

  3. 点击 Create Task

  4. 从 BQ 到 GCS 资产下,点击创建任务

  5. 选择一个 Dataplex 数据湖。

  6. 提供任务名称。

  7. 选择任务执行区域。

  8. 填写必填参数。

  9. 点击继续

gcloud

替换以下内容:

JOB_NAME: a job name of your choice
PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
SOURCE_ASSET_NAME_OR_DATASET_ID: your Dataplex asset
name for the source BigQuery dataset, or the dataset ID
DESTINATION_ASSET_NAME: your Dataplex asset name for
the destination Cloud Storage bucket

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--region=REGION_NAME \
--template-file-gcs-location=gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_BigQuery_to_GCS_Preview \
--parameters \
sourceBigQueryDataset=SOURCE_ASSET_NAME_OR_DATASET_ID,\
destinationStorageBucketAssetName=DESTINATION_ASSET_NAME

REST API

替换以下内容:

PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
JOB_NAME: a job name of your choice
SOURCE_ASSET_NAME_OR_DATASET_ID: your Dataplex asset
name for the source BigQuery dataset, or the dataset ID
DESTINATION_ASSET_NAME: your Dataplex asset name for
the destination Cloud Storage bucket
REGION_NAME: region in which to run the job

提交 HTTP POST 请求:

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION_NAME/flexTemplates:launch
{
 "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
        "sourceBigQueryDataset": "SOURCE_ASSET_NAME_OR_DATASET_ID",
        "destinationStorageBucketAssetName": "DESTINATION_ASSET_NAME",
    },
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_BigQuery_to_GCS_Preview",
 }
}

安排其他 Google Cloud 提供的或自定义 Dataflow 模板

借助 Dataplex,您可以安排和监控 Google Cloud 提供的任何 Dataflow 模板或您的自定义 Dataflow 模板。

时间表

控制台

  1. 在 Google Cloud 控制台中,转到 Dataplex 页面:

    进入 Dataplex

  2. 进入 Process 视图。

  3. 点击 Create Task

  4. 编写 Dataflow 流水线下,点击创建 Dataflow 流水线

  5. 选择一个 Dataplex 数据湖。

  6. 提供任务名称。

  7. 选择运行任务的区域。

  8. 选择 Dataflow 模板。

  9. 填写必填参数。

  10. 点击继续

监控

控制台

  1. 在 Google Cloud 控制台中,转到 Dataplex 页面:

    进入 Dataplex

  2. 进入 Process 视图。

  3. 点击 Dataflow 流水线

  4. 按数据湖或流水线名称过滤。