Mentransfer data dari layanan lain dengan Operator Transfer Google

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Halaman ini menunjukkan cara mentransfer data dari layanan lain dengan Operator Transfer Google di DAG Anda.

Tentang Operator Transfer Google

Operator Transfer Google adalah serangkaian operator Airflow yang dapat Anda gunakan untuk mengambil data dari layanan lain ke Google Cloud.

Panduan ini menunjukkan operator untuk Azure FileShare Storage dan Amazon S3 yang berfungsi dengan Cloud Storage. Ada banyak operator transfer lainnya yang berfungsi dengan layanan dalam Google Cloud dan dengan layanan selain Google Cloud.

Sebelum memulai

  • Panduan ini ditujukan untuk Airflow 2. Jika lingkungan Anda menggunakan Airflow 1, gunakan paket penyedia backport untuk mengimpor operator dan menyediakan jenis koneksi yang diperlukan di lingkungan Anda.

Amazon S3 ke Cloud Storage

Bagian ini menunjukkan cara menyinkronkan data dari Amazon S3 ke bucket Cloud Storage.

Menginstal paket penyedia Amazon

Paket apache-airflow-providers-amazon berisi jenis koneksi dan fungsi yang berinteraksi dengan Amazon S3. Instal paket PyPI ini di lingkungan Anda.

Mengonfigurasi koneksi ke Amazon S3

Paket penyedia Amazon menyediakan jenis koneksi untuk Amazon S3. Anda membuat koneksi jenis ini. Koneksi untuk Cloud Storage, yang bernama google_cloud_default, sudah disiapkan di lingkungan Anda.

Siapkan koneksi ke Amazon S3 dengan cara berikut:

  1. Di UI Airflow, buka Admin > Koneksi.
  2. Buat koneksi baru.
  3. Pilih Amazon S3 sebagai jenis koneksi.
  4. Contoh berikut menggunakan koneksi bernama aws_s3. Anda dapat menggunakan nama ini, atau nama lain untuk koneksi.
  5. Tentukan parameter koneksi seperti yang dijelaskan dalam dokumentasi Airflow untuk Koneksi Amazon Web Services. Misalnya, untuk menyiapkan koneksi dengan kunci akses AWS, Anda membuat kunci akses untuk akun Anda di AWS, lalu memberikan ID kunci akses AWS sebagai login dan kunci akses rahasia AWS sebagai sandi untuk koneksi.

Mentransfer data dari Amazon S3

Jika Anda ingin mengoperasikan data yang disinkronkan nanti di DAG atau tugas lain, tarik data tersebut ke folder /data di bucket lingkungan Anda. Folder ini disinkronkan ke pekerja Airflow lainnya, sehingga tugas dalam DAG Anda dapat beroperasi di dalamnya.

Contoh DAG berikut melakukan hal berikut:

  • Menyinkronkan konten direktori /data-for-gcs dari bucket S3 ke folder /data/from-s3/data-for-gcs/ di bucket lingkungan Anda.
  • Menunggu selama dua menit, agar data disinkronkan ke semua pekerja Airflow di lingkungan Anda.
  • Menampilkan daftar file dalam direktori ini menggunakan perintah ls. Ganti tugas ini dengan operator Airflow lain yang berfungsi dengan data Anda.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_aws_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_s3 = S3ToGCSOperator(
        task_id='transfer_dir_from_s3',
        aws_conn_id='aws_s3',
        prefix='data-for-gcs',
        bucket='example-s3-bucket-transfer-operators',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')


    transfer_dir_from_s3 >> sleep_2min >> print_dir_files

Azure FileShare ke Cloud Storage

Bagian ini menunjukkan cara menyinkronkan data dari Azure FileShare ke bucket Cloud Storage.

Menginstal paket penyedia Microsoft Azure

Paket apache-airflow-providers-microsoft-azure berisi jenis koneksi dan fungsi yang berinteraksi dengan Microsoft Azure. Instal paket PyPI ini di lingkungan Anda.

Mengonfigurasi koneksi ke Azure FileShare

Paket penyedia Microsoft Azure menyediakan jenis koneksi untuk Azure File Share. Anda membuat koneksi jenis ini. Koneksi untuk Cloud Storage, bernama google_cloud_default, sudah disiapkan di lingkungan Anda.

Siapkan koneksi ke Azure FileShare dengan cara berikut:

  1. Di UI Airflow, buka Admin > Koneksi.
  2. Buat koneksi baru.
  3. Pilih Azure FileShare sebagai jenis koneksi.
  4. Contoh berikut menggunakan koneksi bernama azure_fileshare. Anda dapat menggunakan nama ini, atau nama lain untuk koneksi.
  5. Tentukan parameter koneksi seperti yang dijelaskan dalam dokumentasi Airflow untuk Koneksi Berbagi File Microsoft Azure. Misalnya, Anda dapat menentukan string koneksi untuk kunci akses akun penyimpanan.

Mentransfer data dari Azure FileShare

Jika Anda ingin mengoperasikan data yang disinkronkan nanti di DAG atau tugas lain, tarik data tersebut ke folder /data di bucket lingkungan Anda. Folder ini disinkronkan ke pekerja Airflow lainnya, sehingga tugas dalam DAG Anda dapat beroperasi di dalamnya.

DAG berikut melakukan hal berikut:

Contoh DAG berikut melakukan hal berikut:

  • Menyinkronkan konten direktori /data-for-gcs dari Azure File Share ke folder /data/from-azure di bucket lingkungan Anda.
  • Menunggu selama dua menit, agar data disinkronkan ke semua pekerja Airflow di lingkungan Anda.
  • Menampilkan daftar file dalam direktori ini menggunakan perintah ls. Ganti tugas ini dengan operator Airflow lain yang berfungsi dengan data Anda.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_azure_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_azure = AzureFileShareToGCSOperator(
        task_id='transfer_dir_from_azure',
        azure_fileshare_conn_id='azure_fileshare',
        share_name='example-file-share',
        directory_name='data-for-gcs',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-azure/')


    transfer_dir_from_azure >> sleep_2min >> print_dir_files

Langkah selanjutnya