Google Transfer Operator を使用して他のサービスからデータを転送する

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

このページでは、DAG で Google Transfer Operator を使用して他のサービスからデータ転送を行う方法について説明します。

Google Transfer Operator について

Google Transfer Operator は、他のサービスから Google Cloud にデータを pull するために使用できる Airflow 演算子のセットです。

このガイドでは、Cloud Storage で動作する Azure FileShare Storage と Amazon S3 の演算子について説明します。Google Cloud 内のサービスや Google Cloud 以外のサービスと連携する転送演算子は他にも多数あります。

始める前に

  • このガイドは Airflow 2 を対象としています。環境で Airflow 1 を使用している場合は、バックポート プロバイダ パッケージを使用して演算子をインポートし、必要な接続タイプを環境で使用できるようにします。

Amazon S3 から Cloud Storage

このセクションでは、Amazon S3 から Cloud Storage バケットにデータを同期する方法を示します。

Amazon プロバイダ パッケージをインストールする

apache-airflow-providers-amazon パッケージには、Amazon S3 とやり取りする接続タイプと機能が含まれています。 環境にこの PyPI パッケージをインストールします。

Amazon S3 への接続を構成する

Amazon プロバイダ パッケージは、Amazon S3 の接続タイプを提供します。このタイプの接続を作成します。google_cloud_default という名前の Cloud Storage への接続は、すでに環境で設定されています。

次の方法で Amazon S3 への接続を設定します。

  1. Airflow UI で、[管理者] > [接続] に移動します。
  2. 新しい接続を作成します。
  3. 接続タイプとして Amazon S3 を選択します。
  4. 次の例では、aws_s3 という名前の接続を使用しています。この名前を使用することも、接続に他の名前を使用することもできます。
  5. Amazon Web Services 接続の Airflow ドキュメントの説明に従って、接続パラメータを指定します。たとえば、AWS アクセスキーによる接続を設定するには、AWS でアカウントのアクセスキーを生成し、AWS アクセスキー ID をログインとして、AWS シークレット アクセスキーを接続のパスワードとして提供します。

Amazon S3 からデータを転送する

同期データを後で別の DAG またはタスクで操作する場合は、環境のバケットの /data フォルダに pull します。このフォルダは他の Airflow ワーカーと同期されるため、DAG のタスクでこのフォルダを操作できます。

次の DAG の例では、次のことを行います。

  • /data-for-gcs ディレクトリの内容を S3 バケットから環境のバケット内の /data/from-s3/data-for-gcs/ フォルダに同期します。
  • 環境内のすべての Airflow ワーカーとデータが同期されるまで、2 分待ちます。
  • ls コマンドを使用して、このディレクトリ内のファイルのリストを出力します。このタスクを、データで動作する他の Airflow オペレータに置き換えます。
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_aws_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_s3 = S3ToGCSOperator(
        task_id='transfer_dir_from_s3',
        aws_conn_id='aws_s3',
        prefix='data-for-gcs',
        bucket='example-s3-bucket-transfer-operators',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')


    transfer_dir_from_s3 >> sleep_2min >> print_dir_files

Azure FileShare から Cloud Storage

このセクションでは、Azure FileShare から Cloud Storage バケットにデータを同期する方法について説明します。

Microsoft Azure プロバイダ パッケージをインストールする

apache-airflow-providers-microsoft-azure パッケージには、Microsoft Azure とやり取りする接続タイプと機能が含まれています。 環境にこの PyPI パッケージをインストールします。

Azure FileShare への接続を構成する

Microsoft Azure プロバイダ パッケージは、Azure File Share 用の接続タイプを提供します。このタイプの接続を作成します。google_cloud_default という名前の Cloud Storage への接続は、すでに環境で設定されています。

次の方法で Azure FileShare への接続を設定します。

  1. Airflow UI で、[管理者] > [接続] に移動します。
  2. 新しい接続を作成します。
  3. 接続タイプとして Azure FileShare を選択します。
  4. 次の例では、azure_fileshare という名前の接続を使用しています。この名前を使用することも、接続に他の名前を使用することもできます。
  5. Microsoft Azure ファイル共有接続の Airflow ドキュメントの説明に従って、接続パラメータを指定します。たとえば、ストレージ アカウントのアクセスキーの接続文字列を指定できます。

Azure FileShare からデータを転送する

同期データを後で別の DAG またはタスクで操作する場合は、環境のバケットの /data フォルダに pull します。このフォルダは他の Airflow ワーカーと同期されるため、DAG のタスクでこのフォルダを操作できます。

次の DAG は次のことを行います。

次の DAG の例では、次のことを行います。

  • /data-for-gcs ディレクトリの内容を Azure File Share から、環境バケットの /data/from-azure フォルダと同期します。
  • 環境内のすべての Airflow ワーカーとデータが同期されるまで、2 分待ちます。
  • ls コマンドを使用して、このディレクトリ内のファイルのリストを出力します。このタスクを、データで動作するほかの Airflow オペレータに置き換えます。
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_azure_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_azure = AzureFileShareToGCSOperator(
        task_id='transfer_dir_from_azure',
        azure_fileshare_conn_id='azure_fileshare',
        share_name='example-file-share',
        directory_name='data-for-gcs',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-azure/')


    transfer_dir_from_azure >> sleep_2min >> print_dir_files

次のステップ