Linhagem de dados com o Dataplex Universal Catalog

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Nesta página, explicamos como ativar a integração do rastreamento de dados no Cloud Composer.

Sobre a integração da linhagem de dados

A linhagem de dados é um recurso do Dataplex Universal Catalog que rastreia como os dados se movimentam nos sistemas: origem, destino e quais transformações são aplicadas a eles.

O Cloud Composer usa o pacote apache-airflow-providers-openlineage para gerar os eventos de linhagem que são enviados à API Data Lineage.

Esse pacote já está instalado em ambientes do Cloud Composer. Se você instalar outra versão desse pacote, a lista de operadores compatíveis poderá mudar. Recomendamos fazer isso apenas se necessário e manter a versão pré-instalada do pacote.

  • A linhagem de dados está disponível para ambientes nas mesmas regiões que as regiões do Dataplex Universal Catalog que oferecem suporte a linhagem de dados.

  • Se a linhagem de dados estiver ativada no seu ambiente do Cloud Composer, o Cloud Composer vai informar as informações de linhagem à API Data Lineage para DAGs que usam qualquer um dos operadores compatíveis. Também é possível enviar eventos de linhagem personalizados se você quiser informar a linhagem de um operador que não é compatível.

  • É possível acessar informações de linhagem com:

    • API Data Lineage
    • Gráficos de linhagem para entradas compatíveis no Dataplex Universal Catalog. Para mais informações, consulte Gráficos de linhagem na documentação do Dataplex Universal Catalog.

Quando você cria um ambiente, a integração da linhagem de dados é ativada automaticamente se as seguintes condições forem atendidas:

  • A API Data Lineage está ativada no seu projeto. Para mais informações, consulte Como ativar a API Data Lineage na documentação do Dataplex Universal Catalog.

  • Um back-end de linhagem personalizado não está configurado no Airflow.

É possível desativar a integração do linhagem de dados ao criar um ambiente.

Para um ambiente atual, é possível ativar ou desativar a integração do rastreamento de dados a qualquer momento.

Considerações sobre recursos no Cloud Composer

O Cloud Composer faz uma chamada de RPC para criar eventos de linhagem nos seguintes casos:

  • Quando uma tarefa do Airflow começa ou termina
  • Quando uma execução de DAG começa ou termina

Para detalhes sobre essas entidades, consulte o modelo de informações de linhagem e a referência da API Lineage na documentação do Dataplex Universal Catalog.

O tráfego de linhagem emitido está sujeito a cotas na API Data Lineage. O Cloud Composer consome a cota de gravação.

Os preços associados ao processamento de dados de linhagem estão sujeitos aos preços de linhagem. Consulte considerações sobre a linhagem de dados.

Considerações sobre desempenho no Cloud Composer

A linhagem de dados é informada ao final da execução da tarefa do Airflow. Em média, os relatórios de linhagem de dados levam de 1 a 2 segundos.

Isso não afeta o desempenho da tarefa em si: as tarefas do Airflow não falham se a linhagem não for informada à API Lineage. Não há impacto na lógica principal do operador, mas toda a instância da tarefa é executada um pouco mais para considerar os dados de linhagem de relatórios.

Um ambiente que informa a linhagem de dados terá um pequeno aumento nos custos associados devido ao tempo extra necessário para gerar o relatório.

Compliance

A linhagem de dados oferece diferentes níveis de suporte para recursos como o VPC Service Controls. Analise as considerações sobre linhagem de dados para garantir que os níveis de suporte correspondam aos requisitos do seu ambiente.

Antes de começar

Verificar se um operador é compatível

O suporte à linhagem de dados é fornecido pelo pacote do provedor em que o operador está localizado:

  1. Confira os changelogs do pacote do provedor em que o operador está localizado para entradas que adicionam suporte ao OpenLineage.

    Por exemplo, o BigQueryToBigQueryOperator é compatível com o OpenLineage a partir da apache-airflow-providers-google versão 11.0.0.

  2. Verifique a versão do pacote de provedor usado pelo seu ambiente. Para isso, consulte a lista de pacotes pré-instalados para a build do Airflow usada no seu ambiente. Também é possível instalar uma versão diferente do pacote no seu ambiente.

Além disso, a página Classes compatíveis na documentação do apache-airflow-providers-openlineage lista os operadores compatíveis mais recentes.

Configurar a integração da linhagem de dados

A integração da linhagem de dados para o Cloud Composer é gerenciada por ambiente. Isso significa que a ativação do recurso requer duas etapas:

  1. Ative a API Data Lineage no seu projeto.
  2. Ative a integração da linhagem de dados em um ambiente específico do Cloud Composer.

Ativar a linhagem de dados no Cloud Composer

