Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
En esta página se explica cómo habilitar la integración del linaje de datos en Cloud Composer.
Acerca de la integración del linaje de datos
El linaje de datos es una función de Dataplex Universal Catalog que monitoriza cómo se mueven los datos por tus sistemas: de dónde proceden, a dónde se transfieren y qué transformaciones se les aplican.
Cloud Composer usa el paquete apache-airflow-providers-openlineage
para generar los eventos de linaje que se envían a la API Data Lineage.
Este paquete ya está instalado en los entornos de Cloud Composer. Si instalas otra versión de este paquete, es posible que cambie la lista de operadores admitidos. Te recomendamos que lo hagas solo si es necesario y que conserves la versión preinstalada del paquete.
El linaje de datos está disponible en los entornos de las mismas regiones que las regiones de Dataplex Universal Catalog que admiten el linaje de datos.
Si el linaje de datos está habilitado en tu entorno de Cloud Composer, Cloud Composer envía información sobre el linaje a la API Data Lineage de los DAGs que utilizan cualquiera de los operadores admitidos. También puedes enviar eventos de linaje personalizados si quieres registrar el linaje de un operador que no sea compatible.
Puedes acceder a la información de linaje con lo siguiente:
- API Data Lineage
- Gráficos de linaje de las entradas admitidas en Dataplex Universal Catalog. Para obtener más información, consulta la sección Gráficos de linaje de la documentación de Universal Catalog de Dataplex.
Cuando creas un entorno, la integración del linaje de datos se habilita automáticamente si se cumplen las siguientes condiciones:
La API Data Lineage está habilitada en tu proyecto. Para obtener más información, consulta el artículo Habilitar la API Data Lineage de la documentación de Dataplex Universal Catalog.
No se ha configurado ningún backend de linaje personalizado en Airflow.
Puede inhabilitar la integración del linaje de datos al crear un entorno.
En un entorno ya creado, puede habilitar o inhabilitar la integración del linaje de datos en cualquier momento.
Consideraciones sobre las funciones de Cloud Composer
Cloud Composer hace una llamada RPC para crear eventos de linaje en los siguientes casos:
- Cuando una tarea de Airflow empieza o termina
- Cuando se inicia o finaliza una ejecución de DAG
Para obtener información detallada sobre estas entidades, consulta el modelo de información de linaje y la referencia de la API Lineage en la documentación de Dataplex Universal Catalog.
El tráfico de linaje emitido está sujeto a las cuotas de la API Data Lineage. Cloud Composer consume cuota de escritura.
Los precios asociados a la gestión de datos de linaje están sujetos a los precios del linaje. Consulta las consideraciones sobre el linaje de datos.
Consideraciones sobre el rendimiento en Cloud Composer
El linaje de datos se indica al final de la ejecución de la tarea de Airflow. De media, los informes de linaje de datos tardan entre 1 y 2 segundos.
Esto no afecta al rendimiento de la tarea en sí: las tareas de Airflow no fallan si el linaje no se comunica correctamente a la API Lineage. No hay ningún impacto en la lógica del operador principal, pero toda la instancia de la tarea se ejecuta un poco más para tener en cuenta los datos del linaje de los informes.
En un entorno que registre el linaje de datos, los costes asociados aumentarán ligeramente debido al tiempo adicional que se necesita para registrar el linaje de datos.
Cumplimiento
El linaje de datos ofrece diferentes niveles de asistencia para funciones como Controles de Servicio de VPC. Consulta las consideraciones sobre el linaje de datos para asegurarte de que los niveles de asistencia se ajustan a los requisitos de tu entorno.
Antes de empezar
Esta función ofrece compatibilidad con diferentes normativas. Primero, consulta las consideraciones específicas de la función de Cloud Composer y las consideraciones de la función de linaje de datos.
La integración del linaje de datos es compatible con Cloud Composer 2.1.2 y versiones posteriores con Airflow 2.2.5 y versiones posteriores.
Todos los permisos de gestión de identidades y accesos necesarios para el linaje de datos ya están incluidos en el rol de trabajador de Composer (
roles/composer.worker
). Este rol es el rol necesario para las cuentas de servicio del entorno.Para obtener más información sobre los permisos de linaje de datos, consulta los roles y permisos de linaje en la documentación de Dataplex Universal Catalog.
Comprobar si se admite un operador
El paquete del proveedor proporciona asistencia para el linaje de datos en el que se encuentra el operador:
Consulta los registros de cambios del paquete del proveedor en el que se encuentra el operador para ver las entradas que añaden compatibilidad con OpenLineage.
Por ejemplo, BigQueryToBigQueryOperator admite OpenLineage a partir de la
apache-airflow-providers-google
versión 11.0.0.Comprueba la versión del paquete del proveedor que usa tu entorno. Para ello, consulta la lista de paquetes preinstalados de la versión de Cloud Composer que se usa en tu entorno. También puedes instalar otra versión del paquete en tu entorno.
Además, en la página Clases admitidas de la documentación de apache-airflow-providers-openlineage
se indican los operadores admitidos más recientes.
Configurar la integración del linaje de datos
La integración del linaje de datos de Cloud Composer se gestiona por entorno. Esto significa que, para habilitar la función, debes seguir dos pasos:
- Habilita la API Data Lineage en tu proyecto.
- Habilita la integración del linaje de datos en un entorno de Cloud Composer específico.
Habilitar el linaje de datos en Cloud Composer
Consola
En la Google Cloud consola, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre del entorno. Se abrirá la página Detalles del entorno.
Selecciona la pestaña Configuración del entorno.
En la sección Integración del linaje de datos de Dataplex, haga clic en Editar.
En el panel Integración del linaje de datos de Dataplex, seleccione Habilitar la integración con el linaje de datos de Dataplex y haga clic en Guardar.
gcloud
Usa el argumento --enable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
Haz los cambios siguientes:
ENVIRONMENT_NAME
: el nombre de tu entorno.LOCATION
: la región en la que se encuentra el entorno.
Ejemplo:
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
Inhabilitar el linaje de datos en Cloud Composer
Inhabilitar la integración del linaje en un entorno de Cloud Composer no inhabilita la API Data Lineage. Si quieres inhabilitar por completo los informes de linaje de datos de tu proyecto, inhabilita también la API Data Lineage. Consulta Inhabilitar servicios.
Consola
En la Google Cloud consola, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre del entorno. Se abrirá la página Detalles del entorno.
Selecciona la pestaña Configuración del entorno.
En la sección Integración del linaje de datos de Dataplex, haga clic en Editar.
En el panel Integración del linaje de datos de Dataplex, seleccione Inhabilitar la integración con el linaje de datos de Dataplex y haga clic en Guardar.
gcloud
Usa el argumento --disable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
Haz los cambios siguientes:
ENVIRONMENT_NAME
: el nombre de tu entorno.LOCATION
: la región en la que se encuentra el entorno.
Ejemplo:
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
Enviar eventos de linaje en operadores admitidos
Si el linaje de datos está habilitado, los operadores admitidos envían eventos de linaje automáticamente. No es necesario que cambie el código de su DAG.
Por ejemplo, al ejecutar la siguiente tarea:
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': 'example-project',
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
El resultado es la creación del siguiente gráfico de linaje en la interfaz de usuario de Dataplex Universal Catalog:

