Usar tarefas de qualidade de dados

Neste documento, você vai aprender a criar tarefas do Dataplex Universal Catalog para agendar e executar verificações de qualidade de dados em tabelas integradas ou externas do BigQuery.

Saiba mais em Visão geral das tarefas de qualidade de dados.

Antes de começar

Você precisa ter um lake do Dataplex Universal Catalog para criar a tarefa de qualidade de dados.

Ativar APIs e serviços do Google

  1. Ative a API Dataproc.

    Ativar a API

  2. Ative o Acesso Privado do Google para sua rede ou sub-rede. Ative o Acesso Privado do Google na rede que você planeja usar com as tarefas de qualidade de dados do Dataplex Universal Catalog. Se você não informar uma rede ou sub-rede ao criar a tarefa de qualidade de dados do Dataplex Universal Catalog, a sub-rede padrão será usada. Nesse caso, será necessário ativar o Acesso Privado do Google na sub-rede padrão.

Criar um arquivo de especificação

O Dataplex Universal Catalog usa o CloudDQ de código aberto como programa do driver. Os requisitos de verificação de qualidade de dados do Dataplex Universal Catalog são definidos em arquivos de especificação YAML do CloudDQ.

A entrada da tarefa de qualidade de dados pode ser um arquivo YAML ou um arquivo ZIP contendo um ou mais arquivos YAML. Recomendamos registrar os requisitos de verificação de qualidade de dados em arquivos de especificação YAML separados, com um arquivo para cada seção.

Para preparar um arquivo de especificação, faça o seguinte:

  1. Crie um ou mais arquivos de especificação YAML do CloudDQ que definam os requisitos de verificação de qualidade de dados. Para mais informações sobre a sintaxe necessária, consulte a seção Sobre o arquivo de especificação deste documento.

    Salve o arquivo de especificação YAML no formato .yml ou .yaml. Se você criar mais de um arquivo de especificação YAML, salve todos em um único arquivo ZIP.

  2. Crie um bucket do Cloud Storage.
  3. Faça upload do arquivo de especificação para o bucket do Cloud Storage.

Sobre o arquivo de especificação

O arquivo de especificação YAML do CloudDQ precisa ter estas seções:

  • Regras (definidas sob o nó YAML rules de nível superior): uma lista de regras a serem executadas. É possível criar essas regras com base em tipos de regras predefinidos, como NOT_NULL e REGEX, ou ampliá-las com instruções SQL personalizadas, como CUSTOM_SQL_EXPR e CUSTOM_SQL_STATEMENT. A instrução CUSTOM_SQL_EXPR sinaliza qualquer linha que custom_sql_expr avaliou como False como uma falha. A instrução CUSTOM_SQL_STATEMENT sinaliza qualquer valor retornado pela instrução inteira como uma falha.

  • Filtros de linha (definidos sob o nó YAML row_filters de nível superior): expressões SQL que retornam um valor booleano que define filtros para buscar um subconjunto de dados do assunto da entidade subjacente para validação.

  • Vinculações de regras (definidas no nó YAML rule_bindings de nível superior): define rules e rule filters a serem aplicados às tabelas.

  • Dimensões da regra (definidas no nó YAML rule_dimensions): define a lista de dimensões de regra de qualidade de dados permitidas que uma regra pode definir no campo dimension correspondente.

    Exemplo:

    rule_dimensions:
      - consistency
      - correctness
      - duplication
      - completeness
      - conformance

    O campo dimension é opcional para uma regra. A seção de dimensões da regra será obrigatória se dimension estiver listado em alguma regra.

Para mais informações, consulte o guia de referência do CloudDQ e os exemplos de arquivos de especificação.

Criar um conjunto de dados para armazenar os resultados

  • Para armazenar os resultados, crie um conjunto de dados do BigQuery.

    O conjunto de dados precisa estar na mesma região que as tabelas em que você executa a tarefa de qualidade de dados.

    O Dataplex Universal Catalog usa esse conjunto de dados e cria ou reutiliza uma tabela de sua escolha para armazenar os resultados.

Criar uma conta de serviço

Crie uma conta de serviço com os seguintes papéis e permissões do Identity and Access Management (IAM):

Usar configurações avançadas

