使用数据质量任务

本文档介绍了如何创建 Dataplex 数据质量任务,以便能够为内置和外部 BigQuery 表安排和运行数据质量检查。

如需了解详情,请参阅数据质量任务概览

准备工作

本文档假定您已有 Dataplex 数据湖来创建数据质量任务。

在创建数据质量任务之前,请执行以下操作。

启用 Google API 和服务

  1. 启用 Dataproc API。

    启用该 API

  2. 为您的网络和/或子网启用专用 Google 访问通道。在您计划用于 Dataplex 数据质量任务的网络上启用专用 Google 访问通道。如果您在创建 Dataplex 数据质量任务时未指定网络或子网,Dataplex 将使用默认子网。在这种情况下,您需要在默认子网上启用专用 Google 访问通道。

创建规范文件

Dataplex 使用开源 CloudDQ 作为驱动程序。Dataplex 数据质量检查要求在 CloudDQ YAML 规范文件中定义。

作为数据质量任务的输入,您可以使用单个 YAML 文件或包含一个或多个 YAML 文件的单个 zip 归档文件。建议您在单独的 YAML 规范文件中捕获数据质量检查要求,每个部分对应一个文件。

如需准备规范文件,请执行以下操作:

  1. 创建一个或多个 CloudDQ YAML 规范文件,用于定义数据质量检查要求。如需详细了解所需的语法,请参阅本文档的关于规范文件部分。

    .yml.yaml 格式保存 YAML 规范文件。如果您创建多个 YAML 规范文件,请将所有文件保存在一个 zip 归档文件中。

  2. 创建 Cloud Storage 存储桶
  3. 将规范文件上传到 Cloud Storage 存储桶。

规范文件简介

您的 CloudDQ YAML 规范文件需要包含以下部分:

  • 规则(在顶级 rules YAML 节点中定义):要运行的规则列表。您可以基于预定义的规则类型(例如 NOT_NULLREGEX)创建这些规则,也可以使用自定义 SQL 语句(例如 CUSTOM_SQL_EXPRCUSTOM_SQL_STATEMENT)对这些规则进行扩展。CUSTOM_SQL_EXPR 语句会将 custom_sql_expr 的求值结果为 False 的任何行标记为失败。CUSTOM_SQL_STATEMENT 语句则会将整个语句返回的任何值标记为失败。

  • 行过滤条件(在顶级 row_filters YAML 节点中定义):一组返回布尔值的 SQL 表达式,用于定义若干过滤条件来从底层实体主题中提取一部分数据进行验证。

  • 规则绑定(在顶级 rule_bindings YAML 节点中定义):定义要应用于表的 rulesrule filters

  • 规则维度(在 rule_dimensions YAML 节点中定义):定义规则可在相应的 dimension 字段中定义的数据质量规则维度允许列表。

    例如:

    rule_dimensions:
      - consistency
      - correctness
      - duplication
      - completeness
      - conformance

    dimension 字段对于规则是可选的。如果任何规则列出了 dimension,则规则维度部分是必填的。

如需了解详情,请参阅 CloudDQ 参考指南示例规范文件

创建用于存储结果的数据集

  • 如需存储结果,请创建 BigQuery 数据集

    数据集必须与运行数据质量任务的表位于同一区域。

    Dataplex 使用此数据集,它会创建或重复使用您选择的表来存储结果。

创建服务账号

创建具有以下 Identity and Access Management (IAM) 角色和权限的服务帐号

可选:使用高级设置

以下是可选步骤:

  1. 默认情况下,BigQuery 会在当前用户项目中运行数据质量检查。不过,您也可以选择其他项目来运行 BigQuery 作业,只需在任务的 --execution-args 属性中使用 --gcp_project_id TASK_ARGS 参数即可。

  2. 如果指定运行 BigQuery 查询的项目 ID 与在其中创建服务账号(由 --execution-service-account 指定)的项目不同,请确保关闭停用跨项目使用服务账号的组织政策 (iam.disableServiceAccountCreation)。此外,请确保服务账号可以访问运行 BigQuery 查询的项目中的 BigQuery 作业时间表。

限制

  • 为给定数据质量任务指定的所有表都必须属于同一 Google Cloud 区域。

安排数据质量任务

