Praktik terbaik untuk alur kerja yang sangat paralel

Halaman ini memberikan panduan tentang praktik terbaik yang harus diikuti saat mem-build dan menjalankan alur kerja Dataflow HPC yang sangat paralel, termasuk cara menggunakan kode eksternal di pipeline, cara menjalankan pipeline, dan cara mengelola penanganan error.

Menyertakan kode eksternal dalam pipeline

Pembeda utama untuk pipeline yang sangat paralel adalah pipeline tersebut menggunakan kode C++ dalam DoFn, bukan salah satu bahasa Apache Beam SDK standar. Untuk pipeline Java, agar lebih mudah menggunakan library C++ dalam pipeline, sebaiknya gunakan panggilan prosedur eksternal. Bagian ini menjelaskan pendekatan umum yang digunakan untuk menjalankan kode eksternal (C++) di pipeline Java.

Definisi pipeline Apache Beam memiliki beberapa komponen utama:

  • PCollections adalah kumpulan elemen homogen yang tidak dapat diubah.
  • PTransforms digunakan untuk menentukan transformasi ke PCollection yang menghasilkan PCollection lain.
  • Pipeline adalah konstruksi yang memungkinkan Anda, melalui kode, mendeklarasikan interaksi antara PTransforms dan PCollections. Pipeline direpresentasikan sebagai directed acyclic graph (DAG).

Saat Anda menggunakan kode dari bahasa yang bukan salah satu bahasa Apache Beam SDK standar, tempatkan kode di PTransform, yang berada dalam DoFn, dan gunakan salah satu bahasa SDK standar untuk menentukan pipeline itu sendiri. Sebaiknya gunakan Apache Beam Python SDK untuk menentukan pipeline, karena Python SDK memiliki class utilitas yang mempermudah penggunaan kode lain. Namun, Anda dapat menggunakan Apache Beam SDK lainnya.

Anda dapat menggunakan kode untuk melakukan eksperimen cepat tanpa memerlukan build lengkap. Untuk sistem produksi, Anda biasanya membuat biner sendiri, yang memberi Anda kebebasan untuk menyesuaikan proses sesuai kebutuhan.

Diagram berikut mengilustrasikan dua penggunaan data pipeline:

  • Data digunakan untuk mendorong proses.
  • Data diperoleh selama pemrosesan dan digabungkan dengan data pengemudi.

Dua tahap data pipeline

Di halaman ini, data primer (dari sumber) disebut sebagai data pendorong, dan data sekunder (dari fase pemrosesan) disebut sebagai data penggabungan.

Dalam kasus penggunaan keuangan, data pendorongnya mungkin adalah beberapa ratus ribu transaksi. Setiap transaksi perlu diproses bersama dengan data pasar. Dalam hal ini, data pasar adalah data penggabungan. Dalam kasus penggunaan media, data pendorong mungkin adalah file gambar yang memerlukan pemrosesan, tetapi tidak memerlukan sumber data lain, sehingga tidak menggunakan data penggabungan.

Pertimbangan ukuran untuk data mengemudi

Jika ukuran elemen data pendorong berada dalam rentang megabyte rendah, perlakuannya sama dengan paradigma Apache Beam normal, yaitu membuat objek PCollection dari sumber dan mengirim objek ke transformasi Apache Beam untuk diproses.

Jika ukuran elemen data berkendara dalam megabyte tinggi atau dalam gigabyte, seperti yang biasa dilakukan untuk media, Anda dapat memasukkan data berkendara ke dalam Cloud Storage. Kemudian, di objek PCollection awal, referensikan URI penyimpanan, dan hanya referensi URI ke data tersebut yang digunakan.

Pertimbangan ukuran untuk menggabungkan data

Jika data penggabungan berukuran beberapa ratus megabyte atau kurang, gunakan input samping untuk mendapatkan data ini ke transformasi Apache Beam. Input samping mengirimkan paket data ke setiap pekerja yang membutuhkannya.

