En este documento se explica cómo crear tareas de calidad de los datos de Dataplex Universal Catalog que te permitan programar y ejecutar comprobaciones de calidad de los datos de tus tablas de BigQuery integradas y externas.
Para obtener más información, consulta el artículo Introducción a las tareas de calidad de los datos.
Antes de empezar
En este documento se presupone que ya tienes un lago de Dataplex Universal Catalog para crear la tarea de calidad de los datos.
Habilitar APIs y servicios de Google
Habilita la API de Dataproc.
Habilita Acceso privado de Google en tu red y subred. Habilita el acceso privado a Google en la red que quieras usar con las tareas de calidad de los datos de Dataplex Universal Catalog. Si no especificas una red o una subred al crear la tarea de calidad de los datos de Dataplex Universal Catalog, Dataplex Universal Catalog usará la subred predeterminada. En ese caso, debes habilitar el acceso privado de Google en la subred predeterminada.
Crear un archivo de especificaciones
Dataplex Universal Catalog usa CloudDQ de código abierto como programa de controlador. Los requisitos de las comprobaciones de calidad de los datos de Dataplex Universal Catalog se definen en archivos de especificación YAML de CloudDQ.
Como entrada de la tarea de calidad de los datos, puede tener un único archivo YAML o un único archivo ZIP que contenga uno o varios archivos YAML. Le recomendamos que registre los requisitos de comprobación de la calidad de los datos en archivos de especificación YAML independientes, uno por sección.
Para preparar un archivo de especificaciones, sigue estos pasos:
-
Crea uno o varios archivos de especificación YAML de Cloud Data Quality que definan los requisitos de comprobación de calidad de los datos. Para obtener más información sobre la sintaxis necesaria, consulta la sección Acerca del archivo de especificaciones de este documento.
Guarda el archivo de especificación YAML en formato
.yml
o.yaml
. Si creas varios archivos de especificación YAML, guarda todos los archivos en un único archivo ZIP. - Crea un segmento de Cloud Storage.
- Suba el archivo de especificación al depósito de Cloud Storage.
Acerca del archivo de especificación
El archivo de especificación YAML de CloudDQ debe tener las siguientes secciones:
Reglas (definidas en el nodo
rules
de nivel superior de YAML): una lista de reglas que se van a ejecutar. Puedes crear estas reglas a partir de tipos de reglas predefinidos, comoNOT_NULL
yREGEX
, o bien ampliarlas con instrucciones SQL personalizadas, comoCUSTOM_SQL_EXPR
yCUSTOM_SQL_STATEMENT
. La instrucciónCUSTOM_SQL_EXPR
marca como error cualquier fila quecustom_sql_expr
evalúe comoFalse
. La instrucciónCUSTOM_SQL_STATEMENT
marca como error cualquier valor devuelto por toda la instrucción.Filtros de filas (definidos en el nodo YAML
row_filters
de nivel superior): expresiones SQL que devuelven un valor booleano que define filtros para obtener un subconjunto de datos de la entidad subyacente sujeta a validación.Enlaces de reglas (definidos en el nodo
rule_bindings
YAML de nivel superior): definerules
yrule filters
que se aplicarán a las tablas.Dimensiones de la regla (definidas en el nodo YAML
rule_dimensions
): define la lista permitida de dimensiones de la regla de calidad de los datos que una regla puede definir en el campodimension
correspondiente.Por ejemplo:
rule_dimensions: - consistency - correctness - duplication - completeness - conformance
El campo
dimension
es opcional en una regla. La sección de dimensiones de la regla es obligatoria sidimension
aparece en alguna regla.
Para obtener más información, consulta la guía de referencia de CloudDQ y los archivos de especificación de ejemplo.
Crea un conjunto de datos para almacenar los resultados
-
Para almacenar los resultados, crea un conjunto de datos de BigQuery.
El conjunto de datos debe estar en la misma región que las tablas en las que ejecute la tarea de calidad de los datos.
Dataplex Universal Catalog usa este conjunto de datos y crea o reutiliza una tabla de tu elección para almacenar los resultados.
Crear una cuenta de servicio
Crea una cuenta de servicio que tenga los siguientes roles y permisos de gestión de identidades y accesos (IAM):
- Acceso de lectura a la ruta de Cloud Storage que contiene las especificaciones de YAML. Puedes usar el rol Lector de objetos de Storage
(
roles/storage.objectViewer
) en el segmento de Cloud Storage. - Acceso de lectura a los conjuntos de datos de BigQuery con los datos que se van a validar.
Puede usar el rol Lector de datos de BigQuery
(
roles/bigquery.dataViewer
). - Acceso de escritura al conjunto de datos de BigQuery para crear una tabla (si es necesario) y escribir los resultados en ella. Puede usar el rol Editor de datos de BigQuery (
roles/bigquery.dataEditor
) en el nivel del conjunto de datos. - Rol Usuario de tareas de BigQuery (
roles/bigquery.jobUser
) en el nivel de proyecto para crear tareas de BigQuery en un proyecto. - El rol Lector de metadatos de Dataplex (
roles/dataplex.metadataReader
) a nivel de proyecto o de lago. - El rol Consumidor del uso del servicio
(
roles/serviceusage.serviceUsageConsumer
) a nivel de proyecto. - El rol Trabajador de Dataproc
(
roles/dataproc.worker
). - El permiso
iam.serviceAccounts.actAs
concedido al usuario que envía el trabajo. - El rol de usuario de cuenta de servicio concedido a la cuenta de servicio del lago de datos de Universal Catalog de Dataplex. Puede ver la cuenta de servicio del lago de Dataplex Universal Catalog en la Google Cloud consola.
Usar ajustes avanzados
Estos pasos son opcionales:
BigQuery ejecuta comprobaciones de calidad de los datos en el proyecto actual de forma predeterminada. Puedes elegir otro proyecto para ejecutar las tareas de BigQuery. Usa el argumento
--gcp_project_id
TASK_ARGS
en la propiedad--execution-args
de la tarea.Si el ID de proyecto especificado para ejecutar consultas de BigQuery es diferente del proyecto en el que se crea la cuenta de servicio (especificada por
--execution-service-account
), asegúrate de que la política de la organización que inhabilita el uso de cuentas de servicio entre proyectos (iam.disableServiceAccountCreation
) esté desactivada. Además, asegúrate de que la cuenta de servicio pueda acceder a la programación de tareas de BigQuery en el proyecto en el que se ejecutan las consultas de BigQuery.
Limitaciones
Todas las tablas especificadas en una tarea de calidad de los datos deben pertenecer a la misma región. Google Cloud
Programar una tarea de calidad de los datos
Consola
- En la Google Cloud consola, ve a la página Procesar de Dataplex Universal Catalog.
- Haz clic en Crear tarea.
- En la tarjeta Comprobar la calidad de los datos, haga clic en Crear tarea.
- En Lago de Dataplex, elija el lago.
- En ID, introduce un ID.
- En la sección Especificación de la calidad de los datos, haga lo siguiente:
- En el campo Seleccionar archivo de GCS, haz clic en Examinar.
Selecciona tu segmento de Cloud Storage.
Haz clic en Seleccionar.
En la sección Tabla de resultados, haga lo siguiente:
En el campo Seleccionar conjunto de datos de BigQuery, haga clic en Buscar.
Selecciona el conjunto de datos de BigQuery en el que se almacenarán los resultados de la validación.
Haz clic en Seleccionar.
En el campo Tabla de BigQuery, introduce el nombre de la tabla en la que se almacenarán los resultados. Si la tabla no existe, Dataplex Universal Catalog la crea por ti. No uses el nombre
dq_summary
porque está reservado para tareas de procesamiento internas.
En la sección Cuenta de servicio, selecciona una cuenta de servicio en el menú Cuenta de servicio de usuario.
Haz clic en Continuar.
En la sección Definir programación, configura la programación para ejecutar la tarea de calidad de los datos.
Haz clic en Crear.
CLI de gcloud
A continuación, se muestra un ejemplo de ejecución de una tarea de calidad de los datos que usa el comando de la CLI de gcloud de tareas de Dataplex Universal Catalog:
export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH" # Google Cloud project where the Dataplex Universal Catalog task is created. export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT" # Google Cloud region for the Dataplex Universal Catalog lake. export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID" # Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region. export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}" # The Dataplex Universal Catalog lake where your task is created. export DATAPLEX_LAKE_ID="DATAPLEX_LAKE_ID" # The service account used for running the task. Ensure that this service account has sufficient IAM permissions on your project, including BigQuery Data Editor, BigQuery Job User, Dataplex Universal Catalog Editor, Dataproc Worker, and Service Usage Consumer. export DATAPLEX_TASK_SERVICE_ACCOUNT="DATAPLEX_TASK_SERVICE_ACCOUNT" # If you want to use a different dataset for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding, use the following: export CLOUDDQ_BIGQUERY_DATASET="CLOUDDQ_BIGQUERY_DATASET" # The BigQuery dataset where the final results of the data quality checks are stored. This could be the same as CLOUDDQ_BIGQUERY_DATASET. export TARGET_BQ_DATASET="TARGET_BQ_DATASET" # The BigQuery table where the final results of the data quality checks are stored. export TARGET_BQ_TABLE="TARGET_BQ_TABLE" # The unique identifier for the task. export TASK_ID="TASK_ID" gcloud dataplex tasks create \ --location="${DATAPLEX_REGION_ID}" \ --lake="${DATAPLEX_LAKE_ID}" \ --trigger-type=ON_DEMAND \ --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \ --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \ --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \ --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \ "$TASK_ID"
Parámetro | Descripción |
---|---|
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH |
Ruta de Cloud Storage al archivo YAML de configuración de calidad de los datos que se va a usar como entrada en la tarea de calidad de los datos. Puede tener un solo archivo YAML en formato .yml o .yaml , o bien un archivo ZIP que contenga varios archivos YAML. |
GOOGLE_CLOUD_PROJECT |
El Google Cloud proyecto en el que se crean la tarea de Dataplex Universal Catalog y los trabajos de BigQuery. |
DATAPLEX_REGION_ID |
Región del lago de Dataplex Universal Catalog en la que se crea la tarea de calidad de los datos. |
SERVICE_ACCOUNT |
La cuenta de servicio que se usa para ejecutar la tarea. Asegúrate de que esta cuenta de servicio tenga los permisos de IAM suficientes, tal como se indica en la sección Antes de empezar. |
En el caso de --execution-args
, los siguientes argumentos deben transferirse como argumentos posicionados y, por lo tanto, en este orden:
Argumento | Descripción |
---|---|
clouddq-executable.zip |
Un archivo ejecutable precompilado que se ha transferido
spark-file-uris desde un segmento público de Cloud Storage. |
ALL |
Ejecuta todos los enlaces de reglas. También puedes proporcionar enlaces de reglas específicos como una lista separada por comas.
Por ejemplo, RULE_1,RULE_2 . |
gcp-project-id |
ID del proyecto que ejecuta las consultas de BigQuery. |
gcp-region-id |
Región en la que se ejecutan las tareas de BigQuery para validar la calidad de los datos. Esta región debe ser la misma que la de gcp-bq-dataset-id y target_bigquery_summary_table . |
gcp-bq-dataset-id |
Conjunto de datos de BigQuery que se usa para almacenar las vistas rule_binding y los resultados intermedios del resumen de calidad de los datos. |
target-bigquery-summary-table |
Referencia del ID de la tabla de BigQuery en la que se almacenan los resultados finales de las comprobaciones de calidad de los datos. No utilices el valor de ID dq_summary , ya que está reservado para tareas de procesamiento internas. |
--summary_to_stdout |
(Opcional) Si se incluye esta marca, todas las filas de resultados de validación creadas en la tabla dq_summary en la última ejecución se registrarán como registros JSON en Cloud Logging y stdout . |
API
Haz los cambios siguientes:
PROJECT_ID = "Your Dataplex Universal Catalog Project ID" REGION = "Your Dataplex Universal Catalog lake region" LAKE_ID = "Your Dataplex Universal Catalog lake ID" SERVICE_ACC = "Your service account used for reading the data" DATAPLEX_TASK_ID = "Unique task ID for the data quality task" BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification" GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project" GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results" TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
- Envía una solicitud HTTP POST:
POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID} { "spark": { "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py", "file_uris": [ f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip", f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum", f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip" ] }, "execution_spec": { "args": { "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}" }, "service_account": "SERVICE_ACC" }, "trigger_spec": { "type": "ON_DEMAND" }, "description": "${DATAPLEX_TASK_DESCRIPTION}" }
Consulta también el DAG de Airflow de ejemplo para la tarea de calidad de los datos de Dataplex Universal Catalog.
Monitorizar una tarea de calidad de los datos programada
Consulta cómo monitorizar tu tarea.
Ver los resultados
Los resultados de las validaciones de calidad de los datos se almacenan en el conjunto de datos y en la tabla de resumen de BigQuery que hayas especificado, tal como se describe en el artículo Crear un conjunto de datos para almacenar los resultados. La tabla de resumen contiene el resumen de salida de cada combinación de vinculación de reglas y regla de cada ejecución de validación. La tabla de resumen incluye la siguiente información:
Nombre de la columna | Descripción |
---|---|
dataplex_lake |
(cadena) ID del lago de Dataplex Universal Catalog que contiene la tabla que se está validando. |
dataplex_zone |
(cadena) ID de la zona de Dataplex Universal Catalog que contiene la tabla que se está validando. |
dataplex_asset_id |
(cadena) ID del recurso de Dataplex Universal Catalog que contiene la tabla que se está validando. |
execution_ts |
(marca de tiempo) Marca de tiempo que indica cuándo se ejecutó la consulta de validación. |
rule_binding_id |
(cadena) ID de la vinculación de la regla de la que se informan los resultados de validación. |
rule_id |
(cadena) ID de la regla en la vinculación de reglas para la que se informan los resultados de la validación. |
dimension |
(cadena) Dimensión de calidad de los datos de rule_id . Este valor solo puede ser uno de los valores especificados en el nodo YAML rule_dimensions . |
table_id |
(cadena) ID de la entidad de la que se han obtenido los resultados de validación.
Este ID se especifica en el parámetro entity de la
vinculación de reglas correspondiente. |
column_id |
(cadena) ID de la columna de la que se informan los resultados de validación.
Este ID se especifica en el parámetro column de la
vinculación de reglas correspondiente. |
last_modified |
(marca de tiempo) Marca de tiempo de la última modificación del table_id
que se está validando. |
metadata_json_string |
(cadena) Pares clave-valor del contenido del parámetro de metadatos especificado en el enlace de la regla o durante la ejecución de la calidad de los datos. |
configs_hashsum |
(cadena) Suma hash del documento JSON que contiene la vinculación de la regla
y todas las reglas, vinculaciones de reglas, filtros de filas y configuraciones de entidades asociados.
configs_hashsum permite hacer un seguimiento cuando cambia el contenido de un ID de rule_binding o de una de sus configuraciones de referencia. |
dq_run_id |
(cadena) ID único del registro. |
invocation_id |
(Cadena) ID de la ejecución de la calidad de los datos. Todos los registros de resumen de calidad de los datos generados en la misma instancia de ejecución de calidad de los datos comparten el mismo invocation_id . |
progress_watermark |
(booleano) Determina si este registro concreto se tiene en cuenta
en la comprobación de calidad de los datos para determinar la marca de agua alta de
la validación incremental. Si FALSE , el registro correspondiente se
ignora al establecer el valor de marca de agua alta. Esta información es útil cuando se ejecutan validaciones de calidad de datos de prueba que no deben avanzar la marca de agua alta. Dataplex Universal Catalog rellena este campo con TRUE de forma predeterminada, pero el valor se puede sustituir si el argumento --progress_watermark tiene el valor FALSE .
|
rows_validated |
(entero) Número total de registros validados después de aplicar row_filters y los filtros de marca de agua alta en la columna incremental_time_filter_column_id , si se han especificado. |
complex_rule_validation_errors_count |
(float) Número de filas devueltas por una regla CUSTOM_SQL_STATEMENT . |
complex_rule_validation_success_flag |
(booleano) Estado de éxito de las reglas de CUSTOM_SQL_STATEMENT .
|
success_count |
(entero) Número total de registros que han superado la validación. Este campo tiene el valor NULL en las reglas CUSTOM_SQL_STATEMENT .
|
success_percentage |
(float) Porcentaje del número de registros que han superado la validación
en el número total de registros validados. Este campo se define como NULL para las reglas CUSTOM_SQL_STATEMENT . |
failed_count |
(entero) Número total de registros que no han superado la validación. Este campo tiene el valor NULL en las reglas CUSTOM_SQL_STATEMENT .
|
failed_percentage |
(float) Porcentaje del número de registros que no han superado la validación
en el número total de registros validados. Este campo se define como NULL para las reglas CUSTOM_SQL_STATEMENT . |
null_count |
(entero) Número total de registros que han devuelto un valor nulo durante la validación.
Este campo tiene el valor NULL en las reglas NOT_NULL y CUSTOM_SQL_STATEMENT . |
null_percentage |
(float) Porcentaje del número de registros que han devuelto un valor nulo durante la validación en relación con el número total de registros validados. Este campo tiene el valor NULL en las reglas NOT_NULL y CUSTOM_SQL_STATEMENT . |
failed_records_query |
En cada regla que falla, esta columna almacena una consulta que puedes usar para obtener los registros fallidos. En este documento, consulta la sección Solucionar problemas con reglas fallidas con failed_records_query . |
En el caso de las entidades de BigQuery, se crea una vista para cada rule_binding
que contiene la lógica de validación de SQL de la última ejecución. Puedes encontrar estas vistas en el conjunto de datos de BigQuery especificado en el argumento --gcp-bq-dataset-id
.
Optimizaciones de costes
Las tareas de calidad de los datos se ejecutan como trabajos de BigQuery en tu proyecto. Para controlar el coste de ejecutar tareas de calidad de los datos, usa los precios de BigQuery en el proyecto en el que se ejecuten tus tareas de BigQuery. Para obtener más información, consulta el artículo sobre la gestión de la carga de trabajo de BigQuery.
Validaciones incrementales
A menudo, las tablas se actualizan periódicamente con nuevas particiones (nuevas filas). Si no quieres volver a validar las particiones antiguas en cada ejecución, puedes usar validaciones incrementales.
Para las validaciones incrementales, debe tener una columna de tipo TIMESTAMP
o DATETIME
en su tabla en la que el valor de la columna aumente de forma monótona. Puedes usar las columnas por las que se ha particionado tu tabla de BigQuery.
Para especificar la validación incremental, indica un valor para incremental_time_filter_column_id=TIMESTAMP/DATETIME type column
como parte de un enlace de regla.
Cuando especifica una columna, la tarea de calidad de los datos solo tiene en cuenta las filas con un valor de TIMESTAMP
superior a la marca de tiempo de la última tarea de calidad de los datos que se ejecutó.
Archivos de especificación de ejemplo
Para usar estas muestras, crea un conjunto de datos de BigQuery
llamado sales
. A continuación, crea una tabla de hechos llamada sales_orders
y añade datos de muestra ejecutando una consulta con las siguientes instrucciones de GoogleSQL:
CREATE OR REPLACE TABLE sales.sales_orders
(
id STRING NOT NULL,
last_modified_timestamp TIMESTAMP,
customer_id STRING,
item_id STRING,
amount NUMERIC,
transaction_currency STRING
);
INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")
Ejemplo 1
El siguiente ejemplo de código crea comprobaciones de calidad de los datos para validar estos valores:
amount
: los valores son cero o números positivos.item_id
: una cadena alfanumérica de 5 caracteres alfabéticos seguida de 15 dígitos.transaction_currency
: un tipo de moneda permitido, tal como se define en una lista estática. La lista estática de este ejemplo permite usar GBP y JPY como tipos de moneda. Esta validación solo se aplica a las filas marcadas como internacionales.
# The following `NONE` row filter is required.
row_filters:
NONE:
filter_sql_expr: |-
True
# This filters for rows marked as international (INTNL).
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can apply to multiple tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
# Rule bindings associate rules to columns within tables.
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Haz los cambios siguientes:
PROJECT_ID
: tu ID de proyecto.DATASET_ID
: el ID del conjunto de datos.
Muestra 2
Si la tabla que se va a comprobar forma parte de un lago de Dataplex Universal Catalog, puede especificar las tablas mediante la notación de lago o de zona. De esta forma, puedes agregar los resultados por lago o zona. Por ejemplo, puedes generar una puntuación a nivel de zona.
Para usar este ejemplo, crea un lago de Dataplex Universal Catalog con el ID de lago operations
y el ID de zona procurement
. A continuación, añada la tabla sales_orders
como recurso a la zona.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Haz los cambios siguientes:
- PROJECT_ID: tu ID de proyecto.
- REGION_ID: el ID de la región del lago de Dataplex Universal Catalog en el que se encuentra la tabla, como
us-central1
.
Ejemplo 3
En este ejemplo se mejora el ejemplo 2 añadiendo una comprobación SQL personalizada para ver si los valores de ID son únicos.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Ejemplo 4
En este ejemplo se mejora el ejemplo 3 añadiendo validaciones incrementales mediante la columna last_modified_timestamp
. Puedes añadir validaciones incrementales para una o varias vinculaciones de reglas.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_CURRENCY_ID
Solucionar problemas con las reglas fallidas con failed_records_query
Por cada regla que no se cumpla, la tabla de resumen almacena una consulta en la columna failed_records_query
que puedes usar para obtener los registros fallidos.
Para depurar, también puedes usar reference columns
en tu archivo YAML, lo que te permite combinar el resultado de failed_records_query
con los datos originales para obtener el registro completo. Por ejemplo, puedes especificar una columna primary_key
o una columna primary_key
compuesta como columna de referencia.
Especificar columnas de referencia
Para generar columnas de referencia, puede añadir lo siguiente a su especificación YAML:
La sección
reference_columns
. En esta sección, puede crear uno o varios conjuntos de columnas de referencia, y cada conjunto puede especificar una o varias columnas.La sección
rule_bindings
. En esta sección, puede añadir una línea a un enlace de regla que especifique un ID de columna de referencia (reference_columns_id
) que se usará en las reglas de ese enlace. Debe ser uno de los conjuntos de columnas de referencia especificados en la secciónreference_columns
.
Por ejemplo, el siguiente archivo YAML especifica una sección reference_columns
y define tres columnas: id
, last_modified_timestamp
y item_id
como parte del conjunto ORDER_DETAILS_REFERENCE_COLUMNS
. En el siguiente ejemplo se usa la tabla de ejemplo sales_orders
.
reference_columns:
ORDER_DETAILS_REFERENCE_COLUMNS:
include_reference_columns:
- id
- last_modified_timestamp
- item_id
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
params:
custom_sql_expr: |-
row_filters:
NONE:
filter_sql_expr: |-
True
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
rule_ids:
- VALUE_ZERO_OR_POSITIVE
Usar la consulta de registros fallidos
La consulta de registros fallidos genera una fila por cada registro que tenga una regla que haya fallado. Incluye el nombre de la columna que ha provocado el error, el valor que lo ha provocado y los valores de las columnas de referencia. También incluye metadatos que puedes usar para relacionarlos con la ejecución de la tarea de calidad de los datos.
A continuación, se muestra un ejemplo de la salida de una consulta de registros fallida para el archivo YAML descrito en Especificar columnas de referencia. Se muestra un error en la columna amount
y el valor -10
. También registra el valor correspondiente de la columna de referencia.
_dq_validation_invocation_id | _dq_validation_rule_binding_id | _dq_validation_rule_id | _dq_validation_column_id | _dq_validation_column_value | _dq_validation_dimension | _dq_validation_simple_rule_row_is_valid | _dq_validation_complex_rule_validation_errors_count | _dq_validation_complex_rule_validation_success_flag | id | last_modified_timestamp | item_id |
---|---|---|---|---|---|---|---|---|---|---|---|
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 | TRANSACTION_AMOUNT_VALID | VALUE_ZERO_OR_POSITIVE | amount | -10 | FALSO | order1 | 2022-01-22T02:30:06.321Z | bad_item_id |
Usar consultas de registros fallidos para reglas de CUSTOM_SQL_STATEMENT
En el caso de las reglas de CUSTOM_SQL_STATEMENT
, las consultas de registros fallidas incluyen la columna custom_sql_statement_validation_errors
. La columna custom_sql_statement_validation_errors
es una columna anidada con campos que coinciden con el resultado de tu instrucción SQL. Las columnas de referencia no se incluyen en las consultas de registros fallidos de reglas de CUSTOM_SQL_STATEMENT
.
Por ejemplo, tu regla CUSTOM_SQL_STATEMENT
podría tener este aspecto:
rules: TEST_RULE: rule_type: CUSTOM_SQL_STATEMENT custom_sql_arguments: - existing_id - replacement_id params: CUSTOM_SQL_STATEMENT: |- (SELECT product_name, product_key FROM data where $existing_id != $replacement_id)
custom_sql_statement_validation_errors
, con una fila por cada vez que aparezca existing_id!=replacement_id
.
Cuando se representa en JSON, el contenido de una celda de esta columna puede tener este aspecto:
{ "custom_sql_statement_valdation_errors" :{ "product_name"="abc" "product_key"="12345678" "_rule_binding_id"="your_rule_binding" } }
Puedes combinar estos resultados con la tabla original mediante una referencia anidada, como join on custom_sql_statement_valdation_errors.product_key
.
Siguientes pasos
- Consulte la referencia de la especificación YAML de Cloud Data Quality.
- Para ver ejemplos de reglas de calidad de los datos, consulta Reglas sencillas y Reglas avanzadas.
- Consulta el DAG de Airflow de ejemplo para la tarea de calidad de datos de Dataplex Universal Catalog.