Modèle Apache Kafka vers BigQuery

Le modèle Apache Kafka vers BigQuery est un pipeline de streaming qui ingère les données textuelles issues des clusters Google Cloud Managed Service pour Apache Kafka, puis génère les enregistrements obtenus dans les tables BigQuery. Toutes les erreurs qui se produisent lors de l'insertion de données dans la table de sortie sont insérées dans une table d'erreurs distincte dans BigQuery.

Vous pouvez également utiliser le modèle Apache Kafka vers BigQuery avec Kafka autogéré ou externe.

Conditions requises pour ce pipeline

  • Le serveur de courtiers Apache Kafka doit être en cours d'exécution et joignable depuis les machines de nœud de calcul Dataflow.
  • Les sujets Apache Kafka doivent exister.
  • Vous devez activer les API Dataflow, BigQuery et Cloud Storage. Si une authentification est requise, vous devez également activer l'API Secret Manager.
  • Créez un ensemble de données et une table BigQuery avec le schéma approprié pour votre sujet d'entrée Kafka. Si vous utilisez plusieurs schémas dans le même thème et que vous souhaitez écrire dans plusieurs tables, vous n'avez pas besoin de créer la table avant de configurer le pipeline.
  • Lorsque la file d'attente de lettres mortes (messages non traités) du modèle est activée, créez une table vide sans schéma pour la file d'attente de lettres mortes.

Format de message Kafka

Le modèle Apache Kafka vers BigQuery permet de lire les messages de Kafka dans les formats suivants : CONFLUENT_AVRO_WIRE_FORMAT, AVRO_BINARY_FORMAT et JSON.

Authentification

Le modèle Apache Kafka vers BigQuery accepte l'authentification SASL/PLAIN auprès des courtiers Kafka.

Paramètres de modèle

Paramètres obligatoires

  • readBootstrapServerAndTopic: sujet Kafka à partir duquel lire l'entrée.
  • writeMode: permet d'écrire des enregistrements dans une ou plusieurs tables (selon le schéma). Le mode DYNAMIC_TABLE_NAMES n'est compatible qu'avec le format de message source AVRO_CONFLUENT_WIRE_FORMAT et la source de schéma SCHEMA_REGISTRY. Le nom de la table cible est généré automatiquement en fonction du nom du schéma Avro de chaque message. Il peut s'agir d'un seul schéma (création d'une seule table) ou de plusieurs schémas (création de plusieurs tables). Le mode SINGLE_TABLE_NAME écrit dans une table unique (schéma unique) spécifiée par l'utilisateur. La valeur par défaut est SINGLE_TABLE_NAME.
  • kafkaReadAuthenticationMode: mode d'authentification à utiliser avec le cluster Kafka. Utilisez KafkaAuthenticationMethod.NONE pour désactiver l'authentification, KafkaAuthenticationMethod.SASL_PLAIN pour le nom d'utilisateur et le mot de passe SASL/PLAIN, et KafkaAuthenticationMethod.TLS pour l'authentification basée sur un certificat. KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS ne doit être utilisé que pour le cluster Google Cloud Apache Kafka pour BigQuery. Il permet de s'authentifier à l'aide des identifiants par défaut de l'application.
  • messageFormat: format des messages Kafka à lire. Les valeurs acceptées sont AVRO_CONFLUENT_WIRE_FORMAT (Avro encodé par Confluent Schema Registry), AVRO_BINARY_ENCODING (Avro binaire simple) et JSON. Valeur par défaut : AVRO_CONFLUENT_WIRE_FORMAT.
  • useBigQueryDLQ: si la valeur est "true", les messages ayant échoué sont écrits dans BigQuery avec des informations d'erreur supplémentaires. La valeur par défaut est "false".

