使用模板处理数据

Dataplex 提供由 Dataflow 提供支持的模板,用于执行数据注入、处理和管理数据生命周期等常见数据处理任务。本指南介绍了如何配置和运行数据处理模板。

准备工作

Dataplex 模板由 Dataflow 提供支持。在使用模板之前,请先启用 Dataflow API。

启用 Dataflow API

请注意以下几点:

  • 所有模板都支持常见的 Dataflow 流水线选项

  • Dataplex 使用数据流水线来安排模板定义的任务。

  • 您只能在 Google Cloud 控制台的 Dataplex 页面上看到通过 Dataplex 安排的任务。

模板:将原始数据转换为精选数据

Dataplex 文件格式转换模板可将 Dataplex Cloud Storage 资产中的数据或以 CSV 或 JSON 格式存储的 Dataplex 实体列表转换为另一个 Dataplex 资产中的 Parquet 或 Avro 格式数据。分区布局会在转换中保留。它还支持压缩输出文件。

模板参数

参数 说明
inputAssetOrEntitiesList 包含输入文件的 Dataplex 资产或 Dataplex 实体。此参数必须采用以下格式:projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name>projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/entities/<entity1-name>,projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/entities/<entity 2 name>...
outputFileFormat Cloud Storage 中的输出文件格式。此参数必须采用以下格式:PARQUETAVRO
outputAsset 包含将存储输出文件的 Cloud Storage 存储桶的 Dataplex 资产的名称。此参数必须采用以下格式:projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name>。您可以在 Google Cloud 控制台中的“Dataplex 资产”Details标签页中找到 outputAsset
outputFileCompression 可选:输出文件压缩。此参数的默认值为 SNAPPY。该参数的其他值可以是 UNCOMPRESSEDSNAPPYGZIPBZIP2PARQUET 文件不支持 BZIP2
writeDisposition 可选:指定目标文件已存在时执行的操作。此参数的默认值为 SKIP,表示仅处理目标目录中不存在的文件。该参数的其他值可以是 OVERWRITE(覆盖所有现有文件)或 FAIL(不处理任何内容,如果至少有一个目标文件已存在,则会生成错误)。
updateDataplexMetadata

可选:是否更新新创建的实体的 Dataplex 元数据。此参数的默认值为 false

如果启用此功能,流水线将自动将架构从源复制到目标 Dataplex 实体,并且不会为这些实体运行自动化 Dataplex 发现。如果来源(原始)数据的架构由 Dataplex 管理,请使用此标志。

运行模板

控制台

  1. 在 Google Cloud 控制台中,前往 Dataplex 页面。

    进入 Dataplex

  2. 前往流程视图。

  3. 点击创建任务

  4. 转换为专业格式下,点击创建任务

  5. 选择一个 Dataplex 数据湖。

  6. 提供任务名称。

  7. 选择任务执行区域。

  8. 填写必需参数。

  9. 点击继续

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--region=REGION_NAME \
--template-file-gcs-location=gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_File_Format_Conversion_Preview \
--parameters \
inputAssetOrEntitiesList=INPUT_ASSET_OR_ENTITIES_LIST,\
outputFileFormat=OUTPUT_FILE_FORMAT,\
outputAsset=OUTPUT_ASSET

替换以下内容:

JOB_NAME: a job name of your choice
PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
INPUT_ASSET_OR_ENTITIES_LIST: path to your JDBC drivers
OUTPUT_FILE_FORMAT: your output file format in Cloud Storage
OUTPUT_ASSET: your Dataplex output asset ID

REST

提交 HTTP POST 请求:

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION_NAME/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
        "inputAssetOrEntitiesList": "INPUT_ASSET_OR_ENTITIES_LIST",
        "outputFileFormat": "OUTPUT_FILE_FORMAT",
        "outputAsset": "OUTPUT_ASSET",
    },
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_File_Format_Conversion_Preview",
 }
}

