Use tarefas de qualidade dos dados

Este documento mostra como criar tarefas de qualidade de dados do catálogo universal do Dataplex que lhe permitem agendar e executar verificações de qualidade de dados para as suas tabelas do BigQuery incorporadas e externas.

Para mais informações, consulte o artigo Vista geral das tarefas de qualidade dos dados.

Antes de começar

Este documento pressupõe que tem um lake do Dataplex Universal Catalog para criar a tarefa de qualidade de dados.

Ative as APIs e os serviços Google

  1. Ative a API Dataproc.

    Ative a API

  2. Ative o acesso privado da Google para a sua rede e sub-rede. Ative o acesso privado Google na rede que planeia usar com as tarefas de qualidade dos dados do Dataplex Universal Catalog. Se não especificar uma rede ou uma sub-rede quando criar a tarefa de qualidade dos dados do Dataplex Universal Catalog, o Dataplex Universal Catalog usa a sub-rede predefinida. Nesse caso, tem de ativar o acesso privado à Google na sub-rede predefinida.

Crie um ficheiro de especificação

O Dataplex Universal Catalog usa o CloudDQ de código aberto como o programa de controlador. Os requisitos de verificação da qualidade dos dados do Dataplex Universal Catalog são definidos nos ficheiros de especificação YAML do CloudDQ.

Como entrada para a tarefa de qualidade de dados, pode ter um único ficheiro YAML ou um único arquivo ZIP que contenha um ou mais ficheiros YAML. Recomendamos que capture os requisitos de verificação da qualidade dos dados em ficheiros de especificação YAML separados, com um ficheiro para cada secção.

Para preparar um ficheiro de especificação, faça o seguinte:

  1. Crie um ou mais ficheiros de especificação YAML do CloudDQ que definam os requisitos de verificação da qualidade dos dados. Para mais informações sobre a sintaxe necessária, consulte a secção Acerca do ficheiro de especificação deste documento.

    Guarde o ficheiro de especificação YAML no formato .yml ou .yaml. Se criar vários ficheiros de especificação YAML, guarde todos os ficheiros num único arquivo ZIP.

  2. Crie um contentor do Cloud Storage.
  3. Carregue o ficheiro de especificação para o contentor do Cloud Storage.

Acerca do ficheiro de especificação

O ficheiro de especificação YAML do CloudDQ tem de ter as seguintes secções:

  • Regras (definidas no nó YAML de nível superior): uma lista de regras a executar.rules Pode criar estas regras a partir de tipos de regras predefinidos, como NOT_NULL e REGEX, ou pode expandi-las com declarações SQL personalizadas, como CUSTOM_SQL_EXPR e CUSTOM_SQL_STATEMENT. A declaração CUSTOM_SQL_EXPR marca qualquer linha que custom_sql_expr tenha sido avaliada como False como uma falha. A declaração CUSTOM_SQL_STATEMENT marca qualquer valor devolvido pela declaração completa como uma falha.

  • Filtros de linhas (definidos no nó YAML row_filters de nível superior): expressões SQL que devolvem um valor booleano que define filtros para obter um subconjunto de dados da entidade subjacente sujeita a validação.

  • Associações de regras (definidas no nó YAML rule_bindings de nível superior): Define rules e rule filters a aplicar às tabelas.

  • Dimensões das regras (definidas no nó YAML rule_dimensions): define a lista permitida de dimensões das regras de qualidade dos dados que uma regra pode definir no campo dimension correspondente.

    Por exemplo:

    rule_dimensions:
      - consistency
      - correctness
      - duplication
      - completeness
      - conformance

    O campo dimension é opcional para uma regra. A secção de dimensões das regras é obrigatória se dimension estiver listado em qualquer regra.

Para mais informações, consulte o guia de referência do CloudDQ e os ficheiros de especificação de exemplo.

Crie um conjunto de dados para armazenar os resultados

  • Para armazenar os resultados, crie um conjunto de dados do BigQuery.

    O conjunto de dados tem de estar na mesma região que as tabelas nas quais executa a tarefa de qualidade dos dados.

    O Dataplex Universal Catalog usa este conjunto de dados e cria ou reutiliza uma tabela à sua escolha para armazenar os resultados.

Criar uma conta de serviço

Crie uma conta de serviço com as seguintes funções e autorizações de gestão de identidade e de acesso (IAM):

