Apache Beam menyederhanakan alur kerja pengayaan data dengan menyediakan transformasi pengayaan siap pakai yang dapat Anda tambahkan ke pipeline. Halaman ini menjelaskan cara menggunakan transformasi pengayaan Apache Beam untuk memperkaya data streaming Anda.
Saat memperkaya data, Anda akan menambah data mentah dari satu sumber dengan menambahkan data terkait dari sumber kedua. Data tambahan dapat berasal dari berbagai sumber, seperti Bigtable atau BigQuery. Transformasi pengayaan Apache Beam menggunakan pencarian nilai kunci untuk menghubungkan data tambahan ke data mentah.
Contoh berikut memberikan beberapa kasus saat pengayaan data berguna:
- Anda ingin membuat pipeline e-commerce yang merekam aktivitas pengguna dari situs atau aplikasi dan memberikan rekomendasi yang disesuaikan. Transformasi menggabungkan aktivitas ke dalam data pipeline sehingga Anda dapat memberikan rekomendasi yang disesuaikan.
- Anda memiliki data pengguna yang ingin digabungkan dengan data geografis untuk melakukan analisis berbasis geografi.
- Anda ingin membuat pipeline yang mengumpulkan data dari perangkat internet of things (IOT) yang mengirim peristiwa telemetri.
Manfaat
Transformasi pengayaan memiliki manfaat berikut:
- Mentransformasi data tanpa mengharuskan Anda menulis kode yang kompleks atau mengelola library yang mendasarinya.
- Menyediakan pengendali sumber bawaan.
- Gunakan pengendali
BigTableEnrichmentHandler
untuk memperkaya data Anda menggunakan sumber Bigtable tanpa meneruskan detail konfigurasi. - Gunakan pengendali
BigQueryEnrichmentHandler
untuk memperkaya data Anda menggunakan sumber BigQuery tanpa meneruskan detail konfigurasi. - Gunakan pengendali
VertexAIFeatureStoreEnrichmentHandler
dengan Vertex AI Feature Store dan penayangan online Bigtable.
- Gunakan pengendali
- Menggunakan throttling sisi klien untuk mengelola pembatasan kapasitas permintaan. Permintaan ditunda secara eksponensial dengan strategi percobaan ulang default. Anda dapat mengonfigurasi pembatasan kapasitas agar sesuai dengan kasus penggunaan Anda.
Dukungan dan batasan
Transformasi pengayaan memiliki persyaratan berikut:
- Tersedia untuk pipeline batch dan streaming.
- Pengendali
BigTableEnrichmentHandler
tersedia di Apache Beam Python SDK versi 2.54.0 dan yang lebih baru. - Pengendali
BigQueryEnrichmentHandler
tersedia di Apache Beam Python SDK versi 2.57.0 dan yang lebih baru. - Pengendali
VertexAIFeatureStoreEnrichmentHandler
tersedia di Apache Beam Python SDK versi 2.55.0 dan yang lebih baru. - Saat menggunakan Apache Beam Python SDK versi 2.55.0 dan yang lebih baru, Anda juga harus menginstal klien Python untuk Redis.
- Tugas Dataflow harus menggunakan Runner v2.
Menggunakan transformasi pengayaan
Untuk menggunakan transformasi pengayaan, sertakan kode berikut dalam pipeline Anda:
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler
bigtable_handler = BigTableEnrichmentHandler(...)
with beam.Pipeline() as p:
output = (p
...
| "Create" >> beam.Create(data)
| "Enrich with Bigtable" >> Enrichment(bigtable_handler)
...
)
Karena transformasi pengayaan melakukan cross join secara default, desain gabungan kustom untuk memperkaya data input. Desain ini memastikan bahwa penggabungan hanya menyertakan kolom yang ditentukan.
Dalam contoh berikut, left
adalah elemen input transformasi pengayaan, dan right
adalah data yang diambil dari layanan eksternal untuk elemen input tersebut.
def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
enriched = {}
enriched['FIELD_NAME'] = left['FIELD_NAME']
...
return beam.Row(**enriched)
Parameter
Untuk menggunakan transformasi pengayaan, parameter EnrichmentHandler
diperlukan.
Anda juga dapat menggunakan parameter konfigurasi untuk menentukan fungsi lambda
untuk fungsi join, waktu tunggu, throttler, atau repeater (strategi percobaan ulang). Parameter konfigurasi
berikut tersedia:
join_fn
: Fungsilambda
yang menggunakan kamus sebagai input dan menampilkan baris yang diperkaya (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]
). Baris yang diperkaya menentukan cara menggabungkan data yang diambil dari API. Setelan defaultnya adalah cross join.timeout
: Jumlah detik yang harus ditunggu hingga permintaan selesai oleh API sebelum waktu tunggu habis. Defaultnya adalah 30 detik.throttler
: Menentukan mekanisme throttling. Satu-satunya opsi yang didukung adalah throttle adaptif sisi klien default.repeater
: Menentukan strategi percobaan ulang saat error sepertiTooManyRequests
danTimeoutException
terjadi. Setelan defaultnya adalahExponentialBackOffRepeater
.
Langkah selanjutnya
- Untuk contoh lainnya, lihat Transformasi pengayaan di katalog transformasi Apache Beam.
- Gunakan Apache Beam dan Bigtable untuk memperkaya data.
- Menggunakan Apache Beam dan BigQuery untuk memperkaya data.
- Gunakan Apache Beam dan Vertex AI Feature Store untuk memperkaya data.