Google Transfer Operators를 사용하여 다른 서비스에서 데이터 전송

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

이 페이지에서는 DAG의 Google Transfer Operators를 사용하여 다른 서비스에서 데이터를 전송하는 방법을 보여줍니다.

Google Transfer Operators 정보

Transfer Operators는 다른 서비스의 데이터를 Google Cloud로 가져오는 데 사용할 수 있는 Airflow 연산자 집합입니다.

이 가이드에서는 Cloud Storage에서 사용할 수 있는 Azure FileShare Storage 및 Amazon S3의 연산자를 보여줍니다. 이 외에도 Google Cloud 내의 서비스와 Google Cloud 이외의 서비스에서 사용할 수 있는 전송 연산자가 더 많이 있습니다.

시작하기 전에

  • 이 가이드는 Airflow 2용입니다. 환경에서 Airflow 1을 사용하는 경우 백포트 제공업체 패키지를 사용하여 연산자를 가져오고 사용자 환경에서 필요한 연결 유형을 사용할 수 있도록 합니다.

Amazon S3에서 Cloud Storage로 전송

이 섹션에서는 Amazon S3의 데이터를 Cloud Storage 버킷에 동기화하는 방법을 보여줍니다.

Amazon 제공업체 패키지 설치

apache-airflow-providers-amazon 패키지에는 Amazon S3와 상호작용하는 연결 유형과 기능이 포함됩니다. 사용자 환경에 이 PyPI 패키지를 설치합니다.

Amazon S3에 대한 연결 구성

Amazon 제공업체 패키지는 Amazon S3의 연결 유형을 제공합니다. 이 유형의 연결을 만듭니다. 이름이 google_cloud_default인 Cloud Storage 연결이 이미 환경에 설정되어 있습니다.

다음 방법으로 Amazon S3에 대한 연결을 설정합니다.

  1. Airflow UI에서 관리자 > 연결로 이동합니다.
  2. 새 연결을 만듭니다.
  3. 연결 유형으로 Amazon S3을 선택합니다.
  4. 다음 예시에서는 aws_s3이라는 연결을 사용합니다. 연결에는 이 이름 또는 다른 이름을 사용할 수 있습니다.
  5. Amazon Web Services Connection에 대한 Airflow 문서에 설명된 대로 연결 매개변수를 지정합니다. 예를 들어 AWS 액세스 키와의 연결을 설정하려면 AWS에서 계정의 액세스 키를 생성한 다음 연결에 대해 AWS 액세스 키 ID를 로그인으로, AWS 보안 비밀 액세스 키를 비밀번호로 제공합니다.

Amazon S3에서 데이터 전송

나중에 다른 DAG 또는 태스크에서 동기화된 데이터에 대해 작업하려면 환경 버킷의 /data 폴더로 가져옵니다. 이 폴더는 다른 Airflow 작업자와 동기화되므로 DAG의 태스크가 여기에서 작동할 수 있습니다.

다음 DAG 예시는 다음을 수행합니다.

  • S3 버킷의 /data-for-gcs 디렉터리 콘텐츠를 환경 버킷의 /data/from-s3/data-for-gcs/ 폴더에 동기화합니다.
  • 데이터가 사용자 환경의 모든 Airflow 작업자와 동기화될 때까지 2분 정도 기다립니다.
  • ls 명령어를 사용하여 이 디렉터리의 파일 목록을 출력합니다. 이 태스크를 데이터에 사용할 수 있는 다른 Airflow 연산자로 바꿉니다.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_aws_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_s3 = S3ToGCSOperator(
        task_id='transfer_dir_from_s3',
        aws_conn_id='aws_s3',
        prefix='data-for-gcs',
        bucket='example-s3-bucket-transfer-operators',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')


    transfer_dir_from_s3 >> sleep_2min >> print_dir_files

Azure FileShare에서 Cloud Storage로

이 섹션에서는 Azure FileShare의 데이터를 Cloud Storage 버킷에 동기화하는 방법을 보여줍니다.

Microsoft Azure 제공업체 패키지 설치

apache-airflow-providers-microsoft-azure 패키지에는 Microsoft Azure와 상호작용하는 연결 유형 및 기능이 포함되어 있습니다. 사용자 환경에 이 PyPI 패키지를 설치합니다.

Azure FileShare에 대한 연결 구성

Microsoft Azure 제공업체 패키지는 Azure 파일 공유의 연결 유형을 제공합니다. 이 유형의 연결을 만듭니다. 이름이 google_cloud_default인 Cloud Storage 연결이 이미 환경에 설정되어 있습니다.

다음 방법으로 Azure FileShare에 대한 연결을 설정합니다.

  1. Airflow UI에서 관리자 > 연결로 이동합니다.
  2. 새 연결을 만듭니다.
  3. 연결 유형으로 Azure FileShare을 선택합니다.
  4. 다음 예시에서는 azure_fileshare라는 연결을 사용합니다. 연결에는 이 이름 또는 다른 이름을 사용할 수 있습니다.
  5. Microsoft Azure 파일 공유 연결에 대한 Airflow 문서에 설명된 대로 연결 매개변수를 지정합니다. 예를 들어 스토리지 계정 액세스 키의 연결 문자열을 지정할 수 있습니다.

Azure FileShare에서 데이터 전송

나중에 다른 DAG 또는 태스크에서 동기화된 데이터에 대해 작업하려면 환경 버킷의 /data 폴더로 가져옵니다. 이 폴더는 다른 Airflow 작업자와 동기화되므로 DAG의 태스크가 여기에서 작동할 수 있습니다.

다음 DAG는 다음을 수행합니다.

다음 DAG 예시는 다음을 수행합니다.

  • Azure 파일 공유의 /data-for-gcs 디렉터리 콘텐츠를 환경 버킷의 /data/from-azure 폴더에 동기화합니다.
  • 데이터가 사용자 환경의 모든 Airflow 작업자와 동기화될 때까지 2분 정도 기다립니다.
  • ls 명령어를 사용하여 이 디렉터리의 파일 목록을 출력합니다. 이 태스크를 데이터에 사용할 수 있는 다른 Airflow 연산자로 바꿉니다.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_azure_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_azure = AzureFileShareToGCSOperator(
        task_id='transfer_dir_from_azure',
        azure_fileshare_conn_id='azure_fileshare',
        share_name='example-file-share',
        directory_name='data-for-gcs',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-azure/')


    transfer_dir_from_azure >> sleep_2min >> print_dir_files

다음 단계