Use definições avançadas

Estes passos são opcionais:

  • Por predefinição, o BigQuery executa verificações de qualidade dos dados no projeto atual. Pode escolher um projeto diferente para executar as tarefas do BigQuery. Use o argumento --gcp_project_id TASK_ARGS para a propriedade --execution-args da tarefa.

  • Se o ID do projeto especificado para executar consultas do BigQuery for diferente do projeto no qual a conta de serviço (especificada por --execution-service-account) é criada, certifique-se de que a política da organização que desativa a utilização da conta de serviço em vários projetos (iam.disableServiceAccountCreation) está desativada. Além disso, certifique-se de que a conta de serviço pode aceder à agenda de tarefas do BigQuery no projeto onde as consultas do BigQuery estão a ser executadas.

Limitações

Todas as tabelas especificadas para uma determinada tarefa de qualidade dos dados têm de pertencer à mesma Google Cloud região.

Agende uma tarefa de qualidade de dados

Consola

  1. Na Google Cloud consola, aceda à página Processo do Dataplex Universal Catalog.

    Aceda a Processar

  2. Clique em Criar tarefa.
  3. No cartão Verificar qualidade de dados, clique em Criar tarefa.
  4. Para o Dataplex lake, escolha o seu lake.
  5. Para ID, introduza um ID.
  6. Na secção Especificação de qualidade dos dados, faça o seguinte:
    1. No campo Selecionar ficheiro GCS, clique em Procurar.
    2. Selecione o seu contentor do Cloud Storage.

    3. Clique em Selecionar.

  7. Na secção Tabela de resultados, faça o seguinte:

    1. No campo Selecionar conjunto de dados do BigQuery, clique em Procurar.

    2. Selecione o conjunto de dados do BigQuery para armazenar os resultados da validação.

    3. Clique em Selecionar.

    4. No campo Tabela do BigQuery, introduza o nome da tabela para armazenar os resultados. Se a tabela não existir, o catálogo universal do Dataplex cria-a para si. Não use o nome dq_summary porque está reservado para tarefas de processamento interno.

  8. Na secção Conta de serviço, selecione uma conta de serviço no menu Utilizar conta de serviço.

  9. Clique em Continuar.

  10. Na secção Definir agendamento, configure o agendamento para executar a tarefa de qualidade de dados.

  11. Clique em Criar.

CLI gcloud

Segue-se um exemplo de execução de uma tarefa de qualidade dos dados que usa o comando gcloud CLI de tarefas do Dataplex Universal Catalog:

export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH"

# Google Cloud project where the Dataplex Universal Catalog task is created.
export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT"

# Google Cloud region for the Dataplex Universal Catalog lake.
export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID"

# Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region.
export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}"

# The Dataplex Universal Catalog lake where your task is created.
export DATAPLEX_LAKE_ID="DATAPLEX_LAKE_ID"

# The service account used for running the task. Ensure that this service account
has sufficient IAM permissions on your project, including
BigQuery Data Editor, BigQuery Job User,
Dataplex Universal Catalog Editor, Dataproc Worker, and Service
Usage Consumer.
export DATAPLEX_TASK_SERVICE_ACCOUNT="DATAPLEX_TASK_SERVICE_ACCOUNT"

# If you want to use a different dataset for storing the intermediate data quality summary results
and the BigQuery views associated with each rule binding, use the following:
export CLOUDDQ_BIGQUERY_DATASET="CLOUDDQ_BIGQUERY_DATASET"

# The BigQuery dataset where the final results of the data quality checks are stored.
This could be the same as CLOUDDQ_BIGQUERY_DATASET.
export TARGET_BQ_DATASET="TARGET_BQ_DATASET"

# The BigQuery table where the final results of the data quality checks are stored.
export TARGET_BQ_TABLE="TARGET_BQ_TABLE"

# The unique identifier for the task.
export TASK_ID="TASK_ID"

gcloud dataplex tasks create \
    --location="${DATAPLEX_REGION_ID}" \
    --lake="${DATAPLEX_LAKE_ID}" \
    --trigger-type=ON_DEMAND \
    --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \
    --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \
    --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \
    --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \
    "$TASK_ID"
