Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Esta página explica como ativar a integração da linhagem de dados no Cloud Composer.
Acerca da integração da linhagem de dados
A linhagem de dados é uma funcionalidade do Catálogo universal do Dataplex que acompanha a forma como os dados se movem nos seus sistemas: de onde vêm, para onde são transmitidos e que transformações lhes são aplicadas.
O Cloud Composer usa o pacote apache-airflow-providers-openlineage
para gerar os eventos de linhagem que são enviados para a
API Data Lineage.
Este pacote já está instalado em ambientes do Cloud Composer. Se instalar outra versão deste pacote, a lista de operadores suportados pode mudar. Recomendamos que o faça apenas se for necessário e, caso contrário, mantenha a versão pré-instalada do pacote.
A linhagem de dados está disponível para ambientes nas mesmas regiões que as regiões do catálogo universal do Dataplex que suportam a linhagem de dados.
Se a linhagem de dados estiver ativada no seu ambiente do Cloud Composer, o Cloud Composer comunica informações de linhagem à API Data Lineage para DAGs que usam qualquer um dos operadores suportados. Também pode enviar eventos de linhagem personalizados se quiser comunicar a linhagem para um operador não suportado.
Pode aceder às informações de linhagem com:
- API Data Lineage
- Gráficos de linhagem para entradas suportadas no catálogo universal do Dataplex. Para mais informações, consulte o artigo Gráficos de linhagem na documentação do catálogo universal do Dataplex.
Quando cria um ambiente, a integração da linhagem de dados é ativada automaticamente se forem cumpridas as seguintes condições:
A API Data Lineage está ativada no seu projeto. Para mais informações, consulte o artigo Ativar a API Data Lineage na documentação do catálogo universal do Dataplex.
Não está configurado um Lineage Backend personalizado no Airflow.
Pode desativar a integração da linhagem de dados quando cria um ambiente.
Para um ambiente existente, pode ativar ou desativar a integração da linhagem de dados em qualquer altura.
Considerações sobre funcionalidades no Cloud Composer
O Cloud Composer faz uma chamada RPC para criar eventos de linhagem nos seguintes casos:
- Quando uma tarefa do Airflow começa ou termina
- Quando uma execução de DAG começa ou termina
Para ver detalhes sobre estas entidades, consulte o modelo de informações de linhagem e a referência da API Lineage na documentação do catálogo universal do Dataplex.
O tráfego de linhagem emitido está sujeito a quotas na API Data Lineage. O Cloud Composer consome a quota de escrita.
Os preços associados ao processamento de dados de linhagem estão sujeitos aos preços de linhagem. Consulte as considerações sobre a linhagem de dados.
Considerações sobre o desempenho no Cloud Composer
A linhagem de dados é comunicada no final da execução da tarefa do Airflow. Em média, os relatórios de linhagem de dados demoram cerca de 1 a 2 segundos.
Isto não afeta o desempenho da própria tarefa: as tarefas do Airflow não falham se a linhagem não for comunicada com êxito à API Lineage. Não existe impacto na lógica do operador principal, mas a instância da tarefa completa é executada durante mais tempo para ter em conta os dados de linhagem de relatórios.
Um ambiente que comunica a linhagem de dados vai ter um pequeno aumento nos custos associados, devido ao tempo adicional necessário para comunicar a linhagem de dados.
Conformidade
A linhagem de dados oferece diferentes níveis de apoio técnico para funcionalidades como os VPC Service Controls. Reveja as considerações sobre a linhagem de dados para se certificar de que os níveis de apoio técnico correspondem aos requisitos do seu ambiente.
Antes de começar
Esta funcionalidade oferece suporte de conformidade variável. Certifique-se de que revê primeiro as considerações sobre funcionalidades específicas do Cloud Composer e as considerações sobre funcionalidades de linhagem de dados.
A integração da linhagem de dados é suportada no Cloud Composer versão 2.1.2 e posteriores com as versões 2.2.5 e posteriores do Airflow.
Todas as autorizações de IAM necessárias para a linhagem de dados já estão incluídas na função Composer Worker (
roles/composer.worker
). Esta função é a função necessária para as contas de serviço do ambiente.Para mais informações sobre as autorizações de linhagem de dados, consulte as funções e autorizações de linhagem na documentação do catálogo universal do Dataplex.
Verifique se um operador é compatível
O apoio técnico da linhagem de dados é fornecido pelo pacote do fornecedor onde o operador está localizado:
Verifique os registos de alterações do pacote do fornecedor onde o operador está localizado para entradas que adicionam suporte do OpenLineage.
Por exemplo, o BigQueryToBigQueryOperator suporta o OpenLineage a partir da
apache-airflow-providers-google
versão 11.0.0.Verifique a versão do pacote do fornecedor usado pelo seu ambiente. Para o fazer, consulte a lista de pacotes pré-instalados para a versão do Cloud Composer usada no seu ambiente. Também pode instalar uma versão diferente do pacote no seu ambiente.
Além disso, a página Classes suportadas
na documentação apache-airflow-providers-openlineage
lista os operadores
suportados mais recentes.
Configure a integração da linhagem de dados
A integração da linhagem de dados para o Cloud Composer é gerida por ambiente. Isto significa que a ativação da funcionalidade requer dois passos:
- Ative a API Data Lineage no seu projeto.
- Ative a integração da linhagem de dados num ambiente específico do Cloud Composer.
Ative a linhagem de dados no Cloud Composer
Consola
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.
Selecione o separador Configuração do ambiente.
Na secção Integração da linhagem de dados do Dataplex, clique em Editar.
No painel Integração da linhagem de dados do Dataplex, selecione Ativar integração com a linhagem de dados do Dataplex e clique em Guardar.
gcloud
Use o argumento --enable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
Substitua o seguinte:
ENVIRONMENT_NAME
: o nome do seu ambiente.LOCATION
: a região onde o ambiente está localizado.
Exemplo:
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
Desative a linhagem de dados no Cloud Composer
A desativação da integração da linhagem num ambiente do Cloud Composer não desativa a API Data Lineage. Se quiser desativar completamente os relatórios de linhagem para o seu projeto, também deve desativar a API Data Lineage. Consulte a secção Desativar serviços.
Consola
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. É apresentada a página Detalhes do ambiente.
Selecione o separador Configuração do ambiente.
Na secção Integração da linhagem de dados do Dataplex, clique em Editar.
No painel Integração da linhagem de dados do Dataplex, selecione Desativar integração com a linhagem de dados do Dataplex e clique em Guardar.
gcloud
Use o argumento --disable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
Substitua o seguinte:
ENVIRONMENT_NAME
: o nome do seu ambiente.LOCATION
: a região onde o ambiente está localizado.
Exemplo:
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
Envie eventos de linhagem em operadores suportados
Se a linhagem de dados estiver ativada, os operadores suportados enviam eventos de linhagem automaticamente. Não precisa de alterar o código do DAG.
Por exemplo, executar a seguinte tarefa:
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': 'example-project',
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
Resulta na criação do seguinte gráfico de linhagem na IU do catálogo universal do Dataplex:

