Apache Kafka to BigQuery 模板

Apache Kafka to BigQuery 模板是一种流处理流水线,可从 Google Cloud Managed Service for Apache Kafka 集群注入文本数据,然后将生成的记录输出到 BigQuery 表。将数据插入输出表时发生的任何错误都会插入 BigQuery 中的单独错误表中。

您还可以将 Apache Kafka to BigQuery 模板与自行管理的 Kafka 或外部 Kafka 搭配使用。

流水线要求

  • Apache Kafka 代理服务器必须正在运行并可从 Dataflow 工作器机器进行访问。
  • Apache Kafka 主题必须已存在。
  • 您必须启用 Dataflow、BigQuery 和 Cloud Storage API。如果需要进行身份验证,您还必须启用 Secret Manager API。
  • 为您的 Kafka 输入主题创建具有适当架构的 BigQuery 数据集和表。如果您在同一主题中使用多个架构,并希望写入多个表,则无需在配置流水线之前创建表。
  • 启用模板的死信(未处理的消息)队列后,请创建一个不包含死信队列架构的空表。

Kafka 消息格式

Apache Kafka to BigQuery 模板支持以如下格式从 Kafka 读取消息:CONFLUENT_AVRO_WIRE_FORMATAVRO_BINARY_FORMATJSON

身份验证

Apache Kafka to BigQuery 模板支持对 Kafka 代理进行 SASL/PLAIN 身份验证。

模板参数

必需参数

  • readBootstrapServerAndTopic:要从中读取输入的 Kafka 主题。
  • writeMode:将记录写入一个表或多个表(基于架构)。只有 AVRO_CONFLUENT_WIRE_FORMAT 来源消息格式和 SCHEMA_REGISTRY 架构来源支持 DYNAMIC_TABLE_NAMES 模式。目标表名称会根据每条消息的 Avro 架构名称自动生成,可以是单个架构(创建单个表)或多个架构(创建多个表)。SINGLE_TABLE_NAME 模式会写入用户指定的单个表(单个架构)。默认值为 SINGLE_TABLE_NAME
  • kafkaReadAuthenticationMode:与 Kafka 集群搭配使用的身份验证模式。如果不进行身份验证,请使用 KafkaAuthenticationMethod.NONE;如果使用 SASL/PLAIN 用户名和密码,请使用 KafkaAuthenticationMethod.SASL_PLAIN;如果使用基于证书的身份验证,请使用 KafkaAuthenticationMethod.TLSKafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS 仅应用于 Google Cloud Apache Kafka for BigQuery 集群,它允许使用应用默认凭据进行身份验证。
  • messageFormat:要读取的 Kafka 消息的格式。支持的值包括 AVRO_CONFLUENT_WIRE_FORMAT(Confluent 架构注册表编码的 Avro)、AVRO_BINARY_ENCODING(普通二进制 Avro)和 JSON。默认值为:AVRO_CONFLUENT_WIRE_FORMAT。
  • useBigQueryDLQ:如果为 true,系统会将失败的消息写入 BigQuery,并附带额外的错误信息。默认值为:false。

