Memecahkan masalah error Dataflow kehabisan memori

Halaman ini memberikan informasi tentang penggunaan memori di pipeline Dataflow dan langkah-langkah untuk menyelidiki dan menyelesaikan masalah dengan error kehabisan memori (OOM) Dataflow.

Tentang penggunaan memori Dataflow

Untuk memecahkan masalah error kekurangan memori, sebaiknya pahami cara pipeline Dataflow menggunakan memori.

Saat Dataflow menjalankan pipeline, pemrosesan didistribusikan di beberapa virtual machine (VM) Compute Engine, yang sering disebut pekerja. Pekerja memproses item pekerjaan dari layanan Dataflow dan mendelegasikan item pekerjaan ke proses Apache Beam SDK. Proses Apache Beam SDK membuat instance DoFn. DoFn adalah class Apache Beam SDK yang menentukan fungsi pemrosesan terdistribusi.

Dataflow meluncurkan beberapa thread di setiap pekerja, dan memori setiap pekerja dibagikan di semua thread. Thread adalah satu tugas yang dapat dieksekusi yang berjalan dalam proses yang lebih besar. Jumlah thread default bergantung pada beberapa faktor dan bervariasi antara tugas batch dan streaming.

Jika pipeline Anda memerlukan lebih banyak memori daripada jumlah memori default yang tersedia di pekerja, Anda mungkin mengalami error kehabisan memori.

Pipeline Dataflow terutama menggunakan memori pekerja dengan tiga cara:

Memori operasional pekerja

Pekerja Dataflow memerlukan memori untuk sistem operasi dan proses sistemnya. Penggunaan memori pekerja biasanya tidak lebih besar dari 1 GB. Penggunaan biasanya kurang dari 1 GB.

  • Berbagai proses di pekerja menggunakan memori untuk memastikan bahwa pipeline Anda berfungsi dengan baik. Setiap proses ini mungkin menyimpan sejumlah kecil memori untuk operasinya.
  • Jika pipeline Anda tidak menggunakan Streaming Engine, proses pekerja tambahan akan menggunakan memori.

Memori proses SDK

Proses Apache Beam SDK dapat membuat objek dan data yang dibagikan di antara thread dalam proses, yang disebut di halaman ini sebagai objek dan data bersama SDK. Penggunaan memori dari objek dan data bersama SDK ini disebut sebagai memori proses SDK. Daftar berikut menyertakan contoh objek dan data bersama SDK:

  • Input samping
  • Model machine learning
  • Objek singleton dalam memori
  • Objek Python yang dibuat dengan modul apache_beam.utils.shared
  • Data yang dimuat dari sumber eksternal, seperti Cloud Storage atau BigQuery

Tugas streaming yang tidak menggunakan Streaming Engine menyimpan input sisi penyimpanan di memori. Untuk pipeline Java dan Go, setiap pekerja memiliki satu salinan input samping. Untuk pipeline Python, setiap proses Apache Beam SDK memiliki satu salinan input samping.

Tugas streaming yang menggunakan Streaming Engine memiliki batas ukuran input samping sebesar 80 MB. Input samping disimpan di luar memori pekerja.

Penggunaan memori dari objek dan data bersama SDK tumbuh secara linear dengan jumlah proses Apache Beam SDK. Dalam pipeline Java dan Go, satu proses Apache Beam SDK dimulai per pekerja. Dalam pipeline Python, satu proses Apache Beam SDK dimulai per vCPU. Objek dan data bersama SDK digunakan kembali di seluruh thread dalam proses Apache Beam SDK yang sama.

Penggunaan memori DoFn

DoFn adalah class Apache Beam SDK yang menentukan fungsi pemrosesan terdistribusi. Setiap pekerja dapat menjalankan instance DoFn serentak. Setiap thread menjalankan satu instance DoFn. Saat mengevaluasi total penggunaan memori, menghitung ukuran set kerja, atau jumlah memori yang diperlukan agar aplikasi dapat terus berfungsi, mungkin akan membantu. Misalnya, jika setiap DoFn menggunakan memori maksimum 5 MB dan pekerja memiliki 300 thread, penggunaan memori DoFn dapat mencapai puncaknya sebesar 1,5 GB, atau jumlah byte memori dikalikan dengan jumlah thread. Bergantung pada cara pekerja menggunakan memori, lonjakan penggunaan memori dapat menyebabkan pekerja kehabisan memori.

