Memperkaya data streaming

Apache Beam menyederhanakan alur kerja pengayaan data dengan menyediakan transformasi pengayaan siap pakai yang dapat Anda tambahkan ke pipeline. Halaman ini menjelaskan cara menggunakan transformasi pengayaan Apache Beam untuk memperkaya data streaming Anda.

Saat memperkaya data, Anda akan menambah data mentah dari satu sumber dengan menambahkan data terkait dari sumber kedua. Data tambahan dapat berasal dari berbagai sumber, seperti Bigtable atau BigQuery. Transformasi pengayaan Apache Beam menggunakan pencarian nilai kunci untuk menghubungkan data tambahan ke data mentah.

Contoh berikut memberikan beberapa kasus saat pengayaan data berguna:

  • Anda ingin membuat pipeline e-commerce yang merekam aktivitas pengguna dari situs atau aplikasi dan memberikan rekomendasi yang disesuaikan. Transformasi menggabungkan aktivitas ke dalam data pipeline sehingga Anda dapat memberikan rekomendasi yang disesuaikan.
  • Anda memiliki data pengguna yang ingin digabungkan dengan data geografis untuk melakukan analisis berbasis geografi.
  • Anda ingin membuat pipeline yang mengumpulkan data dari perangkat internet of things (IOT) yang mengirim peristiwa telemetri.

Manfaat

Transformasi pengayaan memiliki manfaat berikut:

  • Mentransformasi data tanpa mengharuskan Anda menulis kode yang kompleks atau mengelola library yang mendasarinya.
  • Menyediakan pengendali sumber bawaan.
  • Menggunakan throttling sisi klien untuk mengelola pembatasan kapasitas permintaan. Permintaan ditunda secara eksponensial dengan strategi percobaan ulang default. Anda dapat mengonfigurasi pembatasan kapasitas agar sesuai dengan kasus penggunaan Anda.

Dukungan dan batasan

Transformasi pengayaan memiliki persyaratan berikut:

  • Tersedia untuk pipeline batch dan streaming.
  • Pengendali BigTableEnrichmentHandler tersedia di Apache Beam Python SDK versi 2.54.0 dan yang lebih baru.
  • Pengendali BigQueryEnrichmentHandler tersedia di Apache Beam Python SDK versi 2.57.0 dan yang lebih baru.
  • Pengendali VertexAIFeatureStoreEnrichmentHandler tersedia di Apache Beam Python SDK versi 2.55.0 dan yang lebih baru.
  • Saat menggunakan Apache Beam Python SDK versi 2.55.0 dan yang lebih baru, Anda juga harus menginstal klien Python untuk Redis.
  • Tugas Dataflow harus menggunakan Runner v2.

Menggunakan transformasi pengayaan

Untuk menggunakan transformasi pengayaan, sertakan kode berikut dalam pipeline Anda:

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler

bigtable_handler = BigTableEnrichmentHandler(...)

with beam.Pipeline() as p:
  output = (p
            ...
            | "Create" >> beam.Create(data)
            | "Enrich with Bigtable" >> Enrichment(bigtable_handler)
            ...
            )

Karena transformasi pengayaan melakukan cross join secara default, desain gabungan kustom untuk memperkaya data input. Desain ini memastikan bahwa penggabungan hanya menyertakan kolom yang ditentukan.

Dalam contoh berikut, left adalah elemen input transformasi pengayaan, dan right adalah data yang diambil dari layanan eksternal untuk elemen input tersebut.

def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
  enriched = {}
  enriched['FIELD_NAME'] = left['FIELD_NAME']
  ...
  return beam.Row(**enriched)

Parameter

Untuk menggunakan transformasi pengayaan, parameter EnrichmentHandler diperlukan.

Anda juga dapat menggunakan parameter konfigurasi untuk menentukan fungsi lambda untuk fungsi join, waktu tunggu, throttler, atau repeater (strategi percobaan ulang). Parameter konfigurasi berikut tersedia:

  • join_fn: Fungsi lambda yang menggunakan kamus sebagai input dan menampilkan baris yang diperkaya (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]). Baris yang diperkaya menentukan cara menggabungkan data yang diambil dari API. Setelan defaultnya adalah cross join.
  • timeout: Jumlah detik yang harus ditunggu hingga permintaan selesai oleh API sebelum waktu tunggu habis. Defaultnya adalah 30 detik.
  • throttler: Menentukan mekanisme throttling. Satu-satunya opsi yang didukung adalah throttle adaptif sisi klien default.
  • repeater: Menentukan strategi percobaan ulang saat error seperti TooManyRequests dan TimeoutException terjadi. Setelan defaultnya adalah ExponentialBackOffRepeater.

Langkah selanjutnya