Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Halaman ini menjelaskan cara menggunakan Cloud Composer 2 untuk menjalankan Workload Dataproc Serverless aktif Google Cloud.
Contoh di bagian berikut menunjukkan cara menggunakan operator untuk mengelola workload batch Dataproc Serverless. Anda menggunakan operator ini di DAG yang membuat, menghapus, mencantumkan, dan mendapatkan workload batch Dataproc Serverless Spark:
Membuat DAG untuk operator yang berfungsi dengan workload Batch Serverless Dataproc:
Membuat DAG yang menggunakan penampung kustom, dan Metastore Dataproc.
Konfigurasi Server Histori Persistent untuk DAG ini.
Sebelum memulai
Aktifkan Dataproc API:
Konsol
Aktifkan API Dataproc.
gcloud
Aktifkan API Dataproc:
gcloud services enable dataproc.googleapis.com
Pilih lokasi untuk file workload Batch Anda. Anda dapat menggunakan salah satu opsi berikut:
- Membuat bucket Cloud Storage yang menyimpan file tersebut.
- Menggunakan bucket lingkungan Anda. Karena Anda tidak perlu menyinkronkan file ini
dengan Airflow, Anda dapat membuat subfolder terpisah di luar
/dags
atau/data
. Misalnya,/batches
. - Gunakan bucket yang sudah ada.
Menyiapkan file dan variabel Airflow
Bagian ini menunjukkan cara menyiapkan file dan mengonfigurasi variabel Airflow untuk tutorial ini.
Mengupload file workload Dataproc Serverless Spark ML ke bucket
Beban kerja dalam tutorial ini menjalankan skrip pyspark:
Simpan skrip pyspark ke file lokal bernama
spark-job.py
. Misalnya, Anda dapat menggunakan contoh skrip pyspark.Upload file ke lokasi yang dipilih di bagian Sebelum memulai.
Menyetel variable Airflow
Contoh di bagian berikut menggunakan variabel Airflow. Anda menetapkan nilai untuk variabel ini di Airflow, kode DAG Anda dapat mengakses nilai tersebut.
Contoh dalam tutorial ini menggunakan variabel Airflow berikut. Anda dapat menyetelnya sesuai kebutuhan, tergantung pada contoh yang Anda gunakan.
Tetapkan variabel Airflow berikut untuk digunakan dalam kode DAG Anda:
project_id
: Project ID.bucket_name
: URI bucket tempat file python utama beban kerja (spark-job.py
) ditemukan. Anda memilih lokasi ini di Sebelum memulai.phs_cluster
: Nama cluster Server Histori Persistent. Anda menetapkan variabel ini saat Anda Membuat Server Histori Persisten.image_name
: nama dan tag dari image container kustom (image:tag
). Anda mengatur variabel ini ketika Anda gunakan image container kustom dengan DataprocCreateBatchOperator.metastore_cluster
: Nama layanan Dataproc Metastore. Anda menetapkan variabel ini ketika menggunakan layanan Dataproc Metastore dengan DataprocCreateBatchOperator.region_name
: region tempat layanan Dataproc Metastore ditemukan. Anda menetapkan variabel ini ketika menggunakan layanan Dataproc Metastore dengan DataprocCreateBatchOperator.
Menggunakan Konsol Google Cloud dan UI Airflow untuk menetapkan setiap variabel Airflow
Di Konsol Google Cloud, buka halaman Environments.
Dalam daftar lingkungan, klik link Airflow untuk lingkungan fleksibel App Engine. UI Airflow akan terbuka.
Di UI Airflow, pilih Admin > Variabel.
Klik Add a new record.
Tentukan nama variabel di kolom Kunci, dan tetapkan nilai untuk di kolom Val.
Klik Simpan.
Membuat Server Histori Persisten
Gunakan Persistent History Server (PHS) untuk melihat file histori Spark batch Anda workload:
- Membuat Server Histori Persisten.
- Pastikan Anda telah menentukan nama cluster PHS di
phs_cluster
Variabel Airflow.
DataprocCreateBatchOperator
DAG berikut memulai workload Dataproc Serverless Batch.
Untuk mengetahui informasi selengkapnya tentang argumen DataprocCreateBatchOperator
, lihat
kode sumber operator.
Untuk informasi selengkapnya tentang atribut yang dapat Anda teruskan dalam batch
dari DataprocCreateBatchOperator
, lihat
deskripsi class Batch.
Menggunakan gambar container kustom dengan DataprocCreateBatchOperator
Contoh berikut menunjukkan cara menggunakan image container kustom untuk menjalankan sebagian besar workload standar dan berbasis cloud. Anda dapat menggunakan container kustom, misalnya, untuk menambahkan Python dependensi yang tidak disediakan oleh image container default.
Untuk menggunakan image container kustom:
Buat image container kustom dan upload ke Container Registry.
Tentukan gambar dalam variabel Airflow
image_name
.Gunakan DataprocCreateBatchOperator dengan gambar kustom Anda:
Menggunakan layanan Metastore Dataproc dengan DataprocCreateBatchOperator
Untuk menggunakan layanan Dataproc Metastore dari DAG:
Pastikan layanan metastore sudah dimulai.
Untuk mempelajari cara memulai layanan metastore, lihat Aktifkan dan nonaktifkan Dataproc Metastore.
Untuk informasi terperinci tentang operator batch untuk membuat konfigurasi, lihat PeripheralsConfig.
Setelah layanan metastore aktif dan berjalan, tentukan namanya di variabel
metastore_cluster
dan regionnya dalam variabel Airflowregion_name
.Gunakan layanan metastore di DataprocCreateBatchOperator:
DataprocDeleteBatchOperator
Anda dapat menggunakan DataprocDeleteBatchOperator untuk menghapus batch berdasarkan ID batch dari beban kerja.
DataprocListBatchesOperator
DataprocDeleteBatchOperator mencantumkan batch yang ada dalam project_id tertentu dan region.
DataprocGetBatchOperator
DataprocGetBatchOperator mengambil satu workload batch tertentu.