Questo documento mostra come creare attività di qualità dei dati di Dataplex Universal Catalog che consentono di pianificare ed eseguire controlli di qualità dei dati per le tabelle BigQuery integrate ed esterne.
Per maggiori informazioni, vedi Panoramica delle attività di qualità dei dati.
Prima di iniziare
Questo documento presuppone che tu disponga di un lake Dataplex Universal Catalog esistente in cui creare l'attività di qualità dei dati.
Abilita API e servizi Google
Abilitare l'API Dataproc.
Abilita l'accesso privato Google per la tua rete e la tua subnet. Attiva l'accesso Google privato sulla rete che prevedi di utilizzare con le attività di qualità dei dati di Dataplex Universal Catalog. Se non specifichi una rete o una subnet quando crei l'attività di qualità dei dati di Dataplex Universal Catalog, Dataplex Universal Catalog utilizza la subnet predefinita. In questo caso, devi abilitare l'accesso privato Google nella subnet predefinita.
Crea un file di specifica
Dataplex Universal Catalog utilizza CloudDQ open source come programma driver. I requisiti di controllo della qualità dei dati di Dataplex Universal Catalog sono definiti all'interno dei file delle specifiche YAML di CloudDQ.
Come input per l'attività di qualità dei dati, puoi avere un singolo file YAML o un singolo archivio zip contenente uno o più file YAML. Ti consigliamo di acquisire i requisiti di controllo della qualità dei dati in file delle specifiche YAML separati, con un file per ogni sezione.
Per preparare un file di specifica:
-
Crea uno o più file delle specifiche YAML di CloudDQ che definiscono i requisiti di controllo della qualità dei dati. Per ulteriori informazioni sulla sintassi richiesta, consulta la sezione Informazioni sul file di specifica di questo documento.
Salva il file di specifica YAML in formato
.yml
o.yaml
. Se crei più file di specifica YAML, salvali tutti in un unico archivio zip. - Crea un bucket Cloud Storage.
- Carica il file delle specifiche nel bucket Cloud Storage.
Informazioni sul file di specifica
Il file delle specifiche YAML di CloudDQ deve avere le seguenti sezioni:
Regole (definite nel nodo YAML
rules
di primo livello): un elenco di regole da eseguire. Puoi creare queste regole a partire da tipi di regole predefiniti, ad esempioNOT_NULL
eREGEX
, oppure puoi estenderle con istruzioni SQL personalizzate, ad esempioCUSTOM_SQL_EXPR
eCUSTOM_SQL_STATEMENT
. L'istruzioneCUSTOM_SQL_EXPR
segnala come errore qualsiasi riga checustom_sql_expr
ha restituito il valoreFalse
. L'istruzioneCUSTOM_SQL_STATEMENT
contrassegna come errore qualsiasi valore restituito dall'intera istruzione.Filtri di riga (definiti nel nodo YAML
row_filters
di primo livello): espressioni SQL che restituiscono un valore booleano che definisce i filtri per recuperare un sottoinsieme di dati dall'entità sottostante soggetta a convalida.Associazioni di regole (definite nel nodo YAML
rule_bindings
di primo livello): Definiscerules
erule filters
da applicare alle tabelle.Dimensioni delle regole (definite nel nodo YAML
rule_dimensions
): definisce l'elenco consentito di dimensioni delle regole di qualità dei dati che una regola può definire nel campodimension
corrispondente.Ad esempio:
rule_dimensions: - consistency - correctness - duplication - completeness - conformance
Il campo
dimension
è facoltativo per una regola. La sezione delle dimensioni della regola è obbligatoria sedimension
è elencato in una regola.
Per ulteriori informazioni, consulta la guida di riferimento di CloudDQ e i file di specifiche di esempio.
Crea un set di dati per archiviare i risultati
-
Per archiviare i risultati, crea un set di dati BigQuery.
Il set di dati deve trovarsi nella stessa regione delle tabelle su cui esegui l'attività di qualità dei dati.
Dataplex Universal Catalog utilizza questo set di dati e crea o riutilizza una tabella a tua scelta per archiviare i risultati.
Crea un account di servizio
Crea un service account con i seguenti ruoli e autorizzazioni Identity and Access Management (IAM):
- Accesso in lettura al percorso Cloud Storage contenente le specifiche YAML. Puoi utilizzare il ruolo Visualizzatore oggetti Storage
(
roles/storage.objectViewer
) nel bucket Cloud Storage. - Accesso in lettura ai set di dati BigQuery con i dati da convalidare.
Puoi utilizzare il ruolo Visualizzatore dati BigQuery
(
roles/bigquery.dataViewer
). - Accesso in scrittura al set di dati BigQuery per creare una tabella
(se necessario) e scrivere i risultati in questa tabella. Puoi
utilizzare il ruolo Editor dati BigQuery
(
roles/bigquery.dataEditor
) a livello di set di dati. - Ruolo Utente job BigQuery (
roles/bigquery.jobUser
) a livello di progetto per creare job BigQuery in un progetto. - Il ruolo Lettore metadati Dataplex (
roles/dataplex.metadataReader
) a livello di progetto o lake. - Il ruolo Consumer Service Usage
(
roles/serviceusage.serviceUsageConsumer
) a livello di progetto. - Il ruolo Worker Dataproc
(
roles/dataproc.worker
). - L'autorizzazione
iam.serviceAccounts.actAs
concessa all'utente che invia il job. - Il ruolo Utente service account concesso alaccount di serviziot del lake Dataplex Universal Catalog. Puoi visualizzare il account di servizio del lake Dataplex Universal Catalog nella Google Cloud console.
Utilizzare le impostazioni avanzate
Questi passaggi sono facoltativi:
BigQuery esegue i controlli di qualità dei dati nel progetto corrente per impostazione predefinita. Puoi scegliere un progetto diverso per eseguire i job BigQuery. Utilizza l'argomento
--gcp_project_id
TASK_ARGS
per la proprietà--execution-args
dell'attività.Se l'ID progetto specificato per l'esecuzione delle query BigQuery è diverso dal progetto in cui viene creato l'account di servizio (specificato da
--execution-service-account
), assicurati che il criterio dell'organizzazione che disabilita l'utilizzo dell'account di servizio tra progetti (iam.disableServiceAccountCreation
) sia disattivato. Assicurati inoltre che l'account di servizio possa accedere alla pianificazione dei job BigQuery nel progetto in cui vengono eseguite le query BigQuery.
Limitazioni
Tutte le tabelle specificate per una determinata attività di qualità dei dati devono appartenere alla stessa Google Cloud regione.
Pianificare un'attività di qualità dei dati
Console
- Nella console Google Cloud , vai alla pagina Processo di Dataplex Universal Catalog.
- Fai clic su Crea attività.
- Nella scheda Controlla la qualità dei dati, fai clic su Crea attività.
- Per Lake Dataplex, scegli il tuo lake.
- In ID, inserisci un ID.
- Nella sezione Specifica della qualità dei dati, segui questi passaggi:
- Nel campo Seleziona file GCS, fai clic su Sfoglia.
Seleziona il bucket Cloud Storage.
Fai clic su Seleziona.
Nella sezione Tabella dei risultati, segui questi passaggi:
Nel campo Seleziona set di dati BigQuery, fai clic su Sfoglia.
Seleziona il set di dati BigQuery in cui archiviare i risultati della convalida.
Fai clic su Seleziona.
Nel campo Tabella BigQuery, inserisci il nome della tabella in cui archiviare i risultati. Se la tabella non esiste, Dataplex Universal Catalog la crea per te. Non utilizzare il nome
dq_summary
perché è riservato per le attività di elaborazione interne.
Nella sezione Service account, seleziona un account di servizio dal menu Service account utente.
Fai clic su Continua.
Nella sezione Imposta pianificazione, configura la pianificazione per l'esecuzione dell'attività di qualità dei dati.
Fai clic su Crea.
Interfaccia a riga di comando gcloud
Di seguito è riportato un esempio di esecuzione di un'attività di qualità dei dati che utilizza il comando gcloud CLI delle attività di Dataplex Universal Catalog:
export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH" # Google Cloud project where the Dataplex Universal Catalog task is created. export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT" # Google Cloud region for the Dataplex Universal Catalog lake. export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID" # Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region. export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}" # The Dataplex Universal Catalog lake where your task is created. export DATAPLEX_LAKE_ID="DATAPLEX_LAKE_ID" # The service account used for running the task. Ensure that this service account has sufficient IAM permissions on your project, including BigQuery Data Editor, BigQuery Job User, Dataplex Universal Catalog Editor, Dataproc Worker, and Service Usage Consumer. export DATAPLEX_TASK_SERVICE_ACCOUNT="DATAPLEX_TASK_SERVICE_ACCOUNT" # If you want to use a different dataset for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding, use the following: export CLOUDDQ_BIGQUERY_DATASET="CLOUDDQ_BIGQUERY_DATASET" # The BigQuery dataset where the final results of the data quality checks are stored. This could be the same as CLOUDDQ_BIGQUERY_DATASET. export TARGET_BQ_DATASET="TARGET_BQ_DATASET" # The BigQuery table where the final results of the data quality checks are stored. export TARGET_BQ_TABLE="TARGET_BQ_TABLE" # The unique identifier for the task. export TASK_ID="TASK_ID" gcloud dataplex tasks create \ --location="${DATAPLEX_REGION_ID}" \ --lake="${DATAPLEX_LAKE_ID}" \ --trigger-type=ON_DEMAND \ --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \ --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \ --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \ --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \ "$TASK_ID"
Parametro | Descrizione |
---|---|
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH |
Il percorso Cloud Storage per la configurazione YAML della qualità dei dati
input per l'attività di qualità dei dati. Puoi avere un
singolo file YAML in formato .yml o .yaml o un archivio ZIP
contenente più file YAML. |
GOOGLE_CLOUD_PROJECT |
Il Google Cloud progetto in cui vengono creati l'attività Dataplex Universal Catalog e i job BigQuery. |
DATAPLEX_REGION_ID |
La regione del lake Dataplex Universal Catalog in cui viene creato il task di qualità dei dati. |
SERVICE_ACCOUNT |
Il account di servizio utilizzato per eseguire l'attività. Assicurati che questo service account disponga delle autorizzazioni IAM necessarie, come descritto nella sezione Prima di iniziare. |
Per --execution-args
, i seguenti argomenti devono essere passati come
argomenti posizionali e quindi in questo ordine:
Argomento | Descrizione |
---|---|
clouddq-executable.zip |
Un eseguibile precompilato passato in
spark-file-uris da un bucket Cloud Storage pubblico. |
ALL |
Esegui tutti i binding delle regole. In alternativa, puoi fornire associazioni di regole specifiche come elenco separato da virgole.
Ad esempio, RULE_1,RULE_2 . |
gcp-project-id |
L'ID progetto che esegue le query BigQuery. |
gcp-region-id |
Regione per l'esecuzione dei
job BigQuery per la convalida della qualità dei dati. Questa regione
deve essere la stessa di gcp-bq-dataset-id e
target_bigquery_summary_table . |
gcp-bq-dataset-id |
Set di dati BigQuery utilizzato per archiviare le viste rule_binding e i risultati intermedi del riepilogo della qualità dei dati. |
target-bigquery-summary-table |
Riferimento all'ID tabella della tabella BigQuery in cui sono archiviati i risultati finali dei controlli della qualità dei dati. Non utilizzare il valore ID
dq_summary perché è riservato alle attività di elaborazione interne. |
--summary_to_stdout |
(Facoltativo) Quando viene passato questo flag, tutte le righe dei risultati della convalida
create nell'ultima esecuzione nella tabella dq_summary vengono
registrate come record JSON in Cloud Logging e
stdout . |
API
Sostituisci quanto segue:
PROJECT_ID = "Your Dataplex Universal Catalog Project ID" REGION = "Your Dataplex Universal Catalog lake region" LAKE_ID = "Your Dataplex Universal Catalog lake ID" SERVICE_ACC = "Your service account used for reading the data" DATAPLEX_TASK_ID = "Unique task ID for the data quality task" BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification" GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project" GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results" TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
- Invia una richiesta HTTP POST:
POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID} { "spark": { "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py", "file_uris": [ f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip", f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum", f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip" ] }, "execution_spec": { "args": { "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}" }, "service_account": "SERVICE_ACC" }, "trigger_spec": { "type": "ON_DEMAND" }, "description": "${DATAPLEX_TASK_DESCRIPTION}" }
Vedi anche DAG Airflow di esempio per l'attività di qualità dei dati di Dataplex Universal Catalog.
Monitorare un'attività di qualità dei dati pianificata
Scopri come monitorare l'attività.
Visualizza i risultati
I risultati delle convalide della qualità dei dati vengono archiviati nel set di dati BigQuery e nella tabella riepilogativa che hai specificato, come descritto in Crea un set di dati per archiviare i risultati. La tabella riepilogativa contiene il riepilogo dell'output per ogni combinazione di associazione di regole e regole per ogni esecuzione della convalida. L'output nella tabella riepilogativa include le seguenti informazioni:
Nome colonna | Descrizione |
---|---|
dataplex_lake |
(stringa) ID del lake Dataplex Universal Catalog contenente la tabella da convalidare. |
dataplex_zone |
(stringa) ID della zona Dataplex Universal Catalog contenente la tabella da convalidare. |
dataplex_asset_id |
(stringa) ID dell'asset Dataplex Universal Catalog contenente la tabella da convalidare. |
execution_ts |
(timestamp) Timestamp relativo alla data e all'ora di esecuzione della query di convalida. |
rule_binding_id |
(stringa) ID del binding della regola per cui vengono segnalati i risultati della convalida. |
rule_id |
(stringa) ID della regola nel binding della regola per cui vengono segnalati i risultati della convalida. |
dimension |
(stringa) Dimensione della qualità dei dati di rule_id . Questo valore può essere solo
uno dei valori specificati nel nodo YAML rule_dimensions . |
table_id |
(stringa) ID dell'entità per cui vengono riportati i risultati della convalida.
Questo ID è specificato nel parametro entity del
rispettivo binding della regola. |
column_id |
(stringa) ID della colonna per cui vengono riportati i risultati della convalida.
Questo ID è specificato nel parametro column del
rispettivo binding della regola. |
last_modified |
(timestamp) Il timestamp dell'ultima modifica di table_id
in fase di convalida. |
metadata_json_string |
(stringa) Coppie chiave-valore del contenuto del parametro dei metadati specificato nel binding della regola o durante l'esecuzione della qualità dei dati. |
configs_hashsum |
(stringa) La somma hash del documento JSON contenente l'associazione di regole
e tutte le regole, le associazioni di regole, i filtri delle righe e le configurazioni delle entità associati.
configs_hashsum consente di monitorare quando i contenuti di un ID rule_binding o di una delle relative configurazioni di riferimento sono stati modificati. |
dq_run_id |
(stringa) ID univoco del record. |
invocation_id |
(stringa) ID dell'esecuzione della qualità dei dati. Tutti i record di riepilogo della qualità dei dati
generati all'interno della stessa istanza di esecuzione della qualità dei dati condividono lo stesso
invocation_id . |
progress_watermark |
(booleano) Determina se questo particolare record viene preso in considerazione
dal controllo della qualità dei dati per determinare il limite massimo per
la convalida incrementale. Se FALSE , il record corrispondente viene
ignorato durante la definizione del valore del livello massimo. Queste informazioni sono utili quando
esegui le convalide della qualità dei dati di test che non devono far avanzare
il limite massimo. Il Catalogo universale Dataplex compila questo campo con
TRUE per impostazione predefinita, ma il valore può essere sostituito se l'argomento
--progress_watermark ha un valore di FALSE .
|
rows_validated |
(integer) Numero totale di record convalidati dopo l'applicazione di
row_filters e di eventuali filtri high-watermark sulla
colonna incremental_time_filter_column_id , se specificati. |
complex_rule_validation_errors_count |
(float) Numero di righe restituite da una regola CUSTOM_SQL_STATEMENT . |
complex_rule_validation_success_flag |
(booleano) Stato di riuscita delle regole CUSTOM_SQL_STATEMENT .
|
success_count |
(integer) Numero totale di record che hanno superato la convalida. Questo campo
è impostato su NULL per le regole CUSTOM_SQL_STATEMENT .
|
success_percentage |
(float) Percentuale del numero di record che hanno superato la convalida
rispetto al numero totale di record convalidati. Questo campo è impostato su
NULL per le regole CUSTOM_SQL_STATEMENT . |
failed_count |
(integer) Numero totale di record la cui convalida non è riuscita. Questo campo
è impostato su NULL per le regole CUSTOM_SQL_STATEMENT .
|
failed_percentage |
(float) Percentuale del numero di record la cui convalida non è riuscita
rispetto al numero totale di record convalidati. Questo campo è impostato su
NULL per le regole CUSTOM_SQL_STATEMENT . |
null_count |
(integer) Numero totale di record che hanno restituito null durante la convalida.
Questo campo è impostato su NULL per le regole NOT_NULL e
CUSTOM_SQL_STATEMENT . |
null_percentage |
(float) Percentuale del numero di record che hanno restituito null durante la
convalida rispetto al numero totale di record convalidati. Questo campo è
impostato su NULL per le regole
NOT_NULL e CUSTOM_SQL_STATEMENT . |
failed_records_query |
Per ogni regola non soddisfatta, questa colonna memorizza una query che
puoi utilizzare per ottenere i record non riusciti. In questo documento, vedi
Risolvere i problemi relativi alle regole non riuscite con
failed_records_query . |
Per le entità BigQuery, viene creata una visualizzazione per ogni
rule_binding
contenente la logica di convalida SQL dell'ultima esecuzione. Puoi trovare queste viste nel set di dati BigQuery
specificato nell'argomento --gcp-bq-dataset-id
.
Ottimizzazioni dei costi
Le attività di qualità dei dati vengono eseguite come job BigQuery nel tuo progetto. Per controllare il costo di esecuzione dei job di qualità dei dati, utilizza i prezzi di BigQuery nel progetto in cui vengono eseguiti i job BigQuery. Per ulteriori informazioni, vedi Gestione dei carichi di lavoro BigQuery.
Convalide incrementali
Spesso, hai tabelle che vengono aggiornate regolarmente con nuove partizioni (nuove righe). Se non vuoi convalidare nuovamente le partizioni precedenti a ogni esecuzione, puoi utilizzare le convalide incrementali.
Per le convalide incrementali, devi avere una colonna di tipo TIMESTAMP
o DATETIME
nella tabella in cui il valore della colonna aumenta monotonicamente. Puoi utilizzare le colonne in base alle quali è partizionata la tabella BigQuery.
Per specificare la convalida incrementale, specifica un valore per
incremental_time_filter_column_id=TIMESTAMP/DATETIME type column
come parte di un binding della regola.
Quando specifichi una colonna, l'attività di qualità dei dati prende in considerazione solo le righe con un valore di TIMESTAMP
maggiore del timestamp dell'ultima attività di qualità dei dati eseguita.
File di specifica di esempio
Per utilizzare questi esempi, crea un set di dati BigQuery
denominato sales
. Poi, crea una tabella dei fatti denominata sales_orders
e aggiungi
dati di esempio
eseguendo una query
con le seguenti istruzioni GoogleSQL:
CREATE OR REPLACE TABLE sales.sales_orders
(
id STRING NOT NULL,
last_modified_timestamp TIMESTAMP,
customer_id STRING,
item_id STRING,
amount NUMERIC,
transaction_currency STRING
);
INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")
Esempio 1
Il seguente esempio di codice crea controlli della qualità dei dati per convalidare questi valori:
amount
: i valori sono zero o numeri positivi.item_id
: una stringa alfanumerica di 5 caratteri alfabetici, seguita da 15 cifre.transaction_currency
: un tipo di valuta consentito, come definito da un elenco statico. L'elenco statico di questo esempio consente di utilizzare GBP e JPY come tipi di valuta. Questa convalida si applica solo alle righe contrassegnate come internazionali.
# The following `NONE` row filter is required.
row_filters:
NONE:
filter_sql_expr: |-
True
# This filters for rows marked as international (INTNL).
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can apply to multiple tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
# Rule bindings associate rules to columns within tables.
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Sostituisci quanto segue:
PROJECT_ID
: il tuo ID progetto.DATASET_ID
: l'ID set di dati.
Esempio 2
Se la tabella da controllare fa parte di un lake Dataplex Universal Catalog, puoi specificare le tabelle utilizzando la notazione lake o zona. In questo modo puoi aggregare i risultati per lago o zona. Ad esempio, puoi generare un punteggio a livello di zona.
Per utilizzare questo esempio, crea un lake Dataplex Universal Catalog con l'ID lake
operations
e l'ID zona procurement
. Poi aggiungi la tabella sales_orders
come asset alla zona.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Sostituisci quanto segue:
- PROJECT_ID: il tuo ID progetto.
- REGION_ID: l'ID regione del
lake Dataplex Universal Catalog in cui esiste la tabella, ad esempio
us-central1
.
Esempio 3
Questo esempio migliora l'esempio 2 aggiungendo un controllo SQL personalizzato per verificare se i valori ID sono univoci.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Esempio 4
Questo esempio migliora l'esempio 3 aggiungendo convalide incrementali utilizzando la colonna
last_modified_timestamp
. Puoi aggiungere convalide incrementali per uno o più binding delle regole.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_CURRENCY_ID
Risolvere i problemi relativi alle regole non superate con failed_records_query
Per ogni regola non rispettata, la tabella riepilogativa
memorizza una query nella colonna failed_records_query
che puoi utilizzare per ottenere i record non riusciti.
Per eseguire il debug, puoi anche utilizzare reference columns
nel file YAML,
che ti consente di unire l'output di failed_records_query
con
i dati originali per ottenere l'intero record. Ad esempio, puoi specificare
una colonna primary_key
o una colonna primary_key
composta
come colonna di riferimento.
Specifica le colonne di riferimento
Per generare colonne di riferimento, puoi aggiungere quanto segue alla specifica YAML:
La sezione
reference_columns
. In questa sezione puoi creare uno o più set di colonne di riferimento, ognuno dei quali specifica una o più colonne.La sezione
rule_bindings
. In questa sezione, puoi aggiungere una riga a un binding di regole che specifica un ID colonna di riferimento (reference_columns_id
) da utilizzare per le regole in quel binding di regole. Deve essere uno dei set di colonne di riferimento specificati nella sezionereference_columns
.
Ad esempio, il seguente file YAML specifica una sezione reference_columns
e
definisce tre colonne: id
, last_modified_timestamp
e
item_id
come parte del set ORDER_DETAILS_REFERENCE_COLUMNS
. L'esempio
seguente utilizza la tabella di esempio sales_orders
.
reference_columns:
ORDER_DETAILS_REFERENCE_COLUMNS:
include_reference_columns:
- id
- last_modified_timestamp
- item_id
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
params:
custom_sql_expr: |-
row_filters:
NONE:
filter_sql_expr: |-
True
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
rule_ids:
- VALUE_ZERO_OR_POSITIVE
Utilizzo della query dei record non riusciti
La query sui record non riusciti genera una riga per ogni record con una regola non riuscita. Include il nome della colonna che ha causato l'errore, il valore che ha causato l'errore e i valori delle colonne di riferimento. Include anche metadati che puoi utilizzare per correlare l'esecuzione dell'attività di qualità dei dati.
Di seguito è riportato un esempio dell'output di una query sui record non riuscita per il file YAML
descritto in Specifica le colonne di riferimento. Mostra un
errore per la colonna amount
e un valore non riuscito di -10
. Registra anche
il valore corrispondente per la colonna di riferimento.
_dq_validation_invocation_id | _dq_validation_rule_binding_id | _dq_validation_rule_id | _dq_validation_column_id | _dq_validation_column_value | _dq_validation_dimension | _dq_validation_simple_rule_row_is_valid | _dq_validation_complex_rule_validation_errors_count | _dq_validation_complex_rule_validation_success_flag | ID | last_modified_timestamp | item_id |
---|---|---|---|---|---|---|---|---|---|---|---|
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 | TRANSACTION_AMOUNT_VALID | VALUE_ZERO_OR_POSITIVE | quantità | -10 | FALSE | order1 | 2022-01-22T02:30:06.321Z | bad_item_id |
Utilizzare le query sui record non riusciti per le regole CUSTOM_SQL_STATEMENT
Per le regole CUSTOM_SQL_STATEMENT
, le query sui record non riuscite includono la colonna custom_sql_statement_validation_errors
. La
colonna custom_sql_statement_validation_errors
è una colonna nidificata con campi che
corrispondono all'output dell'istruzione SQL. Le colonne di riferimento non sono incluse nelle query dei record non riusciti
per le regole CUSTOM_SQL_STATEMENT
.
Ad esempio, la regola CUSTOM_SQL_STATEMENT
potrebbe avere il seguente aspetto:
rules: TEST_RULE: rule_type: CUSTOM_SQL_STATEMENT custom_sql_arguments: - existing_id - replacement_id params: CUSTOM_SQL_STATEMENT: |- (SELECT product_name, product_key FROM data where $existing_id != $replacement_id)
custom_sql_statement_validation_errors
, con una riga per ogni occorrenza di existing_id!=replacement_id
.
Quando viene eseguito il rendering in JSON, i contenuti di una cella di questa colonna potrebbero avere questo aspetto:
{ "custom_sql_statement_valdation_errors" :{ "product_name"="abc" "product_key"="12345678" "_rule_binding_id"="your_rule_binding" } }
Puoi unire questi risultati alla tabella originale con un riferimento nidificato come join on custom_sql_statement_valdation_errors.product_key
.
Passaggi successivi
- Consulta il riferimento alle specifiche YAML di CloudDQ.
- Per esempi di regole di qualità dei dati, vedi Regole semplici e Regole avanzate.
- Consulta DAG Airflow di esempio per l'attività di qualità dei dati di Dataplex Universal Catalog.