Estas etapas são opcionais:

  • Por padrão, o BigQuery executa verificações de qualidade de dados no projeto atual. É possível escolher outro projeto para executar os jobs do BigQuery. Use o argumento --gcp_project_id TASK_ARGS para a propriedade --execution-args da tarefa.

  • Se o ID do projeto especificado para executar consultas do BigQuery for diferente do projeto em que a conta de serviço (especificada pelo --execution-service-account) foi criada, desative a política da organização que impede o uso da conta de serviço entre projetos (iam.disableServiceAccountCreation). Além disso, verifique se a conta de serviço pode acessar a agenda de jobs do BigQuery no projeto em que as consultas do BigQuery estão sendo executadas.

Limitações

Todas as tabelas de determinada tarefa de qualidade de dados precisam pertencer à mesma região do Google Cloud.

Agendar uma tarefa de qualidade de dados

Console

  1. No console do Google Cloud , acesse a página Processo do Dataplex Universal Catalog.

    Acesse Processo

  2. Clique em Criar tarefa.
  3. No card Verificar a qualidade dos dados, clique em Criar tarefa.
  4. Em Lake do Dataplex, escolha seu lake.
  5. Em ID, digite um ID.
  6. Na seção Especificação da qualidade de dados, faça o seguinte:
    1. No campo Selecionar arquivo GCS, clique em Procurar.
    2. Selecione o bucket do Cloud Storage.

    3. Clique em Selecionar.

  7. Na seção Tabela de resultados, faça o seguinte:

    1. No campo Selecionar conjunto de dados do BigQuery, clique em Procurar.

    2. Selecione o conjunto de dados do BigQuery para armazenar os resultados da validação.

    3. Clique em Selecionar.

    4. No campo Tabela do BigQuery, digite o nome da tabela para armazenar os resultados. Se a tabela não existir, o Dataplex Universal Catalog vai criá-la. Não use o nome dq_summary porque ele está reservado para tarefas de processamento interno.

  8. Na seção Conta de serviço, selecione uma conta de serviço no menu Conta de serviço de usuário.

  9. Clique em Continuar.

  10. Na seção Definir programação, configure o agendamento para executar a tarefa de qualidade de dados.

  11. Clique em Criar.

gcloud CLI

Confira um exemplo da execução de uma tarefa de qualidade de dados que usa o comando da gcloud CLI de tarefas do Dataplex Universal Catalog:

export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH"

# Google Cloud project where the Dataplex Universal Catalog task is created.
export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT"

# Google Cloud region for the Dataplex Universal Catalog lake.
export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID"

# Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region.
export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}"

# The Dataplex Universal Catalog lake where your task is created.
export DATAPLEX_LAKE_ID="DATAPLEX_LAKE_ID"

# The service account used for running the task. Ensure that this service account
has sufficient IAM permissions on your project, including
BigQuery Data Editor, BigQuery Job User,
Dataplex Universal Catalog Editor, Dataproc Worker, and Service
Usage Consumer.
export DATAPLEX_TASK_SERVICE_ACCOUNT="DATAPLEX_TASK_SERVICE_ACCOUNT"

# If you want to use a different dataset for storing the intermediate data quality summary results
and the BigQuery views associated with each rule binding, use the following:
export CLOUDDQ_BIGQUERY_DATASET="CLOUDDQ_BIGQUERY_DATASET"

# The BigQuery dataset where the final results of the data quality checks are stored.
This could be the same as CLOUDDQ_BIGQUERY_DATASET.
export TARGET_BQ_DATASET="TARGET_BQ_DATASET"

# The BigQuery table where the final results of the data quality checks are stored.
export TARGET_BQ_TABLE="TARGET_BQ_TABLE"

# The unique identifier for the task.
export TASK_ID="TASK_ID"

gcloud dataplex tasks create \
    --location="${DATAPLEX_REGION_ID}" \
    --lake="${DATAPLEX_LAKE_ID}" \
    --trigger-type=ON_DEMAND \
    --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \
    --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \
    --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \
    --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \
    "$TASK_ID"
