Utiliser des tâches liées à la qualité des données

Ce document explique comment créer des tâches liées à la qualité des données Dataplex qui vous permettent de planifier et d'exécuter des contrôles de qualité des données pour vos tables BigQuery intégrées et externes.

Pour en savoir plus, consultez la section Présentation des tâches de qualité des données.

Avant de commencer

Ce document suppose que vous disposez d'un lac Dataplex dans lequel créer la tâche liée à la qualité des données.

Activer les API et services Google

  1. activer l'API Dataproc ;

    Activer l'API

  2. Activez l'accès privé à Google pour votre réseau et votre sous-réseau. Activez l'accès privé à Google sur le réseau que vous prévoyez d'utiliser avec les tâches liées à la qualité des données Dataplex. Si vous ne spécifiez pas de réseau ni de sous-réseau lorsque vous créez la tâche liée à la qualité des données Dataplex, Dataplex utilise le sous-réseau par défaut. Dans ce cas, vous devez activer l'accès privé à Google sur le sous-réseau par défaut.

Créer un fichier de spécification

Dataplex utilise Cloud DQ Open Source comme programme de pilote. Les exigences de contrôle de la qualité des données Dataplex sont définies dans les fichiers de spécification YAML de CloudDQ.

En tant qu'entrée de la tâche de qualité des données, vous pouvez avoir un seul fichier YAML ou une seule archive ZIP contenant un ou plusieurs fichiers YAML. Il est recommandé de capturer les exigences de vérification de la qualité des données dans des fichiers de spécification YAML distincts, avec un fichier pour chaque section.

Pour préparer un fichier de spécifications, procédez comme suit:

  1. Créez un ou plusieurs fichiers de spécification YAML CloudDQ qui définissent vos exigences de contrôle de la qualité des données. Pour en savoir plus sur la syntaxe requise, consultez la section À propos du fichier de spécification de ce document.

    Enregistrez le fichier de spécification YAML au format .yml ou .yaml. Si vous créez plusieurs fichiers de spécification YAML, enregistrez-les dans une seule archive ZIP.

  2. Créez un bucket Cloud Storage.
  3. Importez le fichier de spécifications dans le bucket Cloud Storage.

À propos du fichier de spécification

Votre fichier de spécification YAML CloudDQ doit comporter les sections suivantes :

  • Règles (définies dans le nœud YAML rules de premier niveau) : liste des règles à exécuter. Vous pouvez créer ces règles à partir de types de règles prédéfinis, comme NOT_NULL et REGEX, ou les étendre avec des instructions SQL personnalisées, telles que CUSTOM_SQL_EXPR et CUSTOM_SQL_STATEMENT. L'instruction CUSTOM_SQL_EXPR signale chaque ligne que custom_sql_expr a évalué False comme ayant échoué. L'instruction CUSTOM_SQL_STATEMENT signale toute valeur renvoyée par l'ensemble de l'instruction comme un échec.

  • Filtres de lignes (définis dans le nœud YAML row_filters de premier niveau) : les expressions SQL renvoient une valeur booléenne qui définit les filtres permettant d'extraire un sous-ensemble de données de l'entité sous-jacente pour la validation.

  • Liaisons de règles (définies dans le nœud YAML rule_bindings de premier niveau) : définit les règles (rules) et filtres de règles (rule filters) à appliquer aux tables.

  • Dimensions de règle (définies dans le nœud YAML rule_dimensions) : définit la liste des dimensions de règle de qualité des données qu'une règle peut définir dans le champ dimension correspondant.

    Exemple :

    rule_dimensions:
      - consistency
      - correctness
      - duplication
      - completeness
      - conformance

    Le champ dimension est facultatif pour une règle. La section "Dimensions de la règle" est obligatoire si dimension est listé dans une règle.

Pour en savoir plus, consultez le guide de référence de CloudDQ et les exemples de fichiers de spécifications.

