Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
À propos de l'intégration de la traçabilité des données
La traçabilité des données est une fonctionnalité Dataplex qui vous permet de suivre la manière dont les données transitent par vos systèmes: leur origine, la cible de transmission, et les transformations qui leur sont appliquées. La traçabilité des données est disponible pour:
Environnements Cloud Composer 2 exécutant les versions 2.1.2 et ultérieures avec les versions 2.2.5 et ultérieures d'Airflow.
les environnements Cloud Composer 2 dans les mêmes régions que les régions Dataplex compatibles avec la lignée des données ;
Une fois la fonctionnalité activée dans votre environnement Cloud Composer, l'exécution de DAG qui utilisent l'un des opérateurs compatibles entraîne l'envoi d'informations sur la lignée à l'API Data Lineage.
Vous pouvez ensuite accéder à ces informations à l'aide des éléments suivants:
- API Data Lineage
- Graphiques de traçabilité pour les entrées compatibles dans Dataplex. Consultez la section Graphiques de traçabilité dans la documentation Dataplex.
Opérateurs compatibles
Les opérateurs suivants sont compatibles avec les rapports de lignée automatiques dans Cloud Composer:
airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator
airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator
airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryToBigQueryOperator
airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryToGCSOperator
airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator
airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator
Par exemple, en exécutant la tâche suivante:
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': GCP_PROJECT,
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
Le graphique de traçabilité suivant s'affiche dans l'UI Dataplex:

Fonctionnalités à prendre en compte pour Cloud Composer
Chaque exécution de tâche Airflow qui génère un rapport sur la lignée des données effectue les opérations suivantes:
- Une requête RPC de création ou de mise à jour pour un processus de lignée
- Une requête RPC de création ou de mise à jour pour une exécution de lignée
- Une ou plusieurs requêtes RPC pour créer des événements de lignée (généralement 0 ou 1)
Pour en savoir plus sur ces entités, consultez le modèle d'informations sur la traçabilité des données et la documentation de référence de l'API Lineage dans la documentation Dataplex.
Le trafic de la généalogie émis est soumis à des quotas dans l'API Data Lineage. Cloud Composer consomme le quota d'écriture.
Les tarifs associés à la gestion des données de filiation sont soumis à la tarification de la filiation. Consultez la section Considérations concernant la lignée des données.
Implications en termes de performance
La traçabilité des données est indiquée à la fin de l'exécution de la tâche Airflow. En moyenne, la création de rapports sur la lignée des données prend environ une à deux secondes.
Cela n'affecte pas les performances de la tâche elle-même: les tâches Airflow ne échouent pas si la lignée n'est pas correctement signalée à l'API Lineage. La logique de l'opérateur principal n'est pas affectée, mais l'exécution de l'ensemble de l'instance de tâche est un peu plus longue pour tenir compte des données de lignée de rapports.
Un environnement qui génère des rapports sur la lignée des données entraînera une augmentation mineure des coûts associés, en raison du temps supplémentaire nécessaire pour générer ces rapports.
Conformité
La lignée de données propose différents niveaux d'assistance pour des fonctionnalités telles que VPC Service Controls. Consultez les considérations sur la lignée des données pour vous assurer que les niveaux d'assistance correspondent aux exigences de votre environnement.
Utiliser l'intégration de la traçabilité des données
L'intégration de la traçabilité des données pour Cloud Composer est gérée par environnement. Pour activer cette fonctionnalité, vous devez suivre deux étapes:
- Activez l'API Data Lineage dans votre projet.
- Activez l'intégration de la traçabilité des données dans un environnement Cloud Composer spécifique.
Avant de commencer
Lorsque vous créez un environnement, l'intégration de la lignée de données est automatiquement activée si les conditions suivantes sont remplies:
L'API Data Lineage est activée dans votre projet. Pour en savoir plus, consultez la section Activer l'API Data Lineage dans la documentation Dataplex.
Un backend de ligneage personnalisé n'est pas configuré dans Airflow.
Pour un environnement existant, vous pouvez activer ou désactiver l'intégration de la lignée de données à tout moment.
Rôles requis
Pour intégrer la traçabilité des données, vous devez ajouter les autorisations suivantes au compte de service de votre environnement Cloud Composer:
- Pour les comptes de service par défaut: aucune modification n'est requise. Les comptes de service par défaut incluent les autorisations requises.
- Pour les comptes de service gérés par l'utilisateur: attribuez le rôle Worker Composer (
roles/composer.worker
) à votre compte de service. Ce rôle inclut toutes les autorisations de lignée de données requises.
Pour en savoir plus, consultez la section Rôles et autorisations de la généalogie dans la documentation Dataplex.
Activer la traçabilité des données dans Cloud Composer
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Sélectionnez l'onglet Configuration de l'environnement.
Dans la section Intégration de la traçabilité des données Dataplex, cliquez sur Modifier.
Dans le panneau Intégration de la traçabilité des données Dataplex, sélectionnez Activer l'intégration avec la traçabilité des données Dataplex, puis cliquez sur Enregistrer.
Utilisez l'argument --enable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
Remplacez :
ENVIRONMENT_NAME
par le nom de l'environnement.Le nom doit commencer par une lettre minuscule suivie d'un maximum de 62 caractères (lettres minuscules, chiffres ou traits d'union) et ne peut pas se terminer par un trait d'union. Le nom de l'environnement est utilisé pour créer des sous-composants pour l'environnement. Vous devez donc indiquer un nom également valide pour les buckets Cloud Storage. Pour obtenir une liste des restrictions, consultez la page Consignes de dénomination des buckets.
LOCATION
par la région de l'environnement.L'emplacement correspond à la région dans laquelle se trouve le cluster GKE de l'environnement.
Exemple :
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
Envoyer des événements de traçabilité personnalisés
Vous pouvez envoyer des événements de lignée personnalisés si vous souhaitez créer des rapports sur la lignée d'un opérateur pour lequel les rapports automatiques sur la lignée ne sont pas disponibles.
Par exemple, pour envoyer des événements personnalisés avec:
BashOperator
, modifiez le paramètreinlets
ououtlets
dans la définition de la tâche.PythonOperator
, modifiez le paramètretask.inlets
outask.outlets
dans la définition de la tâche. L'utilisation deAUTO
pour le paramètreinlets
définit sa valeur sur celle deoutlets
de sa tâche en amont.
Par exemple, exécutez cette tâche:
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
…
bash_task = BashOperator(
task_id='bash_task',
dag=dag,
bash_command='sleep 0',
inlets=[BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table1',
)],
outlets=[BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table2',
)]
)
def _python_task(task):
task.inlets.append(BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table3',
))
task.outlets.append(BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table4',
))
python_task = PythonOperator(
task_id='python_task',
dag=dag,
python_callable=_python_task,
inlets=[AUTO],
)
bash_task >> python_task
Le graphique de traçabilité suivant s'affiche dans l'UI Dataplex:

