Template Apache Kafka ke Cloud Storage

Template Apache Kafka ke Cloud Storage adalah pipeline streaming yang menyerap data teks dari Google Cloud Managed Service for Apache Kafka dan menghasilkan data ke Cloud Storage.

Anda juga dapat menggunakan template Apache Kafka ke BigQuery dengan Kafka eksternal atau yang dikelola sendiri.

Persyaratan pipeline

  • Bucket Cloud Storage output harus ada.
  • Server broker Apache Kafka harus berjalan dan dapat dijangkau dari mesin pekerja Dataflow.
  • Topik Apache Kafka harus ada.

Format pesan Kafka

Template Apache Kafka ke Cloud Storage mendukung pembacaan pesan dari Kafka dalam format berikut: CONFLUENT_AVRO_WIRE_FORMAT dan JSON.

Format file output

Format file output memiliki format yang sama dengan pesan Kafka input. Misalnya, jika Anda memilih JSON untuk format pesan Kafka, file JSON akan ditulis ke bucket Cloud Storage output.

Autentikasi

Template Apache Kafka ke Cloud Storage mendukung autentikasi SASL/PLAIN ke broker Kafka.

Parameter template

Parameter yang diperlukan

  • readBootstrapServerAndTopic: Topik Kafka untuk membaca input.
  • outputDirectory: Awalan jalur dan nama file untuk menulis file output. Harus diakhiri dengan garis miring. Contoh, gs://your-bucket/your-path/.
  • kafkaReadAuthenticationMode: Mode autentikasi yang akan digunakan dengan cluster Kafka. Gunakan KafkaAuthenticationMethod.NONE untuk tidak ada autentikasi, KafkaAuthenticationMethod.SASL_PLAIN untuk nama pengguna dan sandi SASL/PLAIN, dan KafkaAuthenticationMethod.TLS untuk autentikasi berbasis sertifikat. KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS hanya boleh digunakan untuk cluster Google Cloud Apache Kafka for BigQuery, yang memungkinkan autentikasi menggunakan kredensial default aplikasi.
  • messageFormat: Format pesan Kafka yang akan dibaca. Nilai yang didukung adalah AVRO_CONFLUENT_WIRE_FORMAT (Avro yang dienkode Confluent Schema Registry), AVRO_BINARY_ENCODING (Avro biner biasa), dan JSON. Default-nya adalah: AVRO_CONFLUENT_WIRE_FORMAT.
  • useBigQueryDLQ: Jika true (benar), pesan yang gagal akan ditulis ke BigQuery dengan informasi error tambahan. Defaultnya adalah: false.

