Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
이 페이지에서는 DAG의 Google Transfer Operators를 사용하여 다른 서비스에서 데이터를 전송하는 방법을 보여줍니다.
Google Transfer Operators 정보
Transfer Operators는 다른 서비스의 데이터를 Google Cloud로 가져오는 데 사용할 수 있는 Airflow 연산자 집합입니다.
이 가이드에서는 Cloud Storage에서 사용할 수 있는 Azure FileShare Storage 및 Amazon S3의 연산자를 보여줍니다. 이 외에도 Google Cloud 내의 서비스와 Google Cloud 이외의 서비스에서 사용할 수 있는 전송 연산자가 더 많이 있습니다.
시작하기 전에
- 이 가이드는 Airflow 2용입니다. 환경에서 Airflow 1을 사용하는 경우 백포트 제공업체 패키지를 사용하여 연산자를 가져오고 사용자 환경에서 필요한 연결 유형을 사용할 수 있도록 합니다.
Amazon S3에서 Cloud Storage로 전송
이 섹션에서는 Amazon S3의 데이터를 Cloud Storage 버킷에 동기화하는 방법을 보여줍니다.
Amazon 제공업체 패키지 설치
apache-airflow-providers-amazon
패키지에는 Amazon S3와 상호작용하는 연결 유형과 기능이 포함됩니다.
사용자 환경에 이 PyPI 패키지를 설치합니다.
Amazon S3에 대한 연결 구성
Amazon 제공업체 패키지는 Amazon S3의 연결 유형을 제공합니다. 이 유형의 연결을 만듭니다. 이름이 google_cloud_default
인 Cloud Storage 연결이 이미 환경에 설정되어 있습니다.
다음 방법으로 Amazon S3에 대한 연결을 설정합니다.
- Airflow UI에서 관리자 > 연결로 이동합니다.
- 새 연결을 만듭니다.
- 연결 유형으로
Amazon S3
을 선택합니다. - 다음 예시에서는
aws_s3
이라는 연결을 사용합니다. 연결에는 이 이름 또는 다른 이름을 사용할 수 있습니다. - Amazon Web Services Connection에 대한 Airflow 문서에 설명된 대로 연결 매개변수를 지정합니다. 예를 들어 AWS 액세스 키와의 연결을 설정하려면 AWS에서 계정의 액세스 키를 생성한 다음 연결에 대해 AWS 액세스 키 ID를 로그인으로, AWS 보안 비밀 액세스 키를 비밀번호로 제공합니다.
Amazon S3에서 데이터 전송
나중에 다른 DAG 또는 태스크에서 동기화된 데이터에 대해 작업하려면 환경 버킷의 /data
폴더로 가져옵니다. 이 폴더는 다른 Airflow 작업자와 동기화되므로 DAG의 태스크가 여기에서 작동할 수 있습니다.
다음 DAG 예시는 다음을 수행합니다.
- S3 버킷의
/data-for-gcs
디렉터리 콘텐츠를 환경 버킷의/data/from-s3/data-for-gcs/
폴더에 동기화합니다. - 데이터가 사용자 환경의 모든 Airflow 작업자와 동기화될 때까지 2분 정도 기다립니다.
ls
명령어를 사용하여 이 디렉터리의 파일 목록을 출력합니다. 이 태스크를 데이터에 사용할 수 있는 다른 Airflow 연산자로 바꿉니다.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_aws_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')
transfer_dir_from_s3 >> sleep_2min >> print_dir_files
Azure FileShare에서 Cloud Storage로
이 섹션에서는 Azure FileShare의 데이터를 Cloud Storage 버킷에 동기화하는 방법을 보여줍니다.
Microsoft Azure 제공업체 패키지 설치
apache-airflow-providers-microsoft-azure
패키지에는 Microsoft Azure와 상호작용하는 연결 유형 및 기능이 포함되어 있습니다.
사용자 환경에 이 PyPI 패키지를 설치합니다.
Azure FileShare에 대한 연결 구성
Microsoft Azure 제공업체 패키지는 Azure 파일 공유의 연결 유형을 제공합니다. 이 유형의 연결을 만듭니다. 이름이 google_cloud_default
인 Cloud Storage 연결이 이미 환경에 설정되어 있습니다.
다음 방법으로 Azure FileShare에 대한 연결을 설정합니다.
- Airflow UI에서 관리자 > 연결로 이동합니다.
- 새 연결을 만듭니다.
- 연결 유형으로
Azure FileShare
을 선택합니다. - 다음 예시에서는
azure_fileshare
라는 연결을 사용합니다. 연결에는 이 이름 또는 다른 이름을 사용할 수 있습니다. - Microsoft Azure 파일 공유 연결에 대한 Airflow 문서에 설명된 대로 연결 매개변수를 지정합니다. 예를 들어 스토리지 계정 액세스 키의 연결 문자열을 지정할 수 있습니다.
Azure FileShare에서 데이터 전송
나중에 다른 DAG 또는 태스크에서 동기화된 데이터에 대해 작업하려면 환경 버킷의 /data
폴더로 가져옵니다. 이 폴더는 다른 Airflow 작업자와 동기화되므로 DAG의 태스크가 여기에서 작동할 수 있습니다.
다음 DAG는 다음을 수행합니다.
다음 DAG 예시는 다음을 수행합니다.
- Azure 파일 공유의
/data-for-gcs
디렉터리 콘텐츠를 환경 버킷의/data/from-azure
폴더에 동기화합니다. - 데이터가 사용자 환경의 모든 Airflow 작업자와 동기화될 때까지 2분 정도 기다립니다.
ls
명령어를 사용하여 이 디렉터리의 파일 목록을 출력합니다. 이 태스크를 데이터에 사용할 수 있는 다른 Airflow 연산자로 바꿉니다.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_azure_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_azure = AzureFileShareToGCSOperator(
task_id='transfer_dir_from_azure',
azure_fileshare_conn_id='azure_fileshare',
share_name='example-file-share',
directory_name='data-for-gcs',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-azure/')
transfer_dir_from_azure >> sleep_2min >> print_dir_files