Parâmetro Descrição
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH O caminho do Cloud Storage para as configurações YAML de qualidade dos dados introduzidas para a tarefa de qualidade dos dados. Pode ter um único ficheiro YAML no formato .yml ou .yaml, ou um arquivo ZIP com vários ficheiros YAML.
GOOGLE_CLOUD_PROJECT O projeto Google Cloud onde a tarefa do Dataplex Universal Catalog e as tarefas do BigQuery são criadas.
DATAPLEX_REGION_ID A região do lake do Dataplex Universal Catalog onde a tarefa de qualidade de dados é criada.
SERVICE_ACCOUNT A conta de serviço usada para executar a tarefa. Certifique-se de que esta conta de serviço tem autorizações de IAM suficientes, conforme descrito na secção Antes de começar.

Para --execution-args, os seguintes argumentos têm de ser transmitidos como argumentos posicionais e, por isso, nesta ordem:

Argumento Descrição
clouddq-executable.zip Um executável pré-compilado que foi transmitido spark-file-uris a partir de um contentor do Cloud Storage público.
ALL Executar todas as associações de regras. Em alternativa, pode fornecer associações de regras específicas como uma lista separada por vírgulas. Por exemplo, RULE_1,RULE_2.
gcp-project-id ID do projeto que executa as consultas do BigQuery.
gcp-region-id Região para executar as tarefas do BigQuery para validação da qualidade dos dados. Esta região deve ser igual à região de gcp-bq-dataset-id e target_bigquery_summary_table.
gcp-bq-dataset-id Conjunto de dados do BigQuery que é usado para armazenar as vistas e os resultados do resumo da qualidade dos dados intermédios.rule_binding
target-bigquery-summary-table Referência do ID da tabela do BigQuery onde os resultados finais das verificações de qualidade dos dados são armazenados. Não use o valor do ID dq_summary porque está reservado para tarefas de processamento interno.
--summary_to_stdout (Opcional) Quando esta flag é transmitida, todas as linhas de resultados da validação criadas na tabela dq_summary na última execução são registadas como registos JSON no Cloud Logging e stdout.

API

  1. Substitua o seguinte:

    PROJECT_ID = "Your Dataplex Universal Catalog Project ID"
    REGION = "Your Dataplex Universal Catalog lake region"
    LAKE_ID = "Your Dataplex Universal Catalog lake ID"
    SERVICE_ACC = "Your service account used for reading the data"
    DATAPLEX_TASK_ID = "Unique task ID for the data quality task"
    BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification"
    GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project"
    GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional
    GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results"
    TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
  2. Envie um pedido HTTP POST:
    POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID}
    {
    "spark": {
        "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py",
        "file_uris": [  f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip",
                        f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum",
                        f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip"
                    ]
    },
    "execution_spec": {
        "args": {
            "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}"
        },
        "service_account": "SERVICE_ACC"
    },
    "trigger_spec": {
    "type": "ON_DEMAND"
    },
    "description": "${DATAPLEX_TASK_DESCRIPTION}"
    }

Veja também o exemplo de DAG do Airflow para a tarefa de qualidade de dados do Dataplex Universal Catalog.

Monitorize uma tarefa de qualidade de dados agendada

Veja como monitorizar a sua tarefa.

Veja os resultados

Os resultados das validações de qualidade dos dados são armazenados no conjunto de dados do BigQuery e na tabela de resumo especificados, conforme descrito em Crie um conjunto de dados para armazenar os resultados. A tabela de resumo contém o resumo da saída para cada combinação de associação de regras e regra para cada execução de validação. A saída na tabela de resumo inclui as seguintes informações:

Nome da coluna Descrição
dataplex_lake (string) ID do lake do Dataplex Universal Catalog que contém a tabela a ser validada.
dataplex_zone (string) ID da zona do Dataplex Universal Catalog que contém a tabela a ser validada.
dataplex_asset_id (string) ID do recurso do Dataplex Universal Catalog que contém a tabela a ser validada.
execution_ts (timestamp) Data/hora em que a consulta de validação foi executada.
rule_binding_id (string) ID da associação de regras para a qual os resultados da validação são comunicados.
rule_id (string) ID da regra na associação de regras para a qual são comunicados os resultados da validação.
dimension (string) Dimensão de qualidade de dados do rule_id. Este valor só pode ser um dos valores especificados no nó YAML rule_dimensions.
table_id (string) ID da entidade para a qual os resultados da validação são comunicados. Este ID é especificado no parâmetro entity da associação de regras respetiva.
column_id (string) ID da coluna para a qual são comunicados os resultados da validação. Este ID é especificado no parâmetro column da associação de regras respetiva.
last_modified (timestamp) A data/hora da última modificação do table_id que está a ser validado.
metadata_json_string (string) Pares de chave-valor do conteúdo do parâmetro de metadados especificado na associação de regras ou durante a execução da qualidade dos dados.
configs_hashsum (string) A soma de hash do documento JSON que contém a associação de regras e todas as regras, associações de regras, filtros de linhas e configurações de entidades associados. configs_hashsum permite a monitorização quando o conteúdo de um ID de rule_binding ou uma das respetivas configurações referenciadas foi alterado.
dq_run_id (string) ID exclusivo do registo.
invocation_id (string) ID da execução da qualidade de dados. Todos os registos do resumo da qualidade dos dados gerados na mesma instância de execução da qualidade dos dados partilham o mesmo invocation_id.
progress_watermark (booleano) Determina se este registo específico é considerado pela verificação da qualidade dos dados para determinar o limite máximo para a validação incremental. Se FALSE, o registo respetivo é ignorado quando se estabelece o valor máximo. Estas informações são úteis quando executa validações de qualidade de dados de teste que não devem avançar o limite máximo. O catálogo universal do Dataplex preenche este campo com TRUE por predefinição, mas o valor pode ser substituído se o argumento --progress_watermark tiver um valor de FALSE.
rows_validated (integer) Total number of records validated after applying row_filters and any high-watermark filters on the incremental_time_filter_column_id column, if specified.
complex_rule_validation_errors_count (float) Número de linhas devolvidas por uma regra CUSTOM_SQL_STATEMENT.
complex_rule_validation_success_flag (booleano) Estado de êxito das regras CUSTOM_SQL_STATEMENT.
success_count (integer) Número total de registos que passaram na validação. Este campo está definido como NULL para regras CUSTOM_SQL_STATEMENT.
success_percentage (float) Percentagem do número de registos que passaram na validação no número total de registos validados. Este campo está definido como NULL para regras CUSTOM_SQL_STATEMENT.
failed_count (integer) Número total de registos cuja validação falhou. Este campo está definido como NULL para regras CUSTOM_SQL_STATEMENT.
failed_percentage (float) Percentagem do número de registos que falharam a validação no número total de registos validados. Este campo está definido como NULL para regras CUSTOM_SQL_STATEMENT.
null_count (integer) Número total de registos que devolveram nulo durante a validação. Este campo está definido como NULL para regras de NOT_NULL e CUSTOM_SQL_STATEMENT.
null_percentage (float) Percentagem do número de registos que devolveram nulo durante a validação no número total de registos validados. Este campo está definido como NULL para regras de NOT_NULL e CUSTOM_SQL_STATEMENT.
failed_records_query Para cada regra com falhas, esta coluna armazena uma consulta que pode usar para obter registos com falhas. Neste documento, consulte a secção Resolva problemas de regras com falhas com failed_records_query.

Para as entidades do BigQuery, é criada uma vista para cada rule_binding que contém a lógica de validação SQL da execução mais recente. Pode encontrar estas vistas no conjunto de dados do BigQuery especificado no argumento --gcp-bq-dataset-id.

Otimizações de custos

As tarefas de qualidade dos dados são executadas como tarefas do BigQuery no seu projeto. Para controlar o custo de execução de tarefas de qualidade de dados, use os preços do BigQuery no projeto onde as tarefas do BigQuery são executadas. Para mais informações, consulte o artigo Gestão da carga de trabalho do BigQuery.

Validações incrementais

Muitas vezes, tem tabelas que são atualizadas rotineiramente com novas partições (novas linhas). Se não quiser revalidar partições antigas em todas as execuções, pode usar validações incrementais.

Para validações incrementais, tem de ter uma coluna do tipo TIMESTAMP ou DATETIME na tabela em que o valor da coluna aumenta monotonicamente. Pode usar as colunas nas quais a tabela do BigQuery está particionada.

Para especificar a validação incremental, especifique um valor para incremental_time_filter_column_id=TIMESTAMP/DATETIME type column como parte de uma associação de regras.

