Linaje de datos con Dataplex

Cloud Composer 1 | Cloud Composer 2

Acerca de la integración del linaje de datos

El linaje de datos es una función de Dataplex que te permite hacer un seguimiento de cómo se mueven los datos a través de tus sistemas: de dónde provienen, a dónde se pasan y qué transformaciones se les aplican. El linaje de datos está disponible para lo siguiente:

Una vez que la función está habilitada en tu entorno de Cloud Composer, la ejecución de DAG que usan cualquiera de los operadores compatibles hace que Cloud Composer informe la información de linaje a la API de Data Lineage.

Luego, puedes acceder a esa información mediante las siguientes opciones:

Operadores admitidos

Los siguientes operadores admiten los informes de linaje automáticos en Cloud Composer:

  • airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator
  • airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator
  • airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryToBigQueryOperator
  • airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
  • airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryToGCSOperator
  • airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator
  • airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
  • airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator

Por ejemplo, ejecutar la siguiente tarea:

task = BigQueryInsertJobOperator(
    task_id='snapshot_task',
    dag=dag,
    location='<dataset-location>',
    configuration={
        'query': {
            'query': 'SELECT * FROM dataset.tableA',
            'useLegacySql': False,
            'destinationTable': {
                'project_id': GCP_PROJECT,
                'dataset_id': 'dataset',
                'table_id': 'tableB',
            },
        }
    },
)

Resultados en la creación del siguiente gráfico de linaje en la IU de Dataplex:

Ejemplo de gráfico de linaje en la IU de Dataplex.
Figure 1. Grafo de linaje de muestra para una tabla de BigQuery en la IU de Dataplex.

Consideraciones de los atributos para Cloud Composer

Cada ejecución de tarea de Airflow que informa el linaje de datos realiza lo siguiente:

  • Una solicitud de RPC de creación o actualización para un proceso de linaje
  • Una solicitud de RPC de creación o actualización para una ejecución de linaje
  • Una o más solicitudes de RPC para crear eventos de linaje (la mayoría de las veces, es 0 o 1)

Para obtener detalles sobre estas entidades, consulta el modelo de información de linaje y la referencia de la API de Lineage en la documentación de Dataplex.

El tráfico de linaje emitido está sujeto a las cuotas de la API de Data Lineage. Cloud Composer consume la cuota de escritura.

Los precios asociados con el manejo de datos de linaje están sujetos a los precios de linaje. Consulta las consideraciones del linaje de datos.

Implicaciones de rendimiento

El linaje de datos se informa al final de la ejecución de la tarea de Airflow. En promedio, los informes de linaje de datos demoran entre 1 y 2 segundos.

Esto no afecta el rendimiento de la tarea en sí: las tareas de Airflow no fallan si el linaje no se informa correctamente a la API de Lineage. No hay impacto en la lógica del operador principal, pero la instancia completa de la tarea se ejecuta un poco más para dar cuenta de los datos de linaje de informes.

Un entorno que informa el linaje de datos tendrá un aumento menor en los costos asociados, debido al tiempo adicional necesario para informar el linaje de datos.

Cumplimiento

El linaje de datos ofrece diferentes niveles de compatibilidad para funciones como los Controles del servicio de VPC. Revisa las consideraciones del linaje de datos para asegurarte de que los niveles de compatibilidad coincidan con los requisitos de tu entorno.

Trabaja con la integración del linaje de datos

La integración del linaje de datos para Cloud Composer se administra según el entorno. Esto significa que para habilitar la función se requieren dos pasos:

  1. Habilita la API de Data Lineage en tu proyecto.
  2. Habilitar la integración del linaje de datos en un entorno específico de Cloud Composer

Antes de comenzar

Cuando creas un entorno, la integración del linaje de datos se habilita de forma automática si se cumplen las siguientes condiciones:

  • La API de Data Lineage está habilitada en tu proyecto. Para obtener más información, consulta Habilita la API de Data Lineage en la documentación de Dataplex.

  • No se configura un backend de linaje personalizado en Airflow.

  • La clave de encriptación administrada por el cliente (CMEK) no está habilitada en el entorno. El linaje de datos no admite CMEK para los metadatos transferidos. No puedes habilitar la integración del linaje de datos en entornos de Cloud Composer que usan CMEK. Para obtener más información y otras limitaciones, consulta Consideraciones del linaje de datos.

Para un entorno existente, puedes enable o inhabilitar la integración del linaje de datos en cualquier momento.

Roles obligatorios

La integración con el linaje de datos requiere que se agreguen los siguientes permisos a la cuenta de servicio del entorno de Cloud Composer:

  • No se necesitan cambios en las cuentas de servicio predeterminadas. Las cuentas de servicio predeterminadas incluyen los permisos necesarios.
  • Para las cuentas de servicio administradas por el usuario, otorga el rol de trabajador de Composer (roles/composer.worker) a tu cuenta de servicio. Esta función incluye todos los permisos necesarios del linaje de datos.

Para obtener más información, consulta los permisos y roles de linaje en la documentación de Dataplex.

Habilita el linaje de datos en Cloud Composer

