Template Apache Kafka ke BigQuery

Template Apache Kafka ke BigQuery adalah pipeline streaming yang menyerap data teks dari cluster Google Cloud Managed Service for Apache Kafka, lalu menghasilkan data yang dihasilkan ke tabel BigQuery. Setiap error yang terjadi saat menyisipkan data ke dalam tabel output akan disisipkan ke dalam tabel error terpisah di BigQuery.

Anda juga dapat menggunakan template Apache Kafka ke BigQuery dengan Kafka eksternal atau yang dikelola sendiri.

Persyaratan pipeline

  • Server broker Apache Kafka harus berjalan dan dapat dijangkau dari mesin pekerja Dataflow.
  • Topik Apache Kafka harus ada.
  • Anda harus mengaktifkan Dataflow, BigQuery, dan Cloud Storage API. Jika autentikasi diperlukan, Anda juga harus mengaktifkan Secret Manager API.
  • Buat set data dan tabel BigQuery dengan skema yang sesuai untuk topik input Kafka Anda. Jika menggunakan beberapa skema dalam topik yang sama dan ingin menulis ke beberapa tabel, Anda tidak perlu membuat tabel sebelum mengonfigurasi pipeline.
  • Saat antrean pesan yang tidak terproses (dead letter) untuk template diaktifkan, buat tabel kosong yang tidak memiliki skema untuk antrean pesan yang tidak terproses.

Format pesan Kafka

Template Apache Kafka ke BigQuery mendukung pembacaan pesan dari Kafka dalam format berikut: CONFLUENT_AVRO_WIRE_FORMAT, AVRO_BINARY_FORMAT, dan JSON.

Autentikasi

Template Apache Kafka ke BigQuery mendukung autentikasi SASL/PLAIN ke broker Kafka.

Parameter template

Parameter yang diperlukan

  • readBootstrapServerAndTopic: Topik Kafka untuk membaca input.
  • writeMode: Menulis kumpulan data ke satu tabel atau beberapa tabel (berdasarkan skema). Mode DYNAMIC_TABLE_NAMES hanya didukung untuk AVRO_CONFLUENT_WIRE_FORMAT Source Message Format dan SCHEMA_REGISTRY Schema Source. Nama tabel target dibuat secara otomatis berdasarkan nama skema Avro dari setiap pesan, yang dapat berupa satu skema (membuat satu tabel) atau beberapa skema (membuat beberapa tabel). Mode SINGLE_TABLE_NAME menulis ke satu tabel (satu skema) yang ditentukan oleh pengguna. Nilai default-nya adalah SINGLE_TABLE_NAME.
  • kafkaReadAuthenticationMode: Mode autentikasi yang akan digunakan dengan cluster Kafka. Gunakan KafkaAuthenticationMethod.NONE untuk tidak ada autentikasi, KafkaAuthenticationMethod.SASL_PLAIN untuk nama pengguna dan sandi SASL/PLAIN, dan KafkaAuthenticationMethod.TLS untuk autentikasi berbasis sertifikat. KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS hanya boleh digunakan untuk cluster Google Cloud Apache Kafka for BigQuery, yang memungkinkan autentikasi menggunakan kredensial default aplikasi.
  • messageFormat: Format pesan Kafka yang akan dibaca. Nilai yang didukung adalah AVRO_CONFLUENT_WIRE_FORMAT (Avro yang dienkode Confluent Schema Registry), AVRO_BINARY_ENCODING (Avro biner biasa), dan JSON. Default-nya adalah: AVRO_CONFLUENT_WIRE_FORMAT.
  • useBigQueryDLQ: Jika true (benar), pesan yang gagal akan ditulis ke BigQuery dengan informasi error tambahan. Defaultnya adalah: false.

