Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Questa pagina mostra come trasferire dati da altri servizi con gli operatori di trasferimento di Google nei DAG.
Informazioni sugli operatori di trasferimento di Google
Gli operatori di trasferimento Google sono un insieme di operatori Airflow che puoi utilizzare per estrarre dati da altri servizi in Google Cloud.
Questa guida mostra gli operatori per l'archiviazione di condivisioni file di Azure e Amazon S3 che funzionano con Cloud Storage. Esistono molti altri operatori di trasferimento che collaborano con servizi all'interno di Google Cloud e con servizi diversi da Google Cloud.
Da Amazon S3 a Cloud Storage
Questa sezione mostra come sincronizzare i dati da Amazon S3 a un bucket Cloud Storage.
Installa il pacchetto del provider Amazon
Il pacchetto apache-airflow-providers-amazon
contiene i tipi di connessione
e le funzionalità che interagiscono con Amazon S3.
Installa questo pacchetto PyPI nel tuo
ambiente.
Configura una connessione ad Amazon S3
Il pacchetto del provider Amazon fornisce un tipo di connessione per Amazon S3. crei una connessione di questo tipo. La connessione per Cloud Storage,
denominata google_cloud_default
è già configurata nel tuo ambiente.
Configura una connessione ad Amazon S3 nel seguente modo:
- Nell'interfaccia utente di Airflow, vai ad Amministrazione > Connessioni.
- Crea una nuova connessione.
- Seleziona
Amazon S3
come tipo di connessione. - L'esempio seguente utilizza una connessione denominata
aws_s3
. Puoi utilizzare questo nome o qualsiasi altro nome per la connessione. - Specifica i parametri di connessione come descritto nella documentazione di Airflow per la connessione Amazon Web Services. Ad esempio, per configurare una connessione con le chiavi di accesso AWS, genera una chiave di accesso per il tuo account su AWS, quindi fornisci l'ID chiave di accesso AWS come nome utente e la chiave di accesso segreta AWS come password per la connessione.
Trasferire dati da Amazon S3
Se vuoi operare sui dati sincronizzati in un secondo momento in un altro DAG o attività,
trascinali nella cartella /data
del bucket del tuo ambiente. Questa cartella viene sincronizzata con altri worker Airflow, in modo che le attività nel tuo DAG possano operare su di essa.
Il seguente DAG di esempio esegue le seguenti operazioni:
- Sincronizza i contenuti della directory
/data-for-gcs
da un bucket S3 alla cartella/data/from-s3/data-for-gcs/
nel bucket del tuo ambiente. - Attende due minuti affinché i dati vengano sincronizzati con tutti i worker Airflow nel tuo ambiente.
- Restituisce l'elenco dei file in questa directory utilizzando il comando
ls
. Sostituisci questa attività con altri operatori Airflow che funzionano con i tuoi dati.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_aws_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')
transfer_dir_from_s3 >> sleep_2min >> print_dir_files
Da Azure FileShare a Cloud Storage
Questa sezione mostra come sincronizzare i dati da Azure FileShare a un bucket Cloud Storage.
Installa il pacchetto del provider Microsoft Azure
Il pacchetto apache-airflow-providers-microsoft-azure
contiene i tipi di connessione e le funzionalità che interagiscono con Microsoft Azure.
Installa questo pacchetto PyPI nel tuo
ambiente.
Configura una connessione ad Azure FileShare
Il pacchetto del provider Microsoft Azure fornisce un tipo di connessione per la condivisione file di Azure. Crea una connessione di questo tipo. La connessione per
Cloud Storage, denominata google_cloud_default
, è già configurata nel tuo ambiente.
Configura una connessione ad Azure FileShare nel seguente modo:
- Nell'interfaccia utente di Airflow, vai ad Amministrazione > Connessioni.
- Crea una nuova connessione.
- Seleziona
Azure FileShare
come tipo di connessione. - L'esempio seguente utilizza una connessione denominata
azure_fileshare
. Puoi utilizzare questo nome o qualsiasi altro nome per la connessione. - Specifica i parametri di connessione come descritto nella documentazione di Airflow per la connessione alla condivisione file Microsoft Azure. Ad esempio, puoi specificare una stringa di connessione per la chiave di accesso dell'account di archiviazione.
Trasferire dati da Azure FileShare
Se vuoi operare sui dati sincronizzati in un secondo momento in un altro DAG o attività,
trascinali nella cartella /data
del bucket del tuo ambiente. Questa cartella viene sincronizzata con altri worker Airflow, in modo che le attività nel tuo DAG possano operare su di essa.
Il seguente DAG esegue le operazioni indicate di seguito:
Il seguente DAG di esempio esegue le seguenti operazioni:
- Sincronizza i contenuti della directory
/data-for-gcs
da Azure File Share alla cartella/data/from-azure
nel bucket del tuo ambiente. - Attende due minuti affinché i dati vengano sincronizzati con tutti i worker Airflow nel tuo ambiente.
- Restituisce l'elenco dei file in questa directory utilizzando il comando
ls
. Sostituisci questa attività con altri operatori Airflow che funzionano con i tuoi dati.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_azure_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_azure = AzureFileShareToGCSOperator(
task_id='transfer_dir_from_azure',
azure_fileshare_conn_id='azure_fileshare',
share_name='example-file-share',
directory_name='data-for-gcs',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-azure/')
transfer_dir_from_azure >> sleep_2min >> print_dir_files