Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
À propos de l'intégration de la traçabilité des données
La traçabilité des données est un Dataplex Fonctionnalité qui vous permet de suivre la façon dont les données circulent dans vos systèmes et leur provenance l'endroit où elles sont transmises et les transformations qui leur sont appliquées. La traçabilité des données est disponible pour:
Environnements Cloud Composer 2 exécutant les versions 2.1.2 et ultérieures avec Airflow 2.2.5 et versions ultérieures.
les environnements Cloud Composer 2 situés dans les mêmes régions que les régions Data Catalog compatibles avec la lignée de données ;
Une fois la fonctionnalité activée dans votre environnement Cloud Composer, exécutez Les DAG qui utilisent l'un des opérateurs compatibles provoquent Cloud Composer pour transmettre les informations de traçabilité à l'API Data Lineage
Vous pouvez ensuite accéder à ces informations à l'aide des éléments suivants :
- API Data Lineage
- Graphiques de visualisation de la traçabilité pour les entrées Data Catalog compatibles dans Dataplex. Voir Graphiques de visualisation de la traçabilité dans la documentation Dataplex.
Opérateurs compatibles
Les opérateurs suivants sont compatibles avec la création automatique de rapports de traçabilité dans Cloud Composer:
airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator
airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator
airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryToBigQueryOperator
airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryToGCSOperator
airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator
airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator
Par exemple, vous pouvez exécuter la tâche suivante:
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': GCP_PROJECT,
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
Cela se traduit par la création du graphique de traçabilité suivant dans l'UI Dataplex:
Remarques concernant les fonctionnalités de Cloud Composer
Chaque exécution de tâche Airflow qui génère la lignée des données effectue les opérations suivantes :
- Une requête RPC de création ou de mise à jour pour un processus de lignée
- Créer ou mettre à jour une requête RPC pour une exécution de traçabilité
- Une ou plusieurs requêtes RPC pour créer des événements de lignée (généralement 0 ou 1)
Pour en savoir plus sur ces entités, consultez le modèle d'informations sur la traçabilité des données et la documentation de référence de l'API Lineage dans la documentation Dataplex.
Le trafic de la généalogie émis est soumis à des quotas dans l'API Data Lineage. Cloud Composer consomme le quota d'écriture.
Les tarifs associés à la gestion des données de filiation sont soumis à la tarification de la filiation. Consultez la section Considérations concernant la lignée des données.
Implications en termes de performance
La traçabilité des données est signalée à la fin de l'exécution de la tâche Airflow. En moyenne, la création de rapports sur la lignée des données prend environ une à deux secondes.
Cela n'affecte pas les performances de la tâche elle-même : les tâches Airflow ne échouent pas si la lignée n'est pas correctement signalée à l'API Lineage. La logique de l'opérateur principal n'est pas affectée, mais l'exécution de l'ensemble de l'instance de tâche est un peu plus longue pour tenir compte des données de lignée de rapports.
Pour un environnement qui signale la traçabilité des données, les coûts associés, en raison du temps supplémentaire nécessaire pour signaler la traçabilité des données.
Conformité
La lignée de données propose différents niveaux d'assistance pour des fonctionnalités telles que VPC Service Controls. Vérifier considérations relatives à la traçabilité des données pour vous assurer que les niveaux d'assistance correspondent aux exigences de votre environnement.
Utiliser l'intégration de la traçabilité des données
L'intégration de la traçabilité des données pour Cloud Composer est gérée par environnement. Par conséquent, l'activation de cette fonctionnalité se fait en deux étapes:
- Activez l'API Data Lineage dans votre projet.
- Activez l'intégration de la traçabilité des données dans un environnement Cloud Composer spécifique.
Avant de commencer
Lorsque vous créez un environnement, l'intégration de la traçabilité des données Automatiquement activé si les conditions suivantes sont remplies:
L'API Data Lineage est activée dans votre projet. Pour en savoir plus, consultez Activer l'API Data Lineage dans Documentation Dataplex.
Un backend de ligneage personnalisé n'est pas configuré dans Airflow.
Pour un environnement existant, vous pouvez activer ou désactiver l'intégration de la lignée de données à tout moment.
Rôles requis
L'intégration de la traçabilité des données nécessite l'ajout des autorisations suivantes pour votre Compte de service de l'environnement Cloud Composer:
- Pour les comptes de service par défaut : aucune modification n'est requise. Les comptes de service par défaut incluent les autorisations requises.
- Pour les comptes de service gérés par l'utilisateur: attribuez le nœud de calcul Composer
(
roles/composer.worker
) à votre compte de service. Ce rôle inclut toutes les autorisations de lignée de données requises.
Pour en savoir plus, consultez la section Rôles et autorisations de la généalogie dans la documentation Dataplex.
Activer la traçabilité des données dans Cloud Composer
Console
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Sélectionnez l'onglet Configuration de l'environnement.
Dans la section Intégration de la traçabilité des données Dataplex, cliquez sur Modifier.
Dans le panneau Intégration de la traçabilité des données Dataplex, sélectionnez Activer l'intégration avec la traçabilité des données Dataplex, puis cliquez sur Enregistrer.
gcloud
Utilisez l'argument --enable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
Remplacez :
ENVIRONMENT_NAME
par le nom de l'environnement.Le nom doit commencer par une lettre minuscule suivie d'un maximum de 62 caractères (lettres minuscules, chiffres ou traits d'union) et ne peut pas se terminer par un trait d'union. Le nom de l'environnement est utilisé pour créer des sous-composants pour l'environnement. Vous devez donc indiquer un nom également valide pour les buckets Cloud Storage. Pour obtenir une liste des restrictions, consultez la page Consignes de dénomination des buckets.
LOCATION
par la région de l'environnement.Un emplacement est la région dans laquelle se trouve le cluster GKE de l'environnement localisés.
Exemple :
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
Envoyer des événements de traçabilité personnalisés
Vous pouvez envoyer des événements de lignée personnalisés si vous souhaitez créer des rapports sur la lignée d'un opérateur pour lequel les rapports automatiques sur la lignée ne sont pas disponibles.
Par exemple, pour envoyer des événements personnalisés avec :
BashOperator
: modifiez le paramètreinlets
ououtlets
dans la tâche. définition.PythonOperator
, modifiez le paramètretask.inlets
outask.outlets
dans la définition de la tâche. L'utilisation deAUTO
pour le paramètreinlets
définit sa égale à la valeuroutlets
de sa tâche en amont.
Par exemple, l'exécution de cette tâche:
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
…
bash_task = BashOperator(
task_id='bash_task',
dag=dag,
bash_command='sleep 0',
inlets=[BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table1',
)],
outlets=[BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table2',
)]
)
def _python_task(task):
task.inlets.append(BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table3',
))
task.outlets.append(BigQueryTable(
project_id=GCP_PROJECT,
dataset_id='dataset',
table_id='table4',
))
python_task = PythonOperator(
task_id='python_task',
dag=dag,
python_callable=_python_task,
inlets=[AUTO],
)
bash_task >> python_task
Cela se traduit par la création du graphique de traçabilité suivant dans l'UI Dataplex:
Désactiver la traçabilité des données dans Cloud Composer
Désactiver l'intégration de la traçabilité dans un environnement Cloud Composer désactiver l'API Data Lineage. Si vous souhaitez désactiver complètement les rapports sur la traçabilité pour votre projet, désactivez également l'API Data Lineage. Voir Désactiver des services
Console
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le nom de votre environnement. La page Détails de l'environnement s'ouvre.
Sélectionnez l'onglet Configuration de l'environnement.
Dans la section Intégration de la traçabilité des données Dataplex, cliquez sur Modifier.
Dans le panneau Intégration de la traçabilité des données Dataplex, sélectionnez Désactiver l'intégration avec la traçabilité des données Dataplex, puis cliquez sur Enregistrer.
gcloud
Utilisez l'argument --disable-cloud-data-lineage-integration
.
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
Remplacez :
ENVIRONMENT_NAME
par le nom de l'environnement.Le nom doit commencer par une lettre minuscule suivie d'un maximum de 62 caractères (lettres minuscules, chiffres ou traits d'union) et ne peut pas se terminer par un trait d'union. Le nom de l'environnement est utilisé pour créer des sous-composants pour l'environnement. Vous devez donc indiquer un nom également valide pour les buckets Cloud Storage. Pour obtenir une liste des restrictions, consultez la page Consignes de dénomination des buckets.
LOCATION
par la région de l'environnement.Un emplacement est la région dans laquelle se trouve le cluster GKE de l'environnement localisés.
Exemple :
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
Afficher les journaux de traçabilité dans Cloud Composer
Vous pouvez inspecter les journaux liés à la lignée des données à l'aide du lien sur la page Configuration de l'environnement, dans la section Intégration de la lignée des données Dataplex.
Dépannage
Si les données de lignée ne sont pas envoyées à l'API Lineage ou si vous ne les voyez pas dans Dataplex, suivez la procédure de dépannage ci-dessous :
- Assurez-vous que l'API Data Lineage est activée dans le projet de votre Cloud Composer.
- Vérifiez si l'intégration de la lignée de données est activée dans l'environnement Cloud Composer.
- Vérifiez si l'opérateur que vous utilisez est inclus dans la compatibilité avec les rapports de lignée automatiques. Consultez la section Opérateurs Airflow compatibles.
- Vérifiez les journaux de la lignée dans Cloud Composer pour détecter d'éventuels problèmes.