Jika data penggabungan berada dalam rentang gigabyte atau terabyte, gunakan Bigtable atau Cloud Storage untuk menggabungkan data penggabungan ke data pendorong, bergantung pada sifat data. Bigtable ideal untuk skenario keuangan dengan data pasar yang sering diakses sebagai pencarian nilai kunci dari Bigtable. Untuk mengetahui informasi selengkapnya tentang cara mendesain skema Bigtable, termasuk rekomendasi untuk menggunakan data deret waktu, lihat dokumentasi Bigtable berikut:

Menjalankan kode eksternal

Anda dapat menjalankan kode eksternal di Apache Beam dengan banyak cara.

  • Buat proses yang dipanggil dari objek DoFn di dalam transformasi Dataflow.

  • Gunakan JNI dengan Java SDK.

  • Buat subproses langsung dari objek DoFn. Meskipun pendekatan ini bukan yang paling efisien, pendekatan ini andal dan mudah diterapkan. Karena potensi masalah saat menggunakan JNI, halaman ini menunjukkan penggunaan panggilan subproses.

Saat mendesain alur kerja, pertimbangkan pipeline menyeluruh dari awal hingga akhir. Setiap inefisiensi dalam cara proses dijalankan diimbangi dengan fakta bahwa pergerakan data dari sumber hingga ke sink dilakukan dengan satu pipeline. Jika Anda membandingkan pendekatan ini dengan pendekatan lainnya, lihat waktu menyeluruh pipeline serta biaya menyeluruh.

Menarik biner ke host

Saat Anda menggunakan bahasa Apache Beam native, Apache Beam SDK otomatis memindahkan semua kode yang diperlukan ke pekerja. Namun, saat melakukan panggilan ke kode eksternal, Anda harus memindahkan kode secara manual.

File biner yang disimpan di bucket

Untuk memindahkan kode, lakukan tindakan berikut. Contoh ini menunjukkan langkah-langkah untuk Apache Beam Java SDK.

  1. Simpan kode eksternal yang dikompilasi, beserta informasi pembuatan versi, di Cloud Storage.
  2. Dalam metode @Setup, buat blok yang disinkronkan untuk memeriksa apakah file kode tersedia di resource lokal. Daripada menerapkan pemeriksaan fisik, Anda dapat mengonfirmasi ketersediaan menggunakan variabel statis saat thread pertama selesai.
  3. Jika file tidak tersedia, gunakan library klien Cloud Storage untuk mengambil file dari bucket Cloud Storage ke pekerja lokal. Pendekatan yang direkomendasikan adalah menggunakan class FileSystems Apache Beam untuk tugas ini.
  4. Setelah file dipindahkan, pastikan bit eksekusi ditetapkan pada file kode.
  5. Dalam sistem produksi, periksa hash biner untuk memastikan bahwa file telah disalin dengan benar.

Menggunakan fungsi filesToStage Apache Beam juga merupakan opsi, tetapi akan menghilangkan beberapa keuntungan dari kemampuan runner untuk mengemas dan memindahkan kode Java Anda secara otomatis. Selain itu, karena panggilan ke subproses memerlukan lokasi file absolut, Anda perlu menggunakan kode untuk menentukan jalur class, sehingga lokasi file dipindahkan oleh filesToStage. Kami tidak merekomendasikan pendekatan ini.

Menjalankan biner eksternal

Sebelum dapat menjalankan kode eksternal, Anda harus mem-build wrapper untuk kode tersebut. Tulis wrapper ini dalam bahasa yang sama dengan kode eksternal (misalnya, C++) atau sebagai skrip shell. Wrapper memungkinkan Anda meneruskan handle file dan menerapkan pengoptimalan seperti yang dijelaskan di bagian Pemrosesan desain untuk siklus CPU kecil di halaman ini. Wrapper Anda tidak perlu rumit. Cuplikan berikut menunjukkan garis besar wrapper di C++.