Sulit untuk memperkirakan jumlah instance DoFn yang dibuat Dataflow. Jumlahnya bergantung pada berbagai faktor, seperti SDK, jenis mesin, dan sebagainya. Selain itu, DoFn mungkin digunakan oleh beberapa thread secara berurutan. Layanan Dataflow tidak menjamin frekuensi pemanggilan DoFn, atau menjamin jumlah persis instance DoFn yang dibuat selama pipeline. Namun, tabel berikut memberikan beberapa insight tentang tingkat paralelisme yang dapat Anda harapkan dan memperkirakan batas atas jumlah instance DoFn.

Beam Python SDK

Batch Streaming tanpa Streaming Engine Streaming Engine
Keparalelan 1 proses per vCPU

1 thread per proses

1 thread per vCPU

1 proses per vCPU

12 thread per proses

12 thread per vCPU

1 proses per vCPU

12 thread per proses

12 thread per vCPU

Jumlah maksimum instance DoFn serentak (Semua angka ini dapat berubah kapan saja.) 1 DoFn per thread

1 DoFn per vCPU

1 DoFn per thread

12 DoFn per vCPU

1 DoFn per thread

12 DoFn per vCPU

Beam Java/Go SDK

Batch Streaming tanpa Streaming Engine Streaming Engine
Keparalelan 1 proses per VM pekerja

1 thread per vCPU

1 proses per VM pekerja

300 thread per proses

300 thread per VM pekerja

1 proses per VM pekerja

500 thread per proses

500 thread per VM pekerja

Jumlah maksimum instance DoFn serentak (Semua angka ini dapat berubah kapan saja.) 1 DoFn per thread

1 DoFn per vCPU

1 DoFn per thread

300 DoFn per VM pekerja

1 DoFn per thread

500 DoFn per VM pekerja

Misalnya, saat menggunakan Python SDK dengan pekerja Dataflow n1-standard-2, hal berikut berlaku:

  • Tugas batch: Dataflow meluncurkan satu proses per vCPU (dua dalam kasus ini). Setiap proses menggunakan satu thread, dan setiap thread membuat satu instance DoFn.
  • Tugas streaming dengan Streaming Engine: Dataflow memulai satu proses per vCPU (total dua). Namun, setiap proses dapat menghasilkan hingga 12 thread, masing-masing dengan instance DoFn-nya sendiri.

Saat Anda mendesain pipeline yang kompleks, penting untuk memahami siklus proses DoFn. Pastikan fungsi DoFn Anda dapat diserialisasi, dan hindari mengubah argumen elemen secara langsung di dalamnya.

Jika Anda memiliki pipeline multibahasa, dan lebih dari satu Apache Beam SDK berjalan di pekerja, pekerja akan menggunakan tingkat paralelisme thread per proses terendah yang memungkinkan.

Perbedaan Java, Go, dan Python

Java, Go, dan Python mengelola proses dan memori secara berbeda. Akibatnya, pendekatan yang harus Anda lakukan saat memecahkan masalah error kehabisan memori bervariasi berdasarkan apakah pipeline Anda menggunakan Java, Go, atau Python.

Pipeline Java dan Go

Di pipeline Java dan Go:

  • Setiap pekerja memulai satu proses Apache Beam SDK.
  • Objek dan data bersama SDK, seperti input samping dan cache, dibagikan di antara semua thread pada pekerja.
  • Memori yang digunakan oleh objek dan data bersama SDK biasanya tidak diskalakan berdasarkan jumlah vCPU pada pekerja.

Pipeline Python

Di pipeline Python:

  • Setiap pekerja memulai satu proses Apache Beam SDK per vCPU.
  • Objek dan data bersama SDK, seperti input samping dan cache, dibagikan di antara semua thread dalam setiap proses Apache Beam SDK.
  • Jumlah total thread pada pekerja diskalakan secara linear berdasarkan jumlah vCPU. Akibatnya, memori yang digunakan oleh objek dan data bersama SDK akan bertambah secara linier dengan jumlah vCPU.
  • Thread yang melakukan pekerjaan didistribusikan di seluruh proses. Unit tugas baru ditetapkan ke proses tanpa item tugas, atau ke proses dengan item tugas paling sedikit yang saat ini ditetapkan.

Menemukan error kekurangan memori

Untuk menentukan apakah pipeline Anda kehabisan memori, gunakan salah satu metode berikut.

  • Di halaman Jobs details, di panel Logs, lihat tab Diagnostics. Tab ini menampilkan error yang terkait dengan masalah memori dan seberapa sering error tersebut terjadi.
  • Di antarmuka pemantauan Dataflow, gunakan diagram Penggunaan memori untuk memantau kapasitas dan penggunaan memori pekerja.
  • Di halaman Detail tugas, di panel Log, pilih Log pekerja. Temukan error memori.

