Utilizzare le attività relative alla qualità dei dati

Questo documento mostra come creare attività di qualità dei dati Dataplex che ti consentono di pianificare ed eseguire controlli della qualità dei dati per i tuoi tabelle BigQuery esterne.

Per ulteriori informazioni, vedi Panoramica delle attività relative alla qualità dei dati.

Prima di iniziare

Questo documento presuppone che tu abbia un lake Dataplex esistente per creare l'attività di qualità dei dati.

Prima di creare un'attività relativa alla qualità dei dati, svolgi le seguenti operazioni.

Abilita le API e i servizi Google

  1. Abilitare l'API Dataproc.

    Abilitare l'API

  2. Abilita l'accesso privato Google per la tua rete e/o subnet. Abilita accesso privato Google su la rete che prevedi di utilizzare con la qualità dei dati Dataplex attività di machine learning. Se non specifichi una rete o una subnet quando crei Attività di qualità dei dati Dataplex, Dataplex utilizza predefinita. In questo caso, devi attivare Accesso privato Google sulla subnet predefinita.

Crea un file delle specifiche

Dataplex utilizza CloudDQ open source come programma driver. I requisiti di controllo della qualità dei dati Dataplex vengono definiti nei file delle specifiche YAML di CloudDQ.

Come input per l'attività di qualità dei dati, puoi avere un singolo file YAML o un singolo archivio ZIP contenente uno o più file YAML. È consigliabile acquisire i requisiti di controllo della qualità dei dati in file di specifiche YAML distinti, con un file per ogni sezione.

Per preparare un file di specifica:

  1. Crea uno o più file di specifiche YAML CloudDQ che definiscono il controllo di qualità dei dati i tuoi requisiti. Per ulteriori informazioni sulla sintassi richiesta, consulta la sezione Informazioni sul file di specifiche di questo documento.

    Salva il file delle specifiche YAML in formato .yml o .yaml. Se crei più file di specifiche YAML, salvali tutti in un unico archivio ZIP.

  2. Crea un bucket Cloud Storage.
  3. Carica il file delle specifiche nel bucket Cloud Storage.

Informazioni sul file di specifica

Il file delle specifiche YAML CloudDQ deve contenere le seguenti sezioni:

  • Regole (definite nel nodo YAML rules di primo livello): un elenco di regole per vengono eseguiti tutti i test delle unità. Puoi creare queste regole da tipi di regole predefinite, ad esempio NOT_NULL e REGEX, oppure puoi estenderle con istruzioni SQL personalizzate come CUSTOM_SQL_EXPR e CUSTOM_SQL_STATEMENT. L'istruzione CUSTOM_SQL_EXPR segnala qualsiasi riga che custom_sql_expr ha valutato come False come errore. La L'istruzione CUSTOM_SQL_STATEMENT segnala qualsiasi valore restituito dall'intero come errore.

  • Filtri di riga (definiti nel nodo YAML row_filters di primo livello): SQL che restituiscono un valore booleano che definisce i filtri per recuperare un sottoinsieme di dati dell'entità sottostante per la convalida.

  • Associazioni di regole (definite nel nodo YAML rule_bindings di primo livello): definisce rules e rule filters da applicare alle tabelle.

  • Dimensioni delle regole (definite nel nodo YAML rule_dimensions): definisce l'elenco consentito di dimensioni delle regole di qualità dei dati che una regola può definire nel campo dimension corrispondente.

    Ad esempio:

    rule_dimensions:
      - consistency
      - correctness
      - duplication
      - completeness
      - conformance

    Il campo dimension è facoltativo per una regola. La sezione delle dimensioni della regola è obbligatoria se dimension è elencato in una regola.

Per ulteriori informazioni, consulta Guida di riferimento di CloudDQ e i file delle specifiche di esempio.

Crea un set di dati per archiviare i risultati

  • Per archiviare i risultati, crea un set di dati BigQuery.

    Il set di dati deve trovarsi nella stessa regione delle tabelle su cui esegui l'attività di qualità dei dati.

    Dataplex utilizza questo set di dati e crea o riutilizza una tabella a tua scelta per archiviare i risultati.

Crea un account di servizio

Crea un account di servizio con quanto segue. Ruoli e autorizzazioni di Identity and Access Management (IAM):

(Facoltativo) Utilizza le impostazioni avanzate

