使用模板处理数据

Dataplex 提供由 Dataflow 提供支持的模板,用于执行数据提取、处理和管理数据生命周期等常见数据处理任务。本指南介绍了如何配置和运行数据处理模板。

准备工作

Dataplex 模板由 Dataflow 提供支持。 在使用模板之前,请先启用 Dataflow API。

启用 Dataflow API

备注

  • 所有模板都支持常见的 Dataflow 流水线选项

  • Dataplex 使用数据流水线来安排模板定义的任务。

  • 您只能在 Google Cloud 控制台的 Dataplex 页面上看到通过 Dataplex 安排的任务。

模板:将原始数据转换为精选数据

Dataplex 文件格式转换模板可将 Dataplex Cloud Storage 资产或 以 CSV 或 JSON 格式存储的 Dataplex 实体、Parquet 或 另一个 Dataplex 资产中的 Avro format-data。分区布局 会保留在转换中它还支持对输出文件进行压缩。

模板参数

参数 说明
inputAssetOrEntitiesList 需要指定 Cloud Storage 存储分区的 Dataplex 资产或 Dataplex 实体 包含输入文件。此参数应遵循以下格式: projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name>projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/entities/<entity1-name>,projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/entities/<entity 2 name>...
outputFileFormat Cloud Storage 中的输出文件格式。此参数应遵循以下格式:PARQUETAVRO
outputAsset 包含 用于存储输出文件的 Cloud Storage 存储桶。此参数应采用以下格式:projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name>。您可以 outputAsset,在 Google Cloud 控制台中 Dataplex 资产“Details”标签页。
outputFileCompression 可选:输出文件压缩。此字段的默认值 参数为 SNAPPY。该参数的其他值可以是 UNCOMPRESSEDSNAPPYGZIPBZIP2PARQUET 文件不支持 BZIP2
writeDisposition 可选:指定在生成目标文件时执行的操作 已存在。此参数的默认值为 SKIP。 哪些信号仅处理 目标目录中。参数的其他值可以是 OVERWRITE(覆盖所有现有文件)或 FAIL (如果至少有 1 个目标,则不处理任何内容,并且会生成错误 文件已存在)。
updateDataplexMetadata

可选:是否为 新建实体。此参数的默认值为 false

如果启用此功能,流水线将自动将架构从源复制到目标 Dataplex 实体,并且不会为这些实体运行自动化 Dataplex 发现功能。如果 源(原始)数据的架构由 Dataplex 管理。

运行模板

控制台

  1. 在 Google Cloud 控制台中,转到 Dataplex 页面:

    进入 Dataplex

  2. 前往流程视图。

  3. 点击创建任务

  4. 转换为精选格式下方,点击创建任务

  5. 选择一个 Dataplex 数据湖。

  6. 提供任务名称。

  7. 选择用于执行任务的区域。

  8. 填写必需参数。

  9. 点击继续

gcloud

替换以下内容:

JOB_NAME: a job name of your choice
PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
INPUT_ASSET_OR_ENTITIES_LIST: path to your JDBC drivers
OUTPUT_FILE_FORMAT: your output file format in Cloud Storage
OUTPUT_ASSET: your Dataplex output asset ID

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--region=REGION_NAME \
--template-file-gcs-location=gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_File_Format_Conversion_Preview \
--parameters \
inputAssetOrEntitiesList=INPUT_ASSET_OR_ENTITIES_LIST,\
outputFileFormat=OUTPUT_FILE_FORMAT,\
outputAsset=OUTPUT_ASSET

REST API

替换以下内容:

PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
JOB_NAME: a job name of your choice
INPUT_ASSET_OR_ENTITIES_LIST: path to your JDBC drivers
OUTPUT_FILE_FORMAT: your output file format in Cloud Storage
OUTPUT_ASSET: your Dataplex output asset ID

提交 HTTP POST 请求:

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION_NAME/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
        "inputAssetOrEntitiesList": "INPUT_ASSET_OR_ENTITIES_LIST",
        "outputFileFormat": "OUTPUT_FILE_FORMAT",
        "outputAsset": "OUTPUT_ASSET",
    },
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_File_Format_Conversion_Preview",
 }
}

模板:层级数据从 BigQuery 资产到 Cloud Storage 资产

Dataplex BigQuery to Cloud Storage 模板可将数据从 Dataplex BigQuery 资产复制到 Dataplex Cloud Storage 资产,并采用与 Dataplex 兼容的布局和格式。您可以指定 BigQuery 数据集或 BigQuery 表列表 要复制的对象。为了提高灵活性,该模板允许复制数据 早于指定的修改日期,并允许选择性地删除数据 从 BigQuery 中复制数据。

将分区表从 BigQuery 复制到 Cloud Storage 时:

  • 该模板会在 Cloud Storage 存储桶上创建 Hive 样式的分区。 BigQuery 不能包含 Hive 样式的分区键 与现有列相同。您可以使用选项 enforceSamePartitionKey 创建新的分区键,也可以保留相同的分区键,但重命名现有列。
  • 创建 BigQuery 表(以及 Dataproc Metastore 中的表)时,Dataplex Discovery 会将分区类型注册为 string。这可能会影响您现有的分区过滤条件。

可以转换的表和分区的数量有限制 大约需要 300 次。确切数量取决于表名称的长度和其他因素。

模板参数

