Transferir datos de otros servicios con los operadores de transferencia de Google

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta página se muestra cómo transferir datos de otros servicios con operadores de transferencia de Google en tus DAGs.

Acerca de los operadores de transferencia de Google

Los operadores de transferencia de Google son un conjunto de operadores de Airflow que puedes usar para extraer datos de otros servicios aGoogle Cloud.

En esta guía se muestran los operadores de Azure FileShare Storage y Amazon S3 que funcionan con Cloud Storage. Hay muchos más operadores de transferencia que trabajan con servicios dentro de Google Cloud y con servicios distintos deGoogle Cloud.

De Amazon S3 a Cloud Storage

En esta sección se muestra cómo sincronizar datos de Amazon S3 con un segmento de Cloud Storage.

Instalar el paquete del proveedor de Amazon

El paquete apache-airflow-providers-amazon contiene los tipos de conexión y las funciones que interactúan con Amazon S3. Instala este paquete de PyPI en tu entorno.

Configurar una conexión a Amazon S3

El paquete de proveedor de Amazon proporciona un tipo de conexión para Amazon S3. Crea una conexión de este tipo. La conexión de Cloud Storage, llamada google_cloud_default, ya está configurada en tu entorno.

Configura una conexión a Amazon S3 de la siguiente manera:

  1. En la interfaz de usuario de Airflow, vaya a Administrar > Conexiones.
  2. Crea una conexión.
  3. Selecciona Amazon S3 como tipo de conexión.
  4. En el siguiente ejemplo se usa una conexión llamada aws_s3. Puedes usar este nombre o cualquier otro para la conexión.
  5. Especifica los parámetros de conexión tal como se describe en la documentación de Airflow para Amazon Web Services Connection. Por ejemplo, para configurar una conexión con claves de acceso de AWS, genera una clave de acceso para tu cuenta en AWS y, a continuación, proporciona el ID de clave de acceso de AWS como nombre de inicio de sesión y la clave de acceso secreta de AWS como contraseña de la conexión.

Transferir datos desde Amazon S3

Si quieres operar con los datos sincronizados más adelante en otro DAG o tarea, cópialos en la carpeta /data del bucket de tu entorno. Esta carpeta se sincroniza con otros trabajadores de Airflow para que las tareas de tu DAG puedan operar en ella.

El siguiente DAG de ejemplo hace lo siguiente:

  • Sincroniza el contenido del directorio /data-for-gcs de un segmento de S3 con la carpeta /data/from-s3/data-for-gcs/ del segmento de tu entorno.
  • Espera dos minutos a que los datos se sincronicen con todos los trabajadores de Airflow de tu entorno.
  • Muestra la lista de archivos de este directorio con el comando ls. Sustituye esta tarea por otros operadores de Airflow que funcionen con tus datos.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_aws_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_s3 = S3ToGCSOperator(
        task_id='transfer_dir_from_s3',
        aws_conn_id='aws_s3',
        prefix='data-for-gcs',
        bucket='example-s3-bucket-transfer-operators',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')


    transfer_dir_from_s3 >> sleep_2min >> print_dir_files

Azure FileShare a Cloud Storage

En esta sección se muestra cómo sincronizar datos de Azure FileShare con un segmento de Cloud Storage.

Instalar el paquete del proveedor de Microsoft Azure

El paquete apache-airflow-providers-microsoft-azure contiene los tipos de conexión y las funciones que interactúan con Microsoft Azure. Instala este paquete de PyPI en tu entorno.

Configurar una conexión a Azure FileShare

El paquete de proveedor de Microsoft Azure proporciona un tipo de conexión para Azure File Share. Crea una conexión de este tipo. La conexión de Cloud Storage, llamada google_cloud_default, ya está configurada en tu entorno.

Configura una conexión a Azure FileShare de la siguiente manera:

  1. En la interfaz de usuario de Airflow, vaya a Administrar > Conexiones.
  2. Crea una conexión.
  3. Selecciona Azure FileShare como tipo de conexión.
  4. En el siguiente ejemplo se usa una conexión llamada azure_fileshare. Puedes usar este nombre o cualquier otro para la conexión.
  5. Especifica los parámetros de conexión tal como se describe en la documentación de Airflow para Microsoft Azure File Share Connection. Por ejemplo, puede especificar una cadena de conexión para la clave de acceso de su cuenta de almacenamiento.

Transferir datos desde Azure FileShare

Si quieres operar con los datos sincronizados más adelante en otro DAG o tarea, cópialos en la carpeta /data del bucket de tu entorno. Esta carpeta se sincroniza con otros trabajadores de Airflow para que las tareas de tu DAG puedan operar en ella.

El siguiente DAG hace lo siguiente:

El siguiente DAG de ejemplo hace lo siguiente:

  • Sincroniza el contenido del directorio /data-for-gcs de Azure File Share con la carpeta /data/from-azure del contenedor de tu entorno.
  • Espera dos minutos a que los datos se sincronicen con todos los trabajadores de Airflow de tu entorno.
  • Muestra la lista de archivos de este directorio con el comando ls. Sustituye esta tarea por otros operadores de Airflow que funcionen con tus datos.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_azure_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_azure = AzureFileShareToGCSOperator(
        task_id='transfer_dir_from_azure',
        azure_fileshare_conn_id='azure_fileshare',
        share_name='example-file-share',
        directory_name='data-for-gcs',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-azure/')


    transfer_dir_from_azure >> sleep_2min >> print_dir_files

Siguientes pasos