Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
このページでは、DAG で Google Transfer Operator を使用して他のサービスからデータ転送を行う方法について説明します。
Google Transfer Operator について
Google Transfer Operator は、他のサービスから Google Cloud にデータを pull するために使用できる Airflow 演算子のセットです。
このガイドでは、Cloud Storage で動作する Azure FileShare Storage と Amazon S3 の演算子について説明します。Google Cloud 内のサービスや Google Cloud 以外のサービスと連携する転送演算子は他にも多数あります。
始める前に
- このガイドは Airflow 2 を対象としています。環境で Airflow 1 を使用している場合は、バックポート プロバイダ パッケージを使用して演算子をインポートし、必要な接続タイプを環境で使用できるようにします。
Amazon S3 から Cloud Storage
このセクションでは、Amazon S3 から Cloud Storage バケットにデータを同期する方法を示します。
Amazon プロバイダ パッケージをインストールする
apache-airflow-providers-amazon
パッケージには、Amazon S3 とやり取りする接続タイプと機能が含まれています。
環境にこの PyPI パッケージをインストールします。
Amazon S3 への接続を構成する
Amazon プロバイダ パッケージは、Amazon S3 の接続タイプを提供します。このタイプの接続を作成します。google_cloud_default
という名前の Cloud Storage への接続は、すでに環境で設定されています。
次の方法で Amazon S3 への接続を設定します。
- Airflow UI で、[管理者] > [接続] に移動します。
- 新しい接続を作成します。
- 接続タイプとして
Amazon S3
を選択します。 - 次の例では、
aws_s3
という名前の接続を使用しています。この名前を使用することも、接続に他の名前を使用することもできます。 - Amazon Web Services 接続の Airflow ドキュメントの説明に従って、接続パラメータを指定します。たとえば、AWS アクセスキーによる接続を設定するには、AWS でアカウントのアクセスキーを生成し、AWS アクセスキー ID をログインとして、AWS シークレット アクセスキーを接続のパスワードとして提供します。
Amazon S3 からデータを転送する
同期データを後で別の DAG またはタスクで操作する場合は、環境のバケットの /data
フォルダに pull します。このフォルダは他の Airflow ワーカーと同期されるため、DAG のタスクでこのフォルダを操作できます。
次の DAG の例では、次のことを行います。
/data-for-gcs
ディレクトリの内容を S3 バケットから環境のバケット内の/data/from-s3/data-for-gcs/
フォルダに同期します。- 環境内のすべての Airflow ワーカーとデータが同期されるまで、2 分待ちます。
ls
コマンドを使用して、このディレクトリ内のファイルのリストを出力します。このタスクを、データで動作する他の Airflow オペレータに置き換えます。
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_aws_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')
transfer_dir_from_s3 >> sleep_2min >> print_dir_files
Azure FileShare から Cloud Storage
このセクションでは、Azure FileShare から Cloud Storage バケットにデータを同期する方法について説明します。
Microsoft Azure プロバイダ パッケージをインストールする
apache-airflow-providers-microsoft-azure
パッケージには、Microsoft Azure とやり取りする接続タイプと機能が含まれています。
環境にこの PyPI パッケージをインストールします。
Azure FileShare への接続を構成する
Microsoft Azure プロバイダ パッケージは、Azure File Share 用の接続タイプを提供します。このタイプの接続を作成します。google_cloud_default
という名前の Cloud Storage への接続は、すでに環境で設定されています。
次の方法で Azure FileShare への接続を設定します。
- Airflow UI で、[管理者] > [接続] に移動します。
- 新しい接続を作成します。
- 接続タイプとして
Azure FileShare
を選択します。 - 次の例では、
azure_fileshare
という名前の接続を使用しています。この名前を使用することも、接続に他の名前を使用することもできます。 - Microsoft Azure ファイル共有接続の Airflow ドキュメントの説明に従って、接続パラメータを指定します。たとえば、ストレージ アカウントのアクセスキーの接続文字列を指定できます。
Azure FileShare からデータを転送する
同期データを後で別の DAG またはタスクで操作する場合は、環境のバケットの /data
フォルダに pull します。このフォルダは他の Airflow ワーカーと同期されるため、DAG のタスクでこのフォルダを操作できます。
次の DAG は次のことを行います。
次の DAG の例では、次のことを行います。
/data-for-gcs
ディレクトリの内容を Azure File Share から、環境バケットの/data/from-azure
フォルダと同期します。- 環境内のすべての Airflow ワーカーとデータが同期されるまで、2 分待ちます。
ls
コマンドを使用して、このディレクトリ内のファイルのリストを出力します。このタスクを、データで動作するほかの Airflow オペレータに置き換えます。
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_azure_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_azure = AzureFileShareToGCSOperator(
task_id='transfer_dir_from_azure',
azure_fileshare_conn_id='azure_fileshare',
share_name='example-file-share',
directory_name='data-for-gcs',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-azure/')
transfer_dir_from_azure >> sleep_2min >> print_dir_files