Dataplex 提供由 Dataflow 提供支持的模板,用于执行数据提取、处理和管理数据生命周期等常见数据处理任务。本指南介绍了如何配置和运行数据处理模板。
准备工作
Dataplex 模板由 Dataflow 提供支持。 在使用模板之前,请先启用 Dataflow API。
备注
所有模板都支持常见的 Dataflow 流水线选项。
Dataplex 使用数据流水线来安排模板定义的任务。
您只能在 Google Cloud 控制台的 Dataplex 页面上看到通过 Dataplex 安排的任务。
模板:将原始数据转换为精选数据
Dataplex 文件格式转换模板可将 Dataplex Cloud Storage 资产或 以 CSV 或 JSON 格式存储的 Dataplex 实体、Parquet 或 另一个 Dataplex 资产中的 Avro format-data。分区布局 会保留在转换中它还支持对输出文件进行压缩。
模板参数
参数 | 说明 |
---|---|
inputAssetOrEntitiesList |
需要指定 Cloud Storage 存储分区的 Dataplex 资产或 Dataplex 实体
包含输入文件。此参数应遵循以下格式:
projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name>
或projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/entities/<entity1-name>,projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/entities/<entity 2 name>... |
outputFileFormat |
Cloud Storage 中的输出文件格式。此参数应遵循以下格式:PARQUET 或 AVRO 。 |
outputAsset |
包含
用于存储输出文件的 Cloud Storage 存储桶。此参数应采用以下格式:projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> 。您可以
outputAsset ,在 Google Cloud 控制台中
Dataplex 资产“Details ”标签页。 |
outputFileCompression |
可选:输出文件压缩。此字段的默认值
参数为 SNAPPY 。该参数的其他值可以是 UNCOMPRESSED 、SNAPPY 、GZIP 或 BZIP2 。PARQUET 文件不支持 BZIP2 。 |
writeDisposition |
可选:指定在生成目标文件时执行的操作
已存在。此参数的默认值为 SKIP 。
哪些信号仅处理
目标目录中。参数的其他值可以是
OVERWRITE (覆盖所有现有文件)或 FAIL
(如果至少有 1 个目标,则不处理任何内容,并且会生成错误
文件已存在)。 |
updateDataplexMetadata |
可选:是否为
新建实体。此参数的默认值为
如果启用此功能,流水线将自动将架构从源复制到目标 Dataplex 实体,并且不会为这些实体运行自动化 Dataplex 发现功能。如果 源(原始)数据的架构由 Dataplex 管理。 |
运行模板
控制台
在 Google Cloud 控制台中,转到 Dataplex 页面:
前往流程视图。
点击创建任务。
在转换为精选格式下方,点击创建任务。
选择一个 Dataplex 数据湖。
提供任务名称。
选择用于执行任务的区域。
填写必需参数。
点击继续。
gcloud
替换以下内容:
JOB_NAME: a job name of your choice PROJECT_ID: your template project ID REGION_NAME: region in which to run the job INPUT_ASSET_OR_ENTITIES_LIST: path to your JDBC drivers OUTPUT_FILE_FORMAT: your output file format in Cloud Storage OUTPUT_ASSET: your Dataplex output asset ID
在 shell 或终端中,运行模板:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_File_Format_Conversion_Preview \ --parameters \ inputAssetOrEntitiesList=INPUT_ASSET_OR_ENTITIES_LIST,\ outputFileFormat=OUTPUT_FILE_FORMAT,\ outputAsset=OUTPUT_ASSET
REST API
替换以下内容:
PROJECT_ID: your template project ID REGION_NAME: region in which to run the job JOB_NAME: a job name of your choice INPUT_ASSET_OR_ENTITIES_LIST: path to your JDBC drivers OUTPUT_FILE_FORMAT: your output file format in Cloud Storage OUTPUT_ASSET: your Dataplex output asset ID
提交 HTTP POST 请求:
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION_NAME/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputAssetOrEntitiesList": "INPUT_ASSET_OR_ENTITIES_LIST", "outputFileFormat": "OUTPUT_FILE_FORMAT", "outputAsset": "OUTPUT_ASSET", }, "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_File_Format_Conversion_Preview", } }
模板:层级数据从 BigQuery 资产到 Cloud Storage 资产
Dataplex BigQuery to Cloud Storage 模板可将数据从 Dataplex BigQuery 资产复制到 Dataplex Cloud Storage 资产,并采用与 Dataplex 兼容的布局和格式。您可以指定 BigQuery 数据集或 BigQuery 表列表 要复制的对象。为了提高灵活性,该模板允许复制数据 早于指定的修改日期,并允许选择性地删除数据 从 BigQuery 中复制数据。
将分区表从 BigQuery 复制到 Cloud Storage 时:
- 该模板会在 Cloud Storage 存储桶上创建 Hive 样式的分区。
BigQuery 不能包含 Hive 样式的分区键
与现有列相同。您可以使用选项
enforceSamePartitionKey
创建新的分区键,也可以保留相同的分区键,但重命名现有列。 - 创建 BigQuery 表(以及 Dataproc Metastore 中的表)时,Dataplex Discovery 会将分区类型注册为
string
。这可能会影响您现有的分区过滤条件。
可以转换的表和分区的数量有限制 大约需要 300 次。确切数量取决于表名称的长度和其他因素。
模板参数
参数 | 说明 |
---|---|
sourceBigQueryDataset |
要对数据进行分层的 BigQuery 数据集。此参数应包含格式为 projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> 的 Dataplex 资产名称,或格式为 projects/<name>/datasets/<dataset-id> 的 BigQuery 数据集 ID。 |
destinationStorageBucketAssetName |
Cloud Storage 存储桶的 Dataplex 资产名称
将数据划分到哪个层级此参数应遵循 projects/<name>/locations/<loc>/lakes/<lake-name>/zones/<zone-name>/assets/<asset-name> 格式。 |
tables |
可选:要分层的 BigQuery 表的逗号分隔列表。如果未提供列表,则所有表将进行分层。表应 (无项目/数据集前缀),并且 区分大小写。 |
exportDataModifiedBeforeDateTime |
可选:使用此参数可移动早于此日期(以及
(可选时间)。对于分区的 BigQuery 表,请将
分区在此日期/时间之前的最后修改时间。对于非分区表,如果表的上次修改时间在此日期/时间之前,则移除。如果未指定,则移动所有表/分区。系统会默认在默认时区解析日期/时间,但支持可选后缀 Z 和 +HH:mm 。此参数应遵循以下格式:
YYYY-MM-DD 、YYYY-MM-DDTHH:mm:ss 或
YYYY-MM-DDTHH:mm:ss+03:00 。
相对日期/时间
格式,并且应遵循
-PnDTnHnMn.nS (必须以 -P 开头,
表示过去的时间)。
|
fileFormat |
可选:Cloud Storage 中的输出文件格式。默认
此参数的值为 PARQUET 。另一个
参数可以是 AVRO 。 |
fileCompression |
可选:输出文件压缩。此字段的默认值
参数为 SNAPPY 。参数的其他值可以是
UNCOMPRESSED 、SNAPPY 、GZIP 或
BZIP2 。PARQUET 文件不支持 BZIP2 。 |
deleteSourceData |
可选:在导出成功后是否从 BigQuery 中删除源数据。值可以是 true 或
false 。此参数的默认值为 false 。 |
partitionIdRegExp |
可选:仅处理与此正则表达式匹配的分区 ID 的分区。如果未提供任何值,则此参数默认为 全部处理。 |
writeDisposition |
可选:指定目标文件已存在时执行的操作,即一个或多个表/分区已预分层。此参数的默认值为 SKIP 。
哪些信号仅处理未未受其影响的表/分区,
。参数的其他值可以是
OVERWRITE (覆盖所有现有文件)或 FAIL
(如果至少有 1 个目标,则不处理任何内容,并且会生成错误
文件已存在)。 |
enforceSamePartitionKey |
可选:是否强制使用相同的分区键。由于 BigQuery 限制,分区外部表中的分区键(在文件路径中)不能与文件中的某个列具有相同的名称。如果此参数为 true(默认值),则目标文件的分区键会设置为原始分区列名称,并且文件中的列会重命名。如果 false,则重命名分区键。 例如,如果原始表根据名为
如果为 |
updateDataplexMetadata |
可选:是否更新新创建的实体的 Dataplex 元数据。此参数的默认值为
启用后,流水线将自动从源复制架构 发送到目标 Dataplex 实体, Dataplex Discovery 不会为其运行。如果您 管理源 BigQuery 表的架构。 |
运行模板
控制台
在 Google Cloud 控制台中,前往 Dataplex 页面:
转到进程视图。
点击创建任务。
在 Tier from BQ to GCS Assets(从 BQ 到 GCS 资产)下,点击创建任务。
选择一个 Dataplex 数据湖。
提供任务名称。
选择用于执行任务的区域。
填写必需参数。
点击继续。
gcloud
替换以下内容:
JOB_NAME: a job name of your choice PROJECT_ID: your template project ID REGION_NAME: region in which to run the job SOURCE_ASSET_NAME_OR_DATASET_ID: your Dataplex asset name for the source BigQuery dataset, or the dataset ID DESTINATION_ASSET_NAME: your Dataplex asset name for the destination Cloud Storage bucket
在 shell 或终端中,运行模板:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_BigQuery_to_GCS_Preview \ --parameters \ sourceBigQueryDataset=SOURCE_ASSET_NAME_OR_DATASET_ID,\ destinationStorageBucketAssetName=DESTINATION_ASSET_NAME
REST API
替换以下内容:
PROJECT_ID: your template project ID REGION_NAME: region in which to run the job JOB_NAME: a job name of your choice SOURCE_ASSET_NAME_OR_DATASET_ID: your Dataplex asset name for the source BigQuery dataset, or the dataset ID DESTINATION_ASSET_NAME: your Dataplex asset name for the destination Cloud Storage bucket REGION_NAME: region in which to run the job
提交 HTTP POST 请求:
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION_NAME/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "sourceBigQueryDataset": "SOURCE_ASSET_NAME_OR_DATASET_ID", "destinationStorageBucketAssetName": "DESTINATION_ASSET_NAME", }, "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/latest/flex/Dataplex_BigQuery_to_GCS_Preview", } }
安排其他 Google Cloud 提供的 Dataflow 模板或自定义 Dataflow 模板
通过 Dataplex,您可以安排和监控 Google Cloud 提供的 Dataflow 模板或您的自定义 控制台中的 Dataflow 模板。
时间表
控制台
在 Google Cloud 控制台中,转到 Dataplex 页面:
前往流程视图。
点击创建任务。
在编写 Dataflow 流水线下,点击创建 Dataflow 流水线。
选择一个 Dataplex 数据湖。
提供任务名称。
选择要运行任务的区域。
选择一个 Dataflow 模板。
填写所需的参数。
点击继续。
监控
控制台
在 Google Cloud 控制台中,前往 Dataplex 页面:
前往流程视图。
点击 Dataflow 流水线。
按数据湖或数据流水线名称过滤。