Parameter opsional

  • windowDuration: Durasi/ukuran periode waktu saat data akan ditulis ke Cloud Storage. Format yang diizinkan adalah: Ns (untuk detik, contoh: 5d), Nm (untuk menit, contoh: 12m), Nh (untuk jam, contoh: 2h). Contoh, 5m. Setelan defaultnya adalah: 5 m.
  • outputFilenamePrefix: Awalan yang akan ditempatkan pada setiap file dengan jendela. Contoh, output-. Setelan defaultnya adalah: output.
  • numShards: Jumlah maksimum shard output yang dihasilkan saat menulis. Jumlah shard yang lebih tinggi berarti throughput yang lebih tinggi untuk menulis ke Cloud Storage, tetapi berpotensi meningkatkan biaya agregasi data di seluruh shard saat memproses file Cloud Storage output. Nilai default ditentukan oleh Dataflow.
  • enableCommitOffsets: Menyimpan offset pesan yang diproses ke Kafka. Jika diaktifkan, hal ini akan meminimalkan kesenjangan atau pemrosesan duplikat pesan saat memulai ulang pipeline. Memerlukan penentuan ID Grup Konsumen. Defaultnya adalah: false.
  • consumerGroupId: ID unik untuk grup konsumen tempat pipeline ini berada. Wajib jika Commit Offsets to Kafka diaktifkan. Default-nya adalah kosong.
  • kafkaReadOffset: Titik awal untuk membaca pesan saat tidak ada offset yang di-commit. Pesan terlama dimulai dari awal, pesan terbaru dimulai dari pesan terbaru. Default-nya adalah: latest.
  • kafkaReadUsernameSecretId: ID secret Google Cloud Secret Manager yang berisi nama pengguna Kafka yang akan digunakan dengan autentikasi SASL_PLAIN. Misalnya, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Default-nya adalah kosong.
  • kafkaReadPasswordSecretId: ID secret Google Cloud Secret Manager yang berisi sandi Kafka yang akan digunakan dengan autentikasi SASL_PLAIN. Misalnya, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Default-nya adalah kosong.
  • kafkaReadKeystoreLocation: Jalur Google Cloud Storage ke file Java KeyStore (JKS) yang berisi sertifikat TLS dan kunci pribadi yang akan digunakan saat mengautentikasi dengan cluster Kafka. Contoh, gs://your-bucket/keystore.jks.
  • kafkaReadTruststoreLocation: Jalur Google Cloud Storage ke file Java TrustStore (JKS) yang berisi sertifikat tepercaya yang akan digunakan untuk memverifikasi identitas broker Kafka.
  • kafkaReadTruststorePasswordSecretId: ID secret Google Cloud Secret Manager yang berisi sandi yang akan digunakan untuk mengakses file Java TrustStore (JKS) untuk autentikasi TLS Kafka. Misalnya, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeystorePasswordSecretId: ID secret Google Cloud Secret Manager yang berisi sandi yang akan digunakan untuk mengakses file Java KeyStore (JKS) untuk autentikasi TLS Kafka. Contoh, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeyPasswordSecretId: ID secret Google Cloud Secret Manager yang berisi sandi yang akan digunakan untuk mengakses kunci pribadi dalam file Java KeyStore (JKS) untuk autentikasi TLS Kafka. Contoh, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • schemaFormat: Format skema Kafka. Dapat diberikan sebagai SINGLE_SCHEMA_FILE atau SCHEMA_REGISTRY. Jika SINGLE_SCHEMA_FILE ditentukan, gunakan skema yang disebutkan dalam file skema avro untuk semua pesan. Jika SCHEMA_REGISTRY ditentukan, pesan dapat memiliki satu skema atau beberapa skema. Setelan defaultnya adalah: SINGLE_SCHEMA_FILE.
  • confluentAvroSchemaPath: Jalur Google Cloud Storage ke satu file skema Avro yang digunakan untuk mendekode semua pesan dalam topik. Default-nya adalah kosong.
  • schemaRegistryConnectionUrl: URL untuk instance Confluent Schema Registry yang digunakan untuk mengelola skema Avro untuk decoding pesan. Default-nya adalah kosong.
  • binaryAvroSchemaPath: Jalur Google Cloud Storage ke file skema Avro yang digunakan untuk mendekode pesan Avro yang dienkode biner. Default-nya adalah kosong.
  • schemaRegistryAuthenticationMode: Mode autentikasi Schema Registry. Dapat berupa NONE, TLS, atau OAUTH. Defaultnya adalah: NONE.
  • schemaRegistryTruststoreLocation: Lokasi sertifikat SSL tempat trust store untuk autentikasi ke Schema Registry disimpan. Contoh, /your-bucket/truststore.jks.
  • schemaRegistryTruststorePasswordSecretId: SecretId di secret manager tempat sandi untuk mengakses secret di truststore disimpan. Contoh, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryKeystoreLocation: Lokasi keystore yang berisi sertifikat SSL dan kunci pribadi. Contoh, /your-bucket/keystore.jks.
  • schemaRegistryKeystorePasswordSecretId: SecretId di secret manager tempat sandi untuk mengakses file keystore. Misalnya, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryKeyPasswordSecretId: SecretId sandi yang diperlukan untuk mengakses kunci pribadi klien yang disimpan dalam keystore. Misalnya, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryOauthClientId: Client ID yang digunakan untuk mengautentikasi klien Schema Registry dalam mode OAUTH. Diperlukan untuk format pesan AVRO_CONFLUENT_WIRE_FORMAT.
  • schemaRegistryOauthClientSecretId: ID rahasia Google Cloud Secret Manager yang berisi Client Secret yang akan digunakan untuk mengautentikasi klien Schema Registry dalam mode OAUTH. Diperlukan untuk format pesan AVRO_CONFLUENT_WIRE_FORMAT. Contoh, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • schemaRegistryOauthScope: Cakupan token akses yang digunakan untuk mengautentikasi klien Schema Registry dalam mode OAUTH. Kolom ini bersifat opsional, karena permintaan dapat dilakukan tanpa parameter cakupan yang diteruskan. Contoh, openid.
  • schemaRegistryOauthTokenEndpointUrl: URL berbasis HTTP(S) untuk penyedia identitas OAuth/OIDC yang digunakan untuk mengautentikasi klien Schema Registry dalam mode OAUTH. Diperlukan untuk format pesan AVRO_CONFLUENT_WIRE_FORMAT.
  • outputDeadletterTable: Nama tabel BigQuery yang sepenuhnya memenuhi syarat untuk pesan yang gagal. Pesan yang gagal mencapai tabel output karena berbagai alasan (misalnya, skema yang tidak cocok, JSON yang salah format) akan ditulis ke tabel ini. Tabel akan dibuat oleh template. Contoh, your-project-id:your-dataset.your-table-name.