int main(int argc, char* argv[])
{
    if(argc < 3){
        std::cerr << "Required return file and data to process" << '\n';
        return 1;
    }

    std::string returnFile = argv[1];
    std::string word = argv[2];

    std::ofstream myfile;
    myfile.open (returnFile);
    myfile << word;
    myfile.close();
    return 0;
}

Kode ini membaca dua parameter dari daftar argumen. Parameter pertama adalah lokasi file return tempat data didorong. Parameter kedua adalah data yang ditampilkan kode kepada pengguna. Dalam penerapan di dunia nyata, kode ini akan melakukan lebih dari sekadar menampilkan "Hello, world".

Setelah Anda menulis kode wrapper, jalankan kode eksternal dengan melakukan hal berikut:

  1. Transmisikan data ke biner kode eksternal.
  2. Jalankan biner, tangkap error apa pun, dan catat error dan hasil ke dalam log.
  3. Menangani informasi logging.
  4. Ambil data dari pemrosesan yang telah selesai.

Mentransmisikan data ke biner

Untuk memulai proses menjalankan library, transmisikan data ke kode C++. Pada langkah ini, Anda dapat memanfaatkan integrasi Dataflow dengan alat Google Cloud lain. Alat seperti Bigtable dapat menangani set data yang sangat besar dan menangani akses latensi rendah dan konkurensi tinggi, yang memungkinkan ribuan core mengakses set data secara bersamaan. Selain itu, Bigtable dapat memproses data terlebih dahulu, sehingga memungkinkan pembentukan, pengayaan, dan pemfilteran data. Semua pekerjaan ini dapat dilakukan di transformasi Apache Beam sebelum Anda menjalankan kode eksternal.

Untuk sistem produksi, jalur yang direkomendasikan adalah menggunakan buffer protokol untuk mengenkapsulasi data input. Anda dapat mengonversi data input menjadi byte dan mengenkodenya dengan base64 sebelum meneruskannya ke library eksternal. Dua cara untuk meneruskan data ini ke library eksternal adalah sebagai berikut:

  • Data input kecil. Untuk data kecil yang tidak melebihi panjang maksimum sistem untuk argumen perintah, teruskan argumen di posisi 2 proses yang sedang dibuat dengan java.lang.ProcessBuilder.
  • Data input yang besar. Untuk ukuran data yang lebih besar, buat file yang namanya menyertakan UUID untuk memuat data yang diperlukan oleh proses.

Menjalankan kode C++, menangkap error, dan melakukan logging

Menangkap dan menangani informasi error adalah bagian penting dari pipeline Anda. Resource yang digunakan oleh runner Dataflow bersifat sementara, dan sering kali sulit untuk memeriksa file log pekerja. Anda harus memastikan bahwa Anda mengambil dan mengirim semua informasi yang berguna ke logging runner Dataflow, dan menyimpan data logging di satu atau beberapa bucket Cloud Storage.

Pendekatan yang direkomendasikan adalah mengalihkan stdout dan stderr ke file, yang memungkinkan Anda menghindari pertimbangan kehabisan memori. Misalnya, dalam runner Dataflow yang memanggil kode C++, Anda dapat menyertakan baris seperti berikut:

Java

  import java.lang.ProcessBuilder.Redirect;
  ...
      processbuilder.redirectError(Redirect.appendTo(errfile));
      processbuilder.redirectOutput(Redirect.appendTo(outFile));

Python

# Requires Apache Beam 2.34 or later.
stopping_times, bad_values = (
    integers
    | beam.Map(collatz.total_stopping_time).with_exception_handling(
        use_subprocess=True))

# Write the bad values to a side channel.
bad_values | 'WriteBadValues' >> beam.io.WriteToText(
    os.path.splitext(output_path)[0] + '-bad.txt')