Paramètres facultatifs

  • outputTableSpec: emplacement de la table BigQuery dans lequel écrire la sortie. Le nom doit être au format <project>:<dataset>.<table_name>. Le schéma de la table doit correspondre aux objets d'entrée.
  • persistKafkaKey: si la valeur est "true", le pipeline conservera la clé du message Kafka dans la table BigQuery, dans un champ _key de type BYTES. La valeur par défaut est false (la clé est ignorée).
  • outputProject: projet de sortie BigQuery dans lequel se trouve l'ensemble de données. Les tables seront créées de manière dynamique dans l'ensemble de données. La valeur par défaut est vide.
  • outputDataset: ensemble de données de sortie BigQuery dans lequel écrire la sortie. Les tables seront créées de manière dynamique dans l'ensemble de données. Si les tables sont créées au préalable, leurs noms doivent respecter la convention d'attribution de noms spécifiée. Le nom doit être bqTableNamePrefix + Avro Schema FullName (chaque mot doit être séparé par un trait d'union -). La valeur par défaut est vide.
  • bqTableNamePrefix: préfixe du nom à utiliser lors de la création des tables de sortie BigQuery. Ne s'applique que lorsque vous utilisez Schema Registry. La valeur par défaut est vide.
  • createDisposition: CreateDisposition BigQuery. Par exemple: CREATE_IF_NEEDED, CREATE_NEVER. La valeur par défaut est CREATE_IF_NEEDED.
  • writeDisposition: BigQuery WriteDisposition. Par exemple: WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. La valeur par défaut est WRITE_APPEND.
  • useAutoSharding: si cette valeur est définie sur "true", le pipeline utilise le fractionnement automatique lors de l'écriture dans BigQuery. La valeur par défaut est true.
  • numStorageWriteApiStreams: spécifie le nombre de flux d'écriture. Ce paramètre doit être défini. La valeur par défaut est 0.
  • storageWriteApiTriggeringFrequencySec: spécifie la fréquence de déclenchement en secondes. Ce paramètre doit être défini. La valeur par défaut est de 5 secondes.
  • useStorageWriteApiAtLeastOnce: ce paramètre ne prend effet que si l'option "Utiliser l'API BigQuery Storage Write" est activée. Si cette option est activée, la sémantique de type "au moins une fois" est utilisée pour l'API Storage Write. Sinon, la sémantique de type "exactement une fois" est utilisée. La valeur par défaut est "false".
  • enableCommitOffsets: commit des décalages des messages traités vers Kafka. Si cette option est activée, les écarts ou le traitement en double des messages seront minimisés lors du redémarrage du pipeline. L'ID du groupe de consommateurs doit être spécifié. La valeur par défaut est "false".
  • consumerGroupId: identifiant unique du groupe de consommateurs auquel ce pipeline appartient. Obligatoire si l'option "Commit Offsets to Kafka" (Enregistrer les décalages dans Kafka) est activée. La valeur par défaut est vide.
  • kafkaReadOffset: point de départ de la lecture des messages lorsqu'il n'existe aucun décalage validé. Le premier commence au début, le dernier à partir du message le plus récent. Valeur par défaut : le plus récent.
  • kafkaReadUsernameSecretId: ID du secret Google Cloud Secret Manager contenant le nom d'utilisateur Kafka à utiliser avec l'authentification SASL_PLAIN. Par exemple, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. La valeur par défaut est vide.
  • kafkaReadPasswordSecretId: ID de secret Google Cloud Secret Manager contenant le mot de passe Kafka à utiliser avec l'authentification SASL_PLAIN. Par exemple, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. La valeur par défaut est vide.
  • kafkaReadKeystoreLocation: chemin d'accès Google Cloud Storage au fichier Java KeyStore (JKS) contenant le certificat TLS et la clé privée à utiliser lors de l'authentification avec le cluster Kafka. Exemple :gs://your-bucket/keystore.jks
  • kafkaReadTruststoreLocation: chemin d'accès Google Cloud Storage du fichier Java TrustStore (JKS) contenant les certificats approuvés à utiliser pour vérifier l'identité du courtier Kafka.
  • kafkaReadTruststorePasswordSecretId: ID de secret Google Cloud Secret Manager contenant le mot de passe à utiliser pour accéder au fichier Java TrustStore (JKS) pour l'authentification TLS de Kafka (par exemple, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
  • kafkaReadKeystorePasswordSecretId: ID de secret Google Cloud Secret Manager contenant le mot de passe à utiliser pour accéder au fichier Java KeyStore (JKS) pour l'authentification TLS Kafka. Exemple :projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadKeyPasswordSecretId: ID de secret Google Cloud Secret Manager contenant le mot de passe à utiliser pour accéder à la clé privée dans le fichier Java KeyStore (JKS) pour l'authentification TLS Kafka. Exemple :projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • schemaFormat: format de schéma de Kafka. Peut être fourni sous la forme SINGLE_SCHEMA_FILE ou SCHEMA_REGISTRY. Si SINGLE_SCHEMA_FILE est spécifié, utilisez le schéma mentionné dans le fichier de schéma Avro pour tous les messages. Si SCHEMA_REGISTRY est spécifié, les messages peuvent avoir un seul schéma ou plusieurs. La valeur par défaut est SINGLE_schema_FILE.
  • confluentAvroSchemaPath: chemin d'accès Google Cloud Storage au fichier de schéma Avro unique utilisé pour décoder tous les messages d'un sujet. La valeur par défaut est vide.
  • schemaRegistryConnectionUrl: URL de l'instance Confluent Schema Registry utilisée pour gérer les schémas Avro pour le décodage des messages. La valeur par défaut est vide.
  • binaryAvroSchemaPath: chemin d'accès Google Cloud Storage au fichier de schéma Avro utilisé pour décoder les messages Avro encodés en binaire. La valeur par défaut est vide.
  • schemaRegistryAuthenticationMode: mode d'authentification du schéma Registry. Peut être NONE, TLS ou OAUTH. La valeur par défaut est "NONE".
  • schemaRegistryTruststoreLocation: emplacement du certificat SSL où le truststore pour l'authentification au Schema Registry est stocké. Exemple :/your-bucket/truststore.jks
  • schemaRegistryTruststorePasswordSecretId: SecretId dans Secret Manager où le mot de passe permettant d'accéder au secret dans le truststore est stocké. Exemple :projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
  • schemaRegistryKeystoreLocation: emplacement du keystore contenant le certificat SSL et la clé privée. Exemple :/your-bucket/keystore.jks
  • schemaRegistryKeystorePasswordSecretId: SecretId dans Secret Manager où le mot de passe permet d'accéder au fichier de clés (par exemple, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version).
  • schemaRegistryKeyPasswordSecretId: SecretId du mot de passe requis pour accéder à la clé privée du client stockée dans le keystore (par exemple, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version).
  • schemaRegistryOauthClientId: ID client utilisé pour authentifier le client Schema Registry en mode OAUTH. Obligatoire pour le format de message AVRO_CONFLUENT_WIRE_FORMAT.
  • schemaRegistryOauthClientSecretId: ID du secret Google Cloud Secret Manager contenant le secret client à utiliser pour authentifier le client Schema Registry en mode OAUTH. Obligatoire pour le format de message AVRO_CONFLUENT_WIRE_FORMAT. Exemple :projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • schemaRegistryOauthScope: champ qui spécifie la portée du jeton d'accès utilisée pour authentifier le client Schema Registry en mode OAuth. Ce champ est facultatif, car la requête peut être effectuée sans paramètre de portée. Exemple :openid
  • schemaRegistryOauthTokenEndpointUrl: URL basée sur HTTP(S) du fournisseur d'identité OAuth/OIDC utilisé pour authentifier le client Schema Registry en mode OAUTH. Obligatoire pour le format de message AVRO_CONFLUENT_WIRE_FORMAT.
  • outputDeadletterTable: nom complet de la table BigQuery pour les messages ayant échoué. Les messages n'ayant pas pu atteindre la table de sortie pour différentes raisons (par exemple, schéma non concordant ou format JSON non valide) sont écrits dans cette table. La table sera créée par le modèle. Exemple :your-project-id:your-dataset.your-table-name
  • javascriptTextTransformGcsPath: URI Cloud Storage du fichier .js qui définit la fonction JavaScript définie par l'utilisateur (UDF) à utiliser. Exemple :gs://my-bucket/my-udfs/my_file.js
  • javascriptTextTransformFunctionName: nom de la fonction JavaScript définie par l'utilisateur à utiliser. Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la section https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples.
  • javascriptTextTransformReloadIntervalMinutes: spécifie la fréquence d'actualisation de l'UDF, en minutes. Si la valeur est supérieure à 0, Dataflow vérifie régulièrement le fichier UDF dans Cloud Storage et actualise l'UDF si le fichier est modifié. Ce paramètre vous permet de mettre à jour l'UDF pendant l'exécution du pipeline, sans avoir à redémarrer le job. Si la valeur est 0, l'actualisation de l'UDF est désactivée. La valeur par défaut est 0.

Exécuter le modèle

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Kafka to BigQuery template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Facultatif : Pour passer du traitement de type "exactement une fois" au mode de traitement en flux continu de type "au moins une fois", sélectionnez Au moins une fois.
  8. Cliquez sur Run Job (Exécuter la tâche).

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery_Flex \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • BIGQUERY_TABLE : nom de votre table BigQuery.
  • KAFKA_TOPICS : liste des sujets Apache Kakfa. Si plusieurs sujets sont fournis, vous devez échapper les virgules. Consultez gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE : URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple, gs://my-bucket/my-udfs/my_file.js).
  • JAVASCRIPT_FUNCTION : Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.

    Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.

  • KAFKA_SERVER_ADDRESSES : liste d'adresses IP du serveur de courtiers Apache Kafka. Chaque adresse IP doit être associée au numéro de port à partir duquel le serveur est accessible. Exemple : 35.70.252.199:9092. Si plusieurs adresses sont fournies, vous devez échapper les virgules. Consultez gcloud topic escaping.

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery_Flex",
   }
}
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • BIGQUERY_TABLE : nom de votre table BigQuery.
  • KAFKA_TOPICS : liste des sujets Apache Kakfa. Si plusieurs sujets sont fournis, vous devez échapper les virgules. Consultez gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE : URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple, gs://my-bucket/my-udfs/my_file.js).
  • JAVASCRIPT_FUNCTION : Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.

    Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.

  • KAFKA_SERVER_ADDRESSES : liste d'adresses IP du serveur de courtiers Apache Kafka. Chaque adresse IP doit être associée au numéro de port à partir duquel le serveur est accessible. Exemple : 35.70.252.199:9092. Si plusieurs adresses sont fournies, vous devez échapper les virgules. Consultez gcloud topic escaping.

