Pola tugas Java

Aplikasi contoh e-commerce menunjukkan praktik terbaik untuk menggunakan Dataflow guna menerapkan analisis data streaming dan AI real-time. Contoh ini berisi pola tugas yang menunjukkan cara terbaik untuk menyelesaikan tugas pemrograman Java. Tugas ini biasanya diperlukan untuk membuat aplikasi e-commerce.

Aplikasi ini berisi pola tugas Java berikut:

Menggunakan skema Apache Beam untuk menangani data terstruktur

Anda dapat menggunakan skema Apache Beam untuk mempermudah pemrosesan data terstruktur.

Dengan mengonversi objek ke Baris, Anda dapat menghasilkan kode Java yang sangat rapi, sehingga memudahkan latihan pembuatan directed acyclic graph (DAG). Anda juga dapat mereferensikan properti objek sebagai kolom dalam pernyataan analisis yang Anda buat, tanpa harus memanggil metode.

Contoh

CountViewsPerProduct.java

Menggunakan JsonToRow untuk mengonversi data JSON

Memproses string JSON di Dataflow adalah kebutuhan umum. Misalnya, string JSON diproses saat streaming informasi clickstream yang diambil dari aplikasi web. Untuk memproses string JSON, Anda perlu mengonversinya menjadi Baris atau objek Java biasa (POJO) selama pemrosesan pipeline.

Anda dapat menggunakan transformasi bawaan Apache Beam JsonToRow untuk mengonversi string JSON menjadi Baris. Namun, jika Anda menginginkan antrean untuk memproses pesan yang gagal, Anda harus mem-buildnya secara terpisah, lihat Menambahkan antrean data yang tidak dapat diproses untuk analisis lebih lanjut.

Jika Anda perlu mengonversi string JSON menjadi POJO menggunakan AutoValue, daftarkan skema untuk jenis tersebut menggunakan anotasi @DefaultSchema(AutoValueSchema.class), lalu gunakan class utilitas Convert. Kode yang dihasilkan mirip dengan berikut ini:

PCollection<String> json = ...

PCollection<MyUserType>  = json
  .apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
  .apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class))

Untuk informasi selengkapnya, termasuk jenis Java yang berbeda yang dapat Anda gunakan untuk menyimpulkan skema, lihat Membuat Skema.

Jika JsonToRow tidak berfungsi dengan data Anda, Gson adalah alternatifnya. Gson cukup santai dalam pemrosesan data defaultnya, yang mungkin memerlukan Anda untuk membuat lebih banyak validasi ke dalam proses konversi data.

Contoh

Menggunakan generator kode AutoValue untuk membuat POJO

Skema Apache Beam sering kali merupakan cara terbaik untuk merepresentasikan objek dalam pipeline, karena cara skema tersebut memungkinkan Anda menggunakan data terstruktur. Namun, terkadang objek Java biasa (POJO) diperlukan, seperti saat menangani objek nilai kunci atau menangani status objek. Membuat POJO secara manual mengharuskan Anda membuat kode penggantian untuk metode equals() dan hashcode(), yang dapat menghabiskan waktu dan rentan error. Penggantian yang salah dapat menyebabkan perilaku aplikasi yang tidak konsisten atau kehilangan data.

Untuk membuat POJO, gunakan pembuat class AutoValue. Opsi ini memastikan bahwa penggantian yang diperlukan digunakan dan memungkinkan Anda menghindari potensi error. AutoValue banyak digunakan dalam codebase Apache Beam, sehingga pemahaman tentang builder class ini berguna jika Anda ingin mengembangkan pipeline Apache Beam di Dataflow menggunakan Java.

Anda juga dapat AutoValue dengan skema Apache Beam jika menambahkan anotasi @DefaultSchema(AutoValueSchema.class). Untuk mengetahui informasi selengkapnya, lihat Membuat Skema.

Untuk mengetahui informasi selengkapnya tentang AutoValue, lihat Alasan AutoValue? dan dokumen AutoValue.

Contoh

Clickstream.java

Mengantrekan data yang tidak dapat diproses untuk analisis lebih lanjut

Dalam sistem produksi, penting untuk menangani data yang bermasalah. Jika memungkinkan, Anda dapat memvalidasi dan memperbaiki data dalam streaming. Jika koreksi tidak memungkinkan, catat nilai ke antrean pesan yang belum diproses, yang terkadang disebut antrean pesan mati, untuk analisis nanti. Masalah biasanya terjadi saat mengonversi data dari satu format ke format lainnya, misalnya saat mengonversi string JSON menjadi Baris.