可选参数

  • outputTableSpec:要将输出写入的 BigQuery 表位置。该名称应采用 <project>:<dataset>.<table_name> 格式。表的架构必须与输入对象匹配。
  • persistKafkaKey:如果为 true,流水线将在 BigQuery 表中的类型为 BYTES_key 字段中保留 Kafka 消息键。默认值为 false(系统会忽略该键)。
  • outputProject:数据集所在的 BigQuery 输出项目。系统会在数据集中动态创建表。默认值为空。
  • outputDataset:要将输出写入到的 BigQuery 输出数据集。系统会在数据集中动态创建表。如果表是预先创建的,则表名称应遵循指定的命名惯例。名称应为 bqTableNamePrefix + Avro Schema FullName,每个字词之间用连字符 - 分隔。默认值为空。
  • bqTableNamePrefix:创建 BigQuery 输出表时使用的命名前缀。仅在使用架构注册表时适用。默认值为空。
  • createDisposition:BigQuery CreateDisposition。例如:CREATE_IF_NEEDEDCREATE_NEVER。默认值为:CREATE_IF_NEEDED。
  • writeDisposition:BigQuery WriteDisposition。例如:WRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE。默认值为:WRITE_APPEND。
  • useAutoSharding:如果为 true,则流水线在写入 BigQuery 时会使用自动分片。默认值为 true
  • numStorageWriteApiStreams:指定写入流的数量,必须设置此参数。默认值为 0
  • storageWriteApiTriggeringFrequencySec:指定触发频率(以秒为单位),必须设置此参数。默认值为 5 秒。
  • useStorageWriteApiAtLeastOnce:此参数仅在启用了“使用 BigQuery Storage Write API”时有效。如果启用,则系统会将“至少一次”语义用于 Storage Write API,否则会使用“正好一次”语义。默认值为:false。
  • enableCommitOffsets:将已处理消息的偏移量提交到 Kafka。如果启用此参数,则在重启流水线时,消息处理的间隔或重复处理会降到最低。需要指定使用方群组 ID。默认值为:false。
  • consumerGroupId:此流水线所属的使用方群组的唯一标识符。如果已启用“将偏移量提交到 Kafka”,则必须使用此参数。默认值为空。
  • kafkaReadOffset:在没有提交偏移量的情况下读取消息的起点。最早的从最开始算起,最新的从最新消息算起。默认值为:latest。
  • kafkaReadUsernameSecretId:Google Cloud Secret Manager Secret ID,其中包含要与 SASL_PLAIN 身份验证搭配使用的 Kafka 用户名。例如 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。默认值为空。
  • kafkaReadPasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含要与 SASL_PLAIN 身份验证搭配使用的 Kafka 密码。例如 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。默认值为空。
  • kafkaReadKeystoreLocation:Java KeyStore (JKS) 文件的 Google Cloud Storage 路径,该文件包含在向 Kafka 集群进行身份验证时使用的 TLS 证书和私钥。例如 gs://your-bucket/keystore.jks
  • kafkaReadTruststoreLocation:Java TrustStore (JKS) 文件的 Google Cloud Storage 路径,该文件包含用于验证 Kafka 代理身份的受信任证书。
  • kafkaReadTruststorePasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于访问 Java TrustStore (JKS) 文件以进行 Kafka TLS 身份验证的密码,例如 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadKeystorePasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于访问 Java KeyStore (JKS) 文件以进行 Kafka TLS 身份验证的密码。例如 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadKeyPasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于访问 Java KeyStore (JKS) 文件中的私钥以进行 Kafka TLS 身份验证的密码。例如 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • schemaFormat:Kafka 架构格式。可以作为 SINGLE_SCHEMA_FILESCHEMA_REGISTRY 提供。如果指定了 SINGLE_SCHEMA_FILE,则对所有消息使用 avro 架构文件中提及的架构。如果指定了 SCHEMA_REGISTRY,消息可以具有单个架构或多个架构。默认值为 SINGLE_SCHEMA_FILE。
  • confluentAvroSchemaPath:用于解码主题中所有消息的单个 Avro 架构文件的 Google Cloud Storage 路径。默认值为空。
  • schemaRegistryConnectionUrl:用于管理 Avro 架构以进行消息解码的 Confluent 架构注册表实例的网址。默认值为空。
  • binaryAvroSchemaPath:用于解码二进制编码 Avro 消息的 Avro 架构文件的 Google Cloud Storage 路径。默认值为空。
  • schemaRegistryAuthenticationMode:架构注册表身份验证模式。可以是 NONE、TLS 或 OAUTH。默认为:NONE。
  • schemaRegistryTruststoreLocation:SSL 证书的位置,用于存储用于对 Schema Registry 进行身份验证的信任库。例如 /your-bucket/truststore.jks
  • schemaRegistryTruststorePasswordSecretId:Secret Manager 中的 SecretId,用于存储访问信任库中 Secret 的密码。例如 projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
  • schemaRegistryKeystoreLocation:包含 SSL 证书和私钥的密钥库位置。例如 /your-bucket/keystore.jks
  • schemaRegistryKeystorePasswordSecretId:Secret Manager 中的 SecretId,其中包含用于访问密钥库文件的密码,例如 projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
  • schemaRegistryKeyPasswordSecretId:访问密钥库中存储的客户端私钥所需的密码的 SecretId,例如 projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
  • schemaRegistryOauthClientId:用于在 OAUTH 模式下对架构注册表客户端进行身份验证的客户端 ID。对于 AVRO_CONFLUENT_WIRE_FORMAT 消息格式,此字段为必需字段。
  • schemaRegistryOauthClientSecretId:Google Cloud Secret Manager Secret ID,其中包含用于在 OAUTH 模式下对 Schema Registry 客户端进行身份验证的客户端 Secret。对于 AVRO_CONFLUENT_WIRE_FORMAT 消息格式,此字段为必需字段。例如 projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • schemaRegistryOauthScope:用于在 OAUTH 模式下对架构注册表客户端进行身份验证的访问令牌范围。此字段是可选字段,因为您无需传递范围参数即可发出请求。例如 openid
  • schemaRegistryOauthTokenEndpointUrl:OAuth/OIDC 身份提供程序的基于 HTTP(S) 的网址,用于在 OAUTH 模式下对架构注册表客户端进行身份验证。对于 AVRO_CONFLUENT_WIRE_FORMAT 消息格式,此字段为必需字段。
  • outputDeadletterTable:失败消息的完全限定 BigQuery 表名称。出于各种原因(例如,架构不匹配、JSON 格式错误)未能到达输出表的消息会写入该表。该表将由模板创建。例如 your-project-id:your-dataset.your-table-name
  • javascriptTextTransformGcsPath:.js 文件的 Cloud Storage URI,用于定义要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
  • javascriptTextTransformFunctionName:要使用的 JavaScript 用户定义的函数 (UDF) 的名称。例如,如果 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例 (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)。
  • javascriptTextTransformReloadIntervalMinutes:指定重新加载 UDF 的频率(以分钟为单位)。如果值大于 0,则 Dataflow 会定期检查 Cloud Storage 中的 UDF 文件,并在文件修改时重新加载 UDF。此参数可让您在流水线运行时更新 UDF,而无需重启作业。如果值为 0,则停用 UDF 重新加载。默认值为 0