Enviar eventos de linaje personalizados
Puede enviar eventos de linaje personalizados si quiere registrar el linaje de un operador que no admita el registro de linaje automatizado.
Por ejemplo, para enviar eventos personalizados con lo siguiente:
- BashOperator: modifica el parámetro
inlets
ooutlets
en la definición de la tarea. - PythonOperator: modifica el parámetro
task.inlets
otask.outlets
en la definición de la tarea. - Puedes usar
AUTO
para el parámetroinlets
. De esta forma, su valor será igual aloutlets
de su tarea anterior.
En el siguiente ejemplo se muestra el uso de entradas y salidas:
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
...
bash_task = BashOperator(
task_id="bash_task",
dag=dag,
bash_command="sleep 0",
inlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table1",
)
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table2",
)
],
)
def _python_task(task):
print("Python task")
python_task = PythonOperator(
task_id="python_task",
dag=dag,
python_callable=_python_task,
inlets=[
AUTO,
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table3",
),
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table4",
)
],
)
bash_task >> python_task
Como resultado, se crea el siguiente gráfico de linaje en la interfaz de usuario de Dataplex Universal Catalog:

Ver registros de linaje en Cloud Composer
Puedes inspeccionar los registros relacionados con el linaje de datos mediante el enlace de la página Configuración del entorno, en la sección Integración del linaje de datos de Dataplex Universal Catalog.
Solución de problemas
Si los datos de linaje no se registran en la API Lineage o no los ves en Dataplex Universal Catalog, prueba los siguientes pasos para solucionar el problema:
- Asegúrate de que la API Data Lineage esté habilitada en el proyecto de tu entorno de Cloud Composer.
- Comprueba si la integración del linaje de datos está habilitada en el entorno de Cloud Composer.
- Comprueba si el operador que usas está incluido en la asistencia para informes de linaje automatizados. Consulta los operadores de Airflow admitidos.
- Consulta los registros de linaje de Cloud Composer para identificar posibles problemas.