Usar tarefas de qualidade de dados

Este documento mostra como criar tarefas de qualidade de dados do Dataplex que permitem programar e executar verificações de qualidade de dados para tabelas internas e externas do BigQuery.

Para mais informações, consulte Visão geral das tarefas de qualidade de dados.

Antes de começar

Este documento pressupõe que você tenha um lake do Dataplex para criar a tarefa de qualidade de dados.

Ativar APIs e serviços do Google

  1. Ativar a API Dataproc.

    Ativar a API

  2. Ative o Acesso privado do Google para sua rede e sub-rede. Ative o Acesso privado do Google na rede que você planeja usar com as tarefas de qualidade de dados do Dataplex. Se você não especificar uma rede ou sub-rede ao criar a tarefa de qualidade de dados do Dataplex, vai ser usada a sub-rede padrão. Nesse caso, é necessário ativar o Acesso privado do Google na sub-rede padrão.

Criar um arquivo de especificação

O Dataplex usa o CloudDQ de código aberto como programa do driver. Os requisitos de verificação de qualidade de dados do Dataplex são definidos nos arquivos de especificação YAML do CloudDQ.

Como entrada para a tarefa de qualidade de dados, é possível ter um único arquivo YAML ou um único arquivo ZIP contendo um ou mais arquivos YAML. É recomendado capturar os requisitos de verificação de qualidade de dados em arquivos de especificação YAML separados, com um arquivo para cada seção.

Para preparar um arquivo de especificação, faça o seguinte:

  1. Crie um ou mais arquivos de especificação YAML do CloudDQ que definam os requisitos de verificação de qualidade de dados. Para mais informações sobre a sintaxe necessária, consulte a seção Sobre o arquivo de especificação deste documento.

    Salve o arquivo de especificação YAML no formato .yml ou .yaml. Se você criar vários arquivos de especificação YAML, salve todos em um único arquivo ZIP.

  2. Crie um bucket do Cloud Storage.
  3. Faça upload do arquivo de especificação para o bucket do Cloud Storage.

Sobre o arquivo de especificação

Seu arquivo de especificação YAML do CloudDQ precisa ter estas três seções:

  • Regras (definidas sob o nó YAML rules de nível superior): uma lista de regras a serem executadas. É possível criar essas regras com base em tipos de regras predefinidos, como NOT_NULL e REGEX, ou ampliá-las com instruções SQL personalizadas, como CUSTOM_SQL_EXPR e CUSTOM_SQL_STATEMENT. A instrução CUSTOM_SQL_EXPR sinaliza qualquer linha que custom_sql_expr avaliou para o False como falha. A instrução CUSTOM_SQL_STATEMENT sinaliza qualquer valor retornado pela instrução inteira como uma falha.

  • Filtros de linha (definidos sob o nó YAML row_filters de nível superior): expressões SQL que retornam um valor booleano que define filtros para buscar um subconjunto de dados do assunto da entidade subjacente para validação.

  • Vinculações de regras (definidas no nó YAML rule_bindings de nível superior): define rules e rule filters a serem aplicados às tabelas.

  • Dimensões da regra (definidas no nó YAML rule_dimensions): define a lista de dimensões de regra de qualidade de dados permitidas que uma regra pode definir no campo dimension correspondente.

    Exemplo:

    rule_dimensions:
      - consistency
      - correctness
      - duplication
      - completeness
      - conformance

    O campo dimension é opcional para uma regra. A seção de dimensões da regra será obrigatória se dimension estiver listado em qualquer regra.

Para mais informações, consulte o guia de referência do CloudDQ e os arquivos de especificação de amostra.

Crie um conjunto de dados para armazenar os resultados

  • Para armazenar os resultados, crie um conjunto de dados do BigQuery.

    O conjunto de dados precisa estar na mesma região que as tabelas em que você executa a tarefa de qualidade de dados.

    O Dataplex usa esse conjunto de dados e cria ou reutiliza uma tabela de sua escolha para armazenar os resultados.

Crie uma conta de serviço

Crie uma conta de serviço com os seguintes papéis e permissões do Identity and Access Management (IAM):

Opcional: usar configurações avançadas

