Cloud Composer 1 | Cloud Composer 2
Questa pagina mostra come trasferire i dati da altri servizi con gli operatori di trasferimento Google nei tuoi DAG.
Informazioni su Google Transfer Operators
Gli operatori di trasferimento di Google sono un insieme di operatori di Airflow che puoi utilizzare per eseguire il pull di dati da altri servizi in Google Cloud.
Questa guida mostra gli operatori per Azure FileShare Storage e Amazon S3 che funzionano con Cloud Storage. Esistono molti altri operatori di trasferimento che funzionano con i servizi all'interno di Google Cloud e con servizi diversi da Google Cloud.
Prima di iniziare
- Questa guida si riferisce a Airflow 2. Se il tuo ambiente utilizza Airflow 1, utilizza pacchetti del provider di backport per importare gli operatori e rendere disponibili i tipi di connessione richiesti nel tuo ambiente.
Da Amazon S3 a Cloud Storage
Questa sezione illustra come sincronizzare i dati da Amazon S3 a un bucket Cloud Storage.
Installa il pacchetto del provider Amazon
Il pacchetto apache-airflow-providers-amazon
contiene i tipi di connessione e le funzionalità che interagiscono con Amazon S3.
Installa questo pacchetto PyPI nel tuo
ambiente.
Configurare una connessione ad Amazon S3
Il pacchetto del provider Amazon fornisce un tipo di connessione per Amazon S3. Crei una connessione di questo tipo. La connessione per Cloud Storage,
denominata google_cloud_default
, è già configurata nel tuo ambiente.
Configura una connessione ad Amazon S3 nel seguente modo:
- Nella UI di Airflow, vai ad Amministrazione > Connessioni.
- Crea una nuova connessione.
- Seleziona
Amazon S3
come tipo di connessione. - L'esempio seguente utilizza una connessione denominata
aws_s3
. Puoi utilizzare questo nome o un altro nome per la connessione. - Specifica i parametri di connessione come descritto nella documentazione di Airflow per la connessione Amazon Web Services. Ad esempio, per configurare una connessione con le chiavi di accesso AWS, devi generare una chiave di accesso per il tuo account su AWS, quindi fornire l'ID della chiave di accesso AWS come accesso alla chiave di accesso secret AWS come password per la connessione.
Trasferisci dati da Amazon S3
Se vuoi eseguire operazioni sui dati sincronizzati in un secondo momento in un altro DAG o un'altra attività,
devi trasferirli nella cartella /data
del bucket del tuo ambiente. Questa cartella è sincronizzata con altri worker Airflow, in modo che le attività nel tuo DAG possano eseguirvi operazioni.
Il DAG di esempio riportato di seguito esegue quanto segue:
- Sincronizza i contenuti della directory
/data-for-gcs
da un bucket S3 alla cartella/data/from-s3/data-for-gcs/
nel bucket del tuo ambiente. - Attende due minuti per la sincronizzazione dei dati con tutti i worker Airflow nel tuo ambiente.
- Restituisce come output l'elenco dei file in questa directory utilizzando il comando
ls
. Sostituisci questa attività con altri operatori Airflow che lavorano con i tuoi dati.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_aws_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')
transfer_dir_from_s3 >> sleep_2min >> print_dir_files
Condivisione file di Azure in Cloud Storage
Questa sezione illustra come sincronizzare i dati da Azure FileShare a un bucket Cloud Storage.
Installa il pacchetto del provider Microsoft Azure
Il pacchetto apache-airflow-providers-microsoft-azure
contiene i tipi di connessione e le funzionalità che interagiscono con Microsoft Azure.
Installa questo pacchetto PyPI nel tuo
ambiente.
Configura una connessione ad Azure FileShare
Il pacchetto del provider Microsoft Azure fornisce un tipo di connessione per Condivisione file di Azure. Crei una connessione di questo tipo. La connessione per
Cloud Storage, denominata google_cloud_default
, è già configurata
nel tuo ambiente.
Configura una connessione ad Azure FileShare nel seguente modo:
- Nella UI di Airflow, vai ad Amministrazione > Connessioni.
- Crea una nuova connessione.
- Seleziona
Azure FileShare
come tipo di connessione. - L'esempio seguente utilizza una connessione denominata
azure_fileshare
. Puoi utilizzare questo nome o un altro nome per la connessione. - Specifica i parametri di connessione come descritto nella documentazione di Airflow per Connessione condivisione file di Microsoft Azure. Ad esempio, puoi specificare una stringa di connessione per la chiave di accesso dell'account di archiviazione.
Trasferisci i dati da Condivisione file di Azure
Se vuoi eseguire operazioni sui dati sincronizzati in un secondo momento in un altro DAG o un'altra attività,
devi trasferirli nella cartella /data
del bucket del tuo ambiente. Questa cartella è sincronizzata con altri worker Airflow, in modo che le attività nel tuo DAG possano eseguirvi operazioni.
Il seguente DAG esegue quanto segue:
Il DAG di esempio riportato di seguito esegue quanto segue:
- Sincronizza i contenuti della directory
/data-for-gcs
da Condivisione file di Azure alla cartella/data/from-azure
nel bucket del tuo ambiente. - Attende due minuti per la sincronizzazione dei dati con tutti i worker Airflow nel tuo ambiente.
- Restituisce come output l'elenco dei file in questa directory utilizzando il comando
ls
. Sostituisci questa attività con altri operatori Airflow che lavorano con i tuoi dati.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_azure_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_azure = AzureFileShareToGCSOperator(
task_id='transfer_dir_from_azure',
azure_fileshare_conn_id='azure_fileshare',
share_name='example-file-share',
directory_name='data-for-gcs',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-azure/')
transfer_dir_from_azure >> sleep_2min >> print_dir_files