Usar tareas de calidad de los datos

En este documento se explica cómo crear tareas de calidad de los datos de Dataplex Universal Catalog que te permitan programar y ejecutar comprobaciones de calidad de los datos de tus tablas de BigQuery integradas y externas.

Para obtener más información, consulta el artículo Introducción a las tareas de calidad de los datos.

Antes de empezar

En este documento se presupone que ya tienes un lago de Dataplex Universal Catalog para crear la tarea de calidad de los datos.

Habilitar APIs y servicios de Google

  1. Habilita la API de Dataproc.

    Activar la API

  2. Habilita Acceso privado de Google en tu red y subred. Habilita el acceso privado a Google en la red que quieras usar con las tareas de calidad de los datos de Dataplex Universal Catalog. Si no especificas una red o una subred al crear la tarea de calidad de los datos de Dataplex Universal Catalog, Dataplex Universal Catalog usará la subred predeterminada. En ese caso, debes habilitar el acceso privado de Google en la subred predeterminada.

Crear un archivo de especificaciones

Dataplex Universal Catalog usa CloudDQ de código abierto como programa de controlador. Los requisitos de las comprobaciones de calidad de los datos de Dataplex Universal Catalog se definen en archivos de especificación YAML de CloudDQ.

Como entrada de la tarea de calidad de los datos, puede tener un único archivo YAML o un único archivo ZIP que contenga uno o varios archivos YAML. Le recomendamos que registre los requisitos de comprobación de la calidad de los datos en archivos de especificación YAML independientes, uno por sección.

Para preparar un archivo de especificaciones, sigue estos pasos:

  1. Crea uno o varios archivos de especificación YAML de Cloud Data Quality que definan los requisitos de comprobación de calidad de los datos. Para obtener más información sobre la sintaxis necesaria, consulta la sección Acerca del archivo de especificaciones de este documento.

    Guarda el archivo de especificación YAML en formato .yml o .yaml. Si creas varios archivos de especificación YAML, guarda todos los archivos en un único archivo ZIP.

  2. Crea un segmento de Cloud Storage.
  3. Suba el archivo de especificación al depósito de Cloud Storage.

Acerca del archivo de especificación

El archivo de especificación YAML de CloudDQ debe tener las siguientes secciones:

  • Reglas (definidas en el nodo rules de nivel superior de YAML): una lista de reglas que se van a ejecutar. Puedes crear estas reglas a partir de tipos de reglas predefinidos, como NOT_NULL y REGEX, o bien ampliarlas con instrucciones SQL personalizadas, como CUSTOM_SQL_EXPR y CUSTOM_SQL_STATEMENT. La instrucción CUSTOM_SQL_EXPR marca como error cualquier fila que custom_sql_expr evalúe como False. La instrucción CUSTOM_SQL_STATEMENT marca como error cualquier valor devuelto por toda la instrucción.

  • Filtros de filas (definidos en el nodo YAML row_filters de nivel superior): expresiones SQL que devuelven un valor booleano que define filtros para obtener un subconjunto de datos de la entidad subyacente sujeta a validación.

  • Enlaces de reglas (definidos en el nodo rule_bindings YAML de nivel superior): define rules y rule filters que se aplicarán a las tablas.

  • Dimensiones de la regla (definidas en el nodo YAML rule_dimensions): define la lista permitida de dimensiones de la regla de calidad de los datos que una regla puede definir en el campo dimension correspondiente.

    Por ejemplo:

    rule_dimensions:
      - consistency
      - correctness
      - duplication
      - completeness
      - conformance

    El campo dimension es opcional en una regla. La sección de dimensiones de la regla es obligatoria si dimension aparece en alguna regla.

Para obtener más información, consulta la guía de referencia de CloudDQ y los archivos de especificación de ejemplo.

Crea un conjunto de datos para almacenar los resultados

  • Para almacenar los resultados, crea un conjunto de datos de BigQuery.

    El conjunto de datos debe estar en la misma región que las tablas en las que ejecute la tarea de calidad de los datos.

    Dataplex Universal Catalog usa este conjunto de datos y crea o reutiliza una tabla de tu elección para almacenar los resultados.

Crear una cuenta de servicio

Crea una cuenta de servicio que tenga los siguientes roles y permisos de gestión de identidades y accesos (IAM):

Usar ajustes avanzados

