Mode Fleksibilitas yang Ditingkatkan Dataproc

Dataproc Enhanced Flexibility Mode (EFM) mengelola data acak untuk meminimalkan penundaan progres tugas yang disebabkan oleh penghapusan node dari cluster yang berjalan. EFM mengurangi beban data acak dalam salah satu dari dua mode yang dapat dipilih pengguna:

  1. Pekerja utama acak. Mapper menulis data ke pekerja primer. Pekerja menarik dari node jarak jauh tersebut selama fase pengurangan. Mode ini hanya tersedia untuk, dan direkomendasikan untuk, tugas Spark.

  2. Pengacakan HCFS (Hadoop Compatible File System). Mapper menulis data ke implementasi HCFS (HDFS secara default). Seperti mode pekerja utama, hanya pekerja primer yang berpartisipasi dalam implementasi HDFS dan HCFS (jika pengacakan HCFS menggunakan Cloud Storage Connector, data akan disimpan di luar cluster). Mode ini dapat memanfaatkan tugas dengan data dalam jumlah kecil, tetapi karena keterbatasan penskalaan, mode ini tidak direkomendasikan untuk tugas yang lebih besar.

Karena kedua mode EFM tidak menyimpan data acak perantara pada pekerja sekunder, EFM sangat cocok untuk cluster yang menggunakan preemptible VM atau hanya otomatis melakukan penskalaan otomatis grup pekerja sekunder.

Batasan:

  • Tugas Apache Hadoop YARN yang tidak mendukung relokasi AppMaster dapat gagal dalam Mode Fleksibilitas yang Ditingkatkan (lihat Kapan harus menunggu AppMasters selesai).
  • Mode Fleksibilitas yang Ditingkatkan tidak direkomendasikan:
    • pada cluster yang hanya memiliki pekerja utama.
  • Mode Fleksibilitas yang Ditingkatkan tidak didukung:
    • saat penskalaan otomatis pekerja utama diaktifkan. Pada umumnya, pekerja utama akan terus menyimpan data acak yang tidak dimigrasikan secara otomatis. Mengurangi skala grup pekerja utama meniadakan manfaat EFM.
    • saat tugas Spark berjalan di cluster dengan penghentian tuntas yang diaktifkan. Penghentian tuntas dan EFM dapat berfungsi pada tujuan silang karena mekanisme penghentian tuntas YARN mempertahankan node DECOMMISSIONING hingga semua aplikasi yang terlibat selesai.

Menggunakan Mode Fleksibilitas yang Ditingkatkan

Mode Fleksibilitas yang Ditingkatkan dikonfigurasi per mesin eksekusi, dan harus dikonfigurasi selama pembuatan cluster.

  • Implementasi Spark EFM dikonfigurasi dengan properti cluster dataproc:efm.spark.shuffle. Nilai properti yang valid:

    • primary-worker untuk acak pekerja utama (direkomendasikan)
    • hcfs untuk pengacakan berbasis HCFS. Mode ini tidak digunakan lagi dan hanya tersedia pada cluster yang menjalankan image versi 1.5. Tidak direkomendasikan untuk alur kerja baru.
  • Implementasi MapReduce Hadoop dikonfigurasi dengan properti cluster dataproc:efm.mapreduce.shuffle. Nilai properti yang valid:

    • hcfs

Contoh: Buat cluster dengan pengacakan pekerja utama untuk Spark dan acak HCFS untuk MapReduce:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
    --properties=dataproc:efm.mapreduce.shuffle=hcfs \
    --worker-machine-type=n1-highmem-8 \
    --num-workers=25 \
    --num-worker-local-ssds=2 \
    --secondary-worker-type=preemptible \
    --secondary-worker-boot-disk-size=500GB \
    --num-secondary-workers=25

Contoh Apache Spark

  1. Menjalankan tugas WordCount terhadap teks Shakespeare publik menggunakan jar contoh Spark di cluster EFM.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

Contoh MapReduce Apache Hadoop

  1. Jalankan tugas teragen kecil untuk menghasilkan data input di Cloud Storage untuk tugas terasort berikutnya menggunakan jar contoh mapReduce di cluster EFM.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
    

  2. Jalankan tugas terasort pada data.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- terasort gs://terasort/input gs://terasort/output
    

Mengonfigurasi SSD Lokal untuk mode acak pekerja utama

Implementasi pekerja utama dan HDFS shuffle akan menulis data acak menengah ke disk yang terpasang ke VM, dan mendapatkan manfaat dari throughput dan IOPS tambahan yang ditawarkan oleh SSD lokal. Untuk memfasilitasi alokasi resource, targetkan sasaran sekitar 1 partisi SSD lokal per 4 vCPU saat mengonfigurasi mesin pekerja utama.

Untuk memasang SSD lokal, teruskan flag --num-worker-local-ssds ke perintah gcloud dataproc cluster create.

Rasio pekerja sekunder

