Pemrosesan paralel

Pipeline dijalankan pada cluster mesin. Solusi ini mencapai throughput yang tinggi dengan membagi pekerjaan yang perlu diselesaikan, lalu menjalankan tugas secara paralel pada beberapa eksekutor yang tersebar di seluruh cluster. Secara umum, semakin besar jumlah bagian (juga disebut partisi), makin cepat pipeline dapat dijalankan. Tingkat paralelisme dalam pipeline ditentukan oleh sumber dan stage shuffle dalam pipeline.

Sumber

Pada awal setiap pengoperasian pipeline, setiap sumber dalam pipeline Anda menghitung data yang perlu dibaca, dan bagaimana data tersebut dapat dibagi menjadi beberapa bagian. Misalnya, pertimbangkan pipeline dasar yang membaca dari Cloud Storage, melakukan beberapa transformasi Wrangler, lalu menulis kembali ke Cloud Storage.

Pipeline dasar yang menunjukkan sumber Cloud Storage, transformasi Wrangler, dan sink Cloud Storage

Saat pipeline dimulai, sumber Cloud Storage memeriksa file input dan membaginya menjadi beberapa bagian berdasarkan ukuran file. Misalnya, satu file gigabyte dapat dipecah menjadi 100 bagian, yang ukurannya masing-masing berukuran 10 MB. Setiap eksekutor membaca data untuk bagian tersebut, menjalankan transformasi Wrangler, lalu menulis output ke file part.

Data yang dipartisi di Cloud Storage menjadi transformasi Wrangler paralel menjadi file bagian

Jika pipeline Anda berjalan lambat, salah satu hal pertama yang harus diperiksa adalah apakah sumber Anda membuat bagian yang cukup untuk memanfaatkan paralelisme sepenuhnya. Misalnya, beberapa jenis kompresi membuat file teks biasa tidak dapat dipisah. Jika membaca file yang telah dikompresi ke file gzip, Anda mungkin melihat bahwa pipeline berjalan jauh lebih lambat dibandingkan jika Anda membaca file yang tidak dikompresi, atau file yang dikompresi dengan BZIP (yang dapat dipisahkan). Demikian pula, jika Anda menggunakan sumber database dan telah mengonfigurasinya untuk menggunakan hanya satu bagian, sumber database akan berjalan jauh lebih lambat dibandingkan jika Anda mengonfigurasinya untuk menggunakan lebih banyak bagian.

Mengacak lagu

Jenis plugin tertentu menyebabkan data diacak di seluruh cluster. Hal ini terjadi saat data yang diproses oleh satu eksekutor perlu dikirim ke eksekutor lain untuk melakukan komputasi. Acak adalah operasi yang mahal karena melibatkan banyak I/O. Plugin yang menyebabkan data diacak semuanya akan muncul di bagian Analytics pada Pipeline Studio. Plugin ini mencakup plugin, seperti Group By, Deduplicate, Distinct, dan Joiner. Misalnya, tahap Group By ditambahkan ke pipeline dalam contoh sebelumnya.

Juga anggaplah data yang dibaca mewakili pembelian yang dilakukan di toko kelontong. Setiap kumpulan data berisi kolom item dan kolom num_purchased. Pada tahap Group By, kita mengonfigurasi pipeline untuk mengelompokkan kumpulan data di kolom item dan menghitung jumlah kolom num_purchased.

Saat pipeline berjalan, file input akan dibagi seperti yang dijelaskan sebelumnya. Setelah itu, setiap kumpulan data diacak di seluruh cluster sehingga setiap kumpulan data dengan item yang sama akan dimiliki oleh eksekutor yang sama.

Seperti dalam contoh sebelumnya, kumpulan data untuk pembelian apel awalnya tersebar di beberapa eksekutor. Untuk melakukan agregasi, semua data tersebut harus dikirim ke seluruh cluster ke eksekutor yang sama.

Sebagian besar plugin yang perlu diacak memungkinkan Anda menentukan jumlah partisi yang akan digunakan saat mengacak data. Cara ini mengontrol jumlah eksekutor yang digunakan untuk memproses data yang diacak.

Dalam contoh sebelumnya, jika jumlah partisi ditetapkan ke 2, setiap eksekutor akan menghitung agregat untuk dua item, bukan satu.

Perlu diperhatikan bahwa Anda dapat mengurangi paralelisme pipeline setelah tahap tersebut. Misalnya, pertimbangkan tampilan logis dari pipeline:

Jika sumber membagi data menjadi 500 partisi, tetapi Group By melakukan shuffle menggunakan 200 partisi, tingkat paralelisme maksimum setelah Group By turun dari 500 menjadi 200. Anda hanya memiliki 200, bukan 500 file bagian berbeda yang ditulis ke Cloud Storage.

Memilih partisi

Jika jumlah partisi terlalu rendah, Anda tidak akan menggunakan kapasitas penuh cluster Anda untuk memparalelkan sebanyak mungkin pekerjaan. Menetapkan partisi terlalu tinggi akan meningkatkan jumlah overhead yang tidak perlu. Secara umum, lebih baik menggunakan terlalu banyak partisi daripada terlalu sedikit. Overhead tambahan adalah sesuatu yang perlu dikhawatirkan jika pipeline Anda membutuhkan waktu beberapa menit untuk berjalan dan Anda mencoba mengurangi beberapa menit. Jika pipeline Anda membutuhkan waktu berjam-jam untuk berjalan, overhead umumnya bukan hal yang perlu Anda khawatirkan.

