Cloud Composer 1 | Cloud Composer 2
Halaman ini menjelaskan cara menggunakan Cloud Composer 2 untuk menjalankan workload Dataproc Serverless di Google Cloud.
Contoh di bagian berikut menunjukkan cara menggunakan operator untuk mengelola beban kerja batch Dataproc Serverless. Gunakan operator ini di DAG yang membuat, menghapus, mencantumkan, dan mendapatkan beban kerja batch Dataproc Serverless Spark:
Buat DAG untuk operator yang bekerja dengan beban kerja Dataproc Serverless Batch:
Buat DAG yang menggunakan container kustom, dan Metastore Dataproc.
Konfigurasi Server Histori Persistent untuk DAG ini.
Sebelum memulai
Aktifkan Dataproc API:
Konsol
Aktifkan API Dataproc.
gcloud
Aktifkan API Dataproc:
gcloud services enable dataproc.googleapis.com
Pilih lokasi untuk file beban kerja Batch Anda. Anda dapat menggunakan salah satu opsi berikut:
- Buat bucket Cloud Storage yang menyimpan file ini.
- Gunakan bucket lingkungan Anda. Karena tidak perlu menyinkronkan file ini
dengan Airflow, Anda dapat membuat subfolder terpisah di luar folder
/dags
atau/data
. Misalnya,/batches
. - Gunakan bucket yang ada.
Menyiapkan file dan variabel Airflow
Bagian ini menunjukkan cara menyiapkan file dan mengonfigurasi variabel Airflow untuk tutorial ini.
Upload file beban kerja Dataproc Serverless Spark ML ke bucket
Beban kerja dalam tutorial ini menjalankan skrip pyspark:
Simpan skrip pyspark apa pun ke file lokal yang bernama
spark-job.py
. Misalnya, Anda dapat menggunakan contoh skrip pyspark.Upload file ke lokasi yang Anda pilih di bagian Sebelum memulai.
Menetapkan variabel Airflow
Contoh di bagian berikut ini menggunakan variabel Airflow. Anda dapat menetapkan nilai untuk variabel ini di Airflow, lalu kode DAG dapat mengakses nilai ini.
Contoh dalam tutorial ini menggunakan variabel Airflow berikut. Anda dapat menyetelnya sesuai kebutuhan, bergantung pada contoh yang Anda gunakan.
Tetapkan variabel Airflow berikut untuk digunakan dalam kode DAG Anda:
project_id
: Project ID.bucket_name
: URI bucket tempat file python utama beban kerja (spark-job.py
) berada. Anda memilih lokasi ini di bagian Sebelum memulai.phs_cluster
: Nama cluster Server Histori Persisten. Anda menetapkan variabel ini saat Membuat Server Histori Persisten.image_name
: nama dan tag image penampung kustom (image:tag
). Anda menetapkan variabel ini saat menggunakan image penampung kustom dengan DataprocCreateBatchOperator.metastore_cluster
: Nama layanan Dataproc Metastore. Anda menetapkan variabel ini saat menggunakan layanan Metastore Dataproc dengan DataprocCreateBatchOperator.region_name
: region tempat layanan Dataproc Metastore berada. Anda menetapkan variabel ini saat menggunakan layanan Metastore Dataproc dengan DataprocCreateBatchOperator.
Gunakan konsol Google Cloud dan UI Airflow untuk menetapkan setiap variabel Airflow
Di konsol Google Cloud, buka halaman Environments.
Di daftar lingkungan, klik link Airflow untuk lingkungan Anda. UI Airflow akan terbuka.
Di UI Airflow, pilih Admin > Variabel.
Klik Add a new record.
Tentukan nama variabel di kolom Key, dan tetapkan nilainya di kolom Val.
Klik Save.
Membuat Server Histori Persisten
Gunakan Persistent History Server (PHS) untuk melihat file histori Spark dari workload batch Anda:
- Membuat Server Histori Persisten.
- Pastikan Anda menentukan nama cluster PHS dalam
phs_cluster
variabel Airflow.
DataprocCreateBatchOperator
DAG berikut memulai beban kerja Dataproc Serverless Batch.
Untuk mengetahui informasi selengkapnya tentang argumen DataprocCreateBatchOperator
, lihat kode sumber operator.
Untuk mengetahui informasi selengkapnya tentang atribut yang dapat Anda teruskan dalam parameter batch
dari DataprocCreateBatchOperator
, lihat
deskripsi class Batch.
Menggunakan gambar container kustom dengan DataprocCreateBatchOperator
Contoh berikut menunjukkan cara menggunakan image container kustom untuk menjalankan workload Anda. Anda dapat menggunakan container kustom, misalnya, untuk menambahkan dependensi Python yang tidak disediakan oleh image container default.
Untuk menggunakan image container kustom:
Membuat image container kustom dan menguploadnya ke Container Registry.
Tentukan gambar dalam
image_name
variabel Airflow.Gunakan DataprocCreateBatchOperator dengan gambar kustom:
Menggunakan layanan Metastore Dataproc dengan DataprocCreateBatchOperator
Untuk menggunakan layanan Metastore Dataproc dari DAG:
Periksa apakah layanan metastore Anda sudah dimulai.
Untuk mempelajari cara memulai layanan metastore, lihat Mengaktifkan dan menonaktifkan Metastore Dataproc.
Untuk mengetahui informasi selengkapnya tentang operator batch untuk membuat konfigurasi, lihat PeripheralsConfig.
Setelah layanan metastore aktif dan berjalan, tentukan namanya di variabel
metastore_cluster
dan regionnya di variabel Airflowregion_name
.Gunakan layanan metastore di DataprocCreateBatchOperator:
DataprocDeleteBatchOperator
Anda dapat menggunakan DataprocDeleteBatchOperator untuk menghapus batch berdasarkan ID batch beban kerja.
DataprocListBatchesOperator
DataprocDeleteBatchOperator mencantumkan batch yang ada dalam project_id dan region tertentu.
DataprocGetBatchOperator
DataprocGetBatchOperator mengambil satu workload batch tertentu.