Usa tareas de calidad de los datos

En este documento, se muestra cómo crear tareas de calidad de los datos de Dataplex que te permiten programar y ejecutar verificaciones de calidad de los datos para tus tablas de BigQuery integradas y externas.

Para obtener más información, consulta la descripción general de las tareas de calidad de los datos.

Antes de comenzar

En este documento, se da por sentado que tienes un lake de Dataplex existente en el que crear la tarea de calidad de los datos.

Habilita las APIs y los servicios de Google

  1. Habilita la API de Dataproc

    Habilitar la API

  2. Habilita el Acceso privado a Google para tu red y subred. Habilita el Acceso privado a Google en la red que planeas usar con las tareas de calidad de los datos de Dataplex. Si no especificas una red o subred cuando creas la tarea de calidad de los datos de Dataplex, esta usa la subred predeterminada. En ese caso, debes habilitar el Acceso privado a Google en la subred predeterminada.

Crea un archivo de especificación

Dataplex usa CloudDQ de código abierto como el programa de controlador. Los requisitos de verificación de calidad de los datos de Dataplex se definen en los archivos de especificación YAML de CloudDQ.

Como entrada para la tarea de calidad de los datos, puedes tener un solo archivo YAML o un solo archivo ZIP que contenga uno o más archivos YAML. Se recomienda capturar los requisitos de verificación de calidad de los datos en archivos de especificación YAML separados, con un archivo para cada sección.

Para preparar un archivo de especificaciones, haz lo siguiente:

  1. Crea uno o más archivos de especificación YAML de CloudDQ que definan los requisitos de verificación de calidad de los datos. Para obtener más información sobre la sintaxis requerida, consulta la sección Acerca del archivo de especificaciones de este documento.

    Guarda el archivo de especificación YAML en formato .yml o .yaml. Si creas varios archivos de especificación YAML, guarda todos los archivos en un solo archivo ZIP.

  2. Crea buckets de Cloud Storage.
  3. Sube el archivo de especificaciones al bucket de Cloud Storage.

Información acerca del archivo de especificación

Tu archivo de especificación YAML de CloudDQ debe tener las siguientes secciones:

  • Reglas (definidas en el nodo YAML rules de nivel superior): Una lista de reglas para ejecutar. Puedes crear estas reglas a partir de tipos de reglas predefinidos, como NOT_NULL y REGEX, o puedes extenderlas con instrucciones de SQL personalizadas, como CUSTOM_SQL_EXPR y CUSTOM_SQL_STATEMENT. La sentencia CUSTOM_SQL_EXPR marca como una falla cualquier fila que custom_sql_expr haya evaluado como False. La declaración CUSTOM_SQL_STATEMENT marca cualquier valor que muestra toda la instrucción como una falla.

  • Filtros de fila (definidos en el nodo YAML row_filters de nivel superior): Expresiones de SQL que muestran un valor booleano que define filtros para recuperar un subconjunto de datos de la entidad subyacente sujeta a validación.

  • Vinculaciones de reglas (definidas en el nodo YAML de nivel superior rule_bindings): Define rules y rule filters para aplicar a las tablas.

  • Dimensiones de la regla (definidas en el nodo YAML de rule_dimensions): Define la lista permitida de dimensiones de la regla de calidad de los datos que una regla puede definir en el campo dimension correspondiente.

    Por ejemplo:

    rule_dimensions:
      - consistency
      - correctness
      - duplication
      - completeness
      - conformance

    El campo dimension es opcional para una regla. La sección de dimensiones de las reglas es obligatoria si dimension aparece en cualquier regla.

Para obtener más información, consulta la guía de referencia de CloudDQ y los archivos de especificación de muestra.

Crea un conjunto de datos para almacenar los resultados

  • Para almacenar los resultados, crea un conjunto de datos de BigQuery.

    El conjunto de datos debe estar en la misma región que las tablas en las que ejecutas la tarea de calidad de los datos.

    Dataplex usa este conjunto de datos y crea o reutiliza una tabla de tu elección para almacenar los resultados.

Cree una cuenta de servicio

Crea una cuenta de servicio que tenga los siguientes roles y permisos de Identity and Access Management (IAM):

Opcional: Usa la configuración avanzada