Untuk mengatasi masalah ini, gunakan transformasi multi-output untuk memindahkan elemen yang berisi data yang belum diproses ke PCollection lain untuk analisis lebih lanjut. Pemrosesan ini adalah operasi umum yang mungkin ingin Anda gunakan di banyak tempat dalam pipeline. Coba buat transformasi cukup umum untuk digunakan di beberapa tempat. Pertama, buat objek error untuk menggabungkan properti umum, termasuk data asli. Selanjutnya, buat transformasi sink yang memiliki beberapa opsi untuk tujuan.

Contoh

Menerapkan transformasi validasi data secara serial

Data yang dikumpulkan dari sistem eksternal sering kali perlu dibersihkan. Strukturkan pipeline Anda agar dapat memperbaiki data yang bermasalah dalam streaming jika memungkinkan. Kirim data ke antrean untuk analisis lebih lanjut jika diperlukan.

Karena satu pesan mungkin mengalami beberapa masalah yang perlu diperbaiki, rencanakan directed acyclic graph (DAG) yang diperlukan. Jika elemen berisi data dengan beberapa cacat, Anda harus memastikan bahwa elemen mengalami transformasi yang sesuai.

Misalnya, bayangkan elemen dengan nilai berikut, yang tidak boleh null:

{"itemA": null,"itemB": null}

Pastikan elemen mengalir melalui transformasi yang memperbaiki kedua potensi masalah:

badElements.apply(fixItemA).apply(fixItemB)

Pipeline Anda mungkin memiliki lebih banyak langkah serial, tetapi fusi membantu meminimalkan overhead pemrosesan yang diperkenalkan.

Contoh

ValidateAndCorrectCSEvt.java

Menggunakan DoFn.StartBundle untuk membuat batch mikro panggilan ke layanan eksternal

Anda mungkin perlu memanggil API eksternal sebagai bagian dari pipeline. Karena pipeline mendistribusikan pekerjaan di banyak resource komputasi, membuat satu panggilan untuk setiap elemen yang mengalir melalui sistem dapat membebani endpoint layanan eksternal. Masalah ini sangat umum terjadi jika Anda belum menerapkan fungsi pengurangan apa pun.

Untuk menghindari masalah ini, lakukan panggilan secara batch ke sistem eksternal.

Anda dapat mengelompokkan panggilan menggunakan transformasi GroupByKey atau menggunakan Apache Beam Timer API. Namun, kedua pendekatan ini memerlukan pengacakan, yang memperkenalkan beberapa overhead pemrosesan dan kebutuhan akan angka ajaib untuk menentukan ruang kunci.

Sebagai gantinya, gunakan elemen siklus proses StartBundle dan FinishBundle untuk mengelompokkan data Anda. Dengan opsi ini, pengacakan tidak diperlukan.

Satu kelemahan kecil pada opsi ini adalah ukuran paket ditentukan secara dinamis oleh implementasi runner berdasarkan apa yang saat ini terjadi di dalam pipeline dan pekerjanya. Dalam mode streaming, ukuran paket sering kali kecil. Penggabungan dataflow dipengaruhi oleh faktor backend seperti penggunaan sharding, jumlah data yang tersedia untuk kunci tertentu, dan throughput pipeline.

Contoh

EventItemCorrectionService.java

Menggunakan pola input samping yang sesuai untuk pengayaan data

Dalam aplikasi analisis streaming, data sering kali diperkaya dengan informasi tambahan yang mungkin berguna untuk analisis lebih lanjut. Misalnya, jika memiliki ID toko untuk transaksi, Anda dapat menambahkan informasi tentang lokasi toko. Informasi tambahan ini sering ditambahkan dengan mengambil elemen dan memasukkan informasi dari tabel pencarian.

Untuk tabel pemeta yang berubah secara perlahan dan berukuran lebih kecil, memasukkan tabel ke dalam pipeline sebagai class singleton yang menerapkan antarmuka Map<K,V> berfungsi dengan baik. Opsi ini memungkinkan Anda menghindari setiap elemen melakukan panggilan API untuk pencariannya. Setelah menyertakan salinan tabel dalam pipeline, Anda perlu memperbaruinya secara berkala agar tetap aktual.

Untuk menangani input samping yang diperbarui secara lambat, gunakan Pola input samping Apache Beam.

Menyimpan data ke dalam cache

Input samping dimuat dalam memori sehingga di-cache secara otomatis.

Anda dapat menetapkan ukuran cache menggunakan opsi --setWorkerCacheMb.

Anda dapat membagikan cache di seluruh instance DoFn dan menggunakan pemicu eksternal untuk memuat ulang cache.

Contoh

SlowMovingStoreLocationDimension.java