Ce document explique comment créer des tâches liées à la qualité des données Dataplex Universal Catalog, qui vous permettent de planifier et d'exécuter des contrôles de la qualité des données de vos tables BigQuery intégrées et externes.
Pour en savoir plus, consultez Présentation des tâches liées à la qualité des données.
Avant de commencer
Ce document suppose que vous disposez déjà d'un lac Dataplex Universal Catalog dans lequel créer la tâche liée à la qualité des données.
Activer les API et services Google
Activez l'API Dataproc.
Activez l'accès privé à Google pour votre réseau ou votre sous-réseau. Activez l'accès privé à Google sur le réseau que vous prévoyez d'utiliser pour les tâches liées à la qualité des données Dataplex Universal Catalog. Si vous ne spécifiez pas de réseau ni de sous-réseau lorsque vous créez la tâche liée à la qualité des données Dataplex Universal Catalog, Dataplex Universal Catalog utilise le sous-réseau par défaut. Dans ce cas, vous devez activer l'accès privé à Google sur le sous-réseau par défaut.
Créer un fichier de spécification
Dataplex Universal Catalog utilise CloudDQ Open Source comme programme de pilote. Les exigences de contrôle de la qualité des données Dataplex Universal Catalog sont définies dans des fichiers de spécification YAML de CloudDQ.
Comme entrée de la tâche liée à la qualité des données, vous pouvez avoir un fichier YAML ou une archive ZIP contenant un ou plusieurs fichiers YAML. Il est recommandé de capturer les exigences de contrôle de la qualité des données dans des fichiers de spécification YAML distincts, avec un fichier pour chaque section.
Pour préparer un fichier de spécification, procédez comme suit :
-
Créez un ou plusieurs fichiers de spécification YAML CloudDQ qui définissent vos exigences de contrôle de la qualité des données. Pour en savoir plus sur la syntaxe requise, consultez la section À propos des fichiers de spécification de ce document.
Enregistrez le fichier de spécification YAML au format
.yml
ou.yaml
. Si vous créez plusieurs fichiers de spécification YAML, enregistrez-les tous dans une même archive ZIP. - Créez un bucket Cloud Storage.
- Importez le fichier de spécification dans le bucket Cloud Storage.
À propos des fichiers de spécification
Votre fichier de spécification YAML CloudDQ doit comporter les sections suivantes :
Règles (définies dans le nœud YAML
rules
de premier niveau) : liste des règles à exécuter. Vous pouvez créer ces règles à partir de types de règles prédéfinis, commeNOT_NULL
etREGEX
, ou les étendre avec des instructions SQL personnalisées, telles queCUSTOM_SQL_EXPR
etCUSTOM_SQL_STATEMENT
. L'instructionCUSTOM_SQL_EXPR
signale comme un échec toutes les lignes quecustom_sql_expr
a évaluées commeFalse
. L'instructionCUSTOM_SQL_STATEMENT
signale comme un échec toute valeur renvoyée par l'instruction entière.Filtres de lignes (définis dans le nœud YAML
row_filters
de premier niveau) : les expressions SQL renvoient une valeur booléenne qui définit les filtres permettant d'extraire un sous-ensemble de données de l'entité sous-jacente soumise à la validation.Liaisons de règles (définies dans le nœud YAML
rule_bindings
de premier niveau) : définit les règles (rules
) et filtres de règles (rule filters
) à appliquer aux tables.Dimensions de règle (définies dans le nœud YAML
rule_dimensions
) : définit la liste des dimensions de règle de qualité des données qu'une règle peut définir dans le champdimension
correspondant.Exemple :
rule_dimensions: - consistency - correctness - duplication - completeness - conformance
Le champ
dimension
est facultatif pour une règle. La section des dimensions de règle est obligatoire sidimension
apparaît dans une règle.
Pour en savoir plus, consultez le guide de référence de CloudDQ et les exemples de fichiers de spécification.
Créer un ensemble de données pour stocker les résultats
-
Pour stocker les résultats, vous devez créer un ensemble de données BigQuery.
L'ensemble de données doit se trouver dans la même région que les tables sur lesquelles vous exécutez la tâche liée à la qualité des données.
Dataplex Universal Catalog utilise cet ensemble de données, et crée ou réutilise une table de votre choix pour stocker les résultats.
Créer un compte de service
Créez un compte de service disposant des rôles et autorisations IAM (Identity and Access Management) suivants :
- Accès en lecture au chemin d'accès Cloud Storage contenant les spécifications YAML. Vous pouvez utiliser le rôle Lecteur d'objets Storage (
roles/storage.objectViewer
) sur le bucket Cloud Storage. - Accès en lecture aux ensembles de données BigQuery contenant les données à valider. Vous pouvez utiliser le rôle Lecteur de données BigQuery.
- Accès en écriture à l'ensemble de données BigQuery où créer une table (si nécessaire) pour y écrire les résultats. Vous pouvez utiliser le rôle Éditeur de données BigQuery (
roles/bigquery.dataEditor
) au niveau de l'ensemble de données. - Rôle Utilisateur de job BigQuery (
roles/bigquery.jobUser
) au niveau du projet pour créer des jobs BigQuery dans un projet. - Rôle Lecteur de métadonnées Dataplex Universal Catalog (
roles/dataplex.metadataReader
) au niveau du projet ou du lac. - Rôle Consommateur Service Usage (
roles/serviceusage.serviceUsageConsumer
) au niveau du projet. - Rôle Nœud de calcul Dataproc.
- Autorisation
iam.serviceAccounts.actAs
accordée à l'utilisateur qui envoie le job. - Rôle Utilisateur du compte de service attribué au compte de service du lac Dataplex Universal Catalog. Vous pouvez afficher le compte de service du lac Dataplex Universal Catalog dans la console Google Cloud .
Utiliser les paramètres avancés
Ces étapes sont facultatives :
Par défaut, BigQuery exécute les contrôles de la qualité des données dans le projet actuel. Vous pouvez choisir un autre projet pour exécuter les jobs BigQuery. Utilisez l'argument
--gcp_project_id
TASK_ARGS
pour la propriété--execution-args
de la tâche.Si l'ID de projet spécifié pour exécuter les requêtes BigQuery est différent du projet dans lequel est créé le compte de service (spécifié par
--execution-service-account
), assurez-vous que la règle d'administration qui désactive l'utilisation des comptes de service multiprojets (iam.disableServiceAccountCreation
) est désactivée. Vérifiez également que le compte de service peut accéder à la planification des jobs BigQuery dans le projet au sein duquel sont exécutées les requêtes BigQuery.
Limites
Toutes les tables spécifiées pour une tâche donnée liée à la qualité des données doivent appartenir à la même région Google Cloud.
Planifier une tâche liée à la qualité des données
Console
- Dans la console Google Cloud , accédez à la page Traiter de Dataplex Universal Catalog.
- Cliquez sur Créer une tâche.
- Sur la fiche Vérifier la qualité des données, cliquez sur Créer une tâche.
- Choisissez votre lac dans Lac Dataplex.
- Dans le champ ID, saisissez un ID.
- Dans la section Spécifications liées à la qualité des données, procédez comme suit :
- Dans le champ Sélectionnez un fichier GCS, cliquez sur Parcourir.
Sélectionnez votre bucket Cloud Storage.
Cliquez sur Sélectionner.
Dans la section Table des résultats, procédez comme suit :
Dans le champ Sélectionnez un ensemble de données BigQuery, cliquez sur Parcourir.
Sélectionnez l'ensemble de données BigQuery où stocker les résultats de la validation.
Cliquez sur Sélectionner.
Dans le champ Table BigQuery, saisissez le nom de la table dans laquelle stocker les résultats. Si la table n'existe pas, Dataplex Universal Catalog la crée pour vous. N'utilisez pas le nom
dq_summary
, car il est réservé aux tâches de traitement internes.
Dans la section Compte de service, sélectionnez un compte de service dans le menu Compte de service utilisateur.
Cliquez sur Continuer.
Dans la section Définir la programmation, configurez la programmation de l'exécution de la tâche liée à la qualité des données.
Cliquez sur Créer.
gcloud CLI
Voici un exemple d'exécution d'une tâche liée à la qualité des données Dataplex Universal Catalog à l'aide de la commande gcloud CLI :
export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH" # Google Cloud project where the Dataplex Universal Catalog task is created. export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT" # Google Cloud region for the Dataplex Universal Catalog lake. export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID" # Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region. export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}" # The Dataplex Universal Catalog lake where your task is created. export DATAPLEX_LAKE_ID="DATAPLEX_LAKE_ID" # The service account used for running the task. Ensure that this service account has sufficient IAM permissions on your project, including BigQuery Data Editor, BigQuery Job User, Dataplex Universal Catalog Editor, Dataproc Worker, and Service Usage Consumer. export DATAPLEX_TASK_SERVICE_ACCOUNT="DATAPLEX_TASK_SERVICE_ACCOUNT" # If you want to use a different dataset for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding, use the following: export CLOUDDQ_BIGQUERY_DATASET="CLOUDDQ_BIGQUERY_DATASET" # The BigQuery dataset where the final results of the data quality checks are stored. This could be the same as CLOUDDQ_BIGQUERY_DATASET. export TARGET_BQ_DATASET="TARGET_BQ_DATASET" # The BigQuery table where the final results of the data quality checks are stored. export TARGET_BQ_TABLE="TARGET_BQ_TABLE" # The unique identifier for the task. export TASK_ID="TASK_ID" gcloud dataplex tasks create \ --location="${DATAPLEX_REGION_ID}" \ --lake="${DATAPLEX_LAKE_ID}" \ --trigger-type=ON_DEMAND \ --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \ --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \ --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \ --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \ "$TASK_ID"
Paramètre | Description |
---|---|
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH |
Chemin d'accès Cloud Storage à votre entrée de configuration YAML de qualité des données pour la tâche liée à la qualité des données. Vous pouvez avoir un fichier YAML au format .yml ou .yaml , ou une archive ZIP contenant plusieurs fichiers YAML. |
GOOGLE_CLOUD_PROJECT |
Projet Google Cloud dans lequel sont créés la tâche Dataplex Universal Catalog et les jobs BigQuery. |
DATAPLEX_REGION_ID |
Région du lac Dataplex Universal Catalog dans laquelle est créée la tâche liée à la qualité des données. |
SERVICE_ACCOUNT |
Compte de service utilisé pour exécuter la tâche. Veillez à ce que le compte de service dispose des autorisations IAM suffisantes, comme décrit dans la section Avant de commencer. |
Pour --execution-args
, les arguments suivants doivent être transmis en tant qu'arguments placés, et donc dans cet ordre :
Argument | Description |
---|---|
clouddq-executable.zip |
Exécutable précompilé transmis dans spark-file-uris à partir d'un bucket Cloud Storage public. |
ALL |
Exécution de toutes les liaisons de règles. Vous pouvez également fournir des liaisons de règles spécifiques sous la forme d'une liste d'éléments séparés par une virgule.
Par exemple : RULE_1,RULE_2 . |
gcp-project-id |
ID du projet qui exécute les requêtes BigQuery. |
gcp-region-id |
Région permettant d'exécuter les jobs BigQuery pour la validation de la qualité des données. Cette région doit être identique à celle de gcp-bq-dataset-id et target_bigquery_summary_table . |
gcp-bq-dataset-id |
Ensemble de données BigQuery utilisé pour stocker les vues rule_binding et le récapitulatif des résultats intermédiaires de la qualité des données. |
target-bigquery-summary-table |
Référence d'ID de la table BigQuery dans laquelle sont stockés les résultats finaux du contrôle de la qualité des données. N'utilisez pas la valeur d'ID dq_summary , car elle est réservée aux tâches de traitement internes. |
--summary_to_stdout |
(Facultatif) Lorsque cette option est transmise, toutes les lignes de résultat de validation créées dans la table dq_summary au cours de la dernière exécution sont consignées en tant qu'enregistrements JSON dans Cloud Logging et stdout . |
API
Remplacez les éléments suivants :
PROJECT_ID = "Your Dataplex Universal Catalog Project ID" REGION = "Your Dataplex Universal Catalog lake region" LAKE_ID = "Your Dataplex Universal Catalog lake ID" SERVICE_ACC = "Your service account used for reading the data" DATAPLEX_TASK_ID = "Unique task ID for the data quality task" BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification" GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project" GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results" TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
- Envoyez une requête HTTP POST :
POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID} { "spark": { "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py", "file_uris": [ f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip", f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum", f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip" ] }, "execution_spec": { "args": { "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}" }, "service_account": "SERVICE_ACC" }, "trigger_spec": { "type": "ON_DEMAND" }, "description": "${DATAPLEX_TASK_DESCRIPTION}" }
Consultez également Exemple de DAG Airflow pour une tâche liée à la qualité des données Dataplex Universal Catalog.
Surveiller une tâche planifiée liée à la qualité des données
Découvrez comment surveiller votre tâche.
Afficher les résultats
Les résultats des validations de la qualité des données sont stockés dans l'ensemble de données et la table récapitulative BigQuery que vous avez spécifiés, comme décrit dans la section Créer un ensemble de données pour stocker les résultats. La table récapitulative contient le récapitulatif des résultats pour chaque combinaison de règle et liaison de règle, pour chaque exécution de la validation. Les résultats de la table récapitulative comprennent les informations suivantes :
Nom de la colonne | Description |
---|---|
dataplex_lake |
(chaîne) ID du lac Dataplex Universal Catalog contenant la table en cours de validation. |
dataplex_zone |
(chaîne) ID de la zone Dataplex Universal Catalog contenant la table en cours de validation. |
dataplex_asset_id |
(chaîne) ID de l'élément Dataplex Universal Catalog contenant la table en cours de validation. |
execution_ts |
(code temporel) Code temporel de l'exécution de la requête de validation. |
rule_binding_id |
(chaîne) ID de la liaison de règle pour laquelle les résultats de validation sont enregistrés. |
rule_id |
(chaîne) ID de la règle soumise à la liaison de règle pour laquelle les résultats de la validation sont enregistrés. |
dimension |
(chaîne) Dimension de la qualité des données de rule_id . Cette colonne ne peut contenir que l'une des valeurs spécifiées dans le nœud YAML rule_dimensions . |
table_id |
(chaîne) ID de l'entité pour laquelle les résultats de la validation sont enregistrés.
Cet ID est spécifié dans le paramètre entity de la liaison de règle correspondante. |
column_id |
(chaîne) ID de la colonne pour laquelle les résultats de la validation sont enregistrés.
Cet ID est spécifié dans le paramètre column de la liaison de règle correspondante. |
last_modified |
(code temporel) Code temporel de la dernière modification de la table_id en cours de validation. |
metadata_json_string |
(chaîne) Paires clé/valeur du contenu du paramètre de métadonnées spécifié sous la liaison de règle ou pendant l'exécution de la qualité des données. |
configs_hashsum |
(chaîne) Somme de hachage du document JSON contenant la liaison de règle et toutes les règles, liaisons de règles, filtres de ligne et configurations d'entité associés.
configs_hashsum permet de signaler lorsque le contenu d'un ID rule_binding ou l'une de ses configurations référencées a changé. |
dq_run_id |
(chaîne) ID unique de l'enregistrement. |
invocation_id |
(chaîne) ID de l'exécution de la qualité des données. Tous les enregistrements récapitulatifs de la qualité des données générés dans la même instance d'exécution de la qualité des données partagent le même identifiant invocation_id . |
progress_watermark |
(valeur booléenne) Détermine si cet enregistrement spécifique est pris en compte par le contrôle de la qualité des données afin de déterminer le filigrane élevé pour la validation incrémentielle. Si la valeur est FALSE , l'enregistrement correspondant est ignoré lors de l'établissement de la valeur du filigrane élevé. Ces informations sont utiles lors de l'exécution des validations de la qualité des données de test qui ne doivent pas faire avancer le filigrane élevé. Par défaut, Dataplex Universal Catalog renseigne TRUE dans ce champ, mais la valeur peut être remplacée si l'argument --progress_watermark a comme valeur FALSE .
|
rows_validated |
(entier) Nombre total d'enregistrements validés après l'application de row_filters et de tous les filtres en filigrane élevé sur la colonne incremental_time_filter_column_id , le cas échéant. |
complex_rule_validation_errors_count |
(nombre à virgule flottante) Nombre de lignes renvoyées par une règle CUSTOM_SQL_STATEMENT . |
complex_rule_validation_success_flag |
(valeur booléenne) État de réussite des règles CUSTOM_SQL_STATEMENT .
|
success_count |
(entier) Nombre total d'enregistrements dont la validation a réussi. Ce champ est défini sur NULL pour les règles CUSTOM_SQL_STATEMENT .
|
success_percentage |
(float) Pourcentage du nombre d'enregistrements dont la validation a réussi par rapport au nombre total d'enregistrements validés. Ce champ est défini sur NULL pour les règles CUSTOM_SQL_STATEMENT . |
failed_count |
(entier) Nombre total d'enregistrements dont la validation a échoué. Ce champ est défini sur NULL pour les règles CUSTOM_SQL_STATEMENT .
|
failed_percentage |
(float) Pourcentage du nombre d'enregistrements dont la validation a échoué par rapport au nombre total d'enregistrements validés. Ce champ est défini sur NULL pour les règles CUSTOM_SQL_STATEMENT . |
null_count |
(entier) Nombre total d'enregistrements ayant renvoyé une valeur nulle lors de la validation.
Ce champ est défini sur NULL pour les règles NOT_NULL et CUSTOM_SQL_STATEMENT . |
null_percentage |
(float) Pourcentage du nombre d'enregistrements dont la validation a renvoyé une valeur nulle par rapport au nombre total d'enregistrements validés. Ce champ est défini sur NULL pour les règles NOT_NULL et CUSTOM_SQL_STATEMENT . |
failed_records_query |
Pour chaque règle ayant échoué, cette colonne stocke une requête que vous pouvez utiliser pour récupérer les enregistrements ayant échoué. Consultez la section Résoudre les problèmes liés aux règles ayant échoué avec failed_records_query de ce document. |
Pour les entités BigQuery, une vue est créée pour chaque liaison de règle rule_binding
contenant la logique de validation SQL de la dernière exécution. Vous trouverez ces vues dans l'ensemble de données BigQuery spécifié dans l'argument --gcp-bq-dataset-id
.
Optimisation des coûts
Les tâches liées à la qualité des données sont exécutées en tant que jobs BigQuery dans votre projet. Pour contrôler le coût d'exécution des jobs liés à la qualité des données, surveillez la tarification de BigQuery dans le projet où sont exécutés vos jobs BigQuery. Pour en savoir plus, consultez Gestion des charges de travail BigQuery.
Validations incrémentielles
Il arrive souvent que des tables soient régulièrement mises à jour avec de nouvelles partitions (nouvelles lignes). Si vous ne souhaitez pas revalider les anciennes partitions à chaque exécution, vous pouvez utiliser des validations incrémentielles.
Pour les validations incrémentielles, vous devez disposer dans votre table d'une colonne de type TIMESTAMP
ou DATETIME
, dans laquelle les valeurs augmentent de façon monotone. Vous pouvez utiliser les colonnes selon lesquelles votre table BigQuery est partitionnée.
Pour spécifier la validation incrémentielle, spécifiez une valeur pour incremental_time_filter_column_id=TIMESTAMP/DATETIME type column
dans une liaison de règle.
Lorsque vous spécifiez une colonne, la tâche liée à la qualité des données prend en compte uniquement les lignes dont la valeur TIMESTAMP
est ultérieure au code temporel de la dernière tâche liée à la qualité des données exécutée.
Exemples de fichiers de spécification
Pour utiliser ces exemples, créez un ensemble de données BigQuery nommé sales
. Créez ensuite une table de faits nommée sales_orders
et ajoutez des exemples de données en exécutant une requête avec les instructions GoogleSQL suivantes :
CREATE OR REPLACE TABLE sales.sales_orders
(
id STRING NOT NULL,
last_modified_timestamp TIMESTAMP,
customer_id STRING,
item_id STRING,
amount NUMERIC,
transaction_currency STRING
);
INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")
Exemple 1
L'exemple de code ci-dessous crée des contrôles de la qualité des données pour valider les valeurs suivantes :
amount
: les valeurs sont nulles ou positives.item_id
: chaîne alphanumérique de cinq caractères alphabétiques, suivie de 15 chiffres.transaction_currency
: type de devise autorisé, tel que défini par une liste statique. La liste statique de cet exemple autorise comme types de devises GBP et JPY. Cette validation ne s'applique qu'aux lignes marquées comme internationales.
# The following `NONE` row filter is required.
row_filters:
NONE:
filter_sql_expr: |-
True
# This filters for rows marked as international (INTNL).
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can apply to multiple tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
# Rule bindings associate rules to columns within tables.
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Remplacez les éléments suivants :
PROJECT_ID
: ID de votre projet.DATASET_ID
: ID de l'ensemble de données.
Exemple 2
Si la table à contrôler fait partie d'un lac Dataplex Universal Catalog, vous pouvez la spécifier à l'aide de la notation de lac ou de zone. Cela vous permet d'agréger vos résultats par lac ou par zone. Par exemple, vous pouvez générer un score au niveau de la zone.
Pour utiliser cet exemple, créez un lac Dataplex Universal Catalog avec l'ID de lac operations
et l'ID de zone procurement
. Ajoutez ensuite la table sales_orders
en tant qu'élément à la zone.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Remplacez les éléments suivants :
- PROJECT_ID : ID de votre projet.
- REGION_ID : ID de région du lac Dataplex Universal Catalog dans lequel se trouve la table (par exemple,
us-central1
).
Exemple 3
Cet exemple améliore l'exemple 2 en ajoutant une vérification SQL personnalisée pour déterminer si les valeurs d'ID sont uniques.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Exemple 4
Cet exemple améliore l'exemple 3 en ajoutant des validations incrémentielles à l'aide de la colonne last_modified_timestamp
. Vous pouvez ajouter des validations incrémentielles pour une ou plusieurs liaisons de règles.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_CURRENCY_ID
Résoudre les problèmes liés aux règles ayant échoué avec failed_records_query
Pour chaque règle ayant échoué, la table récapitulative stocke dans la colonne failed_records_query
une requête que vous pouvez utiliser pour récupérer les enregistrements ayant échoué.
Pour corriger les erreurs, vous pouvez également utiliser reference columns
dans votre fichier YAML. Cela vous permet d'effectuer la jointure de la sortie de failed_records_query
avec les données d'origine pour obtenir l'enregistrement complet. Par exemple, vous pouvez spécifier une colonne primary_key
ou une colonne primary_key
composée en tant que colonne de référence.
Spécifier des colonnes de référence
Pour générer des colonnes de référence, vous pouvez ajouter les éléments suivants à votre fichier de spécification YAML :
La section
reference_columns
vous permet de créer un ou plusieurs ensembles de colonnes de référence, chaque ensemble spécifiant une ou plusieurs colonnes.La section
rule_bindings
vous permet d'ajouter à une liaison de règle une ligne qui spécifie un ID de colonne de référence (reference_columns_id
) à utiliser pour les règles de cette liaison. Il doit s'agir de l'un des ensembles de colonnes de référence spécifiés dans la sectionreference_columns
.
Par exemple, le fichier YAML suivant spécifie une section reference_columns
et définit trois colonnes : id
, last_modified_timestamp
et item_id
dans l'ensemble ORDER_DETAILS_REFERENCE_COLUMNS
. L'exemple suivant utilise l'exemple de table sales_orders
.
reference_columns:
ORDER_DETAILS_REFERENCE_COLUMNS:
include_reference_columns:
- id
- last_modified_timestamp
- item_id
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
params:
custom_sql_expr: |-
row_filters:
NONE:
filter_sql_expr: |-
True
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
rule_ids:
- VALUE_ZERO_OR_POSITIVE
Utiliser la requête des enregistrements ayant échoué
La requête des enregistrements ayant échoué génère une ligne pour chaque enregistrement qui présente une règle ayant échoué. Elle inclut le nom de la colonne ayant déclenché l'échec, la valeur qui l'a déclenché et les valeurs des colonnes de référence. Elle inclut également des métadonnées que vous pouvez utiliser pour mettre en correspondance l'exécution de la tâche liée à la qualité des données.
Voici un exemple de résultat tiré de la requête des enregistrements ayant échoué pour le fichier YAML décrit dans Spécifier des colonnes de référence. Il indique un échec pour la colonne amount
et une valeur d'échec de -10
. Il enregistre également la valeur correspondante pour la colonne de référence.
_dq_validation_invocation_id | _dq_validation_rule_binding_id | _dq_validation_rule_id | _dq_validation_column_id | _dq_validation_column_value | _dq_validation_dimension | _dq_validation_simple_rule_row_is_valid | _dq_validation_complex_rule_validation_errors_count | _dq_validation_complex_rule_validation_success_flag | id | last_modified_timestamp | item_id |
---|---|---|---|---|---|---|---|---|---|---|---|
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 | TRANSACTION_AMOUNT_VALID | VALUE_ZERO_OR_POSITIVE | amount | -10 | FALSE | order1 | 2022-01-22T02:30:06.321Z | bad_item_id |
Utiliser les requêtes des enregistrements ayant échoué pour les règles CUSTOM_SQL_STATEMENT
Pour les règles CUSTOM_SQL_STATEMENT
, les requêtes des enregistrements ayant échoué incluent la colonne custom_sql_statement_validation_errors
. La colonne custom_sql_statement_validation_errors
est une colonne imbriquée dont les champs correspondent à la sortie de votre instruction SQL. Les colonnes de référence ne sont pas incluses dans les requêtes des enregistrements ayant échoué pour les règles CUSTOM_SQL_STATEMENT
.
Par exemple, votre règle CUSTOM_SQL_STATEMENT
pourrait ressembler à ce qui suit :
rules: TEST_RULE: rule_type: CUSTOM_SQL_STATEMENT custom_sql_arguments: - existing_id - replacement_id params: CUSTOM_SQL_STATEMENT: |- (SELECT product_name, product_key FROM data where $existing_id != $replacement_id)
custom_sql_statement_validation_errors
, avec une ligne pour chaque occurrence où existing_id!=replacement_id
.
Une fois au format JSON, le contenu d'une cellule de cette colonne peut ressembler à ce qui suit :
{ "custom_sql_statement_valdation_errors" :{ "product_name"="abc" "product_key"="12345678" "_rule_binding_id"="your_rule_binding" } }
Vous pouvez relier ces résultats à la table d'origine à l'aide d'une référence imbriquée telle que join on custom_sql_statement_valdation_errors.product_key
.
Étapes suivantes
- Consultez la documentation de référence sur les spécifications YAML pour CloudDQ.
- Pour obtenir des exemples de règles sur la qualité des données, consultez Règles simples et Règles avancées.
- Consultez Exemple de DAG Airflow pour une tâche liée à la qualité des données Dataplex Universal Catalog.