Karena pekerja sekunder menulis data acak mereka ke pekerja utama, cluster Anda harus berisi jumlah pekerja utama yang memadai dengan resource CPU, memori, dan disk yang memadai untuk mengakomodasi pemuatan acak tugas. Untuk melakukan penskalaan otomatis cluster, guna mencegah grup utama melakukan penskalaan dan menyebabkan perilaku yang tidak diinginkan, setel minInstances ke nilai maxInstances dalam kebijakan penskalaan otomatis untuk grup pekerja utama.

Jika Anda memiliki rasio pekerja sekunder ke primer yang tinggi (misalnya, 10:1), pantau penggunaan CPU, jaringan, dan penggunaan disk pekerja utama untuk menentukan apakah mereka kelebihan beban. Untuk melakukan ini:

  1. Buka halaman VM instances di Google Cloud Console.

  2. Klik kotak centang di sebelah kiri pekerja utama.

  3. Klik tab PEMANTAUAN untuk melihat Pemakaian CPU pekerja utama, IOPS Disk, Byte Jaringan, dan metrik lainnya.

Jika pekerja utama kelebihan beban, pertimbangkan untuk meningkatkan skala pekerja utama secara manual.

Mengubah ukuran grup pekerja utama

Grup pekerja utama dapat ditingkatkan skalanya dengan aman, tetapi menurunkan skala grup pekerja utama dapat berdampak negatif pada progres tugas. Operasi yang menurunkan skala kelompok pekerja utama harus menggunakan penonaktifan secara halus, yang diaktifkan dengan menyetel tanda --graceful-decommission-timeout.

Cluster dengan penskalaan otomatis: Penskalaan grup pekerja utama dinonaktifkan di cluster EFM dengan kebijakan penskalaan otomatis. Untuk mengubah ukuran grup pekerja utama pada cluster yang diskalakan otomatis:

  1. Nonaktifkan penskalaan otomatis.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. Menskalakan grup utama.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. Mengaktifkan kembali penskalaan otomatis:

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

Memantau penggunaan disk pekerja utama

Pekerja utama harus memiliki kapasitas disk yang cukup untuk data acak cluster. Anda dapat memantau hal ini secara tidak langsung melalui metrik remaining HDFS capacity. Saat disk lokal terisi, ruang untuk HDFS menjadi tidak tersedia, dan kapasitas yang tersisa berkurang.

Secara default, saat disk lokal pekerja utama menggunakan melebihi kapasitas 90%, node akan ditandai sebagai TIDAK SEHAT di UI node YARN. Jika mengalami masalah kapasitas disk, Anda dapat menghapus data yang tidak digunakan dari HDFS atau meningkatkan skala kumpulan pekerja utama.

Perlu diketahui bahwa data acak menengah biasanya tidak dibersihkan hingga akhir tugas. Saat menggunakan pengacakan pekerja utama dengan Spark, proses ini dapat memerlukan waktu hingga 30 menit setelah tugas selesai.

Konfigurasi lanjutan

Partisi dan paralelisme

Saat mengirimkan tugas MapReduce atau Spark, konfigurasikan tingkat partisi yang sesuai. Menentukan jumlah partisi input dan output untuk tahap acak melibatkan kompromi antara berbagai karakteristik performa. Cara terbaik adalah bereksperimen dengan nilai-nilai yang sesuai untuk bentuk tugas Anda.

Partisi input

Partisi input MapReduce dan Spark ditentukan oleh set data input. Saat membaca file dari Cloud Storage, setiap tugas memproses sekitar satu data "ukuran blok".

  • Untuk tugas Spark SQL, ukuran partisi maksimum dikontrol oleh spark.sql.files.maxPartitionBytes. Sebaiknya naikkan batas memori ke 1 GB: spark.sql.files.maxPartitionBytes=1073741824.

  • Untuk tugas MapReduce dan RDD Spark, ukuran partisi biasanya dikontrol dengan fs.gs.block.size, yang secara default berukuran 128 MB. Pertimbangkan untuk meningkatkannya menjadi 1 GB. Anda juga dapat menetapkan properti khusus InputFormat seperti mapreduce.input.fileinputformat.split.minsize dan mapreduce.input.fileinputformat.split.maxsize

    • Untuk tugas MapReduce: --properties fs.gs.block.size=1073741824
    • Untuk RDD Spark: --properties spark.hadoop.fs.gs.block.size=1073741824

Partisi output

Jumlah tugas pada tahap berikutnya dikontrol oleh beberapa properti. Pada tugas yang lebih besar yang memproses lebih dari 1 TB, pertimbangkan untuk memiliki setidaknya 1 GB per partisi.

  • Untuk tugas MapReduce, jumlah partisi output dikontrol oleh mapreduce.job.reduces.

  • Untuk Spark SQL, jumlah partisi output dikontrol oleh spark.sql.shuffle.partitions.

  • Untuk tugas Spark yang menggunakan RDD API, Anda dapat menentukan jumlah partisi output atau menetapkan spark.default.parallelism.

Mengacak penyesuaian untuk pengacakan pekerja utama