控制台

  1. 在 Google Cloud 控制台中,前往 Dataplex 的流程页面。

    前往流程

  2. 点击 创建任务
  3. 检查数据质量卡片上,点击创建任务
  4. Dataplex 数据湖部分,选择您的数据湖。
  5. ID 部分,输入 ID。
  6. 数据质量规范部分,执行以下操作:
    1. 选择 GCS 文件字段中,点击浏览
    2. 选择您的 Cloud Storage 存储桶。

    3. 点击选择

  7. 结果表部分,执行以下操作:

    1. 选择 BigQuery 数据集字段中,点击浏览

    2. 选择要用来存储验证结果的 BigQuery 数据集。

    3. 点击选择

    4. BigQuery 表字段中,输入用于存储结果的表的名称。 如果该表不存在,Dataplex 会为您创建该表。请勿使用名称 dq_summary,因为它是为内部处理任务预留的。

  8. 服务账号部分,从用户服务账号菜单中选择一个服务账号。

  9. 点击继续

  10. 设置时间表部分中,配置运行数据质量任务的时间表。

  11. 点击创建

gcloud CLI

下面的示例会使用 Dataplex 任务适用的 gcloud CLI 命令来执行数据质量任务:

export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH"

# Google Cloud project where the Dataplex task is created.
export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT"

# Google Cloud region for the Dataplex lake.
export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID"

# Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region.
export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}"

# The Dataplex lake where your task is created.
export DATAPLEX_LAKE_NAME="DATAPLEX_LAKE_NAME"

# The service account used for running the task. Ensure that this service account
has sufficient IAM permissions on your project, including
BigQuery Data Editor, BigQuery Job User,
Dataplex Editor, Dataproc Worker, and Service
Usage Consumer.

# The BigQuery dataset used for storing the intermediate data
quality summary results and the BigQuery views associated with
each rule binding.
export TARGET_BQ_DATASET="TARGET_BQ_DATASET"

# If you want to use a different dataset for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding, use the following:
export CLOUDDQ_BIGQUERY_DATASET=$TARGET_BQ_DATASET

# The BigQuery dataset where the final results of the data quality checks are stored. This could be the same as CLOUDDQ_BIGQUERY_DATASET.
export TARGET_BQ_DATASET="TARGET_BQ_DATASET"

# The BigQuery table where the final results of the data quality checks are stored.
export TARGET_BQ_TABLE="TARGET_BQ_TABLE"

# The unique identifier for the task.
export TASK_ID="TASK_ID"

gcloud dataplex tasks create \
    --location="${DATAPLEX_REGION_ID}" \
    --lake="${DATAPLEX_LAKE_NAME}" \
    --trigger-type=ON_DEMAND \
    --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \
    --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \
    --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \
    --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \
    "$TASK_ID"
参数 说明
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH 为数据质量任务输入的数据质量 YAML 配置的 Cloud Storage 路径。您可以使用单个 .yml.yaml 格式的 YAML 文件,也可以使用包含多个 YAML 文件的 zip 归档文件。
GOOGLE_CLOUD_PROJECT 在其中创建 Dataplex 任务和 BigQuery 作业的 Google Cloud 项目。
DATAPLEX_REGION_ID 在其中创建数据质量任务的 Dataplex 数据湖所在的区域。
SERVICE_ACCOUNT 用于执行任务的服务账号。确保此服务账号拥有足够的 IAM 权限,具体请参阅准备工作部分。

对于 --execution-args,需要将以下参数作为定位参数传递,并需遵循下面的顺序:

参数名 说明
clouddq-executable.zip 从公共 Cloud Storage 存储桶中通过 spark-file-uris 传递的预编译可执行文件。
ALL 运行所有规则绑定。或者,您也可以采用英文逗号分隔列表的形式提供特定的规则绑定。例如 RULE_1,RULE_2
gcp-project-id 运行 BigQuery 查询的项目的 ID。
gcp-region-id 在其中运行 BigQuery 作业以进行数据质量验证的区域。此区域应与 gcp-bq-dataset-idtarget_bigquery_summary_table 的区域相同。
gcp-bq-dataset-id 用来存储 rule_binding 视图和中间数据质量摘要结果的 BigQuery 数据集。
target-bigquery-summary-table 存储数据质量检查最终结果的 BigQuery 表的表 ID 引用。请勿使用 ID 值 dq_summary,因为它是为内部处理任务预留的。
--summary_to_stdout (可选)如果传递此标志,最近一次运行检查时在 dq_summary 表中创建的所有验证结果行都会采用 JSON 格式记录到 Cloud Logging 和 stdout

