Aplikasi contoh e-commerce menunjukkan praktik terbaik penggunaan Dataflow untuk mengimplementasikan analisis data streaming dan AI real-time. Contoh ini berisi pola tugas yang menunjukkan cara terbaik untuk menyelesaikan tugas pemrograman Java. Tugas ini biasanya diperlukan untuk membuat aplikasi e-commerce.
Aplikasi ini berisi pola tugas Java berikut:
- Menggunakan skema Apache Beam untuk menangani data terstruktur
- Menggunakan JsonToRow untuk mengonversi data JSON
- Gunakan generator kode
AutoValue
untuk menghasilkan objek Java lama (POJO) biasa - Mengantrekan data yang tidak dapat diproses untuk analisis lebih lanjut
- Menerapkan transformasi validasi data secara berurutan
- Menggunakan
DoFn.StartBundle
untuk melakukan panggilan batch mikro ke layanan eksternal - Gunakan pola side-input yang sesuai
Menggunakan skema Apache Beam untuk bekerja dengan data terstruktur
Anda dapat menggunakan skema Apache Beam untuk mempermudah pemrosesan data terstruktur.
Dengan mengonversi objek menjadi Rows, Anda dapat menghasilkan kode Java yang sangat bersih, yang mempermudah latihan pembuatan directed acyclic graph (DAG). Anda juga dapat mereferensikan properti objek sebagai kolom dalam pernyataan analisis yang Anda buat, daripada harus memanggil metode.
Contoh
Menggunakan JsonToRow untuk mengonversi data JSON
Pemrosesan string JSON di Dataflow adalah kebutuhan umum. Misalnya, string JSON diproses saat melakukan streaming informasi clickstream yang diambil dari aplikasi web. Untuk memproses string JSON, Anda harus mengonversinya menjadi Baris atau objek Java lama biasa (POJO) selama pemrosesan pipeline.
Anda dapat menggunakan transformasi bawaan Apache Beam JsonToRow untuk mengonversi string JSON menjadi Baris. Namun, jika Anda menginginkan antrean untuk memproses pesan yang gagal, Anda harus mem-build-nya secara terpisah. Lihat Mengantrekan data yang tidak dapat diproses untuk analisis lebih lanjut.
Jika Anda perlu mengonversi string JSON menjadi POJO menggunakan AutoValue,
daftarkan skema untuk jenis tersebut menggunakan
anotasi @DefaultSchema(AutoValueSchema.class)
, lalu gunakan
class utilitas
Convert. Kode yang dihasilkan mirip dengan kode berikut:
PCollection<String> json = ...
PCollection<MyUserType> = json
.apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
.apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class))
Untuk informasi selengkapnya, termasuk jenis Java lain yang dapat Anda gunakan untuk menyimpulkan skema, lihat Membuat Skema.
Jika JsonToRow tidak dapat menangani data Anda, Gson adalah alternatifnya. Gson cukup santai dalam pemrosesan data default, yang mungkin mengharuskan Anda membuat lebih banyak validasi ke dalam proses konversi data.
Contoh
Gunakan generator kode AutoValue
untuk menghasilkan POJO
Skema Apache Beam
sering kali merupakan cara terbaik untuk merepresentasikan objek dalam pipeline, karena caranya
skema tersebut
memungkinkan Anda menangani data terstruktur. Meskipun demikian, terkadang
objek Java lama (POJO) biasa
diperlukan, seperti saat menangani objek nilai kunci atau menangani status objek.
Pembuatan POJO secara manual mengharuskan Anda melakukan penggantian kode untuk metode equals()
dan
hashcode()
, yang dapat memakan waktu dan rentan error. Penggantian yang salah
dapat mengakibatkan perilaku aplikasi atau kehilangan data yang tidak konsisten.
Untuk membuat POJO, gunakan
builder
class
AutoValue
. Opsi ini memastikan penggantian yang diperlukan digunakan dan
memungkinkan Anda menghindari potensi error.
AutoValue
sangat banyak digunakan dalam codebase Apache Beam,
sehingga pemahaman tentang builder class ini berguna jika Anda ingin mengembangkan
pipeline Apache Beam pada Dataflow menggunakan Java.
Anda juga dapat melakukan AutoValue
dengan skema Apache Beam jika menambahkan
anotasi @DefaultSchema(AutoValueSchema.class)
. Untuk mengetahui informasi selengkapnya, lihat
Membuat Skema.
Untuk mengetahui informasi selengkapnya tentang AutoValue
,
lihat Mengapa AutoValue?
dan dokumen AutoValue
.
Contoh
Mengantrekan data yang tidak dapat diproses untuk analisis lebih lanjut
Dalam sistem produksi, penting untuk menangani data yang bermasalah. Jika memungkinkan, Anda perlu memvalidasi dan memperbaiki data in-stream. Jika koreksi tidak dapat dilakukan, catat nilai ke antrean pesan yang belum diproses, yang terkadang disebut antrean wajib dihentikan, untuk dianalisis di lain waktu. Masalah biasanya terjadi saat mengonversi data dari satu format ke format lainnya, misalnya saat mengonversi string JSON menjadi Baris.
Untuk mengatasi masalah ini, gunakan transformasi multi-output untuk memindahkan elemen yang berisi data yang belum diproses ke PCollection lain untuk dianalisis lebih lanjut. Pemrosesan ini adalah operasi umum yang mungkin ingin Anda gunakan di banyak tempat dalam pipeline. Coba buat transformasi yang cukup umum untuk digunakan di beberapa tempat. Pertama, buat objek error untuk menggabungkan properti umum, termasuk data asli. Selanjutnya, buat transformasi sink yang memiliki beberapa opsi untuk tujuan.
Contoh
Menerapkan transformasi validasi data secara berurutan
Data yang dikumpulkan dari sistem eksternal sering kali perlu dibersihkan. Susun pipeline Anda agar dapat memperbaiki data in-stream yang bermasalah jika memungkinkan. Jika diperlukan, kirim data ke antrean untuk analisis lebih lanjut.
Karena satu pesan mungkin mengalami beberapa masalah yang perlu diperbaiki, rencanakan directed acyclic graph (DAG) yang diperlukan. Jika elemen berisi data dengan beberapa kerusakan, Anda harus memastikan bahwa elemen mengalir melalui transformasi yang sesuai.
Misalnya, bayangkan elemen dengan nilai berikut, yang keduanya tidak boleh bernilai null:
{"itemA": null,"itemB": null}
Pastikan elemen mengalir melalui transformasi yang memperbaiki kedua potensi masalah:
badElements.apply(fixItemA).apply(fixItemB)
Pipeline Anda mungkin memiliki lebih banyak langkah serial, tetapi fusion membantu meminimalkan overhead pemrosesan yang diperkenalkan.
Contoh
Gunakan DoFn.StartBundle
untuk melakukan batch kecil panggilan ke layanan eksternal
Anda mungkin perlu memanggil API eksternal sebagai bagian dari pipeline. Karena pipeline mendistribusikan pekerjaan ke banyak resource komputasi, membuat satu panggilan untuk setiap elemen yang mengalir melalui sistem dapat membebani endpoint layanan eksternal. Masalah ini sangat umum terjadi jika Anda belum menerapkan fungsi pengurangan apa pun.
Untuk menghindari masalah ini, lakukan panggilan batch ke sistem eksternal.
Anda dapat mengelompokkan panggilan menggunakan transformasi GroupByKey
atau menggunakan Apache Beam Timer API. Namun, pendekatan ini memerlukan
shuffling, yang
memperkenalkan beberapa overhead pemrosesan dan kebutuhan
angka ajaib
untuk menentukan ruang kunci.
Sebagai gantinya, gunakan elemen siklus proses
StartBundle
dan
FinishBundle
untuk mengelompokkan data Anda. Dengan opsi ini, tidak perlu mengacak.
Satu kelemahan kecil dari opsi ini adalah ukuran paket ditentukan secara dinamis oleh penerapan runner berdasarkan pada apa yang saat ini terjadi di dalam pipeline dan pekerjanya. Dalam mode streaming, paket sering kali berukuran kecil. Pemaketan Dataflow dipengaruhi oleh faktor backend seperti penggunaan sharding, jumlah data yang tersedia untuk kunci tertentu, dan throughput pipeline.
Contoh
EventItemCorrectionService.java
Menggunakan pola side-input yang sesuai untuk pengayaan data
Dalam aplikasi analisis streaming, data sering kali diperkaya dengan informasi tambahan yang mungkin berguna untuk analisis lebih lanjut. Misalnya, jika memiliki ID toko untuk transaksi, sebaiknya tambahkan informasi tentang lokasi toko. Informasi tambahan ini sering kali ditambahkan dengan mengambil elemen dan membawa informasi dari tabel pemeta.
Untuk tabel pencarian yang perlahan berubah dan berukuran lebih kecil, memasukkan tabel ke dalam pipeline sebagai class singleton yang mengimplementasikan antarmuka Map<K,V>
akan berfungsi dengan baik. Opsi ini memungkinkan Anda agar
setiap elemen tidak melakukan panggilan API untuk pencariannya. Setelah menyertakan salinan tabel
di pipeline, Anda harus memperbaruinya secara berkala agar tetap terbaru.
Untuk menangani input samping yang lambat diperbarui, gunakan Pola input samping Apache Beam.
Menyimpan data ke dalam cache
Input samping dimuat dalam memori dan oleh karena itu di-cache secara otomatis.
Anda dapat menyetel ukuran cache menggunakan opsi --setWorkerCacheMb
.
Anda dapat membagikan cache ke seluruh instance DoFn
dan menggunakan pemicu eksternal untuk memperbarui cache.
Contoh
SlowMovingStoreLocationDimension.java