Parameter opsional

  • outputTableSpec: Lokasi tabel BigQuery tempat output akan ditulis. Nama harus dalam format <project>:<dataset>.<table_name>. Skema tabel harus cocok dengan objek input.
  • persistKafkaKey: Jika true, pipeline akan mempertahankan kunci pesan Kafka di tabel BigQuery, di kolom _key dari jenis BYTES. Default-nya adalah false (Kunci diabaikan).
  • outputProject: Project output BigQuery tempat set data berada. Tabel akan dibuat secara dinamis dalam set data. Default-nya adalah kosong.
  • outputDataset: Set data output BigQuery tempat output akan ditulis. Tabel akan dibuat secara dinamis dalam set data. Jika tabel dibuat sebelumnya, nama tabel harus mengikuti konvensi penamaan yang ditentukan. Namanya harus bqTableNamePrefix + Avro Schema FullName , setiap kata akan dipisahkan dengan tanda hubung -. Default-nya adalah kosong.
  • bqTableNamePrefix: Awalan penamaan yang akan digunakan saat membuat tabel output BigQuery. Hanya berlaku saat menggunakan registry skema. Default-nya adalah kosong.
  • createDisposition: CreateDisposition BigQuery. Misalnya: CREATE_IF_NEEDED, CREATE_NEVER. Defaultnya adalah: CREATE_IF_NEEDED.
  • writeDisposition: BigQuery WriteDisposition. Misalnya: WRITE_APPEND, WRITE_EMPTY, atau WRITE_TRUNCATE. Default-nya adalah: WRITE_APPEND.
  • useAutoSharding: Jika benar, pipeline akan menggunakan sharding otomatis saat menulis ke BigQuery. Nilai defaultnya adalah true.
  • numStorageWriteApiStreams: Menentukan jumlah aliran tulis, parameter ini harus ditetapkan. Default-nya adalah 0.
  • storageWriteApiTriggeringFrequencySec: Menentukan frekuensi pemicuan dalam detik, parameter ini harus ditetapkan. Defaultnya adalah 5 detik.
  • useStorageWriteApiAtLeastOnce: Parameter ini hanya berlaku jika "Gunakan BigQuery Storage Write API" diaktifkan. Jika diaktifkan, semantik minimal satu kali akan digunakan untuk Storage Write API, jika tidak, semantik tepat satu kali akan digunakan. Defaultnya adalah: false.
  • enableCommitOffsets: Menyimpan offset pesan yang diproses ke Kafka. Jika diaktifkan, hal ini akan meminimalkan kesenjangan atau pemrosesan duplikat pesan saat memulai ulang pipeline. Memerlukan penentuan ID Grup Konsumen. Defaultnya adalah: false.
  • consumerGroupId: ID unik untuk grup konsumen tempat pipeline ini berada. Wajib jika Commit Offsets to Kafka diaktifkan. Default-nya adalah kosong.
  • kafkaReadOffset: Titik awal untuk membaca pesan saat tidak ada offset yang di-commit. Pesan terlama dimulai dari awal, pesan terbaru dimulai dari pesan terbaru. Default-nya adalah: latest.
  • kafkaReadUsernameSecretId: ID secret Google Cloud Secret Manager yang berisi nama pengguna Kafka yang akan digunakan dengan autentikasi SASL_PLAIN. Misalnya, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Default-nya adalah kosong.
  • kafkaReadPasswordSecretId: ID secret Google Cloud Secret Manager yang berisi sandi Kafka yang akan digunakan dengan autentikasi SASL_PLAIN. Misalnya, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Default-nya adalah kosong.
  • kafkaReadKeystoreLocation: Jalur Google Cloud Storage ke file Java KeyStore (JKS) yang berisi sertifikat TLS dan kunci pribadi yang akan digunakan saat mengautentikasi dengan cluster Kafka. Contoh, gs://your-bucket/keystore.jks.
  • kafkaReadTruststoreLocation: Jalur Google Cloud Storage ke file Java TrustStore (JKS) yang berisi sertifikat tepercaya yang akan digunakan untuk memverifikasi identitas broker Kafka.
  • kafkaReadTruststorePasswordSecretId: ID secret Google Cloud Secret Manager yang berisi sandi yang akan digunakan untuk mengakses file Java TrustStore (JKS) untuk autentikasi TLS Kafka. Misalnya, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeystorePasswordSecretId: ID secret Google Cloud Secret Manager yang berisi sandi yang akan digunakan untuk mengakses file Java KeyStore (JKS) untuk autentikasi TLS Kafka. Contoh, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeyPasswordSecretId: ID secret Google Cloud Secret Manager yang berisi sandi yang akan digunakan untuk mengakses kunci pribadi dalam file Java KeyStore (JKS) untuk autentikasi TLS Kafka. Contoh, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • schemaFormat: Format skema Kafka. Dapat diberikan sebagai SINGLE_SCHEMA_FILE atau SCHEMA_REGISTRY. Jika SINGLE_SCHEMA_FILE ditentukan, gunakan skema yang disebutkan dalam file skema avro untuk semua pesan. Jika SCHEMA_REGISTRY ditentukan, pesan dapat memiliki satu skema atau beberapa skema. Setelan defaultnya adalah: SINGLE_SCHEMA_FILE.
  • confluentAvroSchemaPath: Jalur Google Cloud Storage ke satu file skema Avro yang digunakan untuk mendekode semua pesan dalam topik. Default-nya adalah kosong.
  • schemaRegistryConnectionUrl: URL untuk instance Confluent Schema Registry yang digunakan untuk mengelola skema Avro untuk decoding pesan. Default-nya adalah kosong.
  • binaryAvroSchemaPath: Jalur Google Cloud Storage ke file skema Avro yang digunakan untuk mendekode pesan Avro yang dienkode biner. Default-nya adalah kosong.
  • schemaRegistryAuthenticationMode: Mode autentikasi Schema Registry. Dapat berupa NONE, TLS, atau OAUTH. Defaultnya adalah: NONE.
  • schemaRegistryTruststoreLocation: Lokasi sertifikat SSL tempat trust store untuk autentikasi ke Schema Registry disimpan. Contoh, /your-bucket/truststore.jks.
  • schemaRegistryTruststorePasswordSecretId: SecretId di secret manager tempat sandi untuk mengakses secret di truststore disimpan. Contoh, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryKeystoreLocation: Lokasi keystore yang berisi sertifikat SSL dan kunci pribadi. Contoh, /your-bucket/keystore.jks.
  • schemaRegistryKeystorePasswordSecretId: SecretId di secret manager tempat sandi untuk mengakses file keystore. Misalnya, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryKeyPasswordSecretId: SecretId sandi yang diperlukan untuk mengakses kunci pribadi klien yang disimpan dalam keystore. Misalnya, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryOauthClientId: Client ID yang digunakan untuk mengautentikasi klien Schema Registry dalam mode OAUTH. Diperlukan untuk format pesan AVRO_CONFLUENT_WIRE_FORMAT.
  • schemaRegistryOauthClientSecretId: ID rahasia Google Cloud Secret Manager yang berisi Client Secret yang akan digunakan untuk mengautentikasi klien Schema Registry dalam mode OAUTH. Diperlukan untuk format pesan AVRO_CONFLUENT_WIRE_FORMAT. Contoh, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • schemaRegistryOauthScope: Cakupan token akses yang digunakan untuk mengautentikasi klien Schema Registry dalam mode OAUTH. Kolom ini bersifat opsional, karena permintaan dapat dilakukan tanpa parameter cakupan yang diteruskan. Contoh, openid.
  • schemaRegistryOauthTokenEndpointUrl: URL berbasis HTTP(S) untuk penyedia identitas OAuth/OIDC yang digunakan untuk mengautentikasi klien Schema Registry dalam mode OAUTH. Diperlukan untuk format pesan AVRO_CONFLUENT_WIRE_FORMAT.
  • outputDeadletterTable: Nama tabel BigQuery yang sepenuhnya memenuhi syarat untuk pesan yang gagal. Pesan yang gagal mencapai tabel output karena berbagai alasan (misalnya, skema yang tidak cocok, JSON yang salah format) akan ditulis ke tabel ini. Tabel akan dibuat oleh template. Contoh, your-project-id:your-dataset.your-table-name.
  • javascriptTextTransformGcsPath: URI Cloud Storage file .js yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang akan digunakan. Contoh, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: Nama fungsi yang ditentukan pengguna (UDF) JavaScript yang akan digunakan. Misalnya, jika kode fungsi JavaScript Anda adalah myTransform(inJson) { /*...do stuff...*/ }, nama fungsinya adalah myTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: Menentukan seberapa sering UDF dimuat ulang, dalam hitungan menit. Jika nilainya lebih besar dari 0, Dataflow akan memeriksa file UDF di Cloud Storage secara berkala, dan memuat ulang UDF jika file diubah. Parameter ini memungkinkan Anda mengupdate UDF saat pipeline berjalan, tanpa perlu memulai ulang tugas. Jika nilainya adalah 0, pemuatan ulang UDF akan dinonaktifkan. Nilai defaultnya adalah 0.

