Questo documento mostra come creare attività di qualità dei dati Dataplex che ti consentono di pianificare ed eseguire controlli della qualità dei dati per i tuoi tabelle BigQuery esterne.
Per ulteriori informazioni, vedi Panoramica delle attività relative alla qualità dei dati.
Prima di iniziare
Questo documento presuppone che tu abbia un lake Dataplex esistente per creare l'attività di qualità dei dati.
Prima di creare un'attività di qualità dei dati, segui questi passaggi.
Abilita le API e i servizi Google
Abilitare l'API Dataproc.
Abilitare l'accesso privato Google per la tua rete e/o la tua subnet. Abilita accesso privato Google su la rete che prevedi di utilizzare con la qualità dei dati Dataplex attività di machine learning. Se non specifichi una rete o una subnet quando crei Attività di qualità dei dati Dataplex, Dataplex utilizza predefinita. In questo caso, devi attivare Accesso privato Google sulla subnet predefinita.
Crea un file di specifiche
Dataplex utilizza CloudDQ open source come programma driver. Requisiti per il controllo della qualità dei dati Dataplex sono definiti all'interno dei file delle specifiche YAML di CloudDQ.
Come input per l'attività di qualità dei dati, puoi avere un singolo file YAML o un singolo archivio ZIP contenente uno o più file YAML. È ti consigliamo di acquisire i requisiti di controllo della qualità dei dati in una specifica YAML separata file, con un file per ogni sezione.
Per preparare un file delle specifiche:
-
Crea uno o più file di specifiche YAML CloudDQ che definiscono il controllo di qualità dei dati i tuoi requisiti. Per ulteriori informazioni sulla sintassi richiesta, consulta Sezione Informazioni sul file delle specifiche di questo documento documento.
Salva il file delle specifiche YAML in formato
.yml
o.yaml
. Se creare più file di specifiche YAML, salvare tutti i file in un unico archivio ZIP. - Crea un bucket Cloud Storage.
- Carica il file delle specifiche sul nel bucket Cloud Storage.
Informazioni sul file delle specifiche
Il file delle specifiche YAML CloudDQ deve contenere le seguenti sezioni:
Regole (definite nel nodo YAML
rules
di primo livello): un elenco di regole per vengono eseguiti tutti i test delle unità. Puoi creare queste regole da tipi di regole predefinite, ad esempioNOT_NULL
eREGEX
, oppure puoi estenderle con istruzioni SQL personalizzate comeCUSTOM_SQL_EXPR
eCUSTOM_SQL_STATEMENT
. L'istruzioneCUSTOM_SQL_EXPR
segnala qualsiasi riga checustom_sql_expr
ha valutato comeFalse
come errore. La L'istruzioneCUSTOM_SQL_STATEMENT
segnala qualsiasi valore restituito dall'intero come errore.Filtri di riga (definiti nel nodo YAML
row_filters
di primo livello): SQL che restituiscono un valore booleano che definisce i filtri per recuperare un sottoinsieme di dell'entità sottostante per la convalida.Associazioni di regole (definite nel nodo YAML
rule_bindings
di primo livello): Definiscerules
erule filters
da applicare alle tabelle.Dimensioni regola (definite nel nodo YAML
rule_dimensions
): indica le dimensioni consentite l'elenco delle dimensioni delle regole di qualità dei dati che una regola può definire neldimension
.Ad esempio:
rule_dimensions: - consistency - correctness - duplication - completeness - conformance
Il campo
dimension
è facoltativo per una regola. Sezione Dimensioni regola è obbligatorio sedimension
è elencato in una regola.
Per ulteriori informazioni, consulta Guida di riferimento di CloudDQ e i file delle specifiche di esempio.
crea un set di dati per archiviare i risultati
-
Per archiviare i risultati, crea un set di dati BigQuery.
Il set di dati deve trovarsi nella stessa regione delle tabelle su cui esegui l'attività di qualità dei dati.
Dataplex utilizza questo set di dati e crea o riutilizza una tabella a tua scelta per archiviare i risultati.
Crea un account di servizio
Crea un account di servizio con quanto segue. Ruoli e autorizzazioni di Identity and Access Management (IAM):
- Accesso in lettura al percorso Cloud Storage contenente il file YAML
specifiche. Puoi utilizzare il ruolo Visualizzatore oggetti Storage
(
roles/storage.objectViewer
) sul bucket Cloud Storage. - Accesso in lettura ai set di dati BigQuery con i dati da convalidare. Puoi utilizzare il ruolo Visualizzatore dati BigQuery.
- Accesso in scrittura al set di dati BigQuery per creare una tabella
(se necessario) e scrivi i risultati nella tabella. Puoi
Utilizza il ruolo Editor dati BigQuery
(
roles/bigquery.dataEditor
) a livello del set di dati. - Ruolo Utente job BigQuery (
roles/bigquery.jobUser
) a livello di progetto per creare job BigQuery in un progetto. - Ruolo Lettore metadati Dataplex (
roles/dataplex.metadataReader
) a livello di progetto o lake. - Il ruolo Consumer utilizzo dei servizi
(
roles/serviceusage.serviceUsageConsumer
) a livello di progetto. - Il ruolo Worker Dataproc.
- L'autorizzazione
iam.serviceAccounts.actAs
assegnati all'utente che invia il job. - Il ruolo Utente account di servizio concesso all'account di servizio del lake Dataplex. Puoi Visualizzare l'account di servizio del lake Dataplex nella console Google Cloud.
(Facoltativo) Utilizzare le impostazioni avanzate
Questi passaggi sono facoltativi:
BigQuery esegue i controlli di qualità dei dati nel progetto utente corrente per impostazione predefinita. In alternativa, puoi scegliere un progetto diverso in cui eseguire Job BigQuery mediante
--gcp_project_id
TASK_ARGS
per la proprietà--execution-args
dell'attività.Se l'ID progetto specificato per eseguire le query BigQuery è diverso dal progetto in cui è presente l'account di servizio (specificato da
--execution-service-account
) viene creato, assicurati che criterio dell'organizzazione che disattiva l'utilizzo degli account di servizio tra progetti (iam.disableServiceAccountCreation
) non è attivo. Inoltre, garantire che l'account di servizio possa accedere Pianificazione del job BigQuery nel progetto in cui Le query di BigQuery sono in fase di esecuzione.
Limitazioni
- Tutte le tabelle specificate per una determinata attività di qualità dei dati devono appartenere al nella stessa regione Google Cloud.
Pianifica un'attività di qualità dei dati
Console
- Nella console Google Cloud, vai alla pagina Processo di Dataplex.
- Fai clic su Crea attività.
- Nella scheda Controlla la qualità dei dati, fai clic su Crea attività.
- Per Lake Dataplex, scegli il tuo lake.
- In ID, inserisci un ID.
- Nella sezione Specifica della qualità dei dati, segui questi passaggi:
- .
- Nel campo Seleziona file GCS, fai clic su Sfoglia.
Seleziona il bucket Cloud Storage.
Fai clic su Seleziona.
Nella sezione Tabella dei risultati, segui questi passaggi:
Nel campo Seleziona set di dati BigQuery, fai clic su Sfoglia.
Seleziona il set di dati BigQuery per archiviare i risultati della convalida.
Fai clic su Seleziona.
Nel campo Tabella BigQuery, inserisci il nome della tabella in cui archiviare i risultati. Se la tabella non esiste, Dataplex la crea per te. Non utilizzare il nome
dq_summary
perché è riservata ad attività di elaborazione interna.
Nella sezione Account di servizio, seleziona un account di servizio Menu Account di servizio utente.
Fai clic su Continua.
Nella sezione Imposta pianificazione, configura la pianificazione per l'esecuzione dei dati attività di qualità.
Fai clic su Crea.
Interfaccia a riga di comando gcloud
Di seguito è riportato un esempio di esecuzione di un'attività di qualità dei dati che utilizza Comando gcloud CLI delle attività Dataplex:
export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH" # Google Cloud project where the Dataplex task is created. export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT" # Google Cloud region for the Dataplex lake. export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID" # Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region. export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}" # The Dataplex lake where your task is created. export DATAPLEX_LAKE_NAME="DATAPLEX_LAKE_NAME" # The service account used for running the task. Ensure that this service account has sufficient IAM permissions on your project, including BigQuery Data Editor, BigQuery Job User, Dataplex Editor, Dataproc Worker, and Service Usage Consumer. # The BigQuery dataset used for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding. export TARGET_BQ_DATASET="TARGET_BQ_DATASET" # If you want to use a different dataset for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding, use the following: export CLOUDDQ_BIGQUERY_DATASET=$TARGET_BQ_DATASET # The BigQuery dataset where the final results of the data quality checks are stored. This could be the same as CLOUDDQ_BIGQUERY_DATASET. export TARGET_BQ_DATASET="TARGET_BQ_DATASET" # The BigQuery table where the final results of the data quality checks are stored. export TARGET_BQ_TABLE="TARGET_BQ_TABLE" # The unique identifier for the task. export TASK_ID="TASK_ID" gcloud dataplex tasks create \ --location="${DATAPLEX_REGION_ID}" \ --lake="${DATAPLEX_LAKE_NAME}" \ --trigger-type=ON_DEMAND \ --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \ --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \ --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \ --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \ "$TASK_ID"
Parametro | Descrizione |
---|---|
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH |
Percorso Cloud Storage del file YAML della qualità dei dati
l'input delle configurazioni
per l'attività di qualità dei dati. Puoi avere un
singolo file YAML in formato .yml o .yaml o in un archivio ZIP
contenenti più file YAML. |
GOOGLE_CLOUD_PROJECT |
Il progetto Google Cloud in cui l'attività Dataplex e Vengono creati i job BigQuery. |
DATAPLEX_REGION_ID |
La regione del lake Dataplex in cui viene creata l'attività di qualità dei dati. |
SERVICE_ACCOUNT |
L'account di servizio utilizzato per eseguire l'attività. Assicurati che questo servizio disponga di autorizzazioni IAM sufficienti, come descritto in la sezione Prima di iniziare. |
Per --execution-args
, i seguenti argomenti devono essere passati come
argomenti posizionati, quindi in questo ordine:
Argomento | Descrizione |
---|---|
clouddq-executable.zip |
Un eseguibile precompilato che è stato passato in
spark-file-uris da un ambiente Cloud Storage pubblico
di sincronizzare la directory di una VM
con un bucket. |
ALL |
Esegui tutte le associazioni di regole. In alternativa,
può fornire associazioni di regole specifiche sotto forma di elenco separato da virgole.
Ad esempio, RULE_1,RULE_2 . |
gcp-project-id |
ID progetto che esegue le query di BigQuery. |
gcp-region-id |
Regione per l'esecuzione di
Job BigQuery per la convalida della qualità dei dati. Questa regione
deve essere la stessa della regione per gcp-bq-dataset-id e
target_bigquery_summary_table . |
gcp-bq-dataset-id |
set di dati BigQuery
viene utilizzato per archiviare le viste rule_binding e i valori intermedi
dei risultati di riepilogo della qualità dei dati. |
target-bigquery-summary-table |
Riferimento all'ID tabella della tabella BigQuery in cui
vengono archiviati i risultati finali
dei controlli sulla qualità dei dati. Non utilizzare il valore ID
dq_summary perché è riservata ad attività di elaborazione interna. |
--summary_to_stdout |
(Facoltativo) Una volta superato questo flag, tutte le righe dei risultati della convalida
create nella tabella dq_summary nell'ultima esecuzione sono
registrati come record JSON in Cloud Logging
stdout . |
API
Sostituisci quanto segue:
PROJECT_ID = "Your Dataplex Project ID" REGION = "Your Dataplex lake region" LAKE_ID = "Your Dataplex lake ID" SERVICE_ACC = "Your service account used for reading the data" DATAPLEX_TASK_ID = "Unique task ID for the data quality task" BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification" GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project" GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results" TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
- Invia una richiesta POST HTTP:
POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID} { "spark": { "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py", "file_uris": [ f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip", f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum", f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip" ] }, "execution_spec": { "args": { "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}" }, "service_account": "SERVICE_ACC" }, "trigger_spec": { "type": "ON_DEMAND" }, "description": "${DATAPLEX_TASK_DESCRIPTION}" }
Vedi anche Esempio di attività di qualità dei dati di Airflow per Dataplex.
Monitorare un'attività pianificata relativa alla qualità dei dati
Scopri come monitorare l'attività.
Visualizza i risultati
I risultati delle convalide della qualità dei dati vengono archiviati nel set di dati BigQuery e la tabella di riepilogo specificata, come descritto Crea un set di dati per archiviare i risultati. La tabella di riepilogo contiene riepilogo dell'output per ogni combinazione di associazione di regole e regola per ogni convalida vengono eseguiti tutti i test delle unità. L'output nella tabella di riepilogo include le seguenti informazioni:
Nome colonna | Descrizione |
---|---|
dataplex_lake |
(stringa) ID del lake Dataplex contenente la tabella in fase di convalida. |
dataplex_zone |
(stringa) ID della zona Dataplex contenente la tabella in fase di convalida. |
dataplex_asset_id |
(stringa) ID dell'asset Dataplex contenente la tabella in fase di convalida. |
execution_ts |
(timestamp) Timestamp dell'esecuzione della query di convalida. |
rule_binding_id |
(stringa) ID dell'associazione di regole per la quale sono stati visualizzati i risultati della convalida segnalato. |
rule_id |
(stringa) ID della regola nell'ambito dell'associazione di regole per la quale vengono riportati i risultati. |
dimension |
(stringa) Dimensione della qualità dei dati di rule_id . Questo valore può essere solo
uno dei valori specificati nel nodo YAML rule_dimensions . |
table_id |
(stringa) ID dell'entità per la quale sono segnalati i risultati della convalida.
Questo ID viene specificato nel parametro entity della
la rispettiva associazione di regole. |
column_id |
(stringa) ID della colonna per la quale sono segnalati i risultati di convalida.
Questo ID viene specificato nel parametro column della
la rispettiva associazione di regole. |
last_modified |
(timestamp) Il timestamp dell'ultima modifica di table_id
in fase di convalida. |
metadata_json_string |
(stringa) Coppie chiave-valore dei contenuti parametro dei metadati specificati durante l'associazione di regole o durante l'esecuzione della qualità dei dati. |
configs_hashsum |
(stringa) La somma hash del documento JSON contenente l'associazione di regole
e tutte le regole, le associazioni di regole, i filtri di riga e le configurazioni di entità associati.
configs_hashsum consente di monitorare quando i contenuti di
L'ID rule_binding o una delle configurazioni di riferimento è stato modificato. |
dq_run_id |
(stringa) ID univoco del record. |
invocation_id |
(stringa) ID dell'esecuzione della qualità dei dati. Tutti i record di riepilogo della qualità dei dati
generati all'interno della stessa istanza di esecuzione di qualità dei dati condividono lo stesso
invocation_id . |
progress_watermark |
(booleano) determina se questo particolare record viene considerato
dal controllo della qualità dei dati per determinare
il livello di filigrana elevato
una convalida incrementale. Se FALSE , il rispettivo record è
da ignorare quando si stabilisce il valore di filigrana alta. Queste informazioni sono utili quando
eseguire convalide della qualità dei dati di test che non devono avanzare
filigrana alta. Dataplex compila questo campo con
TRUE per impostazione predefinita, ma è possibile eseguire l'override del valore se il valore
L'argomento --progress_watermark ha il valore FALSE .
|
rows_validated |
(numero intero) Numero totale di record convalidati dopo l'applicazione
row_filters e qualsiasi filtro con filigrana alta nella
Colonna incremental_time_filter_column_id , se specificata. |
complex_rule_validation_errors_count |
(In virgola mobile) Numero di righe restituite da un CUSTOM_SQL_STATEMENT
personalizzata. |
complex_rule_validation_success_flag |
(booleano) Stato di operazione riuscita di CUSTOM_SQL_STATEMENT regole.
|
success_count |
(numero intero) Numero totale di record che hanno superato la convalida. Questo campo
è impostato su NULL per CUSTOM_SQL_STATEMENT regole.
|
success_percentage |
(in virgola mobile) Percentuale del numero di record che hanno superato la convalida
rispetto al numero totale di record convalidati. Questo campo è impostato su
NULL per CUSTOM_SQL_STATEMENT regole. |
failed_count |
(numero intero) Numero totale di record che non hanno superato la convalida. Questo campo
è impostato su NULL per CUSTOM_SQL_STATEMENT regole.
|
failed_percentage |
(in virgola mobile) Percentuale del numero di record che non hanno superato la convalida
rispetto al numero totale di record convalidati. Questo campo è impostato su
NULL per CUSTOM_SQL_STATEMENT regole. |
null_count |
(numero intero) Numero totale di record che hanno restituito un valore nullo durante la convalida.
Questo campo è impostato su NULL per NOT_NULL e
CUSTOM_SQL_STATEMENT regole. |
null_percentage |
(in virgola mobile) Percentuale del numero di record che hanno restituito un valore nullo durante
valida entro il numero totale di record convalidati. Questo campo è
impostata su NULL per NOT_NULL e
CUSTOM_SQL_STATEMENT regole. |
failed_records_query |
Per ogni regola con esito negativo, questa colonna memorizza una query
che puoi utilizzare per recuperare i record non riusciti. In questo documento, vedi
Risolvere i problemi relativi alle regole non riuscite con
failed_records_query . |
Per le entità BigQuery, viene creata una vista
rule_binding
contenente la logica di convalida SQL di
all'ultima esecuzione. Puoi trovare queste viste in BigQuery
specificato nell'argomento --gcp-bq-dataset-id
.
Ottimizzazioni dei costi
Puoi contribuire a ridurre i costi con le ottimizzazioni riportate di seguito.
Convalide incrementali
Spesso, le tabelle vengono aggiornate regolarmente con nuovi (nuove righe). Se non vuoi riconvalidare le partizioni precedenti in ogni puoi utilizzare convalide incrementali.
Per le convalide incrementali, devi avere una colonna di tipo TIMESTAMP
o DATETIME
nella tabella in cui il valore della colonna aumenta monotonicamente. Puoi utilizzare lo
e le colonne in cui è partizionata la tabella BigQuery.
Per specificare la convalida incrementale, specifica un valore per
incremental_time_filter_column_id=TIMESTAMP/DATETIME type column
nell'ambito di un'associazione di regole.
Quando specifichi una colonna, l'attività di qualità dei dati considera solo le righe con un
Il valore TIMESTAMP
è maggiore del timestamp dell'ultima attività di qualità dei dati che
è stato eseguito.
Esempi di file di specifiche
Per utilizzare questi esempi, crea un set di dati BigQuery
denominato sales
. Poi, crea una tabella delle cose denominata sales_orders
e aggiungi
campione di dati per
esecuzione di una query
con le seguenti istruzioni GoogleSQL:
CREATE OR REPLACE TABLE sales.sales_orders
(
id STRING NOT NULL,
last_modified_timestamp TIMESTAMP,
customer_id STRING,
item_id STRING,
amount NUMERIC,
transaction_currency STRING
);
INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")
Esempio 1
Il seguente esempio di codice crea controlli di qualità dei dati per la convalida di questi valori:
amount
: i valori sono zero o numeri positivi.item_id
: una stringa alfanumerica di 5 caratteri alfabetici, seguita di 15 cifre.transaction_currency
: un tipo di valuta consentito, come definito da un carattere statico dall'elenco di lettura. L'elenco statico di questo esempio consente GBP e JPY come tipi di valuta. Questo la convalida si applica solo alle righe contrassegnate come internazionali.
# The following `NONE` row filter is required.
row_filters:
NONE:
filter_sql_expr: |-
True
# This filters for rows marked as international (INTNL).
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can apply to multiple tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
# Rule bindings associate rules to columns within tables.
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Sostituisci quanto segue:
PROJECT_ID
: il tuo ID progetto.DATASET_ID
: l'ID del set di dati.
Esempio 2
Se la tabella da controllare fa parte di un lake Dataplex, puoi specificare le tabelle utilizzando la notazione di lake o zona. In questo modo puoi aggregare i risultati per lake o zona. Ad esempio, puoi generare un punteggio a livello di zona.
Per utilizzare questo esempio, crea un lake Dataplex con l'ID lake
operations
e l'ID zona procurement
. Poi aggiungi la tabella sales_orders
come asset nella zona.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Sostituisci quanto segue:
- PROJECT_ID: il tuo ID progetto.
- REGION_ID: l'ID regione del
Lake Dataplex in cui esiste la tabella, ad esempio
us-central1
.
Esempio 3
Questo esempio migliora l'esempio 2 aggiungendo un controllo SQL personalizzato per verificare se i valori ID sono univoci.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Esempio 4
Questo esempio migliora il campione 3 aggiungendo convalide incrementali utilizzando la proprietà
Colonna last_modified_timestamp
. Puoi aggiungere convalide incrementali per un
o più associazioni di regole.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_CURRENCY_ID
Risolvi i problemi relativi alle regole non riuscite con failed_records_query
Per ogni regola con esito negativo, la tabella di riepilogo
archivia una query nella colonna failed_records_query
che puoi utilizzare per recuperare i record non riusciti.
Per eseguire il debug, puoi anche usare reference columns
nel file YAML,
che ti consente di unire l'output di failed_records_query
con
i dati originali per acquisire l'intero record. Ad esempio, puoi specificare
una colonna primary_key
o una colonna primary_key
composta come
una colonna di riferimento.
Specifica le colonne di riferimento
Per generare colonne di riferimento, puoi aggiungere quanto segue alla specifica YAML:
La sezione
reference_columns
. In questa sezione puoi creare uno o più set di colonne di riferimento, dove ogni insieme specifica una o più colonne.La sezione
rule_bindings
. In questa sezione, puoi aggiungere una riga a una regola associazione che specifica un ID colonna di riferimento (reference_columns_id
) da utilizzare per nell'associazione di regole. Deve essere uno degli insiemi di colonne di riferimento specificati in Sezionereference_columns
.
Ad esempio, il seguente file YAML specifica una sezione reference_columns
e
definisce tre colonne: id
, last_modified_timestamp
e
item_id
come parte del set ORDER_DETAILS_REFERENCE_COLUMNS
. La
nell'esempio seguente viene utilizzata la tabella di esempio sales_orders
.
reference_columns:
ORDER_DETAILS_REFERENCE_COLUMNS:
include_reference_columns:
- id
- last_modified_timestamp
- item_id
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
params:
custom_sql_expr: |-
row_filters:
NONE:
filter_sql_expr: |-
True
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
rule_ids:
- VALUE_ZERO_OR_POSITIVE
Utilizzo della query sui record non riusciti
La query sui record non riusciti genera una riga per ogni record contenente una regola non riuscito. Include il nome della colonna che ha attivato l'errore, il valore che ha attivato l'errore e i valori delle colonne di riferimento. Inoltre, include i metadati che puoi utilizzare per la relativa esecuzione l'attività di qualità dei dati.
Di seguito è riportato un esempio dell'output di una query sui record non riusciti per il file YAML
descritta in Specificare le colonne di riferimento. Viene mostrato un
errore per la colonna amount
e un valore di -10
non riuscito. Inoltre, registra
il valore corrispondente per la colonna di riferimento.
_dq_validation_invocation_id | _dq_validation_rule_binding_id | _dq_validation_rule_id | _dq_validation_column_id | _dq_validation_column_value | _dq_validation_dimension | _dq_validation_simple_rule_row_is_valid | _dq_validation_complex_rule_validation_errors_count | _dq_validation_complex_rule_validation_success_flag | ID | last_modified_timestamp | item_id |
---|---|---|---|---|---|---|---|---|---|---|---|
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 | TRANSACTION_AMOUNT_VALID | VALUE_ZERO_OR_POSITIVE | quantità | -10 | FALSE | order1 | 2022-01-22T02:30:06.321Z | bad_item_id |
Utilizza le query dei record non riusciti per le regole CUSTOM_SQL_STATEMENT
Per CUSTOM_SQL_STATEMENT
regole, le query di registrazione non riuscite includono il valore
custom_sql_statement_validation_errors
. La
La colonna custom_sql_statement_validation_errors
è una colonna nidificata con campi che
corrisponde all'output dell'istruzione SQL. Le colonne di riferimento non sono incluse nei record con errori
query per CUSTOM_SQL_STATEMENT
di regole.
Ad esempio, la regola CUSTOM_SQL_STATEMENT
potrebbe avere il seguente aspetto:
rules: TEST_RULE: rule_type: CUSTOM_SQL_STATEMENT custom_sql_arguments: - existing_id - replacement_id params: CUSTOM_SQL_STATEMENT: |- (SELECT product_name, product_key FROM data where $existing_id != $replacement_id)
custom_sql_statement_validation_errors
, con una riga per ogni occorrenza in cui existing_id!=replacement_id
.
Quando viene eseguito il rendering in JSON, il contenuto di una cella di questa colonna potrebbe avere il seguente aspetto:
{ "custom_sql_statement_valdation_errors" :{ "product_name"="abc" "product_key"="12345678" "_rule_binding_id"="your_rule_binding" } }
Puoi unire questi risultati alla tabella originale con un riferimento nidificato come join on custom_sql_statement_valdation_errors.product_key
.
Passaggi successivi
- Riferimento alle specifiche YAML CloudDQ
- Esempi di regole sulla qualità dei dati: vedi Regole semplici o Regole avanzate
- Attività di qualità dei dati di esempio di Airflow per Dataplex