Console

  1. No console Google Cloud , acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.

  3. Selecione a guia Configuração do ambiente.

  4. Na seção Integração da linhagem de dados do Dataplex, clique em Editar.

  5. No painel Integração da linhagem de dados do Dataplex, selecione Ativar a integração com a linhagem de dados do Dataplex e clique em Salvar.

gcloud

Use o argumento --enable-cloud-data-lineage-integration.

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --enable-cloud-data-lineage-integration

Substitua:

  • ENVIRONMENT_NAME: o nome do ambiente;
  • LOCATION: a região em que o ambiente está localizado.

Exemplo:

gcloud composer environments update example-environment \
    --location us-central1 \
    --enable-cloud-data-lineage-integration

Desativar a linhagem de dados no Cloud Composer

Desativar a integração de linhagem em um ambiente do Cloud Composer não desativa a API Data Lineage. Se quiser desativar completamente os relatórios de linhagem do seu projeto, desative também a API Data Lineage. Veja como desativar serviços.

Console

  1. No console Google Cloud , acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.

  3. Selecione a guia Configuração do ambiente.

  4. Na seção Integração da linhagem de dados do Dataplex, clique em Editar.

  5. No painel Integração da linhagem de dados do Dataplex, selecione Desativar a integração com a linhagem de dados do Dataplex e clique em Salvar.

gcloud

Use o argumento --disable-cloud-data-lineage-integration.

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --disable-cloud-data-lineage-integration

Substitua:

  • ENVIRONMENT_NAME: o nome do ambiente;
  • LOCATION: a região em que o ambiente está localizado.

Exemplo:

gcloud composer environments update example-environment \
    --location us-central1 \
    --disable-cloud-data-lineage-integration

Enviar eventos de linhagem em operadores compatíveis

Se a linhagem de dados estiver ativada, os operadores compatíveis vão enviar eventos de linhagem automaticamente. Não é necessário mudar o código do DAG.

Por exemplo, execute a seguinte tarefa:

task = BigQueryInsertJobOperator(
    task_id='snapshot_task',
    dag=dag,
    location='<dataset-location>',
    configuration={
        'query': {
            'query': 'SELECT * FROM dataset.tableA',
            'useLegacySql': False,
            'destinationTable': {
                'project_id': 'example-project',
                'dataset_id': 'dataset',
                'table_id': 'tableB',
            },
        }
    },
)

Resulta na criação do seguinte gráfico de linhagem na interface do Dataplex Universal Catalog:

Exemplo de gráfico de linhagem na interface do Dataplex.
Figura 1. Exemplo de gráfico de linhagem para uma tabela do BigQuery na interface do Dataplex Universal Catalog.

Enviar eventos de linhagem personalizados

É possível enviar eventos de linhagem personalizados se quiser gerar relatórios de linhagem para um operador que não é compatível com a geração de relatórios de linhagem automatizada.

Por exemplo, para enviar eventos personalizados com:

  • BashOperator: modifique o parâmetro inlets ou outlets na definição da tarefa.
  • PythonOperator: modifique o parâmetro task.inlets ou task.outlets na definição da tarefa.
  • Você pode usar AUTO para o parâmetro inlets. Isso define o valor igual ao outlets da tarefa upstream.

O exemplo a seguir demonstra o uso de entradas e saídas:

from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO

...

bash_task = BashOperator(
    task_id="bash_task",
    dag=dag,
    bash_command="sleep 0",
    inlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table1",
        )
    ],
    outlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table2",
        )
    ],
)


def _python_task(task):
    print("Python task")


python_task = PythonOperator(
    task_id="python_task",
    dag=dag,
    python_callable=_python_task,
    inlets=[
        AUTO,
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table3",
        ),
    ],
    outlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table4",
        )
    ],
)

bash_task >> python_task

Como resultado, o seguinte gráfico de linhagem é criado na interface do Dataplex Universal Catalog:

Exemplo de gráfico de linhagem para eventos personalizados na interface do Dataplex.
Figura 2. Exemplo de gráfico de linhagem para várias tabelas do BigQuery na interface do Dataplex Universal Catalog.

Ver registros de linhagem no Cloud Composer

É possível inspecionar os registros relacionados à linhagem de dados usando o link na página Configuração do ambiente na seção Integração da linhagem de dados do Dataplex Universal Catalog.

Solução de problemas

Se os dados de linhagem não forem informados à API Lineage ou não aparecerem no Dataplex Universal Catalog, tente as seguintes etapas de solução de problemas:

  • Verifique se a API Data Lineage está ativada no projeto do seu ambiente do Cloud Composer.
  • Verifique se a integração do rastreamento de dados está ativada no ambiente do Cloud Composer.
  • Verifique se o operador usado está incluído no suporte de relatórios de linhagem automatizada. Consulte Operadores do Airflow compatíveis.
  • Verifique os registros de linhagem no Cloud Composer para identificar possíveis problemas.

A seguir