Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Cette page explique comment activer l'intégration du lineage de données dans Cloud Composer.
À propos de l'intégration de la traçabilité des données
La traçabilité des données est une fonctionnalité Dataplex Universal Catalog qui permet de suivre la manière dont les données transitent par vos systèmes : leur origine, la cible de transmission, et les transformations qui leur sont appliquées.
Cloud Composer utilise le package apache-airflow-providers-openlineage
pour générer les événements de lineage qui sont envoyés à l'API Data Lineage.
Ce package est déjà installé dans les environnements Cloud Composer. Si vous installez une autre version de ce package, la liste des opérateurs compatibles peut changer. Nous vous recommandons de ne le faire qu'en cas de nécessité et de conserver la version préinstallée du package dans le cas contraire.
La traçabilité des données est disponible pour les environnements situés dans les mêmes régions que les régions Dataplex Universal Catalog compatibles avec la traçabilité des données.
Si le lineage des données est activé dans votre environnement Cloud Composer, Cloud Composer transmet les informations de lineage à l'API Data Lineage pour les DAG qui utilisent l'un des opérateurs compatibles. Vous pouvez également envoyer des événements de lignée personnalisés si vous souhaitez signaler la lignée d'un opérateur non compatible.
Vous pouvez accéder aux informations de traçabilité avec :
- API Data Lineage
- Graphiques de traçabilité pour les entrées compatibles dans Dataplex Universal Catalog. Pour en savoir plus, consultez Graphiques de traçabilité dans la documentation Dataplex Universal Catalog.
Lorsque vous créez un environnement, l'intégration du lineage des données est automatiquement activée si les conditions suivantes sont remplies :
L'API Data Lineage est activée dans votre projet. Pour en savoir plus, consultez Activer l'API Data Lineage dans la documentation Dataplex Universal Catalog.
Aucun backend Lineage personnalisé n'est configuré dans Airflow.
Vous pouvez désactiver l'intégration du lineage des données lorsque vous créez un environnement.
Pour un environnement existant, vous pouvez activer ou désactiver l'intégration du lineage des données à tout moment.
Points à prendre en compte concernant les fonctionnalités dans Cloud Composer
Cloud Composer effectue un appel RPC pour créer des événements de lignage dans les cas suivants :
- Lorsqu'une tâche Airflow démarre ou se termine
- Lorsqu'une exécution de DAG démarre ou se termine
Pour en savoir plus sur ces entités, consultez Modèle d'informations sur la traçabilité et Documentation de référence de l'API Lineage dans la documentation Dataplex Universal Catalog.
Le trafic de traçabilité émis est soumis à des quotas dans l'API Data Lineage. Cloud Composer consomme du quota d'écriture.
La tarification associée à la gestion des données de lignée est soumise à la tarification de la lignée. Consultez Considérations relatives à la traçabilité des données.
Considérations sur les performances dans Cloud Composer
La traçabilité des données est signalée à la fin de l'exécution de la tâche Airflow. En moyenne, la création de rapports sur la lignée des données prend environ une à deux secondes.
Cela n'affecte pas les performances de la tâche elle-même : les tâches Airflow n'échouent pas si la provenance n'est pas signalée à l'API Lineage. La logique de l'opérateur principal n'est pas affectée, mais l'ensemble de l'instance de tâche s'exécute un peu plus longtemps pour tenir compte des données de lignée des rapports.
Un environnement qui génère des rapports sur la provenance des données entraînera une légère augmentation des coûts associés, en raison du temps supplémentaire nécessaire pour générer ces rapports.
Conformité
La traçabilité des données offre différents niveaux d'assistance pour des fonctionnalités telles que VPC Service Controls. Consultez les points à prendre en compte concernant la traçabilité des données pour vous assurer que les niveaux d'assistance correspondent aux exigences de votre environnement.
Avant de commencer
Cette fonctionnalité offre une prise en charge variable de la conformité. Assurez-vous d'abord de consulter les points à prendre en compte concernant les fonctionnalités spécifiques à Cloud Composer et les points à prendre en compte concernant la fonctionnalité de traçabilité des données.
Toutes les autorisations IAM requises pour le lineage des données sont déjà incluses dans le rôle Worker Composer (
roles/composer.worker
). Ce rôle est requis pour les comptes de service de l'environnement.Pour en savoir plus sur les autorisations de traçabilité des données, consultez Rôles et autorisations de traçabilité dans la documentation Dataplex Universal Catalog.
Vérifier si un opérateur est compatible
La prise en charge de la traçabilité des données est fournie par le package de fournisseur où se trouve l'opérateur :
Consultez les journaux des modifications du package de fournisseur où se trouve l'opérateur pour les entrées qui ajoutent la prise en charge d'OpenLineage.
Par exemple, BigQueryToBigQueryOperator est compatible avec OpenLineage à partir de la version 11.0.0 de
apache-airflow-providers-google
.Vérifiez la version du package de fournisseur utilisée par votre environnement. Pour ce faire, consultez la liste des packages préinstallés pour la version d'Airflow utilisée dans votre environnement. Vous pouvez également installer une autre version du package dans votre environnement.
De plus, la page Classes compatibles de la documentation apache-airflow-providers-openlineage
liste les derniers opérateurs compatibles.
Configurer l'intégration de la traçabilité des données
L'intégration de la traçabilité des données pour Cloud Composer est gérée par environnement. Cela signifie que l'activation de la fonctionnalité nécessite deux étapes :
- Activez l'API Data Lineage dans votre projet.
- Activez l'intégration de la traçabilité des données dans un environnement Cloud Composer spécifique.
Activer la traçabilité des données dans Cloud Composer
Console
Dans la console Google Cloud , accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Sélectionnez l'onglet Configuration de l'environnement.
Dans la section Intégration de la traçabilité des données Dataplex, cliquez sur Modifier.
Dans le panneau Intégration de la traçabilité des données Dataplex, sélectionnez Activer l'intégration avec la traçabilité des données Dataplex, puis cliquez sur Enregistrer.
gcloud
Utilisez l'argument --enable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
Remplacez les éléments suivants :
ENVIRONMENT_NAME
: nom de votre environnementLOCATION
: région où se trouve l'environnement.
Exemple :
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
Désactiver la traçabilité des données dans Cloud Composer
La désactivation de l'intégration de la traçabilité dans un environnement Cloud Composer ne désactive pas l'API Data Lineage. Si vous souhaitez désactiver complètement le reporting de la traçabilité pour votre projet, désactivez également l'API Data Lineage. Consultez Désactiver des services.
Console
Dans la console Google Cloud , accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Sélectionnez l'onglet Configuration de l'environnement.
Dans la section Intégration de la traçabilité des données Dataplex, cliquez sur Modifier.
Dans le panneau Intégration de la traçabilité des données Dataplex, sélectionnez Désactiver l'intégration avec la traçabilité des données Dataplex, puis cliquez sur Enregistrer.
gcloud
Utilisez l'argument --disable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
Remplacez les éléments suivants :
ENVIRONMENT_NAME
: nom de votre environnementLOCATION
: région où se trouve l'environnement.
Exemple :
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
Envoyer des événements de traçabilité dans les opérateurs compatibles
Si la traçabilité des données est activée, les opérateurs compatibles envoient automatiquement des événements de traçabilité. Vous n'avez pas besoin de modifier le code de votre DAG.
Par exemple, en exécutant la tâche suivante :
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': 'example-project',
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
Le graphique de traçabilité suivant est créé dans l'UI Dataplex Universal Catalog :