Créer un ensemble de données pour stocker les résultats

  • Pour stocker les résultats, créez un ensemble de données BigQuery.

    L'ensemble de données doit se trouver dans la même région que les tables sur lesquelles vous exécutez la tâche de qualité des données.

    Dataplex utilise cet ensemble de données et crée ou réutilise une table de votre choix pour stocker les résultats.

Créer un compte de service

Créez un compte de service disposant des rôles et autorisations Identity and Access Management (IAM) suivants:

Facultatif : Utiliser les paramètres avancés

Ces étapes sont facultatives :

  1. Par défaut, BigQuery exécute des vérifications de la qualité des données dans le projet utilisateur actuel. Vous pouvez également choisir un autre projet pour exécuter les jobs BigQueryà l'aide de l'argument --gcp_project_id TASK_ARGS pour la propriété --execution-args de la tâche.

  2. Si l'ID de projet spécifié pour exécuter des requêtes BigQuery est différent du projet dans lequel le compte de service (spécifié par --execution-service-account) est créé, assurez-vous que la règle d'administration qui désactive l'utilisation des comptes de service multi-projets (iam.disableServiceAccountCreation) est désactivée. Vérifiez également que le compte de service peut accéder à la planification de tâches BigQuery dans le projet dans lequel les requêtes BigQuery sont exécutées.

Limites

  • Toutes les tables spécifiées pour une tâche liée à la qualité des données donnée doivent appartenir à la même région Google Cloud .

Planifier une tâche liée à la qualité des données

Console

  1. Dans la console Google Cloud , accédez à la page Processus de Dataplex.

    Accéder à la page Processus

  2. Cliquez sur Créer une tâche .
  3. Sur la fiche Vérifier la qualité des données, cliquez sur Créer une tâche.
  4. Choisissez votre lac pour le lac Dataplex.
  5. Dans le champ ID, saisissez un ID.
  6. Dans la section Spécification de la qualité des données, procédez comme suit :
    1. Dans le champ Sélectionner un fichier GCS, cliquez sur Parcourir.
    2. Sélectionnez votre bucket Cloud Storage.

    3. Cliquez sur Sélectionner.

  7. Dans la section Table des résultats, procédez comme suit :

    1. Dans le champ Sélectionner un ensemble de données BigQuery, cliquez sur Parcourir.

    2. Sélectionnez l'ensemble de données BigQuery pour stocker les résultats de la validation.

    3. Cliquez sur Sélectionner.

    4. Dans le champ Table BigQuery, saisissez le nom de la table dans laquelle stocker les résultats. Si la table n'existe pas, Dataplex la crée pour vous. N'utilisez pas le nom dq_summary, car il est réservé aux tâches de traitement internes.

  8. Dans la section Compte de service, sélectionnez un compte de service dans le menu Compte de service utilisateur.

  9. Cliquez sur Continuer.

  10. Dans la section Définir la planification, configurez la planification d'exécution de la tâche de qualité des données.

  11. Cliquez sur Créer.

CLI gcloud

Voici un exemple d'exécution d'une tâche liée à la qualité des données utilisant la commande gcloud CLI  des tâches Dataplex :

export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH"

# Google Cloud project where the Dataplex task is created.
export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT"

# Google Cloud region for the Dataplex lake.
export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID"

# Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region.
export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}"

# The Dataplex lake where your task is created.
export DATAPLEX_LAKE_NAME="DATAPLEX_LAKE_NAME"

# The service account used for running the task. Ensure that this service account
has sufficient IAM permissions on your project, including
BigQuery Data Editor, BigQuery Job User,
Dataplex Editor, Dataproc Worker, and Service
Usage Consumer.

# The BigQuery dataset used for storing the intermediate data
quality summary results and the BigQuery views associated with
each rule binding.
export TARGET_BQ_DATASET="TARGET_BQ_DATASET"

