Memperkaya data streaming

Apache Beam menyederhanakan alur kerja pengayaan data dengan menyediakan transformasi pengayaan siap pakai yang dapat Anda tambahkan ke pipeline Anda. Halaman ini menjelaskan cara menggunakan transformasi pengayaan Apache Beam untuk memperkaya data streaming Anda.

Saat memperkaya data, Anda menambah data mentah dari satu sumber dengan menambahkan data terkait dari sumber kedua. Data tambahan dapat berasal dari berbagai sumber, seperti Bigtable atau BigQuery. Transformasi pengayaan Apache Beam menggunakan pencarian nilai kunci untuk menghubungkan data tambahan ke data mentah.

Contoh berikut memberikan beberapa kasus di mana pengayaan data akan berguna:

  • Anda ingin membuat pipeline e-commerce yang mencatat aktivitas pengguna dari situs atau aplikasi dan memberikan rekomendasi yang disesuaikan. Transformasi menggabungkan aktivitas ke dalam data pipeline Anda sehingga Anda dapat memberikan rekomendasi yang disesuaikan.
  • Anda memiliki data pengguna yang ingin digabungkan dengan data geografis untuk melakukan analisis berbasis geografis.
  • Anda ingin membuat pipeline yang mengumpulkan data dari perangkat internet-of-things (IOT) yang mengirimkan peristiwa telemetri.

Manfaat

Transformasi pengayaan memiliki manfaat berikut:

  • Mentransformasi data tanpa mengharuskan Anda menulis kode kompleks atau mengelola library yang mendasarinya.
  • Menyediakan pengendali sumber bawaan.
  • Menggunakan throttling sisi klien untuk mengelola pembatasan kapasitas permintaan. Permintaan dicadangkan secara eksponensial dengan strategi percobaan ulang default. Anda dapat mengonfigurasi pembatasan kapasitas agar sesuai dengan kasus penggunaan Anda.

Dukungan dan batasan

Transformasi pengayaan memiliki persyaratan berikut:

  • Tersedia untuk pipeline batch dan streaming.
  • Pengendali BigTableEnrichmentHandler tersedia di Apache Beam Python SDK versi 2.54.0 dan yang lebih baru.
  • Pengendali VertexAIFeatureStoreEnrichmentHandler tersedia di Apache Beam Python SDK versi 2.55.0 dan yang lebih baru.
  • Saat menggunakan Apache Beam Python SDK versi 2.55.0 dan yang lebih baru, Anda juga perlu menginstal klien Python untuk Redis.
  • Tugas Dataflow harus menggunakan Runner v2.

Menggunakan transformasi pengayaan

Untuk menggunakan transformasi pengayaan, sertakan kode berikut di pipeline Anda:

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler

bigtable_handler = BigTableEnrichmentHandler(...)

with beam.Pipeline() as p:
  output = (p
            ...
            | "Create" >> beam.Create(data)
            | "Enrich with Bigtable" >> Enrichment(bigtable_handler)
            ...
            )

Karena transformasi pengayaan melakukan lintas gabungan secara default, desain gabungan kustom untuk memperkaya data input. Desain ini memastikan bahwa gabungan hanya menyertakan kolom yang ditentukan.

Dalam contoh berikut, left adalah elemen input dari transformasi pengayaan, dan right adalah data yang diambil dari layanan eksternal untuk elemen input tersebut.

def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
  enriched = {}
  enriched['FIELD_NAME'] = left['FIELD_NAME']
  ...
  return beam.Row(**enriched)

Parameter

Untuk menggunakan transformasi pengayaan, parameter EnrichmentHandler diperlukan.

Anda juga dapat menggunakan parameter konfigurasi untuk menentukan fungsi lambda bagi fungsi join, waktu tunggu, throttler, atau repeater (strategi coba ulang). Tersedia parameter konfigurasi berikut:

  • join_fn: Fungsi lambda yang menggunakan kamus sebagai input dan menampilkan baris yang diperkaya (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]). Baris yang diperkaya menentukan cara menggabungkan data yang diambil dari API. Default-nya adalah gabungan silang.
  • timeout: Jumlah detik untuk menunggu permintaan diselesaikan oleh API sebelum waktu habis. Default-nya adalah 30 detik.
  • throttler: Menentukan mekanisme throttling. Satu-satunya opsi yang didukung adalah throttling adaptif sisi klien default.
  • repeater: Menentukan strategi percobaan ulang saat error seperti TooManyRequests dan TimeoutException terjadi. Setelan defaultnya adalah ExponentialBackOffRepeater.

Langkah selanjutnya