Apache Beam menyederhanakan alur kerja pengayaan data dengan menyediakan transformasi pengayaan siap pakai yang dapat Anda tambahkan ke pipeline Anda. Halaman ini menjelaskan cara menggunakan transformasi pengayaan Apache Beam untuk memperkaya data streaming Anda.
Saat memperkaya data, Anda menambah data mentah dari satu sumber dengan menambahkan data terkait dari sumber kedua. Data tambahan dapat berasal dari berbagai sumber, seperti Bigtable atau BigQuery. Transformasi pengayaan Apache Beam menggunakan pencarian nilai kunci untuk menghubungkan data tambahan ke data mentah.
Contoh berikut memberikan beberapa kasus di mana pengayaan data akan berguna:
- Anda ingin membuat pipeline e-commerce yang mencatat aktivitas pengguna dari situs atau aplikasi dan memberikan rekomendasi yang disesuaikan. Transformasi menggabungkan aktivitas ke dalam data pipeline Anda sehingga Anda dapat memberikan rekomendasi yang disesuaikan.
- Anda memiliki data pengguna yang ingin digabungkan dengan data geografis untuk melakukan analisis berbasis geografis.
- Anda ingin membuat pipeline yang mengumpulkan data dari perangkat internet-of-things (IOT) yang mengirimkan peristiwa telemetri.
Manfaat
Transformasi pengayaan memiliki manfaat berikut:
- Mentransformasi data tanpa mengharuskan Anda menulis kode kompleks atau mengelola library yang mendasarinya.
- Menyediakan pengendali sumber bawaan.
- Gunakan
pengendali
BigTableEnrichmentHandler
untuk memperkaya data dengan menggunakan sumber Bigtable tanpa meneruskan detail konfigurasi. - Gunakan
pengendali
VertexAIFeatureStoreEnrichmentHandler
dengan Vertex AI Feature Store dan penyaluran online Bigtable.
- Gunakan
pengendali
- Menggunakan throttling sisi klien untuk mengelola pembatasan kapasitas permintaan. Permintaan dicadangkan secara eksponensial dengan strategi percobaan ulang default. Anda dapat mengonfigurasi pembatasan kapasitas agar sesuai dengan kasus penggunaan Anda.
Dukungan dan batasan
Transformasi pengayaan memiliki persyaratan berikut:
- Tersedia untuk pipeline batch dan streaming.
- Pengendali
BigTableEnrichmentHandler
tersedia di Apache Beam Python SDK versi 2.54.0 dan yang lebih baru. - Pengendali
VertexAIFeatureStoreEnrichmentHandler
tersedia di Apache Beam Python SDK versi 2.55.0 dan yang lebih baru. - Saat menggunakan Apache Beam Python SDK versi 2.55.0 dan yang lebih baru, Anda juga perlu menginstal klien Python untuk Redis.
- Tugas Dataflow harus menggunakan Runner v2.
Menggunakan transformasi pengayaan
Untuk menggunakan transformasi pengayaan, sertakan kode berikut di pipeline Anda:
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler
bigtable_handler = BigTableEnrichmentHandler(...)
with beam.Pipeline() as p:
output = (p
...
| "Create" >> beam.Create(data)
| "Enrich with Bigtable" >> Enrichment(bigtable_handler)
...
)
Karena transformasi pengayaan melakukan lintas gabungan secara default, desain gabungan kustom untuk memperkaya data input. Desain ini memastikan bahwa gabungan hanya menyertakan kolom yang ditentukan.
Dalam contoh berikut, left
adalah elemen input dari transformasi
pengayaan, dan right
adalah data yang diambil dari layanan eksternal untuk elemen
input tersebut.
def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
enriched = {}
enriched['FIELD_NAME'] = left['FIELD_NAME']
...
return beam.Row(**enriched)
Parameter
Untuk menggunakan transformasi pengayaan, parameter EnrichmentHandler
diperlukan.
Anda juga dapat menggunakan parameter konfigurasi untuk menentukan fungsi lambda
bagi fungsi
join, waktu tunggu, throttler, atau repeater (strategi coba ulang). Tersedia parameter konfigurasi berikut:
join_fn
: Fungsilambda
yang menggunakan kamus sebagai input dan menampilkan baris yang diperkaya (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]
). Baris yang diperkaya menentukan cara menggabungkan data yang diambil dari API. Default-nya adalah gabungan silang.timeout
: Jumlah detik untuk menunggu permintaan diselesaikan oleh API sebelum waktu habis. Default-nya adalah 30 detik.throttler
: Menentukan mekanisme throttling. Satu-satunya opsi yang didukung adalah throttling adaptif sisi klien default.repeater
: Menentukan strategi percobaan ulang saat error sepertiTooManyRequests
danTimeoutException
terjadi. Setelan defaultnya adalahExponentialBackOffRepeater
.
Langkah selanjutnya
- Untuk contoh lainnya, lihat Transformasi pengayaan di katalog transformasi Apache Beam.
- Gunakan Apache Beam dan Bigtable untuk memperkaya data.
- Gunakan Apache Beam dan Vertex AI Feature Store untuk memperkaya data.