Estos pasos son opcionales:

  1. BigQuery ejecuta controles de calidad de los datos en el proyecto del usuario actual de forma predeterminada. Como alternativa, puedes elegir un proyecto diferente para ejecutar los trabajos de BigQuery mediante el argumento --gcp_project_id TASK_ARGS para la propiedad --execution-args de la tarea.

  2. Si el ID del proyecto especificado para ejecutar consultas de BigQuery es diferente del proyecto en el que se crea la cuenta de servicio (especificada por --execution-service-account), asegúrate de que la política de la organización que inhabilita el uso de la cuenta de servicio entre proyectos (iam.disableServiceAccountCreation) esté desactivada. Además, asegúrate de que la cuenta de servicio pueda acceder a la programación de trabajos de BigQuery en el proyecto en el que se ejecutan las consultas de BigQuery.

Limitaciones

  • Todas las tablas especificadas para una tarea de calidad de los datos determinada deben pertenecer a la misma región de Google Cloud .

Programa una tarea de calidad de los datos

Console

  1. En la consola de Google Cloud , ve a la página Proceso de Dataplex.

    Ve a Proceso

  2. Haz clic en Crear tarea.
  3. En la tarjeta Verificar la calidad de los datos, haz clic en Crear tarea.
  4. En el lake Dataplex, selecciona tu lake.
  5. En ID, ingresa un ID.
  6. En la sección Especificación de calidad de los datos, haz lo siguiente:
    1. En el campo Seleccionar archivo GCS, haz clic en Explorar.
    2. Selecciona tu bucket de Cloud Storage.

    3. Haz clic en Seleccionar.

  7. En la sección Tabla de resultados, haz lo siguiente:

    1. En el campo Selecciona un conjunto de datos de BigQuery, haz clic en Explorar.

    2. Selecciona el conjunto de datos de BigQuery para almacenar los resultados de la validación.

    3. Haz clic en Seleccionar.

    4. En el campo Tabla de BigQuery, ingresa el nombre de la tabla para almacenar los resultados. Si la tabla no existe, Dataplex la creará por ti. No uses el nombre dq_summary porque está reservado para tareas de procesamiento interno.

  8. En la sección Cuenta de servicio, selecciona una cuenta de servicio del menú Cuenta de servicio del usuario.

  9. Haga clic en Continuar.

  10. En la sección Establecer programación, configura la programación para ejecutar la tarea de calidad de los datos.

  11. Haz clic en Crear.

gcloud CLI

El siguiente es un ejemplo de ejecución de una tarea de calidad de los datos que usa el comando de gcloud CLI de Dataplex:

export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH"

# Google Cloud project where the Dataplex task is created.
export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT"

# Google Cloud region for the Dataplex lake.
export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID"

# Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region.
export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}"

# The Dataplex lake where your task is created.
export DATAPLEX_LAKE_NAME="DATAPLEX_LAKE_NAME"

# The service account used for running the task. Ensure that this service account
has sufficient IAM permissions on your project, including
BigQuery Data Editor, BigQuery Job User,
Dataplex Editor, Dataproc Worker, and Service
Usage Consumer.

# The BigQuery dataset used for storing the intermediate data
quality summary results and the BigQuery views associated with
each rule binding.
export TARGET_BQ_DATASET="TARGET_BQ_DATASET"

# If you want to use a different dataset for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding, use the following:
export CLOUDDQ_BIGQUERY_DATASET=$TARGET_BQ_DATASET

# The BigQuery dataset where the final results of the data quality checks are stored. This could be the same as CLOUDDQ_BIGQUERY_DATASET.
export TARGET_BQ_DATASET="TARGET_BQ_DATASET"

# The BigQuery table where the final results of the data quality checks are stored.
export TARGET_BQ_TABLE="TARGET_BQ_TABLE"

# The unique identifier for the task.
export TASK_ID="TASK_ID"

gcloud dataplex tasks create \
    --location="${DATAPLEX_REGION_ID}" \
    --lake="${DATAPLEX_LAKE_NAME}" \
    --trigger-type=ON_DEMAND \
    --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \
    --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \
    --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \
    --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \
    "$TASK_ID"