Menjalankan template

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region defaultnya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Kafka to BigQuery template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Opsional: Untuk beralih dari pemrosesan tepat satu kali ke mode streaming setidaknya sekali, pilih Setidaknya Sekali.
  8. Klik Run job.

Di shell atau terminal, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery_Flex \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • BIGQUERY_TABLE: nama tabel BigQuery Anda
  • KAFKA_TOPICS: daftar topik Apache Kakfa. Jika ada beberapa topik yang diberikan, Anda harus meng-escape koma. Lihat gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE: URI Cloud Storage dari file .js yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan—misalnya, gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: nama fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan

    Misalnya, jika kode fungsi JavaScript Anda adalah myTransform(inJson) { /*...do stuff...*/ }, nama fungsinya adalah myTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF.

  • KAFKA_SERVER_ADDRESSES: daftar alamat IP server broker Apache Kafka. Setiap alamat IP memerlukan nomor port tempat server dapat diakses. Contoh: 35.70.252.199:9092. Jika beberapa alamat diberikan, Anda harus meng-escape koma. Lihat gcloud topic escaping.

Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery_Flex",
   }
}
  

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • BIGQUERY_TABLE: nama tabel BigQuery Anda
  • KAFKA_TOPICS: daftar topik Apache Kakfa. Jika ada beberapa topik yang diberikan, Anda harus meng-escape koma. Lihat gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE: URI Cloud Storage dari file .js yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan—misalnya, gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: nama fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan

    Misalnya, jika kode fungsi JavaScript Anda adalah myTransform(inJson) { /*...do stuff...*/ }, nama fungsinya adalah myTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF.

  • KAFKA_SERVER_ADDRESSES: daftar alamat IP server broker Apache Kafka. Setiap alamat IP memerlukan nomor port tempat server dapat diakses. Contoh: 35.70.252.199:9092. Jika beberapa alamat diberikan, Anda harus meng-escape koma. Lihat gcloud topic escaping.

