Trasferisci dati da altri servizi con Google Transfer Operators

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Questa pagina mostra come trasferire i dati da altri servizi con Google Operatori di trasferimento nei DAG.

Informazioni sugli operatori di trasferimento di Google

Gli operatori di trasferimento di Google sono un di operatori Airflow che puoi utilizzare per eseguire il pull dei dati da altri servizi in in Google Cloud.

Questa guida mostra gli operatori per Azure FileShare Storage e Amazon S3 che funzionano con Cloud Storage. Esistono molti altri operatori di trasferimento che funzionano con servizi all'interno di Google Cloud e con servizi diversi da in Google Cloud.

Prima di iniziare

  • Questa guida si riferisce ad Airflow 2. Se il tuo ambiente utilizza Airflow 1, usa pacchetti del provider backport per importare operatori e creare i tipi di connessione richiesti disponibili nel tuo ambiente.

Da Amazon S3 a Cloud Storage

Questa sezione illustra come sincronizzare i dati da Amazon S3 a un nel bucket Cloud Storage.

Installa il pacchetto del provider Amazon

Il pacchetto apache-airflow-providers-amazon contiene la connessione tipi e funzionalità che interagiscono con Amazon S3. Installa questo pacchetto PyPI nel tuo completamente gestito di Google Cloud.

Configurare una connessione ad Amazon S3

Il pacchetto del provider Amazon fornisce un tipo di connessione per Amazon S3. Tu una connessione di questo tipo. La connessione per Cloud Storage, denominata google_cloud_default è già configurata nel tuo ambiente.

Configura una connessione ad Amazon S3 nel seguente modo:

  1. In UI di Airflow, vai ad Admin (Amministrazione) > Connessioni.
  2. Crea una nuova connessione.
  3. Seleziona Amazon S3 come tipo di connessione.
  4. L'esempio seguente utilizza una connessione denominata aws_s3. Puoi utilizzare questo o qualsiasi altro nome per la connessione.
  5. Specifica i parametri di connessione come descritto nella documentazione di Airflow per Connessione Amazon Web Services. Ad esempio, per configurare una connessione con le chiavi di accesso AWS, generi un chiave di accesso per il tuo account su AWS, quindi fornisci l'ID della chiave di accesso AWS come accedi alla chiave di accesso del secret AWS come password per la connessione.

Trasferire dati da Amazon S3

Se vuoi eseguire operazioni sui dati sincronizzati in un secondo momento in un altro DAG o in un'altra attività, esegui il pull nella cartella /data del bucket dell'ambiente. Questa cartella è sincronizzati con altri worker Airflow, in modo che le attività nel tuo DAG possono operare su di esso.

Il DAG di esempio seguente svolge le seguenti operazioni:

  • Sincronizza il contenuto della directory /data-for-gcs da un bucket S3 nella cartella /data/from-s3/data-for-gcs/ del bucket dell'ambiente.
  • Attende due minuti, affinché i dati vengano sincronizzati con tutti i worker Airflow in del tuo ambiente.
  • Restituisce l'elenco dei file in questa directory utilizzando il comando ls. Sostituisci questa attività con altri operatori Airflow che lavorano con i tuoi dati.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_aws_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_s3 = S3ToGCSOperator(
        task_id='transfer_dir_from_s3',
        aws_conn_id='aws_s3',
        prefix='data-for-gcs',
        bucket='example-s3-bucket-transfer-operators',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')


    transfer_dir_from_s3 >> sleep_2min >> print_dir_files

Condivisione file di Azure in Cloud Storage

Questa sezione illustra come sincronizzare i dati da Azure FileShare a un nel bucket Cloud Storage.

Installa il pacchetto del provider Microsoft Azure

Il pacchetto apache-airflow-providers-microsoft-azure contiene la connessione che interagiscono con Microsoft Azure. Installa questo pacchetto PyPI nel tuo completamente gestito di Google Cloud.

Configura una connessione a Condivisione file di Azure

Il pacchetto del provider Microsoft Azure fornisce un tipo di connessione per il file Azure Condividi. Puoi creare una connessione di questo tipo. La connessione per Cloud Storage, denominato google_cloud_default, è già configurato in del tuo ambiente.

Configura una connessione a Condivisione file di Azure nel seguente modo:

  1. In UI di Airflow, vai ad Admin (Amministrazione) > Connessioni.
  2. Crea una nuova connessione.
  3. Seleziona Azure FileShare come tipo di connessione.
  4. L'esempio seguente utilizza una connessione denominata azure_fileshare. Puoi utilizzare la modalità questo o qualsiasi altro nome per la connessione.
  5. Specifica i parametri di connessione come descritto nella documentazione di Airflow per Connessione di condivisione file di Microsoft Azure. Ad esempio, puoi specificare una stringa di connessione per il tuo account di archiviazione. chiave di accesso.

Trasferisci dati da Condivisione file di Azure

Se vuoi eseguire operazioni sui dati sincronizzati in un secondo momento in un altro DAG o in un'altra attività, esegui il pull nella cartella /data del bucket dell'ambiente. Questa cartella è sincronizzati con altri worker Airflow, in modo che le attività nel tuo DAG possono operare su di esso.

Il seguente DAG fa quanto segue:

Il DAG di esempio seguente svolge le seguenti operazioni:

  • Sincronizza i contenuti della directory /data-for-gcs da Condivisione file di Azure nella cartella /data/from-azure del bucket dell'ambiente.
  • Attende due minuti, affinché i dati vengano sincronizzati con tutti i worker Airflow in del tuo ambiente.
  • Restituisce l'elenco dei file in questa directory utilizzando il comando ls. Sostituisci questa attività con altri operatori Airflow che lavorano con i tuoi dati.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_azure_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_azure = AzureFileShareToGCSOperator(
        task_id='transfer_dir_from_azure',
        azure_fileshare_conn_id='azure_fileshare',
        share_name='example-file-share',
        directory_name='data-for-gcs',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-azure/')


    transfer_dir_from_azure >> sleep_2min >> print_dir_files

Passaggi successivi