Tips penyesuaian tugas Spark

Bagian berikut memberikan tips untuk membantu Anda menyesuaikan aplikasi Spark Dataproc.

Menggunakan cluster sementara

Saat menggunakan model cluster "ephemeral" Dataproc, Anda membuat cluster khusus untuk setiap tugas, dan saat tugas selesai, Anda menghapus cluster. Dengan model sementara, Anda dapat memperlakukan penyimpanan dan komputasi secara terpisah, menyimpan data input dan output tugas di Cloud Storage atau BigQuery, menggunakan cluster hanya untuk komputasi dan penyimpanan data sementara.

Permasalahan cluster persisten

Penggunaan cluster satu tugas efemeral menghindari masalah berikut dan potensi masalah yang terkait dengan penggunaan cluster "permanen" bersama dan yang berjalan lama:

  • Titik tunggal kegagalan: status error cluster bersama dapat menyebabkan semua tugas gagal, yang memblokir seluruh pipeline data. Investigasi dan pemulihan dari error dapat memerlukan waktu berjam-jam. Karena cluster sementara hanya menyimpan status dalam cluster sementara, saat error terjadi, cluster tersebut dapat dihapus dan dibuat ulang dengan cepat.
  • Kesulitan dalam mempertahankan dan memigrasikan status cluster di HDFS, MySQL, atau sistem file lokal
  • Pertentangan resource di antara tugas yang berdampak negatif pada SLO
  • Daemon layanan yang tidak responsif yang disebabkan oleh tekanan memori
  • Penumpukan log dan file sementara yang dapat melebihi kapasitas disk
  • Kegagalan penskalaan karena kehabisan stok zona cluster
  • Kurangnya dukungan untuk versi image cluster yang sudah tidak berlaku.

Manfaat cluster efemeral

Di sisi positifnya, cluster sementara memungkinkan Anda melakukan hal berikut:

  • Konfigurasikan izin IAM yang berbeda untuk tugas yang berbeda dengan akun layanan VM Dataproc yang berbeda.
  • Optimalkan konfigurasi hardware dan software cluster untuk setiap tugas, dengan mengubah konfigurasi cluster sesuai kebutuhan.
  • Upgrade versi image di cluster baru untuk mendapatkan patch keamanan, perbaikan bug, dan pengoptimalan terbaru.
  • Memecahkan masalah lebih cepat di cluster satu tugas yang terisolasi.
  • Hemat biaya dengan membayar hanya untuk waktu berjalan cluster sementara, bukan untuk waktu tidak ada aktivitas di antara tugas di cluster bersama.

Menggunakan Spark SQL

DataFrame API Spark SQL adalah pengoptimalan RDD API yang signifikan. Jika Anda berinteraksi dengan kode yang menggunakan RDD, pertimbangkan untuk membaca data sebagai DataFrame sebelum meneruskan RDD dalam kode. Dalam kode Java atau Scala, pertimbangkan untuk menggunakan Dataset API Spark SQL sebagai superset RDD dan DataFrame.

Menggunakan Apache Spark 3

Dataproc 2.0 menginstal Spark 3, yang mencakup fitur dan peningkatan performa berikut:

  • Dukungan GPU
  • Kemampuan untuk membaca file biner
  • Peningkatan performa
  • Pemangkasan Partisi Dinamis
  • Eksekusi kueri adaptif, yang mengoptimalkan tugas Spark secara real time

Menggunakan Alokasi Dinamis

Apache Spark menyertakan fitur Dynamic Allocation yang menskalakan jumlah eksekutor Spark pada pekerja dalam cluster. Fitur ini memungkinkan tugas menggunakan cluster Dataproc lengkap meskipun cluster diskalasikan. Fitur ini diaktifkan secara default di Dataproc (spark.dynamicAllocation.enabled disetel ke true). Lihat Alokasi Dinamis Spark untuk mengetahui informasi selengkapnya.

Menggunakan Penskalaan Otomatis Dataproc

Penskalaan Otomatis Dataproc secara dinamis menambahkan dan menghapus pekerja Dataproc dari cluster untuk membantu memastikan bahwa tugas Spark memiliki resource yang diperlukan untuk diselesaikan dengan cepat.

Praktik terbaik adalah mengonfigurasi kebijakan penskalaan otomatis agar hanya menskalakan pekerja sekunder.

Menggunakan Mode Fleksibilitas yang Disempurnakan Dataproc

