Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Halaman ini menunjukkan cara mentransfer data dari layanan lain dengan Operator Transfer Google di DAG Anda.
Tentang Operator Transfer Google
Operator Transfer Google adalah serangkaian operator Airflow yang dapat Anda gunakan untuk mengambil data dari layanan lain ke Google Cloud.
Panduan ini menunjukkan operator untuk Azure FileShare Storage dan Amazon S3 yang berfungsi dengan Cloud Storage. Ada banyak operator transfer lainnya yang berfungsi dengan layanan dalam Google Cloud dan dengan layanan selain Google Cloud.
Sebelum memulai
- Panduan ini ditujukan untuk Airflow 2. Jika lingkungan Anda menggunakan Airflow 1, gunakan paket penyedia backport untuk mengimpor operator dan menyediakan jenis koneksi yang diperlukan di lingkungan Anda.
Amazon S3 ke Cloud Storage
Bagian ini menunjukkan cara menyinkronkan data dari Amazon S3 ke bucket Cloud Storage.
Menginstal paket penyedia Amazon
Paket apache-airflow-providers-amazon
berisi jenis
koneksi dan fungsi yang berinteraksi dengan Amazon S3.
Instal paket PyPI ini di lingkungan
Anda.
Mengonfigurasi koneksi ke Amazon S3
Paket penyedia Amazon menyediakan jenis koneksi untuk Amazon S3. Anda
membuat koneksi jenis ini. Koneksi untuk Cloud Storage,
yang bernama google_cloud_default
, sudah disiapkan di lingkungan Anda.
Siapkan koneksi ke Amazon S3 dengan cara berikut:
- Di UI Airflow, buka Admin > Koneksi.
- Buat koneksi baru.
- Pilih
Amazon S3
sebagai jenis koneksi. - Contoh berikut menggunakan koneksi bernama
aws_s3
. Anda dapat menggunakan nama ini, atau nama lain untuk koneksi. - Tentukan parameter koneksi seperti yang dijelaskan dalam dokumentasi Airflow untuk Koneksi Amazon Web Services. Misalnya, untuk menyiapkan koneksi dengan kunci akses AWS, Anda membuat kunci akses untuk akun Anda di AWS, lalu memberikan ID kunci akses AWS sebagai login dan kunci akses rahasia AWS sebagai sandi untuk koneksi.
Mentransfer data dari Amazon S3
Jika Anda ingin mengoperasikan data yang disinkronkan nanti di DAG atau tugas lain,
tarik data tersebut ke folder /data
di bucket lingkungan Anda. Folder ini
disinkronkan ke pekerja Airflow lainnya, sehingga tugas dalam DAG Anda
dapat beroperasi di dalamnya.
Contoh DAG berikut melakukan hal berikut:
- Menyinkronkan konten direktori
/data-for-gcs
dari bucket S3 ke folder/data/from-s3/data-for-gcs/
di bucket lingkungan Anda. - Menunggu selama dua menit, agar data disinkronkan ke semua pekerja Airflow di lingkungan Anda.
- Menampilkan daftar file dalam direktori ini menggunakan perintah
ls
. Ganti tugas ini dengan operator Airflow lain yang berfungsi dengan data Anda.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_aws_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_s3 = S3ToGCSOperator(
task_id='transfer_dir_from_s3',
aws_conn_id='aws_s3',
prefix='data-for-gcs',
bucket='example-s3-bucket-transfer-operators',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-s3/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-s3/data-for-gcs/')
transfer_dir_from_s3 >> sleep_2min >> print_dir_files
Azure FileShare ke Cloud Storage
Bagian ini menunjukkan cara menyinkronkan data dari Azure FileShare ke bucket Cloud Storage.
Menginstal paket penyedia Microsoft Azure
Paket apache-airflow-providers-microsoft-azure
berisi jenis koneksi dan fungsi yang berinteraksi dengan Microsoft Azure.
Instal paket PyPI ini di lingkungan
Anda.
Mengonfigurasi koneksi ke Azure FileShare
Paket penyedia Microsoft Azure menyediakan jenis koneksi untuk Azure File Share. Anda membuat koneksi jenis ini. Koneksi untuk Cloud Storage, bernama google_cloud_default
, sudah disiapkan di lingkungan Anda.
Siapkan koneksi ke Azure FileShare dengan cara berikut:
- Di UI Airflow, buka Admin > Koneksi.
- Buat koneksi baru.
- Pilih
Azure FileShare
sebagai jenis koneksi. - Contoh berikut menggunakan koneksi bernama
azure_fileshare
. Anda dapat menggunakan nama ini, atau nama lain untuk koneksi. - Tentukan parameter koneksi seperti yang dijelaskan dalam dokumentasi Airflow untuk Koneksi Berbagi File Microsoft Azure. Misalnya, Anda dapat menentukan string koneksi untuk kunci akses akun penyimpanan.
Mentransfer data dari Azure FileShare
Jika Anda ingin mengoperasikan data yang disinkronkan nanti di DAG atau tugas lain,
tarik data tersebut ke folder /data
di bucket lingkungan Anda. Folder ini
disinkronkan ke pekerja Airflow lainnya, sehingga tugas dalam DAG Anda
dapat beroperasi di dalamnya.
DAG berikut melakukan hal berikut:
Contoh DAG berikut melakukan hal berikut:
- Menyinkronkan konten direktori
/data-for-gcs
dari Azure File Share ke folder/data/from-azure
di bucket lingkungan Anda. - Menunggu selama dua menit, agar data disinkronkan ke semua pekerja Airflow di lingkungan Anda.
- Menampilkan daftar file dalam direktori ini menggunakan perintah
ls
. Ganti tugas ini dengan operator Airflow lain yang berfungsi dengan data Anda.
import datetime
import airflow
from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
from airflow.operators.bash_operator import BashOperator
with airflow.DAG(
'composer_sample_azure_to_gcs',
start_date=datetime.datetime(2022, 1, 1),
schedule_interval=None,
) as dag:
transfer_dir_from_azure = AzureFileShareToGCSOperator(
task_id='transfer_dir_from_azure',
azure_fileshare_conn_id='azure_fileshare',
share_name='example-file-share',
directory_name='data-for-gcs',
dest_gcs='gs://us-central1-example-environ-361f2312-bucket/data/from-azure/')
sleep_2min = BashOperator(
task_id='sleep_2min',
bash_command='sleep 2m')
print_dir_files = BashOperator(
task_id='print_dir_files',
bash_command='ls /home/airflow/gcs/data/from-azure/')
transfer_dir_from_azure >> sleep_2min >> print_dir_files