Estas etapas são opcionais:

  1. Por padrão, o BigQuery executa verificações de qualidade de dados no projeto do usuário atual. Como alternativa, é possível escolher um projeto diferente para executar os jobs do BigQuery usando o argumento --gcp_project_id TASK_ARGS para a propriedade --execution-args da tarefa.

  2. Se o ID do projeto especificado para executar consultas do BigQuery for diferente do projeto em que a conta de serviço (especificada pelo --execution-service-account) foi criada, verifique se a política da organização que desativa o uso da conta de serviço entre projetos (iam.disableServiceAccountCreation) está desativada. Além disso, verifique se a conta de serviço pode acessar a programação de jobs do BigQuery no projeto em que as consultas do BigQuery estão sendo executadas.

Limitações

  • Todas as tabelas especificadas para uma determinada tarefa de qualidade de dados precisam pertencer à mesma região do Google Cloud .

Programar uma tarefa de qualidade de dados

Console

  1. No console do Google Cloud , acesse a página Processo do Dataplex.

    Acesse Processo.

  2. Clique em Criar tarefa.
  3. No card Verificar qualidade de dados, clique em Criar tarefa.
  4. Em Dataplex lake, escolha seu lake.
  5. Em ID, insira um ID.
  6. Na seção Especificação da qualidade de dados, faça o seguinte:
    1. No campo Selecionar arquivo GCS, clique em Procurar.
    2. Selecione o bucket do Cloud Storage.

    3. Clique em Selecionar.

  7. Na seção Tabela de resultados, faça o seguinte:

    1. No campo Selecionar conjunto de dados do BigQuery, clique em Procurar.

    2. Selecione o conjunto de dados do BigQuery para armazenar os resultados da validação.

    3. Clique em Selecionar.

    4. No campo Tabela do BigQuery, insira o nome da tabela para armazenar os resultados. Se a tabela não existir, o Dataplex vai criar uma para você. Não use o nome dq_summary, porque ele é reservado para tarefas de processamento interno.

  8. Na seção Conta de serviço, selecione uma conta de serviço no menu Conta de serviço de usuário.

  9. Clique em Continuar.

  10. Na seção Definir programação, configure a programação para executar a tarefa de qualidade de dados.

  11. Clique em Criar.

CLI da gcloud

Veja a seguir um exemplo de execução de uma tarefa de qualidade de dados que usa o comando da CLI gcloud de tarefas do Dataplex:

export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH"

# Google Cloud project where the Dataplex task is created.
export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT"

# Google Cloud region for the Dataplex lake.
export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID"

# Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region.
export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}"

# The Dataplex lake where your task is created.
export DATAPLEX_LAKE_NAME="DATAPLEX_LAKE_NAME"

# The service account used for running the task. Ensure that this service account
has sufficient IAM permissions on your project, including
BigQuery Data Editor, BigQuery Job User,
Dataplex Editor, Dataproc Worker, and Service
Usage Consumer.

# The BigQuery dataset used for storing the intermediate data
quality summary results and the BigQuery views associated with
each rule binding.
export TARGET_BQ_DATASET="TARGET_BQ_DATASET"

# If you want to use a different dataset for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding, use the following:
export CLOUDDQ_BIGQUERY_DATASET=$TARGET_BQ_DATASET

# The BigQuery dataset where the final results of the data quality checks are stored. This could be the same as CLOUDDQ_BIGQUERY_DATASET.
export TARGET_BQ_DATASET="TARGET_BQ_DATASET"

# The BigQuery table where the final results of the data quality checks are stored.
export TARGET_BQ_TABLE="TARGET_BQ_TABLE"

# The unique identifier for the task.
export TASK_ID="TASK_ID"

gcloud dataplex tasks create \
    --location="${DATAPLEX_REGION_ID}" \
    --lake="${DATAPLEX_LAKE_NAME}" \
    --trigger-type=ON_DEMAND \
    --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \
    --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \
    --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \
    --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \
    "$TASK_ID"