参数 说明
sourceBigQueryDataset 要对数据进行分层的 BigQuery 数据集。此参数应包含格式为 projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> 的 Dataplex 资产名称,或格式为 projects/<name>/datasets/<dataset-id> 的 BigQuery 数据集 ID。
destinationStorageBucketAssetName Cloud Storage 存储桶的 Dataplex 资产名称 将数据划分到哪个层级此参数应遵循 projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> 格式。
tables 可选:要分层的 BigQuery 表的逗号分隔列表。如果未提供列表,则所有表将进行分层。表应 (无项目/数据集前缀),并且 区分大小写。
exportDataModifiedBeforeDateTime 可选:使用此参数可移动早于此日期(以及 (可选时间)。对于分区的 BigQuery 表,请将 分区在此日期/时间之前的最后修改时间。对于非分区表,如果表的上次修改时间在此日期/时间之前,则移除。如果未指定,则移动所有表/分区。系统会默认在默认时区解析日期/时间,但支持可选后缀 Z+HH:mm。此参数应遵循以下格式: YYYY-MM-DDYYYY-MM-DDTHH:mm:ssYYYY-MM-DDTHH:mm:ss+03:00相对日期/时间 格式,并且应遵循 -PnDTnHnMn.nS(必须以 -P 开头, 表示过去的时间)。
fileFormat 可选:Cloud Storage 中的输出文件格式。默认 此参数的值为 PARQUET。另一个 参数可以是 AVRO
fileCompression 可选:输出文件压缩。此字段的默认值 参数为 SNAPPY。参数的其他值可以是 UNCOMPRESSEDSNAPPYGZIPBZIP2PARQUET 文件不支持 BZIP2
deleteSourceData 可选:在导出成功后是否从 BigQuery 中删除源数据。值可以是 truefalse。此参数的默认值为 false
partitionIdRegExp 可选:仅处理与此正则表达式匹配的分区 ID 的分区。如果未提供任何值,则此参数默认为 全部处理。
writeDisposition 可选:指定目标文件已存在时执行的操作,即一个或多个表/分区已预分层。此参数的默认值为 SKIP。 哪些信号仅处理未未受其影响的表/分区, 。参数的其他值可以是 OVERWRITE(覆盖所有现有文件)或 FAIL (如果至少有 1 个目标,则不处理任何内容,并且会生成错误 文件已存在)。
enforceSamePartitionKey

可选:是否强制使用相同的分区键。由于 BigQuery 限制,分区外部表中的分区键(在文件路径中)不能与文件中的某个列具有相同的名称。如果此参数为 true(默认值),则目标文件的分区键会设置为原始分区列名称,并且文件中的列会重命名。如果 false,则重命名分区键。

例如,如果原始表根据名为 TSenforceSamePartitionKey=true,然后 目标文件路径为 gs://<bucket>/TS=<partition ID>/<file> 和 在该文件中,该列已重命名为 TS_pkey。这样,您就可以针对旧表或新表中的相同分区执行现有查询。

如果为 enforceSamePartitionKey=false,则目标文件路径为 gs://<bucket>/TS_pid=<partition ID>/<file>,但列名称在文件中保持为 TS

updateDataplexMetadata

可选:是否更新新创建的实体的 Dataplex 元数据。此参数的默认值为 false

启用后,流水线将自动从源复制架构 发送到目标 Dataplex 实体, Dataplex Discovery 不会为其运行。如果您 管理源 BigQuery 表的架构。

运行模板

控制台

  1. 在 Google Cloud 控制台中,前往 Dataplex 页面:

    进入 Dataplex

  2. 转到进程视图。

  3. 点击创建任务

  4. Tier from BQ to GCS Assets(从 BQ 到 GCS 资产)下,点击创建任务

  5. 选择一个 Dataplex 数据湖。

  6. 提供任务名称。

  7. 选择用于执行任务的区域。

  8. 填写必需参数。

  9. 点击继续

gcloud

替换以下内容:

JOB_NAME: a job name of your choice
PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
SOURCE_ASSET_NAME_OR_DATASET_ID: your Dataplex asset
name for the source BigQuery dataset, or the dataset ID
DESTINATION_ASSET_NAME: your Dataplex asset name for
the destination Cloud Storage bucket

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--region=REGION_NAME \
--template-file-gcs-location=gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_BigQuery_to_GCS_Preview \
--parameters \
sourceBigQueryDataset=SOURCE_ASSET_NAME_OR_DATASET_ID,\
destinationStorageBucketAssetName=DESTINATION_ASSET_NAME

REST API

替换以下内容:

PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
JOB_NAME: a job name of your choice
SOURCE_ASSET_NAME_OR_DATASET_ID: your Dataplex asset
name for the source BigQuery dataset, or the dataset ID
DESTINATION_ASSET_NAME: your Dataplex asset name for
the destination Cloud Storage bucket
REGION_NAME: region in which to run the job

提交 HTTP POST 请求:

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION_NAME/flexTemplates:launch
{
 "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
        "sourceBigQueryDataset": "SOURCE_ASSET_NAME_OR_DATASET_ID",
        "destinationStorageBucketAssetName": "DESTINATION_ASSET_NAME",
    },
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_BigQuery_to_GCS_Preview",
 }
}

安排其他 Google Cloud 提供的 Dataflow 模板或自定义 Dataflow 模板

通过 Dataplex,您可以安排和监控 Google Cloud 提供的 Dataflow 模板或您的自定义 控制台中的 Dataflow 模板。

时间表

控制台

  1. 在 Google Cloud 控制台中,转到 Dataplex 页面:

    进入 Dataplex

  2. 前往流程视图。

  3. 点击创建任务

  4. 编写 Dataflow 流水线下,点击创建 Dataflow 流水线

  5. 选择一个 Dataplex 数据湖。

  6. 提供任务名称。

  7. 选择要运行任务的区域。

  8. 选择一个 Dataflow 模板。

  9. 填写所需的参数。

  10. 点击继续

监控

控制台

  1. 在 Google Cloud 控制台中,前往 Dataplex 页面:

    进入 Dataplex

  2. 前往流程视图。

  3. 点击 Dataflow 流水线

  4. 按数据湖或数据流水线名称过滤。