Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Información acerca de la integración del linaje de datos
El linaje de datos es una función de Dataplex que te permite hacer un seguimiento de cómo los datos se mueven a través de tus sistemas: de dónde provienen, a dónde se pasan y qué transformaciones se aplican a ellos. El linaje de datos está disponible para lo siguiente:
Entornos de Cloud Composer 2 que ejecutan las versiones 2.1.2 y posteriores con versiones de Airflow 2.2.5 y posteriores
Entornos de Cloud Composer 2 en las mismas regiones que las regiones del Catálogo de datos que admiten el linaje de datos
Una vez que la función esté habilitada en tu entorno de Cloud Composer, se ejecutará Los DAG que usan cualquiera de los operadores admitidos provocan Cloud Composer para informar sobre el linaje a la API de Data Lineage.
Luego, puedes acceder a esa información de las siguientes maneras:
- API de Data Lineage
- Gráficos de visualización de linaje para entradas de Data Catalog compatibles en Dataplex Consulta Gráficos de visualización del linaje en la documentación de Dataplex.
Operadores admitidos
Los siguientes operadores admiten informes de linaje automáticos en Cloud Composer:
airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator
airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator
airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryToBigQueryOperator
airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryToGCSOperator
airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator
airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator
Por ejemplo, ejecuta la siguiente tarea:
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': GCP_PROJECT,
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
Se crea el siguiente gráfico de linaje en la IU de Dataplex:
Consideraciones sobre las funciones de Cloud Composer
Cada ejecución de tarea de Airflow que informa el linaje de datos realiza las siguientes acciones:
- Una solicitud de RPC de creación o actualización para un proceso de linaje
- Una solicitud de RPC de creación o actualización para una ejecución de linaje
- Una o más solicitudes de RPC para crear eventos de linaje (la mayoría de las veces, 0 o 1)
Para obtener detalles sobre estas entidades, consulta el modelo de información de linaje y la referencia de la API de linaje en la documentación de Dataplex.
El tráfico de linaje emitido está sujeto las cuotas de la API de Data Lineage. Cloud Composer consume la cuota de escritura.
Los precios asociados con el manejo de los datos de linaje están sujetos a los precios de linaje. Consulta las consideraciones del linaje de datos.
Implicaciones de rendimiento
El linaje de datos se informa al final de la ejecución de la tarea de Airflow. En promedio, los informes de linaje de datos tardan entre 1 y 2 segundos.
Esto no afecta el rendimiento de la tarea en sí: las tareas de Airflow no fallará si el linaje no se informa correctamente a la API de Lineage. No hay ningún impacto en la lógica del operador principal, pero toda la instancia de la tarea se ejecuta un poco más para tener en cuenta los datos de linaje de los informes.
Un entorno que informa el linaje de datos tendrá un aumento menor en los costos asociados, debido al tiempo adicional necesario para informar el linaje de datos.
Cumplimiento
El linaje de datos ofrece diferentes niveles de compatibilidad para funciones, como los Controles del servicio de VPC. Revisión consideraciones del linaje de datos para garantizar que los niveles de asistencia coincidan con los requisitos del entorno.
Trabaja con la integración del linaje de datos
La integración del linaje de datos para Cloud Composer se administra por entorno. Esto significa que se requieren dos pasos para habilitar la función:
- Habilitar la API de Data Lineage en tu proyecto
- Habilitar la integración del linaje de datos en un Cloud Composer específico en un entorno de nube.
Antes de comenzar
Cuando creas un entorno, la integración del linaje de datos habilitarse automáticamente si se cumplen las siguientes condiciones:
La API de Data Lineage está habilitada en tu proyecto. Para obtener más información, consulta Habilita la API de Data Lineage en la documentación de Dataplex.
No se configuró un backend de linaje personalizado en Airflow.
En el caso de un entorno existente, puedes habilitar o inhabilitar la integración del linaje de datos en cualquier momento.
Roles obligatorios
La integración con el linaje de datos requiere que se agreguen los siguientes permisos para tu Cuenta de servicio del entorno de Cloud Composer:
- En el caso de las cuentas de servicio predeterminadas, no es necesario realizar cambios. Cuentas de servicio predeterminadas incluyen los permisos necesarios.
- Para cuentas de servicio administradas por el usuario, otorga el rol de Composer Worker
(
roles/composer.worker
) a tu cuenta de servicio. Este rol incluye todos los permisos necesarios para el linaje de datos.
Para obtener más detalles, consulta roles y permisos de linaje en la documentación de Dataplex.
Habilita el linaje de datos en Cloud Composer
Console
En la consola de Google Cloud, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Selecciona la pestaña Configuración del entorno.
En la sección Integración del linaje de datos de Dataplex, haz clic en Editar.
En el panel Integración del linaje de datos de Dataplex, selecciona Habilitar la integración con el linaje de datos de Dataplex y haz clic en Guardar.
gcloud
Usa el argumento --enable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
Reemplaza lo siguiente:
ENVIRONMENT_NAME
por el nombre del entorno.El nombre debe comenzar con una letra minúscula seguida por un máximo de 62 letras minúsculas, números o guiones, y no puede terminar con un guion. El nombre del entorno se usa a fin de crear subcomponentes para el entorno, por lo que debes proporcionar un nombre que también sea válido como un nombre de bucket de Cloud Storage. Consulta los Lineamientos para asignación de nombres de bucket a fin de obtener una lista de restricciones.
LOCATION
por la región del entorno.Una ubicación es la región en la que se encuentra el clúster de GKE del entorno ubicado.
Ejemplo:
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
Envía eventos de linaje personalizados
Puedes enviar eventos de linaje personalizados si quieres informar sobre el linaje de un operador que no sea compatible con los informes de linaje automático.
Por ejemplo, para enviar eventos personalizados con lo siguiente:
BashOperator
, modifica el parámetroinlets
ooutlets
en la definición de tareas.PythonOperator
, modifica el parámetrotask.inlets
otask.outlets
en la definición de la tarea. Si usasAUTO
para el parámetroinlets
, se establece su igual aloutlets
de su tarea ascendente.
Por ejemplo, si ejecutas esta tarea:
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
…
bash_task = BashOperator(
task_id='bash_task',
dag=dag,
bash_command='sleep 0',
inlets=[BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table1',
)],
outlets=[BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table2',
)]
)
def _python_task(task):
task.inlets.append(BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table3',
))
task.outlets.append(BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table4',
))
python_task = PythonOperator(
task_id='python_task',
dag=dag,
python_callable=_python_task,
inlets=[AUTO],
)
bash_task >> python_task
Se crea el siguiente gráfico de linaje en la IU de Dataplex:
Inhabilita el linaje de datos en Cloud Composer
Inhabilitar la integración del linaje en un entorno de Cloud Composer no inhabilitar la API de Data Lineage. Si quieres inhabilitar por completo el linaje para tu proyecto, también inhabilitarás la API de Data Lineage. Consulta Inhabilitar servicios.
Console
En la consola de Google Cloud, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Selecciona la pestaña Configuración del entorno.
En la sección Integración del linaje de datos de Dataplex, haz clic en Editar.
En el panel Integración del linaje de datos de Dataplex, selecciona Inhabilitar la integración con el linaje de datos de Dataplex y haz clic en Guardar.
gcloud
Usa el argumento --disable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
Reemplaza lo siguiente:
ENVIRONMENT_NAME
por el nombre del entorno.El nombre debe comenzar con una letra minúscula seguida por un máximo de 62 letras minúsculas, números o guiones, y no puede terminar con un guion. El nombre del entorno se usa a fin de crear subcomponentes para el entorno, por lo que debes proporcionar un nombre que también sea válido como un nombre de bucket de Cloud Storage. Consulta los Lineamientos para asignación de nombres de bucket a fin de obtener una lista de restricciones.
LOCATION
por la región del entorno.Una ubicación es la región en la que se encuentra el clúster de GKE del entorno ubicado.
Ejemplo:
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
Cómo ver registros de linaje en Cloud Composer
Puedes inspeccionar los registros relacionados con el linaje de datos usando el vínculo Página de configuración del entorno en Dataplex Integración del linaje de datos.
Soluciona problemas
Si los datos de linaje no se informan a la API de Lineage o si no puedes verlos en Dataplex, prueba los siguientes pasos para solucionar problemas:
- Asegúrate de que la API de Data Lineage esté habilitada en el proyecto de tu entorno de Cloud Composer.
- Comprueba si la integración del linaje de datos está habilitada en Cloud Composer en un entorno de nube.
- Verifica si el operador que usas está incluido en la compatibilidad con los informes de linaje automatizados. Consulta Operadores de Airflow compatibles.
- Revisa los registros de linaje en Cloud Composer para detectar posibles problemas.