Template Apache Kafka ke Kafka

Template Apache Kafka ke Apache Kafka membuat pipeline streaming yang menyerap data sebagai byte dari sumber Apache Kafka, lalu menulis byte ke sink Apache Kafka.

Persyaratan pipeline

  • Topik sumber Apache Kafka harus ada.
  • Server broker sumber dan sink Apache Kafka harus berjalan dan dapat dijangkau dari mesin pekerja Dataflow.
  • Jika Anda menggunakan Google Cloud Managed Service untuk Apache Kafka sebagai sumber atau sink, topik tersebut harus ada sebelum meluncurkan template.

Format pesan Kafka

Pesan sumber Apache Kafka dibaca sebagai byte, dan byte ditulis ke sink Apache Kafka.

Autentikasi

Template Apache Kafka ke Apache Kafka mendukung SASL/PLAIN dan autentikasi TLS terhadap broker Kafka.

Parameter template

Parameter yang diperlukan

  • readBootstrapServerAndTopic : Topik Kafka untuk membaca input.
  • kafkaReadAuthenticationMode : Mode autentikasi yang akan digunakan dengan cluster Kafka. Gunakan NONE untuk tanpa autentikasi, SASL_PLAIN untuk nama pengguna dan sandi SASL/PLAIN, dan TLS untuk autentikasi berbasis sertifikat. Apache Kafka untuk BigQuery hanya mendukung mode autentikasi SASL_PLAIN. Default-nya adalah: SASL_PLAIN.
  • writeBootstrapServerAndTopic : Topik Kafka untuk menulis output.
  • kafkaWriteAuthenticationMethod : Mode autentikasi yang akan digunakan dengan cluster Kafka. Gunakan NONE untuk tanpa autentikasi, SASL_PLAIN untuk nama pengguna dan sandi SASL/PLAIN, dan TLS untuk autentikasi berbasis sertifikat. Default-nya adalah: NONE.

