Trasferire dati con gli operatori di trasferimento di Google

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Questa pagina mostra come trasferire i dati da altri servizi con gli operatori di trasferimento di Google nei tuoi DAG.

Informazioni sugli operatori di trasferimento di Google

Gli operatori di trasferimento di Google sono un insieme di operatori Airflow che puoi utilizzare per estrarre i dati da altri servizi inGoogle Cloud.

Questa guida mostra gli operatori per Azure FileShare Storage e Amazon S3 che funzionano con Cloud Storage. Esistono molti altri operatori di trasferimento che lavorano con i servizi di Google Cloud e con servizi diversi daGoogle Cloud.

Da Amazon S3 a Cloud Storage

Questa sezione illustra come sincronizzare i dati da Amazon S3 a un bucket Cloud Storage.

Installa il pacchetto del provider Amazon

Il pacchetto apache-airflow-providers-amazon contiene i tipi di connessione e le funzionalità che interagiscono con Amazon S3. Installa questo pacchetto PyPI nel tuo ambiente.

Configurare una connessione ad Amazon S3

Il pacchetto del provider Amazon fornisce un tipo di connessione per Amazon S3. Puoi creare una connessione di questo tipo. La connessione per Cloud Storage, chiamata google_cloud_default, è già configurata nel tuo ambiente.

Configura una connessione ad Amazon S3 nel seguente modo:

  1. Nell'interfaccia utente di Airflow, vai ad Amministrazione > Connessioni.
  2. Crea una nuova connessione.
  3. Seleziona Amazon S3 come tipo di connessione.
  4. L'esempio seguente utilizza una connessione denominata aws_s3. Puoi utilizzare questo nome o un altro nome per la connessione.
  5. Specifica i parametri di connessione come descritto nella documentazione di Airflow per la connessione ad Amazon Web Services. Ad esempio, per configurare una connessione con le chiavi di accesso AWS, genera una chiave di accesso per il tuo account su AWS, poi fornisci l'ID chiave di accesso AWS come nome utente e la chiave di accesso segreta AWS come password per la connessione.

Trasferisci i dati da Amazon S3

Se vuoi utilizzare i dati sincronizzati in un secondo momento in un'altra attività o in un altro DAG, trascinali nella cartella /data del bucket del tuo ambiente. Questa cartella viene sincronizzata con altri worker Airflow, in modo che le attività nel DAG possano eseguire operazioni su di essa.

Il seguente DAG di esempio esegue le seguenti operazioni:

  • Sincronizza i contenuti della directory /data-for-gcs da un bucket S3 alla cartella /data/from-s3/data-for-gcs/ nel bucket del tuo ambiente.
  • Attende due minuti per la sincronizzazione dei dati con tutti i worker Airflow nel tuo ambiente.
  • Stampa l'elenco dei file in questa directory utilizzando il comando ls. Sostituisci questa attività con altri operatori Airflow che lavorano con i tuoi dati.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_aws_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_s3 = S3ToGCSOperator(
        task_id='transfer_dir_from_s3',
        aws_conn_id='aws_s3',
        prefix='data-for-gcs',
        bucket='example-s3-bucket-transfer-operators',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')


    transfer_dir_from_s3 >> sleep_2min >> print_dir_files

Azure FileShare to Cloud Storage

Questa sezione illustra come sincronizzare i dati da Azure FileShare a un bucket Cloud Storage.

Installa il pacchetto del provider Microsoft Azure

Il pacchetto apache-airflow-providers-microsoft-azure contiene i tipi di connessione e le funzionalità che interagiscono con Microsoft Azure. Installa questo pacchetto PyPI nel tuo ambiente.

Configura una connessione ad Azure FileShare

Il pacchetto del provider Microsoft Azure fornisce un tipo di connessione per la condivisione file di Azure. Creazione di una connessione di questo tipo. La connessione per Cloud Storage, denominata google_cloud_default, è già configurata nel tuo ambiente.

Configura una connessione a Azure FileShare nel seguente modo:

  1. Nell'interfaccia utente di Airflow, vai ad Amministrazione > Connessioni.
  2. Crea una nuova connessione.
  3. Seleziona Azure FileShare come tipo di connessione.
  4. L'esempio seguente utilizza una connessione denominata azure_fileshare. Puoi utilizzare questo nome o un altro nome per la connessione.
  5. Specifica i parametri di connessione come descritto nella documentazione di Airflow per la connessione alla condivisione file di Microsoft Azure. Ad esempio, puoi specificare una stringa di connessione per la chiave di accesso dell'account di archiviazione.

Trasferire dati da una condivisione file Azure

Se vuoi utilizzare i dati sincronizzati in un secondo momento in un'altra attività o in un altro DAG, trascinali nella cartella /data del bucket del tuo ambiente. Questa cartella viene sincronizzata con altri worker Airflow, in modo che le attività nel DAG possano eseguire operazioni su di essa.

Il seguente DAG esegue le seguenti operazioni:

Il seguente DAG di esempio esegue le seguenti operazioni:

  • Sincronizza i contenuti della directory /data-for-gcs da Condivisione file di Azure alla cartella /data/from-azure nel bucket del tuo ambiente.
  • Attende due minuti per la sincronizzazione dei dati con tutti i worker Airflow nel tuo ambiente.
  • Stampa l'elenco dei file in questa directory utilizzando il comando ls. Sostituisci questa attività con altri operatori Airflow che lavorano con i tuoi dati.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_azure_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_azure = AzureFileShareToGCSOperator(
        task_id='transfer_dir_from_azure',
        azure_fileshare_conn_id='azure_fileshare',
        share_name='example-file-share',
        directory_name='data-for-gcs',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-azure/')


    transfer_dir_from_azure >> sleep_2min >> print_dir_files

Passaggi successivi