Dokumen ini menjelaskan cara membaca data dari Apache Kafka ke Dataflow.
Konektor I/O Kafka Apache Beam (KafkaIO
) tersedia secara native untuk
Java,
dan juga tersedia untuk
Python
dan Go
menggunakan framework pipeline multibahasa Apache Beam.
Untuk pipeline Java, sebaiknya gunakan Konektor I/O terkelola untuk membaca dari Kafka.
Keparalelan
Paralelisme dibatasi oleh dua faktor: jumlah maksimum pekerja
(max_num_workers
) dan jumlah partisi Kafka. Dataflow
memiliki setelan default fanout paralelisme 4 x max_num_workers
. Namun, fanout
dibatasi oleh jumlah partisi. Misalnya, jika 100 vCPU tersedia, tetapi pipeline hanya membaca dari 10 partisi Kafka, paralelisme maksimumnya adalah 10.
Untuk memaksimalkan paralelisme, sebaiknya miliki setidaknya 4 x
partisi Kafka max_num_workers
. Jika tugas Anda menggunakan
Runner v2, pertimbangkan untuk menetapkan paralelisme yang lebih tinggi.
Titik awal yang baik adalah memiliki partisi yang sama dengan dua kali jumlah vCPU pekerja.
Jika Anda tidak dapat meningkatkan jumlah partisi, pertimbangkan untuk menyisipkan langkah Reshuffle
atau Redistribute
setelah langkah baca Kafka. Langkah ini memungkinkan
Dataflow mendistribusikan ulang dan melakukan paralelisasi data secara lebih
efisien, tetapi menambahkan beberapa overhead tambahan untuk melakukan langkah pengacakan. Untuk
mengetahui informasi selengkapnya, lihat
Faktor yang memengaruhi paralelisme.
Cobalah untuk memastikan beban antarpartisi relatif merata dan tidak terdistorsi. Jika beban tidak seimbang, hal ini dapat menyebabkan penggunaan pekerja yang buruk. Pekerja yang membaca dari partisi dengan beban yang lebih ringan mungkin relatif tidak ada aktivitas, sedangkan pekerja yang membaca dari partisi dengan beban yang berat mungkin tertinggal. Dataflow menyediakan metrik untuk backlog per partisi.
Jika beban tidak seimbang, penyesuaian beban kerja dinamis dapat membantu mendistribusikan pekerjaan. Misalnya, Dataflow dapat mengalokasikan satu pekerja untuk membaca dari beberapa partisi bervolume rendah, dan mengalokasikan pekerja lain untuk membaca dari satu partisi bervolume tinggi. Namun, dua pekerja tidak dapat membaca dari partisi yang sama, sehingga partisi yang dimuat berat masih dapat menyebabkan pipeline tertinggal.
Praktik terbaik
Bagian ini berisi rekomendasi untuk membaca dari Kafka ke Dataflow.
Topik dengan volume rendah
Skenario umum adalah membaca dari banyak topik bervolume rendah secara bersamaan — misalnya, satu topik per pelanggan. Membuat tugas Dataflow terpisah untuk setiap topik tidak efisien biaya, karena setiap tugas memerlukan minimal satu pekerja penuh. Sebagai gantinya, pertimbangkan opsi berikut:
Gabungkan topik. Gabungkan topik sebelum ditransfer ke Dataflow. Proses transfer beberapa topik bervolume tinggi jauh lebih efisien daripada proses transfer banyak topik bervolume rendah. Setiap topik bervolume tinggi dapat ditangani oleh satu tugas Dataflow yang memanfaatkan pekerjanya sepenuhnya.
Membaca beberapa topik. Jika Anda tidak dapat menggabungkan topik sebelum menyerapnya ke dalam Dataflow, pertimbangkan untuk membuat pipeline yang membaca dari beberapa topik. Pendekatan ini memungkinkan Dataflow menetapkan beberapa topik ke pekerja yang sama. Ada dua cara untuk menerapkan pendekatan ini:
Langkah baca tunggal. Buat satu instance konektor
KafkaIO
dan konfigurasikan untuk membaca beberapa topik. Kemudian, filter menurut nama topik untuk menerapkan logika yang berbeda per topik. Untuk kode contoh, lihat Membaca dari beberapa topik. Pertimbangkan opsi ini jika semua topik Anda ditempatkan bersama dalam cluster yang sama. Satu kelemahannya adalah masalah pada satu sink atau transformasi dapat menyebabkan semua topik mengakumulasi backlog.Untuk kasus penggunaan lanjutan lainnya, teruskan kumpulan objek
KafkaSourceDescriptor
yang menentukan topik yang akan dibaca. Dengan menggunakanKafkaSourceDescriptor
, Anda dapat memperbarui daftar topik nanti jika diperlukan. Fitur ini memerlukan Java dengan Runner v2.Beberapa langkah baca. Untuk membaca dari topik yang berada di cluster yang berbeda, pipeline Anda dapat menyertakan beberapa instance
KafkaIO
. Saat tugas berjalan, Anda dapat memperbarui setiap sumber menggunakan pemetaan transformasi. Menetapkan topik atau cluster baru hanya didukung saat menggunakan Runner v2. Observabilitas adalah potensi tantangan dengan pendekatan ini, karena Anda perlu memantau setiap transformasi baca satu per satu, bukan mengandalkan metrik tingkat pipeline.
Melakukan commit kembali ke Kafka
Secara default, konektor KafkaIO
tidak menggunakan offset Kafka untuk melacak progres dan tidak melakukan commit kembali ke Kafka. Jika Anda memanggil
commitOffsetsInFinalize
, konektor akan melakukan upaya terbaik
untuk melakukan commit kembali ke Kafka setelah data di-commit di
Dataflow. Data yang di-commit di Dataflow mungkin tidak
diproses sepenuhnya, jadi jika Anda
membatalkan pipeline, offset
mungkin di-commit tanpa data diproses sepenuhnya.
Karena setelan enable.auto.commit=True
melakukan commit offset segera setelah dibaca dari
Kafka tanpa pemrosesan oleh Dataflow, sebaiknya jangan gunakan opsi ini.
Rekomendasinya adalah menyetel enable.auto.commit=False
dan
commitOffsetsInFinalize=True
. Jika Anda menetapkan
enable.auto.commit
ke True
, data dapat hilang jika pipeline terganggu
saat pemrosesan. Data yang telah di-commit di Kafka mungkin dihapus.
Watermark
Secara default, konektor KafkaIO
menggunakan waktu pemrosesan saat ini untuk menetapkan
watermark output
dan waktu peristiwa. Untuk mengubah perilaku ini, panggil
withTimestampPolicyFactory
dan tetapkan
TimestampPolicy
. Beam menyediakan
implementasi TimestampPolicy
yang menghitung watermark berdasarkan
waktu penambahan log Kafka atau waktu pembuatan pesan.
Pertimbangan pelari
Konektor KafkaIO
memiliki dua implementasi dasar untuk pembacaan Kafka, yaitu
ReadFromKafkaViaUnbounded
yang lebih lama dan
ReadFromKafkaViaSDF
yang lebih baru. Dataflow
otomatis memilih implementasi terbaik untuk tugas Anda berdasarkan bahasa SDK
dan persyaratan tugas. Hindari meminta penerapan runner atau Kafka secara eksplisit kecuali jika Anda memerlukan fitur tertentu yang hanya tersedia dalam penerapan tersebut. Untuk informasi selengkapnya tentang cara memilih runner, lihat Menggunakan Runner Dataflow v2.
Jika pipeline Anda menggunakan withTopic
atau withTopics
,
implementasi lama akan mengkueri Kafka pada waktu konstruksi pipeline untuk
partisi yang tersedia. Mesin yang membuat pipeline harus memiliki izin untuk terhubung ke Kafka. Jika Anda menerima error izin, pastikan Anda memiliki
izin untuk terhubung ke Kafka secara lokal. Anda dapat menghindari masalah ini dengan menggunakan
withTopicPartitions
, yang tidak terhubung ke Kafka
pada waktu pembuatan pipeline.
Men-deploy ke produksi
Saat men-deploy solusi dalam produksi, sebaiknya gunakan template Flex. Dengan menggunakan template Flex, pipeline diluncurkan dari lingkungan yang konsisten, yang dapat membantu mengurangi masalah konfigurasi lokal.
Logging dari KafkaIO
dapat menjadi cukup panjang. Pertimbangkan untuk mengurangi level logging
dalam produksi sebagai berikut:
sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.
Untuk informasi selengkapnya, lihat Menetapkan level log pekerja pipeline.
Mengonfigurasi jaringan
Secara default, Dataflow meluncurkan instance dalam jaringan Virtual Private Cloud (VPC) default Anda. Bergantung pada konfigurasi Kafka, Anda mungkin perlu mengonfigurasi jaringan dan subnet yang berbeda untuk Dataflow. Untuk mengetahui informasi selengkapnya, lihat Menentukan jaringan dan subnetwork. Saat mengonfigurasi jaringan, buat aturan firewall yang memungkinkan mesin pekerja Dataflow menjangkau broker Kafka.
Jika Anda menggunakan Kontrol Layanan VPC, tempatkan cluster Kafka dalam perimeter Kontrol Layanan VPC, atau perluas perimeter ke VPN atau Cloud Interconnect resmi.
Jika cluster Kafka di-deploy di luar Google Cloud, Anda harus membuat koneksi jaringan antara Dataflow dan cluster Kafka. Ada beberapa opsi jaringan dengan kompromi yang berbeda:
- Menghubungkan menggunakan ruang alamat RFC 1918 bersama, dengan menggunakan salah satu dari berikut:
- Jangkau cluster Kafka yang dihosting secara eksternal melalui alamat IP publik, dengan menggunakan salah satu dari opsi berikut:
- Internet publik
- Peering langsung
- Peering operator
Dedicated Interconnect adalah opsi terbaik untuk performa dan keandalan yang dapat diprediksi, tetapi penyiapannya dapat memerlukan waktu lebih lama karena pihak ketiga harus menyediakan sirkuit baru. Dengan topologi berbasis IP publik, Anda dapat memulai dengan cepat karena hanya sedikit pekerjaan jaringan yang perlu dilakukan.
Dua bagian berikutnya menjelaskan opsi ini secara lebih mendetail.
Ruang alamat RFC 1918 bersama
Dedicated Interconnect dan IPsec VPN memberi Anda akses langsung ke alamat IP RFC 1918 di Virtual Private Cloud (VPC), yang dapat menyederhanakan konfigurasi Kafka Anda. Jika Anda menggunakan topologi berbasis VPN, sebaiknya siapkan VPN throughput tinggi.
Secara default, Dataflow meluncurkan instance di jaringan VPC default Anda. Dalam topologi jaringan pribadi dengan rute yang ditentukan secara eksplisit di Cloud Router yang menghubungkan subjaringan di Google Cloud ke cluster Kafka tersebut, Anda memerlukan kontrol lebih besar atas lokasi instance Dataflow. Anda
dapat menggunakan Dataflow untuk mengonfigurasi parameter eksekusi network
dan subnetwork
.
Pastikan subnet yang sesuai memiliki alamat IP yang cukup untuk digunakan Dataflow dalam meluncurkan instance saat mencoba melakukan penskalaan keluar. Selain itu, saat membuat jaringan terpisah untuk meluncurkan instance Dataflow, pastikan Anda memiliki aturan firewall yang mengaktifkan traffic TCP di antara semua virtual machine dalam project. Jaringan default sudah mengonfigurasi aturan firewall ini.
Ruang alamat IP publik
Arsitektur ini menggunakan Transport Layer Security
(TLS) untuk mengamankan traffic
antara klien eksternal dan Kafka, serta menggunakan traffic yang tidak dienkripsi untuk komunikasi antar-broker. Saat pemroses Kafka terikat ke antarmuka jaringan yang digunakan
untuk komunikasi internal dan eksternal, mengonfigurasi pemroses
akan mudah. Namun, dalam banyak skenario, alamat broker Kafka yang diiklankan secara eksternal di cluster berbeda dengan antarmuka jaringan internal yang digunakan Kafka. Dalam skenario tersebut, Anda dapat menggunakan properti
advertised.listeners
:
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
Klien eksternal terhubung menggunakan port 9093 melalui saluran "SSL", dan klien internal
terhubung menggunakan port 9092 melalui saluran teks biasa. Saat Anda menentukan alamat di bagian advertised.listeners
, gunakan nama DNS (kafkabroker-n.mydomain.com
, dalam contoh ini) yang me-resolve ke instance yang sama untuk traffic eksternal dan internal. Menggunakan alamat IP publik mungkin tidak berfungsi
karena alamat tersebut mungkin gagal di-resolve untuk traffic internal.
Menyesuaikan Kafka
Setelan cluster Kafka dan klien Kafka dapat berdampak besar pada performa. Secara khusus, setelan berikut mungkin terlalu rendah. Bagian ini memberikan beberapa titik awal yang disarankan, tetapi Anda harus bereksperimen dengan nilai ini untuk beban kerja tertentu.
unboundedReaderMaxElements
. Setelan defaultnya adalah 10.000. Nilai yang lebih tinggi seperti 100.000 dapat meningkatkan ukuran paket, yang dapat meningkatkan performa secara signifikan jika pipeline Anda menyertakan agregasi. Namun, nilai yang lebih tinggi juga dapat meningkatkan latensi. Untuk menetapkan nilai, gunakansetUnboundedReaderMaxElements
. Setelan ini tidak berlaku untuk Runner v2.unboundedReaderMaxReadTimeMs
. Setelan defaultnya adalah 10.000 md. Nilai yang lebih tinggi seperti 20.000 md dapat meningkatkan ukuran paket, sedangkan nilai yang lebih rendah seperti 5.000 md dapat mengurangi latensi atau backlog. Untuk menetapkan nilai, gunakansetUnboundedReaderMaxReadTimeMs
. Setelan ini tidak berlaku untuk Runner v2.max.poll.records
. Setelan defaultnya adalah 500. Nilai yang lebih tinggi mungkin berperforma lebih baik dengan mengambil lebih banyak data masuk secara bersamaan, terutama saat menggunakan Runner v2. Untuk menetapkan nilai, panggilwithConsumerConfigUpdates
.fetch.max.bytes
. Default-nya adalah 1 MB. Nilai yang lebih tinggi dapat meningkatkan throughput dengan mengurangi jumlah permintaan, terutama saat menggunakan Runner v2. Namun, menyetelnya terlalu tinggi dapat meningkatkan latensi, meskipun pemrosesan downstream lebih cenderung menjadi bottleneck utama. Nilai awal yang direkomendasikan adalah 100 MB. Untuk menetapkan nilai, panggilwithConsumerConfigUpdates
.max.partition.fetch.bytes
. Default-nya adalah 1 MB. Parameter ini menetapkan jumlah data maksimum per partisi yang ditampilkan server. Meningkatkan nilai dapat meningkatkan throughput dengan mengurangi jumlah permintaan, terutama saat menggunakan Runner v2. Namun, menyetelnya terlalu tinggi dapat meningkatkan latensi, meskipun pemrosesan downstream lebih cenderung menjadi bottleneck utama. Nilai awal yang direkomendasikan adalah 100 MB. Untuk menetapkan nilai, panggilwithConsumerConfigUpdates
.consumerPollingTimeout
. Defaultnya adalah 2 detik. Jika waktu tunggu klien konsumen habis sebelum dapat membaca data apa pun, coba tetapkan nilai yang lebih tinggi. Setelan ini paling sering relevan saat melakukan pembacaan lintas region atau pembacaan dengan jaringan yang lambat. Untuk menetapkan nilai, panggilwithConsumerPollingTimeout
.
Pastikan receive.buffer.bytes
cukup besar untuk menangani ukuran
pesan. Jika nilainya terlalu kecil, log mungkin menunjukkan bahwa konsumen terus-menerus dibuat ulang dan mencari offset tertentu.
Contoh
Contoh kode berikut menunjukkan cara membuat pipeline Dataflow
yang membaca dari Kafka. Saat menggunakan Kredensial Default Aplikasi bersama dengan pengelola callback yang disediakan Google Cloud Managed Service for Apache Kafka, kafka-clients
versi 3.7.0 atau yang lebih tinggi diperlukan.
Membaca dari satu topik
Contoh ini membaca dari topik Kafka dan menulis payload pesan ke file teks.
Java
Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Python
Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Membaca dari beberapa topik
Contoh ini membaca dari beberapa topik Kafka dan menerapkan logika pipeline terpisah untuk setiap topik.
Untuk kasus penggunaan lanjutan lainnya, teruskan kumpulan objek
KafkaSourceDescriptor
secara dinamis, sehingga Anda dapat memperbarui
daftar topik yang akan dibaca. Pendekatan ini memerlukan Java dengan Runner v2.
Java
Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Python
Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.