# If you want to use a different dataset for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding, use the following:
export CLOUDDQ_BIGQUERY_DATASET=$TARGET_BQ_DATASET

# The BigQuery dataset where the final results of the data quality checks are stored. This could be the same as CLOUDDQ_BIGQUERY_DATASET.
export TARGET_BQ_DATASET="TARGET_BQ_DATASET"

# The BigQuery table where the final results of the data quality checks are stored.
export TARGET_BQ_TABLE="TARGET_BQ_TABLE"

# The unique identifier for the task.
export TASK_ID="TASK_ID"

gcloud dataplex tasks create \
    --location="${DATAPLEX_REGION_ID}" \
    --lake="${DATAPLEX_LAKE_NAME}" \
    --trigger-type=ON_DEMAND \
    --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \
    --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \
    --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \
    --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \
    "$TASK_ID"
Parameter Description
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH Chemin d'accès Cloud Storage vers votre entrée de configuration YAML de qualité des données pour la tâche liée à la qualité des données. Vous pouvez avoir un seul fichier YAML au format .yml ou .yaml ou une archive ZIP contenant plusieurs fichiers YAML.
GOOGLE_CLOUD_PROJECT Le projet Google Cloud dans lequel la tâche Dataplex et les jobs BigQuery sont créés.
DATAPLEX_REGION_ID Région du lac Dataplex dans laquelle la tâche liée à la qualité des données est créée.
SERVICE_ACCOUNT Compte de service utilisé pour exécuter la tâche. Assurez-vous que ce compte de service dispose des autorisations IAM suffisantes, comme décrit dans la section Avant de commencer.

Pour --execution-args, les arguments suivants doivent être transmis en tant qu'arguments positionnés, et donc dans cet ordre :

Argument Description
clouddq-executable.zip Un exécutable précompilé transmis dans spark-file-uris à partir d'un bucket Cloud Storage public.
ALL Exécutez toutes les liaisons de règles. Vous pouvez également fournir des liaisons de règles spécifiques sous la forme d'une liste d'éléments séparés par une virgule. Par exemple, RULE_1,RULE_2.
gcp-project-id ID du projet qui exécute les requêtes BigQuery.
gcp-region-id Région permettant d'exécuter les tâches BigQuery pour la validation de la qualité des données. Cette région doit être identique à celle de gcp-bq-dataset-id et target_bigquery_summary_table.
gcp-bq-dataset-id Ensemble de données BigQuery utilisé pour stocker les vues rule_binding et le récapitulatif des résultats intermédiaires de la qualité des données.
target-bigquery-summary-table Référence d'ID de table de la table BigQuery dans laquelle les résultats finaux des contrôles de qualité des données sont stockés. N'utilisez pas la valeur d'ID dq_summary, car elle est réservée aux tâches de traitement internes.
--summary_to_stdout (Facultatif) Lorsque cette option est transmise, toutes les lignes de résultat de validation créées dans la table dq_summary de la dernière exécution sont consignées en tant qu'enregistrements JSON dans Cloud Logging et stdout.

API

  1. Remplacez les éléments suivants :

    PROJECT_ID = "Your Dataplex Project ID"
    REGION = "Your Dataplex lake region"
    LAKE_ID = "Your Dataplex lake ID"
    SERVICE_ACC = "Your service account used for reading the data"
    DATAPLEX_TASK_ID = "Unique task ID for the data quality task"
    BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification"
    GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project"
    GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional
    GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results"
    TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
  2. Envoyez une requête POST HTTP :
    POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID}
    {
    "spark": {
        "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py",
        "file_uris": [  f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip",
                        f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum",
                        f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip"
                    ]
    },
    "execution_spec": {
        "args": {
            "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}"
        },
        "service_account": "SERVICE_ACC"
    },
    "trigger_spec": {
    "type": "ON_DEMAND"
    },
    "description": "${DATAPLEX_TASK_DESCRIPTION}"
    }

