Template Apache Kafka ke Cloud Storage adalah pipeline streaming yang menyerap data teks dari Layanan yang Dikelola Google Cloud untuk Apache Kafka dan menghasilkan kumpulan data ke Cloud Storage.
Anda juga dapat menggunakan template Apache Kafka to BigQuery dengan Kafka yang dikelola sendiri atau eksternal.
Persyaratan pipeline
- Bucket Cloud Storage output harus ada.
- Server broker Apache Kafka harus berjalan dan dapat dijangkau dari mesin pekerja Dataflow.
- Topik Apache Kafka harus ada.
Format pesan Kafka
Template Apache Kafka ke Cloud Storage mendukung pembacaan pesan dari Kafka dalam format berikut: CONFLUENT_AVRO_WIRE_FORMAT
dan JSON
.
Format file output
Format file output memiliki format yang sama dengan pesan Kafka input. Misalnya, jika Anda memilih JSON untuk format pesan Kafka, file JSON akan ditulis ke bucket Cloud Storage output.
Autentikasi
Template Apache Kafka ke Cloud Storage mendukung autentikasi SASL/PLAIN untuk broker Kafka.
Parameter template
Parameter yang diperlukan
- readBootstrapServerAndTopic : Topik Kafka untuk membaca input.
- kafkaReadAuthenticationMode : Mode autentikasi yang akan digunakan dengan cluster Kafka. Gunakan NONE jika tidak ada autentikasi dan SASL_PLAIN untuk nama pengguna dan sandi SASL/PLAIN. Apache Kafka untuk BigQuery hanya mendukung mode autentikasi SASL_PLAIN. Default-nya adalah: SASL_PLAIN.
- outputDirectory : Jalur dan awalan nama file untuk menulis file output. Harus diakhiri dengan garis miring. (Contoh: gs://your-bucket/your-path/).
- messageFormat : Format pesan Kafka yang akan dibaca. Nilai yang didukung adalah AVRO_CONFLUENT_WIRE_FORMAT (Confluent Schema Registry yang dienkode Avro), AVRO_BINARY_ENCODING (Biner polos Avro), dan JSON. Default-nya adalah: AVRO_CONFLUENT_WIRE_FORMAT.
Parameter opsional
- windowDuration : Durasi/ukuran periode tempat data akan ditulis ke Cloud Storage. Format yang diizinkan adalah: Ns (untuk detik, misalnya: 5d), Nm (untuk menit, misalnya: 12m), Nh (untuk jam, misalnya: 2j). (Contoh: 5 m). Default-nya adalah: 5m.
- outputFilenamePrefix : Awalan yang akan ditempatkan pada setiap file yang memiliki jendela. (Contoh: output-). Default-nya adalah: output.
- numShards : Jumlah maksimum shard output yang dihasilkan saat menulis. Jumlah shard yang lebih tinggi berarti throughput yang lebih tinggi untuk menulis ke Cloud Storage, tetapi berpotensi menyebabkan biaya agregasi data yang lebih tinggi di seluruh shard saat memproses file Cloud Storage output. Nilai default ditentukan oleh Dataflow.
- enableCommitOffsets : Meng-commit offset pesan yang diproses ke Kafka. Jika diaktifkan, tindakan ini akan meminimalkan kesenjangan atau pemrosesan duplikat pesan saat memulai ulang pipeline. Memerlukan ID Grup Konsumen. Default-nya adalah: false.
- consumerGroupId : ID unik untuk grup konsumen tempat pipeline ini berada. Wajib jika Commit Offsets to Kafka diaktifkan. Nilai defaultnya adalah kosong.
- kafkaReadOffset : Titik awal untuk membaca pesan saat tidak ada offset yang di-commit. Pesan paling awal dimulai dari awal, terbaru dari pesan terbaru. Default-nya adalah: terbaru.
- kafkaReadUsernameSecretId : ID rahasia Google Cloud Secret Manager yang berisi nama pengguna Kafka yang akan digunakan dengan autentikasi SASL_PLAIN. (Contoh: projects/<PROJECT_ID>/secrets/<SECRET_ID>/version/<SECRET_VERSION>). Nilai defaultnya adalah kosong.
- kafkaReadPasswordSecretId : ID rahasia Google Cloud Secret Manager yang berisi sandi Kafka untuk digunakan dengan autentikasi SASL_PLAIN. (Contoh: projects/<PROJECT_ID>/secrets/<SECRET_ID>/version/<SECRET_VERSION>). Nilai defaultnya adalah kosong.
- schemaFormat : Format skema Kafka. Dapat disediakan sebagai SINGLE_SCHEMA_FILE atau SCHEMA_REGISTRY. Jika SINGLE_SCHEMA_FILE ditentukan, semua pesan harus memiliki skema yang disebutkan dalam file skema avro. Jika SCHEMA_REGISTRY ditentukan, pesan dapat memiliki satu skema atau beberapa skema. Default-nya adalah: SINGLE_SCHEMA_FILE.
- confluentAvroSchemaPath : Jalur Google Cloud Storage ke file skema Avro tunggal yang digunakan untuk mendekode semua pesan dalam topik. Nilai defaultnya adalah kosong.
- schemaRegistryConnectionUrl : URL untuk instance Confluent Schema Registry yang digunakan untuk mengelola skema Avro untuk decoding pesan. Nilai defaultnya adalah kosong.
- binaryAvroSchemaPath : Jalur Google Cloud Storage ke file skema Avro yang digunakan untuk mendekode pesan Avro yang dienkode dengan biner. Nilai defaultnya adalah kosong.
Menjalankan template
Konsol
- Buka halaman Create job from template Dataflow. Buka Buat tugas dari template
- Di kolom Job name, masukkan nama tugas yang unik.
- Opsional: Untuk Regional endpoint, pilih nilai dari menu drop-down. Default
region adalah
us-central1
.Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.
- Dari menu drop-down Dataflow template, pilih the Kafka to Cloud Storage template.
- Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
- Opsional: Untuk beralih dari pemrosesan tepat satu kali ke mode streaming minimal satu kali, pilih Setidaknya Sekali.
- Klik Run job.
gcloud
Di shell atau terminal Anda, jalankan template:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Cloud Storage \ --parameters \ outputTableSpec=BIGQUERY_TABLE,\ inputTopics=KAFKA_TOPICS,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ bootstrapServers=KAFKA_SERVER_ADDRESSES
Ganti kode berikut:
PROJECT_ID
: ID project Google Cloud tempat Anda ingin menjalankan tugas DataflowJOB_NAME
: nama pekerjaan unik pilihan AndaREGION_NAME
: wilayah tempat Anda ingin men-deploy tugas Dataflow Anda—misalnya,us-central1
VERSION
: versi {i>template<i} yang ingin Anda gunakan.Anda dapat menggunakan nilai berikut:
latest
untuk menggunakan template versi terbaru, yang tersedia di Folder induk non-dated di bucket— gs://dataflow-templates-REGION_NAME/latest/- nama versi, seperti
2023-09-12-00_RC00
, untuk menggunakan versi tertentu dari , yang dapat ditemukan disarangkan di masing-masing folder induk bertanggal dalam bucket— gs://dataflow-templates-REGION_NAME/
BIGQUERY_TABLE
: nama tabel Cloud Storage AndaKAFKA_TOPICS
: daftar topik Apache Kakfa. Jika beberapa topik disediakan, Anda harus meng-escape koma. Lihatgcloud topic escaping
.PATH_TO_JAVASCRIPT_UDF_FILE
: URI Cloud Storage dari file.js
yang menentukan JavaScript yang ditentukan pengguna (UDF) yang ingin Anda gunakan—misalnya,gs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: nama fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakanMisalnya, jika kode fungsi JavaScript Anda adalah
myTransform(inJson) { /*...do stuff...*/ }
, maka nama fungsinya adalahmyTransform
. Untuk contoh UDF JavaScript, lihat Contoh UDF.KAFKA_SERVER_ADDRESSES
: daftar alamat IP server broker Apache Kafka. Setiap alamat IP harus memiliki nomor port yang dapat digunakan untuk mengakses server. Contoh:35.70.252.199:9092
. Jika ada beberapa alamat, Anda harus meng-escape koma. Lihatgcloud topic escaping
.
API
Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk informasi selengkapnya tentang
API dan cakupan otorisasinya, lihat
projects.templates.launch
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "outputTableSpec": "BIGQUERY_TABLE", "inputTopics": "KAFKA_TOPICS", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "bootstrapServers": "KAFKA_SERVER_ADDRESSES" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Cloud Storage", } }
Ganti kode berikut:
PROJECT_ID
: ID project Google Cloud tempat Anda ingin menjalankan tugas DataflowJOB_NAME
: nama pekerjaan unik pilihan AndaLOCATION
: wilayah tempat Anda ingin men-deploy tugas Dataflow Anda—misalnya,us-central1
VERSION
: versi {i>template<i} yang ingin Anda gunakan.Anda dapat menggunakan nilai berikut:
latest
untuk menggunakan template versi terbaru, yang tersedia di Folder induk non-dated di bucket— gs://dataflow-templates-REGION_NAME/latest/- nama versi, seperti
2023-09-12-00_RC00
, untuk menggunakan versi tertentu dari , yang dapat ditemukan disarangkan di masing-masing folder induk bertanggal dalam bucket— gs://dataflow-templates-REGION_NAME/
BIGQUERY_TABLE
: nama tabel Cloud Storage AndaKAFKA_TOPICS
: daftar topik Apache Kakfa. Jika beberapa topik disediakan, Anda harus meng-escape koma. Lihatgcloud topic escaping
.PATH_TO_JAVASCRIPT_UDF_FILE
: URI Cloud Storage dari file.js
yang menentukan JavaScript yang ditentukan pengguna (UDF) yang ingin Anda gunakan—misalnya,gs://my-bucket/my-udfs/my_file.js
JAVASCRIPT_FUNCTION
: nama fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakanMisalnya, jika kode fungsi JavaScript Anda adalah
myTransform(inJson) { /*...do stuff...*/ }
, maka nama fungsinya adalahmyTransform
. Untuk contoh UDF JavaScript, lihat Contoh UDF.KAFKA_SERVER_ADDRESSES
: daftar alamat IP server broker Apache Kafka. Setiap alamat IP harus memiliki nomor port yang dapat digunakan untuk mengakses server. Contoh:35.70.252.199:9092
. Jika ada beberapa alamat, Anda harus meng-escape koma. Lihatgcloud topic escaping
.
Untuk informasi selengkapnya, lihat Menulis data dari Kafka ke Cloud Storage dengan Dataflow.
Langkah selanjutnya
- Pelajari template Dataflow.
- Lihat daftar template yang disediakan Google.