Mode Fleksibilitas yang Disempurnakan Dataproc

Mode Fleksibilitas yang Ditingkatkan (EFM) Dataproc mengelola data shuffle untuk meminimalkan penundaan progres tugas yang disebabkan oleh penghapusan node dari cluster yang sedang berjalan. EFM men-offload data shuffle dalam salah satu dari dua mode yang dapat dipilih pengguna:

  1. Pengacakan pekerja utama. Pemetaan menulis data ke pekerja utama. Pekerja mengambil dari node jarak jauh tersebut selama fase pengurangan. Mode ini hanya tersedia untuk, dan direkomendasikan untuk, tugas Spark.

  2. Pengacakan HCFS (Hadoop Compatible File System). Pemetaan menulis data ke implementasi HCFS (HDFS secara default). Seperti mode pekerja utama, hanya pekerja utama yang berpartisipasi dalam implementasi HDFS dan HCFS (jika HCFS shuffle menggunakan Konektor Cloud Storage, data disimpan di luar cluster). Mode ini dapat menguntungkan tugas dengan jumlah data yang kecil, tetapi karena batasan penskalaan, mode ini tidak direkomendasikan untuk tugas yang lebih besar.

Karena kedua mode EFM tidak menyimpan data shuffle perantara di pekerja sekunder, EFM sangat cocok untuk cluster yang menggunakan preemptible VM atau hanya menskalakan otomatis grup pekerja sekunder.

Batasan:

  • Tugas Apache Hadoop YARN yang tidak mendukung relokasi AppMaster dapat gagal dalam Mode Fleksibilitas yang Ditingkatkan (lihat Kapan harus menunggu AppMaster selesai).
  • Mode Fleksibilitas yang Disempurnakan tidak direkomendasikan:
    • di cluster yang hanya memiliki pekerja primer
    • pada tugas streaming karena dapat memerlukan waktu hingga 30 menit setelah tugas selesai untuk membersihkan data shuffle perantara.
  • Mode Fleksibilitas yang Ditingkatkan tidak didukung:
    • saat penskalaan otomatis pekerja utama diaktifkan. Pada umumnya, pekerja utama akan terus menyimpan data shuffle yang tidak dimigrasikan secara otomatis. Menskalakan turun grup pekerja utama akan menghilangkan manfaat EFM.
    • saat tugas Spark berjalan di cluster dengan penghentian tuntas diaktifkan. Penghentian layanan yang lancar dan EFM dapat bekerja secara berlawanan karena mekanisme penghentian layanan yang lancar YARN mempertahankan node DECOMMISSIONING hingga semua aplikasi yang terlibat selesai.

Menggunakan Mode Fleksibilitas yang Ditingkatkan

Mode Fleksibilitas yang Disempurnakan dikonfigurasi per mesin eksekusi, dan harus dikonfigurasi selama pembuatan cluster.

  • Implementasi EFM Spark dikonfigurasi dengan properti cluster dataproc:efm.spark.shuffle. Nilai properti yang valid:

    • primary-worker untuk pengurutan ulang pekerja utama (direkomendasikan)
    • hcfs untuk shuffle berbasis HCFS. Mode ini tidak digunakan lagi dan hanya tersedia di cluster yang menjalankan image versi 1.5. Tidak direkomendasikan untuk alur kerja baru.
  • Implementasi MapReduce Hadoop dikonfigurasi dengan dataproc:efm.mapreduce.shuffle properti cluster. Nilai properti yang valid:

    • hcfs

Contoh: Buat cluster dengan shuffle pekerja utama untuk Spark dan shuffle HCFS untuk MapReduce:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
    --properties=dataproc:efm.mapreduce.shuffle=hcfs \
    --worker-machine-type=n1-highmem-8 \
    --num-workers=25 \
    --num-worker-local-ssds=2 \
    --secondary-worker-type=preemptible \
    --secondary-worker-boot-disk-size=500GB \
    --num-secondary-workers=25

Contoh Apache Spark

  1. Jalankan tugas WordCount terhadap teks Shakespeare publik menggunakan jar contoh Spark di cluster EFM.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