Consultez également Exemple de DAG Airflow pour une tâche liée à la qualité des données.

Surveiller une tâche liée à la qualité des données planifiée

Découvrez comment surveiller votre tâche.

Afficher les résultats

Les résultats des validations de la qualité des données sont stockés dans l'ensemble de données BigQuery et la table récapitulative que vous avez spécifiés, comme décrit dans la section Créer un ensemble de données pour stocker les résultats. Le tableau récapitulatif contient le résumé des résultats pour chaque combinaison de liaisons de règle et règles à chaque exécution de validation. Les résultats du tableau récapitulatif incluent les informations suivantes:

Nom de la colonne Description
dataplex_lake (chaîne) ID du lac Dataplex contenant la table en cours de validation.
dataplex_zone (chaîne) ID de la zone Dataplex contenant la table en cours de validation.
dataplex_asset_id (chaîne) ID de l'élément Dataplex contenant la table en cours de validation.
execution_ts (code temporel) Horodatage de l'exécution de la requête de validation.
rule_binding_id (chaîne) ID de la liaison de règle pour laquelle les résultats de validation sont signalés.
rule_id (chaîne) ID de la règle sous la liaison de règle pour laquelle les résultats de la validation sont signalés.
dimension (chaîne) Dimension de la qualité des données de rule_id. Cette valeur ne peut être que l'une des valeurs spécifiées dans le nœud YAML rule_dimensions.
table_id (chaîne) ID de l'entité pour laquelle les résultats de la validation sont signalés. Cet ID est spécifié dans le paramètre entity de la liaison de règle correspondante.
column_id (chaîne) ID de la colonne pour laquelle les résultats de la validation sont signalés. Cet ID est spécifié dans le paramètre column de la liaison de règle correspondante.
last_modified (code temporel) : dernier horodatage modifié de table_id en cours de validation.
metadata_json_string (chaîne) Paires clé/valeur du contenu du paramètre de métadonnées spécifié sous la liaison de règle ou pendant l'exécution de la qualité des données.
configs_hashsum (chaîne) Somme de hachage du document JSON contenant la liaison de règle et toutes les règles, liaisons de règles, filtres de ligne et configurations d'entité associés. configs_hashsum permet de signaler lorsque le contenu d'un ID rule_binding ou de l'une de ses configurations référencées a changé.
dq_run_id (chaîne) Identifiant unique de l'enregistrement.
invocation_id (chaîne) ID de l'exécution de la qualité des données. Tous les enregistrements de résumé de qualité des données générés dans la même instance d'exécution de qualité partagent le même identifiant invocation_id.
progress_watermark (booléen) Détermine si cet enregistrement spécifique est pris en compte par la vérification de la qualité des données afin de déterminer le filigrane élevé pour la validation incrémentielle. Si la valeur est FALSE, l'enregistrement correspondant est ignoré lors de l'établissement de la valeur du filigrane élevé. Ces informations sont utiles lors de l'exécution des validations de la qualité des données de test qui ne doivent pas progresser dans le filigrane élevé. Par défaut, Dataplex renseigne ce champ avec TRUE, mais la valeur peut être remplacée si l'argument --progress_watermark a la valeur FALSE.
rows_validated (entier) Nombre total d'enregistrements validés après l'application de row_filters et de tous les filtres en filigrane élevé sur la colonne incremental_time_filter_column_id, si spécifié.
complex_rule_validation_errors_count (Nombre à virgule flottante) Nombre de lignes renvoyées par une règle CUSTOM_SQL_STATEMENT.
complex_rule_validation_success_flag (booléen) État de réussite des règles CUSTOM_SQL_STATEMENT.
success_count (entier) Nombre total d'enregistrements ayant réussi la validation. Ce champ est défini sur NULL pour les règles CUSTOM_SQL_STATEMENT.
success_percentage (nombre à virgule flottante) Pourcentage du nombre d'enregistrements ayant réussi la validation dans le nombre total d'enregistrements validés. Ce champ est défini sur NULL pour les règles CUSTOM_SQL_STATEMENT.
failed_count (entier) Nombre total d'enregistrements dont la validation a échoué. Ce champ est défini sur NULL pour les règles CUSTOM_SQL_STATEMENT.
failed_percentage (nombre à virgule flottante) Pourcentage du nombre d'enregistrements ayant échoué à la validation dans le nombre total d'enregistrements validés. Ce champ est défini sur NULL pour les règles CUSTOM_SQL_STATEMENT.
null_count (entier) Nombre total d'enregistrements ayant renvoyé une valeur nulle lors de la validation. Ce champ est défini sur NULL pour les règles NOT_NULL et CUSTOM_SQL_STATEMENT.
null_percentage (nombre à virgule flottante) Pourcentage du nombre d'enregistrements ayant renvoyé une valeur nulle lors de la validation dans le nombre total d'enregistrements validés. Ce champ est défini sur NULL pour les règles NOT_NULL et CUSTOM_SQL_STATEMENT.
failed_records_query Pour chaque règle ayant échoué, cette colonne stocke une requête que vous pouvez utiliser pour obtenir des enregistrements ayant échoué. Dans ce document, consultez la section Résoudre les problèmes liés aux règles ayant échoué avec failed_records_query.