Cara yang berguna, tetapi terlalu sederhana, untuk menentukan jumlah partisi yang akan digunakan adalah dengan menetapkannya ke max(cluster CPUs, input records / 500,000). Dengan kata lain, ambil jumlah kumpulan data input dan bagi dengan 500.000. Jika jumlah tersebut lebih besar dari jumlah CPU cluster, gunakan jumlah tersebut untuk jumlah partisi. Jika tidak, gunakan jumlah CPU cluster. Misalnya, jika cluster Anda memiliki 100 CPU dan tahap acak diperkirakan memiliki 100 juta kumpulan data input, gunakan 200 partisi.

Jawaban yang lebih lengkap adalah pengacakan berfungsi paling baik jika data pengacakan perantara untuk setiap partisi dapat sepenuhnya masuk ke dalam memori eksekutor, sehingga tidak ada yang perlu ditambahkan ke disk. Spark mencadangkan kurang dari 30% memori eksekutor untuk menyimpan data acak. Jumlah tepatnya adalah (total memori - 300 MB) * 30%. Jika kita menganggap setiap eksekutor ditetapkan untuk menggunakan memori 2 GB, berarti setiap partisi tidak boleh menyimpan lebih dari (2 GB - 300 MB) * 30% = sekitar 500 MB data. Jika kita mengasumsikan setiap kumpulan data dikompresi hingga berukuran 1 KB, berarti (500 MB / partisi) / (1 KB / record) = 500.000 kumpulan data per partisi. Jika eksekutor menggunakan lebih banyak memori, atau kumpulan data Anda lebih kecil, Anda dapat menyesuaikan angka ini.

Kemiringan data

Perhatikan bahwa dalam contoh sebelumnya, pembelian untuk berbagai item didistribusikan secara merata. Artinya, ada tiga pembelian masing-masing untuk apel, pisang, wortel, dan telur. Mengacak kunci yang didistribusikan secara merata adalah jenis pengacakan yang paling berperforma tinggi, tetapi banyak set data tidak memiliki properti ini. Melanjutkan pembelian bahan makanan di contoh sebelumnya, Anda akan membeli telur lebih banyak daripada pembelian kartu pernikahan. Saat ada beberapa kunci acak yang jauh lebih umum daripada kunci lain, Anda akan menangani data yang condong. Data yang dimiringkan dapat berperforma jauh lebih buruk daripada data yang tidak dimiringkan karena jumlah pekerjaan yang tidak proporsional dilakukan oleh beberapa eksekutor. Hal ini menyebabkan subset partisi kecil menjadi jauh lebih besar daripada yang lain.

Dalam contoh ini, jumlah pembelian telur lima kali lebih banyak daripada pembelian melalui kartu, yang berarti penghitungan telur memerlukan waktu sekitar lima kali lebih lama untuk dihitung. Hal ini tidak terlalu penting ketika menangani 10 kumpulan data saja, alih-alih dua, tetapi membuat perbedaan besar ketika menangani lima miliar kumpulan data, bukan satu miliar. Saat Anda mengalami kecondongan data, jumlah partisi yang digunakan dalam acak tidak akan berdampak besar pada performa pipeline.

Anda dapat mengenali kecondongan data dengan memeriksa grafik untuk catatan {i>output<i} dari waktu ke waktu. Jika stage menghasilkan kumpulan data dengan kecepatan yang jauh lebih tinggi di awal proses pipeline, lalu tiba-tiba melambat, ini mungkin berarti data Anda condong ke arah tertentu.

Anda juga dapat mengenali kecondongan data dengan memeriksa penggunaan memori cluster dari waktu ke waktu. Jika cluster Anda mencapai kapasitasnya selama beberapa waktu, tetapi tiba-tiba memiliki penggunaan memori yang rendah untuk jangka waktu tertentu, hal ini juga merupakan tanda bahwa Anda mengalami kecondongan data.

Data yang miring paling signifikan memengaruhi performa saat penggabungan dilakukan. Ada beberapa teknik yang dapat digunakan untuk meningkatkan performa penggabungan miring. Untuk mengetahui informasi selengkapnya, lihat Pemrosesan paralel untuk operasi JOIN.

Penyesuaian adaptif untuk eksekusi

Untuk menyesuaikan eksekusi secara adaptif, tentukan rentang partisi yang akan digunakan, bukan nomor partisi yang tepat. Nomor partisi yang tepat, meskipun ditetapkan dalam konfigurasi pipeline, akan diabaikan saat eksekusi adaptif diaktifkan.

Jika Anda menggunakan cluster Dataproc efemeral, Cloud Data Fusion akan otomatis menetapkan konfigurasi yang tepat, tetapi untuk cluster Dataproc atau Hadoop statis, dua parameter konfigurasi berikutnya dapat ditetapkan:

  • spark.default.parallelism: tetapkan ke jumlah total vCore yang tersedia di cluster. Hal ini memastikan cluster Anda tidak kekurangan beban dan menentukan batas bawah untuk jumlah partisi.
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum: tetapkan menjadi 32x jumlah vCore yang tersedia di cluster. Ini menentukan batas atas untuk jumlah partisi.
  • Spark.sql.adaptive.enabled: untuk mengaktifkan pengoptimalan, tetapkan nilai ini ke true. Dataproc menyetelnya secara otomatis, tetapi jika menggunakan cluster Hadoop umum, Anda harus memastikan cluster tersebut sudah aktif .

Parameter ini dapat ditetapkan dalam konfigurasi mesin dari pipeline tertentu atau dalam properti cluster dari cluster Dataproc statis.

Langkah selanjutnya