Contoh MapReduce Apache Hadoop

  1. Jalankan tugas teragen kecil untuk membuat data input di Cloud Storage untuk tugas terasort berikutnya menggunakan jar contoh mapreduce di cluster EFM.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
    

  2. Jalankan tugas terasort pada data.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- terasort gs://terasort/input gs://terasort/output
    

Mengonfigurasi SSD lokal untuk shuffle pekerja utama

Implementasi shuffle pekerja utama dan HDFS menulis data shuffle perantara ke disk yang terpasang di VM, dan mendapatkan manfaat dari throughput dan IOPS tambahan yang ditawarkan oleh SSD lokal. Untuk memfasilitasi alokasi resource, targetkan sasaran sekitar 1 partisi SSD lokal per 4 vCPU saat mengonfigurasi mesin pekerja utama.

Untuk memasang SSD lokal, teruskan flag --num-worker-local-ssds ke perintah gcloud Dataproc clusters create.

Umumnya, Anda tidak memerlukan SSD lokal pada pekerja sekunder. Menambahkan SSD lokal ke pekerja sekunder cluster (menggunakan tanda --num-secondary-worker-local-ssds) sering kali kurang penting karena pekerja sekunder tidak menulis data shuffle secara lokal. Namun, karena SSD lokal meningkatkan performa disk lokal, Anda dapat memutuskan untuk menambahkan SSD lokal ke pekerja sekunder jika Anda mengharapkan tugas menjadi terikat I/O karena penggunaan disk lokal: tugas Anda menggunakan disk lokal yang signifikan untuk ruang sementara atau partisi Anda terlalu besar untuk muat di memori dan akan ditransfer ke disk.

Rasio pekerja sekunder

Karena pekerja sekunder menulis data shuffle ke pekerja utama, kluster Anda harus berisi pekerja utama dalam jumlah yang memadai dengan resource CPU, memori, dan disk yang memadai untuk mengakomodasi beban shuffle tugas Anda. Untuk cluster penskalaan otomatis, agar grup utama tidak diskalakan dan menyebabkan perilaku yang tidak diinginkan, tetapkan minInstances ke nilai maxInstances di kebijakan penskalaan otomatis untuk grup pekerja utama.

Jika Anda memiliki rasio pekerja sekunder terhadap pekerja primer yang tinggi (misalnya, 10:1), monitor penggunaan CPU, jaringan, dan disk pekerja primer untuk menentukan apakah pekerja tersebut kelebihan beban. Untuk melakukannya:

  1. Buka halaman VM instances di konsol Google Cloud.

  2. Klik kotak centang di sebelah kiri pekerja utama.

  3. Klik tab MONITORING untuk melihat Penggunaan CPU pekerja utama, IOPS Disk, Byte Jaringan, dan metrik lainnya.

Jika pekerja utama kelebihan beban, pertimbangkan untuk melakukan penskalaan pekerja utama secara manual.

Mengubah ukuran grup pekerja utama

Grup pekerja utama dapat diskalakan dengan aman, tetapi menskalakan turun grup pekerja utama dapat berdampak negatif pada progres tugas. Operasi yang mendownscale grup pekerja utama harus menggunakan penghentian tuntas, yang diaktifkan dengan menetapkan tanda --graceful-decommission-timeout.

Cluster yang diskalakan otomatis: Penskalaan grup pekerja utama dinonaktifkan di cluster EFM dengan kebijakan penskalaan otomatis. Untuk mengubah ukuran grup pekerja utama di cluster yang diskalakan otomatis:

  1. Nonaktifkan penskalaan otomatis.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. Menskalakan grup utama.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. Aktifkan kembali penskalaan otomatis:

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

Memantau penggunaan disk pekerja utama

Pekerja utama harus memiliki kapasitas disk yang memadai untuk data shuffle cluster. Anda dapat memantaunya secara tidak langsung melalui metrik remaining HDFS capacity. Saat disk lokal terisi penuh, ruang tidak akan tersedia untuk HDFS, dan kapasitas yang tersisa akan berkurang.