Questi passaggi sono facoltativi:

  1. BigQuery esegue i controlli di qualità dei dati nel progetto utente corrente per impostazione predefinita. In alternativa, puoi scegliere un progetto diverso per eseguire i job BigQuery utilizzando l'argomento --gcp_project_id TASK_ARGS per la proprietà --execution-args dell'attività.

  2. Se l'ID progetto specificato per eseguire query BigQuery è diverso dal progetto in cui è stato creato l'account di servizio (specificato da --execution-service-account), assicurati che il criterio dell'organizzazione che disattiva l'utilizzo dell'account di servizio tra progetti (iam.disableServiceAccountCreation) sia disattivato. Inoltre, assicurati che l'account di servizio possa accedere alla pianificazione dei job BigQuery nel progetto in cui vengono eseguite le query BigQuery.

Limitazioni

  • Tutte le tabelle specificate per una determinata attività di qualità dei dati devono appartenere al nella stessa regione Google Cloud.

Pianificare un'attività di qualità dei dati

Console

  1. Nella console Google Cloud, vai alla pagina Processo di Dataplex.

    Vai a Procedura.

  2. Fai clic su Crea attività.
  3. Nella scheda Controlla la qualità dei dati, fai clic su Crea attività.
  4. Per Lake Dataplex, scegli il tuo lake.
  5. In ID, inserisci un ID.
  6. Nella sezione Specifica della qualità dei dati:
    1. Nel campo Seleziona file GCS, fai clic su Sfoglia.
    2. Seleziona il bucket Cloud Storage.

    3. Fai clic su Seleziona.

  7. Nella sezione Tabella dei risultati:

    1. Nel campo Seleziona set di dati BigQuery, fai clic su Sfoglia.

    2. Seleziona il set di dati BigQuery in cui archiviare i risultati della convalida.

    3. Fai clic su Seleziona.

    4. Nel campo Tabella BigQuery, inserisci il nome della tabella in cui archiviare i risultati. Se la tabella non esiste, Dataplex la crea per te. Non utilizzare il nome dq_summary perché è riservato per le attività di elaborazione interna.

  8. Nella sezione Account di servizio, seleziona un account di servizio dal menu Account di servizio utente.

  9. Fai clic su Continua.

  10. Nella sezione Imposta pianificazione, configura la pianificazione per l'esecuzione dell'attività di qualità dei dati.

  11. Fai clic su Crea.

Interfaccia a riga di comando gcloud

Di seguito è riportato un esempio di esecuzione di un'attività di qualità dei dati che utilizza il comando gcloud CLI per le attività Dataplex:

export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH"

# Google Cloud project where the Dataplex task is created.
export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT"

# Google Cloud region for the Dataplex lake.
export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID"

# Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region.
export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}"

# The Dataplex lake where your task is created.
export DATAPLEX_LAKE_NAME="DATAPLEX_LAKE_NAME"

# The service account used for running the task. Ensure that this service account
has sufficient IAM permissions on your project, including
BigQuery Data Editor, BigQuery Job User,
Dataplex Editor, Dataproc Worker, and Service
Usage Consumer.

# The BigQuery dataset used for storing the intermediate data
quality summary results and the BigQuery views associated with
each rule binding.
export TARGET_BQ_DATASET="TARGET_BQ_DATASET"

# If you want to use a different dataset for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding, use the following:
export CLOUDDQ_BIGQUERY_DATASET=$TARGET_BQ_DATASET

# The BigQuery dataset where the final results of the data quality checks are stored. This could be the same as CLOUDDQ_BIGQUERY_DATASET.
export TARGET_BQ_DATASET="TARGET_BQ_DATASET"

# The BigQuery table where the final results of the data quality checks are stored.
export TARGET_BQ_TABLE="TARGET_BQ_TABLE"

# The unique identifier for the task.
export TASK_ID="TASK_ID"

gcloud dataplex tasks create \
    --location="${DATAPLEX_REGION_ID}" \
    --lake="${DATAPLEX_LAKE_NAME}" \
    --trigger-type=ON_DEMAND \
    --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \
    --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \
    --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \
    --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \
    "$TASK_ID"