Untuk mengetahui informasi selengkapnya, lihat Menulis data dari Kafka ke BigQuery dengan Dataflow.

Java
/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.kafka.dlq.BigQueryDeadLetterQueue;
import com.google.cloud.teleport.v2.kafka.transforms.AvroDynamicTransform;
import com.google.cloud.teleport.v2.kafka.transforms.AvroTransform;
import com.google.cloud.teleport.v2.kafka.transforms.KafkaRecordErrorConverters.WriteKafkaRecordMessageErrors;
import com.google.cloud.teleport.v2.kafka.transforms.KafkaTransform;
import com.google.cloud.teleport.v2.kafka.utils.KafkaConfig;
import com.google.cloud.teleport.v2.kafka.utils.KafkaTopicUtils;
import com.google.cloud.teleport.v2.kafka.values.KafkaTemplateParameters.MessageFormatConstants;
import com.google.cloud.teleport.v2.kafka.values.KafkaTemplateParameters.SchemaFormat;
import com.google.cloud.teleport.v2.options.KafkaToBigQueryFlexOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryWriteUtils;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.transforms.StringMessageToTableRow;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.MetadataValidator;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaRecordCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link KafkaToBigQueryFlex} pipeline is a streaming pipeline which ingests text data from
 * Kafka, and outputs the resulting records to BigQuery. Any errors which occur in the
 * transformation of the data, or inserting into the output table will be inserted into a separate
 * errors table in BigQuery. Both output and error tables are specified by the user as parameters.
 *
 * <p><b>Pipeline Requirements</b>
 *
 * <ul>
 *   <li>The Kafka topic exists and the message is encoded in a valid JSON format.
 *   <li>The BigQuery output table exists.
 *   <li>The Kafka brokers are reachable from the Dataflow worker machines.
 * </ul>
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/kafka-to-bigquery/README_Kafka_to_BigQuery.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Kafka_to_BigQuery_Flex",
    category = TemplateCategory.STREAMING,
    displayName = "Kafka to BigQuery",
    description =
        "The Apache Kafka to BigQuery template is a streaming pipeline which ingests text data from Apache Kafka, and outputs the resulting records to BigQuery. "
            + "Any errors which occur in the transformation of the data, or inserting into the output table are inserted into a separate errors table in BigQuery. "
            + "For any errors which occur in the transformation of the data, the original records can be inserted into a separate Kafka topic. The template supports "
            + "reading a Kafka topic which contains single/multiple schema(s). It can write to a single or multiple BigQuery tables, depending on the schema of records. ",
    optionsClass = KafkaToBigQueryFlexOptions.class,
    flexContainerName = "kafka-to-bigquery-flex",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/kafka-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The output BigQuery table must exist.",
      "The Apache Kafka broker server must be running and be reachable from the Dataflow worker machines.",
      "The Apache Kafka topics must exist and the messages must be encoded in a valid JSON format."
    },
    skipOptions = {"useStorageWriteApi"})
public class KafkaToBigQueryFlex {

