Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
À propos de l'intégration de la traçabilité des données
La traçabilité des données est un Dataplex Fonctionnalité qui vous permet de suivre la façon dont les données circulent dans vos systèmes et leur provenance l'endroit où elles sont transmises et les transformations qui leur sont appliquées. La traçabilité des données est disponible pour:
Environnements Cloud Composer 2 exécutant les versions 2.1.2 et ultérieures avec les versions 2.2.5 et ultérieures d'Airflow.
Environnements Cloud Composer 2 dans les mêmes régions que Régions Data Catalog compatibles avec la traçabilité des données.
Une fois la fonctionnalité activée dans votre environnement Cloud Composer, exécutez Les DAG qui utilisent l'un des opérateurs compatibles provoquent Cloud Composer pour transmettre les informations de traçabilité à l'API Data Lineage
Vous pouvez ensuite accéder à ces informations en utilisant:
- API Data Lineage
- Graphiques de visualisation de la traçabilité pour les entrées Data Catalog compatibles dans Dataplex. Voir Graphiques de visualisation de la traçabilité dans la documentation Dataplex.
Opérateurs compatibles
Les opérateurs suivants sont compatibles avec la création automatique de rapports de traçabilité dans Cloud Composer:
airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator
airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator
airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryToBigQueryOperator
airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryToGCSOperator
airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator
airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator
Par exemple, en exécutant la tâche suivante :
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': GCP_PROJECT,
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
Cela se traduit par la création du graphique de traçabilité suivant dans l'UI Dataplex:
Remarques concernant les fonctionnalités de Cloud Composer
Chaque exécution de tâche Airflow qui génère un rapport sur la lignée des données effectue les opérations suivantes :
- Créer ou mettre à jour une requête RPC pour un processus de traçabilité
- Une requête RPC de création ou de mise à jour pour une exécution de lignée
- Une ou plusieurs requêtes RPC pour créer des événements de traçabilité (la plupart du temps 0 ou 1)
Pour en savoir plus sur ces entités, consultez modèle d'information de traçabilité et Documentation de référence de l'API de traçabilité dans Dataplex dans la documentation Google Cloud.
Le trafic de la traçabilité émis est soumis à des quotas dans l'API Data Lineage. Cloud Composer consomme le quota d'écriture.
La tarification associée à la gestion des données de traçabilité est soumise à une tarification de traçabilité. Consultez les considérations relatives à la traçabilité des données.
Implications en termes de performance
La traçabilité des données est indiquée à la fin de l'exécution de la tâche Airflow. La création de rapports sur la traçabilité des données prend en moyenne une à deux secondes.
Cela n'affecte pas les performances de la tâche elle-même : les tâches Airflow ne échouent pas si la lignée n'est pas correctement signalée à l'API Lineage. La logique de l'opérateur principal n'est pas affectée, mais l'exécution de l'instance de tâche entière prend un peu plus de temps pour tenir compte des données de lignée de rapports.
Un environnement qui génère des rapports sur la lignée des données entraînera une augmentation mineure des coûts associés, en raison du temps supplémentaire nécessaire pour générer ces rapports.
Conformité
La lignée de données propose différents niveaux d'assistance pour des fonctionnalités telles que VPC Service Controls. Examiner considérations relatives à la traçabilité des données pour vous assurer que les niveaux d'assistance correspondent aux exigences de votre environnement.
Utiliser l'intégration de la traçabilité des données
L'intégration de la traçabilité des données pour Cloud Composer est gérée par environnement. Pour activer cette fonctionnalité, vous devez suivre deux étapes :
- activer l'API Data Lineage dans votre projet ;
- Activez l'intégration de la traçabilité des données dans un environnement Cloud Composer spécifique.
Avant de commencer
Lorsque vous créez un environnement, l'intégration de la lignée de données est automatiquement activée si les conditions suivantes sont remplies :
L'API Data Lineage est activée dans votre projet. Pour en savoir plus, consultez Activer l'API Data Lineage dans Documentation Dataplex.
Un backend de ligneage personnalisé n'est pas configuré dans Airflow.
Pour un environnement existant, vous pouvez activer ou désactiver l'intégration de la traçabilité des données à tout moment.
Rôles requis
L'intégration de la traçabilité des données nécessite l'ajout des autorisations suivantes pour votre Compte de service de l'environnement Cloud Composer:
- Pour les comptes de service par défaut: aucune modification n'est nécessaire. Comptes de service par défaut incluent les autorisations requises.
- Pour les comptes de service gérés par l'utilisateur: attribuez le nœud de calcul Composer
(
roles/composer.worker
) à votre compte de service. Ce rôle inclut toutes les autorisations de lignée de données requises.
Pour en savoir plus, consultez Rôles et autorisations de traçabilité dans la documentation Dataplex.
Activer la traçabilité des données dans Cloud Composer
Console
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Sélectionnez l'onglet Configuration de l'environnement.
Dans la section Intégration de la traçabilité des données Dataplex, cliquez sur Modifier.
Dans le panneau Intégration de la traçabilité des données Dataplex, sélectionnez Activer l'intégration avec la traçabilité des données Dataplex, puis cliquez sur Enregistrer.
gcloud
Utilisez l'argument --enable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
Remplacez :
ENVIRONMENT_NAME
par le nom de l'environnement.Le nom doit commencer par une lettre minuscule suivie d'un maximum de 62 caractères (lettres minuscules, chiffres ou traits d'union) et ne peut pas se terminer par un trait d'union. Le nom de l'environnement est utilisé pour créer des sous-composants pour l'environnement. Vous devez donc indiquer un nom également valide pour les buckets Cloud Storage. Pour obtenir une liste des restrictions, consultez la page Consignes de dénomination des buckets.
LOCATION
par la région de l'environnement.Un emplacement est la région dans laquelle se trouve le cluster GKE de l'environnement localisés.
Exemple :
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
Envoyer des événements de traçabilité personnalisés
Vous pouvez envoyer des événements de traçabilité personnalisés si vous souhaitez signaler la traçabilité pour un opérateur qui n'est pas compatible avec les rapports de traçabilité automatiques.
Par exemple, pour envoyer des événements personnalisés avec:
BashOperator
: modifiez le paramètreinlets
ououtlets
dans la tâche. définition.PythonOperator
, modifiez le paramètretask.inlets
outask.outlets
dans la définition de la tâche. L'utilisation deAUTO
pour le paramètreinlets
définit sa égale à la valeuroutlets
de sa tâche en amont.
Par exemple, exécutez cette tâche :
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
…
bash_task = BashOperator(
task_id='bash_task',
dag=dag,
bash_command='sleep 0',
inlets=[BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table1',
)],
outlets=[BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table2',
)]
)
def _python_task(task):
task.inlets.append(BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table3',
))
task.outlets.append(BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table4',
))
python_task = PythonOperator(
task_id='python_task',
dag=dag,
python_callable=_python_task,
inlets=[AUTO],
)
bash_task >> python_task
Le graphique de traçabilité suivant est créé dans l'UI Dataplex :
Désactiver la traçabilité des données dans Cloud Composer
La désactivation de l'intégration de la traçabilité dans un environnement Cloud Composer ne désactive pas l'API Data Lineage. Si vous souhaitez désactiver complètement les rapports sur la traçabilité pour votre projet, désactivez également l'API Data Lineage. Consultez la section Désactiver des services.
Console
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Sélectionnez l'onglet Configuration de l'environnement.
Dans la section Intégration de la traçabilité des données Dataplex, cliquez sur Modifier.
Dans le panneau Intégration de la traçabilité des données Dataplex, sélectionnez Désactiver l'intégration avec la traçabilité des données Dataplex, puis cliquez sur Enregistrer.
gcloud
Utilisez l'argument --disable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
Remplacez :
ENVIRONMENT_NAME
par le nom de l'environnement.Le nom doit commencer par une lettre minuscule suivie d'un maximum de 62 caractères (lettres minuscules, chiffres ou traits d'union) et ne peut pas se terminer par un trait d'union. Le nom de l'environnement est utilisé pour créer des sous-composants pour l'environnement. Vous devez donc indiquer un nom également valide pour les buckets Cloud Storage. Pour obtenir une liste des restrictions, consultez la page Consignes de dénomination des buckets.
LOCATION
par la région de l'environnement.Un emplacement est la région dans laquelle se trouve le cluster GKE de l'environnement localisés.
Exemple :
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
Afficher les journaux de lignée dans Cloud Composer
Vous pouvez inspecter les journaux liés à la traçabilité des données à l'aide du lien disponible sur Page de configuration de l'environnement dans Dataplex intégration de la traçabilité des données.
Dépannage
Si les données de lignée ne sont pas envoyées à l'API Lineage ou si vous ne les voyez pas dans Dataplex, suivez la procédure de dépannage ci-dessous :
- Assurez-vous que l'API Data Lineage est activée dans le projet de votre environnement Cloud Composer.
- Vérifiez si l'intégration de la lignée de données est activée dans l'environnement Cloud Composer.
- Vérifiez si l'opérateur que vous utilisez est inclus dans la compatibilité avec les rapports de lignée automatiques. Consultez la section Opérateurs Airflow compatibles.
- Consultez les journaux de traçabilité dans Cloud Composer pour identifier les problèmes éventuels.