Java

Java Memory Monitor, yang dikonfigurasi oleh antarmuka MemoryMonitorOptions, secara berkala melaporkan metrik pembersihan sampah. Jika fraksi waktu CPU yang digunakan untuk pengumpulan sampah melebihi nilai minimum 50% selama jangka waktu yang lama, harness SDK saat ini akan gagal.

Anda mungkin melihat error yang mirip dengan contoh berikut:

Shutting down JVM after 8 consecutive periods of measured GC thrashing. Memory is used/total/max = ...

Error memori ini dapat terjadi saat memori fisik masih tersedia. Error ini biasanya menunjukkan bahwa penggunaan memori pipeline tidak efisien. Untuk mengatasi masalah ini, optimalkan pipeline Anda.

Jika tugas Anda memiliki penggunaan memori yang tinggi atau error kehabisan memori, ikuti rekomendasi di halaman ini untuk mengoptimalkan penggunaan memori atau meningkatkan jumlah memori yang tersedia.

Mengatasi error kekurangan memori

Perubahan pada pipeline Dataflow Anda dapat menyelesaikan error kehabisan memori atau mengurangi penggunaan memori. Kemungkinan perubahan mencakup tindakan berikut:

Diagram berikut menunjukkan alur kerja pemecahan masalah Dataflow yang dijelaskan di halaman ini.

Diagram yang menunjukkan alur kerja pemecahan masalah.

Mengoptimalkan pipeline

Beberapa operasi pipeline dapat menyebabkan error memori habis. Bagian ini menyediakan opsi untuk mengurangi penggunaan memori pipeline Anda. Untuk mengidentifikasi tahap pipeline yang paling banyak menggunakan memori, gunakan Cloud Profiler untuk memantau performa pipeline.

Anda dapat menggunakan praktik terbaik berikut untuk mengoptimalkan pipeline:

Menggunakan konektor I/O bawaan Apache Beam untuk membaca file

Jangan membuka file berukuran besar di dalam DoFn. Untuk membaca file, gunakan konektor I/O bawaan Apache Beam. File yang dibuka di DoFn harus sesuai dengan memori. Karena beberapa instance DoFn berjalan secara serentak, file besar yang dibuka di DoFn dapat menyebabkan error memori habis.

Mendesain ulang operasi saat menggunakan PTransform GroupByKey

Saat Anda menggunakan PTransform GroupByKey di Dataflow, nilai per kunci dan per jendela yang dihasilkan akan diproses pada satu thread. Karena data ini diteruskan sebagai streaming dari layanan backend Dataflow ke pekerja, data ini tidak perlu muat di memori pekerja. Namun, jika nilai dikumpulkan dalam memori, logika pemrosesan dapat menyebabkan error kehabisan memori.

Misalnya, jika Anda memiliki kunci yang berisi data untuk jendela, dan Anda menambahkan nilai kunci ke objek dalam memori, seperti daftar, error kehabisan memori mungkin terjadi. Dalam skenario ini, pekerja mungkin tidak memiliki kapasitas memori yang memadai untuk menyimpan semua objek.

Untuk informasi selengkapnya tentang PTransform GroupByKey, lihat dokumentasi Python GroupByKey dan Java GroupByKey Apache Beam.

Daftar berikut berisi saran untuk mendesain pipeline Anda guna meminimalkan konsumsi memori saat menggunakan PTransform GroupByKey.

  • Untuk mengurangi jumlah data per kunci dan per jendela, hindari kunci dengan banyak nilai, yang juga dikenal sebagai hotkey.
  • Untuk mengurangi jumlah data yang dikumpulkan per periode, gunakan ukuran periode yang lebih kecil.
  • Jika Anda menggunakan nilai kunci dalam jendela untuk menghitung angka, gunakan transformasi Combine. Jangan lakukan penghitungan dalam satu instance DoFn setelah mengumpulkan nilai.
  • Memfilter nilai atau duplikat sebelum memproses. Untuk mengetahui informasi selengkapnya, lihat dokumentasi transformasi Filter Python dan Filter Java.

Mengurangi data masuk dari sumber eksternal

Jika Anda melakukan panggilan ke API eksternal atau database untuk pengayaan data, data yang ditampilkan harus sesuai dengan memori pekerja. Jika Anda mengelompokkan panggilan, sebaiknya gunakan transformasi GroupIntoBatches. Jika Anda mengalami error kehabisan memori, kurangi ukuran batch. Untuk mengetahui informasi selengkapnya tentang pengelompokan ke dalam batch, lihat dokumentasi transformasi GroupIntoBatches Python dan GroupIntoBatches Java.

