Transfira dados de outros serviços com os operadores de transferência da Google

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Esta página demonstra como transferir dados de outros serviços com os operadores de transferência da Google nos seus DAGs.

Acerca dos operadores de transferência da Google

Os operadores de transferência da Google são um conjunto de operadores do Airflow que pode usar para extrair dados de outros serviços para oGoogle Cloud.

Este guia mostra os operadores do armazenamento de ficheiros partilhados do Azure e do Amazon S3 que funcionam com o Cloud Storage. Existem muitos mais operadores de transferência que funcionam com serviços dentro de Google Cloud e com serviços que não sejam Google Cloud.

Antes de começar

  • Este guia destina-se ao Airflow 2. Se o seu ambiente usar o Airflow 1, use pacotes de fornecedores de backport para importar operadores e disponibilizar os tipos de ligação necessários no seu ambiente.

Amazon S3 para o Cloud Storage

Esta secção demonstra como sincronizar dados do Amazon S3 para um contentor do Cloud Storage.

Instale o pacote do fornecedor Amazon

O pacote apache-airflow-providers-amazon contém os tipos de ligação e a funcionalidade que interagem com o Amazon S3. Instale este pacote PyPI no seu ambiente.

Configure uma associação ao Amazon S3

O pacote de fornecedor da Amazon oferece um tipo de ligação para o Amazon S3. Cria uma associação deste tipo. A ligação para o Cloud Storage, denominada google_cloud_default, já está configurada no seu ambiente.

Configure uma associação ao Amazon S3 da seguinte forma:

  1. Na IU do Airflow, aceda a Admin > Ligações.
  2. Crie uma nova associação.
  3. Selecione Amazon S3 como o tipo de ligação.
  4. O exemplo seguinte usa uma associação denominada aws_s3. Pode usar este nome ou qualquer outro nome para a associação.
  5. Especifique os parâmetros de ligação, conforme descrito na documentação do Airflow para a ligação aos Amazon Web Services. Por exemplo, para configurar uma associação com chaves de acesso da AWS, gera uma chave de acesso para a sua conta na AWS e, em seguida, fornece o ID da chave de acesso da AWS como um início de sessão e a chave de acesso secreta da AWS como uma palavra-passe para a associação.

Transfira dados do Amazon S3

Se quiser operar nos dados sincronizados mais tarde noutro DAG ou tarefa, transfira-os para a pasta /data do contentor do seu ambiente. Esta pasta é sincronizada com outros trabalhadores do Airflow, para que as tarefas no seu DAG possam operar nela.

O exemplo de DAG seguinte faz o seguinte:

  • Sincroniza o conteúdo do diretório /data-for-gcs de um contentor S3 com a pasta /data/from-s3/data-for-gcs/ no contentor do seu ambiente.
  • Aguarde dois minutos para que os dados sejam sincronizados com todos os trabalhadores do Airflow no seu ambiente.
  • Produz a lista de ficheiros neste diretório através do comando ls. Substitua esta tarefa por outros operadores do Airflow que funcionem com os seus dados.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_aws_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_s3 = S3ToGCSOperator(
        task_id='transfer_dir_from_s3',
        aws_conn_id='aws_s3',
        prefix='data-for-gcs',
        bucket='example-s3-bucket-transfer-operators',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')


    transfer_dir_from_s3 >> sleep_2min >> print_dir_files

Azure FileShare para o Cloud Storage

Esta secção demonstra como sincronizar dados do Azure FileShare com um contentor do Cloud Storage.

Instale o pacote do fornecedor do Microsoft Azure

O pacote apache-airflow-providers-microsoft-azure contém os tipos de ligação e a funcionalidade que interagem com o Microsoft Azure. Instale este pacote PyPI no seu ambiente.

Configure uma ligação ao Azure FileShare

O pacote de fornecedor do Microsoft Azure fornece um tipo de ligação para a partilha de ficheiros do Azure. Cria uma associação deste tipo. A ligação para o Cloud Storage, denominada google_cloud_default, já está configurada no seu ambiente.

Configure uma ligação ao Azure FileShare da seguinte forma:

  1. Na IU do Airflow, aceda a Admin > Ligações.
  2. Crie uma nova associação.
  3. Selecione Azure FileShare como o tipo de ligação.
  4. O exemplo seguinte usa uma associação denominada azure_fileshare. Pode usar este nome ou qualquer outro nome para a associação.
  5. Especifique os parâmetros de ligação conforme descrito na documentação do Airflow para a ligação de partilha de ficheiros do Microsoft Azure. Por exemplo, pode especificar uma string de ligação para a chave de acesso da sua conta de armazenamento.

Transfira dados do Azure FileShare

Se quiser operar nos dados sincronizados mais tarde noutro DAG ou tarefa, transfira-os para a pasta /data do contentor do seu ambiente. Esta pasta é sincronizada com outros trabalhadores do Airflow, para que as tarefas no seu DAG possam operar nela.

O seguinte DAG faz o seguinte:

O exemplo de DAG seguinte faz o seguinte:

  • Sincroniza o conteúdo do diretório /data-for-gcs da partilha de ficheiros do Azure com a pasta /data/from-azure no contentor do seu ambiente.
  • Aguarde dois minutos para que os dados sejam sincronizados com todos os trabalhadores do Airflow no seu ambiente.
  • Produz a lista de ficheiros neste diretório através do comando ls. Substitua esta tarefa por outros operadores do Airflow que funcionem com os seus dados.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_azure_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_azure = AzureFileShareToGCSOperator(
        task_id='transfer_dir_from_azure',
        azure_fileshare_conn_id='azure_fileshare',
        share_name='example-file-share',
        directory_name='data-for-gcs',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-azure/')


    transfer_dir_from_azure >> sleep_2min >> print_dir_files

O que se segue?