Cloud Composer 1 | Cloud Composer 2
本页面演示了如何在 DAG 中使用 Google Transfer Operator 从其他服务转移数据。
Google 转移运营商简介
Google 传输运算符是一组 Airflow 运算符,可用于将数据从其他服务拉取到 Google Cloud 中。
本指南介绍适用于 Cloud Storage 的 Azure FileShare Storage 和 Amazon S3 的运算符。许多转移运算符可用于 Google Cloud 中的服务和 Google Cloud 以外的服务。
准备工作
- 本指南适用于 Airflow 2。如果您的环境使用 Airflow 1,请使用向后移植提供方软件包导入运算符,并在您的环境中提供所需的连接类型。
Amazon S3 到 Cloud Storage
本部分介绍如何将 Amazon S3 中的数据同步到 Cloud Storage 存储桶。
安装 Amazon 提供方软件包
apache-airflow-providers-amazon
软件包包含与 Amazon S3 交互的连接类型和功能。在您的环境中安装此 PyPI 软件包。
配置与 Amazon S3 的连接
Amazon 提供方软件包为 Amazon S3 提供连接类型。您可以创建此类型的连接。您的环境中已设置 Cloud Storage 连接(名为 google_cloud_default
)。
通过以下方式设置与 Amazon S3 的连接:
- 在 Airflow 界面中,依次前往管理 > 连接。
- 创建新连接。
- 选择
Amazon S3
作为连接类型。 - 以下示例使用名为
aws_s3
的连接。您可以使用此名称,或其他任何名称作为连接。 - 按照 Airflow 文档中关于 Amazon Web Services 连接的说明指定连接参数。例如,如需设置与 AWS 访问密钥的连接,您需要在 AWS 上为帐号生成一个访问密钥,然后提供 AWS 访问密钥 ID 作为登录名,作为连接密码登录 AWS 私有访问密钥。
从 Amazon S3 转移数据
如果您稍后要在其他 DAG 或任务中处理已同步的数据,请将其拉取到环境存储桶的 /data
文件夹中。此文件夹会同步到其他 Airflow 工作器,以便 DAG 中的任务可以对其执行操作。
以下示例 DAG 执行以下操作:
- 将
/data-for-gcs
目录的内容从 S3 存储桶同步到环境存储桶中的/data/from-s3/data-for-gcs/
文件夹。 - 等待两分钟,让数据同步到您环境中的所有 Airflow 工作器。
- 使用
ls
命令输出此目录中的文件列表。将此任务替换为处理数据的其他 Airflow 操作。
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_aws_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')
transfer_dir_from_s3 >> sleep_2min >> print_dir_files
Azure 文件共享到 Cloud Storage
本部分介绍如何将 Azure FileShare 中的数据同步到 Cloud Storage 存储桶。
安装 Microsoft Azure 提供方软件包
apache-airflow-providers-microsoft-azure
软件包包含与 Microsoft Azure 交互的连接类型和功能。在您的环境中安装此 PyPI 软件包。
配置与 Azure FileShare 的连接
Microsoft Azure 提供方软件包提供 Azure 文件共享的连接类型。您可以创建此类型的连接。您的环境中已设置名为 google_cloud_default
的 Cloud Storage 连接。
通过以下方式设置与 Azure FileShare 的连接:
- 在 Airflow 界面中,依次前往管理 > 连接。
- 创建新连接。
- 选择
Azure FileShare
作为连接类型。 - 以下示例使用名为
azure_fileshare
的连接。您可以使用此名称,或其他任何名称作为连接名称。 - 按照 Microsoft Azure 文件共享连接的 Airflow 文档中的说明指定连接参数。例如,您可以为存储帐号访问密钥指定连接字符串。
从 Azure FileShare 转移数据
如果您稍后要在其他 DAG 或任务中处理已同步的数据,请将其拉取到环境存储桶的 /data
文件夹中。此文件夹会同步到其他 Airflow 工作器,以便 DAG 中的任务可以对其执行操作。
以下 DAG 会执行以下操作:
以下示例 DAG 执行以下操作:
- 将 Azure 文件共享中
/data-for-gcs
目录的内容同步到环境存储桶中的/data/from-azure
文件夹。 - 等待两分钟,让数据同步到您环境中的所有 Airflow 工作器。
- 使用
ls
命令输出此目录中的文件列表。将此任务替换为处理数据的其他 Airflow 操作。
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_azure_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_azure = AzureFileShareToGCSOperator(
task_id='transfer_dir_from_azure',
azure_fileshare_conn_id='azure_fileshare',
share_name='example-file-share',
directory_name='data-for-gcs',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-azure/')
transfer_dir_from_azure >> sleep_2min >> print_dir_files