Parametro Descrizione
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH Percorso Cloud Storage del file YAML della qualità dei dati l'input delle configurazioni per l'attività di qualità dei dati. Puoi avere un singolo file YAML in formato .yml o .yaml o in un archivio ZIP contenenti più file YAML.
GOOGLE_CLOUD_PROJECT Il progetto Google Cloud in cui vengono creati il compito Dataplex e i job BigQuery.
DATAPLEX_REGION_ID La regione del lake Dataplex in cui viene creata l'attività di qualità dei dati.
SERVICE_ACCOUNT L'account di servizio utilizzato per l'esecuzione dell'attività. Assicurati che questo account di servizio abbia le autorizzazioni IAM sufficienti, come descritto nella sezione Prima di iniziare.

Per --execution-args, i seguenti argomenti devono essere passati come argomenti posizionati e quindi in questo ordine:

Argomento Descrizione
clouddq-executable.zip Un eseguibile precompilato che è stato passato in spark-file-uris da un ambiente Cloud Storage pubblico di sincronizzare la directory di una VM con un bucket.
ALL Esegui tutte le associazioni di regole. In alternativa, puoi fornire associazioni di regole specifiche come elenco separato da virgole. Ad esempio, RULE_1,RULE_2.
gcp-project-id L'ID progetto che esegue le query BigQuery.
gcp-region-id Regione per l'esecuzione dei job BigQuery per la convalida della qualità dei dati. Questa regione deve essere uguale a quella di gcp-bq-dataset-id e target_bigquery_summary_table.
gcp-bq-dataset-id Set di dati BigQuery utilizzato per archiviare le viste rule_binding e i risultati intermedi di riepilogo della qualità dei dati.
target-bigquery-summary-table Riferimento all'ID tabella della tabella BigQuery in cui vengono archiviati i risultati finali dei controlli sulla qualità dei dati. Non utilizzare il valore ID dq_summary perché è riservato alle attività di elaborazione interna.
--summary_to_stdout (Facoltativo) Quando questo flag viene passato, tutte le righe dei risultati di convalida create nella tabella dq_summary nell'ultima esecuzione vengono registrate come record JSON in Cloud Logging e stdout.

API

  1. Sostituisci quanto segue:

    PROJECT_ID = "Your Dataplex Project ID"
    REGION = "Your Dataplex lake region"
    LAKE_ID = "Your Dataplex lake ID"
    SERVICE_ACC = "Your service account used for reading the data"
    DATAPLEX_TASK_ID = "Unique task ID for the data quality task"
    BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification"
    GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project"
    GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional
    GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results"
    TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
  2. Invia una richiesta POST HTTP:
    POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID}
    {
    "spark": {
        "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py",
        "file_uris": [  f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip",
                        f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum",
                        f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip"
                    ]
    },
    "execution_spec": {
        "args": {
            "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}"
        },
        "service_account": "SERVICE_ACC"
    },
    "trigger_spec": {
    "type": "ON_DEMAND"
    },
    "description": "${DATAPLEX_TASK_DESCRIPTION}"
    }

Vedi anche Esempio di attività di qualità dei dati di Airflow per Dataplex.

Monitorare un'attività di qualità dei dati pianificata

Scopri come monitorare l'attività.

Visualizza i risultati

I risultati delle convalide della qualità dei dati vengono archiviati nel set di dati BigQuery e nella tabella di riepilogo che hai specificato, come descritto in Creare un set di dati per archiviare i risultati. La tabella di riepilogo contiene il riepilogo dell'output per ogni combinazione di associazione di regole e regola per ogni esecuzione della convalida. L'output nella tabella di riepilogo include le seguenti informazioni:

Nome colonna Descrizione
dataplex_lake (stringa) ID del lake Dataplex contenente la tabella in fase di convalida.
dataplex_zone (stringa) ID della zona Dataplex contenente la tabella in fase di convalida.
dataplex_asset_id (stringa) ID della risorsa Dataplex contenente la tabella da convalidare.
execution_ts (timestamp) Timestamp dell'esecuzione della query di convalida.
rule_binding_id (stringa) ID dell'associazione di regole per la quale sono stati visualizzati i risultati della convalida segnalato.
rule_id (stringa) ID della regola nell'ambito dell'associazione di regole per la quale vengono riportati i risultati.
dimension (stringa) La dimensione della qualità dei dati del rule_id. Questo valore può essere solo uno dei valori specificati nel nodo YAML rule_dimensions.
table_id (stringa) ID dell'entità per la quale sono segnalati i risultati della convalida. Questo ID viene specificato nel parametro entity della la rispettiva associazione di regole.
column_id (stringa) ID della colonna per cui vengono riportati i risultati di convalida. Questo ID viene specificato nel parametro column della la rispettiva associazione di regole.
last_modified (timestamp) Il timestamp dell'ultima modifica del table_id sottoposto a convalida.
metadata_json_string (stringa) Coppie chiave/valore dei contenuti del parametro dei metadati specificati nell'associazione delle regole o durante l'esecuzione della qualità dei dati.
configs_hashsum (stringa) La somma hash del documento JSON contenente l'associazione di regole e tutte le regole, le associazioni di regole, i filtri di riga e le configurazioni di entità associati. configs_hashsum consente di monitorare quando i contenuti di L'ID rule_binding o una delle configurazioni di riferimento è stato modificato.
dq_run_id (stringa) ID univoco del record.
invocation_id (stringa) ID dell'esecuzione della qualità dei dati. Tutti i record di riepilogo della qualità dei dati generati nella stessa istanza di esecuzione della qualità dei dati condividono lo stesso invocation_id.
progress_watermark (booleano) determina se questo particolare record viene considerato dal controllo di qualità dei dati per determinare il livello di filigrana elevato una convalida incrementale. Se FALSE, il rispettivo record è da ignorare quando si stabilisce il valore di filigrana alta. Queste informazioni sono utili quando eseguire convalide della qualità dei dati di test che non devono avanzare filigrana alta. Dataplex compila questo campo con TRUE per impostazione predefinita, ma è possibile eseguire l'override del valore se il valore L'argomento --progress_watermark ha il valore FALSE.
rows_validated (numero intero) Numero totale di record convalidati dopo l'applicazione row_filters e qualsiasi filtro con filigrana elevata nella Colonna incremental_time_filter_column_id, se specificata.
complex_rule_validation_errors_count (In virgola mobile) Numero di righe restituite da un CUSTOM_SQL_STATEMENT personalizzata.
complex_rule_validation_success_flag (booleano) Stato di operazione riuscita di CUSTOM_SQL_STATEMENT regole.
success_count (intero) Numero totale di record che hanno superato la convalida. Questo campo è impostato su NULL per le regole CUSTOM_SQL_STATEMENT.
success_percentage (in virgola mobile) Percentuale del numero di record che hanno superato la convalida rispetto al numero totale di record convalidati. Questo campo è impostato su NULL per le regole CUSTOM_SQL_STATEMENT.
failed_count (numero intero) Numero totale di record che non hanno superato la convalida. Questo campo è impostato su NULL per le regole CUSTOM_SQL_STATEMENT.
failed_percentage (float) Percentuale del numero di record che non hanno superato la convalida rispetto al numero totale di record convalidati. Questo campo è impostato su NULL per le regole CUSTOM_SQL_STATEMENT.
null_count (intero) Numero totale di record che hanno restituito un valore null durante la convalida. Questo campo è impostato su NULL per NOT_NULL e CUSTOM_SQL_STATEMENT regole.
null_percentage (float) Percentuale del numero di record che hanno restituito un valore nullo durante la convalida rispetto al numero totale di record convalidati. Questo campo è impostata su NULL per NOT_NULL e CUSTOM_SQL_STATEMENT regole.
failed_records_query Per ogni regola con esito negativo, questa colonna memorizza una query che puoi utilizzare per recuperare i record non riusciti. In questo documento, consulta Risolvere i problemi relativi alle regole non riuscite con failed_records_query.

Per le entità BigQuery, viene creata una visualizzazione per ogni rule_binding contenente la logica di convalida SQL dell'esecutione più recente. Puoi trovare queste viste nel set di dati BigQuery specificato nell'argomento --gcp-bq-dataset-id.

Ottimizzazioni dei costi

Puoi contribuire a ridurre i costi con le seguenti ottimizzazioni.

Convalide incrementali

Spesso, le tabelle vengono aggiornate regolarmente con nuove partizioni (nuove righe). Se non vuoi convalidare di nuovo le vecchie partizioni in ogni esecuzione, puoi utilizzare le convalide incrementali.

Per le convalide incrementali, devi avere una colonna di tipo TIMESTAMP o DATETIME nella tabella in cui il valore della colonna aumenta in modo monotonico. Puoi utilizzare le colonne su cui è partizionata la tabella BigQuery.

Per specificare la convalida incrementale, specifica un valore per incremental_time_filter_column_id=TIMESTAMP/DATETIME type column nell'ambito di un'associazione di regole.