Estos pasos son opcionales:

  • BigQuery ejecuta comprobaciones de calidad de los datos en el proyecto actual de forma predeterminada. Puedes elegir otro proyecto para ejecutar las tareas de BigQuery. Usa el argumento --gcp_project_id TASK_ARGS en la propiedad --execution-args de la tarea.

  • Si el ID de proyecto especificado para ejecutar consultas de BigQuery es diferente del proyecto en el que se crea la cuenta de servicio (especificada por --execution-service-account), asegúrate de que la política de la organización que inhabilita el uso de cuentas de servicio entre proyectos (iam.disableServiceAccountCreation) esté desactivada. Además, asegúrate de que la cuenta de servicio pueda acceder a la programación de tareas de BigQuery en el proyecto en el que se ejecutan las consultas de BigQuery.

Limitaciones

Todas las tablas especificadas en una tarea de calidad de los datos deben pertenecer a la misma región. Google Cloud

Programar una tarea de calidad de los datos

Consola

  1. En la Google Cloud consola, ve a la página Procesar de Dataplex Universal Catalog.

    Ve a Proceso.

  2. Haz clic en Crear tarea.
  3. En la tarjeta Comprobar la calidad de los datos, haga clic en Crear tarea.
  4. En Lago de Dataplex, elija el lago.
  5. En ID, introduce un ID.
  6. En la sección Especificación de la calidad de los datos, haga lo siguiente:
    1. En el campo Seleccionar archivo de GCS, haz clic en Examinar.
    2. Selecciona tu segmento de Cloud Storage.

    3. Haz clic en Seleccionar.

  7. En la sección Tabla de resultados, haga lo siguiente:

    1. En el campo Seleccionar conjunto de datos de BigQuery, haga clic en Buscar.

    2. Selecciona el conjunto de datos de BigQuery en el que se almacenarán los resultados de la validación.

    3. Haz clic en Seleccionar.

    4. En el campo Tabla de BigQuery, introduce el nombre de la tabla en la que se almacenarán los resultados. Si la tabla no existe, Dataplex Universal Catalog la crea por ti. No uses el nombre dq_summary porque está reservado para tareas de procesamiento internas.

  8. En la sección Cuenta de servicio, selecciona una cuenta de servicio en el menú Cuenta de servicio de usuario.

  9. Haz clic en Continuar.

  10. En la sección Definir programación, configura la programación para ejecutar la tarea de calidad de los datos.

  11. Haz clic en Crear.

CLI de gcloud

A continuación, se muestra un ejemplo de ejecución de una tarea de calidad de los datos que usa el comando de la CLI de gcloud de tareas de Dataplex Universal Catalog:

export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH"

# Google Cloud project where the Dataplex Universal Catalog task is created.
export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT"

# Google Cloud region for the Dataplex Universal Catalog lake.
export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID"

# Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region.
export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}"

# The Dataplex Universal Catalog lake where your task is created.
export DATAPLEX_LAKE_ID="DATAPLEX_LAKE_ID"

# The service account used for running the task. Ensure that this service account
has sufficient IAM permissions on your project, including
BigQuery Data Editor, BigQuery Job User,
Dataplex Universal Catalog Editor, Dataproc Worker, and Service
Usage Consumer.
export DATAPLEX_TASK_SERVICE_ACCOUNT="DATAPLEX_TASK_SERVICE_ACCOUNT"

# If you want to use a different dataset for storing the intermediate data quality summary results
and the BigQuery views associated with each rule binding, use the following:
export CLOUDDQ_BIGQUERY_DATASET="CLOUDDQ_BIGQUERY_DATASET"

# The BigQuery dataset where the final results of the data quality checks are stored.
This could be the same as CLOUDDQ_BIGQUERY_DATASET.
export TARGET_BQ_DATASET="TARGET_BQ_DATASET"

# The BigQuery table where the final results of the data quality checks are stored.
export TARGET_BQ_TABLE="TARGET_BQ_TABLE"

# The unique identifier for the task.
export TASK_ID="TASK_ID"

gcloud dataplex tasks create \
    --location="${DATAPLEX_REGION_ID}" \
    --lake="${DATAPLEX_LAKE_ID}" \
    --trigger-type=ON_DEMAND \
    --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \
    --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \
    --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \
    --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \
    "$TASK_ID"