API

  1. 替换以下内容:

    PROJECT_ID = "Your Dataplex Project ID"
    REGION = "Your Dataplex lake region"
    LAKE_ID = "Your Dataplex lake ID"
    SERVICE_ACC = "Your service account used for reading the data"
    DATAPLEX_TASK_ID = "Unique task ID for the data quality task"
    BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification"
    GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project"
    GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional
    GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results"
    TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
  2. 提交 HTTP POST 请求:
    POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID}
    {
    "spark": {
        "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py",
        "file_uris": [  f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip",
                        f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum",
                        f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip"
                    ]
    },
    "execution_spec": {
        "args": {
            "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}"
        },
        "service_account": "SERVICE_ACC"
    },
    "trigger_spec": {
    "type": "ON_DEMAND"
    },
    "description": "${DATAPLEX_TASK_DESCRIPTION}"
    }

另请参阅适用于 Dataplex 数据质量任务的 Airflow DAG 示例

监控已安排的数据质量任务

了解如何监控任务

查看结果

数据质量验证的结果存储在您指定的 BigQuery 数据集和摘要表中,如创建用于存储结果的数据集中所述。摘要表包含每次验证运行的规则绑定和规则的每个组合的输出摘要。摘要表中的输出包含以下信息:

列名 说明
dataplex_lake (字符串)包含待验证表的 Dataplex 数据湖的 ID。
dataplex_zone (字符串)包含待验证表的 Dataplex 区域的 ID。
dataplex_asset_id (字符串)包含待验证表的 Dataplex 资产的 ID。
execution_ts (时间戳)代表验证查询运行时间的时间戳。
rule_binding_id (字符串)验证结果报告所依据的规则绑定的 ID。
rule_id (字符串)验证结果报告所依据的规则绑定下的规则的 ID。
dimension (字符串)rule_id 的数据质量维度。此值只能是 rule_dimensions YAML 节点中指定的某个值。
table_id (字符串)验证结果报告所依据的实体的 ID。此 ID 在相应规则绑定的 entity 参数下指定。
column_id (字符串)验证结果报告所依据的列的 ID。此 ID 在相应规则绑定的 column 参数下指定。
last_modified (时间戳)代表待验证 table_id 的最后修改时间的时间戳。
metadata_json_string (字符串)在规则绑定下或在数据质量运行期间指定的元数据参数内容的键值对。
configs_hashsum (字符串)包含规则绑定及其所有关联规则、规则绑定、行过滤条件和实体配置的 JSON 文档的哈希值总和。configs_hashsum 允许在 rule_binding ID 或其引用的某个配置的内容发生更改时进行跟踪。
dq_run_id (字符串)记录的唯一 ID。
invocation_id (字符串)数据质量运行的 ID。在同一数据质量执行实例中生成的所有数据质量摘要记录都共用同一个 invocation_id
progress_watermark (布尔值)确定数据质量检查是否使用此特定记录来确定增量验证的高水印。如果为 FALSE,则在设立高位值后,相应记录会被忽略。在执行不应提高高水印的测试数据质量验证时,此信息非常有用。Dataplex 默认使用 TRUE 填充此字段,但如果参数 --progress_watermark 的值为 FALSE,此字段的值可以被替换。
rows_validated (整数)在 incremental_time_filter_column_id 列(如果指定)上应用 row_filters 和任何高位值过滤条件后,所验证的记录总数。
complex_rule_validation_errors_count (浮点数)CUSTOM_SQL_STATEMENT 规则返回的行数。
complex_rule_validation_success_flag (布尔值)CUSTOM_SQL_STATEMENT 规则的成功状态。
success_count (整数)通过验证的记录总数。对于 CUSTOM_SQL_STATEMENT 规则,此字段设置为 NULL
success_percentage (浮点数)通过验证的记录数占进行验证的记录总数的百分比。对于 CUSTOM_SQL_STATEMENT 规则,此字段设置为 NULL
failed_count (整数)未通过验证的记录总数。对于 CUSTOM_SQL_STATEMENT 规则,此字段设置为 NULL
failed_percentage (浮点数)未通过验证的记录数占进行验证的记录总数的百分比。对于 CUSTOM_SQL_STATEMENT 规则,此字段设置为 NULL
null_count (整数)在验证期间返回 null 的记录总数。对于 NOT_NULLCUSTOM_SQL_STATEMENT 规则,此字段设置为 NULL
null_percentage (浮点数)在验证期间返回 null 的记录数占进行验证的记录总数的百分比。对于 NOT_NULLCUSTOM_SQL_STATEMENT 规则,此字段设置为 NULL
failed_records_query 对于未通过验证的每一条规则,此列都会存储一个可用于获取失败记录的查询。在本文档中,请参阅使用 failed_records_query 对失败的规则进行问题排查