Quando specifichi una colonna, l'attività di qualità dei dati prende in considerazione solo le righe con un valore TIMESTAMP maggiore del timestamp dell'ultima attività di qualità dei dati eseguita.

File di specifiche di esempio

Per utilizzare questi esempi, crea un set di dati BigQuery denominato sales. Poi, crea una tabella delle cose denominata sales_orders e aggiungi campione di dati per esecuzione di una query con le seguenti istruzioni GoogleSQL:

CREATE OR REPLACE TABLE sales.sales_orders
(
 id STRING NOT NULL,
 last_modified_timestamp TIMESTAMP,
 customer_id STRING,
 item_id STRING,
 amount NUMERIC,
 transaction_currency STRING
);

INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")

Esempio 1

Il seguente esempio di codice crea controlli di qualità dei dati per la convalida di questi valori:

  • amount: i valori sono zero o numeri positivi.
  • item_id: una stringa alfanumerica di 5 caratteri alfabetici, seguita da 15 cifre.
  • transaction_currency: un tipo di valuta consentito, come definito da un elenco statico. L'elenco statico di questo esempio consente GBP e JPY come tipi di valuta. Questa verifica si applica solo alle righe contrassegnate come internazionali.
# The following `NONE` row filter is required.
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 # This filters for rows marked as international (INTNL).
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can apply to multiple tables or columns.
rules:
 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

# Rule bindings associate rules to columns within tables.
rule_bindings:
  TRANSACTION_AMOUNT_VALID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

  TRANSACTION_VALID_ITEM_ID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

  TRANSACTION_CURRENCY_VALID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Sostituisci quanto segue:

  • PROJECT_ID: il tuo ID progetto.
  • DATASET_ID: l'ID set di dati.

Esempio 2

Se la tabella da controllare fa parte di un lake Dataplex, puoi specificare le tabelle utilizzando la notazione di lake o zona. In questo modo puoi aggregare i risultati per lago o zona. Ad esempio, puoi generare un punteggio a livello di zona.

Per utilizzare questo esempio, crea un lake Dataplex con l'ID lakeoperations e l'ID zona procurement. Aggiungi poi la tabella sales_orders come asset alla zona.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}
rule_bindings:
 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Sostituisci quanto segue:

  • PROJECT_ID: il tuo ID progetto.
  • REGION_ID: l'ID regione del Lake Dataplex in cui esiste la tabella, ad esempio us-central1.

Esempio 3

Questo esempio migliora l'esempio 2 aggiungendo un controllo SQL personalizzato per verificare se i valori ID sono univoci.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
 NO_DUPLICATES_IN_COLUMN_GROUPS:
   rule_type: CUSTOM_SQL_STATEMENT
   dimension: duplication
   params:
     custom_sql_arguments:
       - column_names
     custom_sql_statement: |-
       select a.*
       from data a
       inner join (
         select
           $column_names
         from data
         group by $column_names
         having count(*) > 1
       ) duplicates
       using ($column_names)

 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}

rule_bindings:
 TRANSACTIONS_UNIQUE:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: id
   row_filter_id: NONE
   rule_ids:
     - NO_DUPLICATES_IN_COLUMN_GROUPS:
         column_names: "id"

 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Esempio 4

Questo esempio migliora il campione 3 aggiungendo convalide incrementali utilizzando la proprietà Colonna last_modified_timestamp. Puoi aggiungere convalide incrementali per un o più associazioni di regole.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
 NO_DUPLICATES_IN_COLUMN_GROUPS:
   rule_type: CUSTOM_SQL_STATEMENT
   dimension: duplication
   params:
     custom_sql_arguments:
       - column_names
     custom_sql_statement: |-
       select a.*
       from data a
       inner join (
         select
           $column_names
         from data
         group by $column_names
         having count(*) > 1
       ) duplicates
       using ($column_names)

 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}

rule_bindings:
 TRANSACTIONS_UNIQUE:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: id
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - NO_DUPLICATES_IN_COLUMN_GROUPS:
         column_names: "id"

 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: amount
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: item_id
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALID_CURRENCY_ID

Risolvere i problemi relativi alle regole non riuscite con failed_records_query

Per ogni regola con esito negativo, la tabella di riepilogo archivia una query nella colonna failed_records_query che puoi utilizzare per recuperare i record non riusciti.