Parâmetro Descrição
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH O caminho da entrada das configurações YAML no Cloud Storage, que será usada na tarefa de qualidade de dados. É possível usar um único arquivo YAML no formato .yml ou .yaml ou um arquivo ZIP com vários arquivos YAML.
GOOGLE_CLOUD_PROJECT O projeto do Google Cloud em que a tarefa do Dataplex Universal Catalog e os jobs do BigQuery são criados.
DATAPLEX_REGION_ID A região do lake do Dataplex Universal Catalog em que a tarefa de qualidade de dados é criada.
SERVICE_ACCOUNT A conta de serviço usada para executar a tarefa. A conta precisa ter as permissões do IAM indicadas na seção Antes de começar.

Para --execution-args, os argumentos a seguir precisam ser transmitidos como argumentos posicionados e, portanto, nesta ordem:

Argumento Descrição
clouddq-executable.zip Um executável pré-compilado em um bucket público do Cloud Storage que foi transmitido no spark-file-uris.
ALL Execute todas as vinculações de regras. Também é possível fornecer vinculações de regras específicas como uma lista separada por vírgulas. Por exemplo, RULE_1,RULE_2.
gcp-project-id ID do projeto que executa as consultas do BigQuery.
gcp-region-id Região para executar os jobs do BigQuery para validação da qualidade de dados. Essa região precisa ser igual à do gcp-bq-dataset-id e do target_bigquery_summary_table.
gcp-bq-dataset-id Conjunto de dados do BigQuery usado para armazenar as visualizações rule_binding e os resultados resumidos da qualidade de dados intermediários.
target-bigquery-summary-table Referência do ID da tabela do BigQuery em que os resultados finais das verificações de qualidade de dados são armazenados. Não use o valor de ID dq_summary porque ele está reservado para tarefas de processamento interno.
--summary_to_stdout (Opcional) Quando essa sinalização é transmitida, todas as linhas de resultados de validação criadas na tabela dq_summary na última execução são registradas como registros JSON no Cloud Logging e stdout.

API

  1. Substitua:

    PROJECT_ID = "Your Dataplex Universal Catalog Project ID"
    REGION = "Your Dataplex Universal Catalog lake region"
    LAKE_ID = "Your Dataplex Universal Catalog lake ID"
    SERVICE_ACC = "Your service account used for reading the data"
    DATAPLEX_TASK_ID = "Unique task ID for the data quality task"
    BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification"
    GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project"
    GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional
    GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results"
    TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
  2. Envie uma solicitação POST HTTP:
    POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID}
    {
    "spark": {
        "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py",
        "file_uris": [  f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip",
                        f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum",
                        f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip"
                    ]
    },
    "execution_spec": {
        "args": {
            "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}"
        },
        "service_account": "SERVICE_ACC"
    },
    "trigger_spec": {
    "type": "ON_DEMAND"
    },
    "description": "${DATAPLEX_TASK_DESCRIPTION}"
    }

Consulte também Exemplo de DAG do Airflow para tarefa de qualidade de dados do Dataplex Universal Catalog.

Monitorar uma tarefa agendada de qualidade de dados

Saiba como monitorar uma tarefa.

Conferir os resultados

Os resultados das validações de qualidade de dados são armazenados no conjunto de dados e na tabela de resumo do BigQuery especificados, conforme descrito em Criar um conjunto de dados para armazenar os resultados. A tabela contém um resumo do resultado de cada combinação de regra e vinculação de regra para cada execução da validação. O resultado na tabela de resumo inclui as seguintes informações:

Nome da coluna Descrição
dataplex_lake (string) ID do lake do Dataplex Universal Catalog que contém a tabela sendo validada.
dataplex_zone (string) ID da zona do Dataplex Universal Catalog que contém a tabela sendo validada.
dataplex_asset_id (string) ID do recurso do Dataplex Universal Catalog que contém a tabela sendo validada.
execution_ts (carimbo de data/hora) Carimbo da data/hora de execução da consulta.
rule_binding_id (string) ID da vinculação de regra dos resultados de validação.
rule_id (string) ID da regra da vinculação relacionada aos resultados.
dimension (string) Dimensão de qualidade de dados do rule_id. Esse valor só pode ser um dos valores especificados no nó do YAML rule_dimensions.
table_id (string) ID da entidade dos resultados da validação. Esse ID é especificado no parâmetro entity da vinculação da respectiva regra.
column_id (string) ID da coluna dos resultados da validação. Esse ID é especificado no parâmetro column da vinculação da respectiva regra.
last_modified (carimbo de data/hora) O carimbo de data/hora da última modificação do table_id que está sendo validado.
metadata_json_string (string) Pares de chave-valor do conteúdo do parâmetro de metadados especificado sob a vinculação de regra ou durante a execução da qualidade de dados.
configs_hashsum (string) A soma de hash do documento JSON que contém a vinculação de regra e todas as regras associadas, vinculações de regra, filtros de linha e configurações de entidade. Com configs_hashsum, é possível monitorar mudanças no conteúdo de um ID do rule_binding ou de uma das configurações referenciadas.
dq_run_id (string) ID exclusivo do registro.
invocation_id (string) ID da execução da qualidade de dados. Todos os registros de resumo gerados na mesma instância de execução de qualidade de dados têm o mesmo invocation_id.
progress_watermark (booleano) Determina se esse registro específico é considerado pela verificação de qualidade de dados ao determinar a alta marca-d'água para validação incremental. Se for FALSE, o respectivo registro será ignorado ao estabelecer o valor de marca-d'água alto. Essas informações são úteis ao executar validações de qualidade de dados de teste que não devem avançar a marca-d'água alta. O Dataplex Universal Catalog preenche esse campo com TRUE por padrão, mas isso pode ser substituído se o argumento --progress_watermark tem um valor de FALSE.
rows_validated (número inteiro) Número total de registros validados após a aplicação de row_filters e quaisquer filtros de marca-d'água alta na coluna incremental_time_filter_column_id, se especificado.
complex_rule_validation_errors_count (ponto flutuante) Número de linhas retornadas por uma regra CUSTOM_SQL_STATEMENT.
complex_rule_validation_success_flag (booleano) Status de sucesso das regras CUSTOM_SQL_STATEMENT.
success_count (número inteiro) Número total de registros que passaram na validação. Esse campo está definido como NULL para regras CUSTOM_SQL_STATEMENT.
success_percentage (ponto flutuante) A porcentagem do número de registros que passaram na validação dentro do número total de registros validados. Esse campo está definido como NULL para regras CUSTOM_SQL_STATEMENT.
failed_count (número inteiro) Número total de registros que não passaram na validação. Esse campo está definido como NULL para regras CUSTOM_SQL_STATEMENT.
failed_percentage (ponto flutuante) A porcentagem do número de registros que não foram validados dentro do número total de registros validados. Esse campo está definido como NULL para regras CUSTOM_SQL_STATEMENT.
null_count (número inteiro) Número total de registros que retornaram o valor nulo durante a validação. Esse campo é definido como NULL para regras NOT_NULL e CUSTOM_SQL_STATEMENT.
null_percentage (ponto flutuante) A porcentagem do número de registros que retornaram nulo durante a validação dentro do número total de registros validados. Esse campo está definido como NULL para as regras NOT_NULL e CUSTOM_SQL_STATEMENT.
failed_records_query Para cada regra que falha, esta coluna armazena uma consulta que pode ser usada para receber registros com falha. Neste documento, consulte Resolver falhas de regras usando failed_records_query.

Para entidades do BigQuery, uma visualização é criada para cada rule_binding que contém a lógica de validação SQL da execução mais recente. Essas visualizações podem ser encontradas no conjunto de dados do BigQuery especificado no argumento --gcp-bq-dataset-id.

Otimizações de custo

As tarefas de qualidade de dados são executadas como jobs do BigQuery no seu projeto. Para controlar o custo da execução desses jobs, use os preços do BigQuery no projeto em que seus jobs do BigQuery são executados. Para mais informações, consulte Gerenciamento de carga de trabalho do BigQuery.

Validações incrementais

Muitas vezes, tabelas são atualizadas rotineiramente com novas partições (novas linhas). Se não quiser revalidar as partições antigas em cada execução, use validações incrementais.

Para validações incrementais, é necessário ter uma coluna do tipo TIMESTAMP ou DATETIME na tabela, cujo valor aumenta monotonicamente. Use as colunas em que a tabela do BigQuery está particionada.

Para especificar a validação incremental, especifique um valor para incremental_time_filter_column_id=TIMESTAMP/DATETIME type column como parte de uma vinculação de regra.