Désactiver la traçabilité des données dans Cloud Composer
La désactivation de l'intégration de la traçabilité dans un environnement Cloud Composer ne désactive pas l'API Data Lineage. Si vous souhaitez désactiver complètement les rapports sur la traçabilité pour votre projet, désactivez également l'API Data Lineage. Consultez la section Désactiver des services.
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Sélectionnez l'onglet Configuration de l'environnement.
Dans la section Intégration de la traçabilité des données Dataplex, cliquez sur Modifier.
Dans le panneau Intégration de la traçabilité des données Dataplex, sélectionnez Désactiver l'intégration avec la traçabilité des données Dataplex, puis cliquez sur Enregistrer.
Utilisez l'argument --disable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
Remplacez :
ENVIRONMENT_NAME
par le nom de l'environnement.Le nom doit commencer par une lettre minuscule suivie d'un maximum de 62 caractères (lettres minuscules, chiffres ou traits d'union) et ne peut pas se terminer par un trait d'union. Le nom de l'environnement est utilisé pour créer des sous-composants pour l'environnement. Vous devez donc indiquer un nom également valide pour les buckets Cloud Storage. Pour obtenir une liste des restrictions, consultez la page Consignes de dénomination des buckets.
LOCATION
par la région de l'environnement.L'emplacement correspond à la région dans laquelle se trouve le cluster GKE de l'environnement.
Exemple :
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
Afficher les journaux de lignée dans Cloud Composer
Vous pouvez inspecter les journaux liés à la lignée des données à l'aide du lien sur la page Configuration de l'environnement, dans la section Intégration de la lignée des données Dataplex.
Dépannage
Si les données de lignée ne sont pas envoyées à l'API Lineage ou si vous ne les voyez pas dans Dataplex, essayez de suivre les étapes de dépannage ci-dessous:
- Assurez-vous que l'API Data Lineage est activée dans le projet de votre environnement Cloud Composer.
- Vérifiez si l'intégration de la lignée de données est activée dans l'environnement Cloud Composer.
- Vérifiez si l'opérateur que vous utilisez est inclus dans la compatibilité avec les rapports de lignée automatisés. Consultez la section Opérateurs Airflow compatibles.
- Vérifiez les journaux de la lignée dans Cloud Composer pour détecter d'éventuels problèmes.