Menangani informasi logging

Banyak kasus penggunaan yang melibatkan pemrosesan jutaan elemen. Pemrosesan yang berhasil akan menghasilkan log dengan sedikit atau tanpa nilai, sehingga Anda harus membuat keputusan bisnis tentang mempertahankan data log. Misalnya, pertimbangkan alternatif berikut untuk mempertahankan semua data log:

  • Jika informasi yang terdapat dalam log dari pemrosesan elemen yang berhasil tidak berharga, jangan simpan.
  • Buat logika yang mengambil sampel data log, seperti mengambil sampel hanya setiap 10.000 entri log. Jika pemrosesan bersifat homogen, seperti saat banyak iterasi kode menghasilkan data log yang pada dasarnya identik, pendekatan ini memberikan keseimbangan yang efektif antara mempertahankan data log dan mengoptimalkan pemrosesan.

Untuk kondisi kegagalan, jumlah data yang di-dump ke log mungkin besar. Strategi yang efektif untuk menangani data log error dalam jumlah besar adalah dengan membaca beberapa baris pertama entri log dan hanya mengirim baris tersebut ke Cloud Logging. Anda dapat memuat sisa file log ke bucket Cloud Storage. Pendekatan ini memungkinkan Anda melihat baris pertama log error nanti, lalu, jika diperlukan, lihat Cloud Storage untuk melihat seluruh file.

Memeriksa ukuran file log juga berguna. Jika ukuran file nol, Anda dapat mengabaikannya dengan aman atau mencatat pesan log sederhana bahwa file tidak memiliki data.

Mengambil data dari pemrosesan yang telah selesai

Sebaiknya jangan gunakan stdout untuk meneruskan hasil komputasi kembali ke fungsi DoFn. Kode lain yang dipanggil kode C++ Anda, dan bahkan kode Anda sendiri, juga dapat mengirim pesan ke stdout, sehingga mencemari aliran stdoutput yang berisi data logging. Sebagai gantinya, praktik yang lebih baik adalah membuat perubahan pada kode wrapper C++ untuk memungkinkan kode menerima parameter yang menunjukkan tempat membuat file yang menyimpan nilai. Idealnya, file ini harus disimpan dengan cara yang netral bahasa menggunakan buffering protokol, yang memungkinkan kode C++ meneruskan objek kembali ke kode Java atau Python. Objek DoFn dapat membaca hasilnya langsung dari file dan meneruskan informasi hasil ke panggilan output-nya sendiri.

Pengalaman telah menunjukkan pentingnya menjalankan pengujian unit yang menangani proses itu sendiri. Penting untuk menerapkan pengujian unit yang menjalankan proses secara independen dari pipeline Dataflow. Proses debug library dapat dilakukan dengan jauh lebih efisien jika bersifat mandiri dan tidak harus menjalankan seluruh pipeline.

Mendesain pemrosesan untuk siklus CPU kecil

Memanggil subproses memiliki overhead. Bergantung pada beban kerja Anda, Anda mungkin perlu melakukan pekerjaan tambahan untuk mengurangi rasio antara pekerjaan yang dilakukan dan overhead administratif untuk memulai dan menghentikan proses.

Dalam kasus penggunaan media, ukuran elemen data pendorong mungkin dalam megabyte tinggi atau dalam gigabyte. Akibatnya, pemrosesan untuk setiap elemen data dapat memerlukan waktu beberapa menit. Dalam hal ini, biaya memanggil subproses tidak signifikan dibandingkan dengan waktu pemrosesan secara keseluruhan. Pendekatan terbaik dalam situasi ini adalah dengan membuat satu elemen memulai prosesnya sendiri.