Parâmetro Descrição
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH O caminho do Cloud Storage para a entrada de configurações do YAML de qualidade de dados para a tarefa de qualidade de dados. É possível ter um único arquivo YAML no formato .yml ou .yaml ou um arquivo ZIP com vários arquivos YAML.
GOOGLE_CLOUD_PROJECT O projeto do Google Cloud em que a tarefa do Dataplex e os jobs do BigQuery são criados.
DATAPLEX_REGION_ID A região do lake do Dataplex em que a tarefa de qualidade de dados é criada.
SERVICE_ACCOUNT A conta de serviço usada para executar a tarefa. Verifique se essa conta de serviço tem permissões do IAM suficientes, conforme descrito na seção Antes de começar.

Para --execution-args, os argumentos a seguir precisam ser transmitidos como argumentos posicionados e, portanto, nesta ordem:

Argumento Descrição
clouddq-executable.zip Um executável pré-compilado que foi transmitido no spark-file-uris de um bucket público do Cloud Storage.
ALL Execute todas as vinculações de regras. Também é possível fornecer vinculações de regras específicas como uma lista separada por vírgulas. Por exemplo, RULE_1,RULE_2.
gcp-project-id ID do projeto que executa as consultas do BigQuery.
gcp-region-id Região para executar os jobs do BigQuery para validação da qualidade de dados. Essa região precisa ser igual à do gcp-bq-dataset-id e do target_bigquery_summary_table.
gcp-bq-dataset-id Conjunto de dados do BigQuery usado para armazenar as visualizações rule_binding e os resultados resumidos da qualidade de dados intermediários.
target-bigquery-summary-table Referência do ID da tabela da tabela do BigQuery em que os resultados finais das verificações de qualidade de dados são armazenados. Não use o valor do ID dq_summary, porque ele é reservado para tarefas de processamento interno.
--summary_to_stdout (Opcional) Quando essa sinalização é transmitida, todas as linhas de resultados de validação criadas na tabela dq_summary na última execução são registradas como registros JSON no Cloud Logging e stdout.

API

  1. Substitua:

    PROJECT_ID = "Your Dataplex Project ID"
    REGION = "Your Dataplex lake region"
    LAKE_ID = "Your Dataplex lake ID"
    SERVICE_ACC = "Your service account used for reading the data"
    DATAPLEX_TASK_ID = "Unique task ID for the data quality task"
    BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification"
    GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project"
    GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional
    GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results"
    TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
  2. Envie uma solicitação POST HTTP:
    POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID}
    {
    "spark": {
        "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py",
        "file_uris": [  f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip",
                        f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum",
                        f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip"
                    ]
    },
    "execution_spec": {
        "args": {
            "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}"
        },
        "service_account": "SERVICE_ACC"
    },
    "trigger_spec": {
    "type": "ON_DEMAND"
    },
    "description": "${DATAPLEX_TASK_DESCRIPTION}"
    }

Consulte também Exemplo de DAG do Airflow para tarefa de qualidade de dados do Dataplex.

Monitorar uma tarefa programada de qualidade de dados

Veja como monitorar sua tarefa.

Ver os resultados

Os resultados das validações de qualidade de dados são armazenados no conjunto de dados e na tabela de resumo do BigQuery que você especificou, conforme descrito em Criar um conjunto de dados para armazenar os resultados. A tabela de resumo contém o resumo da saída de cada combinação de vinculação de regras e regra para cada execução de validação. A saída na tabela de resumo inclui as seguintes informações:

Nome da coluna Descrição
dataplex_lake (string) ID do lake do Dataplex que contém a tabela que está sendo validada.
dataplex_zone (string) ID da zona do Dataplex que contém a tabela que está sendo validada.
dataplex_asset_id (string) ID do recurso do Dataplex que contém a tabela que está sendo validada.
execution_ts (carimbo de data/hora) Carimbo de data/hora de quando a consulta de validação foi executada.
rule_binding_id (string) ID da vinculação de regra para a qual os resultados de validação são informados.
rule_id (string) ID da regra sob a vinculação de regra para a qual os resultados de validação são informados.
dimension (string) Dimensão de qualidade de dados do rule_id. Esse valor só pode ser um dos valores especificados no nó do YAML rule_dimensions.
table_id (string) ID da entidade para que os resultados de validação são informados. Esse ID é especificado no parâmetro entity da vinculação da respectiva regra.
column_id (string) ID da coluna para que os resultados da validação informados. Esse ID é especificado no parâmetro column da vinculação da respectiva regra.
last_modified (carimbo de data/hora) O último carimbo de data/hora modificado do table_id que está sendo validado.
metadata_json_string (string) Pares de chave-valor do conteúdo do parâmetro de metadados especificado sob a vinculação de regra ou durante a execução da qualidade de dados.
configs_hashsum (string) A soma de hash do documento JSON que contém a vinculação de regra e todas as regras associadas, vinculações de regra, filtros de linha e configurações de entidade. configs_hashsum permite rastrear quando o conteúdo de um ID rule_binding ou uma das configurações referenciadas é alterado.
dq_run_id (string) ID exclusivo do registro.
invocation_id (string) ID da execução da qualidade de dados. Todos os registros de resumo da qualidade de dados gerados na mesma instância de execução de qualidade de dados compartilham o mesmo invocation_id.
progress_watermark (booleano) Determina se esse registro específico é considerado pela verificação de qualidade de dados ao determinar a alta marca d'água para validação incremental. Se FALSE, o respectivo registro será ignorado ao estabelecer o valor de marca-d'água alto. Essas informações são úteis ao executar validações de qualidade de dados de teste que não devem avançar a marca d'água alta. O Dataplex preenche esse campo com TRUE por padrão, mas isso pode ser substituído se o argumento --progress_watermark tiver um valor de FALSE.
rows_validated (número inteiro) Número total de registros validados após a aplicação de row_filters e quaisquer filtros de marca-d'água alta na coluna incremental_time_filter_column_id, se especificado.
complex_rule_validation_errors_count (float) Número de linhas retornadas por uma regra CUSTOM_SQL_STATEMENT.
complex_rule_validation_success_flag (booleano) Status de sucesso das regras CUSTOM_SQL_STATEMENT.
success_count (número inteiro) Número total de registros que passaram na validação. Esse campo está definido como NULL para regras CUSTOM_SQL_STATEMENT.
success_percentage (ponto flutuante) A porcentagem do número de registros que passaram na validação dentro do número total de registros validados. Esse campo está definido como NULL para regras CUSTOM_SQL_STATEMENT.
failed_count (número inteiro) Número total de registros que não passaram na validação. Esse campo está definido como NULL para regras CUSTOM_SQL_STATEMENT.
failed_percentage (ponto flutuante) A porcentagem do número de registros que não foram validados dentro do número total de registros validados. Esse campo está definido como NULL para regras CUSTOM_SQL_STATEMENT.
null_count (número inteiro) Número total de registros que retornaram o valor nulo durante a validação. Esse campo é definido como NULL para regras NOT_NULL e CUSTOM_SQL_STATEMENT.
null_percentage (ponto flutuante) A porcentagem do número de registros que retornaram nulo durante a validação dentro do número total de registros validados. Esse campo está definido como NULL para as regras NOT_NULL e CUSTOM_SQL_STATEMENT.
failed_records_query Para cada regra que falha, esta coluna armazena uma consulta que pode ser usada para receber registros com falha. Neste documento, consulte Resolver problemas de regras com falhas usando failed_records_query.

Para entidades do BigQuery, uma visualização é criada para cada rule_binding que contém a lógica de validação SQL da execução mais recente. Essas visualizações podem ser encontradas no conjunto de dados do BigQuery especificado no argumento --gcp-bq-dataset-id.

Otimizações de custo

Você pode ajudar a reduzir custos com as otimizações a seguir.

Validações incrementais

Muitas vezes, você tem tabelas atualizadas rotineiramente com novas partições (novas linhas). Se não quiser revalidar as partições antigas em cada execução, use validações incrementais.

Para validações incrementais, é necessário ter uma coluna do tipo TIMESTAMP ou DATETIME na tabela em que o valor da coluna aumenta monotonicamente. Use as colunas em que a tabela do BigQuery está particionada.

Para especificar a validação incremental, especifique um valor para incremental_time_filter_column_id=TIMESTAMP/DATETIME type column como parte de uma vinculação de regra.

Quando você especifica uma coluna, a tarefa de qualidade de dados considera apenas as linhas com um valor de TIMESTAMP maior que o carimbo de data/hora da última tarefa de qualidade de dados executada.