Membagikan objek di seluruh thread

Membagikan objek data dalam memori di seluruh instance DoFn dapat meningkatkan ruang dan efisiensi akses. Objek data yang dibuat dalam metode DoFn apa pun, termasuk Setup, StartBundle, Process, FinishBundle, dan Teardown, dipanggil untuk setiap DoFn. Di Dataflow, setiap pekerja mungkin memiliki beberapa instance DoFn. Untuk penggunaan memori yang lebih efisien, teruskan objek data sebagai singleton untuk membagikannya di beberapa DoFn. Untuk informasi selengkapnya, lihat postingan blog Penggunaan kembali cache di seluruh DoFn.

Menggunakan representasi elemen yang hemat memori

Evaluasi apakah Anda dapat menggunakan representasi untuk elemen PCollection yang menggunakan lebih sedikit memori. Saat menggunakan coder di pipeline, pertimbangkan tidak hanya representasi elemen PCollection yang dienkode, tetapi juga didekode. Matriks yang jarang sering kali dapat memanfaatkan jenis pengoptimalan ini.

Mengurangi ukuran input samping

Jika DoFn Anda menggunakan input samping, kurangi ukuran input samping. Untuk input samping yang merupakan kumpulan elemen, pertimbangkan untuk menggunakan tampilan yang dapat di-iterasi, seperti AsIterable atau AsMultimap, bukan tampilan yang mewujudkan seluruh input samping secara bersamaan, seperti AsList.

Membuat lebih banyak memori tersedia

Untuk meningkatkan memori yang tersedia, Anda dapat meningkatkan jumlah total memori yang tersedia pada pekerja tanpa mengubah jumlah memori yang tersedia per thread. Atau, Anda dapat meningkatkan jumlah memori yang tersedia per thread. Saat meningkatkan memori per thread, Anda juga akan meningkatkan total memori pada pekerja.

Anda dapat meningkatkan jumlah memori yang tersedia per thread dengan empat cara:

Menggunakan jenis mesin dengan lebih banyak memori per vCPU

Untuk memilih pekerja dengan lebih banyak memori per vCPU, gunakan salah satu metode berikut.

  • Gunakan jenis mesin bermemori tinggi di kelompok mesin tujuan umum. Jenis mesin bermemori tinggi memiliki memori per vCPU yang lebih tinggi daripada jenis mesin standar. Menggunakan jenis mesin memori tinggi akan meningkatkan memori yang tersedia untuk setiap pekerja dan memori yang tersedia per thread, karena jumlah vCPU tetap sama. Akibatnya, menggunakan jenis mesin dengan memori tinggi dapat menjadi cara yang hemat biaya untuk memilih pekerja dengan lebih banyak memori per vCPU.
  • Untuk fleksibilitas yang lebih besar saat menentukan jumlah vCPU dan jumlah memori, Anda dapat menggunakan jenis mesin kustom. Dengan jenis mesin kustom, Anda dapat meningkatkan memori dalam kelipatan 256 MB. Harga jenis mesin ini berbeda dengan jenis mesin standar.
  • Beberapa kelompok mesin memungkinkan Anda menggunakan jenis mesin kustom memori tambahan. Memori yang diperluas memungkinkan rasio memori per vCPU yang lebih tinggi. Biayanya lebih tinggi.

Untuk menetapkan jenis pekerja, gunakan opsi pipeline berikut. Untuk informasi selengkapnya, lihat Menetapkan opsi pipeline dan Opsi pipeline.

Java

Gunakan opsi pipeline --workerMachineType.

Python

Gunakan opsi pipeline --machine_type.

Go

Gunakan opsi pipeline --worker_machine_type.

Menggunakan jenis mesin dengan lebih banyak vCPU

Opsi ini hanya direkomendasikan untuk pipeline streaming Java dan Go. Jenis mesin dengan lebih banyak vCPU memiliki total memori yang lebih besar, karena jumlah memori diskalakan secara linear dengan jumlah vCPU. Misalnya, jenis mesin n1-standard-4 dengan empat vCPU memiliki memori sebesar 15 GB. Jenis mesin n1-standard-8 dengan delapan vCPU memiliki memori sebesar 30 GB. Untuk mengetahui informasi selengkapnya tentang jenis mesin yang telah ditetapkan, lihat Kelompok mesin untuk tujuan umum.

