本文档介绍了如何创建 Dataplex 数据质量任务,以便为内置和外部 BigQuery 表安排和运行数据质量检查。
如需了解详情,请参阅数据质量任务概览。
准备工作
本文档假定您有一个现有的 Dataplex 数据湖,可以在其中创建数据质量任务。
启用 Google API 和服务
启用 Dataproc API。
为您的网络和子网启用专用 Google 访问通道。在您计划用于 Dataplex 数据质量任务的网络上启用专用 Google 访问通道。如果您在创建 Dataplex 数据质量任务时未指定网络或子网,则 Dataplex 会使用默认子网。在这种情况下,您需要在默认子网上启用专用 Google 访问通道。
创建规范文件
Dataplex 使用开源 CloudDQ 作为驱动程序。Dataplex 数据质量检查要求则需在 CloudDQ YAML 规范文件中定义。
作为数据质量任务的输入,您可以使用单个 YAML 文件,也可以使用包含一个或多个 YAML 文件的单个 zip 归档文件。建议您在单独的 YAML 规范文件中分别捕获各个数据质量检查要求,每个部分对应一个文件。
如需准备规范文件,请执行以下操作:
-
创建一个或多个 CloudDQ YAML 规范文件,用于定义数据质量检查要求。如需详细了解所需的语法,请参阅本文档的规范文件简介部分。
将 YAML 规范文件保存为
.yml
或.yaml
格式。如果您创建多个 YAML 规范文件,请将所有文件保存在单个 zip 归档文件中。 - 创建 Cloud Storage 存储桶。
- 将规范文件上传到 Cloud Storage 存储桶。
规范文件简介
您的 CloudDQ YAML 规范文件需要包含以下部分:
规则(在顶级
rules
YAML 节点中定义):要运行的规则列表。您可以基于预定义的规则类型(例如NOT_NULL
和REGEX
)创建这些规则,也可以使用自定义 SQL 语句(例如CUSTOM_SQL_EXPR
和CUSTOM_SQL_STATEMENT
)对这些规则进行扩展。CUSTOM_SQL_EXPR
语句会将custom_sql_expr
的求值结果为False
的任何行标记为失败。CUSTOM_SQL_STATEMENT
语句则会将整个语句返回的任何值标记为失败。行过滤条件(在顶级
row_filters
YAML 节点中定义):一组返回布尔值的 SQL 表达式,用于定义若干过滤条件来从底层实体主题中提取一部分数据进行验证。规则绑定(在顶级
rule_bindings
YAML 节点中定义):定义要应用于表的rules
和rule filters
。规则维度(在
rule_dimensions
YAML 节点中定义):定义规则可在相应的dimension
字段中定义的数据质量规则维度允许列表。例如:
rule_dimensions: - consistency - correctness - duplication - completeness - conformance
dimension
字段对于规则是可选的。如果任何规则列出了dimension
,则规则维度部分是必填的。
如需了解详情,请参阅 CloudDQ 参考指南和示例规范文件。
创建一个数据集来存储结果
-
如需存储结果,请创建 BigQuery 数据集。
数据集必须与您要针对其运行数据质量任务的表位于同一区域。
Dataplex 会使用此数据集,并会创建或重复使用您选择的表来存储结果。
创建服务账号
创建服务账号,使其具有以下 Identity and Access Management (IAM) 角色和权限:
- 对包含 YAML 规范的 Cloud Storage 路径拥有读取权限。为此,您可以使用 Cloud Storage 存储桶的 Storage Object Viewer 角色 (
roles/storage.objectViewer
)。 - 对包含待验证数据的 BigQuery 数据集拥有读取权限。为此,您可以使用 BigQuery Data Viewer 角色。
- 对 BigQuery 数据集拥有写入权限,以便能够创建表(如果需要)并将结果写入该表。为此,您可以使用数据集级层的 BigQuery Data Editor 角色 (
roles/bigquery.dataEditor
)。 - 拥有项目级层的 BigQuery Job User 角色 (
roles/bigquery.jobUser
),以便能够在项目中创建 BigQuery 作业。 - 拥有项目级层或数据湖级层的 Dataplex Metadata Reader 角色 (
roles/dataplex.metadataReader
)。 - 拥有项目级层的 Service Usage Consumer 角色 (
roles/serviceusage.serviceUsageConsumer
)。 - 拥有 Dataproc Worker 角色。
- 向提交作业的用户授予
iam.serviceAccounts.actAs
权限。 - 向 Dataplex 数据湖服务账号授予 Service Account User 角色。您可以在 Google Cloud 控制台中查看 Dataplex 数据湖服务账号。
可选:使用高级设置
以下是可选步骤:
默认情况下,BigQuery 会在当前用户项目中运行数据质量检查。不过,您也可以选择其他项目来运行 BigQuery 作业,只需在任务的
--execution-args
属性中使用--gcp_project_id
TASK_ARGS
参数即可。如果指定运行 BigQuery 查询的项目 ID 与在其中创建服务账号(由
--execution-service-account
指定)的项目不同,请确保关闭停用跨项目使用服务账号的组织政策 (iam.disableServiceAccountCreation
)。此外,请确保服务账号可以访问运行 BigQuery 查询的项目中的 BigQuery 作业时间表。
限制
- 为给定数据质量任务指定的所有表都必须属于同一 Google Cloud 区域。
安排数据质量任务
控制台
- 在 Google Cloud 控制台中,前往 Dataplex 的流程页面。
- 点击 创建任务。
- 在检查数据质量卡片上,点击创建任务。
- 在 Dataplex 数据湖部分,选择您的数据湖。
- 在 ID 部分,输入 ID。
- 在数据质量规范部分,执行以下操作:
- 在选择 GCS 文件字段中,点击浏览。
选择您的 Cloud Storage 存储桶。
点击选择。
在结果表部分,执行以下操作:
在选择 BigQuery 数据集字段中,点击浏览。
选择要用来存储验证结果的 BigQuery 数据集。
点击选择。
在 BigQuery table(BigQuery 表)字段中,输入要存储结果的表的名称。 如果该表不存在,Dataplex 会为您创建该表。请勿使用名称
dq_summary
,因为该名称已预留给内部处理任务。
在服务账号部分,从用户服务账号菜单中选择一个服务账号。
点击继续。
在设置时间表部分,配置运行数据质量任务的时间表。
点击创建。
gcloud CLI
下面的示例会使用 Dataplex 任务适用的 gcloud CLI 命令来执行数据质量任务:
export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH" # Google Cloud project where the Dataplex task is created. export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT" # Google Cloud region for the Dataplex lake. export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID" # Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region. export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}" # The Dataplex lake where your task is created. export DATAPLEX_LAKE_NAME="DATAPLEX_LAKE_NAME" # The service account used for running the task. Ensure that this service account has sufficient IAM permissions on your project, including BigQuery Data Editor, BigQuery Job User, Dataplex Editor, Dataproc Worker, and Service Usage Consumer. # The BigQuery dataset used for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding. export TARGET_BQ_DATASET="TARGET_BQ_DATASET" # If you want to use a different dataset for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding, use the following: export CLOUDDQ_BIGQUERY_DATASET=$TARGET_BQ_DATASET # The BigQuery dataset where the final results of the data quality checks are stored. This could be the same as CLOUDDQ_BIGQUERY_DATASET. export TARGET_BQ_DATASET="TARGET_BQ_DATASET" # The BigQuery table where the final results of the data quality checks are stored. export TARGET_BQ_TABLE="TARGET_BQ_TABLE" # The unique identifier for the task. export TASK_ID="TASK_ID" gcloud dataplex tasks create \ --location="${DATAPLEX_REGION_ID}" \ --lake="${DATAPLEX_LAKE_NAME}" \ --trigger-type=ON_DEMAND \ --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \ --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \ --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \ --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \ "$TASK_ID"
参数 | 说明 |
---|---|
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH |
为数据质量任务输入的数据质量 YAML 配置的 Cloud Storage 路径。您可以使用单个 .yml 或 .yaml 格式的 YAML 文件,也可以使用包含多个 YAML 文件的 zip 归档文件。 |
GOOGLE_CLOUD_PROJECT |
在其中创建 Dataplex 任务和 BigQuery 作业的 Google Cloud 项目。 |
DATAPLEX_REGION_ID |
在其中创建数据质量任务的 Dataplex 数据湖所在的区域。 |
SERVICE_ACCOUNT |
用于执行任务的服务账号。确保此服务账号拥有足够的 IAM 权限,具体请参阅准备工作部分。 |
对于 --execution-args
,需要将以下参数作为定位参数传递,并需遵循下面的顺序:
参数名 | 说明 |
---|---|
clouddq-executable.zip |
从公共 Cloud Storage 存储桶中通过 spark-file-uris 传递的预编译可执行文件。 |
ALL |
运行所有规则绑定。或者,您也可以采用英文逗号分隔列表的形式提供特定的规则绑定。例如 RULE_1,RULE_2 。 |
gcp-project-id |
运行 BigQuery 查询的项目的 ID。 |
gcp-region-id |
在其中运行 BigQuery 作业以进行数据质量验证的区域。此区域应与 gcp-bq-dataset-id 和 target_bigquery_summary_table 的区域相同。 |
gcp-bq-dataset-id |
用来存储 rule_binding 视图和中间数据质量摘要结果的 BigQuery 数据集。 |
target-bigquery-summary-table |
用来存储数据质量检查的最终结果的 BigQuery 表的表 ID 引用。请勿使用 ID 值 dq_summary ,因为它已预留给内部处理任务。 |
--summary_to_stdout |
(可选)如果传递此标志,最近一次运行检查时在 dq_summary 表中创建的所有验证结果行都会采用 JSON 格式记录到 Cloud Logging 和 stdout 。 |
API
请替换以下内容:
PROJECT_ID = "Your Dataplex Project ID" REGION = "Your Dataplex lake region" LAKE_ID = "Your Dataplex lake ID" SERVICE_ACC = "Your service account used for reading the data" DATAPLEX_TASK_ID = "Unique task ID for the data quality task" BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification" GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project" GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results" TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
- 提交 HTTP POST 请求:
POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID} { "spark": { "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py", "file_uris": [ f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip", f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum", f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip" ] }, "execution_spec": { "args": { "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}" }, "service_account": "SERVICE_ACC" }, "trigger_spec": { "type": "ON_DEMAND" }, "description": "${DATAPLEX_TASK_DESCRIPTION}" }
监控已安排的数据质量任务
了解如何监控任务。
查看结果
数据质量验证结果会存储在您指定的 BigQuery 数据集和摘要表中,如创建用于存储结果的数据集中所述。 摘要表包含每个验证运行的每个规则绑定和规则组合的输出摘要。摘要表中的输出包含以下信息:
列名 | 说明 |
---|---|
dataplex_lake |
(字符串)包含待验证表的 Dataplex 数据湖的 ID。 |
dataplex_zone |
(字符串)包含待验证表的 Dataplex 区域的 ID。 |
dataplex_asset_id |
(字符串)包含待验证表的 Dataplex 资产的 ID。 |
execution_ts |
(时间戳)代表验证查询运行时间的时间戳。 |
rule_binding_id |
(字符串)验证结果报告所依据的规则绑定的 ID。 |
rule_id |
(字符串)验证结果报告所依据的规则绑定下的规则的 ID。 |
dimension |
(字符串)rule_id 的数据质量维度。此值只能是 rule_dimensions YAML 节点中指定的某个值。 |
table_id |
(字符串)验证结果报告所依据的实体的 ID。此 ID 在相应规则绑定的 entity 参数下指定。 |
column_id |
(字符串)验证结果报告所依据的列的 ID。此 ID 在相应规则绑定的 column 参数下指定。 |
last_modified |
(时间戳)代表待验证 table_id 的最后修改时间的时间戳。 |
metadata_json_string |
(字符串)在规则绑定下或在数据质量运行期间指定的元数据参数内容的键值对。 |
configs_hashsum |
(字符串)包含规则绑定及其所有关联规则、规则绑定、行过滤条件和实体配置的 JSON 文档的哈希值总和。configs_hashsum 允许跟踪 rule_binding ID 或其引用的某个配置的内容变更情况。 |
dq_run_id |
(字符串)记录的唯一 ID。 |
invocation_id |
(字符串)数据质量运行的 ID。在同一数据质量执行实例中生成的所有数据质量摘要记录都共用同一个 invocation_id 。 |
progress_watermark |
(布尔值)确定数据质量检查是否使用此特定记录来确定增量验证的高水印。如果为 FALSE ,则在设立高位值后,相应记录会被忽略。在执行不应提高高水印的测试数据质量验证时,此信息非常有用。Dataplex 默认使用 TRUE 填充此字段,但如果参数 --progress_watermark 的值为 FALSE ,此字段的值可以被替换。 |
rows_validated |
(整数)在 incremental_time_filter_column_id 列(如果指定)上应用 row_filters 和任何高位值过滤条件后,所验证的记录总数。 |
complex_rule_validation_errors_count |
(浮点数)CUSTOM_SQL_STATEMENT 规则返回的行数。 |
complex_rule_validation_success_flag |
(布尔值)CUSTOM_SQL_STATEMENT 规则的成功状态。 |
success_count |
(整数)通过验证的记录总数。对于 CUSTOM_SQL_STATEMENT 规则,此字段设置为 NULL 。 |
success_percentage |
(浮点数)通过验证的记录数占进行验证的记录总数的百分比。对于 CUSTOM_SQL_STATEMENT 规则,此字段设置为 NULL 。 |
failed_count |
(整数)未通过验证的记录总数。对于 CUSTOM_SQL_STATEMENT 规则,此字段设置为 NULL 。 |
failed_percentage |
(浮点数)未通过验证的记录数占进行验证的记录总数的百分比。对于 CUSTOM_SQL_STATEMENT 规则,此字段设置为 NULL 。 |
null_count |
(整数)在验证期间返回 null 的记录总数。对于 NOT_NULL 和 CUSTOM_SQL_STATEMENT 规则,此字段设置为 NULL 。 |
null_percentage |
(浮点数)在验证期间返回 null 的记录数占进行验证的记录总数的百分比。对于 NOT_NULL 和 CUSTOM_SQL_STATEMENT 规则,此字段设置为 NULL 。 |
failed_records_query |
对于未通过验证的每一条规则,此列都会存储一个可用于获取失败记录的查询。在本文档中,请参阅使用 failed_records_query 排查失败规则的问题。 |
对于 BigQuery 实体,系统会为每个 rule_binding
创建一个视图,其中包含最近的执行作业的 SQL 验证逻辑。您可以在参数 --gcp-bq-dataset-id
中指定的 BigQuery 数据集中找到这些视图。
费用优化
您可以通过进行以下优化来帮助降低费用。
增量验证
通常,您的表会定期使用新分区(新行)进行更新。如果您不想在每次运行时都重新验证旧的分区,则可以使用增量验证。
对于增量验证,您必须在表中有一个 TIMESTAMP
或 DATETIME
类型的列,其中列值会单调递增。您可以使用对 BigQuery 表进行分区所依据的那些列。
如需指定增量验证,请在规则绑定中指定 incremental_time_filter_column_id=TIMESTAMP/DATETIME type column
的值。
指定列时,数据质量任务仅考虑 TIMESTAMP
值大于上次运行数据质量任务时的时间戳的那些行。
规范文件示例
如需使用这些示例,请创建一个名为 sales
的 BigQuery 数据集。然后,使用以下 GoogleSQL 语句运行查询,创建一个名为 sales_orders
的事实表并添加示例数据:
CREATE OR REPLACE TABLE sales.sales_orders
(
id STRING NOT NULL,
last_modified_timestamp TIMESTAMP,
customer_id STRING,
item_id STRING,
amount NUMERIC,
transaction_currency STRING
);
INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")
示例 1
以下代码示例会创建数据质量检查来验证这些值:
amount
:值为零或正数。item_id
:由 5 个字母字符后跟 15 位数字组成的字母数字字符串。transaction_currency
:通过一个静态列表定义的允许货币类型。此示例的静态列表允许以 GBP 和 JPY 作为货币类型。此验证仅适用于标记为国际化的行。
# The following `NONE` row filter is required.
row_filters:
NONE:
filter_sql_expr: |-
True
# This filters for rows marked as international (INTNL).
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can apply to multiple tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
# Rule bindings associate rules to columns within tables.
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
替换以下内容:
PROJECT_ID
:您的项目 ID。DATASET_ID
:数据集 ID。
示例 2
如果要检查的表属于 Dataplex 数据湖,您可以使用数据湖或区域符号指定表。这样,您就可以按湖泊或区域汇总结果。例如,您可以生成区域级得分。
如需使用此示例,请创建一个 Dataplex 湖,其数据湖 ID 为 operations
,区域 ID 为 procurement
。然后,将表格 sales_orders
作为资产添加到该区域。
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
替换以下内容:
- PROJECT_ID:您的项目 ID。
- REGION_ID:表所在的 Dataplex 数据湖的地区 ID,例如
us-central1
。
示例 3
此示例通过添加自定义 SQL 检查来查看 ID 值是否唯一,从而增强了示例 2。
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
示例 4
此示例通过使用 last_modified_timestamp
列添加增量验证来增强示例 3。您可以为一个或多个规则绑定添加增量验证。
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_CURRENCY_ID
使用 failed_records_query
排查失败的规则
对于未通过验证的每一条规则,摘要表都会将一个查询存储在 failed_records_query
列中,您可以用该查询来获取失败的记录。
若要进行调试,您还可以在 YAML 文件中使用 reference columns
,以便将 failed_records_query
的输出与原始数据关联起来,以获取整条记录的内容。例如,您可以指定 primary_key
列或复合 primary_key
列作为引用列。
指定引用列
若要生成引用列,您可以将以下内容添加到 YAML 规范中:
reference_columns
部分。在本部分中,您可以创建一个或多个引用列集,其中每个列集指定一个或多个列。rule_bindings
部分。在本部分中,您可以向规则绑定添加一行,用于指定该规则绑定中的规则要使用的引用列的 ID (reference_columns_id
)。它应该是reference_columns
部分中指定的引用列集之一。
例如,以下 YAML 文件指定了 reference_columns
部分并定义了 3 列(id
、last_modified_timestamp
和 item_id
)作为 ORDER_DETAILS_REFERENCE_COLUMNS
集的一部分。以下示例使用示例表 sales_orders
。
reference_columns:
ORDER_DETAILS_REFERENCE_COLUMNS:
include_reference_columns:
- id
- last_modified_timestamp
- item_id
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
params:
custom_sql_expr: |-
row_filters:
NONE:
filter_sql_expr: |-
True
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
rule_ids:
- VALUE_ZERO_OR_POSITIVE
使用失败记录查询
失败记录查询会为有规则未通过验证的每条记录都生成一行数据。该行数据包括触发失败的列的名称、触发失败的值以及相应引用列的值。它还包含可用于与数据质量任务的执行进行关联的元数据。
以下是指定参考列中所述 YAML 文件的失败记录查询输出的示例。它展示了触发失败的列 amount
以及失败值 -10
,还记录了相应的引用列值。
_dq_validation_invocation_id | _dq_validation_rule_binding_id | _dq_validation_rule_id | _dq_validation_column_id | _dq_validation_column_value | _dq_validation_dimension | _dq_validation_simple_rule_row_is_valid | _dq_validation_complex_rule_validation_errors_count | _dq_validation_complex_rule_validation_success_flag | id | last_modified_timestamp | item_id |
---|---|---|---|---|---|---|---|---|---|---|---|
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 | TRANSACTION_AMOUNT_VALID | VALUE_ZERO_OR_POSITIVE | amount | -10 | FALSE | order1 | 2022-01-22T02:30:06.321Z | bad_item_id |
对 CUSTOM_SQL_STATEMENT 规则使用失败的记录查询
对于 CUSTOM_SQL_STATEMENT
规则,失败的记录查询包括 custom_sql_statement_validation_errors
列。custom_sql_statement_validation_errors
列是一个嵌套列,其字段与 SQL 语句的输出相匹配。CUSTOM_SQL_STATEMENT
规则的失败记录查询中不包含引用列。
例如,您的 CUSTOM_SQL_STATEMENT
规则可能如下所示:
rules: TEST_RULE: rule_type: CUSTOM_SQL_STATEMENT custom_sql_arguments: - existing_id - replacement_id params: CUSTOM_SQL_STATEMENT: |- (SELECT product_name, product_key FROM data where $existing_id != $replacement_id)
custom_sql_statement_validation_errors
列的一行或多行,existing_id!=replacement_id
的每个实例各一行。以 JSON 格式呈现时,此列中单元格的内容可能如下所示:
{ "custom_sql_statement_valdation_errors" :{ "product_name"="abc" "product_key"="12345678" "_rule_binding_id"="your_rule_binding" } }
您可以使用 join on custom_sql_statement_valdation_errors.product_key
等嵌套引用将这些结果联接到原始表。
后续步骤
- 请参阅 CloudDQ YAML 规范参考文档。
- 如需查看数据质量规则示例,请参阅简单规则和高级规则。
- 请参阅适用于 Dataplex 数据质量任务的 Airflow DAG 示例。