Exemplos de arquivos de especificação

Para usar esses exemplos, crie um conjunto de dados do BigQuery chamado sales. Em seguida, crie uma tabela de fatos chamada sales_orders e adicione dados de amostra executando uma consulta com as seguintes instruções do GoogleSQL:

CREATE OR REPLACE TABLE sales.sales_orders
(
 id STRING NOT NULL,
 last_modified_timestamp TIMESTAMP,
 customer_id STRING,
 item_id STRING,
 amount NUMERIC,
 transaction_currency STRING
);

INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")

Exemplo 1

O exemplo de código a seguir cria verificações de qualidade de dados para validar esses valores:

  • amount: os valores são zero ou números positivos.
  • item_id: uma string alfanumérica de cinco caracteres alfabéticos, seguida de 15 dígitos.
  • transaction_currency: um tipo de moeda permitido, conforme definido por uma lista estática. A lista estática dessa amostra permite GBP e JPY como tipos de moeda. Essa validação se aplica somente às linhas marcadas como internacionais.
# The following `NONE` row filter is required.
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 # This filters for rows marked as international (INTNL).
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can apply to multiple tables or columns.
rules:
 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

# Rule bindings associate rules to columns within tables.
rule_bindings:
  TRANSACTION_AMOUNT_VALID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

  TRANSACTION_VALID_ITEM_ID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

  TRANSACTION_CURRENCY_VALID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Substitua:

  • PROJECT_ID: o ID do projeto.
  • DATASET_ID: o código do conjunto de dados.

Sample 2

Se a tabela a ser verificada fizer parte de um lake do Dataplex, você poderá especificar as tabelas usando a notação de lake ou zona. Isso permite agregar os resultados por lago ou zona. Por exemplo, é possível gerar uma pontuação no nível da zona.

Para usar este exemplo, crie um lake do Dataplex com o ID operations e o ID da zona procurement. Em seguida, adicione a tabela sales_orders como um recurso à zona.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}
rule_bindings:
 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Substitua:

  • PROJECT_ID: o ID do projeto.
  • REGION_ID: o ID da região do lago do Dataplex em que a tabela existe, como us-central1.

Exemplo 3

Este exemplo aprimora o Exemplo 2 adicionando uma verificação SQL personalizada para saber se os valores de ID são exclusivos.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
 NO_DUPLICATES_IN_COLUMN_GROUPS:
   rule_type: CUSTOM_SQL_STATEMENT
   dimension: duplication
   params:
     custom_sql_arguments:
       - column_names
     custom_sql_statement: |-
       select a.*
       from data a
       inner join (
         select
           $column_names
         from data
         group by $column_names
         having count(*) > 1
       ) duplicates
       using ($column_names)

 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}

rule_bindings:
 TRANSACTIONS_UNIQUE:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: id
   row_filter_id: NONE
   rule_ids:
     - NO_DUPLICATES_IN_COLUMN_GROUPS:
         column_names: "id"

 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Exemplo 4

Este exemplo melhora o Exemplo 3 adicionando validações incrementais usando a coluna last_modified_timestamp. É possível adicionar validações incrementais para uma ou mais vinculações de regras.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
 NO_DUPLICATES_IN_COLUMN_GROUPS:
   rule_type: CUSTOM_SQL_STATEMENT
   dimension: duplication
   params:
     custom_sql_arguments:
       - column_names
     custom_sql_statement: |-
       select a.*
       from data a
       inner join (
         select
           $column_names
         from data
         group by $column_names
         having count(*) > 1
       ) duplicates
       using ($column_names)

 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}

rule_bindings:
 TRANSACTIONS_UNIQUE:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: id
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - NO_DUPLICATES_IN_COLUMN_GROUPS:
         column_names: "id"

 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: amount
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: item_id
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALID_CURRENCY_ID

Resolver problemas de regras com falhas usando failed_records_query

Para cada regra que falha, a tabela de resumo armazena uma consulta na coluna failed_records_query que pode ser usada para receber registros com falha.

