Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
En esta página, se explica cómo habilitar la integración del linaje de datos en Cloud Composer.
Acerca de la integración del linaje de datos
El linaje de datos es una función de Dataplex Universal Catalog que hace un seguimiento de cómo los datos se mueven a través de tus sistemas: de dónde provienen, a dónde se pasan y qué transformaciones se aplican a ellos.
Cloud Composer usa el paquete apache-airflow-providers-openlineage
para generar los eventos de linaje que se envían a la API de Data Lineage.
Este paquete ya está instalado en los entornos de Cloud Composer. Si instalas otra versión de este paquete, es posible que cambie la lista de operadores compatibles. Te recomendamos que lo hagas solo si es necesario y que conserves la versión preinstalada del paquete.
El linaje de datos está disponible para los entornos en las mismas regiones que las regiones de Dataplex Universal Catalog que admiten el linaje de datos.
Si el linaje de datos está habilitado en tu entorno de Cloud Composer, Cloud Composer informa la información del linaje a la API de Data Lineage para los DAG que utilizan cualquiera de los operadores compatibles. También puedes enviar eventos de linaje personalizados si deseas informar el linaje de un operador que no es compatible.
Puedes acceder a la información del linaje con lo siguiente:
- API de Data Lineage
- Gráficos de linaje para las entradas admitidas en Dataplex Universal Catalog Para obtener más información, consulta Gráficos de linaje en la documentación de Dataplex Universal Catalog.
Cuando creas un entorno, la integración del linaje de datos se habilita automáticamente si se cumplen las siguientes condiciones:
La API de Data Lineage está habilitada en tu proyecto. Para obtener más información, consulta Cómo habilitar la API de Data Lineage en la documentación de Dataplex Universal Catalog.
No se configuró un backend de linaje personalizado en Airflow.
Puedes inhabilitar la integración del linaje de datos cuando creas un entorno.
En el caso de un entorno existente, puedes habilitar o inhabilitar la integración del linaje de datos en cualquier momento.
Consideraciones sobre las funciones en Cloud Composer
Cloud Composer realiza una llamada a RPC para crear eventos de linaje en los siguientes casos:
- Cuando una tarea de Airflow comienza o finaliza
- Cuando se inicia o finaliza una ejecución del DAG
Para obtener detalles sobre estas entidades, consulta el modelo de información de linaje y la referencia de la API de Lineage en la documentación de Dataplex Universal Catalog.
El tráfico de linaje emitido está sujeto a cuotas en la API de Data Lineage. Cloud Composer consume cuota de escritura.
Los precios asociados con el manejo de datos de linaje están sujetos a los precios de linaje. Consulta las consideraciones del linaje de datos.
Consideraciones de rendimiento en Cloud Composer
El linaje de datos se informa al final de la ejecución de la tarea de Airflow. En promedio, la generación de informes de linaje de datos tarda entre 1 y 2 segundos.
Esto no afecta el rendimiento de la tarea en sí: las tareas de Airflow no fallan si el linaje no se informa correctamente a la API de Lineage. No hay ningún impacto en la lógica principal del operador, pero toda la instancia de la tarea se ejecuta un poco más para tener en cuenta los datos del linaje de informes.
Un entorno que informa el linaje de datos tendrá un pequeño aumento en los costos asociados debido al tiempo adicional necesario para informar el linaje de datos.
Cumplimiento
El linaje de datos ofrece diferentes niveles de asistencia para funciones como los Controles del servicio de VPC. Revisa las consideraciones sobre el linaje de datos para asegurarte de que los niveles de asistencia coincidan con los requisitos de tu entorno.
Antes de comenzar
Esta función proporciona compatibilidad variable con el cumplimiento. Primero, asegúrate de revisar las consideraciones sobre las funciones específicas de Cloud Composer y las consideraciones sobre las funciones de linaje de datos.
Todos los permisos de IAM necesarios para el linaje de datos ya están incluidos en el rol de trabajador de Composer (
roles/composer.worker
). Este rol es el rol obligatorio para las cuentas de servicio del entorno.Para obtener más información sobre los permisos de linaje de datos, consulta Roles y permisos de linaje en la documentación de Dataplex Universal Catalog.
Cómo verificar si se admite un operador
El paquete del proveedor en el que se encuentra el operador proporciona compatibilidad con el linaje de datos:
Consulta los registros de cambios del paquete del proveedor en el que se encuentra el operador para ver las entradas que agregan compatibilidad con OpenLineage.
Por ejemplo, BigQueryToBigQueryOperator admite OpenLineage a partir de la versión 11.0.0 de
apache-airflow-providers-google
.Verifica la versión del paquete del proveedor que usa tu entorno. Para ello, consulta la lista de paquetes preinstalados para la compilación de Airflow que se usa en tu entorno. También puedes instalar una versión diferente del paquete en tu entorno.
Además, en la página Clases admitidas de la documentación de apache-airflow-providers-openlineage
, se enumeran los operadores admitidos más recientes.
Configura la integración del linaje de datos
La integración del linaje de datos para Cloud Composer se administra por entorno. Esto significa que habilitar la función requiere dos pasos:
- Habilita la API de Data Lineage en tu proyecto.
- Habilita la integración del linaje de datos en un entorno específico de Cloud Composer.
Habilita el linaje de datos en Cloud Composer
Console
En la consola de Google Cloud , ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Selecciona la pestaña Configuración del entorno.
En la sección Integración del linaje de datos de Dataplex, haz clic en Editar.
En el panel Integración del linaje de datos de Dataplex, selecciona Habilitar la integración del linaje de datos de Dataplex y haz clic en Guardar.
gcloud
Usa el argumento --enable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
Reemplaza lo siguiente:
ENVIRONMENT_NAME
: Es el nombre de tu entorno.LOCATION
: Es la región en la que se encuentra el entorno.
Ejemplo:
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
Inhabilita el linaje de datos en Cloud Composer
Si inhabilitas la integración del linaje en un entorno de Cloud Composer, no se inhabilita la API de Data Lineage. Si quieres inhabilitar por completo los informes de linaje para tu proyecto, también inhabilita la API de Data Lineage. Consulta Inhabilita servicios.
Console
En la consola de Google Cloud , ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Selecciona la pestaña Configuración del entorno.
En la sección Integración del linaje de datos de Dataplex, haz clic en Editar.
En el panel Integración del linaje de datos de Dataplex, selecciona Inhabilitar la integración en el linaje de datos de Dataplex y haz clic en Guardar.
gcloud
Usa el argumento --disable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
Reemplaza lo siguiente:
ENVIRONMENT_NAME
: Es el nombre de tu entorno.LOCATION
: Es la región en la que se encuentra el entorno.
Ejemplo:
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
Envía eventos de linaje en operadores compatibles
Si el linaje de datos está habilitado, los operadores compatibles envían eventos de linaje automáticamente. No es necesario que cambies el código de tu DAG.
Por ejemplo, si ejecutas la siguiente tarea:
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': 'example-project',
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
Se crea el siguiente gráfico de linaje en la IU de Dataplex Universal Catalog:

Envía eventos de linaje personalizados
Puedes enviar eventos de linaje personalizados si deseas informar el linaje de un operador que no es compatible con los informes de linaje automatizados.
Por ejemplo, para enviar eventos personalizados con lo siguiente:
- BashOperator: Modifica el parámetro
inlets
ooutlets
en la definición de la tarea. - PythonOperator: Modifica el parámetro
task.inlets
otask.outlets
en la definición de la tarea. - Puedes usar
AUTO
para el parámetroinlets
. Esto establece su valor igual aloutlets
de su tarea anterior.
En el siguiente ejemplo, se muestra el uso de las entradas y salidas:
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
...
bash_task = BashOperator(
task_id="bash_task",
dag=dag,
bash_command="sleep 0",
inlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table1",
)
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table2",
)
],
)
def _python_task(task):
print("Python task")
python_task = PythonOperator(
task_id="python_task",
dag=dag,
python_callable=_python_task,
inlets=[
AUTO,
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table3",
),
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table4",
)
],
)
bash_task >> python_task
Como resultado, se crea el siguiente gráfico de linaje en la IU de Dataplex Universal Catalog:

Visualiza los registros de linaje en Cloud Composer
Puedes inspeccionar los registros relacionados con el linaje de datos a través del vínculo en la página Configuración del entorno en la sección Integración del linaje de datos de Dataplex Universal Catalog.
Soluciona problemas
Si los datos de linaje no se informan a la API de Lineage o no los ves en Dataplex Universal Catalog, prueba los siguientes pasos para solucionar problemas:
- Asegúrate de que la API de Data Lineage esté habilitada en el proyecto de tu entorno de Cloud Composer.
- Verifica si la integración del linaje de datos está habilitada en el entorno de Cloud Composer.
- Verifica si el operador que usas está incluido en la compatibilidad con los informes de linaje automatizados. Consulta Operadores de Airflow compatibles.
- Revisa los registros de linaje en Cloud Composer para detectar posibles problemas.