Pour les entités BigQuery, une vue est créée pour chaque rule_binding contenant la logique de validation SQL de la dernière exécution. Vous trouverez ces vues dans l'ensemble de données BigQuery spécifié dans l'argument --gcp-bq-dataset-id.

Optimisation des coûts

Vous pouvez réduire les coûts à l'aide des optimisations suivantes.

Validations incrémentielles

Souvent, vous avez des tables qui sont régulièrement mises à jour avec de nouvelles partitions (nouvelles lignes). Si vous ne souhaitez pas revalider les anciennes partitions à chaque exécution, vous pouvez utiliser des validations incrémentielles.

Pour les validations incrémentielles, vous devez avoir une colonne de type TIMESTAMP ou DATETIME dans votre table où la valeur de la colonne augmente de façon linéaire. Vous pouvez utiliser les colonnes sur lesquelles votre table BigQuery est partitionnée.

Pour spécifier la validation incrémentielle, spécifiez une valeur pour incremental_time_filter_column_id=TIMESTAMP/DATETIME type column dans le cadre d'une liaison de règle.

Lorsque vous spécifiez une colonne, la tâche de qualité des données prend en compte uniquement les lignes dont la valeur TIMESTAMP est supérieure à l'horodatage de la dernière tâche liée à la qualité des données exécutée.

Exemples de fichiers de spécification

Pour utiliser ces exemples, créez un ensemble de données BigQuery nommé sales. Ensuite, créez une table de faits nommée sales_orders et ajoutez des exemples de données en exécutant une requête avec les instructions GoogleSQL suivantes:

CREATE OR REPLACE TABLE sales.sales_orders
(
 id STRING NOT NULL,
 last_modified_timestamp TIMESTAMP,
 customer_id STRING,
 item_id STRING,
 amount NUMERIC,
 transaction_currency STRING
);

INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")

Exemple 1

L'exemple de code suivant crée des contrôles de qualité des données pour valider ces valeurs :

  • amount : les valeurs sont nulles ou positives.
  • item_id : chaîne alphanumérique de cinq caractères alphabétiques, suivie de 15 chiffres.
  • transaction_currency : type de devise autorisé, tel que défini par une liste statique. La liste statique de cet exemple autorise les types de devises GBP et JPY. Cette validation ne s'applique qu'aux lignes marquées comme internationales.
# The following `NONE` row filter is required.
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 # This filters for rows marked as international (INTNL).
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can apply to multiple tables or columns.
rules:
 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