  /* Logger for class. */
  private static final Logger LOG = LoggerFactory.getLogger(KafkaToBigQueryFlex.class);

  /** The tag for the main output of the json transformation. */
  public static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<>() {};

  /** The tag for the dead-letter output of the json to table row transform. */
  public static final TupleTag<FailsafeElement<KafkaRecord<String, String>, String>>
      TRANSFORM_DEADLETTER_OUT = new TupleTag<>() {};

  /** String/String Coder for FailsafeElement. */
  private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(
          NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of()));

  private static ErrorHandler<BadRecord, ?> errorHandler = new ErrorHandler.DefaultErrorHandler<>();
  private static BadRecordRouter badRecordRouter = BadRecordRouter.THROWING_ROUTER;

  /**
   * The main entry-point for pipeline execution. This method will start the pipeline but will not
   * wait for its execution to finish. If blocking execution is required, use the {@link
   * KafkaToBigQueryFlex#run(KafkaToBigQueryFlexOptions)} method to start the pipeline and invoke
   * {@code result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args The command-line args passed by the executor.
   */
  public static void main(String[] args) throws Exception {
    UncaughtExceptionLogger.register();

    KafkaToBigQueryFlexOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaToBigQueryFlexOptions.class);

    run(options);
  }

  public static Boolean useErrorHandler(KafkaToBigQueryFlexOptions options) {
    return options.getUseBigQueryDLQ();
  }

  public static WriteResult processKafkaRecords(
      PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords, KafkaToBigQueryFlexOptions options) {

    // Validate the pipeline options for MessageFormat and SchemaFormat.
    if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_BINARY_ENCODING)
        && (options.getBinaryAvroSchemaPath() != null
            && options.getBinaryAvroSchemaPath().isBlank())) {
      throw new IllegalArgumentException(
          "Binary Avro Schema Path cannot be empty for AVRO_BINARY_ENCODING.");
    }

    if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_CONFLUENT_WIRE_FORMAT)) {

      if ((options.getSchemaRegistryConnectionUrl() != null
              && options.getSchemaRegistryConnectionUrl().isBlank())
          && (options.getConfluentAvroSchemaPath() != null
              && options.getConfluentAvroSchemaPath().isBlank())) {
        throw new IllegalArgumentException(
            "Either Schema Registry Connection URL or Confluent Avro Schema Path must be provided for AVRO_CONFLUENT_WIRE_FORMAT.");
      }

      if (options.getSchemaFormat().equals(SchemaFormat.SINGLE_SCHEMA_FILE)) {
        if (!options.getConfluentAvroSchemaPath().isBlank()
            && (options.getOutputTableSpec() != null && options.getOutputTableSpec().isBlank())) {
          throw new IllegalArgumentException(
              "The outputTableSpec parameter is required when using the SINGLE_SCHEMA_FILE schema format.");
        }
      } else if (options.getSchemaFormat().equals(SchemaFormat.SCHEMA_REGISTRY)) {
        if (options.getSchemaRegistryConnectionUrl() != null
            && (options.getOutputDataset() != null && options.getOutputDataset().isBlank())) {
          throw new IllegalArgumentException(
              "An output BigQuery dataset is required. It will be used to create tables per schema.");
        }
      } else {
        throw new IllegalArgumentException(
            "Unsupported schemaFormat parameter value: " + options.getSchemaFormat());
      }
    }

    if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_BINARY_ENCODING)
        && (!options.getBinaryAvroSchemaPath().isBlank())) {
      return handleAvroBinaryEncoding(kafkaRecords, options);
    } else if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_CONFLUENT_WIRE_FORMAT)
        && (!options.getSchemaRegistryConnectionUrl().isBlank()
            || !options.getConfluentAvroSchemaPath().isBlank())) {
      return handleAvroConfluentWireFormat(kafkaRecords, options);
    } else {
      throw new IllegalArgumentException(
          "Message format " + options.getMessageFormat() + " is unsupported.");
    }
  }

  /**
   * Runs the pipeline to completion with the specified options. This method does not wait until the
   * pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
   * object to block until the pipeline is finished running if blocking programmatic execution is
   * required.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  public static PipelineResult run(KafkaToBigQueryFlexOptions options) throws Exception {

    // Enable Streaming Engine
    options.setEnableStreamingEngine(true);

    List<String> dataflowServiceOptions = options.getDataflowServiceOptions();
    if (dataflowServiceOptions == null) {
      dataflowServiceOptions = new ArrayList<>();
    }
    dataflowServiceOptions.add("enable_streaming_engine_resource_based_billing");
    options.setDataflowServiceOptions(dataflowServiceOptions);

    // Validate BQ STORAGE_WRITE_API options
    options.setUseStorageWriteApi(true);
    if (options.getStorageWriteApiTriggeringFrequencySec() == null) {
      options.setStorageWriteApiTriggeringFrequencySec(5);
    }
    BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
    MetadataValidator.validate(options);

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    List<String> topicsList;
    String bootstrapServers;
    if (options.getReadBootstrapServerAndTopic() != null) {
      List<String> bootstrapServerAndTopicList =
          KafkaTopicUtils.getBootstrapServerAndTopic(
              options.getReadBootstrapServerAndTopic(), options.getProject());
      topicsList = List.of(bootstrapServerAndTopicList.get(1));
      bootstrapServers = bootstrapServerAndTopicList.get(0);
    } else {
      throw new IllegalArgumentException(
          "Please provide a valid bootstrap server which matches `[,:a-zA-Z0-9._-]+` and a topic which matches `[,a-zA-Z0-9._-]+`");
    }

    // Configure error handler for Dead letter queue
    if (options.getUseBigQueryDLQ()) {
      if (options.getOutputDeadletterTable() == null
          || options.getOutputDeadletterTable().isBlank()) {
        throw new IllegalArgumentException(
            "Please provide a valid BigQuery full qualified table name when using BigQuery"
                + "Dead letter queue");
      }
      badRecordRouter = BadRecordRouter.RECORDING_ROUTER;
      errorHandler =
          pipeline.registerBadRecordErrorHandler(
              BigQueryDeadLetterQueue.newBuilder()
                  .setTableName(options.getOutputDeadletterTable())
                  .build());
    }

    // Get the Kafka config
    Map<String, Object> kafkaConfig = KafkaConfig.fromReadOptions(options);

    /*
     * Steps:
     *  1) Read messages in from Kafka
     *  2) Transform the messages into TableRows
     *     - Transform message payload via UDF
     *     - Convert UDF result to TableRow objects
     *  3) Write successful records out to BigQuery
     *  4) Write failed records out to BigQuery
     */

    if (options.getMessageFormat() == null
        || options.getMessageFormat().equals(MessageFormatConstants.JSON)) {

      pipeline = runJsonPipeline(pipeline, options, topicsList, bootstrapServers, kafkaConfig);

    } else if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_CONFLUENT_WIRE_FORMAT)
        || options.getMessageFormat().equals(MessageFormatConstants.AVRO_BINARY_ENCODING)) {
      pipeline = runAvroPipeline(pipeline, options, topicsList, bootstrapServers, kafkaConfig);

    } else {
      throw new IllegalArgumentException("Invalid format specified: " + options.getMessageFormat());
    }
    if (useErrorHandler(options)) {
      errorHandler.close();
    }
    return pipeline.run();
  }

  private static WriteResult handleAvroBinaryEncoding(
      PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords, KafkaToBigQueryFlexOptions options) {
    WriteResult writeResult;
    BigQueryWriteUtils.BigQueryWrite bigQueryWrite;
    if (useErrorHandler(options)) {
      // BigQueryIO sets the BadRecordRouter to RecordingRouter even when the errorHandler is
      // DefaultErrorHandler(which is a no op). In this case, when the BadRecordRouter is
      // ThrowingRouter,
      // don't pass errorHandler to BigQueryIO.
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryWrite.of(
              SchemaUtils.getAvroSchema(options.getBinaryAvroSchemaPath()),
              options.getOutputTableSpec(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding(),
              errorHandler);
    } else {
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryWrite.of(
              SchemaUtils.getAvroSchema(options.getBinaryAvroSchemaPath()),
              options.getOutputTableSpec(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding());
    }
    writeResult =
        kafkaRecords
            .apply(
                AvroTransform.of(
                    options.getMessageFormat(),
                    options.getBinaryAvroSchemaPath(),
                    errorHandler,
                    badRecordRouter))
            .apply(bigQueryWrite);
    return writeResult;
  }

  private static WriteResult handleAvroConfluentWireFormat(
      PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords, KafkaToBigQueryFlexOptions options) {
    if (options.getSchemaFormat().equals(SchemaFormat.SINGLE_SCHEMA_FILE)) {
      return handleSingleSchemaFileFormat(kafkaRecords, options);
    } else if (options.getSchemaFormat().equals(SchemaFormat.SCHEMA_REGISTRY)) {
      return handleSchemaRegistryFormat(kafkaRecords, options);
    } else {
      throw new IllegalArgumentException(
          "Message format " + options.getSchemaFormat() + " is unsupported.");
    }
  }

  private static WriteResult handleSingleSchemaFileFormat(
      PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords, KafkaToBigQueryFlexOptions options) {
    if (!(options.getConfluentAvroSchemaPath() != null && options.getOutputTableSpec() != null)) {
      // TODO: Add error.
      throw new RuntimeException("");
    }
    WriteResult writeResult;
    BigQueryWriteUtils.BigQueryWrite bigQueryWrite;
    if (useErrorHandler(options)) {
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryWrite.of(
              SchemaUtils.getAvroSchema(options.getConfluentAvroSchemaPath()),
              options.getOutputTableSpec(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding(),
              errorHandler);
    } else {
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryWrite.of(
              SchemaUtils.getAvroSchema(options.getConfluentAvroSchemaPath()),
              options.getOutputTableSpec(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding());
    }
    writeResult =
        kafkaRecords
            .apply(
                AvroTransform.of(
                    options.getMessageFormat(),
                    options.getConfluentAvroSchemaPath(),
                    errorHandler,
                    badRecordRouter))
            .apply(bigQueryWrite);
    return writeResult;
  }

  private static WriteResult handleSchemaRegistryFormat(
      PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords, KafkaToBigQueryFlexOptions options) {
    if (!(options.getSchemaRegistryConnectionUrl() != null && options.getOutputDataset() != null)) {
      throw new RuntimeException(
          "Missing required parameters: Schema Registry URL and/or Output Dataset");
    }
    WriteResult writeResult;
    BigQueryWriteUtils.BigQueryDynamicWrite bigQueryWrite;
    if (useErrorHandler(options)) {
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryDynamicWrite.of(
              options.getOutputProject(),
              options.getOutputDataset(),
              options.getBqTableNamePrefix(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding(),
              errorHandler);
    } else {
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryDynamicWrite.of(
              options.getOutputProject(),
              options.getOutputDataset(),
              options.getBqTableNamePrefix(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding());
    }
    writeResult =
        kafkaRecords
            .apply(
                AvroDynamicTransform.of(
                    options.getSchemaRegistryConnectionUrl(),
                    KafkaConfig.fromSchemaRegistryOptions(options),
                    errorHandler,
                    badRecordRouter))
            .apply(bigQueryWrite);
    return writeResult;
  }

  public static Pipeline runAvroPipeline(
      Pipeline pipeline,
      KafkaToBigQueryFlexOptions options,
      List<String> topicsList,
      String bootstrapServers,
      Map<String, Object> kafkaConfig)
      throws Exception {

    if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_BINARY_ENCODING)
        && options.getBinaryAvroSchemaPath() == null) {
      throw new IllegalArgumentException(
          "Avro schema is needed in order to read non confluent wire format messages.");
    }
    if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_CONFLUENT_WIRE_FORMAT)
        && options.getSchemaRegistryConnectionUrl() == null
        && options.getConfluentAvroSchemaPath() == null) {
      throw new IllegalArgumentException(
          "Schema Registry Connection URL or Avro schema is needed in order to read confluent wire format messages.");
    }
    if (!Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath())
        && !Strings.isNullOrEmpty(options.getJavascriptTextTransformFunctionName())) {
      LOG.warn(
          "JavaScript UDF parameters are set while using Avro message format. "
              + "UDFs are supported for JSON format only. No UDF transformation will be applied.");
    }

    PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords;

    kafkaRecords =
        pipeline
            /*
             * Step #1: Read messages in from Kafka and convert to GenericRecords wrap in FailsafeElement
             */
            .apply(
                "ReadBytesFromKafka",
                KafkaTransform.readBytesFromKafka(
                    bootstrapServers, topicsList, kafkaConfig, options.getEnableCommitOffsets()))
            .setCoder(
                KafkaRecordCoder.of(NullableCoder.of(ByteArrayCoder.of()), ByteArrayCoder.of()));

    WriteResult writeResult = processKafkaRecords(kafkaRecords, options);
    return pipeline;
  }

  public static Pipeline runJsonPipeline(
      Pipeline pipeline,
      KafkaToBigQueryFlexOptions options,
      List<String> topicsList,
      String bootstrapServers,
      Map<String, Object> kafkaConfig) {

    // Register the coder for pipeline
    FailsafeElementCoder<KafkaRecord<String, String>, String> coder =
        FailsafeElementCoder.of(
            KafkaRecordCoder.of(
                NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of())),
            NullableCoder.of(StringUtf8Coder.of()));

    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);

    PCollectionTuple convertedTableRows;
    convertedTableRows =
        pipeline
            /*
             * Step #1: Read messages in from Kafka
             */
            .apply(
                "ReadFromKafka",
                KafkaTransform.readStringFromKafka(
                    bootstrapServers, topicsList, kafkaConfig, options.getEnableCommitOffsets()))

            /*
             * Step #2: Transform the Kafka Messages into TableRows
             */
            .apply(
                "ConvertMessageToTableRow",
                StringMessageToTableRow.newBuilder()
                    .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                    .setFunctionName(options.getJavascriptTextTransformFunctionName())
                    .setReloadIntervalMinutes(
                        options.getJavascriptTextTransformReloadIntervalMinutes())
                    .setSuccessTag(TRANSFORM_OUT)
                    .setFailureTag(TRANSFORM_DEADLETTER_OUT)
                    .build());
    /*
     * Step #3: Write the successful records out to BigQuery
     */
    WriteResult writeResult =
        convertedTableRows
            .get(TRANSFORM_OUT)
            .apply(
                "WriteSuccessfulRecords",
                BigQueryIO.writeTableRows()
                    .withoutValidation()
                    .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .withExtendedErrorInfo()
                    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                    .to(options.getOutputTableSpec()));

    /*
     * Step 3 Contd.
     * Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
     */
    PCollection<FailsafeElement<String, String>> failedInserts =
        BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
            .apply(
                "WrapInsertionErrors",
                MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                    .via(KafkaToBigQueryFlex::wrapBigQueryInsertError))
            .setCoder(FAILSAFE_ELEMENT_CODER);

    if (options.getOutputDeadletterTable() != null) {
      /*
       * Step #4: Write failed records out to BigQuery
       */
      PCollectionList.of(convertedTableRows.get(TRANSFORM_DEADLETTER_OUT))
          .apply("Flatten", Flatten.pCollections())
          .apply(
              "WriteTransformationFailedRecords",
              WriteKafkaRecordMessageErrors.newBuilder()
                  .setErrorRecordsTable(
                      ObjectUtils.firstNonNull(options.getOutputDeadletterTable()))
                  .setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
                  .build());
    } else {
      PCollectionList.of(convertedTableRows.get(TRANSFORM_DEADLETTER_OUT))
          .apply("Flatten", Flatten.pCollections())
          .apply("PrintInsertionFailedRecords", ParDo.of(new ThrowErrorFn<>()));
    }

    if (options.getOutputDeadletterTable() != null) {
      /*
       * Step #5: Insert records that failed BigQuery inserts into a dead-letter table.
       */
      failedInserts.apply(
          "WriteInsertionFailedRecords",
          ErrorConverters.WriteStringMessageErrors.newBuilder()
              .setErrorRecordsTable(ObjectUtils.firstNonNull(options.getOutputDeadletterTable()))
              .setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
              .build());
    } else {
      failedInserts.apply(
          "PrintInsertionFailedRecords", ParDo.of(new ThrowErrorFn<String, String>()));
    }

    return pipeline;
  }

  /**
   * Method to wrap a {@link BigQueryInsertError} into a {@link FailsafeElement}.
   *
   * @param insertError BigQueryInsert error.
   * @return FailsafeElement object.
   */
  protected static FailsafeElement<String, String> wrapBigQueryInsertError(
      BigQueryInsertError insertError) {

    FailsafeElement<String, String> failsafeElement;
    try {

      failsafeElement =
          FailsafeElement.of(
              insertError.getRow().toPrettyString(), insertError.getRow().toPrettyString());
      failsafeElement.setErrorMessage(insertError.getError().toPrettyString());

    } catch (IOException e) {
      LOG.error("Failed to wrap BigQuery insert error.");
      throw new RuntimeException(e);
    }
    return failsafeElement;
  }

  static class ThrowErrorFn<T, W> extends DoFn<FailsafeElement<T, W>, FailsafeElement<T, W>> {

    @ProcessElement
    public void processElement(ProcessContext context) {
      FailsafeElement<T, W> element = context.element();
      // TODO: Logging every exception might overload Google Cloud Logging API. Find a better way to
      // log these errors.
      LOG.error(element.toString() + element.getErrorMessage() + element.getStacktrace());
      context.output(element);
    }
  }
}

Langkah berikutnya