Cluster dengan VM yang dapat di-preempt atau kebijakan penskalaan otomatis dapat menerima pengecualian FetchFailed saat pekerja di-preempt atau dihapus sebelum selesai menayangkan data shuffle ke reducer. Pengecualian ini dapat menyebabkan percobaan ulang tugas dan waktu penyelesaian tugas yang lebih lama.

Rekomendasi: Gunakan Mode Fleksibilitas yang Ditingkatkan Dataproc, yang tidak menyimpan data shuffle perantara di pekerja sekunder, sehingga pekerja sekunder dapat dihentikan atau diskalakan dengan aman.

Mengonfigurasi partisi dan pengacakan

Spark menyimpan data dalam partisi sementara di cluster. Jika aplikasi Anda mengelompokkan atau menggabungkan DataFrame, aplikasi akan mengacak data ke dalam partisi baru sesuai dengan pengelompokan dan konfigurasi tingkat rendah.

Partisi data secara signifikan memengaruhi performa aplikasi: terlalu sedikit partisi akan membatasi paralelisme tugas dan penggunaan resource cluster; terlalu banyak partisi akan memperlambat tugas karena pemrosesan dan pengurutan ulang partisi tambahan.

Mengonfigurasi partisi

Properti berikut mengatur jumlah dan ukuran partisi Anda:

  • spark.sql.files.maxPartitionBytes: ukuran maksimum partisi saat Anda membaca data dari Cloud Storage. Defaultnya adalah 128 MB, yang cukup besar untuk sebagian besar aplikasi yang memproses kurang dari 100 TB.

  • spark.sql.shuffle.partitions: jumlah partisi setelah melakukan pengacakan. Nilai defaultnya adalah 200, yang sesuai untuk cluster dengan total vCPU kurang dari 100. Rekomendasi: Tetapkan ini ke 3x jumlah vCPU di cluster Anda.

  • spark.default.parallelism: jumlah partisi yang ditampilkan setelah melakukan transformasi RDD yang memerlukan pengacakan, seperti join, reduceByKey, dan parallelize. Nilai default-nya adalah jumlah total vCPU di cluster Anda. Saat menggunakan RDD dalam tugas Spark, Anda dapat menetapkan jumlah ini ke 3x vCPU Anda

Membatasi jumlah file

Ada penurunan performa saat Spark membaca file kecil dalam jumlah besar. Menyimpan data dalam ukuran file yang lebih besar, misalnya, ukuran file dalam rentang 256 MB–512 MB. Demikian pula, batasi jumlah file output (untuk memaksa pengacakan, lihat Menghindari pengacakan yang tidak perlu).

Mengonfigurasi eksekusi kueri adaptif (Spark 3)

Eksekusi kueri adaptif (diaktifkan secara default di image Dataproc versi 2.0) memberikan peningkatan performa tugas Spark, termasuk:

Meskipun setelan konfigurasi default sudah sesuai untuk sebagian besar kasus penggunaan, menetapkan spark.sql.adaptive.advisoryPartitionSizeInBytes ke spark.sqlfiles.maxPartitionBytes (default 128 MB) dapat bermanfaat.

Menghindari pengacakan yang tidak perlu

Spark memungkinkan pengguna memicu pengacakan secara manual untuk menyeimbangkan kembali data mereka dengan fungsi repartition. Pengacakan mahal, jadi pengurutan ulang data harus digunakan dengan hati-hati. Menetapkan configurations partisi dengan tepat akan cukup untuk memungkinkan Spark mempartisi data Anda secara otomatis.

Pengecualian: Saat menulis data yang dipartisi kolom ke Cloud Storage, pemisahan ulang pada kolom tertentu akan menghindari penulisan banyak file kecil untuk mencapai waktu tulis yang lebih cepat.

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

Menyimpan data dalam Parquet atau Avro

Spark SQL secara default membaca dan menulis data dalam file Parquet yang dikompresi Snappy. Parquet menggunakan format file kolom yang efisien sehingga Spark hanya dapat membaca data yang diperlukan untuk menjalankan aplikasi. Ini adalah keuntungan penting saat bekerja dengan set data besar. Format kolom lainnya, seperti Apache ORC, juga berperforma baik.

Untuk data non-kolom, Apache Avro menyediakan format file baris biner yang efisien. Meskipun biasanya lebih lambat daripada Parquet, performa Avro lebih baik daripada format berbasis teks,seperti CSV atau JSON.

Mengoptimalkan ukuran disk

Throughput persistent disk diskalakan sesuai dengan ukuran disk, yang dapat memengaruhi performa tugas Spark karena tugas menulis metadata dan mengacak data ke disk. Saat menggunakan persistent disk standar, ukuran disk harus minimal 1 terabyte per pekerja (lihat Performa berdasarkan ukuran persistent disk).

