Mentransfer data dengan Operator Transfer Google

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Halaman ini menunjukkan cara mentransfer data dari layanan lain dengan Operator Transfer Google di DAG Anda.

Tentang Operator Transfer Google

Operator Transfer Google adalah sekumpulan operator Airflow yang dapat Anda gunakan untuk mengambil data dari layanan lain ke Google Cloud.

Panduan ini menunjukkan operator untuk Azure FileShare Storage dan Amazon S3 yang kompatibel dengan Cloud Storage. Ada lebih banyak operator transfer yang berfungsi dengan layanan dalam Google Cloud dan dengan layanan selain Google Cloud.

Amazon S3 ke Cloud Storage

Bagian ini menunjukkan cara menyinkronkan data dari Amazon S3 ke bucket Cloud Storage.

Menginstal paket penyedia Amazon

Paket apache-airflow-providers-amazon berisi jenis dan fungsi koneksi yang berinteraksi dengan Amazon S3. Instal paket PyPI ini di lingkungan Anda.

Mengonfigurasi koneksi ke Amazon S3

Paket penyedia Amazon menyediakan jenis koneksi untuk Amazon S3. Anda akan membuat koneksi jenis ini. Koneksi untuk Cloud Storage, bernama google_cloud_default sudah disiapkan di lingkungan Anda.

Siapkan koneksi ke Amazon S3 dengan cara berikut:

  1. Di UI Airflow, buka Admin > Connections.
  2. Buat koneksi baru.
  3. Pilih Amazon S3 sebagai jenis koneksi.
  4. Contoh berikut menggunakan koneksi bernama aws_s3. Anda dapat menggunakan nama ini, atau nama lain untuk koneksi.
  5. Tentukan parameter koneksi seperti yang dijelaskan dalam dokumentasi Airflow untuk Koneksi Amazon Web Services. Misalnya, untuk menyiapkan koneksi dengan kunci akses AWS, Anda membuat kunci akses untuk akun Anda di AWS, lalu memberikan ID kunci akses AWS sebagai login untuk kunci akses rahasia AWS sebagai sandi untuk koneksi tersebut.

Mentransfer data dari Amazon S3

Jika Anda ingin mengoperasikan data yang disinkronkan nanti di DAG atau tugas lain, tarik data tersebut ke folder /data bucket lingkungan Anda. Folder ini disinkronkan ke pekerja Airflow lainnya, sehingga tugas di DAG dapat beroperasi di dalamnya.

Contoh DAG berikut melakukan hal berikut:

  • Menyinkronkan konten direktori /data-for-gcs dari bucket S3 ke folder /data/from-s3/data-for-gcs/ di bucket lingkungan Anda.
  • Menunggu selama dua menit, sampai data disinkronkan ke semua pekerja Airflow di lingkungan Anda.
  • Menghasilkan daftar file dalam direktori ini menggunakan perintah ls. Ganti tugas ini dengan operator Airflow lain yang menggunakan data Anda.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_aws_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_s3 = S3ToGCSOperator(
        task_id='transfer_dir_from_s3',
        aws_conn_id='aws_s3',
        prefix='data-for-gcs',
        bucket='example-s3-bucket-transfer-operators',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')


    transfer_dir_from_s3 >> sleep_2min >> print_dir_files

Azure FileShare ke Cloud Storage

Bagian ini menunjukkan cara menyinkronkan data dari Azure FileShare ke bucket Cloud Storage.

Menginstal paket penyedia Microsoft Azure

Paket apache-airflow-providers-microsoft-azure berisi jenis dan fungsi koneksi yang berinteraksi dengan Microsoft Azure. Instal paket PyPI ini di lingkungan Anda.

Mengonfigurasi koneksi ke Azure FileShare

Paket penyedia Microsoft Azure menyediakan jenis koneksi untuk Azure File Share. Anda membuat koneksi jenis ini. Koneksi untuk Cloud Storage, bernama google_cloud_default sudah disiapkan di lingkungan Anda.

Siapkan koneksi ke Azure FileShare dengan cara berikut:

  1. Di UI Airflow, buka Admin > Connections.
  2. Buat koneksi baru.
  3. Pilih Azure FileShare sebagai jenis koneksi.
  4. Contoh berikut menggunakan koneksi bernama azure_fileshare. Anda dapat menggunakan nama ini, atau nama lain untuk koneksi.
  5. Tentukan parameter koneksi seperti yang dijelaskan dalam dokumentasi Airflow untuk Microsoft Azure File Share Connection. Misalnya, Anda dapat menentukan string koneksi untuk kunci akses akun penyimpanan Anda.

Mentransfer data dari Azure FileShare

Jika Anda ingin mengoperasikan data yang disinkronkan nanti di DAG atau tugas lain, tarik data tersebut ke folder /data bucket lingkungan Anda. Folder ini disinkronkan ke pekerja Airflow lainnya, sehingga tugas di DAG dapat beroperasi di dalamnya.

DAG berikut melakukan hal berikut:

Contoh DAG berikut melakukan hal berikut:

  • Menyinkronkan konten direktori /data-for-gcs dari Azure File Share ke folder /data/from-azure di bucket lingkungan Anda.
  • Menunggu selama dua menit, sampai data disinkronkan ke semua pekerja Airflow di lingkungan Anda.
  • Menghasilkan daftar file dalam direktori ini menggunakan perintah ls. Ganti tugas ini dengan operator Airflow lain yang menggunakan data Anda.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator

with airflow.DAG(
    'composer_sample_azure_to_gcs',
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:

    transfer_dir_from_azure = AzureFileShareToGCSOperator(
        task_id='transfer_dir_from_azure',
        azure_fileshare_conn_id='azure_fileshare',
        share_name='example-file-share',
        directory_name='data-for-gcs',
        dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')

    sleep_2min = BashOperator(
        task_id='sleep_2min',
        bash_command='sleep 2m')

    print_dir_files = BashOperator(
        task_id='print_dir_files',
        bash_command='ls /home/airflow/gcs/data/from-azure/')


    transfer_dir_from_azure >> sleep_2min >> print_dir_files

Langkah selanjutnya