Mentransfer data dari layanan lain dengan Operator Transfer Google

Cloud Composer 1 | Cloud Composer 2

Halaman ini menunjukkan cara mentransfer data dari layanan lain dengan Operator Transfer Google di DAG Anda.

Tentang Operator Transfer Google

Google Transfer Operator adalah sekumpulan operator Airflow yang dapat Anda gunakan untuk menarik data dari layanan lain ke Google Cloud.

Panduan ini menunjukkan operator untuk Azure FileShare Storage dan Amazon S3 yang dapat digunakan dengan Cloud Storage. Ada banyak operator transfer lain yang bekerja dengan layanan dalam Google Cloud dan dengan layanan selain Google Cloud.

Sebelum memulai

  • Panduan ini ditujukan untuk Airflow 2. Jika lingkungan Anda menggunakan Airflow 1, gunakan paket penyedia backport untuk mengimpor operator dan menyediakan jenis koneksi yang diperlukan di lingkungan Anda.

Amazon S3 ke Cloud Storage

Bagian ini menunjukkan cara menyinkronkan data dari Amazon S3 ke bucket Cloud Storage.

Instal paket penyedia Amazon

Paket apache-airflow-providers-amazon berisi jenis dan fungsi koneksi yang berinteraksi dengan Amazon S3. Instal paket PyPI ini di lingkungan Anda.

Mengonfigurasi koneksi ke Amazon S3

Paket penyedia Amazon menyediakan jenis koneksi untuk Amazon S3. Anda membuat koneksi jenis ini. Koneksi untuk Cloud Storage, yang bernama google_cloud_default, sudah disiapkan di lingkungan Anda.

Siapkan koneksi ke Amazon S3 dengan cara berikut:

  1. Di Airflow UI, buka Admin > Connections.
  2. Buat koneksi baru.
  3. Pilih Amazon S3 sebagai jenis koneksi.
  4. Contoh berikut menggunakan koneksi bernama aws_s3. Anda dapat menggunakan nama ini, atau nama lain untuk koneksi tersebut.
  5. Tentukan parameter koneksi seperti yang dijelaskan dalam dokumentasi Airflow untuk Amazon Web Services Connection. Misalnya, untuk menyiapkan koneksi dengan kunci akses AWS, buat kunci akses untuk akun Anda di AWS, lalu berikan ID kunci akses AWS sebagai proses login kunci akses rahasia AWS sebagai sandi untuk koneksi tersebut.

Mentransfer data dari Amazon S3

Jika nanti Anda ingin mengoperasikan data yang disinkronkan di DAG atau tugas lain, tarik data tersebut ke folder /data di bucket lingkungan Anda. Folder ini disinkronkan ke pekerja Airflow lainnya, sehingga tugas di DAG dapat beroperasi di dalamnya.

Contoh DAG berikut melakukan hal berikut:

  • Menyinkronkan konten direktori /data-for-gcs dari bucket S3 ke folder /data/from-s3/data-for-gcs/ di bucket lingkungan Anda.
  • Tunggu selama dua menit hingga data disinkronkan ke semua pekerja Airflow di lingkungan Anda.
  • Menampilkan daftar file dalam direktori ini menggunakan perintah ls. Ganti tugas ini dengan operator Airflow lain yang menangani data Anda.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_aws_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_s3 = S3ToGCSOperator(
        task_id='transfer_dir_from_s3',
        aws_conn_id='aws_s3',
        prefix='data-for-gcs',
        bucket='example-s3-bucket-transfer-operators',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')

    transfer_dir_from_s3 >> sleep_2min >> print_dir_files

Azure FileShare ke Cloud Storage

Bagian ini menunjukkan cara menyinkronkan data dari Azure FileShare ke bucket Cloud Storage.

Menginstal paket penyedia Microsoft Azure

Paket apache-airflow-providers-microsoft-azure berisi jenis dan fungsi koneksi yang berinteraksi dengan Microsoft Azure. Instal paket PyPI ini di lingkungan Anda.

Mengonfigurasi koneksi ke Azure FileShare

Paket penyedia Microsoft Azure menyediakan jenis koneksi untuk Azure File Share. Anda membuat koneksi dari jenis ini. Koneksi untuk Cloud Storage, bernama google_cloud_default, sudah disiapkan di lingkungan Anda.

Siapkan koneksi ke Azure FileShare dengan cara berikut:

  1. Di Airflow UI, buka Admin > Connections.
  2. Buat koneksi baru.
  3. Pilih Azure FileShare sebagai jenis koneksi.
  4. Contoh berikut menggunakan koneksi bernama azure_fileshare. Anda dapat menggunakan nama ini, atau nama lain untuk koneksi tersebut.
  5. Tentukan parameter koneksi seperti yang dijelaskan dalam dokumentasi Airflow untuk Microsoft Azure File Share Connection. Misalnya, Anda dapat menentukan string koneksi untuk kunci akses akun penyimpanan.

Mentransfer data dari Azure FileShare

Jika nanti Anda ingin mengoperasikan data yang disinkronkan di DAG atau tugas lain, tarik data tersebut ke folder /data di bucket lingkungan Anda. Folder ini disinkronkan ke pekerja Airflow lainnya, sehingga tugas di DAG dapat beroperasi di dalamnya.

DAG berikut akan melakukan hal berikut:

Contoh DAG berikut melakukan hal berikut:

  • Menyinkronkan konten direktori /data-for-gcs dari Azure File Share ke folder /data/from-azure di bucket lingkungan Anda.
  • Tunggu selama dua menit hingga data disinkronkan ke semua pekerja Airflow di lingkungan Anda.
  • Menampilkan daftar file dalam direktori ini menggunakan perintah ls. Ganti tugas ini dengan operator Airflow lain yang menangani data Anda.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_azure_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_azure = AzureFileShareToGCSOperator(
        task_id='transfer_dir_from_azure',
        azure_fileshare_conn_id='azure_fileshare',
        share_name='example-file-share',
        directory_name='data-for-gcs',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-azure/')

    transfer_dir_from_azure >> sleep_2min >> print_dir_files

Langkah selanjutnya