Transfiere datos desde otros servicios con los operadores de Google Transfer

Cloud Composer 1 | Cloud Composer 2

En esta página, se muestra cómo transferir datos desde otros servicios con operadores de transferencia de Google en tus DAG.

Acerca de los operadores de transferencia de Google

Los operadores de transferencia de Google son un conjunto de operadores de Airflow que puedes usar para extraer datos de otros servicios a Google Cloud.

En esta guía, se muestran los operadores de Azure FileShare Storage y Amazon S3 que funcionan con Cloud Storage. Hay muchos más operadores de transferencia que funcionan con servicios dentro de Google Cloud y con servicios que no son Google Cloud.

Antes de comenzar

De Amazon S3 a Cloud Storage

En esta sección, se muestra cómo sincronizar datos de Amazon S3 a un bucket de Cloud Storage.

Instala el paquete del proveedor de Amazon

El paquete apache-airflow-providers-amazon contiene los tipos de conexión y la funcionalidad que interactúa con Amazon S3. Instala este paquete de PyPI en tu entorno.

Configura una conexión a Amazon S3

El paquete del proveedor de Amazon proporciona un tipo de conexión para Amazon S3. Crea una conexión de este tipo. La conexión para Cloud Storage, llamada google_cloud_default, ya está configurada en tu entorno.

Configura una conexión con Amazon S3 de la siguiente manera:

  1. En la IU de Airflow, ve a Administrador > Conexiones.
  2. Crea una conexión nueva.
  3. Selecciona Amazon S3 como el tipo de conexión.
  4. En el siguiente ejemplo, se usa una conexión llamada aws_s3. Puedes usar este nombre o cualquier otro para la conexión.
  5. Especifica los parámetros de conexión como se describe en la documentación de Airflow para Amazon Web Services Connection. Por ejemplo, a fin de configurar una conexión con claves de acceso de AWS, debes generar una clave de acceso para tu cuenta en AWS y, luego, proporcionar el ID de clave de acceso de AWS como la clave de acceso secreta de AWS como una contraseña para la conexión.

Transfiere datos desde Amazon S3

Si deseas operar en los datos sincronizados más adelante en otro DAG o tarea, extráelo a la carpeta /data del bucket de tu entorno. Esta carpeta está sincronizada con otros trabajadores de Airflow, por lo que las tareas de tu DAG pueden operar en ella.

En el siguiente DAG de ejemplo, se hace lo siguiente:

  • Sincroniza el contenido del directorio /data-for-gcs desde un bucket S3 a la carpeta /data/from-s3/data-for-gcs/ en el bucket de tu entorno.
  • Espera dos minutos para que los datos se sincronicen con todos los trabajadores de Airflow en tu entorno.
  • Muestra la lista de archivos en este directorio con el comando ls. Reemplaza esta tarea por otros operadores de Airflow que trabajen con tus datos.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_aws_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_s3 = S3ToGCSOperator(
        task_id='transfer_dir_from_s3',
        aws_conn_id='aws_s3',
        prefix='data-for-gcs',
        bucket='example-s3-bucket-transfer-operators',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')

    transfer_dir_from_s3 >> sleep_2min >> print_dir_files

Azure FileShare a Cloud Storage

En esta sección, se muestra cómo sincronizar datos de Azure FileShare con un bucket de Cloud Storage.

Instala el paquete del proveedor de Microsoft Azure

El paquete apache-airflow-providers-microsoft-azure contiene los tipos de conexión y la funcionalidad que interactúa con Microsoft Azure. Instala este paquete de PyPI en tu entorno.

Configura una conexión a Azure FileShare

El paquete del proveedor de Microsoft Azure proporciona un tipo de conexión para Azure File Share. Creas una conexión de este tipo. La conexión para Cloud Storage, llamada google_cloud_default, ya está configurada en tu entorno.

Configura una conexión a Azure FileShare de la siguiente manera:

  1. En la IU de Airflow, ve a Administrador > Conexiones.
  2. Crea una conexión nueva.
  3. Selecciona Azure FileShare como el tipo de conexión.
  4. En el siguiente ejemplo, se usa una conexión llamada azure_fileshare. Puedes usar este nombre o cualquier otro para la conexión.
  5. Especifica los parámetros de conexión como se describe en la documentación de Airflow para la conexión de archivos compartidos de Microsoft Azure. Por ejemplo, puedes especificar una string de conexión para la clave de acceso de tu cuenta de almacenamiento.

Transfiere datos desde Azure FileShare

Si deseas operar en los datos sincronizados más adelante en otro DAG o tarea, extráelo a la carpeta /data del bucket de tu entorno. Esta carpeta está sincronizada con otros trabajadores de Airflow, por lo que las tareas de tu DAG pueden operar en ella.

El siguiente DAG hace lo siguiente:

En el siguiente DAG de ejemplo, se hace lo siguiente:

  • Sincroniza el contenido del directorio /data-for-gcs de Azure File Share con la carpeta /data/from-azure en el bucket de tu entorno.
  • Espera dos minutos para que los datos se sincronicen con todos los trabajadores de Airflow en tu entorno.
  • Muestra la lista de archivos en este directorio con el comando ls. Reemplaza esta tarea por otros operadores de Airflow que trabajen con tus datos.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_azure_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_azure = AzureFileShareToGCSOperator(
        task_id='transfer_dir_from_azure',
        azure_fileshare_conn_id='azure_fileshare',
        share_name='example-file-share',
        directory_name='data-for-gcs',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-azure/')

    transfer_dir_from_azure >> sleep_2min >> print_dir_files

¿Qué sigue?