Dataplex 提供由 Dataflow 提供支持的模板,用于执行常见的数据处理任务,例如数据注入、处理和管理数据生命周期。本指南介绍如何配置和运行数据处理模板。
准备工作
Dataplex 模板由 Dataflow 提供支持。在使用模板之前,请先启用 Dataflow API。
Notes
所有模板都支持常用的 Dataflow 流水线选项。
Dataplex 使用数据流水线来调度由模板定义的任务。
您只能在 Google Cloud 控制台 Dataplex 页面上查看通过 Dataplex 安排的任务。
模板:将原始数据转换为精选数据
Dataplex 文件格式转换模板将 Dataplex Cloud Storage 资产中的数据或以 CSV 或 JSON 格式存储的 Dataplex 实体列表转换为其他 Dataplex 资产中的 Parquet 或 Avro 格式数据。转换过程中会保留分区布局。它还支持压缩输出文件。
模板参数
参数 | 说明 |
---|---|
inputAssetOrEntitiesList |
包含输入文件的 Dataplex 资产或 Dataplex 实体。此参数应遵循以下格式:projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> 或 projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/entities/<entity1-name>,projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/entities/<entity 2 name>... |
outputFileFormat |
Cloud Storage 中的输出文件格式。此参数应遵循以下格式:PARQUET 或 AVRO 。 |
outputAsset |
包含存储输出文件的 Cloud Storage 存储桶的 Dataplex 资源的名称。此参数应遵循以下格式:projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> 。您可以在 Google Cloud 控制台的 Dataplex 资产 Details 标签页中找到 outputAsset 。 |
outputFileCompression |
可选:输出文件压缩。此参数的默认值为 SNAPPY 。该参数的其他值可以是 UNCOMPRESSED 、SNAPPY 、GZIP 或 BZIP2 。PARQUET 文件不支持 BZIP2 。 |
writeDisposition |
可选:指定在目标文件已存在时要执行的操作。此参数的默认值为 SKIP ,表示仅处理目标目录中不存在的文件。该参数的其他值可以是 OVERWRITE (覆盖任何现有文件)或 FAIL (如果已经存在至少一个目标文件,则不处理任何内容并产生错误)。 |
updateDataplexMetadata |
可选:是否更新新创建的实体的 Dataplex 元数据。此参数的默认值为 启用后,流水线会自动将架构从来源复制到目标 Dataplex 实体,并且系统不会针对这些实体运行自动化 Dataplex Discovery。如果源(原始)数据的架构由 Dataplex 管理,请使用此标志。 |
运行模板
控制台
在 Google Cloud 控制台中,转到 Dataplex 页面:
进入 Process 视图。
点击创建任务。
在转换为精选格式下,点击创建任务。
选择一个 Dataplex 数据湖。
提供任务名称。
选择任务执行区域。
填写必填参数。
点击继续。
gcloud
替换以下内容:
JOB_NAME: a job name of your choice PROJECT_ID: your template project ID REGION_NAME: region in which to run the job INPUT_ASSET_OR_ENTITIES_LIST: path to your JDBC drivers OUTPUT_FILE_FORMAT: your output file format in Cloud Storage OUTPUT_ASSET: your Dataplex output asset ID
在 shell 或终端中,运行模板:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_File_Format_Conversion_Preview \ --parameters \ inputAssetOrEntitiesList=INPUT_ASSET_OR_ENTITIES_LIST,\ outputFileFormat=OUTPUT_FILE_FORMAT,\ outputAsset=OUTPUT_ASSET
REST API
替换以下内容:
PROJECT_ID: your template project ID REGION_NAME: region in which to run the job JOB_NAME: a job name of your choice INPUT_ASSET_OR_ENTITIES_LIST: path to your JDBC drivers OUTPUT_FILE_FORMAT: your output file format in Cloud Storage OUTPUT_ASSET: your Dataplex output asset ID
提交 HTTP POST 请求:
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION_NAME/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputAssetOrEntitiesList": "INPUT_ASSET_OR_ENTITIES_LIST", "outputFileFormat": "OUTPUT_FILE_FORMAT", "outputAsset": "OUTPUT_ASSET", }, "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_File_Format_Conversion_Preview", } }
模板:从 BigQuery 资产到 Cloud Storage 资产的层级数据
Dataplex BigQuery to Cloud Storage 模板可将 Dataplex BigQuery 资源中的数据以与 Dataplex 兼容的布局和格式复制到 Dataplex Cloud Storage 资产中。您可以指定要复制的 BigQuery 数据集或 BigQuery 表列表。为了提高灵活性,该模板允许复制早于指定修改日期的数据,并可以选择在成功复制后从 BigQuery 中删除数据。
在将分区表从 BigQuery 复制到 Cloud Storage 时,模板会在 Cloud Storage 存储桶上创建 Hive 样式的分区。在写入 Cloud Storage 时,该模板通过将后缀 _pid
附加到现有分区列来创建新的分区键。如需以外部表的形式访问 BigQuery 中的数据,必须执行此操作。目前,BigQuery 不能使 Hive 样式的分区键与现有列相同。因此,当从 BigQuery 作为外部表查看时,复制的表将包含一个额外的分区键列。其余数据将按原样保留。
将分区表从 BigQuery 复制到 Cloud Storage 时:
- 该模板会在 Cloud Storage 存储桶上创建 Hive 样式的分区。目前,BigQuery 不能使 Hive 样式的分区键与现有列相同。您可以使用选项
enforceSamePartitionKey
创建新的分区键,或者保留相同的分区键但重命名现有列。 - 创建 BigQuery 表(以及 Dataproc Metastore 中的表)时,Dataplex Discovery 会将分区类型注册为
string
。这可能会影响您现有的分区过滤条件。
在单次运行模板中可转换的表和分区数量存在限制,约为 300 个。具体数量取决于表名称的长度和其他因素。
模板参数
参数 | 说明 |
---|---|
sourceBigQueryDataset |
用于为其分层的 BigQuery 数据集。此参数应包含格式为 projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> 的 Dataplex 资产名称,或包含 projects/<name>/datasets/<dataset-id> 格式的 BigQuery 数据集 ID。 |
destinationStorageBucketAssetName |
要将数据分层的 Cloud Storage 存储桶的 Dataplex 资产名称。此参数应遵循 projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> 格式。 |
tables |
可选:要分层的 BigQuery 表的逗号分隔列表。如果未提供列表,则所有表都将分层。表应仅按名称指定(无项目/数据集前缀),且区分大小写。 |
exportDataModifiedBeforeDateTime |
可选:使用此参数可移动早于此日期(及可选时间)的数据。对于分区表 BigQuery 表,请将上次修改时间早于此日期/时间的分区移动。对于非分区表,如果表的上次修改时间早于此日期/时间,则进行移动。如果未指定,请移动所有表/分区。默认情况下,日期/时间以默认时区进行解析,但支持可选后缀 Z 和 +HH:mm 。此参数应遵循 YYYY-MM-DD 、YYYY-MM-DDTHH:mm:ss 或 YYYY-MM-DDTHH:mm:ss+03:00 格式。也支持相对日期/时间,此格式应遵循 -PnDTnHnMn.nS 格式(必须以 -P 开头,表示过去的时间)。
|
fileFormat |
可选:Cloud Storage 中的输出文件格式。此参数的默认值为 PARQUET 。该参数的另一个值可以是 AVRO 。 |
fileCompression |
可选:输出文件压缩。此参数的默认值为 SNAPPY 。该参数的其他值可以是 UNCOMPRESSED 、SNAPPY 、GZIP 或 BZIP2 。PARQUET 文件不支持 BZIP2 。 |
deleteSourceData |
可选:成功导出后,是否从 BigQuery 中删除源数据。值可以是 true 或 false 。此参数的默认值为 false 。 |
partitionIdRegExp |
可选:仅处理分区 ID 与此正则表达式匹配的分区。如果未提供值,则此参数默认处理所有值。 |
writeDisposition |
可选:指定目标文件已存在时要执行的操作,这意味着一个或多个表/分区已预先分层。此参数的默认值为 SKIP ,表示仅处理未预分层的表/分区。该参数的其他值可以是 OVERWRITE (覆盖任何现有文件)或 FAIL (如果已经存在至少一个目标文件,则不处理任何内容并产生错误)。 |
enforceSamePartitionKey |
可选:是否强制执行同一分区键。由于 BigQuery 的限制,分区外部表中的分区键(位于文件路径中)不能与文件中的某一列同名。如果此参数为 true(即默认值),则目标文件的分区键将设置为原始分区列名称,并重命名文件中的列。如果为 false,则重命名分区键。 例如,如果原始表按名为 如果为 |
updateDataplexMetadata |
可选:是否更新新创建的实体的 Dataplex 元数据。此参数的默认值为 启用后,流水线会自动将架构从来源复制到目标 Dataplex 实体,并且系统不会针对这些实体运行自动化 Dataplex Discovery。如果您要管理 BigQuery 源表的架构,请使用此标志。 |
运行模板
控制台
在 Google Cloud 控制台中,转到 Dataplex 页面:
进入 Process 视图。
点击 Create Task。
在从 BQ 到 GCS 资产下,点击创建任务。
选择一个 Dataplex 数据湖。
提供任务名称。
选择任务执行区域。
填写必填参数。
点击继续。
gcloud
替换以下内容:
JOB_NAME: a job name of your choice PROJECT_ID: your template project ID REGION_NAME: region in which to run the job SOURCE_ASSET_NAME_OR_DATASET_ID: your Dataplex asset name for the source BigQuery dataset, or the dataset ID DESTINATION_ASSET_NAME: your Dataplex asset name for the destination Cloud Storage bucket
在 shell 或终端中,运行模板:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_BigQuery_to_GCS_Preview \ --parameters \ sourceBigQueryDataset=SOURCE_ASSET_NAME_OR_DATASET_ID,\ destinationStorageBucketAssetName=DESTINATION_ASSET_NAME
REST API
替换以下内容:
PROJECT_ID: your template project ID REGION_NAME: region in which to run the job JOB_NAME: a job name of your choice SOURCE_ASSET_NAME_OR_DATASET_ID: your Dataplex asset name for the source BigQuery dataset, or the dataset ID DESTINATION_ASSET_NAME: your Dataplex asset name for the destination Cloud Storage bucket REGION_NAME: region in which to run the job
提交 HTTP POST 请求:
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION_NAME/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "sourceBigQueryDataset": "SOURCE_ASSET_NAME_OR_DATASET_ID", "destinationStorageBucketAssetName": "DESTINATION_ASSET_NAME", }, "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_BigQuery_to_GCS_Preview", } }
安排其他 Google Cloud 提供的或自定义 Dataflow 模板
借助 Dataplex,您可以安排和监控 Google Cloud 提供的任何 Dataflow 模板或您的自定义 Dataflow 模板。
时间表
控制台
在 Google Cloud 控制台中,转到 Dataplex 页面:
进入 Process 视图。
点击 Create Task。
在编写 Dataflow 流水线下,点击创建 Dataflow 流水线。
选择一个 Dataplex 数据湖。
提供任务名称。
选择运行任务的区域。
选择 Dataflow 模板。
填写必填参数。
点击继续。
监控
控制台
在 Google Cloud 控制台中,转到 Dataplex 页面:
进入 Process 视图。
点击 Dataflow 流水线。
按数据湖或流水线名称过滤。