Template Apache Kafka ke Apache Kafka membuat pipeline streaming yang menyerap data sebagai byte dari sumber Apache Kafka, lalu menulis byte ke sink Apache Kafka.
Persyaratan pipeline
- Topik sumber Apache Kafka harus ada.
- Server broker sumber dan sink Apache Kafka harus berjalan dan dapat dijangkau dari mesin pekerja Dataflow.
- Jika Anda menggunakan Google Cloud Managed Service for Apache Kafka sebagai sumber atau sink, topik harus ada sebelum meluncurkan template.
Format pesan Kafka
Pesan sumber Apache Kafka dibaca sebagai byte, dan byte ditulis ke sink Apache Kafka.
Autentikasi
Template Apache Kafka ke Apache Kafka mendukung autentikasi SASL/PLAIN dan TLS ke broker Kafka.
Parameter template
Parameter yang diperlukan
- readBootstrapServerAndTopic : Server dan topik Bootstrap Kafka untuk membaca input. (Contoh: localhost:9092;topic1,topic2).
- kafkaReadAuthenticationMode : Mode autentikasi yang akan digunakan dengan cluster Kafka. Gunakan NONE untuk tidak ada autentikasi, SASL_PLAIN untuk nama pengguna dan sandi SASL/PLAIN, TLS untuk autentikasi berbasis sertifikat. APPLICATION_DEFAULT_CREDENTIALS hanya boleh digunakan untuk cluster Google Cloud Apache Kafka untuk BigQuery karena hal ini memungkinkan Anda melakukan autentikasi dengan Google Cloud Apache Kafka untuk BigQuery menggunakan kredensial default aplikasi.
- writeBootstrapServerAndTopic : Topik Kafka tempat output akan ditulis.
- kafkaWriteAuthenticationMethod : Mode autentikasi yang akan digunakan dengan cluster Kafka. Gunakan NONE untuk tidak ada autentikasi, SASL_PLAIN untuk nama pengguna dan sandi SASL/PLAIN, dan TLS untuk autentikasi berbasis sertifikat. Default-nya adalah: APPLICATION_DEFAULT_CREDENTIALS.
Parameter opsional
- enableCommitOffsets : Mencommit offset pesan yang diproses ke Kafka. Jika diaktifkan, hal ini akan meminimalkan kesenjangan atau pemrosesan duplikat pesan saat memulai ulang pipeline. Memerlukan penentuan ID Grup Konsumen. Defaultnya adalah: false.
- consumerGroupId : ID unik untuk grup konsumen tempat pipeline ini berada. Wajib jika Commit Offsets to Kafka diaktifkan. Default-nya adalah kosong.
- kafkaReadOffset : Titik awal untuk membaca pesan saat tidak ada offset yang di-commit. Pesan terlama dimulai dari awal, pesan terbaru dimulai dari pesan terbaru. Default-nya adalah: latest.
- kafkaReadUsernameSecretId : ID secret Google Cloud Secret Manager yang berisi nama pengguna Kafka yang akan digunakan dengan autentikasi SASL_PLAIN. (Contoh: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). Secara default kosong.
- kafkaReadPasswordSecretId : ID secret Google Cloud Secret Manager yang berisi sandi Kafka yang akan digunakan dengan autentikasi SASL_PLAIN. (Contoh: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). Secara default kosong.
- kafkaReadKeystoreLocation : Jalur Google Cloud Storage ke file Java KeyStore (JKS) yang berisi sertifikat TLS dan kunci pribadi yang akan digunakan saat mengautentikasi dengan cluster Kafka. (Contoh: gs://your-bucket/keystore.jks).
- kafkaReadTruststoreLocation : Jalur Google Cloud Storage ke file Java TrustStore (JKS) yang berisi sertifikat tepercaya yang akan digunakan untuk memverifikasi identitas broker Kafka.
- kafkaReadTruststorePasswordSecretId : ID secret Google Cloud Secret Manager yang berisi sandi yang akan digunakan untuk mengakses file Java TrustStore (JKS) untuk autentikasi TLS Kafka (Contoh: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
- kafkaReadKeystorePasswordSecretId : ID secret Google Cloud Secret Manager yang berisi sandi yang akan digunakan untuk mengakses file Java KeyStore (JKS) untuk autentikasi TLS Kafka. (Contoh: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
- kafkaReadKeyPasswordSecretId : ID secret Google Cloud Secret Manager yang berisi sandi yang akan digunakan untuk mengakses kunci pribadi dalam file Java KeyStore (JKS) untuk autentikasi TLS Kafka. (Contoh: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
- kafkaWriteUsernameSecretId : ID secret Google Cloud Secret Manager yang berisi nama pengguna Kafka untuk autentikasi SASL_PLAIN dengan cluster Kafka tujuan. (Contoh: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). Secara default kosong.
- kafkaWritePasswordSecretId : ID secret Google Cloud Secret Manager yang berisi sandi Kafka yang akan digunakan untuk autentikasi SASL_PLAIN dengan cluster Kafka tujuan. (Contoh: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>). Secara default kosong.
- kafkaWriteKeystoreLocation : Jalur Google Cloud Storage ke file Java KeyStore (JKS) yang berisi sertifikat TLS dan kunci pribadi untuk mengautentikasi dengan cluster Kafka tujuan. (Contoh: gs://
- kafkaWriteTruststoreLocation : Jalur Google Cloud Storage ke file Java TrustStore (JKS) yang berisi sertifikat tepercaya yang akan digunakan untuk memverifikasi identitas broker Kafka tujuan.
- kafkaWriteTruststorePasswordSecretId : ID secret Google Cloud Secret Manager yang berisi sandi yang akan digunakan untuk mengakses file Java TrustStore (JKS) untuk autentikasi TLS dengan cluster Kafka tujuan. (Contoh: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
- kafkaWriteKeystorePasswordSecretId : ID secret Google Cloud Secret Manager yang berisi sandi untuk mengakses file Java KeyStore (JKS) yang akan digunakan untuk autentikasi TLS dengan cluster Kafka tujuan. (Contoh: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
- kafkaWriteKeyPasswordSecretId : ID secret Google Cloud Secret Manager yang berisi sandi yang akan digunakan untuk mengakses kunci pribadi dalam file Java KeyStore (JKS) untuk autentikasi TLS dengan cluster Kafka tujuan. (Contoh: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
Menjalankan template
Konsol
- Buka halaman Create job from template Dataflow. Buka Buat tugas dari template
- Di kolom Nama tugas, masukkan nama tugas yang unik.
- Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region defaultnya adalah
us-central1
.Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.
- Dari menu drop-down Dataflow template, pilih the Kafka to Cloud Storage template.
- Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
- Opsional: Untuk beralih dari pemrosesan tepat satu kali ke mode streaming minimal sekali, pilih Minimal Sekali.
- Klik Run job.
gcloud
Di shell atau terminal, jalankan template:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Kafka \ --parameters \ outputTableSpec=BIGQUERY_TABLE,\ inputTopics=KAFKA_TOPICS,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ bootstrapServers=KAFKA_SERVER_ADDRESSES
Ganti kode berikut:
PROJECT_ID
: ID project Google Cloud tempat Anda ingin menjalankan tugas DataflowJOB_NAME
: nama tugas unik pilihan AndaREGION_NAME
: region tempat Anda ingin men-deploy tugas Dataflow—misalnya,us-central1
VERSION
: versi template yang ingin Anda gunakanAnda dapat menggunakan nilai berikut:
latest
untuk menggunakan template versi terbaru, yang tersedia di folder induk tanpa tanggal di bucket—gs://dataflow-templates-REGION_NAME/latest/- nama versi, seperti
2023-09-12-00_RC00
, untuk menggunakan versi template tertentu, yang dapat ditemukan bertingkat dalam folder induk bertanggal masing-masing di bucket—gs://dataflow-templates-REGION_NAME/
BIGQUERY_TABLE
: nama tabel Cloud Storage AndaKAFKA_TOPICS
: daftar topik Apache Kakfa. Jika ada beberapa topik yang diberikan, Anda harus meng-escape koma. Lihatgcloud topic escaping
.PATH_TO_JAVASCRIPT_UDF_FILE
: URI Cloud Storage dari file.js
yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan—misalnya,gs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: nama fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakanMisalnya, jika kode fungsi JavaScript Anda adalah
myTransform(inJson) { /*...do stuff...*/ }
, nama fungsinya adalahmyTransform
. Untuk contoh UDF JavaScript, lihat Contoh UDF.KAFKA_SERVER_ADDRESSES
: daftar alamat IP server broker Apache Kafka. Setiap alamat IP harus memiliki nomor port tempat server dapat diakses. Contoh:35.70.252.199:9092
. Jika ada beberapa alamat yang diberikan, Anda harus meng-escape koma. Lihatgcloud topic escaping
.
API
Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang
API dan cakupan otorisasinya, lihat
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "outputTableSpec": "BIGQUERY_TABLE", "inputTopics": "KAFKA_TOPICS", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "bootstrapServers": "KAFKA_SERVER_ADDRESSES" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Kafka", } }
Ganti kode berikut:
PROJECT_ID
: ID project Google Cloud tempat Anda ingin menjalankan tugas DataflowJOB_NAME
: nama tugas unik pilihan AndaLOCATION
: region tempat Anda ingin men-deploy tugas Dataflow—misalnya,us-central1
VERSION
: versi template yang ingin Anda gunakanAnda dapat menggunakan nilai berikut:
latest
untuk menggunakan template versi terbaru, yang tersedia di folder induk tanpa tanggal di bucket—gs://dataflow-templates-REGION_NAME/latest/- nama versi, seperti
2023-09-12-00_RC00
, untuk menggunakan versi template tertentu, yang dapat ditemukan bertingkat dalam folder induk bertanggal masing-masing di bucket—gs://dataflow-templates-REGION_NAME/
BIGQUERY_TABLE
: nama tabel Cloud Storage AndaKAFKA_TOPICS
: daftar topik Apache Kakfa. Jika ada beberapa topik yang diberikan, Anda harus meng-escape koma. Lihatgcloud topic escaping
.PATH_TO_JAVASCRIPT_UDF_FILE
: URI Cloud Storage dari file.js
yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan—misalnya,gs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: nama fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakanMisalnya, jika kode fungsi JavaScript Anda adalah
myTransform(inJson) { /*...do stuff...*/ }
, nama fungsinya adalahmyTransform
. Untuk contoh UDF JavaScript, lihat Contoh UDF.KAFKA_SERVER_ADDRESSES
: daftar alamat IP server broker Apache Kafka. Setiap alamat IP harus memiliki nomor port tempat server dapat diakses. Contoh:35.70.252.199:9092
. Jika ada beberapa alamat yang diberikan, Anda harus meng-escape koma. Lihatgcloud topic escaping
.
Untuk informasi selengkapnya, lihat Menulis data dari Kafka ke Cloud Storage dengan Dataflow.
Langkah selanjutnya
- Pelajari template Dataflow.
- Lihat daftar template yang disediakan Google.