使用 Google 转移操作符从其他服务转移数据

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本页演示了如何在 DAG 中使用 Google Transfer 运算符从其他服务转移数据。

Google 转移运营商简介

Google Transfer Operators 是一组 Airflow 运算符,可用于将其他服务中的数据提取到Google Cloud中。

本指南面向使用 Cloud Storage 的 Azure FileShare Storage 和 Amazon S3 操作员。还有许多转移运算符可与 Google Cloud 中的服务以及Google Cloud以外的服务搭配使用。

Amazon S3 到 Cloud Storage

本部分演示了如何将数据从 Amazon S3 同步到 Cloud Storage 存储桶。

安装 Amazon 提供程序软件包

apache-airflow-providers-amazon 软件包包含与 Amazon S3 交互的连接类型和功能。在您的环境中安装此 PyPI 软件包

配置与 Amazon S3 的连接

Amazon 提供程序软件包为 Amazon S3 提供了一种连接类型。您创建了此类连接。名为 google_cloud_default 的 Cloud Storage 连接已在您的环境中设置。

按以下方式设置与 Amazon S3 的连接:

  1. Airflow 界面中,依次前往管理 > 连接
  2. 创建新连接。
  3. 选择 Amazon S3 作为连接类型。
  4. 以下示例使用名为 aws_s3 的连接。您可以使用此名称,也可以使用连接的任何其他名称。
  5. 按照 Airflow 文档中的Amazon Web Services 连接说明指定连接参数。 例如,如需使用 AWS 访问密钥设置连接,您需要在 AWS 上为自己的账号生成一个访问密钥,然后提供 AWS 访问密钥 ID 作为登录信息,并提供 AWS 私有访问密钥作为连接密码。

从 Amazon S3 转移数据

如果您想在以后的其他 DAG 或任务中处理同步的数据,请将其拉取到环境存储桶的 /data 文件夹中。此文件夹会同步到其他 Airflow 工作器,以便 DAG 中的任务可以对其进行操作。

以下示例 DAG 执行以下操作:

  • 将 S3 存储桶中 /data-for-gcs 目录的内容同步到环境存储桶中的 /data/from-s3/data-for-gcs/ 文件夹。
  • 等待 2 分钟,以便将数据同步到环境中的所有 Airflow 工作人员。
  • 使用 ls 命令输出此目录中的文件列表。将此任务替换为可处理您的数据的其他 Airflow 运算符。
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_aws_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_s3 = S3ToGCSOperator(
        task_id='transfer_dir_from_s3',
        aws_conn_id='aws_s3',
        prefix='data-for-gcs',
        bucket='example-s3-bucket-transfer-operators',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')


    transfer_dir_from_s3 >> sleep_2min >> print_dir_files

Azure FileShare 到 Cloud Storage

本部分演示了如何将数据从 Azure FileShare 同步到 Cloud Storage 存储桶。

安装 Microsoft Azure 提供程序软件包

apache-airflow-providers-microsoft-azure 软件包包含与 Microsoft Azure 交互的连接类型和功能。在您的环境中安装此 PyPI 软件包

配置与 Azure FileShare 的连接

Microsoft Azure 提供程序软件包为 Azure 文件共享提供了一种连接类型。您创建了此类连接。您已在环境中设置名为 google_cloud_default 的 Cloud Storage 连接。

按以下方式设置与 Azure 文件共享的连接:

  1. Airflow 界面中,依次前往管理 > 连接
  2. 创建新连接。
  3. 选择 Azure FileShare 作为连接类型。
  4. 以下示例使用名为 azure_fileshare 的连接。您可以使用此名称,也可以使用连接的任何其他名称。
  5. 按照 Airflow 文档中有关 Microsoft Azure 文件共享连接的说明指定连接参数。 例如,您可以为存储账号访问密钥指定连接字符串。

从 Azure 文件共享转移数据

如果您想在以后的其他 DAG 或任务中处理同步的数据,请将其拉取到环境存储桶的 /data 文件夹中。此文件夹会同步到其他 Airflow 工作器,以便 DAG 中的任务可以对其进行操作。

以下 DAG 会执行以下操作:

以下示例 DAG 执行以下操作:

  • 将 Azure 文件共享中的 /data-for-gcs 目录的内容同步到环境的存储桶中的 /data/from-azure 文件夹。
  • 等待 2 分钟,以便将数据同步到环境中的所有 Airflow 工作人员。
  • 使用 ls 命令输出此目录中的文件列表。将此任务替换为可处理您的数据的其他 Airflow 运算符。
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_azure_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_azure = AzureFileShareToGCSOperator(
        task_id='transfer_dir_from_azure',
        azure_fileshare_conn_id='azure_fileshare',
        share_name='example-file-share',
        directory_name='data-for-gcs',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-azure/')


    transfer_dir_from_azure >> sleep_2min >> print_dir_files

后续步骤