Bagian berikut memberikan tips untuk membantu Anda menyesuaikan aplikasi Spark Dataproc.
Menggunakan cluster sementara
Saat menggunakan model cluster "ephemeral" Dataproc, Anda membuat cluster khusus untuk setiap tugas, dan saat tugas selesai, Anda menghapus cluster. Dengan model sementara, Anda dapat memperlakukan penyimpanan dan komputasi secara terpisah, menyimpan data input dan output tugas di Cloud Storage atau BigQuery, menggunakan cluster hanya untuk komputasi dan penyimpanan data sementara.
Permasalahan cluster persisten
Penggunaan cluster satu tugas efemeral menghindari masalah berikut dan potensi masalah yang terkait dengan penggunaan cluster "permanen" bersama dan yang berjalan lama:
- Titik tunggal kegagalan: status error cluster bersama dapat menyebabkan semua tugas gagal, yang memblokir seluruh pipeline data. Investigasi dan pemulihan dari error dapat memerlukan waktu berjam-jam. Karena cluster sementara hanya menyimpan status dalam cluster sementara, saat error terjadi, cluster tersebut dapat dihapus dan dibuat ulang dengan cepat.
- Kesulitan dalam mempertahankan dan memigrasikan status cluster di HDFS, MySQL, atau sistem file lokal
- Pertentangan resource di antara tugas yang berdampak negatif pada SLO
- Daemon layanan yang tidak responsif yang disebabkan oleh tekanan memori
- Penumpukan log dan file sementara yang dapat melebihi kapasitas disk
- Kegagalan penskalaan karena kehabisan stok zona cluster
- Kurangnya dukungan untuk versi image cluster yang sudah tidak berlaku.
Manfaat cluster efemeral
Di sisi positifnya, cluster sementara memungkinkan Anda melakukan hal berikut:
- Konfigurasikan izin IAM yang berbeda untuk tugas yang berbeda dengan akun layanan VM Dataproc yang berbeda.
- Optimalkan konfigurasi hardware dan software cluster untuk setiap tugas, dengan mengubah konfigurasi cluster sesuai kebutuhan.
- Upgrade versi image di cluster baru untuk mendapatkan patch keamanan, perbaikan bug, dan pengoptimalan terbaru.
- Memecahkan masalah lebih cepat di cluster satu tugas yang terisolasi.
- Hemat biaya dengan membayar hanya untuk waktu berjalan cluster sementara, bukan untuk waktu tidak ada aktivitas di antara tugas di cluster bersama.
Menggunakan Spark SQL
DataFrame API Spark SQL adalah pengoptimalan RDD API yang signifikan. Jika Anda berinteraksi dengan kode yang menggunakan RDD, pertimbangkan untuk membaca data sebagai DataFrame sebelum meneruskan RDD dalam kode. Dalam kode Java atau Scala, pertimbangkan untuk menggunakan Dataset API Spark SQL sebagai superset RDD dan DataFrame.
Menggunakan Apache Spark 3
Dataproc 2.0 menginstal Spark 3, yang mencakup fitur dan peningkatan performa berikut:
- Dukungan GPU
- Kemampuan untuk membaca file biner
- Peningkatan performa
- Pemangkasan Partisi Dinamis
- Eksekusi kueri adaptif, yang mengoptimalkan tugas Spark secara real time
Menggunakan Alokasi Dinamis
Apache Spark menyertakan fitur Dynamic Allocation yang menskalakan
jumlah eksekutor Spark pada pekerja dalam cluster. Fitur ini memungkinkan
tugas menggunakan cluster Dataproc lengkap meskipun cluster
diskalasikan. Fitur ini diaktifkan secara default di Dataproc
(spark.dynamicAllocation.enabled
disetel ke true
). Lihat
Alokasi Dinamis Spark
untuk mengetahui informasi selengkapnya.
Menggunakan Penskalaan Otomatis Dataproc
Penskalaan Otomatis Dataproc secara dinamis menambahkan dan menghapus pekerja Dataproc dari cluster untuk membantu memastikan bahwa tugas Spark memiliki resource yang diperlukan untuk diselesaikan dengan cepat.
Praktik terbaik adalah mengonfigurasi kebijakan penskalaan otomatis agar hanya menskalakan pekerja sekunder.
Menggunakan Mode Fleksibilitas yang Disempurnakan Dataproc
Cluster dengan VM yang dapat di-preempt atau kebijakan penskalaan otomatis dapat menerima pengecualian FetchFailed saat pekerja di-preempt atau dihapus sebelum selesai menayangkan data shuffle ke reducer. Pengecualian ini dapat menyebabkan percobaan ulang tugas dan waktu penyelesaian tugas yang lebih lama.
Rekomendasi: Gunakan Mode Fleksibilitas yang Ditingkatkan Dataproc, yang tidak menyimpan data shuffle perantara di pekerja sekunder, sehingga pekerja sekunder dapat dihentikan atau diskalakan dengan aman.
Mengonfigurasi partisi dan pengacakan
Spark menyimpan data dalam partisi sementara di cluster. Jika aplikasi Anda mengelompokkan atau menggabungkan DataFrame, aplikasi akan mengacak data ke dalam partisi baru sesuai dengan pengelompokan dan konfigurasi tingkat rendah.
Partisi data secara signifikan memengaruhi performa aplikasi: terlalu sedikit partisi akan membatasi paralelisme tugas dan penggunaan resource cluster; terlalu banyak partisi akan memperlambat tugas karena pemrosesan dan pengurutan ulang partisi tambahan.
Mengonfigurasi partisi
Properti berikut mengatur jumlah dan ukuran partisi Anda:
spark.sql.files.maxPartitionBytes
: ukuran maksimum partisi saat Anda membaca data dari Cloud Storage. Defaultnya adalah 128 MB, yang cukup besar untuk sebagian besar aplikasi yang memproses kurang dari 100 TB.spark.sql.shuffle.partitions
: jumlah partisi setelah melakukan pengacakan. Nilai defaultnya adalah 200, yang sesuai untuk cluster dengan jumlah vCPU kurang dari 100. Rekomendasi: Tetapkan ini ke 3x jumlah vCPU di cluster Anda.spark.default.parallelism
: jumlah partisi yang ditampilkan setelah melakukan transformasi RDD yang memerlukan pengacakan, sepertijoin
,reduceByKey
, danparallelize
. Nilai default-nya adalah jumlah total vCPU di cluster Anda. Saat menggunakan RDD dalam tugas Spark, Anda dapat menetapkan jumlah ini ke 3x vCPU Anda
Membatasi jumlah file
Ada penurunan performa saat Spark membaca file kecil dalam jumlah besar. Menyimpan data dalam ukuran file yang lebih besar, misalnya, ukuran file dalam rentang 256 MB–512 MB. Demikian pula, batasi jumlah file output (untuk memaksa pengacakan, lihat Menghindari pengacakan yang tidak perlu).
Mengonfigurasi eksekusi kueri adaptif (Spark 3)
Eksekusi kueri adaptif (diaktifkan secara default di image Dataproc versi 2.0) memberikan peningkatan performa tugas Spark, termasuk:
- Menggabungkan partisi setelah mengacak
- Mengonversi join sort-merge menjadi join broadcast
- Pengoptimalan untuk join skew.
Meskipun setelan konfigurasi default sudah sesuai untuk sebagian besar kasus penggunaan,
menetapkan spark.sql.adaptive.advisoryPartitionSizeInBytes
ke
spark.sqlfiles.maxPartitionBytes
(default 128 MB) dapat bermanfaat.
Menghindari pengacakan yang tidak perlu
Spark memungkinkan pengguna memicu pengacakan secara manual untuk menyeimbangkan kembali data mereka dengan
fungsi repartition
. Pengacakan mahal, jadi pengurutan ulang data harus
digunakan dengan hati-hati. Menetapkan konfigurasi
partisi dengan tepat akan cukup untuk memungkinkan Spark mempartisi
data Anda secara otomatis.
Pengecualian: Saat menulis data yang dipartisi kolom ke Cloud Storage, pemisahan ulang pada kolom tertentu akan menghindari penulisan banyak file kecil untuk mencapai waktu tulis yang lebih cepat.
df.repartition("col_name").write().partitionBy("col_name").save("gs://...")
Menyimpan data dalam Parquet atau Avro
Spark SQL secara default membaca dan menulis data dalam file Parquet yang dikompresi Snappy. Parquet memiliki format file kolom yang efisien sehingga Spark hanya dapat membaca data yang diperlukan untuk menjalankan aplikasi. Ini adalah keuntungan penting saat bekerja dengan set data besar. Format kolom lainnya, seperti Apache ORC, juga berperforma baik.
Untuk data non-kolom, Apache Avro menyediakan format file baris biner yang efisien. Meskipun biasanya lebih lambat daripada Parquet, performa Avro lebih baik daripada format berbasis teks,seperti CSV atau JSON.
Mengoptimalkan ukuran disk
Throughput persistent disk diskalakan sesuai dengan ukuran disk, yang dapat memengaruhi performa tugas Spark karena tugas menulis metadata dan mengacak data ke disk. Saat menggunakan persistent disk standar, ukuran disk harus minimal 1 terabyte per pekerja (lihat Performa berdasarkan ukuran persistent disk).
Untuk memantau throughput disk pekerja di konsol Google Cloud:
- Klik nama cluster di halaman Cluster.
- Klik tab VM INSTANCES.
- Klik nama pekerja mana pun.
- Klik tab MONITORING, lalu scroll ke bawah ke Disk Throughput untuk melihat throughput pekerja.
Pertimbangan disk
Cluster Dataproc efemeral, yang tidak mendapatkan manfaat dari penyimpanan persisten, dapat menggunakan SSD lokal. SSD lokal secara fisik terhubung ke cluster dan memberikan throughput yang lebih tinggi daripada persistent disk (lihat Tabel performa). SSD Lokal tersedia dalam ukuran tetap sebesar 375 gigabyte, tetapi Anda dapat menambahkan beberapa SSD untuk meningkatkan performa.
SSD lokal tidak mempertahankan data setelah cluster dinonaktifkan. Jika memerlukan penyimpanan persisten, Anda dapat menggunakan persistent disk SSD, yang memberikan throughput yang lebih tinggi untuk ukurannya daripada persistent disk standar. Persistent disk SSD juga merupakan pilihan yang baik jika ukuran partisi akan lebih kecil dari 8 KB (tetapi, hindari partisi kecil).
Memasang GPU ke cluster
Spark 3 mendukung GPU. Gunakan GPU dengan tindakan inisialisasi RAPIDS untuk mempercepat tugas Spark menggunakan RAPIDS SQL Accelerator. Tindakan inisialisasi driver GPU untuk mengonfigurasi cluster dengan GPU.
Kegagalan dan perbaikan tugas umum
Kehabisan Memori
Contoh:
- "Peneksekusi hilang"
- "java.lang.OutOfMemoryError: GC overhead limit exceeded"
- "Container dihentikan oleh YARN karena melebihi batas memori"
Kemungkinan perbaikan:
- Jika menggunakan PySpark, naikkan
spark.executor.memoryOverhead
dan turunkanspark.executor.memory
. - Gunakan jenis mesin bermemori tinggi.
- Gunakan partisi yang lebih kecil.
Kegagalan Pengambilan Acak
Contoh:
- "FetchFailedException" (error Spark)
- "Gagal terhubung ke..." (Error Spark)
- "Gagal mengambil" (error MapReduce)
Biasanya disebabkan oleh penghapusan pekerja yang terlalu dini yang masih memiliki data shuffle untuk ditayangkan.
Kemungkinan penyebab dan perbaikan:
- VM pekerja preemptible diklaim kembali atau VM pekerja non-preemptible dihapus oleh autoscaler. Solusi: Gunakan Mode Fleksibilitas yang Ditingkatkan untuk membuat pekerja sekunder dapat dihentikan atau diskalakan dengan aman.
- Eksekutor atau pemetaan error karena error OutOfMemory. Solusi: tingkatkan memori eksekutor atau pemetaan.
- Layanan shuffle Spark mungkin kelebihan beban. Solusi: kurangi jumlah partisi tugas.
Node YARN TIDAK VALID
Contoh (dari log YARN):
...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]
Sering kali terkait dengan ruang disk yang tidak cukup untuk data shuffle. Diagnosis dengan melihat file log:
- Buka halaman Clusters project Anda di konsol Google Cloud, lalu klik nama cluster.
- Klik LIHAT LOG.
- Filter log menurut
hadoop-yarn-nodemanager
. - Telusuri "UNHEALTHY".
Kemungkinan Perbaikan:
- Cache pengguna disimpan di direktori yang ditentukan oleh
properti
yarn.nodemanager.local-dirs
diyarn-site.xml file
. File ini terletak di/etc/hadoop/conf/yarn-site.xml
. Anda dapat memeriksa ruang kosong di jalur/hadoop/yarn/nm-local-dir
, dan mengosongkan ruang dengan menghapus folder cache pengguna/hadoop/yarn/nm-local-dir/usercache
. - Jika log melaporkan status "UNHEALTHY", buat ulang cluster Anda dengan ruang disk yang lebih besar, yang akan meningkatkan batas throughput.
Tugas gagal karena memori driver tidak mencukupi
Saat menjalankan tugas dalam mode cluster, tugas akan gagal jika ukuran memori node master secara signifikan lebih besar daripada ukuran memori node pekerja.
Contoh dari log driver:
'Exception in thread "main" java.lang.IllegalArgumentException: Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'
Kemungkinan Perbaikan:
- Tetapkan
spark:spark.driver.memory
kurang dariyarn:yarn.scheduler.maximum-allocation-mb
. - Gunakan jenis mesin yang sama untuk node master dan pekerja.