Halaman ini memberikan panduan tentang praktik terbaik yang harus diikuti saat membangun dan menjalankan alur kerja paralel tinggi HPC Dataflow, termasuk cara menggunakan kode eksternal di pipeline, cara menjalankan pipeline, dan cara mengelola penanganan error.
Menyertakan kode eksternal dalam pipeline Anda
Pembeda utama untuk pipeline yang sangat paralel adalah bahwa pipeline tersebut menggunakan kode C++ dalam DoFn
, bukan salah satu bahasa SDK Apache Beam standar. Untuk pipeline Java, agar lebih mudah menggunakan library C++ dalam pipeline, sebaiknya gunakan panggilan prosedur eksternal. Bagian ini menjelaskan pendekatan umum yang digunakan untuk menjalankan kode eksternal (C++) di pipeline Java.
Definisi pipeline Apache Beam memiliki beberapa komponen utama:
PCollections
adalah kumpulan elemen homogen yang tidak dapat diubah.PTransforms
digunakan untuk menentukan transformasi kePCollection
yang menghasilkanPCollection
lain.- Pipeline adalah konstruksi yang memungkinkan Anda, melalui kode, mendeklarasikan
interaksi antara
PTransforms
danPCollections
. Pipeline direpresentasikan sebagai directed acyclic graph (DAG).
Saat menggunakan kode dari bahasa yang bukan salah satu bahasa SDK Apache Beam standar, tempatkan kode di PTransform
, yang berada dalam DoFn
, dan gunakan salah satu bahasa SDK standar untuk menentukan pipeline itu sendiri.
Sebaiknya gunakan Apache Beam Python SDK untuk menentukan pipeline, karena Python SDK memiliki class utilitas yang menyederhanakan penggunaan kode lain. Namun, Anda dapat menggunakan Apache Beam SDK lainnya.
Anda dapat menggunakan kode untuk melakukan eksperimen cepat tanpa memerlukan build lengkap. Untuk sistem produksi, Anda biasanya membuat biner sendiri, yang memberi Anda kebebasan untuk menyesuaikan proses sesuai kebutuhan Anda.
Diagram berikut mengilustrasikan dua penggunaan data pipeline:
- Data digunakan untuk mendorong proses.
- Data diperoleh selama pemrosesan dan digabungkan dengan data pengemudi.
Di halaman ini, data utama (dari sumber) disebut sebagai data penggerak, dan data sekunder (dari fase pemrosesan) disebut sebagai data gabungan.
Dalam kasus penggunaan keuangan, data pendorongnya mungkin berupa beberapa ratus ribu transaksi. Setiap transaksi perlu diproses bersama dengan data pasar. Dalam hal ini, data pasar adalah data penggabungan. Dalam kasus penggunaan media, data penggerak mungkin berupa file gambar yang memerlukan pemrosesan, tetapi tidak memerlukan sumber data lain, dan oleh karena itu tidak menggunakan data gabungan.
Pertimbangan ukuran untuk data mengemudi
Jika ukuran elemen data penggerak berada dalam rentang megabyte rendah,
perlakukan dengan paradigma Apache Beam normal untuk membuat objek PCollection
dari
sumber dan mengirim objek ke transformasi Apache Beam untuk diproses.
Jika ukuran elemen data penggerak berada dalam megabyte tinggi atau dalam
gigabyte, seperti yang biasa terjadi pada media, Anda dapat memasukkan data penggerak ke
Cloud Storage. Kemudian, dalam objek PCollection
awal, referensikan
URI penyimpanan, dan hanya referensi URI ke data tersebut yang digunakan.
Pertimbangan ukuran untuk menggabungkan data
Jika data gabungan berukuran beberapa ratus megabyte atau kurang, gunakan input samping untuk mendapatkan data ini ke transformasi Apache Beam. Input samping mengirimkan paket data ke setiap pekerja yang membutuhkannya.
Jika data gabungan berukuran gigabyte atau terabyte, gunakan Bigtable atau Cloud Storage untuk menggabungkan data gabungan ke data penggerak, bergantung pada sifat data. Bigtable sangat ideal untuk skenario keuangan di mana data pasar sering diakses sebagai pencarian key-value dari Bigtable. Untuk mengetahui informasi selengkapnya tentang mendesain skema Bigtable, termasuk rekomendasi untuk menangani data deret waktu, lihat dokumentasi Bigtable berikut:
Menjalankan kode eksternal
Anda dapat menjalankan kode eksternal di Apache Beam dengan berbagai cara.
Buat proses yang dipanggil dari objek
DoFn
di dalam transformasi Dataflow.Gunakan JNI dengan Java SDK.
Buat subproses langsung dari objek
DoFn
. Meskipun pendekatan ini bukan yang paling efisien, pendekatan ini kuat dan mudah diterapkan. Karena potensi masalah saat menggunakan JNI, halaman ini menunjukkan penggunaan panggilan subproses.
Saat mendesain alur kerja, pertimbangkan pipeline end-to-end yang lengkap. Ketidakefisienan dalam cara proses dijalankan diimbangi oleh fakta bahwa pergerakan data dari sumber hingga ke tujuan dilakukan dengan satu pipeline. Jika Anda membandingkan pendekatan ini dengan pendekatan lainnya, lihat waktu end-to-end pipeline serta biaya end-to-end.
Tarik biner ke host
Saat Anda menggunakan bahasa Apache Beam native, Apache Beam SDK akan memindahkan semua kode yang diperlukan ke pekerja secara otomatis. Namun, saat melakukan panggilan ke kode eksternal, Anda harus memindahkan kode secara manual.
Untuk memindahkan kode, lakukan hal berikut. Contoh ini menunjukkan langkah-langkah untuk Apache Beam Java SDK.
- Simpan kode eksternal yang dikompilasi, beserta informasi pembuatan versi, di Cloud Storage.
- Dalam metode
@Setup
, buat blok yang disinkronkan untuk memeriksa apakah file kode tersedia di resource lokal. Daripada menerapkan pemeriksaan fisik, Anda dapat mengonfirmasi ketersediaan menggunakan variabel statis saat thread pertama selesai. - Jika file tidak tersedia, gunakan library klien Cloud Storage untuk menarik file dari bucket Cloud Storage ke pekerja lokal. Pendekatan
yang direkomendasikan adalah menggunakan class
FileSystems
Apache Beam untuk tugas ini. - Setelah file dipindahkan, konfirmasi bahwa bit eksekusi disetel pada file kode.
- Dalam sistem produksi, periksa hash biner untuk memastikan file telah disalin dengan benar.
Penggunaan fungsi
filesToStage
Apache Beam
juga merupakan opsi, tetapi hal ini menghilangkan beberapa keuntungan dari
kemampuan runner untuk mengemas dan memindahkan kode Java Anda secara otomatis. Selain itu,
karena panggilan ke subproses memerlukan lokasi file absolut, Anda harus
menggunakan kode untuk menentukan jalur class dan oleh karena itu, lokasi file
yang dipindahkan oleh filesToStage
. Kami tidak merekomendasikan pendekatan ini.
Menjalankan biner eksternal
Sebelum dapat menjalankan kode eksternal, Anda harus membuat wrapper untuk kode tersebut. Tulis wrapper ini dalam bahasa yang sama dengan kode eksternal (misalnya, C++) atau sebagai skrip shell. Wrapper memungkinkan Anda meneruskan handle file dan menerapkan pengoptimalan seperti yang dijelaskan di bagian Merancang pemrosesan untuk siklus CPU kecil di halaman ini. Pembungkus Anda tidak harus canggih. Cuplikan berikut menunjukkan garis batas wrapper di C++.
int main(int argc, char* argv[])
{
if(argc < 3){
std::cerr << "Required return file and data to process" << '\n';
return 1;
}
std::string returnFile = argv[1];
std::string word = argv[2];
std::ofstream myfile;
myfile.open (returnFile);
myfile << word;
myfile.close();
return 0;
}
Kode ini membaca dua parameter dari daftar argumen. Parameter pertama adalah lokasi file yang ditampilkan tempat data dikirim. Parameter kedua adalah data yang di-echo kode kepada pengguna. Dalam penerapan di dunia nyata, kode ini akan melakukan lebih dari sekadar mengulang "Hello, world".
Setelah menulis kode wrapper, jalankan kode eksternal dengan melakukan hal berikut:
- Mengirimkan data ke biner kode eksternal.
- Jalankan biner, tangkap error apa pun, dan catat error dan hasilnya.
- Tangani informasi logging.
- Ambil data dari pemrosesan yang telah selesai.
Mentransmisikan data ke biner
Untuk memulai proses menjalankan library, kirimkan data ke kode C++. Pada langkah ini, Anda dapat memanfaatkan integrasi Dataflow dengan alat Google Cloud Platform lainnya. Alat seperti Bigtable dapat menangani set data yang sangat besar serta menangani akses dengan latensi rendah dan konkurensi tinggi, sehingga memungkinkan ribuan core mengakses set data secara bersamaan. Selain itu, Bigtable dapat memproses data terlebih dahulu, sehingga memungkinkan pembentukan, pengayaan, dan pemfilteran data. Semua pekerjaan ini dapat dilakukan di transformasi Apache Beam sebelum Anda menjalankan kode eksternal.
Untuk sistem produksi, jalur yang direkomendasikan adalah menggunakan buffer protokol untuk mengenkapsulasi data input. Anda dapat mengonversi data input menjadi byte dan mengenkode base64 sebelum meneruskannya ke library eksternal. Dua cara untuk meneruskan data ini ke library eksternal adalah sebagai berikut:
- Data input kecil. Untuk data kecil yang tidak melebihi panjang maksimum sistem untuk argumen perintah, teruskan argumen di posisi 2 dari proses yang dibuat dengan
java.lang.ProcessBuilder
. - Data input besar. Untuk ukuran data yang lebih besar, buat file yang namanya menyertakan UUID untuk memuat data yang diperlukan oleh proses.
Menjalankan kode C++, menangkap error, dan mencatat
Merekam dan menangani informasi error adalah bagian penting dari pipeline Anda. Resource yang digunakan oleh runner Dataflow bersifat sementara, dan sering kali sulit untuk memeriksa file log pekerja. Anda harus memastikan bahwa Anda merekam dan mengirim semua informasi yang berguna ke logging runner Dataflow, dan bahwa Anda menyimpan data logging di satu atau beberapa bucket Cloud Storage.
Pendekatan yang direkomendasikan adalah mengalihkan stdout
dan stderr
ke file, yang memungkinkan Anda menghindari pertimbangan kehabisan memori. Misalnya, di
runner Dataflow yang memanggil kode C++, Anda dapat menyertakan baris
seperti berikut:
Java
import java.lang.ProcessBuilder.Redirect;
...
processbuilder.redirectError(Redirect.appendTo(errfile));
processbuilder.redirectOutput(Redirect.appendTo(outFile));
Python
# Requires Apache Beam 2.34 or later.
stopping_times, bad_values = (
integers
| beam.Map(collatz.total_stopping_time).with_exception_handling(
use_subprocess=True))
# Write the bad values to a side channel.
bad_values | 'WriteBadValues' >> beam.io.WriteToText(
os.path.splitext(output_path)[0] + '-bad.txt')
Menangani informasi logging
Banyak kasus penggunaan melibatkan pemrosesan jutaan elemen. Pemrosesan yang berhasil menghasilkan log dengan sedikit atau tanpa nilai, jadi Anda harus membuat keputusan bisnis tentang penyimpanan data log. Misalnya, pertimbangkan alternatif berikut untuk mempertahankan semua data log:
- Jika informasi yang terdapat dalam log dari pemrosesan elemen yang berhasil tidak berharga, jangan simpan.
- Buat logika yang mengambil sampel data log, seperti hanya mengambil sampel setiap 10.000 entri log. Jika pemrosesan bersifat homogen, seperti saat banyak iterasi kode menghasilkan data log yang pada dasarnya identik, pendekatan ini memberikan keseimbangan yang efektif antara mempertahankan data log dan mengoptimalkan pemrosesan.
Untuk kondisi kegagalan, jumlah data yang di-dump ke log mungkin besar. Strategi yang efektif untuk menangani data log error dalam jumlah besar adalah dengan membaca beberapa baris pertama entri log dan mengirimkan hanya baris tersebut ke Cloud Logging. Anda dapat memuat sisa file log ke dalam bucket Cloud Storage. Dengan pendekatan ini, Anda dapat melihat baris pertama log error nanti dan, jika perlu, merujuk ke Cloud Storage untuk seluruh file.
Memeriksa ukuran file log juga berguna. Jika ukuran file nol, Anda dapat mengabaikannya dengan aman atau mencatat pesan log sederhana bahwa file tidak memiliki data.
Merekam data dari pemrosesan yang telah selesai
Sebaiknya jangan gunakan stdout
untuk meneruskan hasil komputasi
kembali ke fungsi DoFn
. Kode lain yang dipanggil oleh kode C++ Anda, dan bahkan kode Anda sendiri, juga dapat mengirim pesan ke stdout
, sehingga mencemari aliran stdout
yang seharusnya berisi data logging.stdoutput
Sebagai gantinya, praktik terbaiknya adalah
melakukan perubahan pada kode wrapper C++ untuk memungkinkan kode menerima parameter
yang menunjukkan tempat membuat file yang menyimpan nilai. Idealnya, file ini harus disimpan dengan cara yang netral bahasa menggunakan buffer protokol, yang memungkinkan kode C++ meneruskan objek kembali ke kode Java atau Python. Objek DoFn
dapat membaca hasil langsung dari file dan meneruskan informasi hasil
ke panggilan output
-nya sendiri.
Pengalaman telah menunjukkan pentingnya menjalankan pengujian unit yang menangani proses itu sendiri. Penting untuk menerapkan pengujian unit yang menjalankan proses secara independen dari pipeline Dataflow. Proses men-debug library dapat dilakukan dengan lebih efisien jika library tersebut bersifat mandiri dan tidak harus menjalankan seluruh pipeline.
Mendesain pemrosesan untuk siklus CPU kecil
Memanggil subproses memiliki overhead. Bergantung pada workload, Anda mungkin perlu melakukan pekerjaan tambahan untuk mengurangi rasio antara pekerjaan yang sedang dilakukan dan overhead administratif untuk memulai dan menghentikan proses.
Dalam kasus penggunaan media, ukuran elemen data penggerak mungkin dalam megabyte tinggi atau dalam gigabyte. Akibatnya, pemrosesan setiap elemen data dapat memakan waktu beberapa menit. Dalam hal ini, biaya pemanggilan subproses tidak signifikan dibandingkan dengan waktu pemrosesan keseluruhan. Pendekatan terbaik dalam situasi ini adalah membuat satu elemen memulai prosesnya sendiri.
Namun, dalam kasus penggunaan lain, seperti keuangan, pemrosesan memerlukan unit waktu CPU yang sangat kecil (puluhan milidetik). Dalam hal ini, overhead pemanggilan subproses terlalu besar. Solusi untuk masalah ini adalah menggunakan transformasi GroupByKey
Apache Beam untuk membuat batch antara 50 dan 100 elemen yang akan dimasukkan ke dalam proses. Misalnya, Anda dapat mengikuti langkah-langkah berikut:
- Dalam fungsi
DoFn
, buat pasangan nilai kunci. Jika Anda memproses transaksi keuangan, Anda dapat menggunakan nomor transaksi sebagai kunci. Jika tidak memiliki nomor unik untuk digunakan sebagai kunci, Anda dapat membuat checksum dari data dan menggunakan fungsi modulo untuk membuat partisi 50 elemen. - Kirim kunci ke fungsi
GroupByKey.create
, yang menampilkan koleksiKV<key,Iterable<data>>
yang berisi 50 elemen yang kemudian dapat Anda kirim ke proses.
Membatasi paralelisme pekerja
Saat Anda bekerja dengan bahasa yang didukung secara native di runner Dataflow, Anda tidak perlu memikirkan apa yang terjadi pada pekerja. Dataflow memiliki banyak proses yang mengawasi kontrol alur dan thread dalam mode batch atau streaming.
Namun, jika Anda menggunakan bahasa eksternal seperti C++, perlu diketahui bahwa Anda melakukan sesuatu yang sedikit tidak biasa dengan memulai subproses. Dalam mode batch, runner Dataflow menggunakan rasio kecil thread kerja terhadap CPU dibandingkan dengan mode streaming. Sebaiknya, terutama dalam mode streaming, Anda membuat semaphore dalam class untuk mengontrol paralelisme setiap pekerja secara lebih langsung.
Misalnya, dengan pemrosesan media, Anda mungkin tidak ingin ratusan elemen transkode diproses secara paralel oleh satu pekerja. Dalam kasus seperti itu,
Anda dapat membuat class utilitas yang memberikan izin ke fungsi DoFn
untuk
pekerjaan yang sedang dilakukan. Dengan menggunakan class ini, Anda dapat mengontrol langsung
thread pekerja dalam pipeline.
Menggunakan sink data berkapasitas tinggi di Google Cloud Platform
Setelah diproses, data dikirim ke tujuan data. Sink harus dapat menangani volume hasil yang dibuat oleh solusi pemrosesan petak Anda.
Diagram berikut menunjukkan beberapa sink yang tersedia di Google Cloud Platform saat Dataflow menjalankan workload petak.
Bigtable, BigQuery, dan Pub/Sub dapat menangani aliran data yang sangat besar. Misalnya, setiap node Bigtable dapat menangani 10.000 penyisipan per detik dengan ukuran hingga 1 KB dengan skalabilitas horizontal yang mudah. Hasilnya, cluster Bigtable 100 node dapat menyerap 1.000.000 pesan per detik yang dihasilkan oleh petak Dataflow.
Mengelola segfault
Saat menggunakan kode C++ dalam pipeline, Anda perlu memutuskan cara mengelola kesalahan segmentasi, karena kesalahan tersebut memiliki konsekuensi non-lokal jika tidak ditangani dengan benar. Runner Dataflow membuat proses sesuai kebutuhan di Java, Python, atau Go, lalu menetapkan tugas ke proses dalam bentuk paket.
Jika panggilan ke kode C++ dilakukan menggunakan alat yang terhubung erat, seperti JNI atau Cython, dan proses C++ mengalami segfault, proses panggilan dan Java Virtual Machine (JVM) juga akan mengalami error. Dalam skenario ini, titik data yang buruk tidak dapat ditangkap. Untuk membuat titik data yang buruk dapat terdeteksi, gunakan coupling yang lebih longgar, yang memisahkan data yang buruk dan memungkinkan pipeline berlanjut. Namun, dengan kode C++ yang matang dan diuji sepenuhnya terhadap semua variasi data, Anda dapat menggunakan mekanisme seperti Cython.
Langkah berikutnya
Ikuti tutorial untuk membuat pipeline yang menggunakan container kustom dengan library C++.
Lihat kode contoh untuk halaman ini di repositori GitHub Apache Beam.
Pelajari lebih lanjut cara membuat pipeline dengan Apache Beam.