Cloud Composer 1 | Cloud Composer 2
Nesta página, demonstramos como transferir dados de outros serviços com os operadores de transferência do Google nos seus DAGs.
Sobre as operadoras de transferência do Google
Os operadores de transferência do Google são um conjunto de operadores do Airflow que podem ser usados para extrair dados de outros serviços para o Google Cloud.
Neste guia, mostramos os operadores do Azure FileShare Storage e do Amazon S3 que funcionam com o Cloud Storage. Há muitos outros operadores de transferência que funcionam com serviços dentro do Google Cloud e com outros serviços além do Google Cloud.
Antes de começar
- Este guia é para o Airflow 2. Se o ambiente utiliza o Airflow 1, use pacotes de provedor de backport para importar operadores e disponibilizar os tipos de conexão necessários no ambiente.
Amazon S3 para Cloud Storage
Nesta seção, demonstramos como sincronizar dados do Amazon S3 com um bucket do Cloud Storage.
Instalar o pacote do provedor da Amazon
O pacote apache-airflow-providers-amazon
contém os tipos de conexão e a funcionalidade que interage com o Amazon S3.
Instale este pacote PyPI no ambiente.
Configurar uma conexão com o Amazon S3
O pacote do provedor da Amazon fornece um tipo de conexão para o Amazon S3. Você
cria uma conexão desse tipo. A conexão do Cloud Storage,
chamada google_cloud_default
, já está configurada no seu ambiente.
Configure uma conexão com o Amazon S3 da seguinte maneira:
- Na interface do Airflow, acesse Administrador > Conexões.
- Criar uma nova conexão.
- Selecione
Amazon S3
como o tipo de conexão. - O exemplo a seguir usa uma conexão chamada
aws_s3
. Você pode usar esse nome ou qualquer outro para a conexão. - Especifique os parâmetros de conexão conforme descrito na documentação do Airflow para Conexão do Amazon Web Services. Por exemplo, para configurar uma conexão com as chaves de acesso da AWS, gere uma chave de acesso para sua conta na AWS e forneça o ID da chave de acesso da AWS como um login, a chave de acesso secreta da AWS como senha para a conexão.
Transferir dados do Amazon S3
Se você quiser operar nos dados sincronizados posteriormente em outro DAG ou tarefa,
extraia-os para a pasta /data
do bucket do ambiente. Essa pasta é sincronizada com outros workers do Airflow, para que as tarefas no DAG possam operá-la.
O DAG de exemplo a seguir faz o seguinte:
- Sincroniza o conteúdo do diretório
/data-for-gcs
de um bucket do S3 com a pasta/data/from-s3/data-for-gcs/
no bucket do ambiente. - Aguarde dois minutos para que os dados sejam sincronizados com todos os workers do Airflow no ambiente.
- Gera a lista de arquivos nesse diretório usando o comando
ls
. Substitua essa tarefa por outros operadores do Airflow que funcionem com seus dados.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_aws_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')
transfer_dir_from_s3 >> sleep_2min >> print_dir_files
Azure FileShare com o Cloud Storage
Nesta seção, demonstramos como sincronizar dados do Azure FileShare com um bucket do Cloud Storage.
Instalar o pacote de provedor do Microsoft Azure
O pacote apache-airflow-providers-microsoft-azure
contém os tipos de
conexão e a funcionalidade que interage com o Microsoft Azure.
Instale este pacote PyPI no ambiente.
Configurar uma conexão com o Azure FileShare
O pacote de provedor do Microsoft Azure oferece um tipo de conexão para o Compartilhamento de Arquivos do Azure. Você cria uma conexão desse tipo. A conexão do
Cloud Storage, chamada google_cloud_default
, já está configurada no
seu ambiente.
Configure uma conexão com o Azure FileShare da seguinte maneira:
- Na interface do Airflow, acesse Administrador > Conexões.
- Criar uma nova conexão.
- Selecione
Azure FileShare
como o tipo de conexão. - O exemplo a seguir usa uma conexão chamada
azure_fileshare
. É possível usar esse nome ou qualquer outro para a conexão. - Especifique os parâmetros de conexão conforme descrito na documentação do Airflow para Conexão de compartilhamento de arquivos do Microsoft Azure. Por exemplo, você pode especificar uma string de conexão para a chave de acesso da sua conta de armazenamento.
Transferir dados do Azure FileShare
Se você quiser operar nos dados sincronizados posteriormente em outro DAG ou tarefa,
extraia-os para a pasta /data
do bucket do ambiente. Essa pasta é sincronizada com outros workers do Airflow, para que as tarefas no DAG possam operá-la.
O DAG a seguir faz o seguinte:
O DAG de exemplo a seguir faz o seguinte:
- Sincroniza o conteúdo do diretório
/data-for-gcs
do Azure File Share com a pasta/data/from-azure
no bucket do ambiente. - Aguarde dois minutos para que os dados sejam sincronizados com todos os workers do Airflow no ambiente.
- Gera a lista de arquivos nesse diretório usando o comando
ls
. Substitua essa tarefa por outros operadores do Airflow que funcionem com seus dados.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_azure_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_azure = AzureFileShareToGCSOperator(
task_id='transfer_dir_from_azure',
azure_fileshare_conn_id='azure_fileshare',
share_name='example-file-share',
directory_name='data-for-gcs',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-azure/')
transfer_dir_from_azure >> sleep_2min >> print_dir_files