本文說明如何建立 Dataplex Universal Catalog 資料品質工作,以便排定及執行內建和外部 BigQuery 資料表的資料品質檢查。
詳情請參閱「資料品質工作總覽」。
事前準備
本文假設您已有 Dataplex Universal Catalog 湖泊,可供建立資料品質工作。
啟用 Google API 和服務
啟用 Dataproc API。
為網路和子網路啟用私人 Google 存取權。在您打算用於 Dataplex Universal Catalog 資料品質工作的網路上,啟用 Private Google Access。建立 Dataplex Universal Catalog 資料品質工作時,如果未指定網路或子網路,Dataplex Universal Catalog 會使用預設子網路。在這種情況下,您必須在預設子網路中啟用私人 Google 存取權。
建立規格檔案
Dataplex Universal Catalog 會使用開放原始碼 CloudDQ 做為驅動程式。Dataplex Universal Catalog 資料品質檢查要求 是在 CloudDQ YAML 規格檔案中定義。
您可以將單一 YAML 檔案或包含一或多個 YAML 檔案的單一 zip 封存檔,做為資料品質工作的輸入內容。建議您在不同的 YAML 規格檔案中擷取資料品質檢查要求,每個區段各有一個檔案。
如要準備規格檔案,請執行下列步驟:
-
建立一或多個 CloudDQ YAML 規格檔案,定義資料品質檢查要求。如要進一步瞭解必要語法,請參閱本文件的「關於規格檔案」一節。
以
.yml
或.yaml
格式儲存 YAML 規格檔案。如果建立多個 YAML 規格檔案,請將所有檔案儲存至單一 zip 壓縮檔。 - 建立 Cloud Storage 值區。
- 將規格檔案上傳至 Cloud Storage bucket。
規格檔案簡介
CloudDQ YAML 規格檔案必須包含下列部分:
規則 (在頂層
rules
YAML 節點中定義):要執行的規則清單。您可以從預先定義的規則類型 (例如NOT_NULL
和REGEX
) 建立這些規則,也可以使用自訂 SQL 陳述式 (例如CUSTOM_SQL_EXPR
和CUSTOM_SQL_STATEMENT
) 擴充規則。CUSTOM_SQL_EXPR
陳述式會將custom_sql_expr
評估為False
的任何資料列標示為失敗。CUSTOM_SQL_STATEMENT
陳述式會將整個陳述式傳回的任何值標示為失敗。列篩選器 (在頂層
row_filters
YAML 節點中定義):傳回布林值的 SQL 運算式,用於定義篩選器,從基礎實體主體擷取部分資料進行驗證。規則繫結 (在頂層
rule_bindings
YAML 節點中定義): 定義要套用至資料表的rules
和rule filters
。規則維度 (在
rule_dimensions
YAML 節點中定義):定義規則可在對應的dimension
欄位中定義的資料品質規則維度允許清單。例如:
rule_dimensions: - consistency - correctness - duplication - completeness - conformance
規則的
dimension
欄位為選填。如果任何規則列出dimension
,則必須填寫規則尺寸部分。
詳情請參閱 CloudDQ 參考指南和範例規格檔案。
建立資料集來儲存結果
-
如要儲存結果,請建立 BigQuery 資料集。
資料集必須與執行資料品質工作的資料表位於同一個地區。
Dataplex Universal Catalog 會使用這個資料集,並建立或重複使用您選擇的資料表來儲存結果。
建立服務帳戶
建立服務帳戶,並指派下列身分與存取權管理 (IAM) 角色和權限:
- 具備包含 YAML 規格的 Cloud Storage 路徑讀取權限。您可以在 Cloud Storage bucket 上使用 Storage 物件檢視者角色 (
roles/storage.objectViewer
)。 - 可讀取要驗證資料的 BigQuery 資料集。 您可以使用 BigQuery 資料檢視者角色。
- BigQuery 資料集的寫入權,可建立資料表 (如有需要),並將結果寫入該資料表。您可以在資料集層級使用 BigQuery 資料編輯者角色 (
roles/bigquery.dataEditor
)。 - 專案層級的 BigQuery 工作使用者角色 (
roles/bigquery.jobUser
),才能在專案中建立 BigQuery 工作。 - 專案或 lake 層級的 Dataplex Universal Catalog 中繼資料讀取者角色 (
roles/dataplex.metadataReader
)。 - 專案層級的服務使用情形消費者角色 (
roles/serviceusage.serviceUsageConsumer
)。 - Dataproc 工作者角色。
- 授予提交工作的使用者
iam.serviceAccounts.actAs
權限。 - 授予 Dataplex Universal Catalog Lake 服務帳戶的服務帳戶使用者角色。您可以在 Google Cloud 控制台中查看 Dataplex Universal Catalog 湖泊服務帳戶。
使用進階設定
以下為選用步驟:
根據預設,BigQuery 會在目前專案中執行資料品質檢查。您可以選擇其他專案來執行 BigQuery 工作。在工作
--execution-args
屬性中使用--gcp_project_id
TASK_ARGS
引數。如果指定用來執行 BigQuery 查詢的專案 ID,與建立服務帳戶 (由
--execution-service-account
指定) 的專案不同,請確認禁止跨專案使用服務帳戶的機構政策 (iam.disableServiceAccountCreation
) 已關閉。此外,請確認服務帳戶可以存取專案中的 BigQuery 工作排程,以便執行 BigQuery 查詢。
限制
指定給特定資料品質工作的資料表必須位於同一個 Google Cloud區域。
排定資料品質任務
控制台
- 前往 Google Cloud 控制台的 Dataplex Universal Catalog「Process」(程序) 頁面。
- 按一下「建立工作」 。
- 在「檢查資料品質」資訊卡上,按一下「建立工作」。
- 在「Dataplex lake」(Dataplex 湖泊) 部分,選擇您的湖泊。
- 在「ID」中輸入 ID。
- 在「資料品質規格」部分,執行下列操作:
- 在「Select GCS file」(選取 GCS 檔案) 欄位中,按一下「Browse」(瀏覽)。
選取 Cloud Storage bucket。
按一下 [選取]。
在「結果資料表」部分,執行下列操作:
在「選取 BigQuery 資料集」欄位中,按一下「瀏覽」。
選取用來儲存驗證結果的 BigQuery 資料集。
按一下 [選取]。
在「BigQuery table」(BigQuery 資料表) 欄位中,輸入要儲存結果的資料表名稱。 如果資料表不存在,Dataplex Universal Catalog 會為您建立。請勿使用
dq_summary
這個名稱,因為這是保留給內部處理工作的名稱。
在「服務帳戶」部分,從「使用者服務帳戶」選單中選取服務帳戶。
按一下「繼續」。
在「設定時間表」部分,設定執行資料品質工作的時間表。
點選「建立」。
gcloud CLI
以下是執行資料品質工作的範例,該工作使用 Dataplex Universal Catalog 工作 gcloud CLI 指令:
export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH" # Google Cloud project where the Dataplex Universal Catalog task is created. export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT" # Google Cloud region for the Dataplex Universal Catalog lake. export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID" # Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region. export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}" # The Dataplex Universal Catalog lake where your task is created. export DATAPLEX_LAKE_ID="DATAPLEX_LAKE_ID" # The service account used for running the task. Ensure that this service account has sufficient IAM permissions on your project, including BigQuery Data Editor, BigQuery Job User, Dataplex Universal Catalog Editor, Dataproc Worker, and Service Usage Consumer. export DATAPLEX_TASK_SERVICE_ACCOUNT="DATAPLEX_TASK_SERVICE_ACCOUNT" # If you want to use a different dataset for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding, use the following: export CLOUDDQ_BIGQUERY_DATASET="CLOUDDQ_BIGQUERY_DATASET" # The BigQuery dataset where the final results of the data quality checks are stored. This could be the same as CLOUDDQ_BIGQUERY_DATASET. export TARGET_BQ_DATASET="TARGET_BQ_DATASET" # The BigQuery table where the final results of the data quality checks are stored. export TARGET_BQ_TABLE="TARGET_BQ_TABLE" # The unique identifier for the task. export TASK_ID="TASK_ID" gcloud dataplex tasks create \ --location="${DATAPLEX_REGION_ID}" \ --lake="${DATAPLEX_LAKE_ID}" \ --trigger-type=ON_DEMAND \ --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \ --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \ --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \ --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \ "$TASK_ID"
參數 | 說明 |
---|---|
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH |
資料品質 YAML 設定的 Cloud Storage 路徑,用於資料品質工作。您可以採用 .yml 或 .yaml 格式的單一 YAML 檔案,也可以使用包含多個 YAML 檔案的 ZIP 封存檔。 |
GOOGLE_CLOUD_PROJECT |
建立 Dataplex Universal Catalog 工作和 BigQuery 工作的 Google Cloud 專案。 |
DATAPLEX_REGION_ID |
建立資料品質工作的 Dataplex Universal Catalog 湖泊所在區域。 |
SERVICE_ACCOUNT |
用於執行工作的服務帳戶。請確認這個服務帳戶具備「事前準備」一節所述的足夠 IAM 權限。 |
如果是 --execution-args
,下列引數必須以位置引數的形式傳遞,因此順序如下:
引數 | 說明 |
---|---|
clouddq-executable.zip |
從公開 Cloud Storage bucket 傳遞的預先編譯可執行檔。spark-file-uris |
ALL |
執行所有規則繫結。或者,您也可以提供以逗號分隔的特定規則繫結清單。例如 RULE_1,RULE_2 。 |
gcp-project-id |
執行 BigQuery 查詢的專案 ID。 |
gcp-region-id |
執行 BigQuery 工作以驗證資料品質的區域。這個區域應與 gcp-bq-dataset-id 和 target_bigquery_summary_table 的區域相同。 |
gcp-bq-dataset-id |
用來儲存 rule_binding 檢視區塊和中繼資料品質摘要結果的 BigQuery 資料集。 |
target-bigquery-summary-table |
BigQuery 資料表的資料表 ID 參照,用於儲存資料品質檢查的最終結果。請勿使用 ID 值 dq_summary ,因為該值保留供內部處理工作使用。 |
--summary_to_stdout |
(選用) 傳遞這個旗標時,上次執行期間在 dq_summary 資料表中建立的所有驗證結果列,都會以 JSON 記錄的形式記錄到 Cloud Logging 和 stdout 。 |
API
更改下列內容:
PROJECT_ID = "Your Dataplex Universal Catalog Project ID" REGION = "Your Dataplex Universal Catalog lake region" LAKE_ID = "Your Dataplex Universal Catalog lake ID" SERVICE_ACC = "Your service account used for reading the data" DATAPLEX_TASK_ID = "Unique task ID for the data quality task" BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification" GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project" GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results" TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
- 提交 HTTP POST 要求:
POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID} { "spark": { "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py", "file_uris": [ f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip", f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum", f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip" ] }, "execution_spec": { "args": { "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}" }, "service_account": "SERVICE_ACC" }, "trigger_spec": { "type": "ON_DEMAND" }, "description": "${DATAPLEX_TASK_DESCRIPTION}" }
監控排定的資料品質任務
瞭解如何監控工作。
查看結果
資料品質驗證結果會儲存在您指定的 BigQuery 資料集和摘要資料表中,如「建立資料集來儲存結果」一文所述。摘要資料表包含每次驗證執行作業的輸出摘要,以及規則繫結和規則的各項組合。摘要表格中的輸出內容包含下列資訊:
資料欄名稱 | 說明 |
---|---|
dataplex_lake |
(字串) 包含要驗證資料表的 Dataplex Universal Catalog 湖泊 ID。 |
dataplex_zone |
(字串) 包含要驗證資料表的 Dataplex Universal Catalog 區域 ID。 |
dataplex_asset_id |
(字串) 包含要驗證資料表的 Dataplex Universal Catalog 資產 ID。 |
execution_ts |
(時間戳記) 執行驗證查詢的時間戳記。 |
rule_binding_id |
(字串) 規則繫結的 ID,系統會回報驗證結果。 |
rule_id |
(字串) 規則繫結下的規則 ID,系統會針對該規則回報驗證結果。 |
dimension |
(字串) rule_id 的資料品質維度。這個值只能是 rule_dimensions YAML 節點中指定的值。 |
table_id |
(字串) 驗證結果所回報的實體 ID。
這個 ID 是在相應規則繫結的 entity 參數下指定。 |
column_id |
(字串) 系統回報驗證結果的資料欄 ID。
這個 ID 是在相應規則繫結的 column 參數下指定。 |
last_modified |
(時間戳記) 正在驗證的 table_id 上次修改時間戳記。 |
metadata_json_string |
(字串) 規則繫結或資料品質執行期間指定的中繼資料參數內容鍵/值組合。 |
configs_hashsum |
(字串) 包含規則繫結和所有相關聯規則、規則繫結、資料列篩選器和實體設定的 JSON 文件雜湊總和。configs_hashsum 可追蹤 rule_binding ID 的內容或其中一個參照設定是否已變更。 |
dq_run_id |
(字串) 記錄的專屬 ID。 |
invocation_id |
(字串) 資料品質執行作業的 ID。在同一個資料品質執行個體中產生的所有資料品質摘要記錄,都會共用相同的 invocation_id 。 |
progress_watermark |
(布林值) 判斷資料品質檢查是否會將這筆特定記錄納入考量,以判斷遞增驗證的高水位線。如果為 FALSE ,系統在建立高水位線值時會忽略相應記錄。執行不應推進高水位線的測試資料品質驗證時,這項資訊就很有用。Dataplex Universal Catalog 預設會在這個欄位填入 TRUE ,但如果 --progress_watermark 引數的值為 FALSE ,則可以覆寫這個值。
|
rows_validated |
(整數) 套用 row_filters 和 incremental_time_filter_column_id 欄 (如已指定) 的任何高水位線篩選器後,驗證的記錄總數。 |
complex_rule_validation_errors_count |
(浮點數) 規則傳回的資料列數。CUSTOM_SQL_STATEMENT |
complex_rule_validation_success_flag |
(布林值) CUSTOM_SQL_STATEMENT 規則的成功狀態。
|
success_count |
(整數) 通過驗證的記錄總數。這個欄位會針對 CUSTOM_SQL_STATEMENT 規則設為 NULL 。 |
success_percentage |
(浮點數) 通過驗證的記錄數量百分比,計算方式為通過驗證的記錄數量除以驗證的記錄總數。這個欄位會針對 CUSTOM_SQL_STATEMENT 規則設為 NULL 。 |
failed_count |
(整數) 驗證失敗的記錄總數。這個欄位會針對 CUSTOM_SQL_STATEMENT 規則設為 NULL 。 |
failed_percentage |
(浮點數) 驗證失敗的記錄數量占驗證記錄總數的百分比。這個欄位會針對 CUSTOM_SQL_STATEMENT 規則設為 NULL 。 |
null_count |
(整數) 驗證期間傳回空值的記錄總數。
這個欄位會針對 NOT_NULL 和 CUSTOM_SQL_STATEMENT 規則設為 NULL 。 |
null_percentage |
(浮點數) 驗證期間傳回空值的記錄數百分比,以驗證的記錄總數為分母。這個欄位會針對 NOT_NULL 和 CUSTOM_SQL_STATEMENT 規則設為 NULL 。 |
failed_records_query |
如果規則失敗,這個資料欄會儲存查詢,供您取得失敗記錄。請參閱本文的「使用 failed_records_query 排解規則失敗問題」。 |
如果是 BigQuery 實體,系統會為每個rule_binding
建立檢視區塊,其中包含最新執行的 SQL 驗證邏輯。您可以在引數 --gcp-bq-dataset-id
中指定的 BigQuery 資料集中找到這些檢視區塊。
成本最佳化
資料品質工作會以專案中的 BigQuery 工作形式執行。如要控管執行資料品質工作的費用,請在 BigQuery 工作執行的專案中,使用 BigQuery 定價。詳情請參閱 BigQuery 工作負載管理。
漸進式驗證
您通常會定期更新資料表,加入新的分區 (新的資料列)。如果不想在每次執行時重新驗證舊分割區,可以使用增量驗證。
如要進行增量驗證,資料表中必須有 TIMESTAMP
或 DATETIME
類型的資料欄,且資料欄值會單調遞增。您可以使用 BigQuery 資料表分區依據的資料欄。
如要指定增量驗證,請在規則繫結中指定 incremental_time_filter_column_id=TIMESTAMP/DATETIME type column
的值。
指定資料欄後,資料品質工作只會考量 TIMESTAMP
值大於上次執行資料品質工作時間戳記的資料列。
規格檔案範例
如要使用這些範例,請建立名為 sales
的 BigQuery 資料集。接著,建立名為 sales_orders
的事實資料表,然後執行查詢,使用下列 GoogleSQL 陳述式新增範例資料:
CREATE OR REPLACE TABLE sales.sales_orders
(
id STRING NOT NULL,
last_modified_timestamp TIMESTAMP,
customer_id STRING,
item_id STRING,
amount NUMERIC,
transaction_currency STRING
);
INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")
範例 1
下列程式碼範例會建立資料品質檢查,以驗證這些值:
amount
:值為零或正數。item_id
:由 5 個英文字母組成的英數字元字串,後接 15 個數字。transaction_currency
:允許的貨幣類型,由靜態清單定義。這個範例的靜態清單允許英鎊和日圓做為貨幣類型。這項驗證只適用於標示為國際的列。
# The following `NONE` row filter is required.
row_filters:
NONE:
filter_sql_expr: |-
True
# This filters for rows marked as international (INTNL).
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can apply to multiple tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
# Rule bindings associate rules to columns within tables.
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
更改下列內容:
PROJECT_ID
:您的專案 ID。DATASET_ID
:資料集 ID。
範例 2
如果要檢查的資料表屬於 Dataplex Universal Catalog 湖泊,可以使用湖泊或可用區標記指定資料表。這可讓您依湖泊或區域匯總結果。舉例來說,您可以產生區域層級的分數。
如要使用這個範例,請建立 Dataplex Universal Catalog 湖泊,並提供湖泊 ID operations
和區域 ID procurement
。然後將表格 sales_orders
新增為區域的資產。
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
更改下列內容:
- PROJECT_ID:您的專案 ID。
- REGION_ID:資料表所在的 Dataplex Universal Catalog 湖泊區域 ID,例如
us-central1
。
範例 3
這個範例在範例 2 的基礎上,新增自訂 SQL 檢查,確認 ID 值是否不重複。
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
範例 4
這個範例會使用 last_modified_timestamp
資料欄新增遞增驗證,進而強化範例 3。您可以為一或多個規則繫結新增漸進式驗證。
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_CURRENCY_ID
使用 failed_records_query
排解失敗的規則
對於每個失敗的規則,摘要表格會在 failed_records_query
欄中儲存查詢,您可以使用該查詢取得失敗的記錄。
如要進行偵錯,您也可以在 YAML 檔案中使用 reference columns
,將 failed_records_query
的輸出內容與原始資料合併,取得完整記錄。舉例來說,您可以指定 primary_key
資料欄或複合 primary_key
資料欄做為參照資料欄。
指定參照資料欄
如要產生參照資料欄,請在 YAML 規格中加入下列內容:
「
reference_columns
」專區。您可以在這個部分建立一或多個參照資料欄集,每個資料欄集可指定一或多個資料欄。「
rule_bindings
」專區。在本節中,您可以在規則繫結中新增一行,指定要用於該規則繫結中規則的參照欄 ID (reference_columns_id
)。這應是reference_columns
區段中指定的其中一組參照資料欄。
舉例來說,下列 YAML 檔案會指定 reference_columns
區段,並定義 ORDER_DETAILS_REFERENCE_COLUMNS
集中的三個資料欄:id
、last_modified_timestamp
和 item_id
。下列範例使用範例資料表 sales_orders
。
reference_columns:
ORDER_DETAILS_REFERENCE_COLUMNS:
include_reference_columns:
- id
- last_modified_timestamp
- item_id
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
params:
custom_sql_expr: |-
row_filters:
NONE:
filter_sql_expr: |-
True
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
rule_ids:
- VALUE_ZERO_OR_POSITIVE
使用失敗記錄查詢
失敗記錄查詢會為每個規則失敗的記錄產生資料列。其中包含導致失敗的資料欄名稱、導致失敗的值,以及參照資料欄的值。此外,這項服務還包含中繼資料,可用於與資料品質工作的執行作業建立關聯。
以下範例為 YAML 檔案的記錄查詢失敗輸出內容,如「指定參照資料欄」一節所述。顯示欄 amount
的失敗,以及失敗值 -10
。系統也會記錄參照資料欄的對應值。
_dq_validation_invocation_id | _dq_validation_rule_binding_id | _dq_validation_rule_id | _dq_validation_column_id | _dq_validation_column_value | _dq_validation_dimension | _dq_validation_simple_rule_row_is_valid | _dq_validation_complex_rule_validation_errors_count | _dq_validation_complex_rule_validation_success_flag | id | last_modified_timestamp | item_id |
---|---|---|---|---|---|---|---|---|---|---|---|
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 | TRANSACTION_AMOUNT_VALID | VALUE_ZERO_OR_POSITIVE | amount | -10 | FALSE | order1 | 2022-01-22T02:30:06.321Z | bad_item_id |
針對 CUSTOM_SQL_STATEMENT 規則使用失敗記錄查詢
如果是 CUSTOM_SQL_STATEMENT
規則,失敗的記錄查詢會包含 custom_sql_statement_validation_errors
欄。custom_sql_statement_validation_errors
欄位是巢狀欄位,內含與 SQL 陳述式輸出內容相符的欄位。查詢 CUSTOM_SQL_STATEMENT
規則的失敗記錄時,不會顯示參照資料欄。
舉例來說,您的 CUSTOM_SQL_STATEMENT
規則可能如下所示:
rules: TEST_RULE: rule_type: CUSTOM_SQL_STATEMENT custom_sql_arguments: - existing_id - replacement_id params: CUSTOM_SQL_STATEMENT: |- (SELECT product_name, product_key FROM data where $existing_id != $replacement_id)
custom_sql_statement_validation_errors
資料欄的一或多個資料列,每個資料列代表 existing_id!=replacement_id
的一次出現次數。以 JSON 格式呈現時,這個資料欄中儲存格的內容可能如下所示:
{ "custom_sql_statement_valdation_errors" :{ "product_name"="abc" "product_key"="12345678" "_rule_binding_id"="your_rule_binding" } }
您可以使用 join on custom_sql_statement_valdation_errors.product_key
等巢狀參照,將這些結果與原始資料表合併。
後續步驟
- 請參閱 CloudDQ YAML 規格參考資料。
- 如需資料品質規則範例,請參閱「簡單規則」和「進階規則」。
- 請參閱「Dataplex Universal Catalog 資料品質工作的範例 Airflow DAG」。