Quando especifica uma coluna, a tarefa de qualidade dos dados considera apenas as linhas com um valor TIMESTAMP superior à data/hora da última tarefa de qualidade dos dados executada.

Exemplos de ficheiros de especificação

Para usar estes exemplos, crie um conjunto de dados do BigQuery denominado sales. Em seguida, crie uma tabela de factos denominada sales_orders e adicione dados de exemplo executando uma consulta com as seguintes declarações GoogleSQL:

CREATE OR REPLACE TABLE sales.sales_orders
(
 id STRING NOT NULL,
 last_modified_timestamp TIMESTAMP,
 customer_id STRING,
 item_id STRING,
 amount NUMERIC,
 transaction_currency STRING
);

INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")

Exemplo 1

O exemplo de código seguinte cria verificações de qualidade dos dados para validar estes valores:

  • amount: os valores são números positivos ou zero.
  • item_id: uma string alfanumérica de 5 carateres alfabéticos, seguida de 15 dígitos.
  • transaction_currency: um tipo de moeda permitido, conforme definido por uma lista estática. A lista estática deste exemplo permite GBP e JPY como tipos de moeda. Esta validação aplica-se apenas às linhas marcadas como internacionais.
# The following `NONE` row filter is required.
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 # This filters for rows marked as international (INTNL).
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can apply to multiple tables or columns.
rules:
 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

# Rule bindings associate rules to columns within tables.
rule_bindings:
  TRANSACTION_AMOUNT_VALID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

  TRANSACTION_VALID_ITEM_ID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

  TRANSACTION_CURRENCY_VALID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Substitua o seguinte:

  • PROJECT_ID: o ID do seu projeto.
  • DATASET_ID: o ID do conjunto de dados.

Exemplo 2

Se a tabela a verificar fizer parte de um lake do Dataplex Universal Catalog, pode especificar as tabelas através da notação de lake ou zona. Isto permite-lhe agregar os resultados por lago ou zona. Por exemplo, pode gerar uma classificação ao nível da zona.

Para usar este exemplo, crie um lago do catálogo universal do Dataplex com o ID do lago operations e o ID da zona procurement. Em seguida, adicione a tabela sales_orders como um recurso à zona.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}
rule_bindings:
 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Substitua o seguinte:

  • PROJECT_ID: o ID do seu projeto.
  • REGION_ID: o ID da região do lake do Dataplex Universal Catalog no qual a tabela existe, como us-central1.

Exemplo 3

Este exemplo melhora o exemplo 2 adicionando uma verificação SQL personalizada para ver se os valores de ID são únicos.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
 NO_DUPLICATES_IN_COLUMN_GROUPS:
   rule_type: CUSTOM_SQL_STATEMENT
   dimension: duplication
   params:
     custom_sql_arguments:
       - column_names
     custom_sql_statement: |-
       select a.*
       from data a
       inner join (
         select
           $column_names
         from data
         group by $column_names
         having count(*) > 1
       ) duplicates
       using ($column_names)

 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}

rule_bindings:
 TRANSACTIONS_UNIQUE:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: id
   row_filter_id: NONE
   rule_ids:
     - NO_DUPLICATES_IN_COLUMN_GROUPS:
         column_names: "id"

 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Exemplo 4

Este exemplo melhora o exemplo 3 adicionando validações incrementais através da coluna last_modified_timestamp. Pode adicionar validações incrementais para uma ou mais associações de regras.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
 NO_DUPLICATES_IN_COLUMN_GROUPS:
   rule_type: CUSTOM_SQL_STATEMENT
   dimension: duplication
   params:
     custom_sql_arguments:
       - column_names
     custom_sql_statement: |-
       select a.*
       from data a
       inner join (
         select
           $column_names
         from data
         group by $column_names
         having count(*) > 1
       ) duplicates
       using ($column_names)

 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}

rule_bindings:
 TRANSACTIONS_UNIQUE:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: id
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - NO_DUPLICATES_IN_COLUMN_GROUPS:
         column_names: "id"

 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: amount
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: item_id
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALID_CURRENCY_ID

Resolva problemas de regras com falhas com failed_records_query

Para cada regra com falhas, a tabela de resumo armazena uma consulta na coluna failed_records_query que pode usar para obter registos com falhas.