Untuk memantau throughput disk pekerja di konsol Google Cloud:

  1. Klik nama cluster di halaman Cluster.
  2. Klik tab VM INSTANCES.
  3. Klik nama pekerja mana pun.
  4. Klik tab MONITORING, lalu scroll ke bawah ke Disk Throughput untuk melihat throughput pekerja.

Pertimbangan disk

Cluster Dataproc efemeral, yang tidak mendapatkan manfaat dari penyimpanan persisten, dapat menggunakan SSD lokal. SSD lokal secara fisik terhubung ke cluster dan memberikan throughput yang lebih tinggi daripada persistent disk (lihat Tabel performa). SSD Lokal tersedia dalam ukuran tetap sebesar 375 gigabyte, tetapi Anda dapat menambahkan beberapa SSD untuk meningkatkan performa.

SSD lokal tidak mempertahankan data setelah cluster dinonaktifkan. Jika memerlukan penyimpanan persisten, Anda dapat menggunakan persistent disk SSD, yang memberikan throughput yang lebih tinggi untuk ukurannya daripada persistent disk standar. Persistent disk SSD juga merupakan pilihan yang baik jika ukuran partisi akan lebih kecil dari 8 KB (tetapi, hindari partisi kecil).

Memasang GPU ke cluster

Spark 3 mendukung GPU. Gunakan GPU dengan tindakan inisialisasi RAPIDS untuk mempercepat tugas Spark menggunakan RAPIDS SQL Accelerator. Tindakan inisialisasi driver GPU untuk mengonfigurasi cluster dengan GPU.

Kegagalan dan perbaikan tugas umum

Kehabisan Memori

Contoh:

  • "Peneksekusi hilang"
  • "java.lang.OutOfMemoryError: GC overhead limit exceeded"
  • "Container dihentikan oleh YARN karena melebihi batas memori"

Kemungkinan perbaikan:

Kegagalan Pengambilan Acak

Contoh:

  • "FetchFailedException" (Error Spark)
  • "Gagal terhubung ke..." (Error Spark)
  • "Gagal mengambil" (error MapReduce)

Biasanya disebabkan oleh penghapusan pekerja yang terlalu dini yang masih memiliki data shuffle untuk ditayangkan.

Kemungkinan penyebab dan perbaikan:

  • VM pekerja preemptible diklaim kembali atau VM pekerja non-preemptible dihapus oleh autoscaler. Solusi: Gunakan Mode Fleksibilitas yang Ditingkatkan untuk membuat pekerja sekunder dapat dihentikan atau diskalakan dengan aman.
  • Eksekutor atau pemetaan error karena error OutOfMemory. Solusi: tingkatkan memori eksekutor atau pemetaan.
  • Layanan shuffle Spark mungkin kelebihan beban. Solusi: kurangi jumlah partisi tugas.

Node YARN TIDAK VALID

Contoh (dari log YARN):

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

Sering kali terkait dengan ruang disk yang tidak cukup untuk data shuffle. Diagnosis dengan melihat file log:

  • Buka halaman Clusters project Anda di konsol Google Cloud, lalu klik nama cluster.
  • Klik LIHAT LOG.
  • Filter log menurut hadoop-yarn-nodemanager.
  • Telusuri "UNHEALTHY".

Kemungkinan Perbaikan:

  • Cache pengguna disimpan di direktori yang ditentukan oleh properti yarn.nodemanager.local-dirs di yarn-site.xml file. File ini terletak di /etc/hadoop/conf/yarn-site.xml. Anda dapat memeriksa ruang kosong di jalur /hadoop/yarn/nm-local-dir, dan mengosongkan ruang dengan menghapus folder cache pengguna /hadoop/yarn/nm-local-dir/usercache.
  • Jika log melaporkan status "UNHEALTHY", buat ulang cluster Anda dengan ruang disk yang lebih besar, yang akan meningkatkan batas throughput.

Tugas gagal karena memori driver tidak memadai

Saat menjalankan tugas dalam mode cluster, tugas akan gagal jika ukuran memori node master secara signifikan lebih besar daripada ukuran memori node pekerja.

Contoh dari log driver:

'Exception in thread "main" java.lang.IllegalArgumentException:
Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster!
Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'

Kemungkinan Perbaikan:

  • Tetapkan spark:spark.driver.memory kurang dari yarn:yarn.scheduler.maximum-allocation-mb.
  • Gunakan jenis mesin yang sama untuk node master dan pekerja.

Untuk informasi selengkapnya