En este documento, se muestra cómo crear tareas de calidad de los datos de Dataplex que te permiten programar y ejecutar verificaciones de calidad de los datos para tus tablas de BigQuery integradas y externas.
Para obtener más información, consulta la descripción general de las tareas de calidad de los datos.
Antes de comenzar
En este documento, se da por sentado que tienes un lake de Dataplex existente en el que crear la tarea de calidad de los datos.
Antes de crear una tarea de calidad de los datos, haz lo siguiente:
Habilita las APIs y los servicios de Google
Habilita la API de Dataproc
Habilita el Acceso privado a Google para tu red o subred. Habilita el Acceso privado a Google en la red que planeas usar con las tareas de calidad de los datos de Dataplex. Si no especificas una red o subred cuando creas el Para la tarea de calidad de los datos, Dataplex usa subred predeterminada. En ese caso, debes habilitar Acceso privado a Google en la subred predeterminada.
Crea un archivo de especificación
Dataplex usa CloudDQ de código abierto como el programa de controlador. Los requisitos de verificación de calidad de los datos de Dataplex se definen en los archivos de especificación YAML de CloudDQ.
Como entrada para la tarea de calidad de los datos, puedes tener un solo archivo YAML o un solo archivo ZIP que contenga uno o más archivos YAML. Se recomienda capturar los requisitos de verificación de calidad de los datos en archivos de especificación YAML separados, con un archivo para cada sección.
Para preparar un archivo de especificación, haz lo siguiente:
-
Crea uno o más archivos de especificación YAML de CloudDQ que definan los requisitos de verificación de calidad de los datos. Para obtener más información sobre la sintaxis requerida, consulta la sección Acerca del archivo de especificación de este documento.
Guarda el archivo de especificación YAML en formato
.yml
o.yaml
. Si creas varios archivos de especificación YAML, guarda todos los archivos en un solo archivo ZIP. - Crea buckets de Cloud Storage.
- Sube el archivo de especificación al archivo bucket de Cloud Storage.
Acerca del archivo de especificación
Tu archivo de especificación YAML de CloudDQ debe tener las siguientes secciones:
Reglas (definidas en el nodo YAML
rules
de nivel superior): Una lista de reglas para ejecutar. Puedes crear estas reglas a partir de tipos de reglas predefinidos, comoNOT_NULL
yREGEX
, o puedes extenderlas con instrucciones de SQL personalizadas, comoCUSTOM_SQL_EXPR
yCUSTOM_SQL_STATEMENT
. La sentenciaCUSTOM_SQL_EXPR
marca como una falla cualquier fila quecustom_sql_expr
haya evaluado comoFalse
. La declaraciónCUSTOM_SQL_STATEMENT
marca cualquier valor que muestra toda la instrucción como una falla.Filtros de fila (definidos en el nodo YAML
row_filters
de nivel superior): Expresiones de SQL que muestran un valor booleano que define filtros para recuperar un subconjunto de datos de la entidad subyacente sujeta a validación.Vinculaciones de reglas (definidas en el nodo YAML de nivel superior
rule_bindings
): Definerules
yrule filters
para aplicar a las tablas.Dimensiones de la regla (definidas en el nodo YAML de
rule_dimensions
): Define la lista permitida de dimensiones de la regla de calidad de los datos que una regla puede definir en el campodimension
correspondiente.Por ejemplo:
rule_dimensions: - consistency - correctness - duplication - completeness - conformance
El campo
dimension
es opcional para una regla. La sección de dimensiones de las reglas es obligatoria sidimension
aparece en cualquier regla.
Para obtener más información, consulta la Guía de referencia de CloudDQ y los archivos de especificación de muestra.
Crea un conjunto de datos para almacenar los resultados
-
Para almacenar los resultados, crea un conjunto de datos de BigQuery.
El conjunto de datos debe estar en la misma región que las tablas en las que ejecutas la tarea de calidad de los datos.
Dataplex usa este conjunto de datos y crea o reutiliza una tabla de tu elección para almacenar los resultados.
Crea una cuenta de servicio
Crea una cuenta de servicio que tenga los siguientes roles y permisos de administración de identidades y accesos (IAM):
- Acceso de lectura a la ruta de Cloud Storage que contiene las especificaciones de YAML Puedes usar el rol de visualizador de objetos de almacenamiento (
roles/storage.objectViewer
) en el bucket de Cloud Storage. - Acceso de lectura a los conjuntos de datos de BigQuery con datos que se deben validar Puedes usar el rol de visualizador de datos de BigQuery.
- Acceso de escritura al conjunto de datos de BigQuery para crear una tabla (si es necesario) y escribir los resultados en esa tabla. Puedes usar el rol de editor de datos de BigQuery (
roles/bigquery.dataEditor
) a nivel de conjunto de datos. - Rol del usuario de trabajo de BigQuery (
roles/bigquery.jobUser
) a nivel de proyecto para crear trabajos de BigQuery en un proyecto - El rol de lector de metadatos de Dataplex (
roles/dataplex.metadataReader
) a nivel de proyecto o lake. - El rol de consumidor de Service Usage (
roles/serviceusage.serviceUsageConsumer
) a nivel de proyecto. - El rol de trabajador de Dataproc.
- El permiso
iam.serviceAccounts.actAs
que se le otorga al usuario que envía el trabajo - El rol de usuario de cuenta de servicio otorgada a la cuenta de servicio del lake Dataplex. Puedes ver la cuenta de servicio del lake Dataplex en la consola de Google Cloud.
Opcional: Usa la configuración avanzada
Estos pasos son opcionales:
BigQuery ejecuta controles de calidad de los datos en el proyecto del usuario actual de forma predeterminada. Como alternativa, puedes elegir un proyecto diferente para ejecutar los trabajos de BigQuery mediante el argumento
--gcp_project_id
TASK_ARGS
para la propiedad--execution-args
de la tarea.Si el ID del proyecto especificado para ejecutar consultas de BigQuery es diferente del proyecto en el que se crea la cuenta de servicio (especificada por
--execution-service-account
), asegúrate de que la política de la organización que inhabilita el uso de la cuenta de servicio entre proyectos (iam.disableServiceAccountCreation
) esté desactivada. Además, asegúrate de que la cuenta de servicio pueda acceder a la programación de trabajos de BigQuery en el proyecto en el que se ejecutan las consultas de BigQuery.
Limitaciones
- Todas las tablas especificadas para una tarea de calidad de los datos determinada deben pertenecer a la misma región de Google Cloud.
Programa una tarea de calidad de los datos
Consola
- En a consola de Google Cloud, ve a la página Proceso de Dataplex.
- Haz clic en Crear tarea.
- En la tarjeta Verificar la calidad de los datos, haz clic en Crear tarea.
- En el lake Dataplex, selecciona tu lake.
- En ID, ingresa un ID.
- En la sección Especificación de calidad de los datos, haz lo siguiente:
- En el campo Seleccionar archivo GCS, haz clic en Explorar.
Selecciona tu bucket de Cloud Storage.
Haz clic en Seleccionar.
En la sección Tabla de resultados, haz lo siguiente:
En el campo Selecciona un conjunto de datos de BigQuery, haz clic en Explorar.
Selecciona el conjunto de datos de BigQuery para almacenar los resultados de la validación.
Haz clic en Seleccionar.
En el campo Tabla de BigQuery, ingresa el nombre de la tabla para almacenar los resultados. Si la tabla no existe, Dataplex la crea por ti. No uses el nombre
dq_summary
porque está reservado para tareas de procesamiento interno.
En la sección Cuenta de servicio, selecciona una cuenta de servicio del menú Cuenta de servicio del usuario.
Haga clic en Continuar.
En la sección Configurar programa, configura el programa para ejecutar los datos. tarea de calidad.
Haz clic en Crear.
gcloud CLI
El siguiente es un ejemplo de ejecución de una tarea de calidad de los datos que usa el comando de gcloud CLI de Dataplex:
export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH" # Google Cloud project where the Dataplex task is created. export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT" # Google Cloud region for the Dataplex lake. export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID" # Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region. export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}" # The Dataplex lake where your task is created. export DATAPLEX_LAKE_NAME="DATAPLEX_LAKE_NAME" # The service account used for running the task. Ensure that this service account has sufficient IAM permissions on your project, including BigQuery Data Editor, BigQuery Job User, Dataplex Editor, Dataproc Worker, and Service Usage Consumer. # The BigQuery dataset used for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding. export TARGET_BQ_DATASET="TARGET_BQ_DATASET" # If you want to use a different dataset for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding, use the following: export CLOUDDQ_BIGQUERY_DATASET=$TARGET_BQ_DATASET # The BigQuery dataset where the final results of the data quality checks are stored. This could be the same as CLOUDDQ_BIGQUERY_DATASET. export TARGET_BQ_DATASET="TARGET_BQ_DATASET" # The BigQuery table where the final results of the data quality checks are stored. export TARGET_BQ_TABLE="TARGET_BQ_TABLE" # The unique identifier for the task. export TASK_ID="TASK_ID" gcloud dataplex tasks create \ --location="${DATAPLEX_REGION_ID}" \ --lake="${DATAPLEX_LAKE_NAME}" \ --trigger-type=ON_DEMAND \ --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \ --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \ --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \ --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \ "$TASK_ID"
Parámetro | Descripción |
---|---|
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH |
La ruta de acceso de Cloud Storage a la entrada de configuración de YAML de calidad de los datos para la tarea de calidad de los datos. Puedes tener un solo archivo YAML en formato .yml o .yaml , o bien un archivo ZIP con varios archivos YAML. |
GOOGLE_CLOUD_PROJECT |
El proyecto de Google Cloud en el que se crean la tarea de Dataplex y los trabajos de BigQuery. |
DATAPLEX_REGION_ID |
Es la región del lake Dataplex en el que se crea la tarea de calidad de los datos. |
SERVICE_ACCOUNT |
La cuenta de servicio que se usa para ejecutar la tarea. Asegúrate de que esta cuenta de servicio tenga los permisos de IAM necesarios, como se describe en la sección Antes de comenzar. |
Para --execution-args
, los siguientes argumentos deben pasarse como argumentos posicionales y, por lo tanto, en este orden:
Argumento | Descripción |
---|---|
clouddq-executable.zip |
Un ejecutable precompilado que se pasó
spark-file-uris desde un almacenamiento público de Cloud Storage
bucket. |
ALL |
Ejecuta todas las vinculaciones de reglas. Como alternativa, puedes proporcionar vinculaciones de reglas específicas como una lista separada por comas.
Por ejemplo, RULE_1,RULE_2 . |
gcp-project-id |
ID del proyecto que ejecuta las consultas de BigQuery. |
gcp-region-id |
Región en la que se ejecuta
Trabajos de BigQuery para la validación de la calidad de los datos. Esta región
debe ser la misma que la región de gcp-bq-dataset-id y
target_bigquery_summary_table |
gcp-bq-dataset-id |
Conjunto de datos de BigQuery que se usa para almacenar las vistas rule_binding y los resultados intermedios de resumen de calidad de los datos. |
target-bigquery-summary-table |
La referencia de ID de tabla de la tabla de BigQuery en la que
se almacenan los resultados finales
de las verificaciones de calidad de los datos. No uses el valor de ID dq_summary , ya que está reservado para tareas de procesamiento interno. |
--summary_to_stdout |
Cuando se pasa esta marca, todas las filas de los resultados de validación creadas en la tabla dq_summary en la última ejecución se registran como registros JSON en Cloud Logging y stdout (opcional). |
API
Reemplaza lo siguiente:
PROJECT_ID = "Your Dataplex Project ID" REGION = "Your Dataplex lake region" LAKE_ID = "Your Dataplex lake ID" SERVICE_ACC = "Your service account used for reading the data" DATAPLEX_TASK_ID = "Unique task ID for the data quality task" BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification" GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project" GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results" TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
- Envía una solicitud HTTP POST:
POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID} { "spark": { "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py", "file_uris": [ f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip", f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum", f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip" ] }, "execution_spec": { "args": { "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}" }, "service_account": "SERVICE_ACC" }, "trigger_spec": { "type": "ON_DEMAND" }, "description": "${DATAPLEX_TASK_DESCRIPTION}" }
Consulta también Ejemplo de DAG de Airflow para la tarea de calidad de los datos de Dataplex.
Supervisa una tarea programada para la calidad de los datos
Consulta cómo supervisar tu tarea.
Vea los resultados
Los resultados de las validaciones de calidad de los datos se almacenan en el conjunto de datos de BigQuery y la tabla de resumen que especificaste, como se describe en Crea un conjunto de datos para almacenar los resultados. La tabla de resumen contiene las resumen de resultados para cada combinación de vinculación de reglas y regla para cada validación cuando se ejecute. El resultado en la tabla de resumen incluye la siguiente información:
Nombre de la columna | Descripción |
---|---|
dataplex_lake |
(string) ID del lake Dataplex que contiene la tabla que se está validando. |
dataplex_zone |
(string) ID de la zona de Dataplex que contiene la tabla que se está validando. |
dataplex_asset_id |
(string) ID del recurso de Dataplex que contiene la tabla que se está validando. |
execution_ts |
(marca de tiempo) Marca de tiempo de la ejecución de la consulta de validación. |
rule_binding_id |
(string) ID de la vinculación de regla para la que se informan los resultados de validación. |
rule_id |
(string) ID de la regla en la vinculación de la regla para la que se informan los resultados de validación. |
dimension |
(string) Dimensión de calidad de los datos de rule_id . Este valor solo puede ser uno de los valores especificados en el nodo YAML rule_dimensions . |
table_id |
(string) Es el ID de la entidad para la que se informan los resultados de validación.
Este ID se especifica en el parámetro entity de la
respectiva vinculación de regla. |
column_id |
(cadena) Es el ID de la columna para la que se informan los resultados de validación.
Este ID se especifica en el parámetro column de la
respectiva vinculación de regla. |
last_modified |
(marca de tiempo) La última marca de tiempo modificada del table_id que se valida. |
metadata_json_string |
(string) Pares clave-valor del contenido del parámetro de metadatos especificado en la vinculación de la regla o durante la ejecución de la calidad de los datos. |
configs_hashsum |
La string hash del documento JSON que contiene la vinculación de la regla y todas sus reglas asociadas, vinculaciones de reglas, filtros de fila y configuraciones de entidades.
configs_hashsum permite el seguimiento cuando cambia el contenido de un ID rule_binding o una de sus configuraciones a las que se hace referencia. |
dq_run_id |
El ID único del registro (string). |
invocation_id |
(string) ID de la ejecución de la calidad de los datos. Todos los registros de resumen de calidad de los datos generados dentro de la misma instancia de ejecución de calidad de los datos comparten el mismo invocation_id . |
progress_watermark |
Determina si este registro en particular se considera mediante la verificación de calidad de los datos cuando se determina la marca de agua alta para la validación incremental (booleano). Si es FALSE , el registro correspondiente se ignora cuando se establece el valor de la marca de agua alta. Esta información es útil cuando se ejecutan validaciones de calidad de datos de prueba que no deben avanzar la marca de agua alta. Dataplex propaga este campo con TRUE de forma predeterminada, pero el valor se puede anular si el argumento --progress_watermark tiene un valor de FALSE .
|
rows_validated |
(número entero) Cantidad total de registros validados después de aplicar row_filters y cualquier filtro de marca de agua alta en la columna incremental_time_filter_column_id , si se especifica. |
complex_rule_validation_errors_count |
(número de punto flotante) Cantidad de filas que muestra una regla CUSTOM_SQL_STATEMENT . |
complex_rule_validation_success_flag |
(booleano) Estado correcto de CUSTOM_SQL_STATEMENT reglas.
|
success_count |
(número entero) Cantidad total de registros que aprobaron la validación. Este campo
se estableció en NULL para las reglas CUSTOM_SQL_STATEMENT .
|
success_percentage |
Porcentaje de la cantidad de registros que aprobaron la validación dentro de la cantidad total de registros validados (número de punto flotante). Este campo se establece en NULL para las reglas CUSTOM_SQL_STATEMENT . |
failed_count |
(número entero) Cantidad total de registros que no aprobaron la validación. Este campo
se estableció en NULL para las reglas CUSTOM_SQL_STATEMENT .
|
failed_percentage |
Porcentaje de la cantidad de registros que fallaron en la validación dentro de la cantidad total de registros validados (número de punto flotante). Este campo se establece en NULL para las reglas CUSTOM_SQL_STATEMENT . |
null_count |
(número entero) Cantidad total de registros que mostraron nulos durante la validación.
Este campo se establece en NULL para las reglas NOT_NULL y CUSTOM_SQL_STATEMENT . |
null_percentage |
Porcentaje de la cantidad de registros que mostraron nulo durante la validación dentro de la cantidad total de registros validados (número de punto flotante). Este campo se configura en NULL para las reglas NOT_NULL y CUSTOM_SQL_STATEMENT . |
failed_records_query |
Para cada regla que falla, esta columna almacena una consulta que
puedes usar para obtener registros con errores. En este documento, consulta
Solucionar problemas de reglas con errores en
failed_records_query |
Para las entidades de BigQuery, se crea una vista para cada rule_binding
que contiene la lógica de validación de SQL de la ejecución más reciente. Puedes encontrar estas vistas en el conjunto de datos de BigQuery especificado en el argumento --gcp-bq-dataset-id
.
Optimizaciones de costos
Puedes reducir los costos con las siguientes optimizaciones.
Validaciones incrementales
A menudo, tienes tablas que se actualizan de forma rutinaria con particiones nuevas (filas nuevas). Si no quieres volver a validar particiones antiguas en cada ejecución, puedes usar validaciones incrementales.
Para las validaciones incrementales, debes tener una columna de tipo TIMESTAMP
o DATETIME
en tu tabla donde el valor de la columna aumenta de forma monotónica. Puedes usar las columnas en las que se particiona tu tabla de BigQuery.
Para especificar la validación incremental, indica un valor para
incremental_time_filter_column_id=TIMESTAMP/DATETIME type column
como parte de una vinculación de reglas.
Cuando especificas una columna, la tarea de calidad de los datos considera solo las filas con una
Un valor de TIMESTAMP
mayor que la marca de tiempo de la última tarea de calidad de los datos que
.
Archivos de especificación de ejemplo
Para usar estos ejemplos, crea un conjunto de datos de BigQuery llamado sales
. Luego, crea una tabla de hechos llamada sales_orders
y agrega datos de muestra ejecutando una consulta con las siguientes instrucciones de GoogleSQL:
CREATE OR REPLACE TABLE sales.sales_orders
(
id STRING NOT NULL,
last_modified_timestamp TIMESTAMP,
customer_id STRING,
item_id STRING,
amount NUMERIC,
transaction_currency STRING
);
INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")
Ejemplo 1
En la siguiente muestra de código, se crean verificaciones de calidad de los datos para validar estos valores:
amount
: los valores son cero o números positivos.item_id
: una string alfanumérica de 5 caracteres alfabéticos, seguida de 15 dígitos.transaction_currency
: es un tipo de moneda permitido, según lo definido por una lista estática. La lista estática de esta muestra permite GBP y JPY como tipos de moneda. Esta validación solo se aplica a las filas marcadas como internacionales.
# The following `NONE` row filter is required.
row_filters:
NONE:
filter_sql_expr: |-
True
# This filters for rows marked as international (INTNL).
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can apply to multiple tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
# Rule bindings associate rules to columns within tables.
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Reemplaza lo siguiente:
PROJECT_ID
: el ID de tu proyectoDATASET_ID
: El ID del conjunto de datos.
Muestra 2
Si la tabla que se va a verificar forma parte de un lake de Dataplex, puedes especificar las tablas con la notación de lake o zona. Esto te permite sumar tus resultados por lago o zona. Por ejemplo, puedes generar una puntuación a nivel de zona.
Para usar esta muestra, crea un lake de Dataplex con el ID de lake
operations
y el ID de zona procurement
. Luego, agrega la tabla sales_orders
.
como un recurso a la zona.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Reemplaza lo siguiente:
- PROJECT_ID: el ID de tu proyecto
- REGION_ID: Es el ID de la región del lago de Dataplex en el que existe la tabla, como
us-central1
.
Ejemplo 3
En este ejemplo, se mejora la muestra 2 porque se agrega una verificación de SQL personalizada para ver si los valores de ID son únicos.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Ejemplo 4
Este ejemplo mejora la muestra 3 incorporando validaciones incrementales usando el
last_modified_timestamp
. Puedes agregar validaciones incrementales para una o más vinculaciones de reglas.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_CURRENCY_ID
Soluciona problemas con las reglas que fallaron con failed_records_query
Por cada regla que falla, la tabla de resumen
almacena una consulta en la columna failed_records_query
que puedes usar para obtener registros con errores.
Para depurar, también puedes usar reference columns
en tu archivo YAML.
que te permite unir la salida de failed_records_query
con
para obtener el registro completo. Por ejemplo, puedes especificar
una columna primary_key
o una columna compuesta primary_key
como
una columna de referencia.
Especificar columnas de referencia
Para generar columnas de referencia, puedes agregar lo siguiente a tu especificación YAML:
La sección
reference_columns
. En esta sección, puedes crear uno o más conjuntos de columnas de referencia, cada uno de los cuales especifica una o más columnas.La sección
rule_bindings
. En esta sección, puedes agregar una línea a una vinculación de reglas que especifique un ID de columna de referencia (reference_columns_id
) para usar en las reglas de esa vinculación. Debe ser uno de los conjuntos de columnas de referencia especificados en la secciónreference_columns
.
Por ejemplo, el siguiente archivo YAML especifica una sección reference_columns
y define tres columnas: id
, last_modified_timestamp
y item_id
como parte del conjunto ORDER_DETAILS_REFERENCE_COLUMNS
. En el siguiente ejemplo, se usa la tabla de muestra sales_orders
.
reference_columns:
ORDER_DETAILS_REFERENCE_COLUMNS:
include_reference_columns:
- id
- last_modified_timestamp
- item_id
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
params:
custom_sql_expr: |-
row_filters:
NONE:
filter_sql_expr: |-
True
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
rule_ids:
- VALUE_ZERO_OR_POSITIVE
Usa la consulta de registros con errores
La consulta de registros con errores genera una fila para cada registro que tiene una regla que falló. Incluye el nombre de la columna que activó la falla, el valor que la activó y los valores de las columnas de referencia. También incluye metadatos que puedes usar para relacionar la ejecución de la tarea de calidad de los datos.
El siguiente es un ejemplo del resultado de una consulta de registros con errores para el archivo YAML descrito en Especifica columnas de referencia. Muestra una falla para la columna amount
y un valor con errores de -10
. También registra el valor correspondiente para la columna de referencia.
_dq_validation_invocation_id | _dq_validation_rule_binding_id | _dq_validation_rule_id | _dq_validation_column_id | _dq_validation_column_value | _dq_validation_dimension | _dq_validation_simple_rule_row_is_valid | _dq_validation_complex_rule_validation_errors_count | _dq_validation_complex_rule_validation_success_flag | id | last_modified_timestamp | item_id |
---|---|---|---|---|---|---|---|---|---|---|---|
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 | TRANSACTION_AMOUNT_VALID | VALUE_ZERO_OR_POSITIVE | cantidad | -10 | FALSE | order1 | 2022-01-22T02:30:06.321Z | bad_item_id |
Usa consultas de registros con errores para las reglas CUSTOM_SQL_STATEMENT
Para las reglas de CUSTOM_SQL_STATEMENT
, las consultas de registro con errores incluyen la columna custom_sql_statement_validation_errors
. La columna custom_sql_statement_validation_errors
es una columna anidada con campos que coinciden con el resultado de tu instrucción de SQL. Las columnas de referencia no se incluyen en las consultas de registro con errores para las reglas CUSTOM_SQL_STATEMENT
.
Por ejemplo, tu regla CUSTOM_SQL_STATEMENT
podría verse así:
rules: TEST_RULE: rule_type: CUSTOM_SQL_STATEMENT custom_sql_arguments: - existing_id - replacement_id params: CUSTOM_SQL_STATEMENT: |- (SELECT product_name, product_key FROM data where $existing_id != $replacement_id)
custom_sql_statement_validation_errors
.
con una fila para cada caso en el que existing_id!=replacement_id
.
Cuando se renderiza en JSON, el contenido de una celda de esta columna podría verse de la siguiente manera:
{ "custom_sql_statement_valdation_errors" :{ "product_name"="abc" "product_key"="12345678" "_rule_binding_id"="your_rule_binding" } }
Puedes unir estos resultados con la tabla original con una referencia anidada como join on custom_sql_statement_valdation_errors.product_key
.
¿Qué sigue?
- Referencia de la especificación de YAML de CloudDQ
- Ejemplos de reglas de calidad de los datos: Consulta Reglas simples o Reglas avanzadas.
- Ejemplo de una tarea de DAG de Airflow para la calidad de los datos de Dataplex