Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Sobre a integração da linhagem de dados
A linhagem de dados é um Dataplex. que permite acompanhar como os dados se movem pelos sistemas: de onde eles vêm de onde é passado e quais transformações são aplicadas a ele. A linhagem de dados está disponível para:
Ambientes do Cloud Composer 2 que executam as versões 2.1.2 e mais recentes com as versões 2.2.5 e mais recentes do Airflow.
Ambientes do Cloud Composer 2 nas mesmas regiões que as regiões do Data Catalog que oferecem suporte à linhagem de dados.
Depois que o recurso for ativado no ambiente do Cloud Composer, executar Os DAGs que usam qualquer um dos operadores compatíveis causam Cloud Composer para enviar informações de linhagem à API Data Lineage.
Você pode acessar essas informações usando:
- API Data Lineage
- Gráficos de visualização de linhagem para entradas compatíveis do Data Catalog no Dataplex. Consulte Gráficos de visualização de linhagem na documentação do Dataplex.
Operadores compatíveis
Os seguintes operadores são compatíveis com relatórios de linhagem automáticos em O Cloud Composer:
airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator
airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator
airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryToBigQueryOperator
airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryToGCSOperator
airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator
airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator
Por exemplo, execute a seguinte tarefa:
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': GCP_PROJECT,
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
Resultados na criação do seguinte gráfico de linhagem na interface do Dataplex:
Considerações sobre recursos do Cloud Composer
Cada execução de tarefa do Airflow que relata a linhagem de dados executa:
- Uma solicitação de RPC de criação ou atualização para um processo de linhagem
- Uma solicitação de criação ou atualização de RPC para uma execução de linhagem
- Uma ou mais solicitações de RPC para criar eventos de linhagem (na maioria das vezes, 0 ou 1)
Para detalhes sobre essas entidades, consulte o modelo de informações de linhagem e a referência da API de linhagem na documentação do Dataplex.
O tráfego de linhagem emitido está sujeito a cotas na API Data Lineage. O Cloud Composer consome a cota de gravação.
O preço associado ao processamento de dados de linhagem está sujeito aos preços de linhagem. Confira considerações sobre a linhagem de dados.
Implicações no desempenho
A linhagem de dados é informada ao final da execução da tarefa do Airflow. Em média, os relatórios de linhagem de dados levam cerca de 1 a 2 segundos.
Isso não afeta o desempenho da tarefa em si: as tarefas do Airflow não falham se a linhagem não for informada à API Lineage. Não há impacto na lógica principal do operador, mas a instância de tarefa inteira é executada por mais tempo para considerar os dados de linhagem do relatório.
Um ambiente que informa a linhagem de dados terá um pequeno aumento nos custos associados devido ao tempo extra necessário para informar a linhagem de dados.
Compliance
O lineage de dados oferece diferentes níveis de suporte para recursos como VPC Service Controls. Consulte as considerações sobre a linhagem de dados para garantir que os níveis de suporte correspondam aos requisitos do seu ambiente.
Trabalhar com a integração da linhagem de dados
A integração da linhagem de dados do Cloud Composer é gerenciada por ambiente. Isso significa que a ativação do recurso requer duas etapas:
- Ativar a API Data Lineage no projeto.
- Ativar a integração da linhagem de dados em um Cloud Composer específico de nuvem.
Antes de começar
Quando você cria um ambiente, a integração de linhagem de dados é ativada automaticamente se as seguintes condições forem atendidas:
a API Data Lineage esteja ativada no projeto; Para mais informações, consulte Como ativar a API Data Lineage na documentação do Dataplex.
Um personalizado Back-end de linhagem não está configurado no Airflow.
Em um ambiente atual, é possível ativar ou desativar a integração da linhagem de dados a qualquer momento.
Funções exigidas
A integração com a linhagem de dados requer a adição das seguintes permissões à sua conta de serviço do ambiente do Cloud Composer:
- Para as contas de serviço padrão, não é necessário fazer mudanças. Contas padrão de serviço inclua as permissões necessárias.
- Para contas de serviço gerenciado pelo usuário: conceda ao Composer Worker
(
roles/composer.worker
) à sua conta de serviço. Esse papel inclui todas as permissões de linhagem de dados necessárias.
Para mais detalhes, consulte Papéis e permissões de linhagem na documentação do Dataplex.
Ativar a linhagem de dados no Cloud Composer
Console
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.
Selecione a guia Configuração do ambiente.
Na seção Integração da linhagem de dados do Dataplex, clique em Editar.
No painel Integração da linhagem de dados do Dataplex, selecione Ativar a integração com a linhagem de dados do Dataplex e clique em Salvar.
gcloud
Use o argumento --enable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
Substitua:
ENVIRONMENT_NAME
pelo nome do ambiente;O nome precisa começar com uma letra minúscula seguida por até 62 letras minúsculas, números ou hifens. Ele não pode terminar com um hífen. O nome do ambiente é usado para criar subcomponentes para o ambiente. Você precisa fornecer um nome que também seja válido como um nome de bucket do Cloud Storage. Para ver uma lista de restrições, consulte Diretrizes de nomenclatura de bucket.
LOCATION
pela região do ambiente.Um local é a região em que o cluster do GKE do ambiente está localizado.
Exemplo:
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
Enviar eventos de linhagem personalizados
É possível enviar eventos de linhagem personalizados se você quiser gerar relatórios de linhagem para um operador que não é compatível com relatórios de linhagem automatizados.
Por exemplo, para enviar eventos personalizados com:
BashOperator
, modifique o parâmetroinlets
ououtlets
na definição da tarefa.PythonOperator
, modifique o parâmetrotask.inlets
outask.outlets
na definição da tarefa. O uso deAUTO
para o parâmetroinlets
define o valor igual aooutlets
da tarefa upstream.
Por exemplo, executar esta tarefa:
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
…
bash_task = BashOperator(
task_id='bash_task',
dag=dag,
bash_command='sleep 0',
inlets=[BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table1',
)],
outlets=[BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table2',
)]
)
def _python_task(task):
task.inlets.append(BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table3',
))
task.outlets.append(BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table4',
))
python_task = PythonOperator(
task_id='python_task',
dag=dag,
python_callable=_python_task,
inlets=[AUTO],
)
bash_task >> python_task
Resultados na criação do seguinte gráfico de linhagem na interface do Dataplex:
Desativar a linhagem de dados no Cloud Composer
Desativar a integração da linhagem em um ambiente do Cloud Composer não desativar a API Data Lineage. Se você quiser desativar completamente os relatórios de linhagem para seu projeto, desative também a API Data Lineage. Consulte Como desativar serviços.
Console
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Selecione a guia Configuração do ambiente.
Na seção Integração da linhagem de dados do Dataplex, clique em Editar.
No painel Integração da linhagem de dados do Dataplex, selecione Desativar a integração com a linhagem de dados do Dataplex e clique em Salvar.
gcloud
Use o argumento --disable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
Substitua:
ENVIRONMENT_NAME
pelo nome do ambiente;O nome precisa começar com uma letra minúscula seguida por até 62 letras minúsculas, números ou hifens. Ele não pode terminar com um hífen. O nome do ambiente é usado para criar subcomponentes para o ambiente. Você precisa fornecer um nome que também seja válido como um nome de bucket do Cloud Storage. Para ver uma lista de restrições, consulte Diretrizes de nomenclatura de bucket.
LOCATION
pela região do ambiente.Um local é a região em que o cluster do GKE do ambiente está localizado.
Exemplo:
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
Conferir registros de linhagem no Cloud Composer
É possível inspecionar registros relacionados à linhagem de dados usando o link na página Página de Configuração do ambiente no Dataplex integração da linhagem de dados.
Solução de problemas
Se os dados de linhagem não forem informados à API Lineage ou não aparecerem no Dataplex, siga estas etapas de solução de problemas:
- Verifique se a API Data Lineage está ativada no projeto do seu ambiente do Cloud Composer.
- Verifique se a integração de linhagem de dados está ativada no ambiente do Cloud Composer.
- Verifique se o operador que você usa está incluído no suporte de relatórios de linhagem automatizada. Consulte Operadores do Airflow compatíveis.
- Verifique os registros de linhagem no Cloud Composer para encontrar possíveis problemas.