Parámetro Descripción
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH La ruta de acceso de Cloud Storage a la entrada de configuración de YAML de calidad de los datos para la tarea de calidad de los datos. Puedes tener un solo archivo YAML en formato .yml o .yaml, o bien un archivo ZIP con varios archivos YAML.
GOOGLE_CLOUD_PROJECT El proyecto de Google Cloud en el que se crean la tarea de Dataplex y los trabajos de BigQuery.
DATAPLEX_REGION_ID Es la región del lake Dataplex en el que se crea la tarea de calidad de los datos.
SERVICE_ACCOUNT La cuenta de servicio que se usa para ejecutar la tarea. Asegúrate de que esta cuenta de servicio tenga los permisos de IAM necesarios, como se describe en la sección Antes de comenzar.

Para --execution-args, los siguientes argumentos deben pasarse como argumentos posicionales y, por lo tanto, en este orden:

Argumento Descripción
clouddq-executable.zip Un ejecutable precompilado que se pasó en spark-file-uris desde un bucket público de Cloud Storage
ALL Ejecuta todas las vinculaciones de reglas. Como alternativa, puedes proporcionar vinculaciones de reglas específicas como una lista separada por comas. Por ejemplo, RULE_1,RULE_2.
gcp-project-id Es el ID del proyecto que ejecuta las consultas de BigQuery.
gcp-region-id Es la región para ejecutar las tareas de BigQuery para la validación de la calidad de los datos. Esta región debe ser la misma que la de gcp-bq-dataset-id y target_bigquery_summary_table.
gcp-bq-dataset-id Conjunto de datos de BigQuery que se usa para almacenar las vistas rule_binding y los resultados intermedios de resumen de calidad de los datos.
target-bigquery-summary-table Es la referencia del ID de la tabla de BigQuery en la que se almacenan los resultados finales de las verificaciones de calidad de los datos. No uses el valor de ID dq_summary, ya que está reservado para tareas de procesamiento interno.
--summary_to_stdout Cuando se pasa esta marca, todas las filas de los resultados de validación creadas en la tabla dq_summary en la última ejecución se registran como registros JSON en Cloud Logging y stdout (opcional).

API

  1. Reemplaza lo siguiente:

    PROJECT_ID = "Your Dataplex Project ID"
    REGION = "Your Dataplex lake region"
    LAKE_ID = "Your Dataplex lake ID"
    SERVICE_ACC = "Your service account used for reading the data"
    DATAPLEX_TASK_ID = "Unique task ID for the data quality task"
    BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification"
    GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project"
    GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional
    GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results"
    TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
  2. Envía una solicitud HTTP POST:
    POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID}
    {
    "spark": {
        "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py",
        "file_uris": [  f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip",
                        f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum",
                        f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip"
                    ]
    },
    "execution_spec": {
        "args": {
            "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}"
        },
        "service_account": "SERVICE_ACC"
    },
    "trigger_spec": {
    "type": "ON_DEMAND"
    },
    "description": "${DATAPLEX_TASK_DESCRIPTION}"
    }

Consulta también Ejemplo de DAG de Airflow para la tarea de calidad de los datos de Dataplex.

Supervisa una tarea de calidad de los datos programada

Consulta cómo supervisar tu tarea.

Vea los resultados

Los resultados de las validaciones de calidad de los datos se almacenan en el conjunto de datos y la tabla de resumen de BigQuery que especificaste, como se describe en Crea un conjunto de datos para almacenar los resultados. La tabla de resumen contiene el resumen de salida de cada combinación de vinculación de reglas y reglas para cada ejecución de validación. El resultado en la tabla de resumen incluye la siguiente información:

Nombre de la columna Descripción
dataplex_lake (string) ID del lake Dataplex que contiene la tabla que se está validando.
dataplex_zone (string) ID de la zona de Dataplex que contiene la tabla que se está validando.
dataplex_asset_id (string) ID del recurso de Dataplex que contiene la tabla que se está validando.
execution_ts (marca de tiempo) Marca de tiempo de la ejecución de la consulta de validación.
rule_binding_id (string) ID de la vinculación de regla para la que se informan los resultados de validación.
rule_id (string) ID de la regla en la vinculación de la regla para la que se informan los resultados de validación.
dimension (string) Dimensión de calidad de los datos de rule_id. Este valor solo puede ser uno de los valores especificados en el nodo YAML rule_dimensions.
table_id (string) Es el ID de la entidad para la que se informan los resultados de validación. Este ID se especifica en el parámetro entity de la vinculación de reglas correspondiente.
column_id (cadena) Es el ID de la columna para la que se informan los resultados de validación. Este ID se especifica en el parámetro column de la vinculación de reglas correspondiente.
last_modified (marca de tiempo) La última marca de tiempo modificada del table_id que se valida.
metadata_json_string (string) Pares clave-valor del contenido del parámetro de metadatos especificado en la vinculación de la regla o durante la ejecución de la calidad de los datos.
configs_hashsum La string hash del documento JSON que contiene la vinculación de la regla y todas sus reglas asociadas, vinculaciones de reglas, filtros de fila y configuraciones de entidades. configs_hashsum permite el seguimiento cuando cambia el contenido de un ID rule_binding o una de sus configuraciones a las que se hace referencia.
dq_run_id El ID único del registro (string).
invocation_id (string) ID de la ejecución de la calidad de los datos. Todos los registros de resumen de calidad de los datos generados dentro de la misma instancia de ejecución de calidad de los datos comparten el mismo invocation_id.
progress_watermark Determina si este registro en particular se considera mediante la verificación de calidad de los datos cuando se determina la marca de agua alta para la validación incremental (booleano). Si es FALSE, el registro correspondiente se ignora cuando se establece el valor de la marca de agua alta. Esta información es útil cuando se ejecutan validaciones de calidad de datos de prueba que no deben avanzar la marca de agua alta. Dataplex propaga este campo con TRUE de forma predeterminada, pero el valor se puede anular si el argumento --progress_watermark tiene un valor de FALSE.
rows_validated (número entero) Cantidad total de registros validados después de aplicar row_filters y cualquier filtro de marca de agua alta en la columna incremental_time_filter_column_id, si se especifica.
complex_rule_validation_errors_count (número de punto flotante) Cantidad de filas que muestra una regla CUSTOM_SQL_STATEMENT.
complex_rule_validation_success_flag (booleano) Estado de éxito de las reglas CUSTOM_SQL_STATEMENT.
success_count (número entero) Cantidad total de registros que aprobaron la validación. Este campo se establece como NULL para las reglas CUSTOM_SQL_STATEMENT.
success_percentage Porcentaje de la cantidad de registros que aprobaron la validación dentro de la cantidad total de registros validados (número de punto flotante). Este campo se establece como NULL para las reglas CUSTOM_SQL_STATEMENT.
failed_count (número entero) Cantidad total de registros que no aprobaron la validación. Este campo se establece como NULL para las reglas CUSTOM_SQL_STATEMENT.
failed_percentage Porcentaje de la cantidad de registros que fallaron en la validación dentro de la cantidad total de registros validados (número de punto flotante). Este campo se establece como NULL para las reglas CUSTOM_SQL_STATEMENT.
null_count (número entero) Cantidad total de registros que mostraron nulo durante la validación. Este campo se establece como NULL para las reglas NOT_NULL y CUSTOM_SQL_STATEMENT.
null_percentage Porcentaje de la cantidad de registros que mostraron nulo durante la validación dentro de la cantidad total de registros validados (número de punto flotante). Este campo se establece en NULL para las reglas NOT_NULL y CUSTOM_SQL_STATEMENT.
failed_records_query Para cada regla que falle, esta columna almacena una consulta que puedes usar para obtener registros con errores. En este documento, consulta Soluciona problemas de reglas fallidas con failed_records_query.

Para las entidades de BigQuery, se crea una vista para cada rule_binding que contiene la lógica de validación de SQL de la ejecución más reciente. Puedes encontrar estas vistas en el conjunto de datos de BigQuery especificado en el argumento --gcp-bq-dataset-id.

Optimizaciones de costos

Puedes reducir los costos con las siguientes optimizaciones.

Validaciones incrementales

A menudo, tienes tablas que se actualizan de forma rutinaria con particiones nuevas (filas nuevas). Si no quieres volver a validar particiones antiguas en cada ejecución, puedes usar validaciones incrementales.

Para las validaciones incrementales, debes tener una columna de tipo TIMESTAMP o DATETIME en tu tabla en la que el valor de la columna aumente de forma monótona. Puedes usar las columnas en las que se particiona tu tabla de BigQuery.

Para especificar la validación incremental, especifica un valor para incremental_time_filter_column_id=TIMESTAMP/DATETIME type column como parte de una vinculación de reglas.