# Rule bindings associate rules to columns within tables.
rule_bindings:
  TRANSACTION_AMOUNT_VALID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

  TRANSACTION_VALID_ITEM_ID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

  TRANSACTION_CURRENCY_VALID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Remplacez les éléments suivants :

  • PROJECT_ID : ID de votre projet.
  • DATASET_ID : ID de l'ensemble de données

Sample 2

Si la table à vérifier fait partie d'un lac Dataplex, vous pouvez spécifier les tables à l'aide de la notation lac ou zone. Vous pouvez ainsi regrouper vos résultats par lac ou zone. Par exemple, vous pouvez générer un score au niveau de la zone.

Pour utiliser cet exemple, créez un lac Dataplex avec l'ID de lac operations et l'ID de zone procurement. Ajoutez ensuite le tableau sales_orders en tant qu'élément à la zone.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}
rule_bindings:
 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Remplacez les éléments suivants :

  • PROJECT_ID : ID de votre projet.
  • REGION_ID: ID de région du lac Dataplex dans lequel la table existe, par exemple us-central1.

Sample 3

Cet exemple améliore l'exemple 2 en ajoutant une vérification SQL personnalisée pour vérifier si les valeurs d'ID sont uniques.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
 NO_DUPLICATES_IN_COLUMN_GROUPS:
   rule_type: CUSTOM_SQL_STATEMENT
   dimension: duplication
   params:
     custom_sql_arguments:
       - column_names
     custom_sql_statement: |-
       select a.*
       from data a
       inner join (
         select
           $column_names
         from data
         group by $column_names
         having count(*) > 1
       ) duplicates
       using ($column_names)

 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}

rule_bindings:
 TRANSACTIONS_UNIQUE:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: id
   row_filter_id: NONE
   rule_ids:
     - NO_DUPLICATES_IN_COLUMN_GROUPS:
         column_names: "id"

 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Exemple 4

Cet exemple améliore l'exemple 3 en ajoutant des validations incrémentielles à l'aide de la colonne last_modified_timestamp. Vous pouvez ajouter des validations incrémentielles pour une ou plusieurs liaisons de règles.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
 NO_DUPLICATES_IN_COLUMN_GROUPS:
   rule_type: CUSTOM_SQL_STATEMENT
   dimension: duplication
   params:
     custom_sql_arguments:
       - column_names
     custom_sql_statement: |-
       select a.*
       from data a
       inner join (
         select
           $column_names
         from data
         group by $column_names
         having count(*) > 1
       ) duplicates
       using ($column_names)

 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}

rule_bindings:
 TRANSACTIONS_UNIQUE:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: id
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - NO_DUPLICATES_IN_COLUMN_GROUPS:
         column_names: "id"

 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: amount
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: item_id
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALID_CURRENCY_ID

Résoudre les problèmes de règles ayant échoué avec failed_records_query

Pour chaque règle ayant échoué, le tableau récapitulatif stocke une requête dans la colonne failed_records_query que vous pouvez utiliser pour obtenir les enregistrements ayant échoué.

Pour déboguer, vous pouvez également utiliser reference columns dans votre fichier YAML, ce qui vous permet de joindre la sortie de failed_records_query aux données d'origine pour obtenir l'enregistrement complet. Par exemple, vous pouvez spécifier une colonne primary_key ou une colonne primary_key composée en tant que colonne de référence.

Spécifier des colonnes de référence

Pour générer des colonnes de référence, vous pouvez ajouter les éléments suivants à votre spécification YAML :

  1. La section reference_columns. Dans cette section, vous pouvez créer un ou plusieurs ensembles de colonnes de référence, chaque ensemble spécifiant une ou plusieurs colonnes.

  2. La section rule_bindings. Dans cette section, vous pouvez ajouter une ligne à une liaison de règle qui spécifie un ID de colonne de référence (reference_columns_id) à utiliser pour les règles de cette liaison de règle. Il doit s'agir de l'un des ensembles de colonnes de référence spécifiés dans la section reference_columns.