Parameter opsional

  • enableCommitOffsets : Meng-commit offset pesan yang diproses ke Kafka. Jika diaktifkan, tindakan ini akan meminimalkan kesenjangan atau pemrosesan duplikat pesan saat memulai ulang pipeline. Memerlukan ID Grup Konsumen. Default-nya adalah: false.
  • consumerGroupId : ID unik untuk grup konsumen tempat pipeline ini berada. Wajib jika Commit Offsets to Kafka diaktifkan. Nilai defaultnya adalah kosong.
  • kafkaReadOffset : Titik awal untuk membaca pesan saat tidak ada offset yang di-commit. Pesan paling awal dimulai dari awal, terbaru dari pesan terbaru. Default-nya adalah: terbaru.
  • kafkaReadUsernameSecretId : ID rahasia Google Cloud Secret Manager yang berisi nama pengguna Kafka yang akan digunakan dengan autentikasi SASL_PLAIN. (Contoh: projects/<PROJECT_ID>/secrets/<SECRET_ID>/version/<SECRET_VERSION>). Nilai defaultnya adalah kosong.
  • kafkaReadPasswordSecretId : ID rahasia Google Cloud Secret Manager yang berisi sandi Kafka untuk digunakan dengan autentikasi SASL_PLAIN. (Contoh: projects/<PROJECT_ID>/secrets/<SECRET_ID>/version/<SECRET_VERSION>). Nilai defaultnya adalah kosong.
  • kafkaReadKeystoreLocation : Jalur Google Cloud Storage ke file Java KeyStore (JKS) yang berisi sertifikat TLS dan kunci pribadi untuk digunakan saat mengautentikasi dengan cluster Kafka. (Contoh: gs://your-bucket/keystore.jks).
  • kafkaReadTruststoreLocation : Jalur Google Cloud Storage ke file Java TrustStore (JKS) yang berisi sertifikat tepercaya yang akan digunakan untuk memverifikasi identitas broker Kafka.
  • kafkaReadTruststorePasswordSecretId : ID rahasia Google Cloud Secret Manager yang berisi sandi yang akan digunakan untuk mengakses file Java TrustStore (JKS) untuk autentikasi TLS Kafka (Contoh: projects/<PROJECT_ID>/secrets/<SECRET_ID>/version/<SECRET_VERSION>).
  • kafkaReadKeystorePasswordSecretId : ID rahasia Google Cloud Secret Manager yang berisi sandi yang akan digunakan untuk mengakses file Java KeyStore (JKS) untuk autentikasi TLS Kafka. (Contoh: projects/<PROJECT_ID>/secrets/<SECRET_ID>/version/<SECRET_VERSION>).
  • kafkaReadKeyPasswordSecretId : ID rahasia Google Cloud Secret Manager yang berisi sandi yang akan digunakan untuk mengakses kunci pribadi dalam file Java KeyStore (JKS) untuk autentikasi TLS Kafka. (Contoh: projects/<PROJECT_ID>/secrets/<SECRET_ID>/version/<SECRET_VERSION>).
  • kafkaWriteUsernameSecretId : ID rahasia Google Cloud Secret Manager yang berisi nama pengguna Kafka untuk autentikasi SASL_PLAIN dengan cluster Kafka tujuan. (Contoh: projects/<PROJECT_ID>/secrets/<SECRET_ID>/version/<SECRET_VERSION>). Nilai defaultnya adalah kosong.
  • kafkaWritePasswordSecretId : ID rahasia Google Cloud Secret Manager yang berisi sandi Kafka yang akan digunakan untuk autentikasi SASL_PLAIN dengan cluster Kafka tujuan. (Contoh: projects/<PROJECT_ID>/secrets/<SECRET_ID>/version/<SECRET_VERSION>). Nilai defaultnya adalah kosong.
  • kafkaWriteKeystoreLocation : Jalur Google Cloud Storage ke file Java KeyStore (JKS) yang berisi sertifikat TLS dan kunci pribadi untuk mengautentikasi dengan cluster Kafka tujuan. (Contoh: gs://
  • kafkaWriteTruststoreLocation : Jalur Google Cloud Storage ke file Java TrustStore (JKS) yang berisi sertifikat tepercaya yang akan digunakan untuk memverifikasi identitas broker Kafka tujuan.
  • kafkaWriteTruststorePasswordSecretId : ID rahasia Google Cloud Secret Manager yang berisi sandi yang akan digunakan untuk mengakses file Java TrustStore (JKS) untuk autentikasi TLS dengan cluster Kafka tujuan. (Contoh: projects/<PROJECT_ID>/secrets/<SECRET_ID>/version/<SECRET_VERSION>).
  • kafkaWriteKeystorePasswordSecretId : ID rahasia Google Cloud Secret Manager yang berisi sandi untuk mengakses file Java KeyStore (JKS) yang akan digunakan untuk autentikasi TLS dengan cluster Kafka tujuan. (Contoh: projects/<PROJECT_ID>/secrets/<SECRET_ID>/version/<SECRET_VERSION>).
  • kafkaWriteKeyPasswordSecretId : ID rahasia Google Cloud Secret Manager yang berisi sandi yang akan digunakan untuk mengakses kunci pribadi dalam file Java KeyStore (JKS) untuk autentikasi TLS dengan cluster Kafka tujuan. (Contoh: projects/<PROJECT_ID>/secrets/<SECRET_ID>/version/<SECRET_VERSION>).

Menjalankan template

Konsol

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Job name, masukkan nama tugas yang unik.
  4. Opsional: Untuk Regional endpoint, pilih nilai dari menu drop-down. Default region adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Kafka to Cloud Storage template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Opsional: Untuk beralih dari pemrosesan tepat satu kali ke mode streaming minimal satu kali, pilih Setidaknya Sekali.
  8. Klik Run job.

gcloud

Di shell atau terminal Anda, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Cloud Storage \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama pekerjaan unik pilihan Anda
  • REGION_NAME: wilayah tempat Anda ingin men-deploy tugas Dataflow Anda—misalnya, us-central1
  • VERSION: versi {i>template<i} yang ingin Anda gunakan.

    Anda dapat menggunakan nilai berikut:

  • BIGQUERY_TABLE: nama tabel Cloud Storage Anda
  • KAFKA_TOPICS: daftar topik Apache Kakfa. Jika beberapa topik disediakan, Anda harus meng-escape koma. Lihat gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE: URI Cloud Storage dari file .js yang menentukan JavaScript yang ditentukan pengguna (UDF) yang ingin Anda gunakan—misalnya, gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: nama fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan

    Misalnya, jika kode fungsi JavaScript Anda adalah myTransform(inJson) { /*...do stuff...*/ }, maka nama fungsinya adalah myTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF.

  • KAFKA_SERVER_ADDRESSES: daftar alamat IP server broker Apache Kafka. Setiap alamat IP harus memiliki nomor port yang dapat digunakan untuk mengakses server. Contoh: 35.70.252.199:9092. Jika ada beberapa alamat, Anda harus meng-escape koma. Lihat gcloud topic escaping.

API

Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Cloud Storage",
   }
}
  

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama pekerjaan unik pilihan Anda
  • LOCATION: wilayah tempat Anda ingin men-deploy tugas Dataflow Anda—misalnya, us-central1
  • VERSION: versi {i>template<i} yang ingin Anda gunakan.

    Anda dapat menggunakan nilai berikut:

  • BIGQUERY_TABLE: nama tabel Cloud Storage Anda
  • KAFKA_TOPICS: daftar topik Apache Kakfa. Jika beberapa topik disediakan, Anda harus meng-escape koma. Lihat gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE: URI Cloud Storage dari file .js yang menentukan JavaScript yang ditentukan pengguna (UDF) yang ingin Anda gunakan—misalnya, gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: nama fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan

    Misalnya, jika kode fungsi JavaScript Anda adalah myTransform(inJson) { /*...do stuff...*/ }, maka nama fungsinya adalah myTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF.

  • KAFKA_SERVER_ADDRESSES: daftar alamat IP server broker Apache Kafka. Setiap alamat IP harus memiliki nomor port yang dapat digunakan untuk mengakses server. Contoh: 35.70.252.199:9092. Jika ada beberapa alamat, Anda harus meng-escape koma. Lihat gcloud topic escaping.

Untuk informasi selengkapnya, lihat Menulis data dari Kafka ke Cloud Storage dengan Dataflow.

Langkah selanjutnya