Cuando especificas una columna, la tarea de calidad de los datos solo considera las filas con un valor de TIMESTAMP mayor que la marca de tiempo de la última tarea de calidad de los datos que se ejecutó.

Archivos de especificación de ejemplo

Para usar estos ejemplos, crea un conjunto de datos de BigQuery llamado sales. Luego, crea una tabla de hechos llamada sales_orders y agrega datos de muestra ejecutando una consulta con las siguientes instrucciones de GoogleSQL:

CREATE OR REPLACE TABLE sales.sales_orders
(
 id STRING NOT NULL,
 last_modified_timestamp TIMESTAMP,
 customer_id STRING,
 item_id STRING,
 amount NUMERIC,
 transaction_currency STRING
);

INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")

Ejemplo 1

En la siguiente muestra de código, se crean verificaciones de calidad de los datos para validar estos valores:

  • amount: los valores son cero o números positivos.
  • item_id: una string alfanumérica de 5 caracteres alfabéticos, seguida de 15 dígitos.
  • transaction_currency: es un tipo de moneda permitido, según lo definido por una lista estática. La lista estática de esta muestra permite GBP y JPY como tipos de moneda. Esta validación solo se aplica a las filas marcadas como internacionales.
# The following `NONE` row filter is required.
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 # This filters for rows marked as international (INTNL).
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can apply to multiple tables or columns.
rules:
 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

# Rule bindings associate rules to columns within tables.
rule_bindings:
  TRANSACTION_AMOUNT_VALID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

  TRANSACTION_VALID_ITEM_ID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

  TRANSACTION_CURRENCY_VALID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Reemplaza lo siguiente:

  • PROJECT_ID: el ID de tu proyecto
  • DATASET_ID: El ID del conjunto de datos.

Muestra 2

Si la tabla que se va a verificar forma parte de un lake de Dataplex, puedes especificar las tablas con la notación de lake o zona. Esto te permite sumar tus resultados por lago o zona. Por ejemplo, puedes generar una puntuación a nivel de la zona.

Para usar este ejemplo, crea un lake de Dataplex con el ID del lake operations y el ID de la zona procurement. Luego, agrega la tabla sales_orders como un recurso a la zona.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}
rule_bindings:
 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Reemplaza lo siguiente:

  • PROJECT_ID: el ID de tu proyecto
  • REGION_ID: Es el ID de la región del lago de Dataplex en el que existe la tabla, como us-central1.

Ejemplo 3

En este ejemplo, se agrega una verificación de SQL personalizada al Ejemplo 2 para ver si los valores de ID son únicos.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
 NO_DUPLICATES_IN_COLUMN_GROUPS:
   rule_type: CUSTOM_SQL_STATEMENT
   dimension: duplication
   params:
     custom_sql_arguments:
       - column_names
     custom_sql_statement: |-
       select a.*
       from data a
       inner join (
         select
           $column_names
         from data
         group by $column_names
         having count(*) > 1
       ) duplicates
       using ($column_names)

 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}

rule_bindings:
 TRANSACTIONS_UNIQUE:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: id
   row_filter_id: NONE
   rule_ids:
     - NO_DUPLICATES_IN_COLUMN_GROUPS:
         column_names: "id"

 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Ejemplo 4

En este ejemplo, se mejora el ejemplo 3 agregando validaciones incrementales con la columna last_modified_timestamp. Puedes agregar validaciones incrementales para una o más vinculaciones de reglas.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
 NO_DUPLICATES_IN_COLUMN_GROUPS:
   rule_type: CUSTOM_SQL_STATEMENT
   dimension: duplication
   params:
     custom_sql_arguments:
       - column_names
     custom_sql_statement: |-
       select a.*
       from data a
       inner join (
         select
           $column_names
         from data
         group by $column_names
         having count(*) > 1
       ) duplicates
       using ($column_names)

 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}

rule_bindings:
 TRANSACTIONS_UNIQUE:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: id
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - NO_DUPLICATES_IN_COLUMN_GROUPS:
         column_names: "id"

 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: amount
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: item_id
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALID_CURRENCY_ID

Soluciona problemas con las reglas que fallaron con failed_records_query

Para cada regla que falle, la tabla de resumen almacenará una consulta en la columna failed_records_query que puedes usar para obtener registros con errores.