Menggunakan pekerja dengan jumlah vCPU yang lebih tinggi dapat meningkatkan biaya pipeline Anda secara signifikan. Namun, Anda dapat menggunakan penskalaan otomatis horisontal untuk mengurangi jumlah total pekerja sehingga paralelisme tetap sama. Misalnya, jika Anda memiliki 50 pekerja yang menggunakan jenis mesin n1-standard-4, dan beralih ke jenis mesin n1-standard-8, Anda dapat menggunakan penskalaan otomatis horizontal dan menetapkan jumlah maksimum pekerja untuk mengurangi jumlah total pekerja di pipeline menjadi sekitar 25. Konfigurasi ini menghasilkan pipeline dengan biaya yang serupa.

Untuk menetapkan jumlah maksimum pekerja, gunakan opsi pipeline berikut.

Java

Gunakan opsi pipeline --maxNumWorkers.

Untuk informasi lebih lanjut, lihat Opsi pipeline.

Go

Gunakan opsi pipeline --max_num_workers.

Untuk informasi lebih lanjut, lihat Opsi pipeline.

Metode ini tidak direkomendasikan untuk pipeline Python. Saat menggunakan Python SDK, jika beralih ke pekerja dengan jumlah vCPU yang lebih tinggi, Anda tidak hanya meningkatkan memori, tetapi juga meningkatkan jumlah proses Apache Beam SDK. Misalnya, jenis mesin n1-standard-4 memiliki memori per thread yang sama dengan jenis mesin n1-standard-8 untuk pipeline Python. Oleh karena itu, dengan pipeline Python, rekomendasinya adalah menggunakan jenis mesin memori tinggi, mengurangi jumlah thread, atau hanya menggunakan satu proses Apache Beam SDK.

Mengurangi jumlah thread

Jika menggunakan jenis mesin bermemori tinggi tidak menyelesaikan masalah Anda, tingkatkan memori yang tersedia per thread dengan mengurangi jumlah maksimum thread yang menjalankan instance DoFn. Perubahan ini mengurangi paralelisme. Untuk mengurangi jumlah thread Apache Beam SDK yang menjalankan instance DoFn, gunakan opsi pipeline berikut.

Java

Gunakan opsi pipeline --numberOfWorkerHarnessThreads.

Untuk informasi lebih lanjut, lihat Opsi pipeline.

Python

Gunakan opsi pipeline --number_of_worker_harness_threads.

Untuk informasi lebih lanjut, lihat Opsi pipeline.

Go

Gunakan opsi pipeline --number_of_worker_harness_threads.

Untuk informasi lebih lanjut, lihat Opsi pipeline.

Untuk mengurangi jumlah thread untuk pipeline batch Java dan Go, tetapkan nilai tanda ke angka yang kurang dari jumlah vCPU pada pekerja. Untuk pipeline streaming, tetapkan nilai flag ke angka yang kurang dari jumlah thread per proses Apache Beam SDK. Untuk memperkirakan thread per proses, lihat tabel di bagian penggunaan memori DoFn di halaman ini.

Penyesuaian ini tidak tersedia untuk pipeline Python yang berjalan di Apache Beam SDK 2.20.0 atau yang lebih lama atau untuk pipeline Python yang tidak menggunakan Runner v2.

Hanya menggunakan satu proses Apache Beam SDK

Untuk pipeline streaming Python dan pipeline Python yang menggunakan Runner v2, Anda dapat memaksa Dataflow untuk hanya memulai satu proses Apache Beam SDK per pekerja. Sebelum mencoba opsi ini, coba selesaikan masalah terlebih dahulu menggunakan metode lain. Untuk mengonfigurasi VM pekerja Dataflow agar hanya memulai satu proses Python dalam penampung, gunakan opsi pipeline berikut:

--experiments=no_use_multiple_sdk_containers

Dengan konfigurasi ini, pipeline Python akan membuat satu proses Apache Beam SDK per pekerja. Konfigurasi ini mencegah objek dan data bersama direplikasi beberapa kali untuk setiap proses Apache Beam SDK. Namun, hal ini membatasi penggunaan resource komputasi yang tersedia di pekerja secara efisien.

Mengurangi jumlah proses Apache Beam SDK menjadi satu tidak selalu mengurangi jumlah total thread yang dimulai pada pekerja. Selain itu, memiliki semua thread pada satu proses Apache Beam SDK dapat menyebabkan pemrosesan lambat atau menyebabkan pipeline macet. Oleh karena itu, Anda mungkin juga harus mengurangi jumlah thread, seperti yang dijelaskan di bagian Mengurangi jumlah thread di halaman ini.

Anda juga dapat memaksa pekerja untuk hanya menggunakan satu proses Apache Beam SDK menggunakan jenis mesin yang hanya memiliki satu vCPU.