使用 Google 轉移運算子轉移其他服務中的資料

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面說明如何在 DAG 中使用 Google Transfer Operators,從其他服務轉移資料。

關於 Google Transfer Operators

Google 移轉運算子是一組 Airflow 運算子,可用於將其他服務的資料擷取到Google Cloud。

本指南會說明適用於 Azure FileShare Storage 和 Amazon S3 的運算子,這些運算子可搭配 Cloud Storage 使用。還有許多轉移運算子可與 Google Cloud 中的服務搭配使用,以及與Google Cloud以外的服務搭配使用。

Amazon S3 至 Cloud Storage

本節說明如何將資料從 Amazon S3 同步至 Cloud Storage 值區。

安裝 Amazon 供應商套件

apache-airflow-providers-amazon 套件包含與 Amazon S3 互動的連線類型和功能。在環境中安裝這個 PyPI 套件

設定與 Amazon S3 的連線

Amazon 供應商套件提供 Amazon S3 的連線類型。您可以建立這類連線。環境中已設定名為 google_cloud_default 的 Cloud Storage 連線。

請按照下列方式設定與 Amazon S3 的連線:

  1. Airflow UI 中,依序前往「Admin」(管理員) >「Connections」(連線)
  2. 建立新連線。
  3. 選取「Amazon S3」做為連線類型。
  4. 以下範例使用名為 aws_s3 的連線。您可以使用這個名稱,也可以為連線指定任何其他名稱。
  5. 如要指定連線參數,請參閱 Amazon Web Services 連線的 Airflow 說明文件。舉例來說,如要使用 AWS 存取金鑰設定連線,請在 AWS 上為帳戶產生存取金鑰,然後提供 AWS 存取金鑰 ID 做為連線的登入資訊,並提供 AWS 存取密鑰做為連線的密碼。

從 Amazon S3 轉移資料

如要在其他 DAG 或工作中使用同步處理的資料,請將資料拉到環境值區的 /data 資料夾。這個資料夾會同步處理其他 Airflow 工作站,因此 DAG 中的工作可以在這個資料夾中運作。

下列 DAG 範例會執行下列操作:

  • 將 S3 值區中的 /data-for-gcs 目錄內容同步到環境值區的 /data/from-s3/data-for-gcs/ 資料夾。
  • 等待兩分鐘,讓資料同步至環境中的所有 Airflow 工作站。
  • 使用 ls 指令輸出這個目錄中的檔案清單。請將這項工作替換為可處理您資料的其他 Airflow 運算子。
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_aws_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_s3 = S3ToGCSOperator(
        task_id='transfer_dir_from_s3',
        aws_conn_id='aws_s3',
        prefix='data-for-gcs',
        bucket='example-s3-bucket-transfer-operators',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')


    transfer_dir_from_s3 >> sleep_2min >> print_dir_files

從 Azure FileShare 到 Cloud Storage

本節說明如何將資料從 Azure FileShare 同步至 Cloud Storage 值區。

安裝 Microsoft Azure 供應商套件

apache-airflow-providers-microsoft-azure 套件包含與 Microsoft Azure 互動的連線類型和功能。在環境中安裝這個 PyPI 套件

設定 Azure FileShare 的連線

Microsoft Azure 供應商套件提供 Azure 檔案共用的連線類型。建立這類連線。您環境中已設定名為 google_cloud_default 的 Cloud Storage 連線。

請按照下列方式設定 Azure FileShare 連線:

  1. Airflow UI 中,依序前往「Admin」(管理員) >「Connections」(連線)
  2. 建立新連線。
  3. 選取「Azure FileShare」做為連線類型。
  4. 以下範例使用名為 azure_fileshare 的連線。您可以使用這個名稱,或為連線指定任何其他名稱。
  5. 請按照 Airflow 說明文件,為Microsoft Azure 檔案共用連線指定連線參數。舉例來說,您可以指定儲存空間帳戶存取金鑰的連線字串。

從 Azure FileShare 轉移資料

如要在其他 DAG 或工作中使用同步處理的資料,請將資料拉到環境值區的 /data 資料夾。這個資料夾會同步處理其他 Airflow 工作站,因此 DAG 中的工作可以在這個資料夾中運作。

下列 DAG 會執行以下作業:

下列 DAG 範例會執行下列操作:

  • 將 Azure 檔案共用中的 /data-for-gcs 目錄內容,同步到環境值區中的 /data/from-azure 資料夾。
  • 等待兩分鐘,讓資料同步至環境中的所有 Airflow 工作站。
  • 使用 ls 指令輸出這個目錄中的檔案清單。請將這項工作替換為可處理您資料的其他 Airflow 運算子。
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_azure_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_azure = AzureFileShareToGCSOperator(
        task_id='transfer_dir_from_azure',
        azure_fileshare_conn_id='azure_fileshare',
        share_name='example-file-share',
        directory_name='data-for-gcs',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-azure/')


    transfer_dir_from_azure >> sleep_2min >> print_dir_files

後續步驟