Pola tugas Java

Aplikasi contoh e-commerce menunjukkan praktik terbaik penggunaan Dataflow untuk mengimplementasikan analisis data streaming dan AI real-time. Contoh ini berisi pola tugas yang menunjukkan cara terbaik untuk menyelesaikan tugas pemrograman Java. Tugas ini biasanya diperlukan untuk membuat aplikasi e-commerce.

Aplikasi ini berisi pola tugas Java berikut:

Menggunakan skema Apache Beam untuk bekerja dengan data terstruktur

Anda dapat menggunakan skema Apache Beam untuk mempermudah pemrosesan data terstruktur.

Dengan mengonversi objek menjadi Rows, Anda dapat menghasilkan kode Java yang sangat bersih, yang mempermudah latihan pembuatan directed acyclic graph (DAG). Anda juga dapat mereferensikan properti objek sebagai kolom dalam pernyataan analisis yang Anda buat, daripada harus memanggil metode.

Contoh

CountViewsPerProduct.java

Menggunakan JsonToRow untuk mengonversi data JSON

Pemrosesan string JSON di Dataflow adalah kebutuhan umum. Misalnya, string JSON diproses saat melakukan streaming informasi clickstream yang diambil dari aplikasi web. Untuk memproses string JSON, Anda harus mengonversinya menjadi Baris atau objek Java lama biasa (POJO) selama pemrosesan pipeline.

Anda dapat menggunakan transformasi bawaan Apache Beam JsonToRow untuk mengonversi string JSON menjadi Baris. Namun, jika Anda menginginkan antrean untuk memproses pesan yang gagal, Anda harus mem-build-nya secara terpisah. Lihat Mengantrekan data yang tidak dapat diproses untuk analisis lebih lanjut.

Jika Anda perlu mengonversi string JSON menjadi POJO menggunakan AutoValue, daftarkan skema untuk jenis tersebut menggunakan anotasi @DefaultSchema(AutoValueSchema.class), lalu gunakan class utilitas Convert. Kode yang dihasilkan mirip dengan kode berikut:

PCollection<String> json = ...

PCollection<MyUserType>  = json
  .apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
  .apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class))

Untuk informasi selengkapnya, termasuk jenis Java lain yang dapat Anda gunakan untuk menyimpulkan skema, lihat Membuat Skema.

Jika JsonToRow tidak dapat menangani data Anda, Gson adalah alternatifnya. Gson cukup santai dalam pemrosesan data default, yang mungkin mengharuskan Anda membuat lebih banyak validasi ke dalam proses konversi data.

Contoh

Gunakan generator kode AutoValue untuk menghasilkan POJO

Skema Apache Beam sering kali merupakan cara terbaik untuk merepresentasikan objek dalam pipeline, karena caranya skema tersebut memungkinkan Anda menangani data terstruktur. Meskipun demikian, terkadang objek Java lama (POJO) biasa diperlukan, seperti saat menangani objek nilai kunci atau menangani status objek. Pembuatan POJO secara manual mengharuskan Anda melakukan penggantian kode untuk metode equals() dan hashcode(), yang dapat memakan waktu dan rentan error. Penggantian yang salah dapat mengakibatkan perilaku aplikasi atau kehilangan data yang tidak konsisten.

Untuk membuat POJO, gunakan builder class AutoValue. Opsi ini memastikan penggantian yang diperlukan digunakan dan memungkinkan Anda menghindari potensi error. AutoValue sangat banyak digunakan dalam codebase Apache Beam, sehingga pemahaman tentang builder class ini berguna jika Anda ingin mengembangkan pipeline Apache Beam pada Dataflow menggunakan Java.

Anda juga dapat melakukan AutoValue dengan skema Apache Beam jika menambahkan anotasi @DefaultSchema(AutoValueSchema.class). Untuk mengetahui informasi selengkapnya, lihat Membuat Skema.

Untuk mengetahui informasi selengkapnya tentang AutoValue, lihat Mengapa AutoValue? dan dokumen AutoValue.

Contoh

Clickstream.java

Mengantrekan data yang tidak dapat diproses untuk analisis lebih lanjut

Dalam sistem produksi, penting untuk menangani data yang bermasalah. Jika memungkinkan, Anda perlu memvalidasi dan memperbaiki data in-stream. Jika koreksi tidak dapat dilakukan, catat nilai ke antrean pesan yang belum diproses, yang terkadang disebut antrean wajib dihentikan, untuk dianalisis di lain waktu. Masalah biasanya terjadi saat mengonversi data dari satu format ke format lainnya, misalnya saat mengonversi string JSON menjadi Baris.

