Transférer des données à partir d'autres services avec les opérateurs de transfert Google

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Cette page explique comment transférer des données d'autres services avec Google Transférez des opérateurs dans vos DAG.

À propos des opérateurs de transfert Google

Les Opérateurs de transfert Google sont d'opérateurs Airflow que vous pouvez utiliser pour extraire des données d'autres services Google Cloud.

Ce guide présente les opérateurs compatibles avec le stockage Azure FileShare et Amazon S3 avec Cloud Storage. Il existe de nombreux autres opérateurs de transfert qui fonctionnent dans Google Cloud et avec des services autres que Google Cloud.

Avant de commencer

Amazon S3 vers Cloud Storage

Cette section explique comment synchroniser les données d'Amazon S3 bucket Cloud Storage.

Installer le package fournisseur Amazon

Le package apache-airflow-providers-amazon contient la connexion et des fonctionnalités qui interagissent avec Amazon S3. Installez ce package PyPI dans votre environnement.

Configurer une connexion à Amazon S3

Le package fournisseur Amazon fournit un type de connexion pour Amazon S3. Toi créer une connexion de ce type. La connexion pour Cloud Storage, nommé google_cloud_default est déjà configuré dans votre environnement.

Configurez une connexion à Amazon S3 comme suit:

  1. Dans l'interface utilisateur Airflow, accédez à Admin > Connexions :
  2. Créez une connexion.
  3. Sélectionnez Amazon S3 comme type de connexion.
  4. L'exemple suivant utilise une connexion nommée aws_s3. Vous pouvez utiliser cette ou tout autre nom pour la connexion.
  5. Spécifiez les paramètres de connexion comme décrit dans la documentation Airflow pour Amazon Web Services Connection Par exemple, pour configurer une connexion avec des clés d'accès AWS, vous devez générer un pour votre compte sur AWS, puis indiquez l'ID de clé d'accès AWS sous forme de connecter la clé d'accès secrète AWS en tant que mot de passe de la connexion.

Transférer des données depuis Amazon S3

Si vous souhaitez effectuer ultérieurement des opérations sur les données synchronisées dans un autre DAG ou une autre tâche, extrayez-le dans le dossier /data du bucket de votre environnement. Ce dossier est synchronisées avec les autres nœuds de calcul Airflow, de sorte que les tâches de votre DAG pour pouvoir l'utiliser.

L'exemple de DAG suivant effectue les opérations suivantes:

  • Il synchronise le contenu du répertoire /data-for-gcs à partir d'un bucket S3. dans le dossier /data/from-s3/data-for-gcs/ du bucket de votre environnement.
  • Attend deux minutes, le temps que les données soient synchronisées avec tous les nœuds de calcul Airflow dans votre environnement.
  • Génère la liste des fichiers de ce répertoire à l'aide de la commande ls. Remplacer cette tâche avec d'autres opérateurs Airflow qui utilisent vos données.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_aws_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_s3 = S3ToGCSOperator(
        task_id='transfer_dir_from_s3',
        aws_conn_id='aws_s3',
        prefix='data-for-gcs',
        bucket='example-s3-bucket-transfer-operators',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')


    transfer_dir_from_s3 >> sleep_2min >> print_dir_files

Azure FileShare vers Cloud Storage

Cette section explique comment synchroniser les données d'Azure FileShare bucket Cloud Storage.

Installer le package fournisseur Microsoft Azure

Le package apache-airflow-providers-microsoft-azure contient la connexion et les fonctionnalités qui interagissent avec Microsoft Azure. Installez ce package PyPI dans votre environnement.

Configurer une connexion à Azure FileShare

Le package fournisseur Microsoft Azure fournit un type de connexion pour le fichier Azure Partager. Vous créez une connexion de ce type. La connexion pour Cloud Storage, nommé google_cloud_default, est déjà configuré dans votre environnement.

Configurez une connexion à Azure FileShare de la manière suivante:

  1. Dans l'interface utilisateur Airflow, accédez à Admin > Connexions :
  2. Créez une connexion.
  3. Sélectionnez Azure FileShare comme type de connexion.
  4. L'exemple suivant utilise une connexion nommée azure_fileshare. Vous pouvez utiliser ce nom ou tout autre nom pour la connexion.
  5. Spécifiez les paramètres de connexion comme décrit dans la documentation Airflow pour Connexion au partage de fichiers Microsoft Azure. Par exemple, vous pouvez spécifier une chaîne de connexion pour votre compte de stockage clé d'accès.

Transférer des données à partir d'Azure FileShare

Si vous souhaitez effectuer ultérieurement des opérations sur les données synchronisées dans un autre DAG ou une autre tâche, extrayez-le dans le dossier /data du bucket de votre environnement. Ce dossier est synchronisées avec les autres nœuds de calcul Airflow, de sorte que les tâches de votre DAG pour pouvoir l'utiliser.

Le DAG suivant effectue les opérations suivantes:

L'exemple de DAG suivant effectue les opérations suivantes:

  • Il synchronise le contenu du répertoire /data-for-gcs à partir d'Azure File Share dans le dossier /data/from-azure du bucket de votre environnement.
  • Attend deux minutes, le temps que les données soient synchronisées avec tous les nœuds de calcul Airflow dans votre environnement.
  • Génère la liste des fichiers de ce répertoire à l'aide de la commande ls. Remplacer cette tâche avec d'autres opérateurs Airflow qui utilisent vos données.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_azure_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_azure = AzureFileShareToGCSOperator(
        task_id='transfer_dir_from_azure',
        azure_fileshare_conn_id='azure_fileshare',
        share_name='example-file-share',
        directory_name='data-for-gcs',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-azure/')


    transfer_dir_from_azure >> sleep_2min >> print_dir_files

Étape suivante