对于 BigQuery 实体,系统会为每个 rule_binding 创建一个视图,其中包含最近的执行作业的 SQL 验证逻辑。您可以在参数 --gcp-bq-dataset-id 中指定的 BigQuery 数据集中找到这些视图。

费用优化

您可以通过进行以下优化来帮助降低费用。

增量验证

通常,您的表会定期使用新分区(新行)进行更新。如果您不想在每次运行时都重新验证旧的分区,则可以使用增量验证。

对于增量验证,您必须在表中有一个 TIMESTAMPDATETIME 类型的列,其中列值会单调递增。您可以使用对 BigQuery 表进行分区所依据的那些列。

如需指定增量验证,请在规则绑定中指定 incremental_time_filter_column_id=TIMESTAMP/DATETIME type column 的值。

指定列时,数据质量任务仅考虑 TIMESTAMP 值大于上次运行数据质量任务时的时间戳的那些行。

规范文件示例

如需使用这些示例,请创建 BigQuery 数据集,并将其命名为 sales。然后,创建一个名为 sales_orders 的事实表,并通过使用以下 GoogleSQL 语句运行查询来添加示例数据:

CREATE OR REPLACE TABLE sales.sales_orders
(
 id STRING NOT NULL,
 last_modified_timestamp TIMESTAMP,
 customer_id STRING,
 item_id STRING,
 amount NUMERIC,
 transaction_currency STRING
);

INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")

示例 1

以下代码示例会创建数据质量检查来验证这些值:

  • amount:值为零或正数。
  • item_id:由 5 个字母字符后跟 15 位数字组成的字母数字字符串。
  • transaction_currency:通过一个静态列表定义的允许货币类型。此示例的静态列表允许以 GBP 和 JPY 作为货币类型。此验证仅适用于标记为国际化的行。
# The following `NONE` row filter is required.
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 # This filters for rows marked as international (INTNL).
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can apply to multiple tables or columns.
rules:
 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

# Rule bindings associate rules to columns within tables.
rule_bindings:
  TRANSACTION_AMOUNT_VALID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

  TRANSACTION_VALID_ITEM_ID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

  TRANSACTION_CURRENCY_VALID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

替换以下内容:

  • PROJECT_ID:您的项目 ID。
  • DATASET_ID:数据集 ID。

示例 2

如果要检查的表是 Dataplex 数据湖的一部分,您可以使用数据湖或区域表示法指定这些表。这样,您就可以按数据湖或可用区汇总结果。例如,您可以生成区间分数。

如需使用此示例,请创建一个数据湖 ID 为 operations 且可用区 ID 为 procurement 的 Dataplex 数据湖。然后,将表 sales_orders 作为资产添加到该可用区。

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}
rule_bindings:
 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

替换以下内容:

  • PROJECT_ID:您的项目 ID。
  • REGION_ID:表所在 Dataplex 数据湖的地区 ID,例如 us-central1

示例 3

此示例添加了一项自定义 SQL 检查以验证 ID 值是否唯一,从而增强了示例 2。

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
 NO_DUPLICATES_IN_COLUMN_GROUPS:
   rule_type: CUSTOM_SQL_STATEMENT
   dimension: duplication
   params:
     custom_sql_arguments:
       - column_names
     custom_sql_statement: |-
       select a.*
       from data a
       inner join (
         select
           $column_names
         from data
         group by $column_names
         having count(*) > 1
       ) duplicates
       using ($column_names)

 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}

rule_bindings:
 TRANSACTIONS_UNIQUE:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: id
   row_filter_id: NONE
   rule_ids:
     - NO_DUPLICATES_IN_COLUMN_GROUPS:
         column_names: "id"

 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

示例 4

此示例使用 last_modified_timestamp 列添加了增量验证,从而增强了示例 3。您可以为一个或多个规则绑定添加增量验证。

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
 NO_DUPLICATES_IN_COLUMN_GROUPS:
   rule_type: CUSTOM_SQL_STATEMENT
   dimension: duplication
   params:
     custom_sql_arguments:
       - column_names
     custom_sql_statement: |-
       select a.*
       from data a
       inner join (
         select
           $column_names
         from data
         group by $column_names
         having count(*) > 1
       ) duplicates
       using ($column_names)

 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}

