Spark mendapatkan tips penyesuaian tugas

Bagian berikut memberikan tips untuk membantu Anda menyempurnakan aplikasi Dataproc Spark.

Menggunakan cluster efemeral

Saat menggunakan model cluster "efemeral" Dataproc, Anda membuat cluster khusus untuk setiap tugas, dan setelah tugas selesai, Anda menghapus cluster. Dengan model sementara, Anda dapat memperlakukan penyimpanan dan komputasi secara terpisah, sehingga menyimpan data input dan output tugas di Cloud Storage atau BigQuery, menggunakan cluster untuk komputasi dan penyimpanan data sementara saja.

Kesalahan cluster persisten

Menggunakan cluster satu tugas sementara akan menghindari perangkap dan potensi masalah berikut yang terkait dengan penggunaan cluster "persisten" bersama dan yang berjalan lama:

  • Titik tunggal kegagalan: status error cluster yang dibagikan dapat menyebabkan semua tugas gagal, sehingga memblokir seluruh pipeline data. Menyelidiki dan memulihkan kesalahan bisa memakan waktu berjam-jam. Karena cluster efemeral hanya mempertahankan status dalam cluster sementara, saat terjadi error, cluster dapat dihapus dan dibuat ulang dengan cepat.
  • Kesulitan dalam mempertahankan dan memigrasikan status cluster di HDFS, MySQL, atau sistem file lokal
  • Pertentangan resource di antara tugas yang secara negatif memengaruhi SLO
  • Daemon layanan tidak responsif yang disebabkan oleh tekanan memori
  • Penumpukan log dan file sementara yang dapat melebihi kapasitas disk
  • Kegagalan peningkatan skala karena kehabisan kuota zona cluster
  • Kurangnya dukungan untuk versi gambar cluster yang sudah tidak berlaku.

Manfaat cluster efemeral

Sisi positifnya, cluster sementara memungkinkan Anda melakukan hal berikut:

  • Mengonfigurasi izin IAM yang berbeda untuk tugas yang berbeda dengan akun layanan VM Dataproc yang berbeda.
  • Mengoptimalkan konfigurasi hardware dan software cluster untuk setiap tugas, dengan mengubah konfigurasi cluster sesuai kebutuhan.
  • Upgrade versi gambar di cluster baru untuk mendapatkan patch keamanan, perbaikan bug, dan pengoptimalan terbaru.
  • Memecahkan masalah lebih cepat di cluster tugas tunggal yang terisolasi.
  • Hemat biaya dengan membayar waktu berjalan cluster sementara saja, bukan waktu tidak ada aktivitas antartugas di cluster bersama.

Menggunakan Spark SQL

DataFrame API Spark SQL adalah pengoptimalan yang signifikan dari RDD API. Jika Anda berinteraksi dengan kode yang menggunakan RDD, pertimbangkan untuk membaca data sebagai DataFrame sebelum meneruskan RDD dalam kode. Dalam kode Java atau Scala, sebaiknya gunakan Dataset API Spark SQL sebagai superset RDD dan DataFrames.

Menggunakan Apache Spark 3

Dataproc 2.0 menginstal Spark 3, yang menyertakan fitur dan peningkatan performa berikut:

  • Dukungan GPU
  • Kemampuan untuk membaca file biner
  • Peningkatan performa
  • Pruning Partisi Dinamis
  • Eksekusi kueri adaptif, yang mengoptimalkan tugas Spark secara real time

Gunakan Alokasi Dinamis

Apache Spark menyertakan fitur Alokasi Dinamis yang menskalakan jumlah eksekutor Spark pada pekerja dalam satu cluster. Fitur ini memungkinkan tugas menggunakan cluster Dataproc lengkap bahkan saat skala cluster ditingkatkan. Fitur ini diaktifkan secara default di Dataproc (spark.dynamicAllocation.enabled ditetapkan ke true). Lihat Alokasi Dinamis Spark untuk informasi selengkapnya.

Menggunakan Penskalaan Otomatis Dataproc

Penskalaan otomatis Dataproc secara dinamis menambahkan dan menghapus pekerja Dataproc dari cluster untuk membantu memastikan bahwa tugas Spark memiliki resource yang diperlukan untuk diselesaikan dengan cepat.

Ini adalah praktik terbaik untuk mengonfigurasi kebijakan penskalaan otomatis agar hanya menskalakan pekerja sekunder.

Gunakan Mode Fleksibilitas yang Ditingkatkan Dataproc

