In diesem Dokument wird beschrieben, wie Sie in Dataplex Universal Catalog Datenqualitätsaufgaben erstellen, mit denen Sie Datenqualitätsprüfungen für Ihre integrierten und externen BigQuery-Tabellen planen und ausführen können.
Weitere Informationen finden Sie unter Datenqualitätsaufgaben – Übersicht.
Hinweis
In diesem Dokument wird davon ausgegangen, dass Sie bereits einen Dataplex Universal Catalog-Lake haben, in dem Sie die Datenqualitätsaufgabe erstellen können.
Google-APIs und ‑Dienste aktivieren
Aktivieren Sie die Dataproc API.
Aktivieren Sie den privaten Google-Zugriff für Ihr Netzwerk und Subnetzwerk. Für das Netzwerk, das Sie für Datenqualitätsaufgaben in Dataplex Universal Catalog verwenden möchten, muss der private Google-Zugriff aktiviert sein. Wenn Sie beim Erstellen der Datenqualitätsaufgabe in Dataplex Universal Catalog kein Netzwerk oder Subnetzwerk angeben, verwendet Dataplex Universal Catalog das Standardsubnetz. In diesem Fall müssen Sie den privaten Google-Zugriff für das Standardsubnetz aktivieren.
Spezifikationsdatei erstellen
Dataplex Universal Catalog verwendet Open-Source-CloudDQ als Treiberprogramm. Die Anforderungen an die Datenqualitätsprüfung in Dataplex Universal Catalog werden in CloudDQ-YAML-Spezifikationsdateien definiert.
Als Eingabe für die Datenqualitätsaufgabe können Sie eine einzelne YAML-Datei oder ein einzelnes ZIP-Archiv mit einer oder mehreren YAML-Dateien verwenden. Es wird empfohlen, die Anforderungen an die Datenqualitätsprüfung in separaten YAML-Spezifikationsdateien zu erfassen, mit einer Datei pro Abschnitt.
So bereiten Sie eine Spezifikationsdatei vor:
-
Erstellen Sie eine oder mehrere CloudDQ-YAML-Spezifikationsdateien, in denen die Anforderungen an die Datenqualitätsprüfung definiert werden. Weitere Informationen zur erforderlichen Syntax finden Sie im Abschnitt Spezifikationsdatei in diesem Dokument.
Speichern Sie die YAML-Spezifikationsdatei im
.yml
- oder.yaml
-Format. Wenn Sie mehrere YAML-Spezifikationsdateien erstellen, speichern Sie alle in einem einzigen ZIP-Archiv. - Cloud Storage-Bucket erstellen
- Laden Sie die Spezifikationsdatei in den Cloud Storage-Bucket hoch.
Spezifikationsdatei
Ihre CloudDQ-YAML-Spezifikationsdatei muss folgende Abschnitte enthalten:
Regeln (definiert im
rules
-YAML-Knoten der obersten Ebene): Eine Liste der auszuführenden Regeln. Sie können diese Regeln aus vordefinierten Regeltypen wieNOT_NULL
undREGEX
erstellen oder sie mit benutzerdefinierten SQL-Anweisungen wieCUSTOM_SQL_EXPR
undCUSTOM_SQL_STATEMENT
erweitern. Die AnweisungCUSTOM_SQL_EXPR
kennzeichnet alle Zeilen, diecustom_sql_expr
mitFalse
als Fehler ausgewertet hat. Die AnweisungCUSTOM_SQL_STATEMENT
kennzeichnet alle Werte, die von der gesamten Anweisung als Fehler zurückgegeben werden.Zeilenfilter (definiert im
row_filters
-YAML-Knoten der obersten Ebene): SQL-Ausdrücke, die einen booleschen Wert zurückgeben, der Filter definiert, um eine Teilmenge der Daten aus der zugrunde liegenden Entität zur Prüfung abzurufen.Regelbindungen (definiert im
rule_bindings
-YAML-Knoten der obersten Ebene): Definiertrules
undrule filters
für die Tabellen.Regeldimensionen (definiert im
rule_dimensions
-YAML-Knoten der obersten Ebene): Definiert die Liste der zulässigen Dimensionen für Datenqualitätsregeln, die in einer Regel im entsprechendendimension
-Feld definiert werden können.Beispiel:
rule_dimensions: - consistency - correctness - duplication - completeness - conformance
Das Feld
dimension
ist für eine Regel optional. Der Abschnitt zu den Regeldimensionen ist obligatorisch, wenndimension
in einer beliebigen Regel aufgeführt ist.
Weitere Informationen finden Sie im CloudDQ-Referenzhandbuch und in den beispielhaften Spezifikationsdateien.
Dataset zum Speichern der Ergebnisse erstellen
-
Erstellen Sie ein BigQuery-Dataset, um die Ergebnisse zu speichern.
Das Dataset muss sich in derselben Region wie die Tabellen befinden, für die Sie die Datenqualitätsaufgabe ausführen.
Dataplex Universal Catalog verwendet dieses Dataset und erstellt oder verwendet eine Tabelle Ihrer Wahl, um die Ergebnisse zu speichern.
Dienstkonto erstellen
Erstellen Sie ein Dienstkonto mit den folgenden IAM-Rollen (Identity and Access Management) und ‑Berechtigungen:
- Lesezugriff auf den Cloud Storage-Pfad, der die YAML-Spezifikationen enthält. Sie können die Rolle „Storage Object Viewer“ (
roles/storage.objectViewer
) für den Cloud Storage-Bucket verwenden. - Lesezugriff auf die BigQuery-Datasets mit den zu prüfenden Daten. Sie können die Rolle „BigQuery Data Viewer“ verwenden.
- Schreibzugriff auf das BigQuery-Dataset, um bei Bedarf eine Tabelle zu erstellen und die Ergebnisse in diese Tabelle zu schreiben. Sie können die Rolle „BigQuery Data Editor“ (
roles/bigquery.dataEditor
) auf Dataset-Ebene verwenden. - Rolle „BigQuery Job User“ (
roles/bigquery.jobUser
) auf Projektebene, um BigQuery-Jobs in einem Projekt zu erstellen. - Rolle „Dataplex Universal Catalog Metadata Reader“ (
roles/dataplex.metadataReader
) auf Projekt- oder Lake-Ebene. - Rolle „Service Usage Consumer“ (
roles/serviceusage.serviceUsageConsumer
) auf Projektebene. - Rolle „Dataproc Worker“.
- Berechtigung
iam.serviceAccounts.actAs
, die dem Nutzer erteilt wird, der den Job sendet. - Rolle „Service Account User“, die dem Dienstkonto des Dataplex Universal Catalog-Lake zugewiesen wird. Sie können das Dienstkonto des Dataplex Universal Catalog-Lake in der Google Cloud Console ansehen.
Erweiterte Einstellungen verwenden
Diese Schritte sind optional:
BigQuery führt Datenqualitätsprüfungen standardmäßig im aktuellen Projekt aus. Sie können aber auch ein anderes Projekt auswählen. Verwenden Sie dazu das Argument
--gcp_project_id
TASK_ARGS
für das Attribut--execution-args
der Aufgabe.Wenn sich die zum Ausführen von BigQuery-Abfragen angegebene Projekt-ID von dem Projekt unterscheidet, in dem das Dienstkonto (durch
--execution-service-account
angegeben) erstellt wird, muss die Organisationsrichtlinie, die die projektübergreifende Dienstkontonutzung deaktiviert (iam.disableServiceAccountCreation
), deaktiviert sein. Achten Sie außerdem darauf, dass das Dienstkonto auf den BigQuery-Jobzeitplan in dem Projekt zugreifen kann, in dem BigQuery-Abfragen ausgeführt werden.
Beschränkungen
Alle Tabellen, die für eine bestimmte Datenqualitätsaufgabe angegeben sind, müssen zur selben Google Cloud-Region gehören.
Zeitplan für eine Datenqualitätsaufgabe festlegen
Console
- Rufen Sie in der Google Cloud Console die Dataplex Universal Catalog-Seite Verarbeiten auf.
- Klicken Sie auf Aufgabe erstellen.
- Klicken Sie auf der Karte Datenqualität prüfen auf Aufgabe erstellen.
- Wählen Sie als Dataplex-Lake Ihren Lake aus.
- Geben Sie als ID eine ID ein.
- Führen Sie im Abschnitt Spezifikation der Datenqualität folgende Schritte aus:
- Klicken Sie im Feld GCS-Datei auswählen auf Durchsuchen.
Wählen Sie Ihren Cloud Storage-Bucket aus.
Klicken Sie auf Auswählen.
Führen Sie im Abschnitt Ergebnistabelle folgende Schritte aus:
Klicken Sie im Feld BigQuery-Dataset auswählen auf Durchsuchen.
Wählen Sie das BigQuery-Dataset aus, in dem die Prüfungsergebnisse gespeichert werden sollen.
Klicken Sie auf Auswählen.
Geben Sie im Feld BigQuery-Tabelle den Namen der Tabelle ein, in der die Ergebnisse gespeichert werden sollen. Wenn die Tabelle nicht vorhanden ist, wird sie von Dataplex Universal Catalog erstellt. Verwenden Sie nicht den Namen
dq_summary
, da er für interne Verarbeitungsaufgaben reserviert ist.
Wählen Sie im Abschnitt Dienstkonto ein Dienstkonto aus dem Menü Nutzerdienstkonto aus.
Klicken Sie auf Weiter.
Konfigurieren Sie im Abschnitt Zeitplan festlegen den Zeitplan für das Ausführen der Datenqualitätsaufgabe.
Klicken Sie auf Erstellen.
gcloud-CLI
Das folgende Beispiel zeigt die Ausführung einer Datenqualitätsaufgabe, bei der der gcloud CLI-Befehl für Dataplex Universal Catalog-Aufgaben verwendet wird:
export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH" # Google Cloud project where the Dataplex Universal Catalog task is created. export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT" # Google Cloud region for the Dataplex Universal Catalog lake. export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID" # Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region. export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}" # The Dataplex Universal Catalog lake where your task is created. export DATAPLEX_LAKE_ID="DATAPLEX_LAKE_ID" # The service account used for running the task. Ensure that this service account has sufficient IAM permissions on your project, including BigQuery Data Editor, BigQuery Job User, Dataplex Universal Catalog Editor, Dataproc Worker, and Service Usage Consumer. export DATAPLEX_TASK_SERVICE_ACCOUNT="DATAPLEX_TASK_SERVICE_ACCOUNT" # If you want to use a different dataset for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding, use the following: export CLOUDDQ_BIGQUERY_DATASET="CLOUDDQ_BIGQUERY_DATASET" # The BigQuery dataset where the final results of the data quality checks are stored. This could be the same as CLOUDDQ_BIGQUERY_DATASET. export TARGET_BQ_DATASET="TARGET_BQ_DATASET" # The BigQuery table where the final results of the data quality checks are stored. export TARGET_BQ_TABLE="TARGET_BQ_TABLE" # The unique identifier for the task. export TASK_ID="TASK_ID" gcloud dataplex tasks create \ --location="${DATAPLEX_REGION_ID}" \ --lake="${DATAPLEX_LAKE_ID}" \ --trigger-type=ON_DEMAND \ --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \ --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \ --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \ --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \ "$TASK_ID"
Parameter | Beschreibung |
---|---|
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH |
Der Cloud Storage-Pfad zu Ihrer YAML-Konfigurationseingabe für die Datenqualitätsaufgabe. Sie können eine einzelne YAML-Datei im Format .yml oder .yaml oder ein ZIP-Archiv mit mehreren YAML-Dateien haben. |
GOOGLE_CLOUD_PROJECT |
Das Google Cloud -Projekt, in dem die Dataplex Universal Catalog-Aufgabe und die BigQuery-Jobs erstellt werden. |
DATAPLEX_REGION_ID |
Die Region des Dataplex Universal Catalog-Lake, in der die Datenqualitätsaufgabe erstellt wird. |
SERVICE_ACCOUNT |
Das Dienstkonto, das zum Ausführen der Aufgabe verwendet wird. Dieses Dienstkonto muss die erforderlichen IAM-Berechtigungen haben, wie im Abschnitt Hinweis beschrieben. |
Für --execution-args
müssen die folgenden Argumente als positionierte Argumente in dieser Reihenfolge übergeben werden:
Argument | Beschreibung |
---|---|
clouddq-executable.zip |
Vorkompilierte ausführbare Datei, die über spark-file-uris aus einem öffentlichen Cloud Storage-Bucket übergeben wurde. |
ALL |
Alle Regelbindungen werden ausgeführt. Alternativ können Sie bestimmte Regelbindungen als durch Kommas getrennte Liste angeben.
Beispiel: RULE_1,RULE_2 . |
gcp-project-id |
Projekt-ID, unter der die BigQuery-Abfragen ausgeführt werden. |
gcp-region-id |
Region zum Ausführen der BigQuery-Jobs zur Prüfung der Datenqualität. Diese Region sollte mit der Region für gcp-bq-dataset-id und target_bigquery_summary_table übereinstimmen. |
gcp-bq-dataset-id |
BigQuery-Dataset, in dem die rule_binding -Ansichten und die zusammenfassenden Zwischenergebnisse der Datenqualitätsprüfung gespeichert werden. |
target-bigquery-summary-table |
Tabellen-ID-Referenz der BigQuery-Tabelle, in der die Endergebnisse der Datenqualitätsprüfungen gespeichert werden. Verwenden Sie nicht den ID-Wert dq_summary , da er für interne Verarbeitungsaufgaben reserviert ist. |
--summary_to_stdout |
(Optional) Wenn dieses Flag übergeben wird, werden alle Zellen mit Prüfungsergebnissen, die bei der letzten Ausführung in der Tabelle dq_summary erstellt wurden, als JSON-Einträge in Cloud Logging und stdout protokolliert. |
API
Ersetzen Sie Folgendes:
PROJECT_ID = "Your Dataplex Universal Catalog Project ID" REGION = "Your Dataplex Universal Catalog lake region" LAKE_ID = "Your Dataplex Universal Catalog lake ID" SERVICE_ACC = "Your service account used for reading the data" DATAPLEX_TASK_ID = "Unique task ID for the data quality task" BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification" GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project" GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results" TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
- Senden Sie eine HTTP-POST-Anfrage:
POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID} { "spark": { "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py", "file_uris": [ f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip", f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum", f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip" ] }, "execution_spec": { "args": { "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}" }, "service_account": "SERVICE_ACC" }, "trigger_spec": { "type": "ON_DEMAND" }, "description": "${DATAPLEX_TASK_DESCRIPTION}" }
Beispiel-Airflow-DAG für Datenqualitätsaufgabe in Dataplex Universal Catalog
Geplante Datenqualitätsaufgabe überwachen
Hier erfahren Sie, wie Sie Ihre Aufgabe im Blick behalten.
Ergebnisse aufrufen
Die Ergebnisse der Datenqualitätsprüfungen werden im BigQuery-Dataset und in der Übersichtstabelle gespeichert, die Sie angegeben haben, wie unter Dataset zum Speichern der Ergebnisse erstellen beschrieben. Die Übersichtstabelle enthält eine Ausgabezusammenfassung für die verschiedenen Kombinationen aus Regelbindung und Regel je Prüfung. Die Ausgabe in der Übersichtstabelle umfasst die folgenden Informationen:
Spaltenname | Beschreibung |
---|---|
dataplex_lake |
(String) ID des Dataplex Universal Catalog-Lake, der die zu prüfende Tabelle enthält. |
dataplex_zone |
(String) ID der Dataplex Universal Catalog-Zone, die die zu prüfende Tabelle enthält. |
dataplex_asset_id |
(String) ID des Dataplex Universal Catalog-Assets, das die zu prüfende Tabelle enthält. |
execution_ts |
(Zeitstempel) Zeitstempel für die Ausführung der Prüfungsabfrage. |
rule_binding_id |
(String) ID der Regelbindung, für die Prüfungsergebnisse gemeldet werden. |
rule_id |
(String) ID der Regel in der Regelbindung, für die Prüfungsergebnisse gemeldet werden. |
dimension |
(String) Datenqualitätsdimension der rule_id . Dieser Wert kann nur einer der Werte sein, die im rule_dimensions -YAML-Knoten angegeben sind. |
table_id |
(String) ID der Entität, für die Prüfungsergebnisse gemeldet werden.
Diese ID wird unter dem entity -Parameter der jeweiligen Regelbindung angegeben. |
column_id |
(String) ID der Spalte, für die Prüfungsergebnisse gemeldet werden.
Diese ID wird unter dem column -Parameter der jeweiligen Regelbindung angegeben. |
last_modified |
(Zeitstempel) Zeitstempel der letzten Änderung der zu prüfenden table_id . |
metadata_json_string |
(String) Schlüssel/Wert-Paare des Inhalts des Metadatenparameters, der in der Regelbindung oder während der Datenqualitätsprüfung angegeben wurde. |
configs_hashsum |
(String) Hash-Summe des JSON-Dokuments, das die Regelbindung und alle zugehörigen Regeln, Regelbindungen, Zeilenfilter und Entitätskonfigurationen enthält.
configs_hashsum ermöglicht das Tracking, wenn der Inhalt einer rule_binding -ID oder einer der referenzierten Konfigurationen geändert wurde. |
dq_run_id |
(String) Eindeutige ID des Eintrags. |
invocation_id |
(String) ID der Datenqualitätsprüfung. Alle Datenqualitätszusammenfassungen, die innerhalb derselben Datenqualitätsprüfung generiert wurden, haben dieselbe invocation_id . |
progress_watermark |
(Boolesch) Legt fest, ob dieser Eintrag von der Datenqualitätsprüfung berücksichtigt wird, um den Höchstwert für die inkrementelle Prüfung zu festzustellen. Bei FALSE wird der entsprechende Eintrag beim Festlegen des Höchstwerts ignoriert. Diese Information ist nützlich, wenn Sie testweise Datenqualitätsprüfungen ausführen, die den Höchstwert nicht erhöhen sollen. Dataplex Universal Catalog füllt dieses Feld standardmäßig mit TRUE aus. Der Wert kann jedoch überschrieben werden, wenn das Argument --progress_watermark den Wert FALSE hat.
|
rows_validated |
(Ganzzahl) Gesamtzahl der Einträge, die nach dem Anwenden von row_filters und möglichen Höchstwertfiltern auf die Spalte incremental_time_filter_column_id geprüft wurden, falls angegeben. |
complex_rule_validation_errors_count |
(Gleitkommazahl) Anzahl der Zeilen, die von einer CUSTOM_SQL_STATEMENT -Regel zurückgegeben werden. |
complex_rule_validation_success_flag |
(Boolesch) Erfolgsstatus von CUSTOM_SQL_STATEMENT -Regeln.
|
success_count |
(Ganzzahl) Gesamtzahl der Einträge, die die Prüfung bestanden haben. Dieses Feld ist für CUSTOM_SQL_STATEMENT -Regeln auf NULL gesetzt.
|
success_percentage |
(Gleitkommazahl) Prozentsatz der geprüften Einträge, die die Prüfung bestanden haben. Dieses Feld ist für CUSTOM_SQL_STATEMENT -Regeln auf NULL gesetzt. |
failed_count |
(Ganzzahl) Gesamtzahl der Einträge, die die Prüfung nicht bestanden haben. Dieses Feld ist für CUSTOM_SQL_STATEMENT -Regeln auf NULL gesetzt.
|
failed_percentage |
(Gleitkommazahl) Prozentsatz der geprüften Einträge, die die Prüfung nicht bestanden haben. Dieses Feld ist für CUSTOM_SQL_STATEMENT -Regeln auf NULL gesetzt. |
null_count |
(Ganzzahl) Gesamtzahl der Einträge, die während der Prüfung den Wert Null zurückgegeben haben.
Dieses Feld ist für NOT_NULL - und CUSTOM_SQL_STATEMENT -Regeln auf NULL gesetzt. |
null_percentage |
(Gleitkommazahl) Prozentsatz der geprüften Einträge, die während der Prüfung den Wert Null zurückgegeben haben. Dieses Feld ist für NOT_NULL - und CUSTOM_SQL_STATEMENT -Regeln auf NULL gesetzt. |
failed_records_query |
Für jede Regel, die fehlschlägt, wird in dieser Spalte eine Abfrage gespeichert, mit der Sie die fehlgeschlagenen Einträge abrufen können. Weitere Informationen finden Sie in diesem Dokument unter Fehler in fehlgeschlagenen Regeln mit failed_records_query beheben. |
Für BigQuery-Entitäten wird für jede rule_binding
eine Ansicht erstellt, die die SQL-Prüfungslogik der letzten Ausführung enthält. Sie finden diese Ansichten im BigQuery-Dataset, das im Argument --gcp-bq-dataset-id
angegeben ist.
Kostenoptimierung
Datenqualitätsaufgaben werden als BigQuery-Jobs in Ihrem Projekt ausgeführt. Damit die Kosten für die Ausführung von Datenqualitätsjobs unter Kontrolle bleiben, zahlen Sie die BigQuery-Preise für das Projekt, in dem Ihre BigQuery-Jobs ausgeführt werden. Weitere Informationen finden Sie unter Einführung in die Arbeitslastverwaltung.
Inkrementelle Prüfungen
Oft werden Tabellen regelmäßig um neue Partitionen (neue Zeilen) ergänzt. Wenn Sie alte Partitionen nicht bei jeder Ausführung neu prüfen möchten, können Sie inkrementelle Prüfungen verwenden.
Für inkrementelle Prüfungen muss in Ihrer Tabelle eine Spalte vom Typ TIMESTAMP
oder DATETIME
vorhanden sein, in der der Spaltenwert monoton ansteigt. Sie können die Spalten verwenden, nach denen Ihre BigQuery-Tabelle partitioniert ist.
Falls Sie eine inkrementelle Prüfung festlegen möchten, geben Sie einen Wert für incremental_time_filter_column_id=TIMESTAMP/DATETIME type column
als Teil einer Regelbindung an.
Wenn Sie eine Spalte angeben, werden in der Datenqualitätsaufgabe nur Zeilen mit einem TIMESTAMP
-Wert berücksichtigt, der größer ist als der Zeitstempel der letzten Datenqualitätsaufgabe, die ausgeführt wurde.
Beispielhafte Spezifikationsdateien
Wenn Sie diese Beispiele verwenden möchten, erstellen Sie ein BigQuery-Dataset namens sales
. Erstellen Sie dann eine Faktentabelle mit dem Namen sales_orders
und fügen Sie Beispieldaten hinzu, indem Sie mit den folgenden GoogleSQL-Anweisungen eine Abfrage ausführen:
CREATE OR REPLACE TABLE sales.sales_orders
(
id STRING NOT NULL,
last_modified_timestamp TIMESTAMP,
customer_id STRING,
item_id STRING,
amount NUMERIC,
transaction_currency STRING
);
INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")
Beispiel 1
Im folgenden Codebeispiel werden Datenqualitätsprüfungen zum Prüfen dieser Werte erstellt:
amount
: Die Werte sind null oder positive Zahlen.item_id
: Ein alphanumerischer String mit fünf alphabetischen Zeichen, gefolgt von 15 Ziffern.transaction_currency
: Ein zulässiger Währungstyp, wie durch eine statische Liste definiert. In der statischen Liste dieses Beispiels werden als Währungstypen GBP und JPY zugelassen. Diese Prüfung gilt nur für Zeilen, die als international markiert sind.
# The following `NONE` row filter is required.
row_filters:
NONE:
filter_sql_expr: |-
True
# This filters for rows marked as international (INTNL).
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can apply to multiple tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
# Rule bindings associate rules to columns within tables.
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Ersetzen Sie Folgendes:
PROJECT_ID
: Ihre Projekt-ID.DATASET_ID
: Die Dataset-ID.
Beispiel 2
Wenn die zu prüfende Tabelle Teil eines Dataplex Universal Catalog-Lake ist, können Sie die Tabellen mit der Lake- oder Zonennotation angeben. So können Sie Ergebnisse nach Lake oder Zone aggregieren. Sie können beispielsweise eine Punktzahl auf Zonenebene vergeben.
Wenn Sie dieses Beispiel verwenden möchten, erstellen Sie einen Dataplex Universal Catalog-Lake mit der Lake-ID operations
und der Zonen-ID procurement
. Fügen Sie dann die Tabelle sales_orders
als Asset zur Zone hinzu.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Ersetzen Sie Folgendes:
- PROJECT_ID: Ihre Projekt-ID.
- REGION_ID: Die Regions-ID des Dataplex Universal Catalog-Lake, in dem sich die Tabelle befindet, zum Beispiel
us-central1
.
Beispiel 3
In diesem Beispiel wird Beispiel 2 um eine benutzerdefinierte SQL-Prüfung erweitert, um festzustellen, ob die ID-Werte eindeutig sind.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Beispiel 4
In diesem Beispiel wird Beispiel 3 durch das Hinzufügen inkrementeller Prüfungen mit der Spalte last_modified_timestamp
erweitert. Sie können inkrementelle Prüfungen für eine oder mehrere Regelbindungen hinzufügen.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_CURRENCY_ID
Fehler in fehlgeschlagenen Regeln mit failed_records_query
beheben
Für jede Regel, die fehlschlägt, wird in der Übersichtstabelle eine Abfrage in der Spalte failed_records_query
gespeichert, mit der Sie die fehlgeschlagenen Einträge abrufen können.
Zum Debuggen können Sie auch reference columns
in Ihrer YAML-Datei verwenden. Damit können Sie die Ausgabe von failed_records_query
mit den Originaldaten zusammenführen, um den gesamten Eintrag zu erhalten. Sie können beispielsweise eine primary_key
-Spalte oder eine zusammengesetzte primary_key
-Spalte als Referenzspalte angeben.
Referenzspalten angeben
Wenn Sie Referenzspalten generieren möchten, können Sie Ihrer YAML-Spezifikation Folgendes hinzufügen:
Den Abschnitt
reference_columns
. Darin können Sie einen oder mehrere Referenzspaltensätze erstellen, wobei jeder Satz eine oder mehrere Spalten angibt.Den Abschnitt
rule_bindings
. In diesem Abschnitt können Sie einer Regelbindung eine Zeile hinzufügen, in der eine Referenzspalten-ID (reference_columns_id
) für die Regeln in dieser Regelbindung angegeben wird. Dabei sollte es sich um einen der im Abschnittreference_columns
angegebenen Referenzspaltensätze handeln.
In der folgenden YAML-Datei wird beispielsweise ein reference_columns
-Abschnitt angegeben und es werden die drei Spalten id
, last_modified_timestamp
und item_id
als Teil des Satzes ORDER_DETAILS_REFERENCE_COLUMNS
definiert. Im folgenden Beispiel wird die Tabelle sales_orders
verwendet.
reference_columns:
ORDER_DETAILS_REFERENCE_COLUMNS:
include_reference_columns:
- id
- last_modified_timestamp
- item_id
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
params:
custom_sql_expr: |-
row_filters:
NONE:
filter_sql_expr: |-
True
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
rule_ids:
- VALUE_ZERO_OR_POSITIVE
Abfrage fehlgeschlagener Einträge verwenden
Bei der Abfrage fehlgeschlagener Einträge wird für jeden Eintrag, in dem eine Regel fehlgeschlagen ist, eine Zeile generiert. Sie enthält den Spaltennamen und den Wert, die den Fehler ausgelöst haben, sowie die Werte für die Referenzspalten. Außerdem beinhaltet sie Metadaten, mit denen Sie sich auf die Ausführung der Datenqualitätsaufgabe beziehen können.
Sie sehen hier ein Beispiel der Ausgabe einer Abfrage fehlgeschlagener Einträge für die YAML-Datei, die unter Referenzspalten angeben beschrieben wird. Es wird ein Fehler für die Spalte amount
und ein fehlgeschlagener Wert von -10
angezeigt. Außerdem wird der entsprechende Wert für die Referenzspalte erfasst.
_dq_validation_invocation_id | _dq_validation_rule_binding_id | _dq_validation_rule_id | _dq_validation_column_id | _dq_validation_column_value | _dq_validation_dimension | _dq_validation_simple_rule_row_is_valid | _dq_validation_complex_rule_validation_errors_count | _dq_validation_complex_rule_validation_success_flag | id | last_modified_timestamp | item_id |
---|---|---|---|---|---|---|---|---|---|---|---|
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 | TRANSACTION_AMOUNT_VALID | VALUE_ZERO_OR_POSITIVE | amount | -10 | FALSE | order1 | 2022-01-22T02:30:06.321Z | bad_item_id |
Abfragen fehlgeschlagener Einträge für CUSTOM_SQL_STATEMENT-Regeln verwenden
Für CUSTOM_SQL_STATEMENT
-Regeln enthalten Abfragen fehlgeschlagener Einträge die Spalte custom_sql_statement_validation_errors
. Die Spalte custom_sql_statement_validation_errors
ist eine verschachtelte Spalte mit Feldern, die der Ausgabe Ihrer SQL-Anweisung entsprechen. Referenzspalten sind nicht Teil der Abfragen fehlgeschlagener Einträge für CUSTOM_SQL_STATEMENT
-Regeln.
Ihre CUSTOM_SQL_STATEMENT
-Regel könnte beispielsweise so aussehen:
rules: TEST_RULE: rule_type: CUSTOM_SQL_STATEMENT custom_sql_arguments: - existing_id - replacement_id params: CUSTOM_SQL_STATEMENT: |- (SELECT product_name, product_key FROM data where $existing_id != $replacement_id)
custom_sql_statement_validation_errors
– eine Zeile pro Ergebnis mit existing_id!=replacement_id
.
Wenn der Inhalt einer Zelle in dieser Spalte in JSON wiedergegeben wird, sieht er so aus:
{ "custom_sql_statement_valdation_errors" :{ "product_name"="abc" "product_key"="12345678" "_rule_binding_id"="your_rule_binding" } }
Sie können diese Ergebnisse mit einem verschachtelten Verweis wie join on custom_sql_statement_valdation_errors.product_key
mit der Originaltabelle verknüpfen.
Nächste Schritte
- CloudDQ-YAML-Spezifikationsreferenz
- Einfache Regeln und Erweiterte Regeln
- Beispiel-Airflow-DAG für Datenqualitätsaufgabe in Dataplex Universal Catalog