Questo documento mostra come creare attività relative alla qualità dei dati di Dataplex che ti consentono di pianificare ed eseguire controlli di qualità dei dati per le tabelle BigQuery integrate ed esterne.
Per ulteriori informazioni, consulta Panoramica delle attività sulla qualità dei dati.
Prima di iniziare
Abilitare l'API Dataproc.
Abilita l'accesso privato Google per la tua rete e/o la tua subnet. Abilita l'accesso privato Google sulla rete che prevedi di utilizzare con le attività relative alla qualità dei dati di Dataplex. Se non specifichi una rete o una subnet quando crei l'attività relativa alla qualità dei dati di Dataplex, Dataplex utilizza la subnet predefinita. In questo caso, devi abilitare l'accesso privato Google sulla subnet predefinita.
Crea un file di specifiche
Dataplex utilizza open source CloudDQ come programma del driver. I requisiti di controllo della qualità dei dati di Dataplex
sono definiti all'interno dei file delle specifiche YAML CloudDQ. Puoi creare un
file di specifica in formato YAML o zip oppure un singolo archivio ZIP contenente uno o più file YAML in formato .yml
o .yaml
, in un percorso di Cloud Storage.
Il file delle specifiche YAML CloudDQ deve avere le seguenti sezioni:
Regole (definiti nel nodo YAML
rules:
di primo livello): un elenco di regole da eseguire. Puoi creare queste regole da tipi di regole predefiniti, comeNOT_NULL
eREGEX
, oppure puoi estenderle con istruzioni SQL personalizzate comeCUSTOM_SQL_EXPR
eCUSTOM_SQL_STATEMENT
. L'istruzioneCUSTOM_SQL_EXPR
contrassegna come non riuscita ogni riga checustom_sql_expr
ha valutatoFalse
. L'istruzioneCUSTOM_SQL_STATEMENT
contrassegna qualsiasi errore restituito dall'intera istruzione come errore.Filtri di riga (definiti nel nodo YAML
row_filters:
di primo livello): espressioni SQL che restituiscono un valore booleano che definisce i filtri per il recupero di un sottoinsieme di dati dall'entità sottostante per la convalida.Associazioni di regole (definiti nel nodo YAML
rule_bindings:
di primo livello): definiscerules
erule filters
da applicare alle tabelle.- Dimensioni regola (definita nel nodo YAML
rule_dimensions
): definisce l'elenco consentito di dimensioni della regola per la qualità dei dati che una regola può definire nel campodimension
corrispondente.
Ad esempio:rule_dimensions: - consistency - correctness - duplication - completeness - conformance
Il campodimension
è facoltativo per una regola. La sezione Dimensioni regola è obbligatoria se in qualsiasi regola è presente il nomedimension
.
Consulta la guida di riferimento per ulteriori dettagli sulla specifica YAML.
Per quanto riguarda l'attività relativa alla qualità dei dati, puoi avere un singolo file YAML in formato .yml
o .yaml
o un singolo archivio ZIP contenente uno o più file YAML. Ti
consigliamo di acquisire i requisiti per il controllo della qualità dei dati in file di specifiche YAML
separati, con un file per ogni sezione.
Per alcuni esempi, vedi gli esempi di file delle specifiche.
Archivia i risultati
Per archiviare i risultati, crea un set di dati BigQuery. Dataplex utilizza questo set di dati e crea o riutilizza una tabella a tua scelta per archiviare i risultati.
Crea un account di servizio
Puoi creare un account di servizio con i seguenti ruoli e autorizzazioni IAM (Identity and Access Management):
- Accesso in lettura al percorso Cloud Storage contenente le specifiche YAML. Puoi utilizzare il ruolo di Visualizzatore oggetti Storage
(
roles/storage.objectViewer
) nel bucket Cloud Storage. - Accesso in lettura ai set di dati BigQuery con dati da convalidare. Puoi utilizzare il ruolo Visualizzatore dati BigQuery.
- Accesso in scrittura al set di dati BigQuery per creare una tabella (se necessario) e scrivere i risultati in tale tabella. Puoi
utilizzare il ruolo di Editor dati BigQuery
(
roles/bigquery.dataEditor
) a livello di set di dati. - Ruolo utente job BigQuery (
roles/bigquery.jobUser
) a livello di progetto per creare job BigQuery in un progetto. - Il ruolo Lettore metadati Dataplex (
roles/dataplex.metadataReader
) a livello di progetto o lake. - Il ruolo consumer di utilizzo dei servizi (
roles/serviceusage.serviceUsageConsumer
) a livello di progetto. - Il ruolo Worker Dataproc.
- L'autorizzazione
iam.serviceAccounts.actAs
concessa all'utente che invia il job. - Il ruolo utente dell'account di servizio concesso all'account di servizio Lake di Dataplex. Puoi visualizzare l'account di servizio Lake di Dataplex nella console Google Cloud.
(Facoltativo) Utilizzare le impostazioni avanzate
Questi passaggi sono facoltativi:
Per impostazione predefinita, BigQuery esegue i controlli di qualità dei dati nell'attuale progetto utente. In alternativa, puoi scegliere un progetto diverso per eseguire i job BigQuery utilizzando l'argomento
TASK_ARGS
--gcp_project_id
per la proprietà--execution-args
dell'attività.Se l'ID progetto specificato per eseguire le query BigQuery è diverso da quello in cui viene creato l'account di servizio (specificato da
–-execution-service-account
), assicurati che il criterio dell'organizzazione che disattiva l'utilizzo degli account di servizio tra progetti (iam.disableServiceAccountCreation
). Disattiva inoltre l'account di servizio per assicurarti che possa accedere alla pianificazione dei job di BigQuery nel progetto in cui vengono eseguite le query BigQuery.
Limitazioni
- Tutte le tabelle specificate per una determinata attività della qualità dei dati devono appartenere alla stessa regione di Google Cloud.
- Per evitare errori di esecuzione, segui questi passaggi:
- La tabella che archivia le metriche di output si trova nella stessa regione Google Cloud.
Pianificare un'attività relativa alla qualità dei dati
Console
- Nella console Google Cloud, vai alla pagina Procedura di Dataplex.
- Fai clic su Crea attività.
- Nella scheda Controlla la qualità dei dati, fai clic su Crea attività.
- Per lake Dataplex, scegli il tuo lake.
- Inserisci un ID in ID.
- Nella sezione Specifica della qualità dei dati, procedi nel seguente modo:
- Nel campo Seleziona file GCS, fai clic su Sfoglia.
Seleziona il bucket Cloud Storage.
Fai clic su Seleziona.
Nella sezione Tabella dei risultati, procedi nel seguente modo:
Nel campo Seleziona set di dati BigQuery, fai clic su Sfoglia.
Seleziona il set di dati BigQuery per archiviare i risultati della convalida.
Fai clic su Seleziona.
(Facoltativo) Anziché navigare, inserisci un nome nel campo Tabella BigQuery. Se la tabella non esiste, Dataplex la crea per te.
Nella sezione Account di servizio, seleziona un account di servizio dal menu Account di servizio utente.
Fai clic su Continua.
Interfaccia a riga di comando gcloud
Di seguito è riportato un esempio di esecuzione di un'attività relativa alla qualità dei dati che utilizza il comando dell'interfaccia a riga di comando gcloud CLI delle attività Dataplex:
export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH" # Google Cloud project where the Dataplex task is created. export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT" # Google Cloud region for the Dataplex lake. export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID" # Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region. export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}" # The Dataplex lake where your task is created. export DATAPLEX_LAKE_NAME="operations" # The service account used for running the task. Ensure that this service account has sufficient IAM permissions on your project, including BigQuery Data Editor, BigQuery Job User, Dataplex Editor, Dataproc Worker, and Service Usage Consumer. # The BigQuery dataset used for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding. export TARGET_BQ_DATASET="data_quality_summary_dataset" # If you want to use a different dataset for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding, use the following: export CLOUDDQ_BIGQUERY_DATASET=$TARGET_BQ_DATASET # The BigQuery dataset where the final results of the data quality checks are stored. This could be the same as CLOUDDQ_BIGQUERY_DATASET. export TARGET_BQ_DATASET="data_quality_summary_dataset" # The BigQuery table where the final results of the data quality checks are stored. export TARGET_BQ_TABLE="data_quality_summary" # The unique identifier for the task. export TASK_ID="test-clouddq-task" gcloud dataplex tasks create \ --location="${DATAPLEX_REGION_ID}" \ --lake="${DATAPLEX_LAKE_NAME}" \ --trigger-type=ON_DEMAND \ --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \ --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \ --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \ --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \ "$TASK_ID"
Parametro | Descrizione |
---|---|
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH |
Il percorso di Cloud Storage al tuo input di configurazione YAML della qualità dei dati per l'attività relativa alla qualità dei dati. Puoi avere un singolo file YAML in formato .yml o .yaml o un archivio ZIP contenente più file YAML. |
GOOGLE_CLOUD_PROJECT |
Il progetto Google Cloud in cui vengono create l'attività Dataplex e i job BigQuery. |
DATAPLEX_REGION_ID |
La regione del lake Dataplex in cui viene creata l'attività di qualità dei dati. |
SERVICE_ACCOUNT |
L'account di servizio utilizzato per eseguire l'attività. Assicurati che questo account di servizio disponga di autorizzazioni IAM sufficienti, come descritto nella sezione Prima di iniziare. |
Per --execution-args
, i seguenti argomenti devono essere passati come argomenti posizionati, quindi in questo ordine:
Argomento | Descrizione |
---|---|
clouddq-executable.zip |
Un eseguibile precompilato che è stato passato in spark-file-uris da un bucket Cloud Storage pubblico. |
ALL |
Eseguire tutte le associazioni di regole. In alternativa, puoi fornire associazioni di regole specifiche come elenco separato da virgole.
Ad esempio, RULE_1,RULE_2 . |
gcp-project-id |
ID progetto che esegue le query BigQuery. |
gcp-region-id |
Regione per l'esecuzione dei job BigQuery per la convalida della qualità dei dati. Questa regione deve essere uguale a quella di gcp-bq-dataset-id e target_bigquery_summary_table . |
gcp-bq-dataset-id |
Set di dati BigQuery utilizzato per archiviare i risultati di riepilogo della qualità dei dati intermedi e rule_binding viste. |
target-biggquery-summary-table |
Riferimento ID tabella della tabella BigQuery in cui sono archiviati i risultati finali dei controlli di qualità dei dati. |
target-biggquery-summary-table |
Riferimento ID tabella della tabella BigQuery in cui sono archiviati i risultati finali dei controlli di qualità dei dati. |
--summary_to_stdout |
(Facoltativo) Una volta superato questo flag, tutte le righe dei risultati di convalida create nella tabella dq_summary nell'ultima esecuzione vengono registrate come record JSON in Cloud Logging e stdout . |
API
Sostituisci quanto segue:
PROJECT_ID = "Your Dataplex Project ID" REGION = "Your Dataplex Lake Region" LAKE_ID = "Your Dataplex Lake ID" SERVICE_ACC = "Your service account used for reading the data" DATAPLEX_TASK_ID = "Unique task ID for the data quality task" BUCKET_NAME = "Your Cloud Storage bucket name having the CloudDQ configs or YAML specification" GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project" GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the dq summary results" TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
- Invia una richiesta POST HTTP:
POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID} { "spark": { "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py", "file_uris": [ f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip", f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum", f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip" ] }, "execution_spec": { "args": { "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}" }, "service_account": "SERVICE_ACC" }, "trigger_spec": { "type": "ON_DEMAND" }, "description": "${DATAPLEX_TASK_DESCRIPTION}" }
Vedi anche Esempio di DAG Airflow per l'attività relativa alla qualità dei dati in Dataplex.
Monitorare un'attività di qualità dei dati pianificata
Scopri come monitorare la tua attività.
Visualizza i risultati
La tabella di riepilogo memorizza i risultati delle convalide della qualità dei dati. Contiene il riepilogo dell'output per ogni combinazione di associazione di regole e regola per ogni esecuzione di convalida. L'output nella tabella di riepilogo è strutturato come segue:
Nome colonna | Descrizione |
---|---|
dataplex_lake |
(stringa) ID del lake Dataplex contenente la tabella da convalidare. |
dataplex_zone |
(stringa) ID della zona Dataplex contenente la tabella convalidata. |
dataplex_asset_id |
(stringa) ID dell'asset Dataplex contenente la tabella da convalidare. |
execution_ts |
(timestamp) Timestamp di quando è stata eseguita la query di convalida. |
rule_binding_id |
(stringa) ID dell'associazione della regola per la quale vengono riportati i risultati della convalida. |
rule_id |
(stringa) ID della regola nell'associazione della regola per la quale vengono riportati i risultati della convalida. |
dimension |
(stringa) Dimensione della qualità dei dati di rule_id . Questo valore può essere solo uno dei valori specificati nel nodo YAML rule_dimensions . |
table_id |
(stringa) ID dell'entità per la quale vengono riportati i risultati della convalida.
Questo ID è specificato nel parametro entity della rispettiva associazione di regole. |
column_id |
(stringa) ID della colonna per la quale vengono riportati i risultati della convalida.
Questo ID è specificato nel parametro column della rispettiva associazione di regole. |
last_modified |
(timestamp) L'ultimo timestamp modificato dell'oggetto table_id
in fase di convalida. |
metadata_json_string |
(stringa) Coppie chiave-valore del contenuto del parametro dei metadati specificato nell'associazione delle regole o durante l'esecuzione della qualità dei dati. |
configs_hashsum |
(stringa) La somma hash del documento JSON contenente l'associazione di regole e tutte le regole associate, associazioni di regole, filtri di riga e configurazioni di entità.
code>configs_hashsum consente di monitorare quando i contenuti di un ID rule_binding o di una delle sue configurazioni a cui viene fatto riferimento sono cambiati. |
dq_run_id |
(stringa) ID univoco del record. |
invocation_id |
(stringa) ID dell'esecuzione della qualità dei dati. Tutti i record di riepilogo della qualità dei dati generati all'interno della stessa istanza di esecuzione della qualità dei dati condividono lo stesso invocation_id . |
progress_watermark |
(booleano) determina se questo record specifico è considerato dal controllo della qualità dei dati per determinare la filigrana alta per la convalida incrementale. Se FALSE , il rispettivo record viene ignorato durante la determinazione del valore di filigrana. Queste informazioni sono utili durante l'esecuzione di convalide della qualità dei dati di test che non devono migliorare la filigrana alta. Dataplex compila questo campo con
TRUE per impostazione predefinita, ma il valore può essere sostituito se
l'argomento --progress_watermark ha un valore FALSE .
|
rows_validated |
(numero intero) Numero totale di record convalidati dopo l'applicazione di row_filters e l'eventuale presenza di filtri watermark nella colonna incremental_time_filter_column_id , se specificato. |
complex_rule_validation_errors_count |
(mobile) Numero di righe restituite da una regola CUSTOM_SQL_STATEMENT . |
complex_rule_validation_success_flag |
(booleano) Stato delle regole CUSTOM_SQL_STATEMENT riuscito.
|
success_count |
(numero intero) Numero totale di record che hanno superato la convalida. Questo campo è impostato su NULL per CUSTOM_SQL_STATEMENT regole.
|
success_percentage |
(Mobile) Percentuale del numero di record che hanno superato la convalida rispetto al numero totale di record convalidati. Questo campo è impostato su NULL per CUSTOM_SQL_STATEMENT regole. |
failed_count |
(numero intero) Numero totale di record che non hanno superato la convalida. Questo campo è impostato su NULL per CUSTOM_SQL_STATEMENT regole.
|
failed_percentage |
(Mobile) Percentuale del numero di record non convalidati all'interno del numero totale di record convalidati. Questo campo è impostato su NULL per CUSTOM_SQL_STATEMENT regole. |
null_count |
(numero intero) Numero totale di record che sono stati restituiti nulli durante la convalida.
Questo campo è impostato su NULL per NOT_NULL e CUSTOM_SQL_STATEMENT regole. |
null_percentage |
(Flottante) Percentuale del numero di record che sono stati restituiti nulli durante
la convalida all'interno del numero totale di record convalidati. Questo campo è impostato su NULL per NOT_NULL e CUSTOM_SQL_STATEMENT regole. |
failed_records_query |
Per ogni regola che non riesce, questa colonna archivia una query che puoi utilizzare per ottenere i record non riusciti. In questo documento, consulta la pagina Risolvere i problemi relativi a failed_records_query . |
Per le entità BigQuery viene creata una vista per ogni
rule_binding
contenente la logica di convalida SQL dell'ultima
esecuzione. Puoi trovare queste viste nel set di dati BigQuery specificato nell'argomento --gcp-bq-dataset-id
.
Ottimizzazioni dei costi
Puoi ridurre i costi con le seguenti ottimizzazioni.
Convalide incrementali
Spesso hai tabelle aggiornate regolarmente con nuove partizioni (nuove righe). Se non vuoi riconvalidare le partizioni precedenti in ogni esecuzione, puoi utilizzare le convalide incrementali.
Per le convalide incrementali, devi avere una colonna di tipo TIMESTAMP
o DATETIME
nella tabella in cui il valore della colonna aumenta in modo monotonico. Puoi utilizzare le colonne su cui è partizionata la tabella BigQuery.
Per specificare la convalida incrementale, specifica un valore per
incremental_time_filter_column_id=TIMESTAMP/DATETIME type column
nell'ambito di un'associazione delle regole.
Quando specifichi una colonna, l'attività relativa alla qualità dei dati considera solo le righe con un valore TIMESTAMP
maggiore del timestamp dell'ultima attività relativa alla qualità dei dati.
Utilizzo delle prenotazioni di slot
Quando utilizzi le convalide incrementali, puoi notare una riduzione significativa dei tempi delle query. Tuttavia, a causa di una limitazione attuale, non vedrai alcuna riduzione dei byte elaborati. Pertanto, per risparmiare sui costi delle query, ti consigliamo di utilizzare gli slot riservati.
Esempi di file di specifiche
Crea una tabella di dati sales_orders
con la seguente struttura:
CREATE OR REPLACE TABLE sales.sales_orders
(
id STRING NOT NULL,
last_modified_timestamp TIMESTAMP,
customer_id STRING,
item_id STRING,
amount NUMERIC,
transaction_currency STRING
);
INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")
Esempio 1
Il seguente esempio di codice crea controlli di qualità dei dati per la convalida di questi valori:
amount
: i valori sono zero o numeri positivi.item_id
: una stringa alfanumerica di 5 caratteri alfabetici, seguita da 15 cifre.transaction_currency
: un tipo di valuta consentito, come definito da un elenco statico. L'elenco statico di questo campione consente GBP e JPY come tipo di valuta. Questa convalida si applica solo alle righe contrassegnate come internazionali.
# The following `NONE` row filter is required.
row_filters:
NONE:
filter_sql_expr: |-
True
# This filters for rows marked as international (INTNL).
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can apply to multiple tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
# Rule bindings associate rules to columns within tables.
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/<project-id>/datasets/<dataset-id>/tables/sales_orders
# Replace <location-id> with your region.
# Replace <dataset-id> with your dataset identifier.
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: bigquery://projects/<project-id>/datasets/<dataset-id>/tables/sales_orders
# Replace <location-id> with your region.
# Replace <dataset-id> with your dataset identifier.
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: bigquery://projects/<project-id>/datasets/<dataset-id>/tables/sales_orders
# Replace <location-id> with your region.
# Replace <dataset-id> with your dataset identifier.
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Esempio 2
Se la tabella da controllare fa parte di un lake Dataplex, puoi specificare le tabelle utilizzando la notazione lake o la notazione di zona. In questo modo puoi aggregare i risultati per lake o zona. Ad esempio, puoi generare un punteggio a livello di zona.
Questo esempio presuppone che la tabella sales_order
si trovi nel lake Dataplex operations
e nella zona procurement
.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: <project-id> # Replace "project-id" with your project ID.
locations: <region-id> # Replace "region-id" with the region ID of the Dataplex lake in which the table exists. For example, us-central1.
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://projects/<project-id>/locations/<region-id>/lakes/operations/zones/procurement/entities/sales_orders # replace "project-id" with your project ID and "region-id" with your region
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Esempio 3
Questo esempio migliora il Campione 2 aggiungendo un controllo SQL personalizzato per verificare se i valori degli ID sono univoci.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: <project-id> # Replace "project-id" with your project ID.
locations: <region-id> # Replace "region-id" with the region ID of the Dataplex lake in which the table exists. For example, us-central1.
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/<project-id>/locations/<region-id>/lakes/operations/zones/procurement/entities/sales_orders # replace "project-id" with your project ID and "region-id" with your region
column_id: id
row_filter_id: NONE
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Esempio 4
Questo esempio migliora il Campione 3 aggiungendo convalide incrementali utilizzando la colonna last_modified_timestamp
. Puoi aggiungere convalide incrementali per una
o più associazioni di regole.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: <project-id> # Replace "project-id" with your project ID.
locations: <region-id> # Replace "region-id" with the region ID of the Dataplex lake in which the table exists. For example, us-central1.
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/<project-id>/locations/<region-id>/lakes/operations/zones/procurement/entities/sales_orders # replace "project-id" with your project ID and "region-id" with your region
column_id: id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_CURRENCY_ID
Risolvi i problemi con failed_records_query
Per ogni regola che non riesce, la tabella di riepilogo memorizza una query nella colonna failed_records_query
che puoi utilizzare per ottenere i record non riusciti.
Per eseguire il debug, puoi anche utilizzare reference columns
nel file YAML, che ti consente di unire l'output di failed_records_query
con i dati originali per ottenere l'intero record. Ad esempio, puoi specificare una colonna primary_key
o una colonna primary_key
composta come colonna di riferimento.
Specifica colonne di riferimento
Per generare colonne di riferimento, puoi aggiungere quanto segue alla specifica YAML:
La sezione
reference_columns
. In questa sezione puoi creare uno o più insiemi di colonne di riferimento, ognuno dei quali specifica una o più colonne.La sezione
rules_bindings
. In questa sezione, puoi aggiungere una riga a un'associazione di regole che specifica un ID colonna di riferimento (reference_columns_id
) da utilizzare per le regole nell'associazione di questa regola. Deve essere uno degli insiemi di colonne di riferimento specificati nella sezionereference_columns
.
Ad esempio, il seguente file YAML specifica una sezione reference_columns
e definisce tre colonne: id
, last_modified_timestamp
e item_id
come parte dell'insieme ORDER_DETAILS_REFERENCE_COLUMNS
.
reference_columns:
ORDER_DETAILS_REFERENCE_COLUMNS:
include_reference_columns:
- id
- last_modified_timestamp
- item_id
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
params:
custom_sql_expr: |-
row_filters:
NONE:
filter_sql_expr: |-
True
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/<project_id>/datasets/<dataset_id>/tables/sales_orders
column_id: amount
row_filter_id: NONE
reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
rule_ids:
- VALUE_ZERO_OR_POSITIVE
Utilizzo della query sui record non riusciti
La query sui record non riusciti genera una riga per ogni record che ha una regola non riuscita. Include il nome della colonna che ha attivato l'errore, il valore che lo ha attivato e i valori delle colonne di riferimento. Include anche i metadati che puoi utilizzare per eseguire l'attività relativa alla qualità dei dati.
Di seguito è riportato un esempio di output di una query sui record non riusciti per il file YAML
descritto in Specificare le colonne di riferimento. Mostra
un errore per la colonna amount
e un valore di -10
non riuscito. Registra anche il valore corrispondente per la colonna di riferimento.
_dq_validation_invocation_id | id_regolazione_validità_dq | ID_regola_validità_dq | ID_colonna_validità_dq | _dq_validation_column_value | _dq_validation_dimension | _dq_validation_simple_rule_row_is_valid | _dq_validation_complex_rule_validation_errors_count | _dq_validation_complex_rule_validation_success_flag | id | timestamp_ultimo_modificato | id_articolo |
---|---|---|---|---|---|---|---|---|---|---|---|
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 | TRANSACTION_AMOUNT_VALID | VALORE_VALORE_ZERO_O_POSITIVO | importo | -10 | FALSE | ordine1 | 2022-01-22T02:30:06.321Z | id_articolo_non valido |
Utilizza le query sui record non riusciti per le regole CUSTOM_SQL_STATEMENT
Per le regole CUSTOM_SQL_STATEMENT
, le query sui record non riuscite includono la colonna custom_sql_statement_validation_errors
. La colonna custom_sql_statement_validation_errors
è una colonna nidificata con campi che corrispondono all'output della tua istruzione SQL. Le colonne di riferimento non sono incluse nelle query sui record non riuscite per CUSTOM_SQL_STATEMENT
regole.
Ad esempio, la regola CUSTOM_SQL_STATEMENT
potrebbe avere il seguente aspetto:
rules:
TEST_RULE:
rule_type: CUSTOM_SQL_STATEMENT
custom_sql_arguments:
- existing_id
- replacement_id
params:
CUSTOM_SQL_STATEMENT: |-
(SELECT product_name, product_key FROM data
where $existing_id != $replacement_id)
I risultati per questo esempio conterranno una o più righe per la colonna custom_sql_statement_validation_errors
, con una riga per ogni occorrenza in cui existing_id!=replacement_id
.
Quando viene eseguito il rendering in JSON, il contenuto di una cella in questa colonna potrebbe avere il seguente aspetto:
{
"custom_sql_statement_valdation_errors" :{
"product_name"="abc"
"product_key"="12345678"
"_rule_binding_id"="your_rule_binding"
}
}
Puoi unire questi risultati alla tabella originale con un riferimento nidificato come join on custom_sql_statement_valdation_errors.product_key
.
Passaggi successivi
- Riferimento alle specifiche per MLA
- Esempio di regole per la qualità dei dati: vedi Regole semplici o Regole avanzate.
- Esempio di DAG Airflow per le attività relative alla qualità dei dati in Dataplex