Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
En esta página, se muestra cómo transferir datos de otros servicios con operadores de transferencia de Google en tus DAG.
Acerca de los operadores de transferencia de Google
Los operadores de transferencia de Google son un conjunto de operadores de Airflow que puedes usar para extraer datos de otros servicios a Google Cloud.
En esta guía, se muestran los operadores de Azure FileShare Storage y Amazon S3 que funcionan con Cloud Storage. Existen muchos más operadores de transferencia que trabajan con servicios dentro de Google Cloud y con servicios que no son de Google Cloud.
De Amazon S3 a Cloud Storage
En esta sección, se muestra cómo sincronizar datos de Amazon S3 con un bucket de Cloud Storage.
Instala el paquete del proveedor de Amazon
El paquete apache-airflow-providers-amazon
contiene los tipos de conexión y la funcionalidad que interactúa con Amazon S3.
Instala este paquete de PyPI en tu entorno.
Configura una conexión a Amazon S3
El paquete del proveedor de Amazon proporciona un tipo de conexión para Amazon S3. Tú
crear una conexión de este tipo. La conexión de Cloud Storage,
llamada google_cloud_default
, ya está configurada en tu entorno.
Configura una conexión con Amazon S3 de la siguiente manera:
- En la IU de Airflow, ve a Administrador > Conexiones.
- Crea una conexión nueva.
- Selecciona
Amazon S3
como el tipo de conexión. - En el siguiente ejemplo, se usa una conexión llamada
aws_s3
. Puedes usar esta nombre o cualquier otro nombre para la conexión. - Especifica los parámetros de conexión como se describe en la documentación de Airflow para la conexión con Amazon Web Services. Por ejemplo, para configurar una conexión con claves de acceso de AWS, generas una clave de acceso para tu cuenta en AWS y, luego, proporcionas el ID de clave de acceso de AWS como acceso a la clave de acceso secreta de AWS como contraseña para la conexión.
Transfiere datos desde Amazon S3
Si quieres operar sobre los datos sincronizados
más adelante en otro DAG o tarea
extraerla a la carpeta /data
del bucket de tu entorno Esta carpeta se sincroniza con otros trabajadores de Airflow para que las tareas de tu DAG puedan operar en ella.
En el siguiente DAG de ejemplo, se hace lo siguiente:
- Sincroniza el contenido del directorio
/data-for-gcs
de un bucket de S3 con la carpeta/data/from-s3/data-for-gcs/
en el bucket de tu entorno. - Espera dos minutos, para que los datos se sincronicen con todos los trabajadores de Airflow en tu entorno.
- Muestra la lista de archivos de este directorio con el comando
ls
. Reemplazar esta tarea con otros operadores de Airflow que trabajan con tus datos.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_aws_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')
transfer_dir_from_s3 >> sleep_2min >> print_dir_files
De Azure FileShare a Cloud Storage
En esta sección, se muestra cómo sincronizar datos de Azure FileShare con un bucket de Cloud Storage.
Instala el paquete del proveedor de Microsoft Azure
El paquete apache-airflow-providers-microsoft-azure
contiene la conexión
y funcionalidades que interactúan con Microsoft Azure.
Instala este paquete de PyPI en tu entorno.
Configura una conexión con Azure FileShare
El paquete del proveedor de Microsoft Azure proporciona un tipo de conexión para Azure File Share. Creas una conexión de este tipo. La conexión para
Cloud Storage, llamada google_cloud_default
, ya está configurada en
tu entorno.
Configura una conexión con Azure FileShare de la siguiente manera:
- En la IU de Airflow, ve a Administrador > Conexiones.
- Crea una conexión nueva.
- Selecciona
Azure FileShare
como el tipo de conexión. - En el siguiente ejemplo, se usa una conexión llamada
azure_fileshare
. Puedes usar este nombre o cualquier otro nombre para la conexión. - Especifica los parámetros de conexión como se describe en la documentación de Airflow para Conexión de archivos compartidos de Microsoft Azure. Por ejemplo, puedes especificar una cadena de conexión para la clave de acceso de tu cuenta de almacenamiento.
Transfiere datos desde Azure FileShare
Si deseas operar en los datos sincronizados más adelante en otro DAG o tarea, transfiérelos a la carpeta /data
del bucket de tu entorno. Esta carpeta se sincroniza con otros trabajadores de Airflow para que las tareas de tu DAG puedan operar en ella.
El siguiente DAG hace lo siguiente:
En el siguiente ejemplo de DAG, se realiza lo siguiente:
- Sincroniza el contenido del directorio
/data-for-gcs
de Azure File Share con la carpeta/data/from-azure
en el bucket de tu entorno. - Espera dos minutos para que los datos se sincronicen con todos los trabajadores de Airflow en tu entorno.
- Muestra la lista de archivos de este directorio con el comando
ls
. Reemplazar esta tarea con otros operadores de Airflow que trabajan con tus datos.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_azure_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_azure = AzureFileShareToGCSOperator(
task_id='transfer_dir_from_azure',
azure_fileshare_conn_id='azure_fileshare',
share_name='example-file-share',
directory_name='data-for-gcs',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-azure/')
transfer_dir_from_azure >> sleep_2min >> print_dir_files