Secara default, saat penggunaan disk lokal pekerja utama melebihi 90% kapasitas, node akan ditandai sebagai UNHEALTHY di UI node YARN. Jika mengalami masalah kapasitas disk, Anda dapat menghapus data yang tidak digunakan dari HDFS atau menskalakan kumpulan pekerja utama.

Konfigurasi lanjutan

Partisi dan paralelisme

Saat mengirimkan tugas MapReduce atau Spark, konfigurasikan tingkat pempartisi yang sesuai. Menentukan jumlah partisi input dan output untuk suatu tahap shuffle melibatkan kompromi di antara karakteristik performa yang berbeda. Sebaiknya bereksperimenlah dengan nilai yang sesuai untuk bentuk tugas Anda.

Partisi input

Partisi input MapReduce dan Spark ditentukan oleh set data input. Saat membaca file dari Cloud Storage, setiap tugas memproses data senilai "ukuran blok".

  • Untuk tugas Spark SQL, ukuran partisi maksimum dikontrol oleh spark.sql.files.maxPartitionBytes. Pertimbangkan untuk meningkatkannya menjadi 1 GB: spark.sql.files.maxPartitionBytes=1073741824.

  • Untuk tugas MapReduce dan RDD Spark, ukuran partisi biasanya dikontrol dengan fs.gs.block.size, yang secara default ditetapkan ke 128 MB. Pertimbangkan untuk meningkatkannya menjadi 1 GB. Anda juga dapat menetapkan properti khusus InputFormat seperti mapreduce.input.fileinputformat.split.minsize dan mapreduce.input.fileinputformat.split.maxsize

    • Untuk tugas MapReduce: --properties fs.gs.block.size=1073741824
    • Untuk RDD Spark: --properties spark.hadoop.fs.gs.block.size=1073741824

Partisi output

Jumlah tugas di tahap berikutnya dikontrol oleh beberapa properti. Untuk tugas yang lebih besar yang memproses lebih dari 1 TB, sebaiknya sediakan setidaknya 1 GB per partisi.

  • Untuk tugas MapReduce, jumlah partisi output dikontrol oleh mapreduce.job.reduces.

  • Untuk Spark SQL, jumlah partisi output dikontrol oleh spark.sql.shuffle.partitions.

  • Untuk tugas Spark yang menggunakan RDD API, Anda dapat menentukan jumlah partisi output atau menetapkan spark.default.parallelism.

Penyesuaian pengacakan untuk pengacakan pekerja utama

Properti yang paling signifikan adalah --properties yarn:spark.shuffle.io.serverThreads=<num-threads>. Perhatikan bahwa ini adalah properti YARN tingkat cluster karena server shuffle Spark berjalan sebagai bagian dari Pengelola Node. Nilai defaultnya adalah dua kali (2x) jumlah core pada komputer (misalnya, 16 thread pada n1-highmem-8). Jika "Shuffle Read Blocked Time" lebih besar dari 1 detik, dan pekerja utama belum mencapai batas jaringan, CPU, atau disk, pertimbangkan untuk meningkatkan jumlah thread server shuffle.

Pada jenis mesin yang lebih besar, pertimbangkan untuk meningkatkan spark.shuffle.io.numConnectionsPerPeer, yang secara default ditetapkan ke 1. (Misalnya, tetapkan ke 5 koneksi per pasangan host).

Meningkatkan percobaan ulang

Jumlah maksimum percobaan yang diizinkan untuk master aplikasi, tugas, dan tahap dapat dikonfigurasi dengan menetapkan properti berikut:

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

Karena master aplikasi dan tugas lebih sering dihentikan di cluster yang menggunakan banyak preemptible VM atau penskalaan otomatis tanpa penghentian tuntas, meningkatkan nilai properti di atas di cluster tersebut dapat membantu (perhatikan bahwa menggunakan EFM dengan Spark dan penghentian tuntas tidak didukung).

Mengonfigurasi HDFS untuk pengurutan ulang HCFS