Par exemple, le fichier YAML suivant spécifie une section reference_columns et définit trois colonnes : id, last_modified_timestamp et item_id dans le cadre de l'ensemble ORDER_DETAILS_REFERENCE_COLUMNS. L'exemple suivant utilise l'exemple de table sales_orders.

reference_columns:
  ORDER_DETAILS_REFERENCE_COLUMNS:
    include_reference_columns:
      - id
      - last_modified_timestamp
      - item_id
rules:
  VALUE_ZERO_OR_POSITIVE:
  rule_type: CUSTOM_SQL_EXPR
  params:
    custom_sql_expr: |-

row_filters:
NONE:
  filter_sql_expr: |-
      True

rule_bindings:
TRANSACTION_AMOUNT_VALID:
  entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
  column_id: amount
  row_filter_id: NONE
  reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
  rule_ids:
    - VALUE_ZERO_OR_POSITIVE

Utiliser la requête d'enregistrements ayant échoué

La requête d'enregistrements ayant échoué génère une ligne pour chaque enregistrement ayant une règle ayant échoué. Elle inclut le nom de la colonne ayant déclenché l'échec, la valeur qui l'a déclenché et les valeurs des colonnes de référence. Elle inclut également des métadonnées que vous pouvez utiliser pour exécuter la tâche liée à la qualité des données.

Voici un exemple de résultat d'une requête d'enregistrements ayant échoué pour le fichier YAML décrit dans la section Spécifier des colonnes de référence. Il indique un échec pour la colonne amount et une valeur ayant échoué de -10. Il enregistre également la valeur correspondante pour la colonne de référence.

_dq_validation_invocation_id _dq_validation_rule_binding_id _dq_validation_rule_id _dq_validation_column_id _dq_validation_column_value _dq_validation_dimension _dq_validation_simple_rule_row_is_valid _dq_validation_complex_rule_validation_errors_count _dq_validation_complex_rule_validation_success_flag id last_modified_timestamp item_id
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 TRANSACTION_AMOUNT_VALID VALUE_ZERO_OR_POSITIVE amount -10 FAUX order1 2022-01-22T02:30:06.321Z bad_item_id

Utiliser les requêtes d'enregistrement ayant échoué pour les règles CUSTOM_SQL_STATEMENT

Pour les règles CUSTOM_SQL_STATEMENT, les requêtes d'enregistrement ayant échoué incluent la colonne custom_sql_statement_validation_errors. La colonne custom_sql_statement_validation_errors est une colonne imbriquée avec des champs correspondant à la sortie de votre instruction SQL. Les colonnes de référence ne sont pas incluses dans les requêtes d'enregistrement ayant échoué pour les règles CUSTOM_SQL_STATEMENT.

Par exemple, votre règle CUSTOM_SQL_STATEMENT pourrait ressembler à ceci :

rules:
  TEST_RULE:
    rule_type: CUSTOM_SQL_STATEMENT
    custom_sql_arguments:
      - existing_id
      - replacement_id
    params:
     CUSTOM_SQL_STATEMENT: |-
       (SELECT product_name, product_key FROM data
       where $existing_id != $replacement_id)
Les résultats de cet exemple contiennent une ou plusieurs lignes pour la colonne custom_sql_statement_validation_errors, avec une ligne pour chaque occurrence où existing_id!=replacement_id.

Lors du rendu en JSON, le contenu d'une cellule de cette colonne peut ressembler à ceci :

{
  "custom_sql_statement_valdation_errors" :{
    "product_name"="abc"
    "product_key"="12345678"
    "_rule_binding_id"="your_rule_binding"
  }
}

Vous pouvez joindre ces résultats à la table d'origine avec une référence imbriquée telle que join on custom_sql_statement_valdation_errors.product_key.

Étape suivante