Ce document explique comment créer des tâches liées à la qualité des données Dataplex qui vous permettent de planifier et d'exécuter des contrôles de qualité des données pour vos tables BigQuery intégrées et externes.
Pour en savoir plus, consultez la section Présentation des tâches de qualité des données.
Avant de commencer
Ce document suppose que vous disposez d'un lac Dataplex dans lequel créer la tâche liée à la qualité des données.
Activer les API et services Google
activer l'API Dataproc ;
Activez l'accès privé à Google pour votre réseau et votre sous-réseau. Activez l'accès privé à Google sur le réseau que vous prévoyez d'utiliser avec les tâches liées à la qualité des données Dataplex. Si vous ne spécifiez pas de réseau ni de sous-réseau lorsque vous créez la tâche liée à la qualité des données Dataplex, Dataplex utilise le sous-réseau par défaut. Dans ce cas, vous devez activer l'accès privé à Google sur le sous-réseau par défaut.
Créer un fichier de spécification
Dataplex utilise Cloud DQ Open Source comme programme de pilote. Les exigences de contrôle de la qualité des données Dataplex sont définies dans les fichiers de spécification YAML de CloudDQ.
En tant qu'entrée de la tâche de qualité des données, vous pouvez avoir un seul fichier YAML ou une seule archive ZIP contenant un ou plusieurs fichiers YAML. Il est recommandé de capturer les exigences de vérification de la qualité des données dans des fichiers de spécification YAML distincts, avec un fichier pour chaque section.
Pour préparer un fichier de spécifications, procédez comme suit:
-
Créez un ou plusieurs fichiers de spécification YAML CloudDQ qui définissent vos exigences de contrôle de la qualité des données. Pour en savoir plus sur la syntaxe requise, consultez la section À propos du fichier de spécification de ce document.
Enregistrez le fichier de spécification YAML au format
.yml
ou.yaml
. Si vous créez plusieurs fichiers de spécification YAML, enregistrez-les dans une seule archive ZIP. - Créez un bucket Cloud Storage.
- Importez le fichier de spécifications dans le bucket Cloud Storage.
À propos du fichier de spécification
Votre fichier de spécification YAML CloudDQ doit comporter les sections suivantes :
Règles (définies dans le nœud YAML
rules
de premier niveau) : liste des règles à exécuter. Vous pouvez créer ces règles à partir de types de règles prédéfinis, commeNOT_NULL
etREGEX
, ou les étendre avec des instructions SQL personnalisées, telles queCUSTOM_SQL_EXPR
etCUSTOM_SQL_STATEMENT
. L'instructionCUSTOM_SQL_EXPR
signale chaque ligne quecustom_sql_expr
a évaluéFalse
comme ayant échoué. L'instructionCUSTOM_SQL_STATEMENT
signale toute valeur renvoyée par l'ensemble de l'instruction comme un échec.Filtres de lignes (définis dans le nœud YAML
row_filters
de premier niveau) : les expressions SQL renvoient une valeur booléenne qui définit les filtres permettant d'extraire un sous-ensemble de données de l'entité sous-jacente pour la validation.Liaisons de règles (définies dans le nœud YAML
rule_bindings
de premier niveau) : définit les règles (rules
) et filtres de règles (rule filters
) à appliquer aux tables.Dimensions de règle (définies dans le nœud YAML
rule_dimensions
) : définit la liste des dimensions de règle de qualité des données qu'une règle peut définir dans le champdimension
correspondant.Exemple :
rule_dimensions: - consistency - correctness - duplication - completeness - conformance
Le champ
dimension
est facultatif pour une règle. La section "Dimensions de la règle" est obligatoire sidimension
est listé dans une règle.
Pour en savoir plus, consultez le guide de référence de CloudDQ et les exemples de fichiers de spécifications.
Créer un ensemble de données pour stocker les résultats
-
Pour stocker les résultats, créez un ensemble de données BigQuery.
L'ensemble de données doit se trouver dans la même région que les tables sur lesquelles vous exécutez la tâche de qualité des données.
Dataplex utilise cet ensemble de données et crée ou réutilise une table de votre choix pour stocker les résultats.
Créer un compte de service
Créez un compte de service disposant des rôles et autorisations Identity and Access Management (IAM) suivants:
- Accès en lecture au chemin d'accès Cloud Storage contenant les spécifications YAML. Vous pouvez utiliser le rôle de lecteur des objets de l'espace de stockage (
roles/storage.objectViewer
) sur le bucket Cloud Storage. - Accès en lecture aux ensembles de données BigQuery contenant des données à valider. Vous pouvez utiliser le rôle Lecteur de données BigQuery.
- Accès en écriture à l'ensemble de données BigQuery pour créer une table (si nécessaire) et y écrire les résultats. Vous pouvez utiliser le rôle Éditeur de données BigQuery (
roles/bigquery.dataEditor
) au niveau de l'ensemble de données. - Rôle Utilisateur de tâche BigQuery (
roles/bigquery.jobUser
) au niveau du projet pour créer des tâches BigQuery dans un projet - Rôle Lecteur de métadonnées Dataplex (
roles/dataplex.metadataReader
) au niveau du projet ou du lac. - Le rôle Consommateur Service Usage (
roles/serviceusage.serviceUsageConsumer
) au niveau du projet. - Rôle Nœud de calcul Dataproc.
- L'autorisation
iam.serviceAccounts.actAs
accordée à l'utilisateur qui envoie la tâche. - Rôle Utilisateur du compte de service attribué au compte de service du lac de Dataplex. Vous pouvez afficher le compte de service de lac Dataplex dans la console Google Cloud .
Facultatif : Utiliser les paramètres avancés
Ces étapes sont facultatives :
Par défaut, BigQuery exécute des vérifications de la qualité des données dans le projet utilisateur actuel. Vous pouvez également choisir un autre projet pour exécuter les jobs BigQueryà l'aide de l'argument
--gcp_project_id
TASK_ARGS
pour la propriété--execution-args
de la tâche.Si l'ID de projet spécifié pour exécuter des requêtes BigQuery est différent du projet dans lequel le compte de service (spécifié par
--execution-service-account
) est créé, assurez-vous que la règle d'administration qui désactive l'utilisation des comptes de service multi-projets (iam.disableServiceAccountCreation
) est désactivée. Vérifiez également que le compte de service peut accéder à la planification de tâches BigQuery dans le projet dans lequel les requêtes BigQuery sont exécutées.
Limites
- Toutes les tables spécifiées pour une tâche liée à la qualité des données donnée doivent appartenir à la même région Google Cloud .
Planifier une tâche liée à la qualité des données
Console
- Dans la console Google Cloud , accédez à la page Processus de Dataplex.
- Cliquez sur Créer une tâche .
- Sur la fiche Vérifier la qualité des données, cliquez sur Créer une tâche.
- Choisissez votre lac pour le lac Dataplex.
- Dans le champ ID, saisissez un ID.
- Dans la section Spécification de la qualité des données, procédez comme suit :
- Dans le champ Sélectionner un fichier GCS, cliquez sur Parcourir.
Sélectionnez votre bucket Cloud Storage.
Cliquez sur Sélectionner.
Dans la section Table des résultats, procédez comme suit :
Dans le champ Sélectionner un ensemble de données BigQuery, cliquez sur Parcourir.
Sélectionnez l'ensemble de données BigQuery pour stocker les résultats de la validation.
Cliquez sur Sélectionner.
Dans le champ Table BigQuery, saisissez le nom de la table dans laquelle stocker les résultats. Si la table n'existe pas, Dataplex la crée pour vous. N'utilisez pas le nom
dq_summary
, car il est réservé aux tâches de traitement internes.
Dans la section Compte de service, sélectionnez un compte de service dans le menu Compte de service utilisateur.
Cliquez sur Continuer.
Dans la section Définir la planification, configurez la planification d'exécution de la tâche de qualité des données.
Cliquez sur Créer.
CLI gcloud
Voici un exemple d'exécution d'une tâche liée à la qualité des données utilisant la commande gcloud CLI des tâches Dataplex :
export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH" # Google Cloud project where the Dataplex task is created. export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT" # Google Cloud region for the Dataplex lake. export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID" # Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region. export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}" # The Dataplex lake where your task is created. export DATAPLEX_LAKE_NAME="DATAPLEX_LAKE_NAME" # The service account used for running the task. Ensure that this service account has sufficient IAM permissions on your project, including BigQuery Data Editor, BigQuery Job User, Dataplex Editor, Dataproc Worker, and Service Usage Consumer. # The BigQuery dataset used for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding. export TARGET_BQ_DATASET="TARGET_BQ_DATASET" # If you want to use a different dataset for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding, use the following: export CLOUDDQ_BIGQUERY_DATASET=$TARGET_BQ_DATASET # The BigQuery dataset where the final results of the data quality checks are stored. This could be the same as CLOUDDQ_BIGQUERY_DATASET. export TARGET_BQ_DATASET="TARGET_BQ_DATASET" # The BigQuery table where the final results of the data quality checks are stored. export TARGET_BQ_TABLE="TARGET_BQ_TABLE" # The unique identifier for the task. export TASK_ID="TASK_ID" gcloud dataplex tasks create \ --location="${DATAPLEX_REGION_ID}" \ --lake="${DATAPLEX_LAKE_NAME}" \ --trigger-type=ON_DEMAND \ --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \ --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \ --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \ --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \ "$TASK_ID"
Parameter | Description |
---|---|
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH |
Chemin d'accès Cloud Storage vers votre entrée de configuration YAML de qualité des données pour la tâche liée à la qualité des données. Vous pouvez avoir un seul fichier YAML au format .yml ou .yaml ou une archive ZIP contenant plusieurs fichiers YAML. |
GOOGLE_CLOUD_PROJECT |
Le projet Google Cloud dans lequel la tâche Dataplex et les jobs BigQuery sont créés. |
DATAPLEX_REGION_ID |
Région du lac Dataplex dans laquelle la tâche liée à la qualité des données est créée. |
SERVICE_ACCOUNT |
Compte de service utilisé pour exécuter la tâche. Assurez-vous que ce compte de service dispose des autorisations IAM suffisantes, comme décrit dans la section Avant de commencer. |
Pour --execution-args
, les arguments suivants doivent être transmis en tant qu'arguments positionnés, et donc dans cet ordre :
Argument | Description |
---|---|
clouddq-executable.zip |
Un exécutable précompilé transmis dans spark-file-uris à partir d'un bucket Cloud Storage public. |
ALL |
Exécutez toutes les liaisons de règles. Vous pouvez également fournir des liaisons de règles spécifiques sous la forme d'une liste d'éléments séparés par une virgule.
Par exemple, RULE_1,RULE_2 . |
gcp-project-id |
ID du projet qui exécute les requêtes BigQuery. |
gcp-region-id |
Région permettant d'exécuter les tâches BigQuery pour la validation de la qualité des données. Cette région doit être identique à celle de gcp-bq-dataset-id et target_bigquery_summary_table . |
gcp-bq-dataset-id |
Ensemble de données BigQuery utilisé pour stocker les vues rule_binding et le récapitulatif des résultats intermédiaires de la qualité des données. |
target-bigquery-summary-table |
Référence d'ID de table de la table BigQuery dans laquelle les résultats finaux des contrôles de qualité des données sont stockés. N'utilisez pas la valeur d'ID dq_summary , car elle est réservée aux tâches de traitement internes. |
--summary_to_stdout |
(Facultatif) Lorsque cette option est transmise, toutes les lignes de résultat de validation créées dans la table dq_summary de la dernière exécution sont consignées en tant qu'enregistrements JSON dans Cloud Logging et stdout . |
API
Remplacez les éléments suivants :
PROJECT_ID = "Your Dataplex Project ID" REGION = "Your Dataplex lake region" LAKE_ID = "Your Dataplex lake ID" SERVICE_ACC = "Your service account used for reading the data" DATAPLEX_TASK_ID = "Unique task ID for the data quality task" BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification" GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project" GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results" TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
- Envoyez une requête POST HTTP :
POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID} { "spark": { "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py", "file_uris": [ f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip", f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum", f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip" ] }, "execution_spec": { "args": { "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}" }, "service_account": "SERVICE_ACC" }, "trigger_spec": { "type": "ON_DEMAND" }, "description": "${DATAPLEX_TASK_DESCRIPTION}" }
Consultez également Exemple de DAG Airflow pour une tâche liée à la qualité des données.
Surveiller une tâche liée à la qualité des données planifiée
Découvrez comment surveiller votre tâche.
Afficher les résultats
Les résultats des validations de la qualité des données sont stockés dans l'ensemble de données BigQuery et la table récapitulative que vous avez spécifiés, comme décrit dans la section Créer un ensemble de données pour stocker les résultats. Le tableau récapitulatif contient le résumé des résultats pour chaque combinaison de liaisons de règle et règles à chaque exécution de validation. Les résultats du tableau récapitulatif incluent les informations suivantes:
Nom de la colonne | Description |
---|---|
dataplex_lake |
(chaîne) ID du lac Dataplex contenant la table en cours de validation. |
dataplex_zone |
(chaîne) ID de la zone Dataplex contenant la table en cours de validation. |
dataplex_asset_id |
(chaîne) ID de l'élément Dataplex contenant la table en cours de validation. |
execution_ts |
(code temporel) Horodatage de l'exécution de la requête de validation. |
rule_binding_id |
(chaîne) ID de la liaison de règle pour laquelle les résultats de validation sont signalés. |
rule_id |
(chaîne) ID de la règle sous la liaison de règle pour laquelle les résultats de la validation sont signalés. |
dimension |
(chaîne) Dimension de la qualité des données de rule_id . Cette valeur ne peut être que l'une des valeurs spécifiées dans le nœud YAML rule_dimensions . |
table_id |
(chaîne) ID de l'entité pour laquelle les résultats de la validation sont signalés.
Cet ID est spécifié dans le paramètre entity de la liaison de règle correspondante. |
column_id |
(chaîne) ID de la colonne pour laquelle les résultats de la validation sont signalés.
Cet ID est spécifié dans le paramètre column de la liaison de règle correspondante. |
last_modified |
(code temporel) : dernier horodatage modifié de table_id en cours de validation. |
metadata_json_string |
(chaîne) Paires clé/valeur du contenu du paramètre de métadonnées spécifié sous la liaison de règle ou pendant l'exécution de la qualité des données. |
configs_hashsum |
(chaîne) Somme de hachage du document JSON contenant la liaison de règle et toutes les règles, liaisons de règles, filtres de ligne et configurations d'entité associés.
configs_hashsum permet de signaler lorsque le contenu d'un ID rule_binding ou de l'une de ses configurations référencées a changé. |
dq_run_id |
(chaîne) Identifiant unique de l'enregistrement. |
invocation_id |
(chaîne) ID de l'exécution de la qualité des données. Tous les enregistrements de résumé de qualité des données générés dans la même instance d'exécution de qualité partagent le même identifiant invocation_id . |
progress_watermark |
(booléen) Détermine si cet enregistrement spécifique est pris en compte par la vérification de la qualité des données afin de déterminer le filigrane élevé pour la validation incrémentielle. Si la valeur est FALSE , l'enregistrement correspondant est ignoré lors de l'établissement de la valeur du filigrane élevé. Ces informations sont utiles lors de l'exécution des validations de la qualité des données de test qui ne doivent pas progresser dans le filigrane élevé. Par défaut, Dataplex renseigne ce champ avec TRUE , mais la valeur peut être remplacée si l'argument --progress_watermark a la valeur FALSE .
|
rows_validated |
(entier) Nombre total d'enregistrements validés après l'application de row_filters et de tous les filtres en filigrane élevé sur la colonne incremental_time_filter_column_id , si spécifié. |
complex_rule_validation_errors_count |
(Nombre à virgule flottante) Nombre de lignes renvoyées par une règle CUSTOM_SQL_STATEMENT . |
complex_rule_validation_success_flag |
(booléen) État de réussite des règles CUSTOM_SQL_STATEMENT .
|
success_count |
(entier) Nombre total d'enregistrements ayant réussi la validation. Ce champ est défini sur NULL pour les règles CUSTOM_SQL_STATEMENT .
|
success_percentage |
(nombre à virgule flottante) Pourcentage du nombre d'enregistrements ayant réussi la validation dans le nombre total d'enregistrements validés. Ce champ est défini sur NULL pour les règles CUSTOM_SQL_STATEMENT . |
failed_count |
(entier) Nombre total d'enregistrements dont la validation a échoué. Ce champ est défini sur NULL pour les règles CUSTOM_SQL_STATEMENT .
|
failed_percentage |
(nombre à virgule flottante) Pourcentage du nombre d'enregistrements ayant échoué à la validation dans le nombre total d'enregistrements validés. Ce champ est défini sur NULL pour les règles CUSTOM_SQL_STATEMENT . |
null_count |
(entier) Nombre total d'enregistrements ayant renvoyé une valeur nulle lors de la validation.
Ce champ est défini sur NULL pour les règles NOT_NULL et CUSTOM_SQL_STATEMENT . |
null_percentage |
(nombre à virgule flottante) Pourcentage du nombre d'enregistrements ayant renvoyé une valeur nulle lors de la validation dans le nombre total d'enregistrements validés. Ce champ est défini sur NULL pour les règles NOT_NULL et CUSTOM_SQL_STATEMENT . |
failed_records_query |
Pour chaque règle ayant échoué, cette colonne stocke une requête que vous pouvez utiliser pour obtenir des enregistrements ayant échoué. Dans ce document, consultez la section Résoudre les problèmes liés aux règles ayant échoué avec failed_records_query . |
Pour les entités BigQuery, une vue est créée pour chaque rule_binding
contenant la logique de validation SQL de la dernière exécution. Vous trouverez ces vues dans l'ensemble de données BigQuery spécifié dans l'argument --gcp-bq-dataset-id
.
Optimisation des coûts
Vous pouvez réduire les coûts à l'aide des optimisations suivantes.
Validations incrémentielles
Souvent, vous avez des tables qui sont régulièrement mises à jour avec de nouvelles partitions (nouvelles lignes). Si vous ne souhaitez pas revalider les anciennes partitions à chaque exécution, vous pouvez utiliser des validations incrémentielles.
Pour les validations incrémentielles, vous devez avoir une colonne de type TIMESTAMP
ou DATETIME
dans votre table où la valeur de la colonne augmente de façon linéaire. Vous pouvez utiliser les colonnes sur lesquelles votre table BigQuery est partitionnée.
Pour spécifier la validation incrémentielle, spécifiez une valeur pour incremental_time_filter_column_id=TIMESTAMP/DATETIME type column
dans le cadre d'une liaison de règle.
Lorsque vous spécifiez une colonne, la tâche de qualité des données prend en compte uniquement les lignes dont la valeur TIMESTAMP
est supérieure à l'horodatage de la dernière tâche liée à la qualité des données exécutée.
Exemples de fichiers de spécification
Pour utiliser ces exemples, créez un ensemble de données BigQuery nommé sales
. Ensuite, créez une table de faits nommée sales_orders
et ajoutez des exemples de données en exécutant une requête avec les instructions GoogleSQL suivantes:
CREATE OR REPLACE TABLE sales.sales_orders
(
id STRING NOT NULL,
last_modified_timestamp TIMESTAMP,
customer_id STRING,
item_id STRING,
amount NUMERIC,
transaction_currency STRING
);
INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")
Exemple 1
L'exemple de code suivant crée des contrôles de qualité des données pour valider ces valeurs :
amount
: les valeurs sont nulles ou positives.item_id
: chaîne alphanumérique de cinq caractères alphabétiques, suivie de 15 chiffres.transaction_currency
: type de devise autorisé, tel que défini par une liste statique. La liste statique de cet exemple autorise les types de devises GBP et JPY. Cette validation ne s'applique qu'aux lignes marquées comme internationales.
# The following `NONE` row filter is required.
row_filters:
NONE:
filter_sql_expr: |-
True
# This filters for rows marked as international (INTNL).
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can apply to multiple tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
# Rule bindings associate rules to columns within tables.
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Remplacez les éléments suivants :
PROJECT_ID
: ID de votre projet.DATASET_ID
: ID de l'ensemble de données
Sample 2
Si la table à vérifier fait partie d'un lac Dataplex, vous pouvez spécifier les tables à l'aide de la notation lac ou zone. Vous pouvez ainsi regrouper vos résultats par lac ou zone. Par exemple, vous pouvez générer un score au niveau de la zone.
Pour utiliser cet exemple, créez un lac Dataplex avec l'ID de lac operations
et l'ID de zone procurement
. Ajoutez ensuite le tableau sales_orders
en tant qu'élément à la zone.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Remplacez les éléments suivants :
- PROJECT_ID : ID de votre projet.
- REGION_ID: ID de région du lac Dataplex dans lequel la table existe, par exemple
us-central1
.
Sample 3
Cet exemple améliore l'exemple 2 en ajoutant une vérification SQL personnalisée pour vérifier si les valeurs d'ID sont uniques.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Exemple 4
Cet exemple améliore l'exemple 3 en ajoutant des validations incrémentielles à l'aide de la colonne last_modified_timestamp
. Vous pouvez ajouter des validations incrémentielles pour une ou plusieurs liaisons de règles.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_CURRENCY_ID
Résoudre les problèmes de règles ayant échoué avec failed_records_query
Pour chaque règle ayant échoué, le tableau récapitulatif stocke une requête dans la colonne failed_records_query
que vous pouvez utiliser pour obtenir les enregistrements ayant échoué.
Pour déboguer, vous pouvez également utiliser reference columns
dans votre fichier YAML, ce qui vous permet de joindre la sortie de failed_records_query
aux données d'origine pour obtenir l'enregistrement complet. Par exemple, vous pouvez spécifier une colonne primary_key
ou une colonne primary_key
composée en tant que colonne de référence.
Spécifier des colonnes de référence
Pour générer des colonnes de référence, vous pouvez ajouter les éléments suivants à votre spécification YAML :
La section
reference_columns
. Dans cette section, vous pouvez créer un ou plusieurs ensembles de colonnes de référence, chaque ensemble spécifiant une ou plusieurs colonnes.La section
rule_bindings
. Dans cette section, vous pouvez ajouter une ligne à une liaison de règle qui spécifie un ID de colonne de référence (reference_columns_id
) à utiliser pour les règles de cette liaison de règle. Il doit s'agir de l'un des ensembles de colonnes de référence spécifiés dans la sectionreference_columns
.
Par exemple, le fichier YAML suivant spécifie une section reference_columns
et définit trois colonnes : id
, last_modified_timestamp
et item_id
dans le cadre de l'ensemble ORDER_DETAILS_REFERENCE_COLUMNS
. L'exemple suivant utilise l'exemple de table sales_orders
.
reference_columns:
ORDER_DETAILS_REFERENCE_COLUMNS:
include_reference_columns:
- id
- last_modified_timestamp
- item_id
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
params:
custom_sql_expr: |-
row_filters:
NONE:
filter_sql_expr: |-
True
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
rule_ids:
- VALUE_ZERO_OR_POSITIVE
Utiliser la requête d'enregistrements ayant échoué
La requête d'enregistrements ayant échoué génère une ligne pour chaque enregistrement ayant une règle ayant échoué. Elle inclut le nom de la colonne ayant déclenché l'échec, la valeur qui l'a déclenché et les valeurs des colonnes de référence. Elle inclut également des métadonnées que vous pouvez utiliser pour exécuter la tâche liée à la qualité des données.
Voici un exemple de résultat d'une requête d'enregistrements ayant échoué pour le fichier YAML décrit dans la section Spécifier des colonnes de référence. Il indique un échec pour la colonne amount
et une valeur ayant échoué de -10
. Il enregistre également la valeur correspondante pour la colonne de référence.
_dq_validation_invocation_id | _dq_validation_rule_binding_id | _dq_validation_rule_id | _dq_validation_column_id | _dq_validation_column_value | _dq_validation_dimension | _dq_validation_simple_rule_row_is_valid | _dq_validation_complex_rule_validation_errors_count | _dq_validation_complex_rule_validation_success_flag | id | last_modified_timestamp | item_id |
---|---|---|---|---|---|---|---|---|---|---|---|
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 | TRANSACTION_AMOUNT_VALID | VALUE_ZERO_OR_POSITIVE | amount | -10 | FAUX | order1 | 2022-01-22T02:30:06.321Z | bad_item_id |
Utiliser les requêtes d'enregistrement ayant échoué pour les règles CUSTOM_SQL_STATEMENT
Pour les règles CUSTOM_SQL_STATEMENT
, les requêtes d'enregistrement ayant échoué incluent la colonne custom_sql_statement_validation_errors
. La colonne custom_sql_statement_validation_errors
est une colonne imbriquée avec des champs correspondant à la sortie de votre instruction SQL. Les colonnes de référence ne sont pas incluses dans les requêtes d'enregistrement ayant échoué pour les règles CUSTOM_SQL_STATEMENT
.
Par exemple, votre règle CUSTOM_SQL_STATEMENT
pourrait ressembler à ceci :
rules: TEST_RULE: rule_type: CUSTOM_SQL_STATEMENT custom_sql_arguments: - existing_id - replacement_id params: CUSTOM_SQL_STATEMENT: |- (SELECT product_name, product_key FROM data where $existing_id != $replacement_id)
custom_sql_statement_validation_errors
, avec une ligne pour chaque occurrence où existing_id!=replacement_id
.
Lors du rendu en JSON, le contenu d'une cellule de cette colonne peut ressembler à ceci :
{ "custom_sql_statement_valdation_errors" :{ "product_name"="abc" "product_key"="12345678" "_rule_binding_id"="your_rule_binding" } }
Vous pouvez joindre ces résultats à la table d'origine avec une référence imbriquée telle que join on custom_sql_statement_valdation_errors.product_key
.
Étape suivante
- Consultez la documentation de référence sur les spécifications YAML de CloudDQ.
- Pour obtenir des exemples de règles de qualité des données, consultez les sections Règles simples et Règles avancées.
- Consultez Exemple de DAG Airflow pour Dataplex la qualité des données.