rule_bindings:
 TRANSACTIONS_UNIQUE:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: id
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - NO_DUPLICATES_IN_COLUMN_GROUPS:
         column_names: "id"

 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: amount
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: item_id
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALID_CURRENCY_ID

排查 failed_records_query 导致运行失败的规则

对于未通过验证的每一条规则,摘要表都会将一个查询存储在 failed_records_query 列中,您可以用该查询来获取失败的记录。

若要进行调试,您还可以在 YAML 文件中使用 reference columns,以便将 failed_records_query 的输出与原始数据关联起来,以获取整条记录的内容。例如,您可以指定 primary_key 列或复合 primary_key 列作为引用列。

指定引用列

若要生成引用列,您可以将以下内容添加到 YAML 规范中:

  1. reference_columns 部分。在本部分中,您可以创建一个或多个引用列集,其中每个列集指定一个或多个列。

  2. rule_bindings 部分。在本部分中,您可以向规则绑定添加一行,用于指定该规则绑定中的规则要使用的引用列的 ID (reference_columns_id)。它应该是 reference_columns 部分中指定的引用列集之一。

例如,以下 YAML 文件指定了一个 reference_columns 部分,并在 ORDER_DETAILS_REFERENCE_COLUMNS 集内定义了三列:idlast_modified_timestampitem_id。以下示例使用示例表 sales_orders

reference_columns:
  ORDER_DETAILS_REFERENCE_COLUMNS:
    include_reference_columns:
      - id
      - last_modified_timestamp
      - item_id
rules:
  VALUE_ZERO_OR_POSITIVE:
  rule_type: CUSTOM_SQL_EXPR
  params:
    custom_sql_expr: |-

row_filters:
NONE:
  filter_sql_expr: |-
      True

rule_bindings:
TRANSACTION_AMOUNT_VALID:
  entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
  column_id: amount
  row_filter_id: NONE
  reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
  rule_ids:
    - VALUE_ZERO_OR_POSITIVE

使用失败记录查询

失败记录查询会为有规则未通过验证的每条记录都生成一行数据。该行数据包括触发失败的列的名称、触发失败的值以及相应引用列的值。它还包含可用于与数据质量任务的执行进行关联的元数据。

以下是指定参考列中所述 YAML 文件的失败记录查询输出的示例。它展示了触发失败的列 amount 以及失败值 -10,还记录了相应的引用列值。

_dq_validation_invocation_id _dq_validation_rule_binding_id _dq_validation_rule_id _dq_validation_column_id _dq_validation_column_value _dq_validation_dimension _dq_validation_simple_rule_row_is_valid _dq_validation_complex_rule_validation_errors_count _dq_validation_complex_rule_validation_success_flag id last_modified_timestamp item_id
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 TRANSACTION_AMOUNT_VALID VALUE_ZERO_OR_POSITIVE amount -10 FALSE order1 2022-01-22T02:30:06.321Z bad_item_id

对 CUSTOM_SQL_STATEMENT 规则使用失败的记录查询

对于 CUSTOM_SQL_STATEMENT 规则,失败的记录查询包括 custom_sql_statement_validation_errors 列。custom_sql_statement_validation_errors 列是一个嵌套列,其字段与 SQL 语句的输出相匹配。CUSTOM_SQL_STATEMENT 规则的失败记录查询中不包含引用列。

例如,您的 CUSTOM_SQL_STATEMENT 规则可能如下所示:

rules:
  TEST_RULE:
    rule_type: CUSTOM_SQL_STATEMENT
    custom_sql_arguments:
      - existing_id
      - replacement_id
    params:
     CUSTOM_SQL_STATEMENT: |-
       (SELECT product_name, product_key FROM data
       where $existing_id != $replacement_id)
此示例的结果将包含 custom_sql_statement_validation_errors 列的一行或多行,existing_id!=replacement_id 的每个实例各一行。

以 JSON 格式呈现时,此列中单元格的内容可能如下所示:

{
  "custom_sql_statement_valdation_errors" :{
    "product_name"="abc"
    "product_key"="12345678"
    "_rule_binding_id"="your_rule_binding"
  }
}

您可以使用 join on custom_sql_statement_valdation_errors.product_key 等嵌套引用将这些结果联接到原始表。

后续步骤