Menjalankan template

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region defaultnya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Kafka to Cloud Storage template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Opsional: Untuk beralih dari pemrosesan tepat satu kali ke mode streaming setidaknya sekali, pilih Setidaknya Sekali.
  8. Klik Run job.

Di shell atau terminal, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Gcs_Flex \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • BIGQUERY_TABLE: nama tabel Cloud Storage Anda
  • KAFKA_TOPICS: daftar topik Apache Kakfa. Jika ada beberapa topik yang diberikan, Anda harus meng-escape koma. Lihat gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE: URI Cloud Storage dari file .js yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan—misalnya, gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: nama fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan

    Misalnya, jika kode fungsi JavaScript Anda adalah myTransform(inJson) { /*...do stuff...*/ }, nama fungsinya adalah myTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF.

  • KAFKA_SERVER_ADDRESSES: daftar alamat IP server broker Apache Kafka. Setiap alamat IP harus memiliki nomor port tempat server dapat diakses. Contoh: 35.70.252.199:9092. Jika beberapa alamat diberikan, Anda harus meng-escape koma. Lihat gcloud topic escaping.

Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Gcs_Flex",
   }
}
  

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • BIGQUERY_TABLE: nama tabel Cloud Storage Anda
  • KAFKA_TOPICS: daftar topik Apache Kakfa. Jika ada beberapa topik yang diberikan, Anda harus meng-escape koma. Lihat gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE: URI Cloud Storage dari file .js yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan—misalnya, gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: nama fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan

    Misalnya, jika kode fungsi JavaScript Anda adalah myTransform(inJson) { /*...do stuff...*/ }, nama fungsinya adalah myTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF.

  • KAFKA_SERVER_ADDRESSES: daftar alamat IP server broker Apache Kafka. Setiap alamat IP harus memiliki nomor port tempat server dapat diakses. Contoh: 35.70.252.199:9092. Jika beberapa alamat diberikan, Anda harus meng-escape koma. Lihat gcloud topic escaping.

Untuk informasi selengkapnya, lihat Menulis data dari Kafka ke Cloud Storage dengan Dataflow.

Java
/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.kafka.dlq.BigQueryDeadLetterQueue;
import com.google.cloud.teleport.v2.kafka.dlq.BigQueryDeadLetterQueueOptions;
import com.google.cloud.teleport.v2.kafka.options.KafkaReadOptions;
import com.google.cloud.teleport.v2.kafka.options.SchemaRegistryOptions;
import com.google.cloud.teleport.v2.kafka.transforms.KafkaTransform;
import com.google.cloud.teleport.v2.kafka.utils.KafkaConfig;
import com.google.cloud.teleport.v2.kafka.utils.KafkaTopicUtils;
import com.google.cloud.teleport.v2.transforms.WriteTransform;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.values.PCollection;

@Template(
    name = "Kafka_to_Gcs_Flex",
    category = TemplateCategory.STREAMING,
    displayName = "Kafka to Cloud Storage",
    description =
        "A streaming pipeline which ingests data from Kafka and writes to a pre-existing Cloud"
            + " Storage bucket with a variety of file types.",
    optionsClass = KafkaToGcsFlex.KafkaToGcsOptions.class,
    flexContainerName = "kafka-to-gcs-flex",
    contactInformation = "https://cloud.google.com/support",
    requirements = {"The output Google Cloud Storage directory must exist."})
public class KafkaToGcsFlex {