替换以下内容:

PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
JOB_NAME: a job name of your choice
INPUT_ASSET_OR_ENTITIES_LIST: path to your JDBC drivers
OUTPUT_FILE_FORMAT: your output file format in Cloud Storage
OUTPUT_ASSET: your Dataplex output asset ID

模板:将 BigQuery 资产中的数据分层到 Cloud Storage 资产

Dataplex BigQuery to Cloud Storage 模板可将数据从 Dataplex BigQuery 资产复制到 Dataplex Cloud Storage 资产,并采用与 Dataplex 兼容的布局和格式。您可以指定要复制的 BigQuery 数据集或 BigQuery 表列表。为了提高灵活性,该模板允许复制早于指定修改日期的数据,并允许在成功复制后视需要从 BigQuery 中删除数据。

将分区表从 BigQuery 复制到 Cloud Storage 时:

  • 该模板会在 Cloud Storage 存储桶上创建 Hive 风格的分区。 BigQuery 中的 Hive 风格分区键不能与现有列相同。您可以使用选项 enforceSamePartitionKey 创建新的分区键,也可以保留相同的分区键,但重命名现有列。
  • 创建 BigQuery 表(以及 Dataproc Metastore 中的表)时,Dataplex Discovery 会将分区类型注册为 string。这可能会影响您现有的分区过滤条件。

单次模板运行中可转换的表和分区数量存在限制,大约为 300 个。确切数量取决于表名称的长度和其他因素。

模板参数

参数 说明
sourceBigQueryDataset 要对数据进行分层的 BigQuery 数据集。此参数必须包含格式为 projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> 的 Dataplex 资产名称,或格式为 projects/<name>/datasets/<dataset-id> 的 BigQuery 数据集 ID。
destinationStorageBucketAssetName 要将数据分层到其中的 Cloud Storage 存储桶的 Dataplex 资产名称。此参数必须遵循 projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> 格式。
tables 可选:要分层的 BigQuery 表的逗号分隔列表。如果未提供列表,系统将对所有表进行分层。表必须仅通过名称(不含项目或数据集前缀)指定,并且区分大小写。
exportDataModifiedBeforeDateTime 可选:使用此参数可移动早于此日期(和可选时间)的数据。对于分区 BigQuery 表,移动上次修改时间在此日期/时间之前的分区。对于非分区表,如果表的上次修改时间在此日期/时间之前,则移除。如果未指定,则移动所有表/分区。系统会默认在默认时区解析日期/时间,但支持可选后缀 Z+HH:mm。此参数必须采用 YYYY-MM-DDYYYY-MM-DDTHH:mm:ssYYYY-MM-DDTHH:mm:ss+03:00 格式。 还支持相对日期/时间,且必须采用 -PnDTnHnMn.nS 格式(必须以 -P 开头,表示过去的时间)。
fileFormat 可选:Cloud Storage 中的输出文件格式。此参数的默认值为 PARQUET。该参数的另一个值可以是 AVRO
fileCompression 可选:输出文件压缩。此参数的默认值为 SNAPPY。该参数的其他值可以是 UNCOMPRESSEDSNAPPYGZIPBZIP2PARQUET 文件不支持 BZIP2
deleteSourceData 可选:是否在导出成功后从 BigQuery 中删除源数据。值可以是 truefalse。此参数的默认值为 false
partitionIdRegExp 可选:仅处理与此正则表达式匹配的分区 ID 的分区。如果未提供值,此参数默认为处理所有内容。
writeDisposition 可选:指定目标文件已存在时执行的操作,即一个或多个表/分区已预分层。此参数的默认值为 SKIP,表示仅处理尚未预分层的表/分区。该参数的其他值可以是 OVERWRITE(覆盖所有现有文件)或 FAIL(不处理任何内容,如果至少有一个目标文件已存在,则会生成错误)。
enforceSamePartitionKey

