Menghubungkan Pub/Sub ke Apache Kafka

Dokumen ini menjelaskan cara mengintegrasikan Apache Kafka dan Pub/Sub menggunakan Konektor Kafka Pub/Sub Group.

Tentang Konektor Kafka Pub/Sub Group

Apache Kafka adalah platform open source untuk streaming acara. Ini biasanya digunakan dalam arsitektur terdistribusi untuk memungkinkan komunikasi antara komponen yang dikaitkan secara longgar. Pub/Sub adalah layanan terkelola untuk mengirim dan menerima pesan secara asinkron. Seperti Kafka, Anda dapat menggunakan Pub/Sub untuk berkomunikasi antar-komponen dalam arsitektur cloud.

Konektor Kafka Pub/Sub Group memungkinkan Anda mengintegrasikan kedua sistem ini. Konektor berikut dikemas dalam JAR Konektor:

  • Konektor sink membaca kumpulan data dari satu atau beberapa topik Kafka dan memublikasikannya ke Pub/Sub.
  • Konektor sumber membaca pesan dari topik Pub/Sub dan memublikasikannya ke Kafka.

Berikut adalah beberapa skenario yang mungkin membuat Anda menggunakan Pub/Sub Group Kafka Connector:

  • Anda akan memigrasikan arsitektur berbasis Kafka ke Google Cloud.
  • Anda memiliki sistem frontend yang menyimpan peristiwa di Kafka di luar Google Cloud, tetapi Anda juga menggunakan Google Cloud untuk menjalankan beberapa layanan backend yang perlu menerima peristiwa Kafka.
  • Anda mengumpulkan log dari solusi Kafka lokal dan mengirimkannya ke Google Cloud untuk analisis data.
  • Anda memiliki sistem frontend yang menggunakan Google Cloud, tetapi juga menyimpan data secara lokal menggunakan Kafka.

Konektor memerlukan Kafka Connect, yang merupakan framework untuk streaming data antara Kafka dan sistem lain. Untuk menggunakan konektor, Anda harus menjalankan Kafka Connect bersama cluster Kafka.

Dokumen ini mengasumsikan bahwa Anda telah memahami Kafka dan Pub/Sub. Sebelum membaca dokumen ini, sebaiknya Anda menyelesaikan salah satu panduan memulai Pub/Sub.

Konektor Pub/Sub tidak mendukung integrasi apa pun antara Google Cloud IAM dan ACL Kafka Connect.

Mulai menggunakan konektor

Bagian ini akan memandu Anda melakukan tugas-tugas berikut:

  1. Konfigurasikan Konektor Kafka Pub/Sub Group.
  2. Mengirim peristiwa dari Kafka ke Pub/Sub.
  3. Mengirim pesan dari Pub/Sub ke Kafka.

Prasyarat

Instal Kafka

Ikuti panduan memulai Apache Kafka untuk menginstal Kafka node tunggal di komputer lokal Anda. Selesaikan langkah-langkah ini di panduan memulai:

  1. Download rilis Kafka terbaru, lalu ekstrak.
  2. Mulai lingkungan Kafka.
  3. Membuat topik Kafka.

Autentikasikan

Konektor Kafka Pub/Sub Group harus melakukan autentikasi dengan Pub/Sub agar dapat mengirim dan menerima pesan Pub/Sub. Untuk menyiapkan autentikasi, lakukan langkah-langkah berikut:

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Buat atau pilih project Google Cloud.

    • Membuat project Google Cloud:

      gcloud projects create PROJECT_ID

      Ganti PROJECT_ID dengan nama untuk project Google Cloud yang Anda buat.

    • Pilih project Google Cloud yang Anda buat:

      gcloud config set project PROJECT_ID

      Ganti PROJECT_ID dengan nama project Google Cloud Anda.

  5. Buat kredensial autentikasi lokal untuk Akun Google Anda:

    gcloud auth application-default login
  6. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  7. Install the Google Cloud CLI.
  8. To initialize the gcloud CLI, run the following command:

    gcloud init
  9. Buat atau pilih project Google Cloud.

    • Membuat project Google Cloud:

      gcloud projects create PROJECT_ID

      Ganti PROJECT_ID dengan nama untuk project Google Cloud yang Anda buat.

    • Pilih project Google Cloud yang Anda buat:

      gcloud config set project PROJECT_ID

      Ganti PROJECT_ID dengan nama project Google Cloud Anda.

  10. Buat kredensial autentikasi lokal untuk Akun Google Anda:

    gcloud auth application-default login
  11. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.