  public interface KafkaToGcsOptions
      extends PipelineOptions,
          DataflowPipelineOptions,
          KafkaReadOptions,
          SchemaRegistryOptions,
          BigQueryDeadLetterQueueOptions {

    // This is a duplicate option that already exist in KafkaReadOptions but keeping it here
    // so the KafkaTopic appears above the authentication enum on the Templates UI.
    @TemplateParameter.KafkaReadTopic(
        order = 1,
        name = "readBootstrapServerAndTopic",
        groupName = "Source",
        description = "Source Kafka Topic",
        helpText = "Kafka Topic to read the input from.")
    String getReadBootstrapServerAndTopic();

    void setReadBootstrapServerAndTopic(String value);

    @TemplateParameter.Duration(
        order = 20,
        optional = true,
        groupName = "Destination",
        description = "Window duration",
        helpText =
            "The window duration/size in which data will be written to Cloud Storage. Allowed formats are: Ns (for "
                + "seconds, example: 5s), Nm (for minutes, example: 12m), Nh (for hours, example: 2h).",
        example = "5m")
    @Default.String("5m")
    String getWindowDuration();

    void setWindowDuration(String windowDuration);

    @TemplateParameter.GcsWriteFolder(
        order = 21,
        groupName = "Destination",
        description = "Output file directory in Cloud Storage",
        helpText = "The path and filename prefix for writing output files. Must end with a slash.",
        example = "gs://your-bucket/your-path/")
    String getOutputDirectory();

    void setOutputDirectory(String outputDirectory);

    @TemplateParameter.Text(
        order = 22,
        optional = true,
        groupName = "Destination",
        description = "Output filename prefix of the files to write",
        helpText = "The prefix to place on each windowed file.",
        example = "output-")
    @Default.String("output")
    String getOutputFilenamePrefix();

    void setOutputFilenamePrefix(String outputFilenamePrefix);

    @TemplateParameter.Integer(
        order = 23,
        optional = true,
        description = "Maximum output shards",
        groupName = "Destination",
        helpText =
            "The maximum number of output shards produced when writing. A higher number of "
                + "shards means higher throughput for writing to Cloud Storage, but potentially higher "
                + "data aggregation cost across shards when processing output Cloud Storage files. "
                + "Default value is decided by Dataflow.")
    @Default.Integer(0)
    Integer getNumShards();

    void setNumShards(Integer numShards);
  }

  public static PipelineResult run(KafkaToGcsOptions options) throws Exception {
    // Create the Pipeline
    Pipeline pipeline = Pipeline.create(options);
    String bootstrapServes;
    List<String> topicsList;
    if (options.getReadBootstrapServerAndTopic() != null) {
      List<String> bootstrapServerAndTopicList =
          KafkaTopicUtils.getBootstrapServerAndTopic(
              options.getReadBootstrapServerAndTopic(), options.getProject());
      topicsList = List.of(bootstrapServerAndTopicList.get(1));
      bootstrapServes = bootstrapServerAndTopicList.get(0);
    } else {
      throw new IllegalArgumentException(
          "Please provide a valid bootstrap server which matches `[,:a-zA-Z0-9._-]+` and a topic which matches `[,a-zA-Z0-9._-]+`");
    }

    options.setStreaming(true);

    Map<String, Object> kafkaConfig = new HashMap<>(KafkaConfig.fromReadOptions(options));

    // Configure dead letter queue params
    ErrorHandler<BadRecord, ?> errorHandler = new ErrorHandler.DefaultErrorHandler<>();
    // Throwing Router throws the error instead of sending it to the DLQ. This will be the case
    // when no DLQ is configured and the pipeline will retry the failed error.
    BadRecordRouter badRecordRouter = BadRecordRouter.THROWING_ROUTER;

    if (options.getUseBigQueryDLQ()) {
      if (options.getOutputDeadletterTable() == null
          || options.getOutputDeadletterTable().isBlank()) {
        throw new IllegalArgumentException(
            "Please provide a Fully Qualified BigQuery table name when BigQuery Dead Letter"
                + "Queue is enabled");
      }
      badRecordRouter = BadRecordRouter.RECORDING_ROUTER;
      errorHandler =
          pipeline.registerBadRecordErrorHandler(
              BigQueryDeadLetterQueue.newBuilder()
                  .setTableName(options.getOutputDeadletterTable())
                  .build());
    }

    PCollection<KafkaRecord<byte[], byte[]>> kafkaRecord;
    // Step 1: Read from Kafka as bytes.
    KafkaIO.Read<byte[], byte[]> kafkaTransform =
        KafkaTransform.readBytesFromKafka(
            bootstrapServes, topicsList, kafkaConfig, options.getEnableCommitOffsets());
    kafkaRecord = pipeline.apply(kafkaTransform);

    kafkaRecord.apply(
        WriteTransform.newBuilder()
            .setOptions(options)
            .setBadRecordErrorHandler(errorHandler)
            .setBadRecordRouter(badRecordRouter)
            .build());
    if (options.getUseBigQueryDLQ()) {
      errorHandler.close();
    }
    return pipeline.run();
  }

  public static void main(String[] args) throws Exception {
    KafkaToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaToGcsOptions.class);

    run(options);
  }
}

Langkah berikutnya