Quando você especifica uma coluna, a tarefa de qualidade de dados considera apenas as linhas com um valor de TIMESTAMP maior que o carimbo de data/hora da última tarefa de qualidade de dados executada.

Exemplos de arquivos de especificação

Para usar os exemplos, crie um conjunto de dados do BigQuery chamado sales. Em seguida, crie uma tabela de fatos chamada sales_orders e adicione dados de amostra executando uma consulta com as seguintes instruções do GoogleSQL:

CREATE OR REPLACE TABLE sales.sales_orders
(
 id STRING NOT NULL,
 last_modified_timestamp TIMESTAMP,
 customer_id STRING,
 item_id STRING,
 amount NUMERIC,
 transaction_currency STRING
);

INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")

Exemplo 1

O código a seguir cria verificações de qualidade de dados para validar estes valores:

  • amount: os valores são zero ou números positivos.
  • item_id: uma string alfanumérica de 5 caracteres alfabéticos, seguida de 15 dígitos.
  • transaction_currency: um tipo de moeda permitido, definido por uma lista estática. A lista estática do exemplo permite GBP e JPY como tipos de moeda. Essa validação se aplica somente às linhas marcadas como internacionais.
# The following `NONE` row filter is required.
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 # This filters for rows marked as international (INTNL).
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can apply to multiple tables or columns.
rules:
 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

# Rule bindings associate rules to columns within tables.
rule_bindings:
  TRANSACTION_AMOUNT_VALID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

  TRANSACTION_VALID_ITEM_ID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

  TRANSACTION_CURRENCY_VALID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Substitua:

  • PROJECT_ID: o ID do projeto.
  • DATASET_ID: o ID do conjunto de dados

Exemplo 2

Quando a tabela a ser verificada faz parte de um data lake do Dataplex Universal Catalog, é possível especificar as tabelas usando a notação de lake ou zona. Assim é possível agregar os resultados por lago ou zona. Por exemplo, é possível gerar uma pontuação no nível da zona.

Para usar este exemplo, crie um lake do Dataplex Universal Catalog com o ID operations e o ID da zona procurement. Em seguida, adicione a tabela sales_orders como um recurso à zona.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}
rule_bindings:
 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Substitua:

  • PROJECT_ID: o ID do projeto.
  • REGION_ID: o ID da região do lake do Dataplex Universal Catalog em que a tabela está, como us-central1.

Exemplo 3

Baseado no Exemplo 2, este exemplo adiciona um código SQL personalizado para verificar se os valores de ID são exclusivos.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
 NO_DUPLICATES_IN_COLUMN_GROUPS:
   rule_type: CUSTOM_SQL_STATEMENT
   dimension: duplication
   params:
     custom_sql_arguments:
       - column_names
     custom_sql_statement: |-
       select a.*
       from data a
       inner join (
         select
           $column_names
         from data
         group by $column_names
         having count(*) > 1
       ) duplicates
       using ($column_names)

 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}

rule_bindings:
 TRANSACTIONS_UNIQUE:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: id
   row_filter_id: NONE
   rule_ids:
     - NO_DUPLICATES_IN_COLUMN_GROUPS:
         column_names: "id"

 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Exemplo 4

Baseado no Exemplo 3, este exemplo adiciona validações incrementais usando a coluna last_modified_timestamp. É possível adicionar validações incrementais para uma ou mais vinculações de regras.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
 NO_DUPLICATES_IN_COLUMN_GROUPS:
   rule_type: CUSTOM_SQL_STATEMENT
   dimension: duplication
   params:
     custom_sql_arguments:
       - column_names
     custom_sql_statement: |-
       select a.*
       from data a
       inner join (
         select
           $column_names
         from data
         group by $column_names
         having count(*) > 1
       ) duplicates
       using ($column_names)

 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}

rule_bindings:
 TRANSACTIONS_UNIQUE:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: id
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - NO_DUPLICATES_IN_COLUMN_GROUPS:
         column_names: "id"

 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: amount
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: item_id
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALID_CURRENCY_ID

Resolver falhas de regras com failed_records_query

Para cada regra que falha, a tabela de resumo armazena uma consulta na coluna failed_records_query, que pode ser usada para conferir os registros com falha.