Download JAR konektor

Download file JAR konektor ke komputer lokal Anda. Untuk mengetahui informasi selengkapnya, baca artikel Mendapatkan konektor di readme GitHub.

Salin file konfigurasi konektor

  1. Clone atau download repositori GitHub untuk konektor.

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. Salin konten direktori config ke subdirektori config dari penginstalan Kafka Anda.

    cp config/* [path to Kafka installation]/config/
    

File ini berisi setelan konfigurasi untuk konektor.

Memperbarui konfigurasi Kafka Connect

  1. Buka direktori yang berisi biner Kafka Connect yang telah Anda download.
  2. Di direktori biner Kafka Connect, buka file bernama config/connect-standalone.properties dalam editor teks.
  3. Jika plugin.path property diberi komentar, hapus tanda komentar.
  4. Update plugin.path property untuk menyertakan jalur ke JAR konektor.

    Contoh:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. Tetapkan properti offset.storage.file.filename ke nama file lokal. Dalam mode mandiri, Kafka menggunakan file ini untuk menyimpan data offset.

    Contoh:

    offset.storage.file.filename=/tmp/connect.offsets
    

Meneruskan peristiwa dari Kafka ke Pub/Sub

Bagian ini menjelaskan cara memulai konektor sink, memublikasikan peristiwa ke Kafka, lalu membaca pesan yang diteruskan dari Pub/Sub.

  1. Gunakan Google Cloud CLI untuk membuat topik Pub/Sub dengan langganan.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

    Ganti kode berikut:

    • PUBSUB_TOPIC: Nama topik Pub/Sub untuk menerima pesan dari Kafka.
    • PUBSUB_SUBSCRIPTION: Nama langganan Pub/Sub untuk topik.
  2. Buka file /config/cps-sink-connector.properties di editor teks. Tambahkan nilai untuk properti berikut, yang ditandai "TODO" dalam komentar:

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC

    Ganti kode berikut:

    • KAFKA_TOPICS: Daftar yang dipisahkan koma untuk topik Kafka yang akan dibaca.
    • PROJECT_ID: Project Google Cloud yang berisi topik Pub/Sub Anda.
    • PUBSUB_TOPIC: Topik Pub/Sub untuk menerima pesan dari Kafka.
  3. Dari direktori Kafka, jalankan perintah berikut:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. Ikuti langkah-langkah di panduan memulai Apache Kafka untuk menulis beberapa peristiwa ke topik Kafka.

  5. Gunakan gcloud CLI untuk membaca peristiwa dari Pub/Sub.

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack

Meneruskan pesan dari Pub/Sub ke Kafka

Bagian ini menjelaskan cara memulai konektor sumber, memublikasikan pesan ke Pub/Sub, dan membaca pesan yang diteruskan dari Kafka.

  1. Gunakan gcloud CLI untuk membuat topik Pub/Sub dengan langganan.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

    Ganti kode berikut:

    • PUBSUB_TOPIC: Nama topik Pub/Sub.
    • PUBSUB_SUBSCRIPTION: Nama langganan Pub/Sub.
  2. Buka file bernama /config/cps-source-connector.properties di editor teks. Tambahkan nilai untuk properti berikut, yang ditandai sebagai "TODO" dalam komentar:

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION

    Ganti kode berikut:

    • KAFKA_TOPIC: Topik Kafka untuk menerima pesan Pub/Sub.
    • PROJECT_ID: Project Google Cloud yang berisi topik Pub/Sub Anda.
    • PUBSUB_TOPIC: Topik Pub/Sub.
  3. Dari direktori Kafka, jalankan perintah berikut:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. Gunakan gcloud CLI untuk memublikasikan pesan ke Pub/Sub.

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
  5. Baca pesan dari Kafka. Ikuti langkah-langkah pada panduan memulai Apache Kafka untuk membaca pesan dari topik Kafka.

Konversi pesan

Data Kafka berisi kunci dan nilai, yang merupakan array byte panjang variabel. Secara opsional, data Kafka juga dapat memiliki header, yang merupakan pasangan nilai kunci. Pesan Pub/Sub memiliki dua bagian utama: isi pesan dan tanpa atribut nilai kunci atau lebih.

Kafka Connect menggunakan pengonversi untuk membuat serialisasi kunci dan nilai ke dan dari Kafka. Untuk mengontrol serialisasi, tetapkan properti berikut dalam file konfigurasi konektor:

  • key.converter: Konverter yang digunakan untuk membuat serialisasi kunci kumpulan data.
  • value.converter: Konverter yang digunakan untuk membuat serialisasi nilai data.

Isi pesan Pub/Sub adalah objek ByteString, sehingga konversi yang paling efisien adalah menyalin payload secara langsung. Oleh karena itu, sebaiknya gunakan pengonversi yang menghasilkan jenis data primitif (skema bilangan bulat, float, string, atau byte) jika memungkinkan, untuk mencegah deserialisasi dan serial ulang isi pesan yang sama.

Konversi dari Kafka ke Pub/Sub

Konektor sink mengonversi data Kafka menjadi pesan Pub/Sub sebagai berikut:

  • Kunci data Kafka disimpan sebagai atribut dengan nama "key" dalam pesan Pub/Sub.
  • Secara default, konektor menghapus header apa pun dalam data Kafka. Namun, jika Anda menetapkan opsi konfigurasi headers.publish ke true, konektor akan menulis header sebagai atribut Pub/Sub. Konektor melewati header apa pun yang melebihi batas pada atribut pesan Pub/Sub.
  • Untuk skema bilangan bulat, float, string, dan byte, konektor meneruskan byte nilai data Kafka langsung ke isi pesan Pub/Sub.
  • Untuk skema struct, konektor akan menulis setiap kolom sebagai atribut pesan Pub/Sub. Misalnya, jika kolomnya adalah { "id"=123 }, pesan Pub/Sub yang dihasilkan memiliki atribut "id"="123". Nilai kolom selalu dikonversi menjadi string. Jenis peta dan struct tidak didukung sebagai jenis kolom dalam struct.
  • Untuk skema peta, konektor menulis setiap pasangan nilai kunci sebagai atribut pesan Pub/Sub. Misalnya, jika petanya adalah {"alice"=1,"bob"=2}, pesan Pub/Sub yang dihasilkan memiliki dua atribut, "alice"="1" dan "bob"="2". Kunci dan nilai dikonversi menjadi string.

Skema struktur dan peta memiliki beberapa perilaku tambahan:

  • Jika ingin, Anda dapat menentukan kolom struct atau kunci peta tertentu untuk menjadi isi pesan, dengan menetapkan properti konfigurasi messageBodyName. Nilai kolom atau kunci disimpan sebagai ByteString dalam isi pesan. Jika messageBodyName tidak ditetapkan, isi pesan akan kosong untuk struktur dan skema peta.

  • Untuk nilai array, konektor hanya mendukung jenis array primitif. Urutan nilai dalam array digabungkan menjadi satu objek ByteString.

Konversi dari Pub/Sub ke Kafka

Konektor sumber mengonversi pesan Pub/Sub ke data Kafka sebagai berikut:

  • Kunci data Kafka: Secara default, kunci disetel ke null. Jika ingin, Anda dapat menentukan atribut pesan Pub/Sub untuk digunakan sebagai kunci, dengan menetapkan opsi konfigurasi kafka.key.attribute. Dalam hal ini, konektor akan mencari atribut dengan nama tersebut dan menetapkan kunci kumpulan data ke nilai atribut. Jika atribut yang ditentukan tidak ada, kunci rekaman akan ditetapkan ke null.

  • Nilai data Kafka. Konektor menulis nilai kumpulan data sebagai berikut:

    • Jika pesan Pub/Sub tidak memiliki atribut khusus, konektor akan menulis isi pesan Pub/Sub secara langsung ke nilai data Kafka sebagai jenis byte[], menggunakan pengonversi yang ditentukan oleh value.converter.

    • Jika pesan Pub/Sub memiliki atribut khusus dan kafka.record.headers adalah false, konektor akan menulis struct ke nilai kumpulan data. Struct berisi satu kolom untuk setiap atribut, dan kolom bernama "message" yang nilainya adalah isi pesan Pub/Sub (disimpan sebagai byte):

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      Dalam hal ini, Anda harus menggunakan value.converter yang kompatibel dengan skema struct, seperti org.apache.kafka.connect.json.JsonConverter.

    • Jika pesan Pub/Sub memiliki atribut khusus dan kafka.record.headers adalah true, konektor akan menulis atribut sebagai header data Kafka. Fungsi ini menulis isi pesan Pub/Sub secara langsung ke nilai data Kafka sebagai jenis byte[], menggunakan pengonversi yang ditentukan oleh value.converter.

  • Header data Kafka. Secara default, header akan kosong, kecuali jika Anda menetapkan kafka.record.headers ke true.

Opsi konfigurasi

Selain konfigurasi yang disediakan oleh Kafka Connect API, Pub/Sub Group Kafka Connector mendukung konfigurasi berikut.

Opsi konfigurasi konektor sink

Konektor sink mendukung opsi konfigurasi berikut.

Setelan Jenis data Deskripsi
connector.class String Wajib. Class Java untuk konektor. Untuk konektor sink Pub/Sub, nilainya harus com.google.pubsub.kafka.sink.CloudPubSubSinkConnector.
cps.endpoint String

Endpoint Pub/Sub yang akan digunakan.

Default: "pubsub.googleapis.com:443".

cps.project String Wajib diisi. Google Cloud yang berisi topik Pub/Sub.
cps.topic String Wajib diisi. Topik Pub/Sub sebagai tujuan publikasi data Kafka.
gcp.credentials.file.path String Opsional. Jalur ke file yang menyimpan kredensial Google Cloud untuk mengautentikasi Pub/Sub Lite.
gcp.credentials.json String Opsional. Blob JSON yang berisi Google Cloud untuk mengautentikasi Pub/Sub Lite.
headers.publish Boolean

Saat true, sertakan header data Kafka apa pun sebagai atribut pesan Pub/Sub.

Default: false.

maxBufferBytes Long

Jumlah byte maksimum yang akan diterima pada partisi Kafka topik sebelum memublikasikannya ke Pub/Sub.

Default: 10000000.

maxBufferSize Integer

Jumlah data maksimum yang akan diterima pada partisi topik Kafka sebelum memublikasikannya ke Pub/Sub.

Default: 100.

maxDelayThresholdMs Integer

Jumlah waktu tunggu maksimum untuk mencapai maxBufferSize atau maxBufferBytes sebelum memublikasikan kumpulan data yang belum diproses ke Pub/Sub, dalam milidetik.

Default: 100.

maxOutstandingMessages Long

Jumlah maksimum data yang dapat diselesaikan, termasuk batch yang tidak lengkap dan tertunda, sebelum penerbit memblokir publikasi lebih lanjut.

Default: Long.MAX_VALUE.

maxOutstandingRequestBytes Long

Jumlah maksimum total byte yang dapat ditangani, termasuk batch yang tidak lengkap dan tertunda, sebelum penayang memblokir publikasi lebih lanjut.

Default: Long.MAX_VALUE.

maxRequestTimeoutMs Integer

Waktu tunggu untuk setiap permintaan publikasi ke Pub/Sub, dalam milidetik.

Default: 10.000.

maxTotalTimeoutMs Integer

Total waktu tunggu, dalam milidetik, untuk panggilan yang dipublikasikan ke Pub/Sub, termasuk percobaan ulang.

Default: 60000.

metadata.publish Boolean

Saat true, sertakan topik, partisi, offset, dan stempel waktu Kafka sebagai atribut pesan Pub/Sub.

Default: false.

messageBodyName String

Saat menggunakan struct atau skema nilai peta, tentukan nama kolom atau kunci untuk digunakan sebagai isi pesan Pub/Sub. Lihat Konversi dari Kafka ke Pub/Sub.

Default: "cps_message_body".

orderingKeySource String

Menentukan cara menetapkan kunci pengurutan dalam pesan Pub/Sub. Dapat berupa salah satu dari nilai berikut:

  • none: Tidak menetapkan kunci pengurutan.
  • key: Menggunakan kunci data Kafka sebagai kunci pengurutan.
  • partition: Menggunakan nomor partisi yang dikonversi menjadi string sebagai kunci pengurutan. Hanya gunakan setelan ini untuk topik dengan throughput rendah atau topik dengan ribuan partisi.

Default: none.

topics String Wajib diisi. Daftar topik Kafka yang dipisahkan koma untuk dibaca.

Opsi konfigurasi konektor sumber

Konektor sumber mendukung opsi konfigurasi berikut.

Setelan Jenis data Deskripsi
connector.class String Wajib. Class Java untuk konektor. Untuk konektor sumber Pub/Sub, nilainya harus com.google.pubsub.kafka.source.CloudPubSubSourceConnector.
cps.endpoint String

Endpoint Pub/Sub yang akan digunakan.

Default: "pubsub.googleapis.com:443".

cps.makeOrderingKeyAttribute Boolean

Saat true, tulis kunci pengurutan ke data Kafka, menggunakan format yang sama dengan atribut pesan Pub/Sub. Lihat Konversi dari Pub/Sub ke data Kafka.

Default: false.

cps.maxBatchSize Integer

Jumlah maksimum pesan yang akan dikelompokkan per permintaan pull ke Pub/Sub.

Default: 100

cps.project String Wajib. Project Google Cloud yang berisi topik Pub/Sub.
cps.subscription String Wajib diisi. Nama langganan Pub/Sub yang menjadi tempat pengambilan pesan.
gcp.credentials.file.path String Opsional. Jalur ke file yang menyimpan kredensial Google Cloud untuk mengautentikasi Pub/Sub Lite.
gcp.credentials.json String Opsional. Blob JSON yang berisi Google Cloud untuk mengautentikasi Pub/Sub Lite.
kafka.key.attribute String

Atribut pesan Pub/Sub yang akan digunakan sebagai kunci untuk pesan yang dipublikasikan ke Kafka. Jika ditetapkan ke "orderingKey", gunakan kunci pengurutan pesan. Jika null, data Kafka tidak memiliki kunci.

Default: null.

kafka.partition.count Integer

Jumlah partisi Kafka untuk topik Kafka tempat pesan dipublikasikan. Parameter ini diabaikan jika skema partisi adalah "kafka_partitioner".

Default: 1.

kafka.partition.scheme String

Skema untuk menetapkan pesan ke partisi di Kafka. Dapat berupa salah satu dari nilai berikut:

  • round_robin: Menetapkan partisi dengan cara round robin.
  • hash_key: Temukan partisi dengan melakukan hashing kunci data.
  • hash_value: Temukan partisi dengan melakukan hashing nilai data.
  • kafka_partitioner: Mendelegasikan logika partisi ke produser Kafka. Secara default, produser Kafka otomatis mendeteksi jumlah partisi dan melakukan pemetaan partisi berbasis hash murmur atau round robin, bergantung pada apakah kunci rekaman disediakan.
  • ordering_key: Menggunakan kode hash kunci pengurutan pesan. Jika kunci pengurutan tidak ada, gunakan round_robin.

Default: round_robin.

kafka.record.headers Boolean

Jika true, tulis atribut pesan Pub/Sub sebagai header Kafka.

kafka.topic String Wajib diisi. Topik Kafka yang menerima pesan dari Pub/Sub.

Mendapatkan dukungan

Jika Anda memerlukan bantuan, buat tiket dukungan. Untuk pertanyaan dan diskusi umum, buat masalah di repositori GitHub.

Langkah selanjutnya