Cluster dengan preemptible VM atau kebijakan penskalaan otomatis dapat menerima pengecualian FetchFailed saat pekerja di-preempt atau dihapus sebelum selesai menayangkan data acak ke pengurangan. Pengecualian ini dapat menyebabkan percobaan ulang tugas dan waktu penyelesaian tugas yang lebih lama.

Rekomendasi: Gunakan Mode Fleksibilitas yang Ditingkatkan Dataproc, yang tidak menyimpan data acak perantara pada pekerja sekunder, sehingga pekerja sekunder dapat di-preempt atau diperkecil dengan aman.

Konfigurasi partisi dan pengacakan

Spark menyimpan data dalam partisi sementara di cluster. Jika aplikasi Anda mengelompokkan atau menggabungkan DataFrames, aplikasi tersebut akan mengacak data ke dalam partisi baru sesuai dengan pengelompokan dan konfigurasi tingkat rendah.

Partisi data memengaruhi performa aplikasi secara signifikan: terlalu sedikit partisi membatasi paralelisme tugas dan penggunaan resource cluster; terlalu banyak partisi memperlambat tugas karena pemrosesan dan pengacakan partisi tambahan.

Mengonfigurasi partisi

Properti berikut mengatur jumlah dan ukuran partisi Anda:

  • spark.sql.files.maxPartitionBytes: ukuran maksimum partisi saat Anda membaca data dari Cloud Storage. Defaultnya adalah 128 MB, yang cukup besar untuk sebagian besar aplikasi yang memproses kurang dari 100 TB.

  • spark.sql.shuffle.partitions: jumlah partisi setelah melakukan pengacakan. Nilai defaultnya adalah 200, yang sesuai untuk cluster dengan total kurang dari 100 vCPU. Rekomendasi: Tetapkan jumlahnya ke 3x jumlah vCPU di cluster Anda.

  • spark.default.parallelism: jumlah partisi yang ditampilkan setelah melakukan transformasi RDD yang memerlukan acak, seperti join, reduceByKey, dan parallelize. Defaultnya adalah jumlah total vCPU dalam cluster Anda. Saat menggunakan RDD di tugas Spark, Anda dapat menetapkan jumlah ini menjadi 3x vCPU Anda

Membatasi jumlah file

Terjadi penurunan performa saat Spark membaca file kecil dalam jumlah besar. Simpan data dalam ukuran file yang lebih besar, misalnya, ukuran file dalam rentang 256 MB–512 MB. Demikian pula, batasi jumlah file output (untuk memaksa pengacakan, lihat Menghindari pengacakan yang tidak perlu).

Mengonfigurasi eksekusi kueri adaptif (Spark 3)

Eksekusi kueri adaptif (diaktifkan secara default di gambar Dataproc versi 2.0) memberikan peningkatan performa tugas Spark, termasuk:

Meskipun setelan konfigurasi default adalah setelan yang berlaku untuk sebagian besar kasus penggunaan, menetapkan spark.sql.adaptive.advisoryPartitionSizeInBytes ke spark.sqlfiles.maxPartitionBytes (default 128 MB) dapat bermanfaat.

Hindari acak yang tidak perlu

Dengan Spark, pengguna dapat memicu shuffle secara manual untuk menyeimbangkan kembali data mereka dengan fungsi repartition. Pengacakan data itu mahal, jadi reshuffling data harus digunakan dengan hati-hati. Menyetel konfigurasi partisi secara tepat harus memadai agar Spark dapat mempartisi data Anda secara otomatis.

Pengecualian:Saat menulis data yang dipartisi kolom ke Cloud Storage, partisi ulang pada kolom tertentu akan menghindari penulisan banyak file kecil untuk mencapai waktu penulisan yang lebih cepat.

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

Menyimpan data di Parquet atau Avro

Spark SQL ditetapkan secara default ke pembacaan dan penulisan data dalam file Parquet terkompresi Snappy. Parquet menggunakan format file berdasarkan kolom yang efisien yang memungkinkan Spark hanya membaca data yang diperlukan untuk menjalankan suatu aplikasi. Ini adalah keuntungan penting saat bekerja dengan {i>dataset<i} besar. Format kolom lainnya, seperti Apache ORC, juga berperforma baik.

Untuk data non-kolom, Apache Avro menyediakan format file baris biner yang efisien. Meski biasanya lebih lambat daripada Parquet, performa Avro lebih baik daripada format berbasis teks,seperti CSV atau JSON.

Mengoptimalkan ukuran disk

Throughput persistent disk diskalakan dengan ukuran disk, yang dapat memengaruhi performa tugas Spark karena tugas menulis metadata dan mengacak data ke disk. Saat menggunakan persistent disk standar, ukuran disk minimal harus 1 terabyte per pekerja (lihat Performa berdasarkan ukuran persistent disk).

