Mode Fleksibilitas yang Ditingkatkan (EFM) Dataproc mengelola data shuffle untuk meminimalkan penundaan progres tugas yang disebabkan oleh penghapusan node dari cluster yang sedang berjalan. EFM men-offload data shuffle dalam salah satu dari dua mode yang dapat dipilih pengguna:
Pengacakan pekerja utama. Pemetaan menulis data ke pekerja utama. Pekerja mengambil dari node jarak jauh tersebut selama fase pengurangan. Mode ini hanya tersedia untuk, dan direkomendasikan untuk, tugas Spark.
Pengacakan HCFS (Hadoop Compatible File System). Pemetaan menulis data ke implementasi HCFS (HDFS secara default). Seperti mode pekerja utama, hanya pekerja utama yang berpartisipasi dalam implementasi HDFS dan HCFS (jika HCFS shuffle menggunakan Konektor Cloud Storage, data disimpan di luar cluster). Mode ini dapat menguntungkan tugas dengan jumlah data yang kecil, tetapi karena batasan penskalaan, mode ini tidak direkomendasikan untuk tugas yang lebih besar.
Karena kedua mode EFM tidak menyimpan data shuffle perantara di pekerja sekunder, EFM sangat cocok untuk cluster yang menggunakan preemptible VM atau hanya menskalakan otomatis grup pekerja sekunder.
- Tugas Apache Hadoop YARN yang tidak mendukung relokasi AppMaster dapat gagal dalam Mode Fleksibilitas yang Ditingkatkan (lihat Kapan harus menunggu AppMaster selesai).
- Mode Fleksibilitas yang Disempurnakan tidak direkomendasikan:
- di cluster yang hanya memiliki pekerja primer
- pada tugas streaming karena dapat memerlukan waktu hingga 30 menit setelah tugas selesai untuk membersihkan data shuffle perantara.
- Mode Fleksibilitas yang Ditingkatkan tidak didukung:
- saat penskalaan otomatis pekerja utama diaktifkan. Pada umumnya, pekerja utama akan terus menyimpan data shuffle yang tidak dimigrasikan secara otomatis. Menskalakan turun grup pekerja utama akan menghilangkan manfaat EFM.
- saat tugas Spark berjalan di cluster dengan penghentian tuntas diaktifkan. Penghentian layanan yang lancar dan EFM dapat bekerja secara berlawanan karena mekanisme penghentian layanan yang lancar YARN mempertahankan node DECOMMISSIONING hingga semua aplikasi yang terlibat selesai.
Menggunakan Mode Fleksibilitas yang Ditingkatkan
Mode Fleksibilitas yang Disempurnakan dikonfigurasi per mesin eksekusi, dan harus dikonfigurasi selama pembuatan cluster.
Implementasi EFM Spark dikonfigurasi dengan properti cluster
dataproc:efm.spark.shuffle
. Nilai properti yang valid:primary-worker
untuk pengurutan ulang pekerja utama (direkomendasikan)hcfs
untuk shuffle berbasis HCFS. Mode ini tidak digunakan lagi dan hanya tersedia di cluster yang menjalankan image versi 1.5. Tidak direkomendasikan untuk alur kerja baru.
Implementasi MapReduce Hadoop dikonfigurasi dengan
dataproc:efm.mapreduce.shuffle
properti cluster. Nilai properti yang valid:hcfs
Contoh: Buat cluster dengan shuffle pekerja utama untuk Spark dan shuffle HCFS untuk MapReduce:
gcloud dataproc clusters create cluster-name \ --region=region \ --properties=dataproc:efm.spark.shuffle=primary-worker \ --properties=dataproc:efm.mapreduce.shuffle=hcfs \ --worker-machine-type=n1-highmem-8 \ --num-workers=25 \ --num-worker-local-ssds=2 \ --secondary-worker-type=preemptible \ --secondary-worker-boot-disk-size=500GB \ --num-secondary-workers=25
Contoh Apache Spark
- Jalankan tugas WordCount terhadap teks Shakespeare publik menggunakan jar contoh Spark di cluster EFM.
gcloud dataproc jobs submit spark \ --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.JavaWordCount \ -- gs://apache-beam-samples/shakespeare/macbeth.txt
Contoh MapReduce Apache Hadoop
Jalankan tugas teragen kecil untuk menghasilkan data input di Cloud Storage untuk tugas terasort berikutnya menggunakan jar contoh mapreduce di cluster EFM.
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
Jalankan tugas terasort pada data.
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- terasort gs://terasort/input gs://terasort/output
Mengonfigurasi SSD lokal untuk shuffle pekerja utama
Implementasi shuffle pekerja utama dan HDFS menulis data shuffle perantara ke disk yang terpasang di VM, dan mendapatkan manfaat dari throughput dan IOPS tambahan yang ditawarkan oleh SSD lokal. Untuk memfasilitasi alokasi resource, targetkan sasaran sekitar 1 partisi SSD lokal per 4 vCPU saat mengonfigurasi mesin pekerja utama.
Untuk memasang SSD lokal, teruskan flag --num-worker-local-ssds
ke perintah gcloud Dataproc clusters create.
Umumnya, Anda tidak memerlukan SSD lokal pada pekerja sekunder.
Menambahkan SSD lokal ke pekerja sekunder cluster (menggunakan
tanda --num-secondary-worker-local-ssds
)
sering kali kurang penting karena pekerja sekunder tidak menulis data shuffle secara lokal.
Namun, karena SSD lokal meningkatkan performa disk lokal, Anda dapat memutuskan untuk menambahkan
SSD lokal ke pekerja sekunder jika Anda mengharapkan
tugas menjadi terikat I/O
karena penggunaan disk lokal: tugas Anda menggunakan disk lokal yang signifikan untuk
ruang sementara atau partisi Anda
terlalu besar untuk muat di memori dan akan ditransfer ke disk.
Rasio pekerja sekunder
Karena pekerja sekunder menulis data shuffle ke pekerja utama, kluster Anda harus berisi pekerja utama dalam jumlah yang memadai dengan resource CPU, memori, dan disk yang memadai untuk mengakomodasi beban shuffle tugas Anda. Untuk
cluster penskalaan otomatis, agar grup utama tidak diskalakan dan menyebabkan
perilaku yang tidak diinginkan, tetapkan minInstances
ke nilai maxInstances
di
kebijakan penskalaan otomatis
untuk grup pekerja utama.
Jika Anda memiliki rasio pekerja sekunder terhadap pekerja primer yang tinggi (misalnya, 10:1), monitor penggunaan CPU, jaringan, dan disk pekerja primer untuk menentukan apakah pekerja tersebut kelebihan beban. Untuk melakukannya:
Buka halaman VM instances di konsol Google Cloud.
Klik kotak centang di sebelah kiri pekerja utama.
Klik tab MONITORING untuk melihat Penggunaan CPU pekerja utama, IOPS Disk, Byte Jaringan, dan metrik lainnya.
Jika pekerja utama kelebihan beban, pertimbangkan untuk melakukan penskalaan pekerja utama secara manual.
Mengubah ukuran grup pekerja utama
Grup pekerja utama dapat diskalakan dengan aman, tetapi menskalakan turun grup pekerja utama dapat berdampak negatif pada progres tugas. Operasi yang mendownscale
grup pekerja utama harus menggunakan
penghentian tuntas, yang diaktifkan dengan menetapkan tanda --graceful-decommission-timeout
.
Cluster yang diskalakan otomatis: Penskalaan grup pekerja utama dinonaktifkan di cluster EFM dengan kebijakan penskalaan otomatis. Untuk mengubah ukuran grup pekerja utama di cluster yang diskalakan otomatis:
Nonaktifkan penskalaan otomatis.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --disable-autoscaling
Menskalakan grup utama.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --num-workers=num-primary-workers \ --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
Aktifkan kembali penskalaan otomatis:
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --autoscaling-policy=autoscaling-policy
Memantau penggunaan disk pekerja utama
Pekerja utama harus memiliki kapasitas disk yang memadai untuk data shuffle cluster.
Anda dapat memantaunya secara tidak langsung melalui metrik remaining HDFS capacity
.
Saat disk lokal terisi penuh, ruang tidak akan tersedia untuk HDFS, dan
kapasitas yang tersisa akan berkurang.
Secara default, saat penggunaan disk lokal pekerja utama melebihi 90% kapasitas, node akan ditandai sebagai UNHEALTHY di UI node YARN. Jika mengalami masalah kapasitas disk, Anda dapat menghapus data yang tidak digunakan dari HDFS atau menskalakan kumpulan pekerja utama.
Konfigurasi lanjutan
Partisi dan paralelisme
Saat mengirimkan tugas MapReduce atau Spark, konfigurasikan tingkat pempartisi yang sesuai. Menentukan jumlah partisi input dan output untuk suatu tahap shuffle melibatkan kompromi di antara karakteristik performa yang berbeda. Sebaiknya bereksperimenlah dengan nilai yang sesuai untuk bentuk tugas Anda.
Partisi input
Partisi input MapReduce dan Spark ditentukan oleh set data input. Saat membaca file dari Cloud Storage, setiap tugas memproses data senilai "ukuran blok".
Untuk tugas Spark SQL, ukuran partisi maksimum dikontrol oleh
spark.sql.files.maxPartitionBytes
. Pertimbangkan untuk meningkatkannya menjadi 1 GB:spark.sql.files.maxPartitionBytes=1073741824
.Untuk tugas MapReduce dan RDD Spark, ukuran partisi biasanya dikontrol dengan
fs.gs.block.size
, yang secara default ditetapkan ke 128 MB. Pertimbangkan untuk meningkatkannya menjadi 1 GB. Anda juga dapat menetapkan properti khususInputFormat
sepertimapreduce.input.fileinputformat.split.minsize
danmapreduce.input.fileinputformat.split.maxsize
- Untuk tugas MapReduce:
--properties fs.gs.block.size=1073741824
- Untuk RDD Spark:
--properties spark.hadoop.fs.gs.block.size=1073741824
- Untuk tugas MapReduce:
Partisi output
Jumlah tugas di tahap berikutnya dikontrol oleh beberapa properti. Untuk tugas yang lebih besar yang memproses lebih dari 1 TB, sebaiknya sediakan setidaknya 1 GB per partisi.
Untuk tugas MapReduce, jumlah partisi output dikontrol oleh
mapreduce.job.reduces
.Untuk Spark SQL, jumlah partisi output dikontrol oleh
spark.sql.shuffle.partitions
.Untuk tugas Spark yang menggunakan RDD API, Anda dapat menentukan jumlah partisi output atau menetapkan
spark.default.parallelism
.
Penyesuaian pengacakan untuk pengacakan pekerja utama
Properti yang paling signifikan adalah --properties yarn:spark.shuffle.io.serverThreads=<num-threads>
.
Perhatikan bahwa ini adalah properti YARN tingkat cluster karena server shuffle Spark
berjalan sebagai bagian dari Pengelola Node. Nilai defaultnya adalah dua kali (2x) jumlah core pada
komputer (misalnya, 16 thread pada n1-highmem-8). Jika "Shuffle Read Blocked Time"
lebih besar dari 1 detik, dan pekerja utama belum mencapai batas jaringan, CPU, atau
disk, pertimbangkan untuk meningkatkan jumlah thread server shuffle.
Pada jenis mesin yang lebih besar, pertimbangkan untuk meningkatkan spark.shuffle.io.numConnectionsPerPeer
,
yang secara default ditetapkan ke 1. (Misalnya, tetapkan ke 5 koneksi per pasangan host).
Meningkatkan percobaan ulang
Jumlah maksimum percobaan yang diizinkan untuk master aplikasi, tugas, dan tahap dapat dikonfigurasi dengan menetapkan properti berikut:
yarn:yarn.resourcemanager.am.max-attempts mapred:mapreduce.map.maxattempts mapred:mapreduce.reduce.maxattempts spark:spark.task.maxFailures spark:spark.stage.maxConsecutiveAttempts
Karena master aplikasi dan tugas lebih sering dihentikan di cluster yang menggunakan banyak preemptible VM atau penskalaan otomatis tanpa penghentian tuntas, meningkatkan nilai properti di atas di cluster tersebut dapat membantu (perhatikan bahwa menggunakan EFM dengan Spark dan penghentian tuntas tidak didukung).
Mengonfigurasi HDFS untuk shuffle HCFS
Untuk meningkatkan performa shuffle besar, Anda dapat mengurangi pertentangan kunci di
NameNode dengan menetapkan dfs.namenode.fslock.fair=false
. Perhatikan bahwa tindakan ini berisiko
mengorbankan permintaan individual, tetapi dapat meningkatkan throughput di seluruh cluster.
Untuk meningkatkan performa NameNode lebih lanjut, Anda dapat memasang SSD lokal ke node master dengan menetapkan --num-master-local-ssds
. Anda juga dapat menambahkan SSD lokal ke pekerja utama untuk meningkatkan performa DataNode dengan menetapkan --num-worker-local-ssds
.
Sistem File yang Kompatibel dengan Hadoop lainnya untuk shuffle HCFS
Secara default, data shuffle HCFS EFM ditulis ke HDFS, tetapi Anda dapat menggunakan
Hadoop Compatible File System (HCFS) apa pun.
Misalnya, Anda dapat memutuskan untuk menulis shuffle ke Cloud Storage atau ke HDFS cluster lain. Untuk menentukan sistem file, Anda dapat mengarahkan
fs.defaultFS
ke sistem file target saat mengirimkan tugas ke cluster.
Penghentian tuntas YARN di cluster EFM
Penghentian Tuntas YARN dapat digunakan untuk menghapus node dengan cepat dengan dampak minimal pada aplikasi yang berjalan. Untuk cluster penskalaan otomatis, waktu tunggu penghentian layanan yang wajar dapat ditetapkan di AutoscalingPolicy yang dilampirkan ke cluster EFM.
Peningkatan EFM MapReduce untuk penghentian tuntas
Karena data perantara disimpan dalam sistem file terdistribusi, node dapat dihapus dari cluster EFM segera setelah semua penampung yang berjalan di node tersebut selesai. Sebagai perbandingan, node tidak dihapus di cluster Dataproc standar hingga aplikasi selesai.
Penghapusan node tidak menunggu master aplikasi yang berjalan di node selesai. Saat penampung master aplikasi dihentikan, penampung akan dijadwalkan ulang di node lain yang tidak dinonaktifkan. Progres tugas tidak hilang: master aplikasi baru cepat memulihkan status dari master aplikasi sebelumnya dengan membaca histori tugas.
Menggunakan penghentian tuntas di cluster EFM dengan MapReduce
Buat cluster EFM dengan jumlah pekerja primer dan sekunder yang sama.
gcloud dataproc clusters create cluster-name \ --properties=dataproc:efm.mapreduce.shuffle=hcfs \ --region=region \ --num-workers=5 \ --num-secondary-workers=5
Jalankan tugas mapreduce yang menghitung nilai pi menggunakan jar contoh mapreduce di cluster.
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- pi 1000 10000000
Saat tugas berjalan, turunkan skala cluster menggunakan penghentian tuntas.
Node akan dihapus dari cluster dengan cepat sebelum tugas selesai, sekaligus meminimalkan hilangnya progres tugas. Jeda sementara dalam progres tugas dapat terjadi karena:gcloud dataproc clusters update cluster-name \ --region=region \ --num-secondary-workers=0 \ --graceful-decommission-timeout=1h
- Failover master aplikasi. Jika progres tugas turun ke 0%, lalu langsung melonjak ke nilai pra-penurunan, master aplikasi mungkin telah dihentikan dan master aplikasi baru memulihkan statusnya. Hal ini tidak akan memengaruhi progres tugas secara signifikan karena failover terjadi dengan cepat.
- Preemption VM. Karena HDFS hanya mempertahankan output tugas peta yang lengkap, bukan sebagian, jeda sementara dalam progres tugas dapat terjadi saat VM di-preempt saat mengerjakan tugas peta.
Untuk mempercepat penghapusan node, Anda dapat menskalakan cluster
tanpa penghentian layanan yang halus dengan menghapus
tanda --graceful-decommission-timeout
dalam contoh perintah
gcloud
sebelumnya. Progres tugas dari tugas peta yang telah selesai akan
dipertahankan, tetapi output tugas peta yang telah selesai sebagian akan hilang
(tugas peta akan dijalankan ulang).