运行模板

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Kafka to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 可选:如需从“正好一次”处理切换到“至少一次”流处理模式,请选择至少一次
  8. 点击运行作业

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery_Flex \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • KAFKA_TOPICS:Apache Kakfa 主题列表。如果提供了多个主题,您需要转义英文逗号。请参阅 gcloud topic escaping
  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • KAFKA_SERVER_ADDRESSES:Apache Kafka broker 服务器 IP 地址列表。每个 IP 地址都需要可访问服务器的端口号。例如:35.70.252.199:9092。如果提供了多个地址,您需要转义英文逗号。请参阅 gcloud topic escaping

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery_Flex",
   }
}
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • KAFKA_TOPICS:Apache Kakfa 主题列表。如果提供了多个主题,您需要转义英文逗号。请参阅 gcloud topic escaping
  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • KAFKA_SERVER_ADDRESSES:Apache Kafka broker 服务器 IP 地址列表。每个 IP 地址都需要可访问服务器的端口号。例如:35.70.252.199:9092。如果提供了多个地址,您需要转义英文逗号。请参阅 gcloud topic escaping

如需了解详情,请参阅使用 Dataflow 将数据从 Kafka 写入 BigQuery

Java
/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.kafka.dlq.BigQueryDeadLetterQueue;
import com.google.cloud.teleport.v2.kafka.transforms.AvroDynamicTransform;
import com.google.cloud.teleport.v2.kafka.transforms.AvroTransform;
import com.google.cloud.teleport.v2.kafka.transforms.KafkaRecordErrorConverters.WriteKafkaRecordMessageErrors;
import com.google.cloud.teleport.v2.kafka.transforms.KafkaTransform;
import com.google.cloud.teleport.v2.kafka.utils.KafkaConfig;
import com.google.cloud.teleport.v2.kafka.utils.KafkaTopicUtils;
import com.google.cloud.teleport.v2.kafka.values.KafkaTemplateParameters.MessageFormatConstants;
import com.google.cloud.teleport.v2.kafka.values.KafkaTemplateParameters.SchemaFormat;
import com.google.cloud.teleport.v2.options.KafkaToBigQueryFlexOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryWriteUtils;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.transforms.StringMessageToTableRow;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.MetadataValidator;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaRecordCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link KafkaToBigQueryFlex} pipeline is a streaming pipeline which ingests text data from
 * Kafka, and outputs the resulting records to BigQuery. Any errors which occur in the
 * transformation of the data, or inserting into the output table will be inserted into a separate
 * errors table in BigQuery. Both output and error tables are specified by the user as parameters.
 *
 * <p><b>Pipeline Requirements</b>
 *
 * <ul>
 *   <li>The Kafka topic exists and the message is encoded in a valid JSON format.
 *   <li>The BigQuery output table exists.
 *   <li>The Kafka brokers are reachable from the Dataflow worker machines.
 * </ul>
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/kafka-to-bigquery/README_Kafka_to_BigQuery.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Kafka_to_BigQuery_Flex",
    category = TemplateCategory.STREAMING,
    displayName = "Kafka to BigQuery",
    description =
        "The Apache Kafka to BigQuery template is a streaming pipeline which ingests text data from Apache Kafka, and outputs the resulting records to BigQuery. "
            + "Any errors which occur in the transformation of the data, or inserting into the output table are inserted into a separate errors table in BigQuery. "
            + "For any errors which occur in the transformation of the data, the original records can be inserted into a separate Kafka topic. The template supports "
            + "reading a Kafka topic which contains single/multiple schema(s). It can write to a single or multiple BigQuery tables, depending on the schema of records. ",
    optionsClass = KafkaToBigQueryFlexOptions.class,
    flexContainerName = "kafka-to-bigquery-flex",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/kafka-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The output BigQuery table must exist.",
      "The Apache Kafka broker server must be running and be reachable from the Dataflow worker machines.",
      "The Apache Kafka topics must exist and the messages must be encoded in a valid JSON format."
    },
    skipOptions = {"useStorageWriteApi"})