Para depurar, también puedes usar reference columns en tu archivo YAML, lo que te permite unir el resultado de failed_records_query con los datos originales para obtener el registro completo. Por ejemplo, puedes especificar una columna primary_key o una columna primary_key compuesta como una columna de referencia.

Especifica las columnas de referencia

Para generar columnas de referencia, puedes agregar lo siguiente a tu especificación YAML:

  1. La sección reference_columns. En esta sección, puedes crear uno o más conjuntos de columnas de referencia, cada uno de los cuales especifica una o más columnas.

  2. La sección rule_bindings. En esta sección, puedes agregar una línea a una vinculación de reglas que especifique un ID de columna de referencia (reference_columns_id) para usar en las reglas de esa vinculación. Debe ser uno de los conjuntos de columnas de referencia especificados en la sección reference_columns.

Por ejemplo, el siguiente archivo YAML especifica una sección reference_columns y define tres columnas: id, last_modified_timestamp y item_id como parte del conjunto ORDER_DETAILS_REFERENCE_COLUMNS. En el siguiente ejemplo, se usa la tabla de muestra sales_orders.

reference_columns:
  ORDER_DETAILS_REFERENCE_COLUMNS:
    include_reference_columns:
      - id
      - last_modified_timestamp
      - item_id
rules:
  VALUE_ZERO_OR_POSITIVE:
  rule_type: CUSTOM_SQL_EXPR
  params:
    custom_sql_expr: |-

row_filters:
NONE:
  filter_sql_expr: |-
      True

rule_bindings:
TRANSACTION_AMOUNT_VALID:
  entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
  column_id: amount
  row_filter_id: NONE
  reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
  rule_ids:
    - VALUE_ZERO_OR_POSITIVE

Usa la consulta de registros con errores

La consulta de registros con errores genera una fila para cada registro que tiene una regla que falló. Incluye el nombre de la columna que activó la falla, el valor que la activó y los valores de las columnas de referencia. También incluye metadatos que puedes usar para relacionar la ejecución de la tarea de calidad de los datos.

El siguiente es un ejemplo del resultado de una consulta de registros con errores para el archivo YAML descrito en Especifica columnas de referencia. Muestra una falla para la columna amount y un valor con errores de -10. También registra el valor correspondiente para la columna de referencia.

_dq_validation_invocation_id _dq_validation_rule_binding_id _dq_validation_rule_id _dq_validation_column_id _dq_validation_column_value _dq_validation_dimension _dq_validation_simple_rule_row_is_valid _dq_validation_complex_rule_validation_errors_count _dq_validation_complex_rule_validation_success_flag id last_modified_timestamp item_id
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 TRANSACTION_AMOUNT_VALID VALUE_ZERO_OR_POSITIVE cantidad -10 FALSE order1 2022-01-22T02:30:06.321Z bad_item_id

Usa consultas de registros con errores para las reglas CUSTOM_SQL_STATEMENT

Para las reglas de CUSTOM_SQL_STATEMENT, las consultas de registro con errores incluyen la columna custom_sql_statement_validation_errors. La columna custom_sql_statement_validation_errors es una columna anidada con campos que coinciden con el resultado de tu instrucción de SQL. Las columnas de referencia no se incluyen en las consultas de registro con errores para las reglas CUSTOM_SQL_STATEMENT.

Por ejemplo, tu regla CUSTOM_SQL_STATEMENT podría verse así:

rules:
  TEST_RULE:
    rule_type: CUSTOM_SQL_STATEMENT
    custom_sql_arguments:
      - existing_id
      - replacement_id
    params:
     CUSTOM_SQL_STATEMENT: |-
       (SELECT product_name, product_key FROM data
       where $existing_id != $replacement_id)
Los resultados de este ejemplo contendrán una o más filas para la columna custom_sql_statement_validation_errors, con una fila para cada ocurrencia en la que existing_id!=replacement_id.

Cuando se renderiza en JSON, el contenido de una celda en esta columna podría verse de la siguiente manera:

{
  "custom_sql_statement_valdation_errors" :{
    "product_name"="abc"
    "product_key"="12345678"
    "_rule_binding_id"="your_rule_binding"
  }
}

Puedes unir estos resultados a la tabla original con una referencia anidada, como join on custom_sql_statement_valdation_errors.product_key.

¿Qué sigue?