Per eseguire il debug, puoi anche utilizzare reference columns nel file YAML, che ti consente di unire l'output di failed_records_query con i dati originali per ottenere l'intero record. Ad esempio, puoi specificare una colonna primary_key o una colonna primary_key composta come una colonna di riferimento.

Specifica le colonne di riferimento

Per generare colonne di riferimento, puoi aggiungere quanto segue alla specifica YAML:

  1. La sezione reference_columns. In questa sezione puoi creare uno o più set di colonne di riferimento, dove ogni insieme specifica una o più colonne.

  2. La sezione rule_bindings. In questa sezione, puoi aggiungere una riga a una associazione di regole che specifica un ID colonna di riferimento (reference_columns_id) da utilizzare per le regole nell'associazione di regole. Deve essere uno dei set di colonne di riferimento specificati nella sezione reference_columns.

Ad esempio, il seguente file YAML specifica una sezione reference_columns e definisce tre colonne: id, last_modified_timestamp e item_id all'interno dell'insieme ORDER_DETAILS_REFERENCE_COLUMNS. La nell'esempio seguente viene utilizzata la tabella di esempio sales_orders.

reference_columns:
  ORDER_DETAILS_REFERENCE_COLUMNS:
    include_reference_columns:
      - id
      - last_modified_timestamp
      - item_id
rules:
  VALUE_ZERO_OR_POSITIVE:
  rule_type: CUSTOM_SQL_EXPR
  params:
    custom_sql_expr: |-

row_filters:
NONE:
  filter_sql_expr: |-
      True

rule_bindings:
TRANSACTION_AMOUNT_VALID:
  entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
  column_id: amount
  row_filter_id: NONE
  reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
  rule_ids:
    - VALUE_ZERO_OR_POSITIVE

Utilizzo della query sui record non riusciti

La query sui record non riusciti genera una riga per ogni record con una regola non riuscita. Include il nome della colonna che ha attivato l'errore, il valore che ha attivato l'errore e i valori delle colonne di riferimento. Inoltre, include i metadati che puoi utilizzare in relazione all'esecuzione dell'attività di qualità dei dati.

Di seguito è riportato un esempio dell'output di una query sui record non riusciti per il file YAML descritta in Specificare le colonne di riferimento. Mostra un errore per la colonna amount e un valore non riuscito di -10. Inoltre, registra il valore corrispondente per la colonna di riferimento.

_dq_validation_invocation_id _dq_validation_rule_binding_id _dq_validation_rule_id _dq_validation_column_id _dq_validation_column_value _dq_validation_dimension _dq_validation_simple_rule_row_is_valid _dq_validation_complex_rule_validation_errors_count _dq_validation_complex_rule_validation_success_flag ID last_modified_timestamp item_id
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 TRANSACTION_AMOUNT_VALID VALUE_ZERO_OR_POSITIVE quantità -10 FALSE order1 2022-01-22T02:30:06.321Z bad_item_id

Utilizzare le query sui record non riusciti per le regole CUSTOM_SQL_STATEMENT

Per le regole CUSTOM_SQL_STATEMENT, le query sui record non riusciti includono la colonna custom_sql_statement_validation_errors. La colonna custom_sql_statement_validation_errors è una colonna nidificata con campi che corrispondono all'output dell'istruzione SQL. Le colonne di riferimento non sono incluse nelle query dei record con errori per le regole CUSTOM_SQL_STATEMENT.

Ad esempio, la regola CUSTOM_SQL_STATEMENT potrebbe avere il seguente aspetto:

rules:
  TEST_RULE:
    rule_type: CUSTOM_SQL_STATEMENT
    custom_sql_arguments:
      - existing_id
      - replacement_id
    params:
     CUSTOM_SQL_STATEMENT: |-
       (SELECT product_name, product_key FROM data
       where $existing_id != $replacement_id)
I risultati per questo esempio conterranno una o più righe per custom_sql_statement_validation_errors , con una riga per ogni occorrenza in cui existing_id!=replacement_id.

Se visualizzati in JSON, i contenuti di una cella di questa colonna potrebbero avere il seguente aspetto:

{
  "custom_sql_statement_valdation_errors" :{
    "product_name"="abc"
    "product_key"="12345678"
    "_rule_binding_id"="your_rule_binding"
  }
}

Puoi unire questi risultati alla tabella originale con un riferimento nidificato come join on custom_sql_statement_valdation_errors.product_key.

Passaggi successivi