Pemrosesan paralel untuk operasi JOIN

Halaman ini menjelaskan penyesuaian performa untuk operasi JOIN di Cloud Data Fusion.

Operasi JOIN dapat menjadi bagian paling mahal dari pipeline. Seperti semua hal lain dalam pipeline, operasi dijalankan secara paralel. Langkah pertama JOIN adalah mengacak data sehingga setiap data dengan kunci JOIN yang sama dikirim ke eksekutor yang sama. Setelah semua data diacak, data akan digabungkan, dan output akan dilanjutkan melalui pipeline.

Contoh pemrosesan paralel dalam operasi JOIN

Misalnya, Anda melakukan operasi JOIN pada set data yang disebut Purchases dan Items. Setiap kumpulan data pembelian berisi nama dan jumlah item yang dibeli. Setiap kumpulan data item berisi nama item dan harga item tersebut. JOIN dilakukan pada nama item untuk menghitung harga total setiap pembelian. Saat data digabungkan, data akan diacak di seluruh cluster sehingga data dengan ID yang sama akan berakhir di eksekutor yang sama.

Jika kunci JOIN didistribusikan secara merata, operasi JOIN akan berperforma baik karena dapat dieksekusi secara paralel.

Seperti pengacakan lainnya, kecondongan data berdampak negatif terhadap performa. Pada contoh sebelumnya, telur lebih sering dibeli daripada ayam atau susu, yang berarti eksekutor yang bergabung dengan pembelian telur melakukan lebih banyak pekerjaan daripada eksekutor lainnya. Jika Anda melihat bahwa JOIN miring, ada dua cara untuk meningkatkan performa.

Membagi partisi yang miring secara otomatis

Dengan eksekusi kueri adaptif, kemiringan yang sangat berat akan ditangani secara otomatis. Segera setelah JOIN menghasilkan beberapa partisi yang jauh lebih besar dari yang lain, partisi tersebut akan dibagi menjadi partisi yang lebih kecil. Untuk mengonfirmasi bahwa Anda telah mengaktifkan eksekusi kueri adaptif, lihat Penyesuaian otomatis.

Menggunakan JOIN dalam memori

JOIN dalam memori dapat dilakukan jika satu sisi JOIN cukup kecil untuk muat dalam memori. Dalam situasi ini, set data kecil dimuat ke dalam memori, lalu disiarkan ke setiap eksekutor. Set data besar tidak diacak sama sekali, sehingga menghapus partisi yang tidak merata yang dihasilkan saat mengacak kunci JOIN.

Pada contoh sebelumnya, set data item pertama kali dimuat ke dalam memori driver Spark. Kemudian, pesan tersebut disiarkan ke setiap eksekutor. Eksekutor kini dapat menggabungkan data tanpa mengacak set data pembelian.

Pendekatan ini mengharuskan Anda memberikan memori yang cukup ke driver dan eksekutor Spark agar dapat menyimpan set data siaran dalam memori. Secara default, Spark mencadangkan sedikit kurang dari 30% memorinya untuk menyimpan jenis data ini. Saat menggunakan JOIN dalam memori, kalikan ukuran set data dengan empat dan tetapkan sebagai memori eksekutor dan driver. Misalnya, jika set data item berukuran 1 GB, kita harus menetapkan memori eksekutor dan driver setidaknya 4 GB. Set data yang lebih besar dari 8 GB tidak dapat dimuat ke dalam memori.

Distribusi kunci

Jika kedua sisi JOIN terlalu besar untuk muat dalam memori, teknik yang berbeda dapat digunakan untuk membagi setiap kunci JOIN menjadi beberapa kunci untuk meningkatkan tingkat paralelisme. Teknik ini dapat diterapkan ke operasi INNER JOIN dan LEFT OUTER JOIN. Fungsi ini tidak dapat digunakan untuk operasi FULL OUTER JOIN.

Dalam pendekatan ini, sisi yang miring diberi garam dengan kolom bilangan bulat baru dengan angka acak dari 1 hingga N. Sisi yang tidak miring akan diperluas, dengan setiap baris yang ada membuat N baris baru. Kolom baru ditambahkan ke sisi yang diperluas, yang diisi dengan setiap angka dari 1 hingga N. JOIN normal kemudian dilakukan, kecuali kolom baru ditambahkan sebagai bagian dari kunci JOIN. Dengan cara ini, semua data yang sebelumnya dikirim ke satu partisi kini tersebar ke hingga N partisi yang berbeda.

Pada contoh sebelumnya, faktor distribusi N ditetapkan ke 3. Set data asli ditampilkan di sebelah kiri. Versi set data yang di-salt dan di-explode ditampilkan di tengah. Data yang diacak ditampilkan di sebelah kanan, dengan tiga eksekutor berbeda yang bergabung dengan pembelian telur, bukan satu.

Paralelisme yang lebih besar dicapai dengan meningkatkan distribusi. Namun, hal ini memerlukan biaya untuk meledakkan satu sisi JOIN, sehingga lebih banyak data yang diacak di seluruh cluster. Oleh karena itu, manfaatnya akan berkurang seiring meningkatnya distribusi. Dalam sebagian besar situasi, tetapkan ke 20 atau kurang.

Langkah selanjutnya