Console

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.

  3. Selecciona la pestaña Configuración del entorno.

  4. En la sección Integración del linaje de datos de Dataplex, haz clic en Editar.

  5. En el panel Integración del linaje de datos de Dataplex, selecciona Habilitar la integración con el linaje de datos de Dataplex y haz clic en Guardar.

gcloud

Usa el argumento --enable-cloud-data-lineage-integration.

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --enable-cloud-data-lineage-integration

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME por el nombre del entorno.

    El nombre debe comenzar con una letra minúscula seguida por un máximo de 62 letras minúsculas, números o guiones, y no puede terminar con un guion. El nombre del entorno se usa a fin de crear subcomponentes para el entorno, por lo que debes proporcionar un nombre que también sea válido como un nombre de bucket de Cloud Storage. Consulta los Lineamientos para asignación de nombres de bucket a fin de obtener una lista de restricciones.

  • LOCATION por la región del entorno.

    Una ubicación es la región en la que se encuentra el clúster de GKE del entorno.

Ejemplo:

gcloud composer environments update example-environment \
    --location us-central1 \
    --enable-cloud-data-lineage-integration

Envía eventos de linaje personalizados

Puedes enviar eventos de linaje personalizados si deseas informarlo para un operador que no es compatible con los informes de linaje automatizados.

Por ejemplo, para enviar eventos personalizados con lo siguiente:

  • BashOperator, modifica el parámetro inlets o outlets en la definición de la tarea.
  • PythonOperator, modifica el parámetro task.inlets o task.outlets en la definición de la tarea. Cuando usas AUTO para el parámetro inlets, se establece su valor igual al outlets de la tarea ascendente.

Por ejemplo, ejecutar esta tarea:


from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO

…

bash_task = BashOperator(
   task_id='bash_task',
   dag=dag,
   bash_command='sleep 0',
   inlets=[BigQueryTable(
       project_id=GCP_PROJECT,
       dataset_id='dataset',
       table_id='table1',
   )],
   outlets=[BigQueryTable(
       project_id=GCP_PROJECT,
       dataset_id='dataset',
       table_id='table2',
   )]
)

def _python_task(task):
   task.inlets.append(BigQueryTable(
       project_id=GCP_PROJECT,
       dataset_id='dataset',
       table_id='table3',
   ))

   task.outlets.append(BigQueryTable(
       project_id=GCP_PROJECT,
       dataset_id='dataset',
       table_id='table4',
   ))

python_task = PythonOperator(
   task_id='python_task',
   dag=dag,
   python_callable=_python_task,
   inlets=[AUTO],
)

bash_task >> python_task

Resultados en la creación del siguiente gráfico de linaje en la IU de Dataplex:

Ejemplo de gráfico de linaje para eventos personalizados en la IU de Dataplex.
Figure 2. Gráfico de linaje de muestra para varias tablas de BigQuery en la IU de Dataplex.

Inhabilitar el linaje de datos en Cloud Composer

Inhabilitar la integración de linaje en un entorno de Cloud Composer no inhabilita la API de Data Lineage. Si deseas inhabilitar por completo los informes de linaje para tu proyecto, también inhabilita la API de Data Lineage. Consulta Inhabilita servicios.

Console

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.

  3. Selecciona la pestaña Configuración del entorno.

  4. En la sección Integración del linaje de datos de Dataplex, haz clic en Editar.

  5. En el panel Integración del linaje de datos de Dataplex, selecciona Inhabilitar la integración con el linaje de datos de Dataplex y haz clic en Guardar.

gcloud

Usa el argumento --disable-cloud-data-lineage-integration.

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --disable-cloud-data-lineage-integration

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME por el nombre del entorno.

    El nombre debe comenzar con una letra minúscula seguida por un máximo de 62 letras minúsculas, números o guiones, y no puede terminar con un guion. El nombre del entorno se usa a fin de crear subcomponentes para el entorno, por lo que debes proporcionar un nombre que también sea válido como un nombre de bucket de Cloud Storage. Consulta los Lineamientos para asignación de nombres de bucket a fin de obtener una lista de restricciones.

  • LOCATION por la región del entorno.

    Una ubicación es la región en la que se encuentra el clúster de GKE del entorno.

Ejemplo:

gcloud composer environments update example-environment \
    --location us-central1 \
    --disable-cloud-data-lineage-integration

Ver registros de linaje en Cloud Composer

Puedes inspeccionar los registros relacionados con el linaje de datos mediante el vínculo de la página Configuración del entorno en la sección Integración del linaje de datos de Dataplex.

Soluciona problemas

Si los datos de linaje no se informan a la API de Lineage o no puedes verlos en Dataplex, prueba los siguientes pasos para solucionar problemas:

  • Asegúrate de que la API de Data Lineage esté habilitada en el proyecto de tu entorno de Cloud Composer.
  • Comprueba si la integración del linaje de datos está habilitada en el entorno de Cloud Composer.
  • Verifica si el operador que usas está incluido en la asistencia para los informes de linaje automatizados. Consulta Operadores de Airflow compatibles.
  • Consulta los registros de linaje en Cloud Composer para detectar posibles problemas.