Untuk mengatasi masalah ini, gunakan transformasi multi-output untuk memindahkan elemen yang berisi data yang belum diproses ke PCollection lain untuk dianalisis lebih lanjut. Pemrosesan ini adalah operasi umum yang mungkin ingin Anda gunakan di banyak tempat dalam pipeline. Coba buat transformasi yang cukup umum untuk digunakan di beberapa tempat. Pertama, buat objek error untuk menggabungkan properti umum, termasuk data asli. Selanjutnya, buat transformasi sink yang memiliki beberapa opsi untuk tujuan.

Contoh

Menerapkan transformasi validasi data secara berurutan

Data yang dikumpulkan dari sistem eksternal sering kali perlu dibersihkan. Susun pipeline Anda agar dapat memperbaiki data in-stream yang bermasalah jika memungkinkan. Jika diperlukan, kirim data ke antrean untuk analisis lebih lanjut.

Karena satu pesan mungkin mengalami beberapa masalah yang perlu diperbaiki, rencanakan directed acyclic graph (DAG) yang diperlukan. Jika elemen berisi data dengan beberapa kerusakan, Anda harus memastikan bahwa elemen mengalir melalui transformasi yang sesuai.

Misalnya, bayangkan elemen dengan nilai berikut, yang keduanya tidak boleh bernilai null:

{"itemA": null,"itemB": null}

Pastikan elemen mengalir melalui transformasi yang memperbaiki kedua potensi masalah:

badElements.apply(fixItemA).apply(fixItemB)

Pipeline Anda mungkin memiliki lebih banyak langkah serial, tetapi fusion membantu meminimalkan overhead pemrosesan yang diperkenalkan.

Contoh

ValidateAndCorrectCSEvt.java

Gunakan DoFn.StartBundle untuk melakukan batch kecil panggilan ke layanan eksternal

Anda mungkin perlu memanggil API eksternal sebagai bagian dari pipeline. Karena pipeline mendistribusikan pekerjaan ke banyak resource komputasi, membuat satu panggilan untuk setiap elemen yang mengalir melalui sistem dapat membebani endpoint layanan eksternal. Masalah ini sangat umum terjadi jika Anda belum menerapkan fungsi pengurangan apa pun.

Untuk menghindari masalah ini, lakukan panggilan batch ke sistem eksternal.

Anda dapat mengelompokkan panggilan menggunakan transformasi GroupByKey atau menggunakan Apache Beam Timer API. Namun, pendekatan ini memerlukan shuffling, yang memperkenalkan beberapa overhead pemrosesan dan kebutuhan angka ajaib untuk menentukan ruang kunci.

Sebagai gantinya, gunakan elemen siklus proses StartBundle dan FinishBundle untuk mengelompokkan data Anda. Dengan opsi ini, tidak perlu mengacak.

Satu kelemahan kecil dari opsi ini adalah ukuran paket ditentukan secara dinamis oleh penerapan runner berdasarkan pada apa yang saat ini terjadi di dalam pipeline dan pekerjanya. Dalam mode streaming, paket sering kali berukuran kecil. Pemaketan Dataflow dipengaruhi oleh faktor backend seperti penggunaan sharding, jumlah data yang tersedia untuk kunci tertentu, dan throughput pipeline.

Contoh

EventItemCorrectionService.java

Menggunakan pola side-input yang sesuai untuk pengayaan data

Dalam aplikasi analisis streaming, data sering kali diperkaya dengan informasi tambahan yang mungkin berguna untuk analisis lebih lanjut. Misalnya, jika memiliki ID toko untuk transaksi, sebaiknya tambahkan informasi tentang lokasi toko. Informasi tambahan ini sering kali ditambahkan dengan mengambil elemen dan membawa informasi dari tabel pemeta.

Untuk tabel pencarian yang perlahan berubah dan berukuran lebih kecil, memasukkan tabel ke dalam pipeline sebagai class singleton yang mengimplementasikan antarmuka Map<K,V> akan berfungsi dengan baik. Opsi ini memungkinkan Anda agar setiap elemen tidak melakukan panggilan API untuk pencariannya. Setelah menyertakan salinan tabel di pipeline, Anda harus memperbaruinya secara berkala agar tetap terbaru.

Untuk menangani input samping yang lambat diperbarui, gunakan Pola input samping Apache Beam.

Menyimpan data ke dalam cache

Input samping dimuat dalam memori dan oleh karena itu di-cache secara otomatis.

Anda dapat menyetel ukuran cache menggunakan opsi --setWorkerCacheMb.

Anda dapat membagikan cache ke seluruh instance DoFn dan menggunakan pemicu eksternal untuk memperbarui cache.

Contoh

SlowMovingStoreLocationDimension.java