Dokumen ini menjelaskan cara menulis data dari Dataflow ke Apache Kafka.
Untuk sebagian besar kasus penggunaan, pertimbangkan untuk menggunakan konektor I/O Terkelola untuk menulis ke Kafka.
Jika Anda memerlukan penyesuaian performa yang lebih canggih, pertimbangkan untuk menggunakan
konektor KafkaIO
. Konektor KafkaIO
tersedia untuk
Java
atau dengan menggunakan
framework pipeline multibahasa
untuk Python
dan Go.
Pemrosesan tepat satu kali
Secara default, konektor KafkaIO
tidak menyediakan
semantik tepat satu kali untuk operasi tulis. Artinya, data mungkin ditulis ke topik Kafka Anda beberapa kali. Untuk mengaktifkan
penulisan tepat sekali, panggil metode withEOS
. Operasi tulis tepat satu kali
memastikan bahwa data ditulis ke topik Kafka tujuan tepat satu kali.
Namun, hal ini juga meningkatkan biaya pipeline dan menurunkan throughput.
Jika Anda tidak memiliki persyaratan ketat untuk semantik exactly-once, dan logika dalam pipeline Anda dapat menangani data duplikat, pertimbangkan untuk mengaktifkan mode setidaknya sekali untuk seluruh pipeline guna mengurangi biaya. Untuk mengetahui informasi selengkapnya, lihat Menetapkan mode streaming pipeline.
Drain pipeline
Jika Anda menghabiskan pipeline, semantik tepat satu kali tidak dijamin. Satu-satunya jaminan adalah tidak ada data yang diakui yang hilang. Akibatnya, beberapa data mungkin diproses saat pipeline habis, tanpa commit offset baca kembali ke Kafka. Untuk mencapai semantik setidaknya sekali untuk Kafka saat Anda mengubah pipeline, perbarui pipeline, bukan membatalkan tugas dan memulai tugas baru.
Menyesuaikan Kafka untuk semantik tepat satu kali
Menyesuaikan transaction.max.timeout.ms
dan transactional.id.expiration.ms
dapat
melengkapi strategi pengiriman tepat waktu dan toleransi error secara keseluruhan.
Namun, dampaknya bergantung pada sifat pemadaman layanan dan konfigurasi khusus Anda. Tetapkan transaction.max.timeout.ms
mendekati waktu retensi
topik Kafka Anda untuk mencegah duplikasi data yang disebabkan oleh pemadaman broker Kafka.
Jika broker Kafka tidak tersedia untuk sementara (misalnya, karena kegagalan node atau
partisi jaringan), dan produsen memiliki transaksi yang sedang berlangsung, transaksi
tersebut mungkin akan habis waktunya. Meningkatkan nilai
transaction.max.timeout.ms
memberi transaksi lebih banyak waktu untuk diselesaikan setelah
broker pulih, sehingga berpotensi menghindari kebutuhan untuk memulai ulang transaksi dan
mengirim ulang pesan. Mitigasi ini secara tidak langsung membantu mempertahankan semantik
tepat satu kali, dengan mengurangi kemungkinan pesan duplikat yang disebabkan oleh dimulainya ulang transaksi. Di sisi lain, waktu habis masa berlaku yang lebih singkat dapat membantu membersihkan
ID transaksional yang tidak aktif dengan lebih cepat, sehingga mengurangi potensi penggunaan resource.
Mengonfigurasi jaringan
Secara default, Dataflow meluncurkan instance dalam jaringan Virtual Private Cloud (VPC) default Anda. Bergantung pada konfigurasi Kafka, Anda mungkin perlu mengonfigurasi jaringan dan subnet yang berbeda untuk Dataflow. Untuk mengetahui informasi selengkapnya, lihat Menentukan jaringan dan subnetwork. Saat mengonfigurasi jaringan, buat aturan firewall yang memungkinkan mesin pekerja Dataflow menjangkau broker Kafka.
Jika Anda menggunakan Kontrol Layanan VPC, tempatkan cluster Kafka dalam perimeter Kontrol Layanan VPC, atau perluas perimeter ke VPN atau Cloud Interconnect resmi.
Jika cluster Kafka di-deploy di luar Google Cloud, Anda harus membuat koneksi jaringan antara Dataflow dan cluster Kafka. Ada beberapa opsi jaringan dengan kompromi yang berbeda:
- Menghubungkan menggunakan ruang alamat RFC 1918 bersama, dengan menggunakan salah satu dari berikut:
- Jangkau cluster Kafka yang dihosting secara eksternal melalui alamat IP publik, dengan menggunakan salah satu dari opsi berikut:
- Internet publik
- Peering langsung
- Peering operator
Dedicated Interconnect adalah opsi terbaik untuk performa dan keandalan yang dapat diprediksi, tetapi penyiapannya dapat memerlukan waktu lebih lama karena pihak ketiga harus menyediakan sirkuit baru. Dengan topologi berbasis IP publik, Anda dapat memulai dengan cepat karena hanya perlu melakukan sedikit pekerjaan jaringan.
Dua bagian berikutnya menjelaskan opsi ini secara lebih mendetail.
Ruang alamat RFC 1918 bersama
Dedicated Interconnect dan IPsec VPN memberi Anda akses langsung ke alamat IP RFC 1918 di Virtual Private Cloud (VPC), yang dapat menyederhanakan konfigurasi Kafka Anda. Jika Anda menggunakan topologi berbasis VPN, sebaiknya siapkan VPN throughput tinggi.
Secara default, Dataflow meluncurkan instance di jaringan VPC default Anda. Dalam topologi jaringan pribadi dengan
rute yang ditentukan secara eksplisit di Cloud Router
yang menghubungkan subnetwork di Google Cloud ke cluster Kafka tersebut, Anda memerlukan
kontrol lebih besar atas tempat untuk menempatkan instance Dataflow. Anda
dapat menggunakan Dataflow untuk mengonfigurasi parameter eksekusi network
dan subnetwork
.
Pastikan subnet yang sesuai memiliki alamat IP yang cukup untuk digunakan Dataflow dalam meluncurkan instance saat mencoba melakukan penskalaan keluar. Selain itu, saat membuat jaringan terpisah untuk meluncurkan instance Dataflow, pastikan Anda memiliki aturan firewall yang mengaktifkan traffic TCP di antara semua virtual machine dalam project. Jaringan default sudah mengonfigurasi aturan firewall ini.
Ruang alamat IP publik
Arsitektur ini menggunakan Transport Layer Security
(TLS) untuk mengamankan traffic
antara klien eksternal dan Kafka, serta menggunakan traffic yang tidak dienkripsi untuk komunikasi antar-broker. Saat pemroses Kafka terikat ke antarmuka jaringan yang digunakan
untuk komunikasi internal dan eksternal, mengonfigurasi pemroses
akan mudah. Namun, dalam banyak skenario, alamat broker Kafka yang diiklankan secara eksternal di cluster berbeda dengan antarmuka jaringan internal yang digunakan Kafka. Dalam skenario tersebut, Anda dapat menggunakan properti
advertised.listeners
:
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
Klien eksternal terhubung menggunakan port 9093 melalui saluran "SSL", dan klien internal
terhubung menggunakan port 9092 melalui saluran teks biasa. Saat Anda menentukan alamat di bagian advertised.listeners
, gunakan nama DNS (kafkabroker-n.mydomain.com
, dalam contoh ini) yang me-resolve ke instance yang sama untuk traffic eksternal dan internal. Menggunakan alamat IP publik mungkin tidak berfungsi
karena alamat tersebut mungkin gagal di-resolve untuk traffic internal.
Logging
Logging dari KafkaIO
bisa jadi cukup panjang. Pertimbangkan untuk mengurangi level logging
dalam produksi sebagai berikut:
sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.
Untuk informasi selengkapnya, lihat Menetapkan level log pekerja pipeline.
Langkah berikutnya
- Membaca dari Apache Kafka.
- Pelajari I/O Terkelola lebih lanjut.