可选:是否强制使用相同的分区键。由于 BigQuery 限制,分区外部表中的分区键(在文件路径中)不能与文件中的某个列具有相同的名称。如果此参数为 true(默认值),则目标文件的分区键会设置为原始分区列名称,并且文件中的列会重命名。如果为 false,则会重命名分区键。

例如,如果原始表基于名为 TSenforceSamePartitionKey=true 的列进行分区,则目标文件路径为 gs://<bucket>/TS=<partition ID>/<file>,并且该列在文件中会重命名为 TS_pkey。这样,您就可以针对旧表或新表中的相同分区执行现有查询。

如果为 enforceSamePartitionKey=false,则目标文件路径为 gs://<bucket>/TS_pid=<partition ID>/<file>,但列名称在文件中保持为 TS

updateDataplexMetadata

可选:是否更新新创建的实体的 Dataplex 元数据。此参数的默认值为 false

如果启用此功能,流水线将自动将架构从源复制到目标 Dataplex 实体,并且不会为这些实体运行自动化 Dataplex 发现。如果您要管理源 BigQuery 表的架构,请使用此标志。

运行模板

控制台

  1. 在 Google Cloud 控制台中,前往 Dataplex 页面。

    进入 Dataplex

  2. 前往流程视图。

  3. 点击创建任务

  4. 从 BQ 到 GCS 资产分层下,点击创建任务

  5. 选择一个 Dataplex 数据湖。

  6. 提供任务名称。

  7. 选择任务执行区域。

  8. 填写必需参数。

  9. 点击继续

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--region=REGION_NAME \
--template-file-gcs-location=gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_BigQuery_to_GCS_Preview \
--parameters \
sourceBigQueryDataset=SOURCE_ASSET_NAME_OR_DATASET_ID,\
destinationStorageBucketAssetName=DESTINATION_ASSET_NAME

替换以下内容:

JOB_NAME: a job name of your choice
PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
SOURCE_ASSET_NAME_OR_DATASET_ID: your Dataplex asset
name for the source BigQuery dataset, or the dataset ID
DESTINATION_ASSET_NAME: your Dataplex asset name for
the destination Cloud Storage bucket

REST

提交 HTTP POST 请求:

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION_NAME/flexTemplates:launch
{
 "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
        "sourceBigQueryDataset": "SOURCE_ASSET_NAME_OR_DATASET_ID",
        "destinationStorageBucketAssetName": "DESTINATION_ASSET_NAME",
    },
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_BigQuery_to_GCS_Preview",
 }
}

替换以下内容:

PROJECT_ID: your template project ID
REGION_NAME: region in which to run the job
JOB_NAME: a job name of your choice
SOURCE_ASSET_NAME_OR_DATASET_ID: your Dataplex asset
name for the source BigQuery dataset, or the dataset ID
DESTINATION_ASSET_NAME: your Dataplex asset name for
the destination Cloud Storage bucket
REGION_NAME: region in which to run the job

安排其他 Google Cloud提供或自定义的 Dataflow 模板

借助 Dataplex,您可以在控制台中安排和监控Google Cloud提供的任何 Dataflow 模板或自定义 Dataflow 模板。

时间表

控制台

  1. 在 Google Cloud 控制台中,前往 Dataplex 页面。

    进入 Dataplex

  2. 前往流程视图。

  3. 点击创建任务

  4. 编写 Dataflow 流水线下方,点击创建 Dataflow 流水线

  5. 选择一个 Dataplex 数据湖。

  6. 提供任务名称。

  7. 选择要运行任务的区域。

  8. 选择 Dataflow 模板。

  9. 填写必需参数。

  10. 点击继续

监控

控制台

  1. 在 Google Cloud 控制台中,前往 Dataplex 页面。

    进入 Dataplex

  2. 前往流程视图。

  3. 点击 Dataflow 流水线

  4. 按数据湖或数据流水线名称过滤。