Parámetro Descripción
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH Ruta de Cloud Storage al archivo YAML de configuración de calidad de los datos que se va a usar como entrada en la tarea de calidad de los datos. Puede tener un solo archivo YAML en formato .yml o .yaml, o bien un archivo ZIP que contenga varios archivos YAML.
GOOGLE_CLOUD_PROJECT El Google Cloud proyecto en el que se crean la tarea de Dataplex Universal Catalog y los trabajos de BigQuery.
DATAPLEX_REGION_ID Región del lago de Dataplex Universal Catalog en la que se crea la tarea de calidad de los datos.
SERVICE_ACCOUNT La cuenta de servicio que se usa para ejecutar la tarea. Asegúrate de que esta cuenta de servicio tenga los permisos de IAM suficientes, tal como se indica en la sección Antes de empezar.

En el caso de --execution-args, los siguientes argumentos deben transferirse como argumentos posicionados y, por lo tanto, en este orden:

Argumento Descripción
clouddq-executable.zip Un archivo ejecutable precompilado que se ha transferido spark-file-uris desde un segmento público de Cloud Storage.
ALL Ejecuta todos los enlaces de reglas. También puedes proporcionar enlaces de reglas específicos como una lista separada por comas. Por ejemplo, RULE_1,RULE_2.
gcp-project-id ID del proyecto que ejecuta las consultas de BigQuery.
gcp-region-id Región en la que se ejecutan las tareas de BigQuery para validar la calidad de los datos. Esta región debe ser la misma que la de gcp-bq-dataset-id y target_bigquery_summary_table.
gcp-bq-dataset-id Conjunto de datos de BigQuery que se usa para almacenar las vistas rule_binding y los resultados intermedios del resumen de calidad de los datos.
target-bigquery-summary-table Referencia del ID de la tabla de BigQuery en la que se almacenan los resultados finales de las comprobaciones de calidad de los datos. No utilices el valor de ID dq_summary, ya que está reservado para tareas de procesamiento internas.
--summary_to_stdout (Opcional) Si se incluye esta marca, todas las filas de resultados de validación creadas en la tabla dq_summary en la última ejecución se registrarán como registros JSON en Cloud Logging y stdout.

API

  1. Haz los cambios siguientes:

    PROJECT_ID = "Your Dataplex Universal Catalog Project ID"
    REGION = "Your Dataplex Universal Catalog lake region"
    LAKE_ID = "Your Dataplex Universal Catalog lake ID"
    SERVICE_ACC = "Your service account used for reading the data"
    DATAPLEX_TASK_ID = "Unique task ID for the data quality task"
    BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification"
    GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project"
    GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional
    GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results"
    TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
  2. Envía una solicitud HTTP POST:
    POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID}
    {
    "spark": {
        "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py",
        "file_uris": [  f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip",
                        f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum",
                        f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip"
                    ]
    },
    "execution_spec": {
        "args": {
            "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}"
        },
        "service_account": "SERVICE_ACC"
    },
    "trigger_spec": {
    "type": "ON_DEMAND"
    },
    "description": "${DATAPLEX_TASK_DESCRIPTION}"
    }

Consulta también el DAG de Airflow de ejemplo para la tarea de calidad de los datos de Dataplex Universal Catalog.

Monitorizar una tarea de calidad de los datos programada

Consulta cómo monitorizar tu tarea.

Ver los resultados

Los resultados de las validaciones de calidad de los datos se almacenan en el conjunto de datos y en la tabla de resumen de BigQuery que hayas especificado, tal como se describe en el artículo Crear un conjunto de datos para almacenar los resultados. La tabla de resumen contiene el resumen de salida de cada combinación de vinculación de reglas y regla de cada ejecución de validación. La tabla de resumen incluye la siguiente información:

Nombre de la columna Descripción
dataplex_lake (cadena) ID del lago de Dataplex Universal Catalog que contiene la tabla que se está validando.
dataplex_zone (cadena) ID de la zona de Dataplex Universal Catalog que contiene la tabla que se está validando.
dataplex_asset_id (cadena) ID del recurso de Dataplex Universal Catalog que contiene la tabla que se está validando.
execution_ts (marca de tiempo) Marca de tiempo que indica cuándo se ejecutó la consulta de validación.
rule_binding_id (cadena) ID de la vinculación de la regla de la que se informan los resultados de validación.
rule_id (cadena) ID de la regla en la vinculación de reglas para la que se informan los resultados de la validación.
dimension (cadena) Dimensión de calidad de los datos de rule_id. Este valor solo puede ser uno de los valores especificados en el nodo YAML rule_dimensions.
table_id (cadena) ID de la entidad de la que se han obtenido los resultados de validación. Este ID se especifica en el parámetro entity de la vinculación de reglas correspondiente.
column_id (cadena) ID de la columna de la que se informan los resultados de validación. Este ID se especifica en el parámetro column de la vinculación de reglas correspondiente.
last_modified (marca de tiempo) Marca de tiempo de la última modificación del table_id que se está validando.
metadata_json_string (cadena) Pares clave-valor del contenido del parámetro de metadatos especificado en el enlace de la regla o durante la ejecución de la calidad de los datos.
configs_hashsum (cadena) Suma hash del documento JSON que contiene la vinculación de la regla y todas las reglas, vinculaciones de reglas, filtros de filas y configuraciones de entidades asociados. configs_hashsum permite hacer un seguimiento cuando cambia el contenido de un ID de rule_binding o de una de sus configuraciones de referencia.
dq_run_id (cadena) ID único del registro.
invocation_id (Cadena) ID de la ejecución de la calidad de los datos. Todos los registros de resumen de calidad de los datos generados en la misma instancia de ejecución de calidad de los datos comparten el mismo invocation_id.
progress_watermark (booleano) Determina si este registro concreto se tiene en cuenta en la comprobación de calidad de los datos para determinar la marca de agua alta de la validación incremental. Si FALSE, el registro correspondiente se ignora al establecer el valor de marca de agua alta. Esta información es útil cuando se ejecutan validaciones de calidad de datos de prueba que no deben avanzar la marca de agua alta. Dataplex Universal Catalog rellena este campo con TRUE de forma predeterminada, pero el valor se puede sustituir si el argumento --progress_watermark tiene el valor FALSE.
rows_validated (entero) Número total de registros validados después de aplicar row_filters y los filtros de marca de agua alta en la columna incremental_time_filter_column_id, si se han especificado.
complex_rule_validation_errors_count (float) Número de filas devueltas por una regla CUSTOM_SQL_STATEMENT.
complex_rule_validation_success_flag (booleano) Estado de éxito de las reglas de CUSTOM_SQL_STATEMENT.
success_count (entero) Número total de registros que han superado la validación. Este campo tiene el valor NULL en las reglas CUSTOM_SQL_STATEMENT.
success_percentage (float) Porcentaje del número de registros que han superado la validación en el número total de registros validados. Este campo se define como NULL para las reglas CUSTOM_SQL_STATEMENT.
failed_count (entero) Número total de registros que no han superado la validación. Este campo tiene el valor NULL en las reglas CUSTOM_SQL_STATEMENT.
failed_percentage (float) Porcentaje del número de registros que no han superado la validación en el número total de registros validados. Este campo se define como NULL para las reglas CUSTOM_SQL_STATEMENT.
null_count (entero) Número total de registros que han devuelto un valor nulo durante la validación. Este campo tiene el valor NULL en las reglas NOT_NULL y CUSTOM_SQL_STATEMENT.
null_percentage (float) Porcentaje del número de registros que han devuelto un valor nulo durante la validación en relación con el número total de registros validados. Este campo tiene el valor NULL en las reglas NOT_NULL y CUSTOM_SQL_STATEMENT.
failed_records_query En cada regla que falla, esta columna almacena una consulta que puedes usar para obtener los registros fallidos. En este documento, consulta la sección Solucionar problemas con reglas fallidas con failed_records_query.

En el caso de las entidades de BigQuery, se crea una vista para cada rule_binding que contiene la lógica de validación de SQL de la última ejecución. Puedes encontrar estas vistas en el conjunto de datos de BigQuery especificado en el argumento --gcp-bq-dataset-id.

Optimizaciones de costes

Las tareas de calidad de los datos se ejecutan como trabajos de BigQuery en tu proyecto. Para controlar el coste de ejecutar tareas de calidad de los datos, usa los precios de BigQuery en el proyecto en el que se ejecuten tus tareas de BigQuery. Para obtener más información, consulta el artículo sobre la gestión de la carga de trabajo de BigQuery.

Validaciones incrementales

A menudo, las tablas se actualizan periódicamente con nuevas particiones (nuevas filas). Si no quieres volver a validar las particiones antiguas en cada ejecución, puedes usar validaciones incrementales.

Para las validaciones incrementales, debe tener una columna de tipo TIMESTAMP o DATETIME en su tabla en la que el valor de la columna aumente de forma monótona. Puedes usar las columnas por las que se ha particionado tu tabla de BigQuery.