Para depurar, também é possível usar reference columns no arquivo YAML, o que permite mesclar a saída de failed_records_query com os dados originais para acessar o registro inteiro. Por exemplo, é possível especificar uma coluna primary_key ou uma coluna primary_key composta como uma coluna de referência.

Especificar as colunas de referência

Para gerar colunas de referência, adicione o seguinte à especificação YAML:

  1. Seção reference_columns. Nesta seção, é possível criar um ou mais conjuntos de colunas de referência, cada um especificando uma ou mais colunas.

  2. Seção rule_bindings. Nesta seção, é possível adicionar uma linha a uma vinculação de regra que especifica um ID de coluna de referência (reference_columns_id) a ser usado pelas regras da vinculação. Precisa ser um dos conjuntos de colunas de referência especificados na seção reference_columns.

Por exemplo, o arquivo YAML a seguir especifica uma seção reference_columns e define três colunas: id, last_modified_timestamp e item_id como parte do conjunto ORDER_DETAILS_REFERENCE_COLUMNS. No exemplo a seguir, usamos a tabela de amostra sales_orders.

reference_columns:
  ORDER_DETAILS_REFERENCE_COLUMNS:
    include_reference_columns:
      - id
      - last_modified_timestamp
      - item_id
rules:
  VALUE_ZERO_OR_POSITIVE:
  rule_type: CUSTOM_SQL_EXPR
  params:
    custom_sql_expr: |-

row_filters:
NONE:
  filter_sql_expr: |-
      True

rule_bindings:
TRANSACTION_AMOUNT_VALID:
  entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
  column_id: amount
  row_filter_id: NONE
  reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
  rule_ids:
    - VALUE_ZERO_OR_POSITIVE

Como usar a consulta de registros com falha

A consulta de registros com falha gera uma linha para cada registro com uma regra que falhou. Ela inclui o nome da coluna que acionou a falha, o valor que acionou a falha e os valores das colunas de referência. Também inclui metadados que podem ser usados para estabelecer uma relação à execução da tarefa de qualidade de dados.

Confira a seguir um exemplo de saída de uma consulta de registros com falha para o arquivo YAML descrito em Especificar colunas de referência. O exemplo mostra uma falha para a coluna amount e um valor com falha de -10. Também registra o valor correspondente para a coluna de referência.

_dq_validation_invocation_id _dq_validation_rule_binding_id _dq_validation_rule_id _dq_validation_column_id _dq_validation_column_value _dq_validation_dimension _dq_validation_simple_rule_row_is_valid _dq_validation_complex_rule_validation_errors_count _dq_validation_complex_rule_validation_success_flag id last_modified_timestamp item_id
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 TRANSACTION_AMOUNT_VALID VALUE_ZERO_OR_POSITIVE amount -10 FALSE order1 2022-01-22T02:30:06.321Z bad_item_id

Usar consultas de registros com falha para regras CUSTOM_SQL_STATEMENT

Para regras CUSTOM_SQL_STATEMENT, as consultas de registro com falha incluem a coluna custom_sql_statement_validation_errors. A coluna custom_sql_statement_validation_errors é aninhada, com campos que correspondem ao resultado da instrução SQL. As colunas de referência não estão incluídas nas consultas de registros com falha para regras CUSTOM_SQL_STATEMENT.

Por exemplo, sua regra CUSTOM_SQL_STATEMENT pode ser assim:

rules:
  TEST_RULE:
    rule_type: CUSTOM_SQL_STATEMENT
    custom_sql_arguments:
      - existing_id
      - replacement_id
    params:
     CUSTOM_SQL_STATEMENT: |-
       (SELECT product_name, product_key FROM data
       where $existing_id != $replacement_id)
Os resultados deste exemplo contêm uma ou mais linhas para a coluna custom_sql_statement_validation_errors, com uma linha para cada ocorrência em que existing_id!=replacement_id.

Quando renderizado em JSON, o conteúdo de uma célula na coluna pode ficar assim:

{
  "custom_sql_statement_valdation_errors" :{
    "product_name"="abc"
    "product_key"="12345678"
    "_rule_binding_id"="your_rule_binding"
  }
}

É possível mesclar esses resultados com a tabela original com uma referência aninhada, como join on custom_sql_statement_valdation_errors.product_key.

A seguir