Para depurar, também é possível usar reference columns no arquivo YAML, o que permite mesclar a saída de failed_records_query com os dados originais para acessar o registro inteiro. Por exemplo, é possível especificar uma coluna primary_key ou uma coluna primary_key composta como uma coluna de referência.

Especificar as colunas de referência

Para gerar colunas de referência, adicione o seguinte à sua especificação YAML:

  1. Seção reference_columns. Nesta seção, é possível criar um ou mais conjuntos de colunas de referência, cada um especificando uma ou mais colunas.

  2. Seção rule_bindings. Nesta seção, é possível adicionar uma linha a uma vinculação de regra que especifica um ID de coluna de referência (reference_columns_id) a ser usado pelas regras nessa vinculação de regra. Precisa ser um dos conjuntos de colunas de referência especificados na seção reference_columns.

Por exemplo, o arquivo YAML a seguir especifica uma seção reference_columns e define três colunas: id, last_modified_timestamp e item_id como parte do conjunto ORDER_DETAILS_REFERENCE_COLUMNS. O exemplo a seguir usa a tabela de exemplo sales_orders.

reference_columns:
  ORDER_DETAILS_REFERENCE_COLUMNS:
    include_reference_columns:
      - id
      - last_modified_timestamp
      - item_id
rules:
  VALUE_ZERO_OR_POSITIVE:
  rule_type: CUSTOM_SQL_EXPR
  params:
    custom_sql_expr: |-

row_filters:
NONE:
  filter_sql_expr: |-
      True

rule_bindings:
TRANSACTION_AMOUNT_VALID:
  entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
  column_id: amount
  row_filter_id: NONE
  reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
  rule_ids:
    - VALUE_ZERO_OR_POSITIVE

Como usar a consulta de registros com falha

A consulta de registros com falha gera uma linha para cada registro com uma regra que falhou. Isso inclui o nome da coluna que acionou a falha, o valor que acionou a falha e os valores das colunas de referência. Também inclui metadados que podem ser usados para estabelecer uma relação à execução da tarefa de qualidade de dados.

Veja a seguir um exemplo de saída de uma consulta de registros com falha para o arquivo YAML descrito em Especificar colunas de referência. O exemplo mostra uma falha para a coluna amount e um valor com falha de -10. Também registra o valor correspondente para a coluna de referência.

_dq_validation_invocation_id _dq_validation_rule_binding_id _dq_validation_rule_id _dq_validation_column_id _dq_validation_column_value _dq_validation_dimension _dq_validation_simple_rule_row_is_valid _dq_validation_complex_rule_validation_errors_count _dq_validation_complex_rule_validation_success_flag id last_modified_timestamp item_id
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 TRANSACTION_AMOUNT_VALID VALUE_ZERO_OR_POSITIVE amount -10 FALSO order1 2022-01-22T02:30:06.321Z bad_item_id

Usar consultas de registros com falha para regras CUSTOM_SQL_STATEMENT

Para regras CUSTOM_SQL_STATEMENT, as consultas de registro com falha incluem a coluna custom_sql_statement_validation_errors. A coluna custom_sql_statement_validation_errors é aninhada com campos que correspondem à saída da instrução SQL. As colunas de referência não estão incluídas nas consultas de registros com falha para regras CUSTOM_SQL_STATEMENT.

Por exemplo, sua regra CUSTOM_SQL_STATEMENT pode ser assim:

rules:
  TEST_RULE:
    rule_type: CUSTOM_SQL_STATEMENT
    custom_sql_arguments:
      - existing_id
      - replacement_id
    params:
     CUSTOM_SQL_STATEMENT: |-
       (SELECT product_name, product_key FROM data
       where $existing_id != $replacement_id)
Os resultados deste exemplo vão conter uma ou mais linhas para a coluna custom_sql_statement_validation_errors, com uma linha para cada ocorrência em que existing_id!=replacement_id.

Quando renderizada em JSON, o conteúdo de uma célula na coluna pode ter a seguinte aparência:

{
  "custom_sql_statement_valdation_errors" :{
    "product_name"="abc"
    "product_key"="12345678"
    "_rule_binding_id"="your_rule_binding"
  }
}

É possível mesclar esses resultados com a tabela original com uma referência aninhada, como join on custom_sql_statement_valdation_errors.product_key.

A seguir