Para especificar la validación incremental, indica un valor para incremental_time_filter_column_id=TIMESTAMP/DATETIME type column como parte de un enlace de regla.

Cuando especifica una columna, la tarea de calidad de los datos solo tiene en cuenta las filas con un valor de TIMESTAMP superior a la marca de tiempo de la última tarea de calidad de los datos que se ejecutó.

Archivos de especificación de ejemplo

Para usar estas muestras, crea un conjunto de datos de BigQuery llamado sales. A continuación, crea una tabla de hechos llamada sales_orders y añade datos de muestra ejecutando una consulta con las siguientes instrucciones de GoogleSQL:

CREATE OR REPLACE TABLE sales.sales_orders
(
 id STRING NOT NULL,
 last_modified_timestamp TIMESTAMP,
 customer_id STRING,
 item_id STRING,
 amount NUMERIC,
 transaction_currency STRING
);

INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")

Ejemplo 1

El siguiente ejemplo de código crea comprobaciones de calidad de los datos para validar estos valores:

  • amount: los valores son cero o números positivos.
  • item_id: una cadena alfanumérica de 5 caracteres alfabéticos seguida de 15 dígitos.
  • transaction_currency: un tipo de moneda permitido, tal como se define en una lista estática. La lista estática de este ejemplo permite usar GBP y JPY como tipos de moneda. Esta validación solo se aplica a las filas marcadas como internacionales.
# The following `NONE` row filter is required.
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 # This filters for rows marked as international (INTNL).
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can apply to multiple tables or columns.
rules:
 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

# Rule bindings associate rules to columns within tables.
rule_bindings:
  TRANSACTION_AMOUNT_VALID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

  TRANSACTION_VALID_ITEM_ID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

  TRANSACTION_CURRENCY_VALID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Haz los cambios siguientes:

  • PROJECT_ID: tu ID de proyecto.
  • DATASET_ID: el ID del conjunto de datos.

Muestra 2

Si la tabla que se va a comprobar forma parte de un lago de Dataplex Universal Catalog, puede especificar las tablas mediante la notación de lago o de zona. De esta forma, puedes agregar los resultados por lago o zona. Por ejemplo, puedes generar una puntuación a nivel de zona.

Para usar este ejemplo, crea un lago de Dataplex Universal Catalog con el ID de lago operations y el ID de zona procurement. A continuación, añada la tabla sales_orders como recurso a la zona.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}
rule_bindings:
 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Haz los cambios siguientes:

  • PROJECT_ID: tu ID de proyecto.
  • REGION_ID: el ID de la región del lago de Dataplex Universal Catalog en el que se encuentra la tabla, como us-central1.

Ejemplo 3

En este ejemplo se mejora el ejemplo 2 añadiendo una comprobación SQL personalizada para ver si los valores de ID son únicos.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
 NO_DUPLICATES_IN_COLUMN_GROUPS:
   rule_type: CUSTOM_SQL_STATEMENT
   dimension: duplication
   params:
     custom_sql_arguments:
       - column_names
     custom_sql_statement: |-
       select a.*
       from data a
       inner join (
         select
           $column_names
         from data
         group by $column_names
         having count(*) > 1
       ) duplicates
       using ($column_names)

 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}

rule_bindings:
 TRANSACTIONS_UNIQUE:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: id
   row_filter_id: NONE
   rule_ids:
     - NO_DUPLICATES_IN_COLUMN_GROUPS:
         column_names: "id"

 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Ejemplo 4

En este ejemplo se mejora el ejemplo 3 añadiendo validaciones incrementales mediante la columna last_modified_timestamp. Puedes añadir validaciones incrementales para una o varias vinculaciones de reglas.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
 NO_DUPLICATES_IN_COLUMN_GROUPS:
   rule_type: CUSTOM_SQL_STATEMENT
   dimension: duplication
   params:
     custom_sql_arguments:
       - column_names
     custom_sql_statement: |-
       select a.*
       from data a
       inner join (
         select
           $column_names
         from data
         group by $column_names
         having count(*) > 1
       ) duplicates
       using ($column_names)

 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}

rule_bindings:
 TRANSACTIONS_UNIQUE:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: id
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - NO_DUPLICATES_IN_COLUMN_GROUPS:
         column_names: "id"

 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: amount
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: item_id
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALID_CURRENCY_ID

Solucionar problemas con las reglas fallidas con failed_records_query