Envoyer des événements de traçabilité personnalisés
Vous pouvez envoyer des événements de lignée personnalisés si vous souhaitez générer des rapports de lignée pour un opérateur qui n'est pas compatible avec les rapports de lignée automatisés.
Par exemple, pour envoyer des événements personnalisés avec :
- BashOperator : modifiez le paramètre
inlets
ououtlets
dans la définition de la tâche. - PythonOperator : modifiez le paramètre
task.inlets
outask.outlets
dans la définition de la tâche. - Vous pouvez utiliser
AUTO
pour le paramètreinlets
. Sa valeur est alors égale à laoutlets
de sa tâche en amont.
L'exemple suivant montre comment utiliser les entrées et les sorties :
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
...
bash_task = BashOperator(
task_id="bash_task",
dag=dag,
bash_command="sleep 0",
inlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table1",
)
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table2",
)
],
)
def _python_task(task):
print("Python task")
python_task = PythonOperator(
task_id="python_task",
dag=dag,
python_callable=_python_task,
inlets=[
AUTO,
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table3",
),
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table4",
)
],
)
bash_task >> python_task
Le graphique de traçabilité suivant est alors créé dans l'UI Dataplex Universal Catalog :

Afficher les journaux de lignée dans Cloud Composer
Vous pouvez inspecter les journaux liés à la traçabilité des données à l'aide du lien sur la page Configuration de l'environnement dans la section Intégration de la traçabilité des données Dataplex Universal Catalog.
Dépannage
Si les données de traçabilité ne sont pas signalées à l'API Lineage ou si vous ne les voyez pas dans Dataplex Universal Catalog, essayez les étapes de dépannage suivantes :
- Assurez-vous que l'API Data Lineage est activée dans le projet de votre environnement Cloud Composer.
- Vérifiez si l'intégration du lineage des données est activée dans l'environnement Cloud Composer.
- Vérifiez si l'opérateur que vous utilisez est inclus dans la compatibilité avec les rapports automatisés sur la lignée. Consultez la liste des opérateurs Airflow compatibles.
- Consultez les journaux de lignée dans Cloud Composer pour identifier d'éventuels problèmes.