Para depurar, também pode usar reference columns no ficheiro YAML, o que lhe permite juntar o resultado de failed_records_query com os dados originais para obter o registo completo. Por exemplo, pode especificar uma coluna primary_key ou uma coluna primary_key composta como uma coluna de referência.

Especifique colunas de referência

Para gerar colunas de referência, pode adicionar o seguinte à especificação YAML:

  1. A secção reference_columns. Nesta secção, pode criar um ou mais conjuntos de colunas de referência, com cada conjunto a especificar uma ou mais colunas.

  2. A secção rule_bindings. Nesta secção, pode adicionar uma linha a uma associação de regras que especifique um ID de coluna de referência (reference_columns_id) a usar para as regras nessa associação de regras. Deve ser um dos conjuntos de colunas de referência especificados na secção reference_columns.

Por exemplo, o ficheiro YAML seguinte especifica uma secção reference_columns e define três colunas: id, last_modified_timestamp e item_id como parte do conjunto ORDER_DETAILS_REFERENCE_COLUMNS. O exemplo seguinte usa a tabela de amostras sales_orders.

reference_columns:
  ORDER_DETAILS_REFERENCE_COLUMNS:
    include_reference_columns:
      - id
      - last_modified_timestamp
      - item_id
rules:
  VALUE_ZERO_OR_POSITIVE:
  rule_type: CUSTOM_SQL_EXPR
  params:
    custom_sql_expr: |-

row_filters:
NONE:
  filter_sql_expr: |-
      True

rule_bindings:
TRANSACTION_AMOUNT_VALID:
  entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
  column_id: amount
  row_filter_id: NONE
  reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
  rule_ids:
    - VALUE_ZERO_OR_POSITIVE

Usar a consulta de registos com falhas

A consulta de registos com falhas gera uma linha para cada registo que tenha uma regra que falhou. Inclui o nome da coluna que acionou a falha, o valor que acionou a falha e os valores das colunas de referência. Também inclui metadados que pode usar para relacionar com a execução da tarefa de qualidade dos dados.

Segue-se um exemplo da saída de uma consulta de registos com falha para o ficheiro YAML descrito em Especifique colunas de referência. Apresenta uma falha para a coluna amount e um valor com falha de -10. Também regista o valor correspondente para a coluna de referência.

_dq_validation_invocation_id _dq_validation_rule_binding_id _dq_validation_rule_id _dq_validation_column_id _dq_validation_column_value _dq_validation_dimension _dq_validation_simple_rule_row_is_valid _dq_validation_complex_rule_validation_errors_count _dq_validation_complex_rule_validation_success_flag id last_modified_timestamp item_id
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 TRANSACTION_AMOUNT_VALID VALUE_ZERO_OR_POSITIVE valor -10 FALSE order1 2022-01-22T02:30:06.321Z bad_item_id

Use consultas de registos com falhas para regras CUSTOM_SQL_STATEMENT

Para regras de CUSTOM_SQL_STATEMENT, as consultas de registos com falhas incluem a coluna custom_sql_statement_validation_errors. A coluna custom_sql_statement_validation_errors é uma coluna aninhada com campos que correspondem ao resultado da sua declaração SQL. As colunas de referência não estão incluídas nas consultas de registos com falhas para regras de CUSTOM_SQL_STATEMENT.

Por exemplo, a sua regra CUSTOM_SQL_STATEMENT pode ter o seguinte aspeto:

rules:
  TEST_RULE:
    rule_type: CUSTOM_SQL_STATEMENT
    custom_sql_arguments:
      - existing_id
      - replacement_id
    params:
     CUSTOM_SQL_STATEMENT: |-
       (SELECT product_name, product_key FROM data
       where $existing_id != $replacement_id)
Os resultados deste exemplo vão conter uma ou mais linhas para a coluna custom_sql_statement_validation_errors, com uma linha para cada ocorrência em que existing_id!=replacement_id.

Quando renderizado em JSON, o conteúdo de uma célula nesta coluna pode ter o seguinte aspeto:

{
  "custom_sql_statement_valdation_errors" :{
    "product_name"="abc"
    "product_key"="12345678"
    "_rule_binding_id"="your_rule_binding"
  }
}

Pode juntar estes resultados à tabela original com uma referência aninhada, como join on custom_sql_statement_valdation_errors.product_key.

O que se segue?