Namun, dalam kasus penggunaan lainnya, seperti keuangan, pemrosesan memerlukan unit waktu CPU yang sangat kecil (puluhan milidetik). Dalam hal ini, overhead memanggil subproses tidak proporsional. Solusi untuk masalah ini adalah dengan menggunakan transformasi GroupByKey Apache Beam untuk membuat batch antara 50 dan 100 elemen yang akan dimasukkan ke dalam proses. Misalnya, Anda dapat mengikuti langkah-langkah berikut:

  • Dalam fungsi DoFn, buat key-value pair. Jika memproses transaksi keuangan, Anda dapat menggunakan nomor transaksi sebagai kunci. Jika tidak memiliki nomor unik untuk digunakan sebagai kunci, Anda dapat membuat checksum dari data dan menggunakan fungsi modulo untuk membuat partisi dari 50 elemen.
  • Kirim kunci ke fungsi GroupByKey.create, yang menampilkan koleksi KV<key,Iterable<data>> yang berisi 50 elemen yang kemudian dapat Anda kirim ke proses.

Membatasi paralelisme pekerja

Saat menggunakan bahasa yang didukung secara native di runner Dataflow, Anda tidak perlu memikirkan apa yang terjadi pada pekerja. Dataflow memiliki banyak proses yang mengawasi kontrol alur dan thread dalam mode batch atau streaming.

Namun, jika Anda menggunakan bahasa eksternal seperti C++, perhatikan bahwa Anda melakukan sesuatu yang sedikit tidak biasa dengan memulai subproses. Dalam mode batch, runner Dataflow menggunakan rasio kecil thread yang berfungsi ke CPU dibandingkan dengan mode streaming. Sebaiknya, terutama dalam mode streaming, Anda membuat semaphore dalam class untuk mengontrol paralelisme pekerja individual secara lebih langsung.

Misalnya, dengan pemrosesan media, Anda mungkin tidak ingin ratusan elemen transcoding diproses secara paralel oleh satu pekerja. Dalam kasus seperti itu, Anda dapat membuat class utilitas yang memberikan izin ke fungsi DoFn untuk pekerjaan yang sedang dilakukan. Dengan menggunakan class ini, Anda dapat mengambil kontrol langsung pada thread pekerja dalam pipeline.

Menggunakan sink data berkapasitas tinggi di Google Cloud

Setelah diproses, data akan dikirim ke sink data. Sink harus dapat menangani volume hasil yang dibuat oleh solusi pemrosesan grid Anda.

Diagram berikut menunjukkan beberapa sink yang tersedia di Google Cloud saat Dataflow menjalankan beban kerja petak.

Sink tersedia di Google Cloud

Bigtable, BigQuery, dan Pub/Sub dapat menangani aliran data yang sangat besar. Misalnya, setiap node Bigtable dapat menangani 10.000 penyisipan per detik dengan ukuran hingga 1 K dengan skalabilitas horizontal yang mudah. Akibatnya, cluster Bigtable dengan 100 node dapat menyerap 1.000.000 pesan per detik yang dihasilkan oleh petak Dataflow.

Mengelola segfault

Saat menggunakan kode C++ dalam pipeline, Anda perlu memutuskan cara mengelola segfault, karena memiliki konsekuensi non-lokal jika tidak ditangani dengan benar. Runner Dataflow membuat proses sesuai kebutuhan di Java, Python, atau Go, lalu menetapkan tugas ke proses dalam bentuk paket.

Jika panggilan ke kode C++ dilakukan menggunakan alat yang terikat erat, seperti JNI atau Cython, dan proses C++ mengalami segfault, proses panggilan dan Java Virtual Machine (JVM) juga akan error. Dalam skenario ini, titik data yang buruk tidak dapat ditangkap. Agar titik data buruk dapat dideteksi, gunakan pengaitan yang lebih longgar, yang akan memisahkan data buruk dan memungkinkan pipeline berlanjut. Namun, dengan kode C++ yang matang dan diuji sepenuhnya terhadap semua variasi data, Anda dapat menggunakan mekanisme seperti Cython.

Langkah berikutnya