Envie eventos de linhagem personalizados
Pode enviar eventos de linhagem personalizados se quiser comunicar a linhagem de um operador que não seja suportado para relatórios de linhagem automatizados.
Por exemplo, para enviar eventos personalizados com:
- BashOperator: modifique o parâmetro
inlets
ououtlets
na definição da tarefa. - PythonOperator: modifique o parâmetro
task.inlets
outask.outlets
na definição da tarefa. - Pode usar
AUTO
para o parâmetroinlets
. Isto define o respetivo valor igual aooutlets
da respetiva tarefa a montante.
O exemplo seguinte demonstra a utilização de entradas e saídas:
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
...
bash_task = BashOperator(
task_id="bash_task",
dag=dag,
bash_command="sleep 0",
inlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table1",
)
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table2",
)
],
)
def _python_task(task):
print("Python task")
python_task = PythonOperator(
task_id="python_task",
dag=dag,
python_callable=_python_task,
inlets=[
AUTO,
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table3",
),
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table4",
)
],
)
bash_task >> python_task
Consequentemente, é criado o seguinte gráfico de linhagem na IU do catálogo universal do Dataplex:

Veja os registos de linhagem no Cloud Composer
Pode inspecionar os registos relacionados com a linhagem de dados através do link na página Configuração do ambiente na secção Integração da linhagem de dados do catálogo universal do Dataplex.
Resolução de problemas
Se os dados de linhagem não forem comunicados à API Lineage ou não os conseguir ver no Dataplex Universal Catalog, experimente os seguintes passos de resolução de problemas:
- Certifique-se de que a API Data Lineage está ativada no projeto do seu ambiente do Cloud Composer.
- Verifique se a integração da linhagem de dados está ativada no ambiente do Cloud Composer.
- Verifique se o operador que usa está incluído no suporte de relatórios de linhagem automatizados. Consulte os operadores do Airflow suportados.
- Verifique os registos de linhagem no Cloud Composer quanto a possíveis problemas.