Untuk meningkatkan performa shuffle besar, Anda dapat mengurangi pertentangan kunci di NameNode dengan menetapkan dfs.namenode.fslock.fair=false. Perhatikan bahwa tindakan ini berisiko mengorbankan permintaan individual, tetapi dapat meningkatkan throughput di seluruh cluster. Untuk meningkatkan performa NameNode lebih lanjut, Anda dapat memasang SSD lokal ke node master dengan menetapkan --num-master-local-ssds. Anda juga dapat menambahkan SSD lokal ke pekerja utama untuk meningkatkan performa DataNode dengan menetapkan --num-worker-local-ssds.

Sistem File yang Kompatibel dengan Hadoop lainnya untuk shuffle HCFS

Secara default, data shuffle HCFS EFM ditulis ke HDFS, tetapi Anda dapat menggunakan Hadoop Compatible File System (HCFS) apa pun. Misalnya, Anda dapat memutuskan untuk menulis shuffle ke Cloud Storage atau ke HDFS cluster lain. Untuk menentukan sistem file, Anda dapat mengarahkan fs.defaultFS ke sistem file target saat mengirimkan tugas ke cluster.

Penghentian tuntas YARN di cluster EFM

Penghentian Tuntas YARN dapat digunakan untuk menghapus node dengan cepat dengan dampak minimal pada aplikasi yang berjalan. Untuk cluster penskalaan otomatis, waktu tunggu penghentian layanan yang wajar dapat ditetapkan di AutoscalingPolicy yang dilampirkan ke cluster EFM.

Peningkatan EFM MapReduce untuk penghentian tuntas

  1. Karena data perantara disimpan dalam sistem file terdistribusi, node dapat dihapus dari cluster EFM segera setelah semua penampung yang berjalan di node tersebut selesai. Sebagai perbandingan, node tidak dihapus di cluster Dataproc standar hingga aplikasi selesai.

  2. Penghapusan node tidak menunggu master aplikasi yang berjalan di node selesai. Saat penampung master aplikasi dihentikan, penampung akan dijadwalkan ulang di node lain yang tidak dinonaktifkan. Progres tugas tidak hilang: master aplikasi baru cepat memulihkan status dari master aplikasi sebelumnya dengan membaca histori tugas.

Menggunakan penghentian tuntas di cluster EFM dengan MapReduce

  1. Buat cluster EFM dengan jumlah pekerja primer dan sekunder yang sama.

    gcloud dataproc clusters create cluster-name \
        --properties=dataproc:efm.mapreduce.shuffle=hcfs \
        --region=region \
        --num-workers=5 \
        --num-secondary-workers=5
    

  2. Jalankan tugas mapreduce yang menghitung nilai pi menggunakan jar contoh mapreduce di cluster.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- pi 1000 10000000
    

  3. Saat tugas berjalan, turunkan skala cluster menggunakan penghentian tuntas.

    gcloud dataproc clusters update cluster-name \
        --region=region \
        --num-secondary-workers=0 \
        --graceful-decommission-timeout=1h
    
    Node akan dihapus dari cluster dengan cepat sebelum tugas selesai, sekaligus meminimalkan hilangnya progres tugas. Jeda sementara dalam progres tugas dapat terjadi karena:

    • Failover master aplikasi. Jika progres tugas turun ke 0%, lalu langsung melonjak ke nilai pra-penurunan, master aplikasi mungkin telah dihentikan dan master aplikasi baru memulihkan statusnya. Hal ini tidak akan memengaruhi progres tugas secara signifikan karena failover terjadi dengan cepat.
    • Preemption VM. Karena HDFS hanya mempertahankan output tugas peta yang lengkap, bukan sebagian, jeda sementara dalam progres tugas dapat terjadi saat VM di-preempt saat mengerjakan tugas peta.

Untuk mempercepat penghapusan node, Anda dapat menskalakan cluster tanpa penghentian layanan yang halus dengan menghapus tanda --graceful-decommission-timeout dalam contoh perintah gcloud sebelumnya. Progres tugas dari tugas peta yang telah selesai akan dipertahankan, tetapi output tugas peta yang telah selesai sebagian akan hilang (tugas peta akan dijalankan ulang).