public class KafkaToBigQueryFlex {

  /* Logger for class. */
  private static final Logger LOG = LoggerFactory.getLogger(KafkaToBigQueryFlex.class);

  /** The tag for the main output of the json transformation. */
  public static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<>() {};

  /** The tag for the dead-letter output of the json to table row transform. */
  public static final TupleTag<FailsafeElement<KafkaRecord<String, String>, String>>
      TRANSFORM_DEADLETTER_OUT = new TupleTag<>() {};

  /** String/String Coder for FailsafeElement. */
  private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(
          NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of()));

  private static ErrorHandler<BadRecord, ?> errorHandler = new ErrorHandler.DefaultErrorHandler<>();
  private static BadRecordRouter badRecordRouter = BadRecordRouter.THROWING_ROUTER;

  /**
   * The main entry-point for pipeline execution. This method will start the pipeline but will not
   * wait for its execution to finish. If blocking execution is required, use the {@link
   * KafkaToBigQueryFlex#run(KafkaToBigQueryFlexOptions)} method to start the pipeline and invoke
   * {@code result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args The command-line args passed by the executor.
   */
  public static void main(String[] args) throws Exception {
    UncaughtExceptionLogger.register();

    KafkaToBigQueryFlexOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaToBigQueryFlexOptions.class);

    run(options);
  }

  public static Boolean useErrorHandler(KafkaToBigQueryFlexOptions options) {
    return options.getUseBigQueryDLQ();
  }

  public static WriteResult processKafkaRecords(
      PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords, KafkaToBigQueryFlexOptions options) {

    // Validate the pipeline options for MessageFormat and SchemaFormat.
    if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_BINARY_ENCODING)
        && (options.getBinaryAvroSchemaPath() != null
            && options.getBinaryAvroSchemaPath().isBlank())) {
      throw new IllegalArgumentException(
          "Binary Avro Schema Path cannot be empty for AVRO_BINARY_ENCODING.");
    }

    if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_CONFLUENT_WIRE_FORMAT)) {

      if ((options.getSchemaRegistryConnectionUrl() != null
              && options.getSchemaRegistryConnectionUrl().isBlank())
          && (options.getConfluentAvroSchemaPath() != null
              && options.getConfluentAvroSchemaPath().isBlank())) {
        throw new IllegalArgumentException(
            "Either Schema Registry Connection URL or Confluent Avro Schema Path must be provided for AVRO_CONFLUENT_WIRE_FORMAT.");
      }

      if (options.getSchemaFormat().equals(SchemaFormat.SINGLE_SCHEMA_FILE)) {
        if (!options.getConfluentAvroSchemaPath().isBlank()
            && (options.getOutputTableSpec() != null && options.getOutputTableSpec().isBlank())) {
          throw new IllegalArgumentException(
              "The outputTableSpec parameter is required when using the SINGLE_SCHEMA_FILE schema format.");
        }
      } else if (options.getSchemaFormat().equals(SchemaFormat.SCHEMA_REGISTRY)) {
        if (options.getSchemaRegistryConnectionUrl() != null
            && (options.getOutputDataset() != null && options.getOutputDataset().isBlank())) {
          throw new IllegalArgumentException(
              "An output BigQuery dataset is required. It will be used to create tables per schema.");
        }
      } else {
        throw new IllegalArgumentException(
            "Unsupported schemaFormat parameter value: " + options.getSchemaFormat());
      }
    }

    if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_BINARY_ENCODING)
        && (!options.getBinaryAvroSchemaPath().isBlank())) {
      return handleAvroBinaryEncoding(kafkaRecords, options);
    } else if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_CONFLUENT_WIRE_FORMAT)
        && (!options.getSchemaRegistryConnectionUrl().isBlank()
            || !options.getConfluentAvroSchemaPath().isBlank())) {
      return handleAvroConfluentWireFormat(kafkaRecords, options);
    } else {
      throw new IllegalArgumentException(
          "Message format " + options.getMessageFormat() + " is unsupported.");
    }
  }

  /**
   * Runs the pipeline to completion with the specified options. This method does not wait until the
   * pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
   * object to block until the pipeline is finished running if blocking programmatic execution is
   * required.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  public static PipelineResult run(KafkaToBigQueryFlexOptions options) throws Exception {

    // Enable Streaming Engine
    options.setEnableStreamingEngine(true);

    List<String> dataflowServiceOptions = options.getDataflowServiceOptions();
    if (dataflowServiceOptions == null) {
      dataflowServiceOptions = new ArrayList<>();
    }
    dataflowServiceOptions.add("enable_streaming_engine_resource_based_billing");
    options.setDataflowServiceOptions(dataflowServiceOptions);

    // Validate BQ STORAGE_WRITE_API options
    options.setUseStorageWriteApi(true);
    if (options.getStorageWriteApiTriggeringFrequencySec() == null) {
      options.setStorageWriteApiTriggeringFrequencySec(5);
    }
    BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
    MetadataValidator.validate(options);

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    List<String> topicsList;
    String bootstrapServers;
    if (options.getReadBootstrapServerAndTopic() != null) {
      List<String> bootstrapServerAndTopicList =
          KafkaTopicUtils.getBootstrapServerAndTopic(
              options.getReadBootstrapServerAndTopic(), options.getProject());
      topicsList = List.of(bootstrapServerAndTopicList.get(1));
      bootstrapServers = bootstrapServerAndTopicList.get(0);
    } else {
      throw new IllegalArgumentException(
          "Please provide a valid bootstrap server which matches `[,:a-zA-Z0-9._-]+` and a topic which matches `[,a-zA-Z0-9._-]+`");
    }

    // Configure error handler for Dead letter queue
    if (options.getUseBigQueryDLQ()) {
      if (options.getOutputDeadletterTable() == null
          || options.getOutputDeadletterTable().isBlank()) {
        throw new IllegalArgumentException(
            "Please provide a valid BigQuery full qualified table name when using BigQuery"
                + "Dead letter queue");
      }
      badRecordRouter = BadRecordRouter.RECORDING_ROUTER;
      errorHandler =
          pipeline.registerBadRecordErrorHandler(
              BigQueryDeadLetterQueue.newBuilder()
                  .setTableName(options.getOutputDeadletterTable())
                  .build());
    }

    // Get the Kafka config
    Map<String, Object> kafkaConfig = KafkaConfig.fromReadOptions(options);

    /*
     * Steps:
     *  1) Read messages in from Kafka
     *  2) Transform the messages into TableRows
     *     - Transform message payload via UDF
     *     - Convert UDF result to TableRow objects
     *  3) Write successful records out to BigQuery
     *  4) Write failed records out to BigQuery
     */

    if (options.getMessageFormat() == null
        || options.getMessageFormat().equals(MessageFormatConstants.JSON)) {

      pipeline = runJsonPipeline(pipeline, options, topicsList, bootstrapServers, kafkaConfig);

    } else if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_CONFLUENT_WIRE_FORMAT)
        || options.getMessageFormat().equals(MessageFormatConstants.AVRO_BINARY_ENCODING)) {
      pipeline = runAvroPipeline(pipeline, options, topicsList, bootstrapServers, kafkaConfig);

    } else {
      throw new IllegalArgumentException("Invalid format specified: " + options.getMessageFormat());
    }
    if (useErrorHandler(options)) {
      errorHandler.close();
    }
    return pipeline.run();
  }

  private static WriteResult handleAvroBinaryEncoding(
      PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords, KafkaToBigQueryFlexOptions options) {
    WriteResult writeResult;
    BigQueryWriteUtils.BigQueryWrite bigQueryWrite;
    if (useErrorHandler(options)) {
      // BigQueryIO sets the BadRecordRouter to RecordingRouter even when the errorHandler is
      // DefaultErrorHandler(which is a no op). In this case, when the BadRecordRouter is
      // ThrowingRouter,
      // don't pass errorHandler to BigQueryIO.
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryWrite.of(
              SchemaUtils.getAvroSchema(options.getBinaryAvroSchemaPath()),
              options.getOutputTableSpec(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding(),
              errorHandler);
    } else {
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryWrite.of(
              SchemaUtils.getAvroSchema(options.getBinaryAvroSchemaPath()),
              options.getOutputTableSpec(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding());
    }
    writeResult =
        kafkaRecords
            .apply(
                AvroTransform.of(
                    options.getMessageFormat(),
                    options.getBinaryAvroSchemaPath(),
                    errorHandler,
                    badRecordRouter))
            .apply(bigQueryWrite);
    return writeResult;
  }

  private static WriteResult handleAvroConfluentWireFormat(
      PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords, KafkaToBigQueryFlexOptions options) {
    if (options.getSchemaFormat().equals(SchemaFormat.SINGLE_SCHEMA_FILE)) {
      return handleSingleSchemaFileFormat(kafkaRecords, options);
    } else if (options.getSchemaFormat().equals(SchemaFormat.SCHEMA_REGISTRY)) {
      return handleSchemaRegistryFormat(kafkaRecords, options);
    } else {
      throw new IllegalArgumentException(
          "Message format " + options.getSchemaFormat() + " is unsupported.");
    }
  }

  private static WriteResult handleSingleSchemaFileFormat(
      PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords, KafkaToBigQueryFlexOptions options) {
    if (!(options.getConfluentAvroSchemaPath() != null && options.getOutputTableSpec() != null)) {
      // TODO: Add error.
      throw new RuntimeException("");
    }
    WriteResult writeResult;
    BigQueryWriteUtils.BigQueryWrite bigQueryWrite;
    if (useErrorHandler(options)) {
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryWrite.of(
              SchemaUtils.getAvroSchema(options.getConfluentAvroSchemaPath()),
              options.getOutputTableSpec(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding(),
              errorHandler);
    } else {
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryWrite.of(
              SchemaUtils.getAvroSchema(options.getConfluentAvroSchemaPath()),
              options.getOutputTableSpec(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding());
    }
    writeResult =
        kafkaRecords
            .apply(
                AvroTransform.of(
                    options.getMessageFormat(),
                    options.getConfluentAvroSchemaPath(),
                    errorHandler,
                    badRecordRouter))
            .apply(bigQueryWrite);
    return writeResult;
  }

  private static WriteResult handleSchemaRegistryFormat(
      PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords, KafkaToBigQueryFlexOptions options) {
    if (!(options.getSchemaRegistryConnectionUrl() != null && options.getOutputDataset() != null)) {
      throw new RuntimeException(
          "Missing required parameters: Schema Registry URL and/or Output Dataset");
    }
    WriteResult writeResult;
    BigQueryWriteUtils.BigQueryDynamicWrite bigQueryWrite;
    if (useErrorHandler(options)) {
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryDynamicWrite.of(
              options.getOutputProject(),
              options.getOutputDataset(),
              options.getBqTableNamePrefix(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding(),
              errorHandler);
    } else {
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryDynamicWrite.of(
              options.getOutputProject(),
              options.getOutputDataset(),
              options.getBqTableNamePrefix(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding());
    }
    writeResult =
        kafkaRecords
            .apply(
                AvroDynamicTransform.of(
                    options.getSchemaRegistryConnectionUrl(),
                    KafkaConfig.fromSchemaRegistryOptions(options),
                    errorHandler,
                    badRecordRouter))
            .apply(bigQueryWrite);
    return writeResult;
  }

  public static Pipeline runAvroPipeline(
      Pipeline pipeline,
      KafkaToBigQueryFlexOptions options,
      List<String> topicsList,
      String bootstrapServers,
      Map<String, Object> kafkaConfig)
      throws Exception {

    if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_BINARY_ENCODING)
        && options.getBinaryAvroSchemaPath() == null) {
      throw new IllegalArgumentException(
          "Avro schema is needed in order to read non confluent wire format messages.");
    }
    if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_CONFLUENT_WIRE_FORMAT)
        && options.getSchemaRegistryConnectionUrl() == null
        && options.getConfluentAvroSchemaPath() == null) {
      throw new IllegalArgumentException(
          "Schema Registry Connection URL or Avro schema is needed in order to read confluent wire format messages.");
    }
    if (!Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath())
        && !Strings.isNullOrEmpty(options.getJavascriptTextTransformFunctionName())) {
      LOG.warn(
          "JavaScript UDF parameters are set while using Avro message format. "
              + "UDFs are supported for JSON format only. No UDF transformation will be applied.");
    }

    PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords;

    kafkaRecords =
        pipeline
            /*
             * Step #1: Read messages in from Kafka and convert to GenericRecords wrap in FailsafeElement
             */
            .apply(
                "ReadBytesFromKafka",
                KafkaTransform.readBytesFromKafka(
                    bootstrapServers, topicsList, kafkaConfig, options.getEnableCommitOffsets()))
            .setCoder(
                KafkaRecordCoder.of(NullableCoder.of(ByteArrayCoder.of()), ByteArrayCoder.of()));

    WriteResult writeResult = processKafkaRecords(kafkaRecords, options);
    return pipeline;
  }

  public static Pipeline runJsonPipeline(
      Pipeline pipeline,
      KafkaToBigQueryFlexOptions options,
      List<String> topicsList,
      String bootstrapServers,
      Map<String, Object> kafkaConfig) {

    // Register the coder for pipeline
    FailsafeElementCoder<KafkaRecord<String, String>, String> coder =
        FailsafeElementCoder.of(
            KafkaRecordCoder.of(
                NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of())),
            NullableCoder.of(StringUtf8Coder.of()));

    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);

    PCollectionTuple convertedTableRows;
    convertedTableRows =
        pipeline
            /*
             * Step #1: Read messages in from Kafka
             */
            .apply(
                "ReadFromKafka",
                KafkaTransform.readStringFromKafka(
                    bootstrapServers, topicsList, kafkaConfig, options.getEnableCommitOffsets()))

            /*
             * Step #2: Transform the Kafka Messages into TableRows
             */
            .apply(
                "ConvertMessageToTableRow",
                StringMessageToTableRow.newBuilder()
                    .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                    .setFunctionName(options.getJavascriptTextTransformFunctionName())
                    .setReloadIntervalMinutes(
                        options.getJavascriptTextTransformReloadIntervalMinutes())
                    .setSuccessTag(TRANSFORM_OUT)
                    .setFailureTag(TRANSFORM_DEADLETTER_OUT)
                    .build());
    /*
     * Step #3: Write the successful records out to BigQuery
     */
    WriteResult writeResult =
        convertedTableRows
            .get(TRANSFORM_OUT)
            .apply(
                "WriteSuccessfulRecords",
                BigQueryIO.writeTableRows()
                    .withoutValidation()
                    .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .withExtendedErrorInfo()
                    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                    .to(options.getOutputTableSpec()));

    /*
     * Step 3 Contd.
     * Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
     */
    PCollection<FailsafeElement<String, String>> failedInserts =
        BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
            .apply(
                "WrapInsertionErrors",
                MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                    .via(KafkaToBigQueryFlex::wrapBigQueryInsertError))
            .setCoder(FAILSAFE_ELEMENT_CODER);

    if (options.getOutputDeadletterTable() != null) {
      /*
       * Step #4: Write failed records out to BigQuery
       */
      PCollectionList.of(convertedTableRows.get(TRANSFORM_DEADLETTER_OUT))
          .apply("Flatten", Flatten.pCollections())
          .apply(
              "WriteTransformationFailedRecords",
              WriteKafkaRecordMessageErrors.newBuilder()
                  .setErrorRecordsTable(
                      ObjectUtils.firstNonNull(options.getOutputDeadletterTable()))
                  .setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
                  .build());
    } else {
      PCollectionList.of(convertedTableRows.get(TRANSFORM_DEADLETTER_OUT))
          .apply("Flatten", Flatten.pCollections())
          .apply("PrintInsertionFailedRecords", ParDo.of(new ThrowErrorFn<>()));
    }

    if (options.getOutputDeadletterTable() != null) {
      /*
       * Step #5: Insert records that failed BigQuery inserts into a dead-letter table.
       */
      failedInserts.apply(
          "WriteInsertionFailedRecords",
          ErrorConverters.WriteStringMessageErrors.newBuilder()
              .setErrorRecordsTable(ObjectUtils.firstNonNull(options.getOutputDeadletterTable()))
              .setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
              .build());
    } else {
      failedInserts.apply(
          "PrintInsertionFailedRecords", ParDo.of(new ThrowErrorFn<String, String>()));
    }

    return pipeline;
  }

  /**
   * Method to wrap a {@link BigQueryInsertError} into a {@link FailsafeElement}.
   *
   * @param insertError BigQueryInsert error.
   * @return FailsafeElement object.
   */
  protected static FailsafeElement<String, String> wrapBigQueryInsertError(
      BigQueryInsertError insertError) {

    FailsafeElement<String, String> failsafeElement;
    try {

      failsafeElement =
          FailsafeElement.of(
              insertError.getRow().toPrettyString(), insertError.getRow().toPrettyString());
      failsafeElement.setErrorMessage(insertError.getError().toPrettyString());

    } catch (IOException e) {
      LOG.error("Failed to wrap BigQuery insert error.");
      throw new RuntimeException(e);
    }
    return failsafeElement;
  }

  static class ThrowErrorFn<T, W> extends DoFn<FailsafeElement<T, W>, FailsafeElement<T, W>> {

    @ProcessElement
    public void processElement(ProcessContext context) {
      FailsafeElement<T, W> element = context.element();
      // TODO: Logging every exception might overload Google Cloud Logging API. Find a better way to
      // log these errors.
      LOG.error(element.toString() + element.getErrorMessage() + element.getStacktrace());
      context.output(element);
    }
  }
}

后续步骤