Pipeline dijalankan di cluster mesin. Executor ini mencapai throughput tinggi dengan membagi pekerjaan yang perlu dilakukan, lalu menjalankan pekerjaan secara paralel di beberapa eksekutor yang tersebar di seluruh cluster. Secara umum, semakin banyak jumlah pemisahan (juga disebut partisi), semakin cepat pipeline dapat dijalankan. Tingkat paralelisme dalam pipeline Anda ditentukan oleh sumber dan tahap shuffle dalam pipeline.
Sumber
Di awal setiap operasi pipeline, setiap sumber dalam pipeline Anda menghitung data yang perlu dibaca, dan cara data tersebut dapat dibagi menjadi beberapa bagian. Misalnya, pertimbangkan pipeline dasar yang membaca dari Cloud Storage, melakukan beberapa transformasi Wrangler, lalu menulis kembali ke Cloud Storage.
Saat pipeline dimulai, sumber Cloud Storage akan memeriksa file input dan membaginya menjadi beberapa bagian berdasarkan ukuran file. Misalnya, file satu gigabyte dapat dibagi menjadi 100 bagian, dengan ukuran masing-masing 10 MB. Setiap eksekutor membaca data untuk bagian tersebut, menjalankan transformasi Wrangler, lalu menulis output ke file part.
Jika pipeline berjalan lambat, salah satu hal pertama yang harus diperiksa adalah apakah sumber Anda membuat pemisahan yang cukup untuk memanfaatkan paralelisme sepenuhnya. Misalnya, beberapa jenis kompresi membuat file teks biasa tidak dapat dibagi. Jika membaca file yang telah di-gzip, Anda mungkin melihat bahwa pipeline berjalan jauh lebih lambat daripada jika Anda membaca file yang tidak dikompresi, atau file yang dikompresi dengan BZIP (yang dapat dibagi). Demikian pula, jika Anda menggunakan sumber database dan telah mengonfigurasinya untuk hanya menggunakan satu pemisahan, pemisahan tersebut akan berjalan jauh lebih lambat daripada jika Anda mengonfigurasinya untuk menggunakan lebih banyak pemisahan.
Mengacak
Jenis plugin tertentu menyebabkan data diacak di seluruh cluster. Hal ini terjadi saat data yang diproses oleh satu eksekutor perlu dikirim ke eksekutor lain untuk melakukan komputasi. Pengacakan adalah operasi yang mahal karena melibatkan banyak I/O. Semua plugin yang menyebabkan data diacak akan muncul di bagian Analytics di Pipeline Studio. Hal ini mencakup plugin, seperti Group By, Deduplicate, Distinct, dan Joiner. Misalnya, anggap saja tahap Group By ditambahkan ke pipeline dalam contoh sebelumnya.
Selain itu, anggaplah data yang dibaca mewakili pembelian yang dilakukan di toko bahan makanan.
Setiap kumpulan data berisi kolom item
dan kolom num_purchased
. Pada tahap Kelompokkan Menurut, kita mengonfigurasi pipeline untuk mengelompokkan data di kolom item
dan
menghitung jumlah kolom num_purchased
.
Saat pipeline berjalan, file input akan dibagi seperti yang dijelaskan sebelumnya. Setelah itu, setiap data diacak di seluruh cluster sehingga setiap data dengan item yang sama adalah milik eksekutor yang sama.
Seperti yang diilustrasikan dalam contoh sebelumnya, data untuk pembelian apel awalnya tersebar di beberapa eksekutor. Untuk melakukan agregasi, semua data tersebut harus dikirim ke seluruh cluster ke eksekutor yang sama.
Sebagian besar plugin yang memerlukan pengacakan memungkinkan Anda menentukan jumlah partisi yang akan digunakan saat mengacak data. Ini mengontrol jumlah eksekutor yang digunakan untuk memproses data yang diacak.
Dalam contoh sebelumnya, jika jumlah partisi ditetapkan ke 2
, setiap eksekutor akan menghitung agregat untuk dua item, bukan satu.
Perhatikan bahwa Anda dapat mengurangi paralelisme pipeline setelah tahap tersebut. Misalnya, pertimbangkan tampilan logis pipeline:
Jika sumber membagi data di 500 partisi, tetapi Pengelompokan Urutan Mengacak menggunakan 200 partisi, tingkat paralelisme maksimum setelah Pengelompokan Urutan Mengacak akan turun dari 500 menjadi 200. Daripada 500 file bagian yang berbeda yang ditulis ke Cloud Storage, Anda hanya memiliki 200 file.
Memilih partisi
Jika jumlah partisi terlalu rendah, Anda tidak akan menggunakan kapasitas penuh cluster untuk melakukan paralelisasi sebanyak mungkin. Menetapkan partisi terlalu tinggi akan meningkatkan jumlah overhead yang tidak perlu. Secara umum, lebih baik menggunakan terlalu banyak partisi daripada terlalu sedikit. Overhead tambahan adalah hal yang perlu dikhawatirkan jika pipeline Anda memerlukan waktu beberapa menit untuk dijalankan dan Anda mencoba menghemat beberapa menit. Jika pipeline Anda memerlukan waktu berjam-jam untuk dijalankan, overhead umumnya tidak perlu Anda khawatirkan.
Cara yang berguna, tetapi terlalu sederhana, untuk menentukan jumlah partisi yang akan digunakan adalah menetapkannya ke max(cluster CPUs, input records / 500,000)
. Dengan kata lain,
ambil jumlah data input dan bagi dengan 500.000. Jika jumlahnya
lebih besar dari jumlah CPU cluster, gunakan jumlah tersebut untuk jumlah partisi.
Jika tidak, gunakan jumlah CPU cluster. Misalnya, jika cluster Anda memiliki
100 CPU dan tahap shuffle diperkirakan memiliki 100 juta data
input, gunakan 200 partisi.
Jawaban yang lebih lengkap adalah shuffle berperforma terbaik saat data shuffle perantara untuk setiap partisi dapat muat sepenuhnya di memori eksekutor sehingga tidak ada yang perlu ditransfer ke disk. Spark mencadangkan hampir 30% memori eksekutor untuk menyimpan data shuffle. Jumlah pastinya adalah (total memori - 300 MB) * 30%. Jika kita mengasumsikan setiap eksekutor disetel untuk menggunakan memori 2 GB, artinya setiap partisi tidak boleh menyimpan lebih dari (2 GB - 300 MB) * 30% = sekitar 500 MB data. Jika kita mengasumsikan setiap kumpulan data dikompresi hingga ukuran 1 KB, artinya (500 MB / partisi) / (1 KB / kumpulan data) = 500.000 kumpulan data per partisi. Jika eksekutor menggunakan lebih banyak memori, atau data Anda lebih kecil, Anda dapat menyesuaikan jumlah ini.
Kecondongan data
Perhatikan bahwa dalam contoh sebelumnya, pembelian untuk berbagai item didistribusikan secara merata. Artinya, ada tiga pembelian masing-masing untuk apel, pisang, wortel, dan telur. Mengacak kunci yang didistribusikan secara merata adalah jenis shuffle dengan performa terbaik, tetapi banyak set data tidak memiliki properti ini. Melanjutkan pembelian di toko bahan makanan pada contoh sebelumnya, Anda akan mengharapkan lebih banyak pembelian telur daripada kartu pernikahan. Jika ada beberapa kunci shuffle yang jauh lebih umum daripada kunci lainnya, Anda akan menangani data yang miring. Data yang terdistorsi dapat berperforma jauh lebih buruk daripada data yang tidak terdistorsi karena jumlah pekerjaan yang tidak proporsional dilakukan oleh segelintir eksekutor. Hal ini menyebabkan sebagian kecil partisi menjadi jauh lebih besar daripada semua partisi lainnya.
Dalam contoh ini, ada lima kali lebih banyak pembelian telur daripada pembelian kartu, yang berarti agregat telur memerlukan waktu komputasi sekitar lima kali lebih lama. Hal ini tidak terlalu penting jika hanya menangani 10 data, bukan dua, tetapi akan sangat berpengaruh jika menangani lima miliar data, bukan satu miliar. Jika Anda memiliki skew data, jumlah partisi yang digunakan dalam pengacakan tidak akan berdampak besar pada performa pipeline.
Anda dapat mengenali penyimpangan data dengan memeriksa grafik untuk catatan output dari waktu ke waktu. Jika tahap menghasilkan data dengan kecepatan yang jauh lebih tinggi di awal pipeline berjalan, lalu tiba-tiba melambat, ini mungkin berarti Anda memiliki data yang tidak seimbang.
Anda juga dapat mengenali kemiringan data dengan memeriksa penggunaan memori cluster dari waktu ke waktu. Jika kluster Anda memiliki kapasitas selama beberapa waktu, tetapi tiba-tiba memiliki penggunaan memori yang rendah selama suatu periode waktu, ini juga merupakan tanda bahwa Anda mengalami skew data.
Data yang terdistorsi paling signifikan memengaruhi performa saat join dilakukan. Ada beberapa teknik yang dapat digunakan untuk meningkatkan performa join yang tidak seimbang. Untuk mengetahui informasi selengkapnya, lihat
Pemrosesan paralel untuk operasi JOIN
.
Penyesuaian adaptif untuk eksekusi
Untuk menyesuaikan eksekusi secara adaptif, tentukan rentang partisi yang akan digunakan, bukan jumlah partisi yang tepat. Nomor partisi yang tepat, meskipun ditetapkan dalam konfigurasi pipeline, akan diabaikan saat eksekusi adaptif diaktifkan.
Jika Anda menggunakan cluster Dataproc sementara, Cloud Data Fusion akan otomatis menetapkan konfigurasi yang sesuai, tetapi untuk cluster Dataproc atau Hadoop statis, dua parameter konfigurasi berikutnya dapat ditetapkan:
spark.default.parallelism
: tetapkan ke jumlah total vCore yang tersedia di cluster. Tindakan ini memastikan cluster Anda tidak kelebihan beban dan menentukan batas bawah untuk jumlah partisi.spark.sql.adaptive.coalescePartitions.initialPartitionNum
: tetapkan ke 32x jumlah vCore yang tersedia di cluster. Ini menentukan batas atas jumlah partisi.Spark.sql.adaptive.enabled
: untuk mengaktifkan pengoptimalan, tetapkan nilai ini ketrue
. Dataproc menetapkannya secara otomatis, tetapi jika Anda menggunakan cluster Hadoop generik, Anda harus memastikannya diaktifkan .
Parameter ini dapat ditetapkan di konfigurasi mesin pipeline tertentu atau di properti cluster cluster Dataproc statis.
Langkah selanjutnya
- Pelajari pemrosesan paralel untuk operasi
JOIN
.