Por cada regla que no se cumpla, la tabla de resumen almacena una consulta en la columna failed_records_query que puedes usar para obtener los registros fallidos.

Para depurar, también puedes usar reference columns en tu archivo YAML, lo que te permite combinar el resultado de failed_records_query con los datos originales para obtener el registro completo. Por ejemplo, puedes especificar una columna primary_key o una columna primary_key compuesta como columna de referencia.

Especificar columnas de referencia

Para generar columnas de referencia, puede añadir lo siguiente a su especificación YAML:

  1. La sección reference_columns. En esta sección, puede crear uno o varios conjuntos de columnas de referencia, y cada conjunto puede especificar una o varias columnas.

  2. La sección rule_bindings. En esta sección, puede añadir una línea a un enlace de regla que especifique un ID de columna de referencia (reference_columns_id) que se usará en las reglas de ese enlace. Debe ser uno de los conjuntos de columnas de referencia especificados en la sección reference_columns.

Por ejemplo, el siguiente archivo YAML especifica una sección reference_columns y define tres columnas: id, last_modified_timestamp y item_id como parte del conjunto ORDER_DETAILS_REFERENCE_COLUMNS. En el siguiente ejemplo se usa la tabla de ejemplo sales_orders.

reference_columns:
  ORDER_DETAILS_REFERENCE_COLUMNS:
    include_reference_columns:
      - id
      - last_modified_timestamp
      - item_id
rules:
  VALUE_ZERO_OR_POSITIVE:
  rule_type: CUSTOM_SQL_EXPR
  params:
    custom_sql_expr: |-

row_filters:
NONE:
  filter_sql_expr: |-
      True

rule_bindings:
TRANSACTION_AMOUNT_VALID:
  entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
  column_id: amount
  row_filter_id: NONE
  reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
  rule_ids:
    - VALUE_ZERO_OR_POSITIVE

Usar la consulta de registros fallidos

La consulta de registros fallidos genera una fila por cada registro que tenga una regla que haya fallado. Incluye el nombre de la columna que ha provocado el error, el valor que lo ha provocado y los valores de las columnas de referencia. También incluye metadatos que puedes usar para relacionarlos con la ejecución de la tarea de calidad de los datos.

A continuación, se muestra un ejemplo de la salida de una consulta de registros fallida para el archivo YAML descrito en Especificar columnas de referencia. Se muestra un error en la columna amount y el valor -10. También registra el valor correspondiente de la columna de referencia.

_dq_validation_invocation_id _dq_validation_rule_binding_id _dq_validation_rule_id _dq_validation_column_id _dq_validation_column_value _dq_validation_dimension _dq_validation_simple_rule_row_is_valid _dq_validation_complex_rule_validation_errors_count _dq_validation_complex_rule_validation_success_flag id last_modified_timestamp item_id
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 TRANSACTION_AMOUNT_VALID VALUE_ZERO_OR_POSITIVE amount -10 FALSO order1 2022-01-22T02:30:06.321Z bad_item_id

Usar consultas de registros fallidos para reglas de CUSTOM_SQL_STATEMENT

En el caso de las reglas de CUSTOM_SQL_STATEMENT, las consultas de registros fallidas incluyen la columna custom_sql_statement_validation_errors. La columna custom_sql_statement_validation_errors es una columna anidada con campos que coinciden con el resultado de tu instrucción SQL. Las columnas de referencia no se incluyen en las consultas de registros fallidos de reglas de CUSTOM_SQL_STATEMENT.

Por ejemplo, tu regla CUSTOM_SQL_STATEMENT podría tener este aspecto:

rules:
  TEST_RULE:
    rule_type: CUSTOM_SQL_STATEMENT
    custom_sql_arguments:
      - existing_id
      - replacement_id
    params:
     CUSTOM_SQL_STATEMENT: |-
       (SELECT product_name, product_key FROM data
       where $existing_id != $replacement_id)
Los resultados de este ejemplo contendrán una o varias filas de la columna custom_sql_statement_validation_errors, con una fila por cada vez que aparezca existing_id!=replacement_id.

Cuando se representa en JSON, el contenido de una celda de esta columna puede tener este aspecto:

{
  "custom_sql_statement_valdation_errors" :{
    "product_name"="abc"
    "product_key"="12345678"
    "_rule_binding_id"="your_rule_binding"
  }
}

Puedes combinar estos resultados con la tabla original mediante una referencia anidada, como join on custom_sql_statement_valdation_errors.product_key.

Siguientes pasos