Transferir dados de outros serviços com os operadores de transferência do Google

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Nesta página, mostramos como transferir dados de outros serviços com o Google Operadores de transferência nos DAGs.

Sobre os operadores de transferência do Google

Os operadores de transferência do Google são uma um conjunto de operadores do Airflow que podem ser usados para extrair dados de outros serviços para Google Cloud.

Este guia mostra os operadores do Azure FileShare Storage e do Amazon S3 que funcionam com o Cloud Storage. Há muitos outros operadores de transferência que funcionam com serviços no Google Cloud e outros serviços, Google Cloud.

Antes de começar

  • Este guia é para o Airflow 2. Se o ambiente usar o Airflow 1, use pacotes de provedor de backport para importar operadores e tornar os tipos de conexão necessários disponíveis no seu ambiente.

Amazon S3 para Cloud Storage

Esta seção demonstra como sincronizar dados do Amazon S3 para um do bucket do Cloud Storage.

Instalar o pacote de provedor da Amazon

O pacote apache-airflow-providers-amazon contém a conexão tipos e funcionalidades que interagem com o Amazon S3. Instale este pacote PyPI no de nuvem.

Configurar uma conexão com o Amazon S3

O pacote de provedor da Amazon oferece um tipo de conexão para o Amazon S3. Você crie uma conexão desse tipo. A conexão com o Cloud Storage, chamado google_cloud_default já está configurado no seu ambiente.

Configure uma conexão com o Amazon S3 da seguinte maneira:

  1. Na IU do Airflow, acesse Administrador > Conexões.
  2. Criar uma nova conexão.
  3. Selecione Amazon S3 como o tipo de conexão.
  4. O exemplo a seguir usa uma conexão chamada aws_s3. Você pode usar isso nome ou qualquer outro nome para a conexão.
  5. Especifique parâmetros de conexão conforme descrito na documentação do Airflow para Amazon Web Services Connection. Por exemplo, para configurar uma conexão com chaves de acesso da AWS, você gera uma chave de acesso da AWS para sua conta na AWS e forneça o ID da chave de acesso da AWS como um faça login na chave de acesso secreta da AWS como senha para a conexão.
.

Transferir dados do Amazon S3

Se você quiser operar nos dados sincronizados mais tarde em outro DAG ou tarefa, extraia-o para a pasta /data do bucket do ambiente. Esta pasta é sincronizadas com outros workers do Airflow, para que as tarefas no DAG possa operar nele.

O DAG de exemplo a seguir faz o seguinte:

  • Sincroniza o conteúdo do diretório /data-for-gcs de um bucket S3 para a pasta /data/from-s3/data-for-gcs/ no bucket do ambiente.
  • Aguarda dois minutos para que os dados sejam sincronizados com todos os workers do Airflow em em seu ambiente.
  • Gera a lista de arquivos neste diretório usando o comando ls. Substituir essa tarefa com outros operadores do Airflow que trabalham com seus dados.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_aws_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_s3 = S3ToGCSOperator(
        task_id='transfer_dir_from_s3',
        aws_conn_id='aws_s3',
        prefix='data-for-gcs',
        bucket='example-s3-bucket-transfer-operators',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')


    transfer_dir_from_s3 >> sleep_2min >> print_dir_files

Compartilhamento de Arquivos do Azure com o Cloud Storage

Esta seção demonstra como sincronizar dados do Azure FileShare com um do bucket do Cloud Storage.

Instalar o pacote de provedor do Microsoft Azure

O pacote apache-airflow-providers-microsoft-azure contém a conexão tipos e funcionalidades que interagem com o Microsoft Azure. Instale este pacote PyPI no de nuvem.

Configurar uma conexão com o Azure FileShare

O pacote de provedor do Microsoft Azure fornece um tipo de conexão para o Arquivo do Azure Compartilhar. Você cria uma conexão desse tipo. A conexão para O Cloud Storage, chamado google_cloud_default, já está configurado em em seu ambiente.

Configure uma conexão com o Azure FileShare da seguinte maneira:

  1. Na IU do Airflow, acesse Administrador > Conexões.
  2. Criar uma nova conexão.
  3. Selecione Azure FileShare como o tipo de conexão.
  4. O exemplo a seguir usa uma conexão chamada azure_fileshare. Você pode usar esse nome ou qualquer outro nome para a conexão.
  5. Especifique parâmetros de conexão conforme descrito na documentação do Airflow para Conexão de compartilhamento de arquivos do Microsoft Azure. Por exemplo, você pode especificar uma string de conexão para sua conta de armazenamento chave de acesso.
.

Transferir dados do Azure FileShare

Se você quiser operar nos dados sincronizados mais tarde em outro DAG ou tarefa, extraia-o para a pasta /data do bucket do ambiente. Esta pasta é sincronizadas com outros workers do Airflow, para que as tarefas no DAG possa operar nele.

O DAG a seguir faz o seguinte:

O DAG de exemplo a seguir faz o seguinte:

  • Sincroniza o conteúdo do diretório /data-for-gcs do Compartilhamento de Arquivos do Azure para a pasta /data/from-azure no bucket do ambiente.
  • Aguarda dois minutos para que os dados sejam sincronizados com todos os workers do Airflow em em seu ambiente.
  • Gera a lista de arquivos neste diretório usando o comando ls. Substituir essa tarefa com outros operadores do Airflow que trabalham com seus dados.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_azure_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_azure = AzureFileShareToGCSOperator(
        task_id='transfer_dir_from_azure',
        azure_fileshare_conn_id='azure_fileshare',
        share_name='example-file-share',
        directory_name='data-for-gcs',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-azure/')


    transfer_dir_from_azure >> sleep_2min >> print_dir_files

A seguir