Aplikasi contoh e-commerce menunjukkan praktik terbaik untuk menggunakan Dataflow guna menerapkan analisis data streaming dan AI real-time. Contoh ini berisi pola tugas yang menunjukkan cara terbaik untuk menyelesaikan tugas pemrograman Java. Tugas ini biasanya diperlukan untuk membuat aplikasi e-commerce.
Aplikasi ini berisi pola tugas Java berikut:
- Menggunakan skema Apache Beam untuk menangani data terstruktur
- Menggunakan JsonToRow untuk mengonversi data JSON
- Menggunakan generator kode
AutoValue
untuk membuat objek Java lama (POJO) - Menambahkan data yang tidak dapat diproses ke antrean untuk analisis lebih lanjut
- Menerapkan transformasi validasi data secara serial
- Menggunakan
DoFn.StartBundle
untuk membuat batch mikro panggilan ke layanan eksternal - Menggunakan pola input samping yang sesuai
Menggunakan skema Apache Beam untuk menangani data terstruktur
Anda dapat menggunakan skema Apache Beam untuk mempermudah pemrosesan data terstruktur.
Dengan mengonversi objek ke Baris, Anda dapat menghasilkan kode Java yang sangat rapi, sehingga memudahkan latihan pembuatan directed acyclic graph (DAG). Anda juga dapat mereferensikan properti objek sebagai kolom dalam pernyataan analisis yang Anda buat, tanpa harus memanggil metode.
Contoh
Menggunakan JsonToRow untuk mengonversi data JSON
Memproses string JSON di Dataflow adalah kebutuhan umum. Misalnya, string JSON diproses saat streaming informasi clickstream yang diambil dari aplikasi web. Untuk memproses string JSON, Anda perlu mengonversinya menjadi Baris atau objek Java biasa (POJO) selama pemrosesan pipeline.
Anda dapat menggunakan transformasi bawaan Apache Beam JsonToRow untuk mengonversi string JSON menjadi Baris. Namun, jika Anda menginginkan antrean untuk memproses pesan yang gagal, Anda harus mem-buildnya secara terpisah, lihat Menambahkan antrean data yang tidak dapat diproses untuk analisis lebih lanjut.
Jika Anda perlu mengonversi string JSON menjadi POJO menggunakan AutoValue,
daftarkan skema untuk jenis tersebut menggunakan
anotasi @DefaultSchema(AutoValueSchema.class)
, lalu gunakan
class utilitas
Convert. Kode yang dihasilkan mirip dengan berikut ini:
PCollection<String> json = ...
PCollection<MyUserType> = json
.apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
.apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class))
Untuk informasi selengkapnya, termasuk jenis Java yang berbeda yang dapat Anda gunakan untuk menyimpulkan skema, lihat Membuat Skema.
Jika JsonToRow tidak berfungsi dengan data Anda, Gson adalah alternatifnya. Gson cukup santai dalam pemrosesan data defaultnya, yang mungkin memerlukan Anda untuk membuat lebih banyak validasi ke dalam proses konversi data.
Contoh
Menggunakan generator kode AutoValue
untuk membuat POJO
Skema Apache Beam
sering kali merupakan cara terbaik untuk merepresentasikan objek dalam pipeline, karena cara skema tersebut
memungkinkan Anda menggunakan data terstruktur. Namun, terkadang
objek Java biasa (POJO)
diperlukan, seperti saat menangani objek nilai kunci atau menangani status objek.
Membuat POJO secara manual mengharuskan Anda membuat kode penggantian untuk metode equals()
dan
hashcode()
, yang dapat menghabiskan waktu dan rentan error. Penggantian yang salah
dapat menyebabkan perilaku aplikasi yang tidak konsisten atau kehilangan data.
Untuk membuat POJO, gunakan
pembuat class
AutoValue
. Opsi ini memastikan bahwa penggantian yang diperlukan digunakan dan
memungkinkan Anda menghindari potensi error.
AutoValue
banyak digunakan dalam codebase Apache Beam,
sehingga pemahaman tentang builder class ini berguna jika Anda ingin mengembangkan
pipeline Apache Beam di Dataflow menggunakan Java.
Anda juga dapat AutoValue
dengan skema Apache Beam jika menambahkan
anotasi @DefaultSchema(AutoValueSchema.class)
. Untuk mengetahui informasi selengkapnya, lihat
Membuat Skema.
Untuk mengetahui informasi selengkapnya tentang AutoValue
,
lihat Alasan AutoValue?
dan dokumen AutoValue
.
Contoh
Mengantrekan data yang tidak dapat diproses untuk analisis lebih lanjut
Dalam sistem produksi, penting untuk menangani data yang bermasalah. Jika memungkinkan, Anda dapat memvalidasi dan memperbaiki data dalam streaming. Jika koreksi tidak memungkinkan, catat nilai ke antrean pesan yang belum diproses, yang terkadang disebut antrean pesan mati, untuk analisis nanti. Masalah biasanya terjadi saat mengonversi data dari satu format ke format lainnya, misalnya saat mengonversi string JSON menjadi Baris.
Untuk mengatasi masalah ini, gunakan transformasi multi-output untuk memindahkan elemen yang berisi data yang belum diproses ke PCollection lain untuk analisis lebih lanjut. Pemrosesan ini adalah operasi umum yang mungkin ingin Anda gunakan di banyak tempat dalam pipeline. Coba buat transformasi cukup umum untuk digunakan di beberapa tempat. Pertama, buat objek error untuk menggabungkan properti umum, termasuk data asli. Selanjutnya, buat transformasi sink yang memiliki beberapa opsi untuk tujuan.
Contoh
Menerapkan transformasi validasi data secara serial
Data yang dikumpulkan dari sistem eksternal sering kali perlu dibersihkan. Strukturkan pipeline Anda agar dapat memperbaiki data yang bermasalah dalam streaming jika memungkinkan. Kirim data ke antrean untuk analisis lebih lanjut jika diperlukan.
Karena satu pesan mungkin mengalami beberapa masalah yang perlu diperbaiki, rencanakan directed acyclic graph (DAG) yang diperlukan. Jika elemen berisi data dengan beberapa cacat, Anda harus memastikan bahwa elemen mengalami transformasi yang sesuai.
Misalnya, bayangkan elemen dengan nilai berikut, yang tidak boleh null:
{"itemA": null,"itemB": null}
Pastikan elemen mengalir melalui transformasi yang memperbaiki kedua potensi masalah:
badElements.apply(fixItemA).apply(fixItemB)
Pipeline Anda mungkin memiliki lebih banyak langkah serial, tetapi fusi membantu meminimalkan overhead pemrosesan yang diperkenalkan.
Contoh
Menggunakan DoFn.StartBundle
untuk membuat batch mikro panggilan ke layanan eksternal
Anda mungkin perlu memanggil API eksternal sebagai bagian dari pipeline. Karena pipeline mendistribusikan pekerjaan di banyak resource komputasi, membuat satu panggilan untuk setiap elemen yang mengalir melalui sistem dapat membebani endpoint layanan eksternal. Masalah ini sangat umum terjadi jika Anda belum menerapkan fungsi pengurangan apa pun.
Untuk menghindari masalah ini, lakukan panggilan secara batch ke sistem eksternal.
Anda dapat mengelompokkan panggilan menggunakan transformasi GroupByKey
atau menggunakan
Apache Beam Timer API. Namun, kedua pendekatan ini memerlukan
pengacakan, yang
memperkenalkan beberapa overhead pemrosesan dan kebutuhan akan
angka ajaib
untuk menentukan ruang kunci.
Sebagai gantinya, gunakan elemen siklus proses
StartBundle
dan
FinishBundle
untuk mengelompokkan data Anda. Dengan opsi ini, pengacakan tidak diperlukan.
Satu kelemahan kecil pada opsi ini adalah ukuran paket ditentukan secara dinamis oleh implementasi runner berdasarkan apa yang saat ini terjadi di dalam pipeline dan pekerjanya. Dalam mode streaming, ukuran paket sering kali kecil. Penggabungan dataflow dipengaruhi oleh faktor backend seperti penggunaan sharding, jumlah data yang tersedia untuk kunci tertentu, dan throughput pipeline.
Contoh
EventItemCorrectionService.java
Menggunakan pola input samping yang sesuai untuk pengayaan data
Dalam aplikasi analisis streaming, data sering kali diperkaya dengan informasi tambahan yang mungkin berguna untuk analisis lebih lanjut. Misalnya, jika memiliki ID toko untuk transaksi, Anda dapat menambahkan informasi tentang lokasi toko. Informasi tambahan ini sering ditambahkan dengan mengambil elemen dan memasukkan informasi dari tabel pencarian.
Untuk tabel pemeta yang berubah secara perlahan dan berukuran lebih kecil,
memasukkan tabel ke dalam pipeline sebagai class singleton yang
menerapkan antarmuka Map<K,V>
berfungsi dengan baik. Opsi ini memungkinkan Anda menghindari
setiap elemen melakukan panggilan API untuk pencariannya. Setelah menyertakan salinan tabel
dalam pipeline, Anda perlu memperbaruinya secara berkala agar tetap aktual.
Untuk menangani input samping yang diperbarui secara lambat, gunakan Pola input samping Apache Beam.
Menyimpan data ke dalam cache
Input samping dimuat dalam memori sehingga di-cache secara otomatis.
Anda dapat menetapkan ukuran cache menggunakan opsi --setWorkerCacheMb
.
Anda dapat membagikan cache di seluruh instance DoFn
dan menggunakan pemicu eksternal untuk memuat ulang cache.
Contoh
SlowMovingStoreLocationDimension.java