Pour en savoir plus, consultez la page Écrire des données de Kafka vers BigQuery avec Dataflow.

Java
/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.kafka.dlq.BigQueryDeadLetterQueue;
import com.google.cloud.teleport.v2.kafka.transforms.AvroDynamicTransform;
import com.google.cloud.teleport.v2.kafka.transforms.AvroTransform;
import com.google.cloud.teleport.v2.kafka.transforms.KafkaRecordErrorConverters.WriteKafkaRecordMessageErrors;
import com.google.cloud.teleport.v2.kafka.transforms.KafkaTransform;
import com.google.cloud.teleport.v2.kafka.utils.KafkaConfig;
import com.google.cloud.teleport.v2.kafka.utils.KafkaTopicUtils;
import com.google.cloud.teleport.v2.kafka.values.KafkaTemplateParameters.MessageFormatConstants;
import com.google.cloud.teleport.v2.kafka.values.KafkaTemplateParameters.SchemaFormat;
import com.google.cloud.teleport.v2.options.KafkaToBigQueryFlexOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryWriteUtils;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.transforms.StringMessageToTableRow;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.MetadataValidator;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaRecordCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link KafkaToBigQueryFlex} pipeline is a streaming pipeline which ingests text data from
 * Kafka, and outputs the resulting records to BigQuery. Any errors which occur in the
 * transformation of the data, or inserting into the output table will be inserted into a separate
 * errors table in BigQuery. Both output and error tables are specified by the user as parameters.
 *
 * <p><b>Pipeline Requirements</b>
 *
 * <ul>
 *   <li>The Kafka topic exists and the message is encoded in a valid JSON format.
 *   <li>The BigQuery output table exists.
 *   <li>The Kafka brokers are reachable from the Dataflow worker machines.
 * </ul>
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/kafka-to-bigquery/README_Kafka_to_BigQuery.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Kafka_to_BigQuery_Flex",
    category = TemplateCategory.STREAMING,
    displayName = "Kafka to BigQuery",
    description =
        "The Apache Kafka to BigQuery template is a streaming pipeline which ingests text data from Apache Kafka, and outputs the resulting records to BigQuery. "
            + "Any errors which occur in the transformation of the data, or inserting into the output table are inserted into a separate errors table in BigQuery. "
            + "For any errors which occur in the transformation of the data, the original records can be inserted into a separate Kafka topic. The template supports "
            + "reading a Kafka topic which contains single/multiple schema(s). It can write to a single or multiple BigQuery tables, depending on the schema of records. ",
    optionsClass = KafkaToBigQueryFlexOptions.class,
    flexContainerName = "kafka-to-bigquery-flex",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/kafka-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The output BigQuery table must exist.",
      "The Apache Kafka broker server must be running and be reachable from the Dataflow worker machines.",
      "The Apache Kafka topics must exist and the messages must be encoded in a valid JSON format."
    },
    skipOptions = {"useStorageWriteApi"})