Untuk memantau throughput disk pekerja di Konsol Google Cloud:

  1. Klik nama cluster di halaman Clusters.
  2. Klik tab VM INSTANCES.
  3. Klik nama pekerja mana pun.
  4. Klik tab PEMANTAUAN, lalu scroll ke bawah ke Disk Throughput untuk melihat throughput pekerja.

Pertimbangan disk

Cluster Dataproc efemeral, yang tidak memanfaatkan penyimpanan persisten. dapat menggunakan SSD lokal. SSD lokal terpasang secara fisik ke cluster dan memberikan throughput yang lebih tinggi daripada persistent disk (lihat tabel Performa). SSD lokal tersedia dengan ukuran tetap, yaitu 375 gigabyte, tetapi Anda dapat menambahkan beberapa SSD untuk meningkatkan performa.

SSD lokal tidak mempertahankan data setelah cluster dinonaktifkan. Jika membutuhkan penyimpanan persisten, Anda dapat menggunakan persistent disk SSD, yang memberikan throughput yang lebih tinggi untuk ukurannya daripada persistent disk standar. Persistent disk SSD juga merupakan pilihan yang baik jika ukuran partisi lebih kecil dari 8 KB (tetapi, hindari pariti kecil).

Memasang GPU ke cluster Anda

Spark 3 mendukung GPU. Gunakan GPU dengan tindakan inisialisasi RAPIDS untuk mempercepat tugas Spark menggunakan Akselerator SQL RAPIDS. Tindakan inisialisasi driver GPU untuk mengonfigurasi cluster dengan GPU.

Kegagalan dan perbaikan tugas umum

Kehabisan Memori

Contoh:

  • "Ekonsekutor hilang"
  • "java.lang.OutOfMemoryError: Batas overhead GC terlampaui"
  • "Container dimatikan oleh YARN karena melebihi batas memori"

Kemungkinan perbaikan:

Kegagalan Pengambilan Acak

Contoh:

  • "FetchFailedException" (error Spark)
  • "Gagal tersambung ke..." (Error Spark)
  • "Gagal mengambil" (error MapReduce)

Biasanya disebabkan oleh penghapusan pekerja dini yang masih memiliki data acak untuk ditayangkan.

Kemungkinan penyebab dan cara mengatasinya:

  • VM pekerja yang dapat dihentikan telah diklaim ulang atau VM pekerja non-preemptible dihapus oleh penskalaan otomatis. Solusi: Gunakan Mode Fleksibilitas yang Ditingkatkan untuk membuat pekerja sekunder dapat dihentikan atau skalabel dengan aman.
  • Executor atau mapper mengalami error karena error OutOfMemory. Solusi: tingkatkan memori eksekutor atau mapper.
  • Layanan shuffle Spark mungkin kelebihan beban. Solusi: kurangi jumlah partisi tugas.

Node YARN TIDAK SEHAT

Contoh (dari log YARN):

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

Sering kali terkait dengan kapasitas disk yang tidak cukup untuk mengacak data. Diagnosis dengan melihat file log:

  • Buka halaman Clusters project Anda di Konsol Google Cloud, lalu klik nama cluster.
  • Klik LIHAT LOG.
  • Filter log menurut hadoop-yarn-nodemanager.
  • Telusuri "UNHEALTHY".

Kemungkinan Perbaikan:

  • Cache pengguna disimpan di direktori yang ditentukan oleh properti yarn.nodemanager.local-dirs di yarn-site.xml file. File ini ada di /etc/hadoop/conf/yarn-site.xml. Anda dapat memeriksa ruang kosong di jalur /hadoop/yarn/nm-local-dir, dan mengosongkan ruang dengan menghapus folder cache pengguna /hadoop/yarn/nm-local-dir/usercache.
  • Jika log melaporkan status "TIDAK SEHAT", buat ulang cluster Anda dengan kapasitas disk yang lebih besar, yang akan meningkatkan batas throughput.

Tugas gagal karena memori driver tidak cukup

Saat menjalankan tugas dalam mode cluster, tugas akan gagal jika ukuran memori node master secara signifikan lebih besar daripada ukuran memori node pekerja.

Contoh dari log driver:

'Exception in thread "main" java.lang.IllegalArgumentException:
Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster!
Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'

Kemungkinan Perbaikan:

  • Tetapkan spark:spark.driver.memory lebih rendah dari yarn:yarn.scheduler.maximum-allocation-mb.
  • Gunakan jenis mesin yang sama untuk node master dan pekerja.

Untuk informasi selengkapnya