Properti yang paling signifikan adalah --properties yarn:spark.shuffle.io.serverThreads=<num-threads>. Perhatikan bahwa ini adalah properti YARN tingkat cluster karena server acak Spark berjalan sebagai bagian dari Pengelola Node. Secara default, jumlah core di mesin adalah dua kali (2x) (misalnya, 16 thread pada n1-highmem-8). Jika "Shuffle Read Blocked Time" berukuran lebih dari 1 detik, dan pekerja utama belum mencapai batas jaringan, CPU, atau disk, sebaiknya tingkatkan jumlah thread server acak.

Pada jenis mesin yang lebih besar, sebaiknya tingkatkan spark.shuffle.io.numConnectionsPerPeer, yang nilai defaultnya adalah 1. (Misalnya, setel ke 5 koneksi per pasangan host).

Meningkatkan percobaan ulang

Jumlah maksimum upaya yang diizinkan untuk master, tugas, dan tahapan aplikasi dapat dikonfigurasi dengan menetapkan properti berikut:

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

Karena master aplikasi dan tugas lebih sering dihentikan dalam cluster yang menggunakan banyak preemptible VM atau penskalaan otomatis tanpa penghentian tuntas, meningkatkan nilai properti di atas dalam cluster tersebut dapat membantu (perlu diperhatikan bahwa menggunakan EFM dengan Spark dan penghentian tuntas tidak didukung).

Mengonfigurasi HDFS untuk pengacakan HCFS

Untuk meningkatkan performa acak besar, Anda dapat mengurangi pertentangan kunci di NameNode dengan menetapkan dfs.namenode.fslock.fair=false. Perlu diperhatikan bahwa hal ini berisiko membuat permintaan tunggal berlebih, tetapi dapat meningkatkan throughput seluruh cluster. Untuk lebih meningkatkan performa NameNode, Anda dapat memasang SSD lokal ke node master dengan menetapkan --num-master-local-ssds. Anda juga dapat menambahkan SSD lokal ke pekerja primer untuk meningkatkan performa DataNode dengan menetapkan --num-worker-local-ssds.

Sistem File yang Kompatibel dengan Hadoop lainnya untuk pengacakan HCFS

Secara default, data acak HCFS EFM ditulis ke HDFS, tetapi Anda dapat menggunakan Hadoop Compatible File System (HCFS) apa pun. Misalnya, Anda dapat memutuskan untuk melakukan shuffle ke Cloud Storage atau ke HDFS cluster yang berbeda. Untuk menentukan sistem file, Anda dapat mengarahkan fs.defaultFS ke sistem file target saat mengirimkan tugas ke cluster Anda.

Penghentian tuntas YARN pada cluster EFM

YARN Graceful Decommissioning dapat digunakan untuk menghapus node secara cepat dengan dampak minimal pada aplikasi yang berjalan. Untuk cluster penskalaan otomatis, waktu tunggu penghentian yang halus dapat ditetapkan dalam AutoscalingPolicy yang disertakan ke cluster EFM.

Peningkatan EFM MapReduce ke penghentian tuntas

  1. Karena data perantara disimpan dalam sistem file terdistribusi, node dapat dihapus dari cluster EFM segera setelah semua container yang berjalan pada node tersebut telah selesai. Sebagai perbandingan, node tidak dihapus pada cluster Dataproc standar hingga aplikasi selesai.

  2. Penghapusan node tidak menunggu master aplikasi yang berjalan di node selesai. Saat penampung master aplikasi dihentikan, penampung tersebut akan dijadwalkan ulang pada node lain yang masih belum dinonaktifkan. Progres tugas tidak hilang: master aplikasi baru memulihkan status dari master aplikasi sebelumnya secara cepat dengan membaca histori tugas.

Menggunakan penghentian tuntas pada cluster EFM dengan MapReduce

  1. Buat cluster EFM dengan jumlah pekerja primer dan sekunder yang sama.

    gcloud dataproc clusters create cluster-name \
        --properties=dataproc:efm.mapreduce.shuffle=hcfs \
        --region=region \
        --num-workers=5 \
        --num-secondary-workers=5
    

  2. Menjalankan tugas mapReduce yang menghitung nilai pi menggunakan jar contoh mapReduce di cluster.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- pi 1000 10000000
    

  3. Selagi tugas berjalan, perkecil skala cluster menggunakan penghentian tuntas.

    gcloud dataproc clusters update cluster-name \
        --region=region \
        --num-secondary-workers=0 \
        --graceful-decommission-timeout=1h
    
    Node akan dihapus dari cluster dengan cepat sebelum tugas selesai, sekaligus meminimalkan hilangnya progres tugas. Jeda sementara selama proses tugas dapat terjadi karena:

    • Failover master aplikasi. Jika progres tugas turun ke 0% lalu langsung melompat ke nilai pra-rilis, master aplikasi mungkin telah dihentikan dan master aplikasi baru memulihkan statusnya. Hal ini seharusnya tidak memengaruhi progres tugas secara signifikan karena failover terjadi dengan cepat.
    • preemption VM. Karena HDFS hanya mempertahankan output tugas peta lengkap, bukan parsial, jeda sementara dalam progres tugas dapat terjadi jika VM di-preempt saat mengerjakan tugas peta.