public class KafkaToBigQueryFlex {

  /* Logger for class. */
  private static final Logger LOG = LoggerFactory.getLogger(KafkaToBigQueryFlex.class);

  /** The tag for the main output of the json transformation. */
  public static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<>() {};

  /** The tag for the dead-letter output of the json to table row transform. */
  public static final TupleTag<FailsafeElement<KafkaRecord<String, String>, String>>
      TRANSFORM_DEADLETTER_OUT = new TupleTag<>() {};

  /** String/String Coder for FailsafeElement. */
  private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(
          NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of()));

  private static ErrorHandler<BadRecord, ?> errorHandler = new ErrorHandler.DefaultErrorHandler<>();
  private static BadRecordRouter badRecordRouter = BadRecordRouter.THROWING_ROUTER;

  /**
   * The main entry-point for pipeline execution. This method will start the pipeline but will not
   * wait for its execution to finish. If blocking execution is required, use the {@link
   * KafkaToBigQueryFlex#run(KafkaToBigQueryFlexOptions)} method to start the pipeline and invoke
   * {@code result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args The command-line args passed by the executor.
   */
  public static void main(String[] args) throws Exception {
    UncaughtExceptionLogger.register();

    KafkaToBigQueryFlexOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaToBigQueryFlexOptions.class);

    run(options);
  }

  public static Boolean useErrorHandler(KafkaToBigQueryFlexOptions options) {
    return options.getUseBigQueryDLQ();
  }

  public static WriteResult processKafkaRecords(
      PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords, KafkaToBigQueryFlexOptions options) {

    // Validate the pipeline options for MessageFormat and SchemaFormat.
    if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_BINARY_ENCODING)
        && (options.getBinaryAvroSchemaPath() != null
            && options.getBinaryAvroSchemaPath().isBlank())) {
      throw new IllegalArgumentException(
          "Binary Avro Schema Path cannot be empty for AVRO_BINARY_ENCODING.");
    }

    if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_CONFLUENT_WIRE_FORMAT)) {

      if ((options.getSchemaRegistryConnectionUrl() != null
              && options.getSchemaRegistryConnectionUrl().isBlank())
          && (options.getConfluentAvroSchemaPath() != null
              && options.getConfluentAvroSchemaPath().isBlank())) {
        throw new IllegalArgumentException(
            "Either Schema Registry Connection URL or Confluent Avro Schema Path must be provided for AVRO_CONFLUENT_WIRE_FORMAT.");
      }

      if (options.getSchemaFormat().equals(SchemaFormat.SINGLE_SCHEMA_FILE)) {
        if (!options.getConfluentAvroSchemaPath().isBlank()
            && (options.getOutputTableSpec() != null && options.getOutputTableSpec().isBlank())) {
          throw new IllegalArgumentException(
              "The outputTableSpec parameter is required when using the SINGLE_SCHEMA_FILE schema format.");
        }
      } else if (options.getSchemaFormat().equals(SchemaFormat.SCHEMA_REGISTRY)) {
        if (options.getSchemaRegistryConnectionUrl() != null
            && (options.getOutputDataset() != null && options.getOutputDataset().isBlank())) {
          throw new IllegalArgumentException(
              "An output BigQuery dataset is required. It will be used to create tables per schema.");
        }
      } else {
        throw new IllegalArgumentException(
            "Unsupported schemaFormat parameter value: " + options.getSchemaFormat());
      }
    }

    if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_BINARY_ENCODING)
        && (!options.getBinaryAvroSchemaPath().isBlank())) {
      return handleAvroBinaryEncoding(kafkaRecords, options);
    } else if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_CONFLUENT_WIRE_FORMAT)
        && (!options.getSchemaRegistryConnectionUrl().isBlank()
            || !options.getConfluentAvroSchemaPath().isBlank())) {
      return handleAvroConfluentWireFormat(kafkaRecords, options);
    } else {
      throw new IllegalArgumentException(
          "Message format " + options.getMessageFormat() + " is unsupported.");
    }
  }

  /**
   * Runs the pipeline to completion with the specified options. This method does not wait until the
   * pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
   * object to block until the pipeline is finished running if blocking programmatic execution is
   * required.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  public static PipelineResult run(KafkaToBigQueryFlexOptions options) throws Exception {

    // Enable Streaming Engine
    options.setEnableStreamingEngine(true);

    List<String> dataflowServiceOptions = options.getDataflowServiceOptions();
    if (dataflowServiceOptions == null) {
      dataflowServiceOptions = new ArrayList<>();
    }
    dataflowServiceOptions.add("enable_streaming_engine_resource_based_billing");
    options.setDataflowServiceOptions(dataflowServiceOptions);

    // Validate BQ STORAGE_WRITE_API options
    options.setUseStorageWriteApi(true);
    if (options.getStorageWriteApiTriggeringFrequencySec() == null) {
      options.setStorageWriteApiTriggeringFrequencySec(5);
    }
    BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
    MetadataValidator.validate(options);

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    List<String> topicsList;
    String bootstrapServers;
    if (options.getReadBootstrapServerAndTopic() != null) {
      List<String> bootstrapServerAndTopicList =
          KafkaTopicUtils.getBootstrapServerAndTopic(
              options.getReadBootstrapServerAndTopic(), options.getProject());
      topicsList = List.of(bootstrapServerAndTopicList.get(1));
      bootstrapServers = bootstrapServerAndTopicList.get(0);
    } else {
      throw new IllegalArgumentException(
          "Please provide a valid bootstrap server which matches `[,:a-zA-Z0-9._-]+` and a topic which matches `[,a-zA-Z0-9._-]+`");
    }

    // Configure error handler for Dead letter queue
    if (options.getUseBigQueryDLQ()) {
      if (options.getOutputDeadletterTable() == null
          || options.getOutputDeadletterTable().isBlank()) {
        throw new IllegalArgumentException(
            "Please provide a valid BigQuery full qualified table name when using BigQuery"
                + "Dead letter queue");
      }
      badRecordRouter = BadRecordRouter.RECORDING_ROUTER;
      errorHandler =
          pipeline.registerBadRecordErrorHandler(
              BigQueryDeadLetterQueue.newBuilder()
                  .setTableName(options.getOutputDeadletterTable())
                  .build());
    }

    // Get the Kafka config
    Map<String, Object> kafkaConfig = KafkaConfig.fromReadOptions(options);

    /*
     * Steps:
     *  1) Read messages in from Kafka
     *  2) Transform the messages into TableRows
     *     - Transform message payload via UDF
     *     - Convert UDF result to TableRow objects
     *  3) Write successful records out to BigQuery
     *  4) Write failed records out to BigQuery
     */

    if (options.getMessageFormat() == null
        || options.getMessageFormat().equals(MessageFormatConstants.JSON)) {

      pipeline = runJsonPipeline(pipeline, options, topicsList, bootstrapServers, kafkaConfig);

    } else if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_CONFLUENT_WIRE_FORMAT)
        || options.getMessageFormat().equals(MessageFormatConstants.AVRO_BINARY_ENCODING)) {
      pipeline = runAvroPipeline(pipeline, options, topicsList, bootstrapServers, kafkaConfig);

    } else {
      throw new IllegalArgumentException("Invalid format specified: " + options.getMessageFormat());
    }
    if (useErrorHandler(options)) {
      errorHandler.close();
    }
    return pipeline.run();
  }

  private static WriteResult handleAvroBinaryEncoding(
      PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords, KafkaToBigQueryFlexOptions options) {
    WriteResult writeResult;
    BigQueryWriteUtils.BigQueryWrite bigQueryWrite;
    if (useErrorHandler(options)) {
      // BigQueryIO sets the BadRecordRouter to RecordingRouter even when the errorHandler is
      // DefaultErrorHandler(which is a no op). In this case, when the BadRecordRouter is
      // ThrowingRouter,
      // don't pass errorHandler to BigQueryIO.
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryWrite.of(
              SchemaUtils.getAvroSchema(options.getBinaryAvroSchemaPath()),
              options.getOutputTableSpec(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding(),
              errorHandler);
    } else {
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryWrite.of(
              SchemaUtils.getAvroSchema(options.getBinaryAvroSchemaPath()),
              options.getOutputTableSpec(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding());
    }
    writeResult =
        kafkaRecords
            .apply(
                AvroTransform.of(
                    options.getMessageFormat(),
                    options.getBinaryAvroSchemaPath(),
                    errorHandler,
                    badRecordRouter))
            .apply(bigQueryWrite);
    return writeResult;
  }

  private static WriteResult handleAvroConfluentWireFormat(
      PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords, KafkaToBigQueryFlexOptions options) {
    if (options.getSchemaFormat().equals(SchemaFormat.SINGLE_SCHEMA_FILE)) {
      return handleSingleSchemaFileFormat(kafkaRecords, options);
    } else if (options.getSchemaFormat().equals(SchemaFormat.SCHEMA_REGISTRY)) {
      return handleSchemaRegistryFormat(kafkaRecords, options);
    } else {
      throw new IllegalArgumentException(
          "Message format " + options.getSchemaFormat() + " is unsupported.");
    }
  }

  private static WriteResult handleSingleSchemaFileFormat(
      PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords, KafkaToBigQueryFlexOptions options) {
    if (!(options.getConfluentAvroSchemaPath() != null && options.getOutputTableSpec() != null)) {
      // TODO: Add error.
      throw new RuntimeException("");
    }
    WriteResult writeResult;
    BigQueryWriteUtils.BigQueryWrite bigQueryWrite;
    if (useErrorHandler(options)) {
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryWrite.of(
              SchemaUtils.getAvroSchema(options.getConfluentAvroSchemaPath()),
              options.getOutputTableSpec(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding(),
              errorHandler);
    } else {
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryWrite.of(
              SchemaUtils.getAvroSchema(options.getConfluentAvroSchemaPath()),
              options.getOutputTableSpec(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding());
    }
    writeResult =
        kafkaRecords
            .apply(
                AvroTransform.of(
                    options.getMessageFormat(),
                    options.getConfluentAvroSchemaPath(),
                    errorHandler,
                    badRecordRouter))
            .apply(bigQueryWrite);
    return writeResult;
  }

  private static WriteResult handleSchemaRegistryFormat(
      PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords, KafkaToBigQueryFlexOptions options) {
    if (!(options.getSchemaRegistryConnectionUrl() != null && options.getOutputDataset() != null)) {
      throw new RuntimeException(
          "Missing required parameters: Schema Registry URL and/or Output Dataset");
    }
    WriteResult writeResult;
    BigQueryWriteUtils.BigQueryDynamicWrite bigQueryWrite;
    if (useErrorHandler(options)) {
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryDynamicWrite.of(
              options.getOutputProject(),
              options.getOutputDataset(),
              options.getBqTableNamePrefix(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding(),
              errorHandler);
    } else {
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryDynamicWrite.of(
              options.getOutputProject(),
              options.getOutputDataset(),
              options.getBqTableNamePrefix(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding());
    }
    writeResult =
        kafkaRecords
            .apply(
                AvroDynamicTransform.of(
                    options.getSchemaRegistryConnectionUrl(),
                    KafkaConfig.fromSchemaRegistryOptions(options),
                    errorHandler,
                    badRecordRouter))
            .apply(bigQueryWrite);
    return writeResult;
  }

  public static Pipeline runAvroPipeline(
      Pipeline pipeline,
      KafkaToBigQueryFlexOptions options,
      List<String> topicsList,
      String bootstrapServers,
      Map<String, Object> kafkaConfig)
      throws Exception {

    if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_BINARY_ENCODING)
        && options.getBinaryAvroSchemaPath() == null) {
      throw new IllegalArgumentException(
          "Avro schema is needed in order to read non confluent wire format messages.");
    }
    if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_CONFLUENT_WIRE_FORMAT)
        && options.getSchemaRegistryConnectionUrl() == null
        && options.getConfluentAvroSchemaPath() == null) {
      throw new IllegalArgumentException(
          "Schema Registry Connection URL or Avro schema is needed in order to read confluent wire format messages.");
    }
    if (!Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath())
        && !Strings.isNullOrEmpty(options.getJavascriptTextTransformFunctionName())) {
      LOG.warn(
          "JavaScript UDF parameters are set while using Avro message format. "
              + "UDFs are supported for JSON format only. No UDF transformation will be applied.");
    }

    PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords;

    kafkaRecords =
        pipeline
            /*
             * Step #1: Read messages in from Kafka and convert to GenericRecords wrap in FailsafeElement
             */
            .apply(
                "ReadBytesFromKafka",
                KafkaTransform.readBytesFromKafka(
                    bootstrapServers, topicsList, kafkaConfig, options.getEnableCommitOffsets()))
            .setCoder(
                KafkaRecordCoder.of(NullableCoder.of(ByteArrayCoder.of()), ByteArrayCoder.of()));

    WriteResult writeResult = processKafkaRecords(kafkaRecords, options);
    return pipeline;
  }

  public static Pipeline runJsonPipeline(
      Pipeline pipeline,
      KafkaToBigQueryFlexOptions options,
      List<String> topicsList,
      String bootstrapServers,
      Map<String, Object> kafkaConfig) {

    // Register the coder for pipeline
    FailsafeElementCoder<KafkaRecord<String, String>, String> coder =
        FailsafeElementCoder.of(
            KafkaRecordCoder.of(
                NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of())),
            NullableCoder.of(StringUtf8Coder.of()));

    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);

    PCollectionTuple convertedTableRows;
    convertedTableRows =
        pipeline
            /*
             * Step #1: Read messages in from Kafka
             */
            .apply(
                "ReadFromKafka",
                KafkaTransform.readStringFromKafka(
                    bootstrapServers, topicsList, kafkaConfig, options.getEnableCommitOffsets()))

            /*
             * Step #2: Transform the Kafka Messages into TableRows
             */
            .apply(
                "ConvertMessageToTableRow",
                StringMessageToTableRow.newBuilder()
                    .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                    .setFunctionName(options.getJavascriptTextTransformFunctionName())
                    .setReloadIntervalMinutes(
                        options.getJavascriptTextTransformReloadIntervalMinutes())
                    .setSuccessTag(TRANSFORM_OUT)
                    .setFailureTag(TRANSFORM_DEADLETTER_OUT)
                    .build());
    /*
     * Step #3: Write the successful records out to BigQuery
     */
    WriteResult writeResult =
        convertedTableRows
            .get(TRANSFORM_OUT)
            .apply(
                "WriteSuccessfulRecords",
                BigQueryIO.writeTableRows()
                    .withoutValidation()
                    .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .withExtendedErrorInfo()
                    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                    .to(options.getOutputTableSpec()));

    /*
     * Step 3 Contd.
     * Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
     */
    PCollection<FailsafeElement<String, String>> failedInserts =
        BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
            .apply(
                "WrapInsertionErrors",
                MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                    .via(KafkaToBigQueryFlex::wrapBigQueryInsertError))
            .setCoder(FAILSAFE_ELEMENT_CODER);

    if (options.getOutputDeadletterTable() != null) {
      /*
       * Step #4: Write failed records out to BigQuery
       */
      PCollectionList.of(convertedTableRows.get(TRANSFORM_DEADLETTER_OUT))
          .apply("Flatten", Flatten.pCollections())
          .apply(
              "WriteTransformationFailedRecords",
              WriteKafkaRecordMessageErrors.newBuilder()
                  .setErrorRecordsTable(
                      ObjectUtils.firstNonNull(options.getOutputDeadletterTable()))
                  .setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
                  .build());
    } else {
      PCollectionList.of(convertedTableRows.get(TRANSFORM_DEADLETTER_OUT))
          .apply("Flatten", Flatten.pCollections())
          .apply("PrintInsertionFailedRecords", ParDo.of(new ThrowErrorFn<>()));
    }

    if (options.getOutputDeadletterTable() != null) {
      /*
       * Step #5: Insert records that failed BigQuery inserts into a dead-letter table.
       */
      failedInserts.apply(
          "WriteInsertionFailedRecords",
          ErrorConverters.WriteStringMessageErrors.newBuilder()
              .setErrorRecordsTable(ObjectUtils.firstNonNull(options.getOutputDeadletterTable()))
              .setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
              .build());
    } else {
      failedInserts.apply(
          "PrintInsertionFailedRecords", ParDo.of(new ThrowErrorFn<String, String>()));
    }

    return pipeline;
  }

  /**
   * Method to wrap a {@link BigQueryInsertError} into a {@link FailsafeElement}.
   *
   * @param insertError BigQueryInsert error.
   * @return FailsafeElement object.
   */
  protected static FailsafeElement<String, String> wrapBigQueryInsertError(
      BigQueryInsertError insertError) {

    FailsafeElement<String, String> failsafeElement;
    try {

      failsafeElement =
          FailsafeElement.of(
              insertError.getRow().toPrettyString(), insertError.getRow().toPrettyString());
      failsafeElement.setErrorMessage(insertError.getError().toPrettyString());

    } catch (IOException e) {
      LOG.error("Failed to wrap BigQuery insert error.");
      throw new RuntimeException(e);
    }
    return failsafeElement;
  }

  static class ThrowErrorFn<T, W> extends DoFn<FailsafeElement<T, W>, FailsafeElement<T, W>> {

    @ProcessElement
    public void processElement(ProcessContext context) {
      FailsafeElement<T, W> element = context.element();
      // TODO: Logging every exception might overload Google Cloud Logging API. Find a better way to
      // log these errors.
      LOG.error(element.toString() + element.getErrorMessage() + element.getStacktrace());
      context.output(element);
    }
  }
}

Étape suivante