Modello Apache Kafka a BigQuery

Il modello Apache Kafka to BigQuery è una pipeline di inserimento flussi che importa i dati di testo dai cluster Google Cloud Managed Service per Apache Kafka e poi restituisce i record risultanti alle tabelle BigQuery. Eventuali errori che si verificano durante l'inserimento dei dati nella tabella di output vengono inseriti in una tabella degli errori separata in BigQuery.

Puoi utilizzare il modello Apache Kafka to BigQuery anche con Kafka autogestito o esterno.

Requisiti della pipeline

  • Il server broker Apache Kafka deve essere in esecuzione ed essere raggiungibile dalle macchine worker di Dataflow.
  • Gli argomenti Apache Kafka devono esistere.
  • Devi abilitare le API Dataflow, BigQuery e Cloud Storage. Se è richiesta l'autenticazione, devi attivare anche l'API Secret Manager.
  • Crea un set di dati e una tabella BigQuery con lo schema appropriato per l'argomento di input Kafka. Se utilizzi più schemi nello stesso argomento e vuoi scrivere in più tabelle, non è necessario creare la tabella prima di configurare la pipeline.
  • Quando la coda dei messaggi non recapitabili (non elaborati) per il modello è attiva, crea una tabella vuota che non abbia uno schema per la coda dei messaggi non recapitabili.

Formato dei messaggi Kafka

Il modello Apache Kafka to BigQuery supporta la lettura dei messaggi da Kafka nei seguenti formati: CONFLUENT_AVRO_WIRE_FORMAT, AVRO_BINARY_FORMAT e JSON.

Autenticazione

Il modello Apache Kafka to BigQuery supporta l'autenticazione SASL/PLAIN ai broker Kafka.

Parametri del modello

Parametri obbligatori

  • readBootstrapServerAndTopic: l'argomento Kafka da cui leggere l'input.
  • writeMode: consente di scrivere record in una o più tabelle (in base allo schema). La modalità DYNAMIC_TABLE_NAMES è supportata solo per il formato del messaggio di origine AVRO_CONFLUENT_WIRE_FORMAT e l'origine dello schema SCHEMA_REGISTRY. Il nome della tabella di destinazione viene generato automaticamente in base al nome dello schema Avro di ogni messaggio. Può essere un singolo schema (creazione di una singola tabella) o più schemi (creazione di più tabelle). La modalità SINGLE_TABLE_NAME scrive in una singola tabella (singolo schema) specificata dall'utente. Il valore predefinito è SINGLE_TABLE_NAME.
  • kafkaReadAuthenticationMode: la modalità di autenticazione da utilizzare con il cluster Kafka. Utilizza KafkaAuthenticationMethod.NONE per nessuna autenticazione, KafkaAuthenticationMethod.SASL_PLAIN per nome utente e password SASL/PLAIN e KafkaAuthenticationMethod.TLS per l'autenticazione basata su certificato. KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS deve essere utilizzato solo per il cluster Apache Kafka per BigQuery di Google Cloud, in quanto consente di eseguire l'autenticazione utilizzando le credenziali predefinite dell'applicazione.
  • messageFormat: il formato dei messaggi Kafka da leggere. I valori supportati sono AVRO_CONFLUENT_WIRE_FORMAT (Avro con codifica Confluent Schema Registry), AVRO_BINARY_ENCODING (Avro binario non codificato) e JSON. Il valore predefinito è: AVRO_CONFLUENT_WIRE_FORMAT.
  • useBigQueryDLQ: se true, i messaggi non riusciti verranno scritti in BigQuery con informazioni aggiuntive sugli errori. Il valore predefinito è false.

Parametri facoltativi

  • outputTableSpec: posizione della tabella BigQuery in cui scrivere l'output. Il nome deve essere nel formato <project>:<dataset>.<table_name>. Lo schema della tabella deve corrispondere agli oggetti di input.
  • persistKafkaKey: se true, la pipeline manterrà la chiave del messaggio Kafka nella tabella BigQuery, in un campo _key di tipo BYTES. Il valore predefinito è false (la chiave viene ignorata).
  • outputProject: progetto di output BigQuery in cui risiede il set di dati. Le tabelle verranno create dinamicamente nel set di dati. Il valore predefinito è vuoto.
  • outputDataset: il set di dati BigQuery di output in cui scrivere l'output. Le tabelle verranno create dinamicamente nel set di dati. Se le tabelle vengono create in precedenza, i nomi devono rispettare la convenzione di denominazione specificata. Il nome deve essere bqTableNamePrefix + Avro Schema FullName , ogni parola sarà separata da un trattino -. Il valore predefinito è vuoto.
  • bqTableNamePrefix: prefisso dei nomi da utilizzare durante la creazione delle tabelle di output BigQuery. Applicabile solo quando si utilizza il registry dello schema. Il valore predefinito è vuoto.
  • createDisposition: BigQuery CreateDisposition. Ad esempio: CREATE_IF_NEEDED, CREATE_NEVER. Il valore predefinito è CREATE_IF_NEEDED.
  • writeDisposition: BigQuery WriteDisposition. Ad esempio: WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. Il valore predefinito è WRITE_APPEND.
  • useAutoSharding: se true, la pipeline utilizza lo sharding automatico durante la scrittura in BigQuery. Il valore predefinito è true.
  • numStorageWriteApiStreams: specifica il numero di stream di scrittura. Questo parametro deve essere impostato. Il valore predefinito è 0.
  • storageWriteApiTriggeringFrequencySec: specifica la frequenza di attivazione in secondi. Questo parametro deve essere impostato. Il valore predefinito è 5 secondi.
  • useStorageWriteApiAtLeastOnce: questo parametro viene applicato solo se è attivata l'opzione "Utilizza l'API BigQuery Storage Write". Se è attivata, per l'API Storage Write verrà utilizzata la semantica almeno una volta, altrimenti verrà utilizzata la semantica esattamente una volta. Il valore predefinito è false.
  • enableCommitOffsets: esegui il commit degli offset dei messaggi elaborati in Kafka. Se questa opzione è attivata, le lacune o l'elaborazione duplicata dei messaggi vengono ridotte al minimo al riavvio della pipeline. È necessario specificare l'ID gruppo di consumatori. Il valore predefinito è false.
  • consumerGroupId: l'identificatore univoco del gruppo di consumer a cui appartiene questa pipeline. Obbligatorio se è attivata l'opzione Commit Offsets to Kafka. Il valore predefinito è vuoto.
  • kafkaReadOffset: il punto di partenza per la lettura dei messaggi quando non esistono offset di cui è stato eseguito il commit. Il più antico è quello iniziale, mentre il più recente viene calcolato partendo dal messaggio più recente. Il valore predefinito è latest.
  • kafkaReadUsernameSecretId: l'ID segreto di Secret Manager di Google Cloud che contiene il nome utente Kafka da utilizzare con l'autenticazione SASL_PLAIN. Ad esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Il valore predefinito è vuoto.
  • kafkaReadPasswordSecretId: l'ID segreto di Google Cloud Secret Manager contenente la password di Kafka da utilizzare con l'autenticazione SASL_PLAIN. Ad esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Il valore predefinito è vuoto.
  • kafkaReadKeystoreLocation: il percorso di Google Cloud Storage del file dell'archivio chiavi Java (JKS) contenente il certificato TLS e la chiave privata da utilizzare per l'autenticazione con il cluster Kafka. Ad esempio, gs://your-bucket/keystore.jks.
  • kafkaReadTruststoreLocation: il percorso di Google Cloud Storage del file dell'archivio attendibilità Java (JKS) contenente i certificati attendibili da utilizzare per verificare l'identità del broker Kafka.
  • kafkaReadTruststorePasswordSecretId: l'ID segreto di Secret Manager di Google Cloud che contiene la password da utilizzare per accedere al file Java TrustStore (JKS) per l'autenticazione TLS di Kafka. Ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeystorePasswordSecretId: l'ID secret di Secret Manager di Google Cloud che contiene la password da utilizzare per accedere al file Java KeyStore (JKS) per l'autenticazione TLS di Kafka. Ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeyPasswordSecretId: l'ID secret di Secret Manager di Google Cloud che contiene la password da utilizzare per accedere alla chiave privata all'interno del file dell'archivio chiavi Java (JKS) per l'autenticazione TLS di Kafka. Ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • schemaFormat: il formato dello schema Kafka. Può essere fornito come SINGLE_SCHEMA_FILE o SCHEMA_REGISTRY. Se è specificato SINGLE_SCHEMA_FILE, utilizza lo schema menzionato nel file dello schema avro per tutti i messaggi. Se viene specificato SCHEMA_REGISTRY, i messaggi possono avere un singolo schema o più schemi. Il valore predefinito è SINGLE_SCHEMA_FILE.
  • confluentAvroSchemaPath: il percorso di Google Cloud Storage al singolo file dello schema Avro utilizzato per decodificare tutti i messaggi di un argomento. Il valore predefinito è vuoto.
  • schemaRegistryConnectionUrl: l'URL dell'istanza Confluent Schema Registry utilizzata per gestire gli schemi Avro per la decodifica dei messaggi. Il valore predefinito è vuoto.
  • binaryAvroSchemaPath: il percorso di Google Cloud Storage al file dello schema Avro utilizzato per decodificare i messaggi Avro con codifica binaria. Il valore predefinito è vuoto.
  • schemaRegistryAuthenticationMode: modalità di autenticazione di Schema Registry. Può essere NONE, TLS o OAUTH. Il valore predefinito è NESSUNO.
  • schemaRegistryTruststoreLocation: posizione del certificato SSL in cui sono archiviati l'archivio di attendibilità per l'autenticazione a Schema Registry. Ad esempio, /your-bucket/truststore.jks.
  • schemaRegistryTruststorePasswordSecretId: SecretId in Secret Manager in cui è memorizzata la password per accedere al secret nel truststore. Ad esempio, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryKeystoreLocation: posizione dell'archivio chiavi contenente il certificato SSL e la chiave privata. Ad esempio, /your-bucket/keystore.jks.
  • schemaRegistryKeystorePasswordSecretId: SecretId in Secret Manager dove si trova la password per accedere al file del keystore, ad esempio projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryKeyPasswordSecretId: SecretId della password richiesta per accedere alla chiave privata del client archiviata nel keystore. Ad esempio, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version.
  • schemaRegistryOauthClientId: l'ID client utilizzato per autenticare il client Schema Registry in modalità OAUTH. Obbligatorio per il formato del messaggio AVRO_CONFLUENT_WIRE_FORMAT.
  • schemaRegistryOauthClientSecretId: l'ID secret di Google Cloud Secret Manager contenente il client secret da utilizzare per autenticare il client Schema Registry in modalità OAUTH. Obbligatorio per il formato del messaggio AVRO_CONFLUENT_WIRE_FORMAT. Ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • schemaRegistryOauthScope: l'ambito del token di accesso utilizzato per autenticare il client Schema Registry in modalità OAUTH. Questo campo è facoltativo, poiché la richiesta può essere effettuata senza che venga passato un parametro di ambito. Ad esempio, openid.
  • schemaRegistryOauthTokenEndpointUrl: l'URL basato su HTTP(S) per il provider di identità OAuth/OIDC utilizzato per autenticare il client Schema Registry in modalità OAUTH. Obbligatorio per il formato del messaggio AVRO_CONFLUENT_WIRE_FORMAT.
  • outputDeadletterTable: nome completo della tabella BigQuery per i messaggi non riusciti. I messaggi che non sono riusciti a raggiungere la tabella di output per diversi motivi (ad es. schema non corrispondente, JSON con formato errato) vengono scritti in questa tabella. La tabella verrà creata dal modello. Ad esempio, your-project-id:your-dataset.your-table-name.
  • javascriptTextTransformGcsPath: l'URI Cloud Storage del file .js che definisce la funzione JavaScript definita dall'utente (UDF) da utilizzare. Ad esempio, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: il nome della funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: specifica la frequenza con cui ricaricare la UDF, in minuti. Se il valore è maggiore di 0, Dataflow controlla periodicamente il file UDF in Cloud Storage e lo ricarica se il file viene modificato. Questo parametro ti consente di aggiornare la UDF durante l'esecuzione della pipeline, senza dover riavviare il job. Se il valore è 0, il ricaricamento delle funzioni definite dall'utente è disattivato. Il valore predefinito è 0.

Esegui il modello

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the Kafka to BigQuery template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. (Facoltativo) Per passare dall'elaborazione exactly-once alla modalità di streaming Almeno una volta, seleziona Almeno una volta.
  8. Fai clic su Esegui job.

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery_Flex \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • BIGQUERY_TABLE: il nome della tabella BigQuery
  • KAFKA_TOPICS: l'elenco di argomenti Apache Kafka. Se vengono forniti più argomenti, devi eseguire l'escapismo delle virgole. Consulta gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE: l'URI Cloud Storage del file .js che definisce la funzione JavaScript definita dall'utente (UDF) che vuoi utilizzare, ad esempio gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: il nome della funzione definita dall'utente (UDF) JavaScript che vuoi utilizzare

    Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni UDF.

  • KAFKA_SERVER_ADDRESSES: l'elenco di indirizzi IP del server broker Apache Kafka. Ogni indirizzo IP richiede il numero di porta da cui è accessibile il server. Ad esempio: 35.70.252.199:9092. Se vengono forniti più indirizzi, devi eseguire l'escapismo delle virgole. Consulta gcloud topic escaping.

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery_Flex",
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • BIGQUERY_TABLE: il nome della tabella BigQuery
  • KAFKA_TOPICS: l'elenco di argomenti Apache Kafka. Se vengono forniti più argomenti, devi eseguire l'escapismo delle virgole. Consulta gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE: l'URI Cloud Storage del file .js che definisce la funzione JavaScript definita dall'utente (UDF) che vuoi utilizzare, ad esempio gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: il nome della funzione definita dall'utente (UDF) JavaScript che vuoi utilizzare

    Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni UDF.

  • KAFKA_SERVER_ADDRESSES: l'elenco di indirizzi IP del server broker Apache Kafka. Ogni indirizzo IP richiede il numero di porta da cui è accessibile il server. Ad esempio: 35.70.252.199:9092. Se vengono forniti più indirizzi, devi eseguire l'escapismo delle virgole. Consulta gcloud topic escaping.

Per ulteriori informazioni, consulta Scrivere dati da Kafka in BigQuery con Dataflow.

Java
/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.kafka.dlq.BigQueryDeadLetterQueue;
import com.google.cloud.teleport.v2.kafka.transforms.AvroDynamicTransform;
import com.google.cloud.teleport.v2.kafka.transforms.AvroTransform;
import com.google.cloud.teleport.v2.kafka.transforms.KafkaRecordErrorConverters.WriteKafkaRecordMessageErrors;
import com.google.cloud.teleport.v2.kafka.transforms.KafkaTransform;
import com.google.cloud.teleport.v2.kafka.utils.KafkaConfig;
import com.google.cloud.teleport.v2.kafka.utils.KafkaTopicUtils;
import com.google.cloud.teleport.v2.kafka.values.KafkaTemplateParameters.MessageFormatConstants;
import com.google.cloud.teleport.v2.kafka.values.KafkaTemplateParameters.SchemaFormat;
import com.google.cloud.teleport.v2.options.KafkaToBigQueryFlexOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryWriteUtils;
import com.google.cloud.teleport.v2.transforms.ErrorConverters;
import com.google.cloud.teleport.v2.transforms.StringMessageToTableRow;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.MetadataValidator;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaRecordCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link KafkaToBigQueryFlex} pipeline is a streaming pipeline which ingests text data from
 * Kafka, and outputs the resulting records to BigQuery. Any errors which occur in the
 * transformation of the data, or inserting into the output table will be inserted into a separate
 * errors table in BigQuery. Both output and error tables are specified by the user as parameters.
 *
 * <p><b>Pipeline Requirements</b>
 *
 * <ul>
 *   <li>The Kafka topic exists and the message is encoded in a valid JSON format.
 *   <li>The BigQuery output table exists.
 *   <li>The Kafka brokers are reachable from the Dataflow worker machines.
 * </ul>
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/kafka-to-bigquery/README_Kafka_to_BigQuery.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Kafka_to_BigQuery_Flex",
    category = TemplateCategory.STREAMING,
    displayName = "Kafka to BigQuery",
    description =
        "The Apache Kafka to BigQuery template is a streaming pipeline which ingests text data from Apache Kafka, and outputs the resulting records to BigQuery. "
            + "Any errors which occur in the transformation of the data, or inserting into the output table are inserted into a separate errors table in BigQuery. "
            + "For any errors which occur in the transformation of the data, the original records can be inserted into a separate Kafka topic. The template supports "
            + "reading a Kafka topic which contains single/multiple schema(s). It can write to a single or multiple BigQuery tables, depending on the schema of records. ",
    optionsClass = KafkaToBigQueryFlexOptions.class,
    flexContainerName = "kafka-to-bigquery-flex",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/kafka-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The output BigQuery table must exist.",
      "The Apache Kafka broker server must be running and be reachable from the Dataflow worker machines.",
      "The Apache Kafka topics must exist and the messages must be encoded in a valid JSON format."
    },
    skipOptions = {"useStorageWriteApi"})
public class KafkaToBigQueryFlex {

  /* Logger for class. */
  private static final Logger LOG = LoggerFactory.getLogger(KafkaToBigQueryFlex.class);

  /** The tag for the main output of the json transformation. */
  public static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<>() {};

  /** The tag for the dead-letter output of the json to table row transform. */
  public static final TupleTag<FailsafeElement<KafkaRecord<String, String>, String>>
      TRANSFORM_DEADLETTER_OUT = new TupleTag<>() {};

  /** String/String Coder for FailsafeElement. */
  private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(
          NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of()));

  private static ErrorHandler<BadRecord, ?> errorHandler = new ErrorHandler.DefaultErrorHandler<>();
  private static BadRecordRouter badRecordRouter = BadRecordRouter.THROWING_ROUTER;

  /**
   * The main entry-point for pipeline execution. This method will start the pipeline but will not
   * wait for its execution to finish. If blocking execution is required, use the {@link
   * KafkaToBigQueryFlex#run(KafkaToBigQueryFlexOptions)} method to start the pipeline and invoke
   * {@code result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args The command-line args passed by the executor.
   */
  public static void main(String[] args) throws Exception {
    UncaughtExceptionLogger.register();

    KafkaToBigQueryFlexOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaToBigQueryFlexOptions.class);

    run(options);
  }

  public static Boolean useErrorHandler(KafkaToBigQueryFlexOptions options) {
    return options.getUseBigQueryDLQ();
  }

  public static WriteResult processKafkaRecords(
      PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords, KafkaToBigQueryFlexOptions options) {

    // Validate the pipeline options for MessageFormat and SchemaFormat.
    if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_BINARY_ENCODING)
        && (options.getBinaryAvroSchemaPath() != null
            && options.getBinaryAvroSchemaPath().isBlank())) {
      throw new IllegalArgumentException(
          "Binary Avro Schema Path cannot be empty for AVRO_BINARY_ENCODING.");
    }

    if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_CONFLUENT_WIRE_FORMAT)) {

      if ((options.getSchemaRegistryConnectionUrl() != null
              && options.getSchemaRegistryConnectionUrl().isBlank())
          && (options.getConfluentAvroSchemaPath() != null
              && options.getConfluentAvroSchemaPath().isBlank())) {
        throw new IllegalArgumentException(
            "Either Schema Registry Connection URL or Confluent Avro Schema Path must be provided for AVRO_CONFLUENT_WIRE_FORMAT.");
      }

      if (options.getSchemaFormat().equals(SchemaFormat.SINGLE_SCHEMA_FILE)) {
        if (!options.getConfluentAvroSchemaPath().isBlank()
            && (options.getOutputTableSpec() != null && options.getOutputTableSpec().isBlank())) {
          throw new IllegalArgumentException(
              "The outputTableSpec parameter is required when using the SINGLE_SCHEMA_FILE schema format.");
        }
      } else if (options.getSchemaFormat().equals(SchemaFormat.SCHEMA_REGISTRY)) {
        if (options.getSchemaRegistryConnectionUrl() != null
            && (options.getOutputDataset() != null && options.getOutputDataset().isBlank())) {
          throw new IllegalArgumentException(
              "An output BigQuery dataset is required. It will be used to create tables per schema.");
        }
      } else {
        throw new IllegalArgumentException(
            "Unsupported schemaFormat parameter value: " + options.getSchemaFormat());
      }
    }

    if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_BINARY_ENCODING)
        && (!options.getBinaryAvroSchemaPath().isBlank())) {
      return handleAvroBinaryEncoding(kafkaRecords, options);
    } else if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_CONFLUENT_WIRE_FORMAT)
        && (!options.getSchemaRegistryConnectionUrl().isBlank()
            || !options.getConfluentAvroSchemaPath().isBlank())) {
      return handleAvroConfluentWireFormat(kafkaRecords, options);
    } else {
      throw new IllegalArgumentException(
          "Message format " + options.getMessageFormat() + " is unsupported.");
    }
  }

  /**
   * Runs the pipeline to completion with the specified options. This method does not wait until the
   * pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
   * object to block until the pipeline is finished running if blocking programmatic execution is
   * required.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  public static PipelineResult run(KafkaToBigQueryFlexOptions options) throws Exception {

    // Enable Streaming Engine
    options.setEnableStreamingEngine(true);

    List<String> dataflowServiceOptions = options.getDataflowServiceOptions();
    if (dataflowServiceOptions == null) {
      dataflowServiceOptions = new ArrayList<>();
    }
    dataflowServiceOptions.add("enable_streaming_engine_resource_based_billing");
    options.setDataflowServiceOptions(dataflowServiceOptions);

    // Validate BQ STORAGE_WRITE_API options
    options.setUseStorageWriteApi(true);
    if (options.getStorageWriteApiTriggeringFrequencySec() == null) {
      options.setStorageWriteApiTriggeringFrequencySec(5);
    }
    BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
    MetadataValidator.validate(options);

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    List<String> topicsList;
    String bootstrapServers;
    if (options.getReadBootstrapServerAndTopic() != null) {
      List<String> bootstrapServerAndTopicList =
          KafkaTopicUtils.getBootstrapServerAndTopic(
              options.getReadBootstrapServerAndTopic(), options.getProject());
      topicsList = List.of(bootstrapServerAndTopicList.get(1));
      bootstrapServers = bootstrapServerAndTopicList.get(0);
    } else {
      throw new IllegalArgumentException(
          "Please provide a valid bootstrap server which matches `[,:a-zA-Z0-9._-]+` and a topic which matches `[,a-zA-Z0-9._-]+`");
    }

    // Configure error handler for Dead letter queue
    if (options.getUseBigQueryDLQ()) {
      if (options.getOutputDeadletterTable() == null
          || options.getOutputDeadletterTable().isBlank()) {
        throw new IllegalArgumentException(
            "Please provide a valid BigQuery full qualified table name when using BigQuery"
                + "Dead letter queue");
      }
      badRecordRouter = BadRecordRouter.RECORDING_ROUTER;
      errorHandler =
          pipeline.registerBadRecordErrorHandler(
              BigQueryDeadLetterQueue.newBuilder()
                  .setTableName(options.getOutputDeadletterTable())
                  .build());
    }

    // Get the Kafka config
    Map<String, Object> kafkaConfig = KafkaConfig.fromReadOptions(options);

    /*
     * Steps:
     *  1) Read messages in from Kafka
     *  2) Transform the messages into TableRows
     *     - Transform message payload via UDF
     *     - Convert UDF result to TableRow objects
     *  3) Write successful records out to BigQuery
     *  4) Write failed records out to BigQuery
     */

    if (options.getMessageFormat() == null
        || options.getMessageFormat().equals(MessageFormatConstants.JSON)) {

      pipeline = runJsonPipeline(pipeline, options, topicsList, bootstrapServers, kafkaConfig);

    } else if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_CONFLUENT_WIRE_FORMAT)
        || options.getMessageFormat().equals(MessageFormatConstants.AVRO_BINARY_ENCODING)) {
      pipeline = runAvroPipeline(pipeline, options, topicsList, bootstrapServers, kafkaConfig);

    } else {
      throw new IllegalArgumentException("Invalid format specified: " + options.getMessageFormat());
    }
    if (useErrorHandler(options)) {
      errorHandler.close();
    }
    return pipeline.run();
  }

  private static WriteResult handleAvroBinaryEncoding(
      PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords, KafkaToBigQueryFlexOptions options) {
    WriteResult writeResult;
    BigQueryWriteUtils.BigQueryWrite bigQueryWrite;
    if (useErrorHandler(options)) {
      // BigQueryIO sets the BadRecordRouter to RecordingRouter even when the errorHandler is
      // DefaultErrorHandler(which is a no op). In this case, when the BadRecordRouter is
      // ThrowingRouter,
      // don't pass errorHandler to BigQueryIO.
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryWrite.of(
              SchemaUtils.getAvroSchema(options.getBinaryAvroSchemaPath()),
              options.getOutputTableSpec(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding(),
              errorHandler);
    } else {
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryWrite.of(
              SchemaUtils.getAvroSchema(options.getBinaryAvroSchemaPath()),
              options.getOutputTableSpec(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding());
    }
    writeResult =
        kafkaRecords
            .apply(
                AvroTransform.of(
                    options.getMessageFormat(),
                    options.getBinaryAvroSchemaPath(),
                    errorHandler,
                    badRecordRouter))
            .apply(bigQueryWrite);
    return writeResult;
  }

  private static WriteResult handleAvroConfluentWireFormat(
      PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords, KafkaToBigQueryFlexOptions options) {
    if (options.getSchemaFormat().equals(SchemaFormat.SINGLE_SCHEMA_FILE)) {
      return handleSingleSchemaFileFormat(kafkaRecords, options);
    } else if (options.getSchemaFormat().equals(SchemaFormat.SCHEMA_REGISTRY)) {
      return handleSchemaRegistryFormat(kafkaRecords, options);
    } else {
      throw new IllegalArgumentException(
          "Message format " + options.getSchemaFormat() + " is unsupported.");
    }
  }

  private static WriteResult handleSingleSchemaFileFormat(
      PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords, KafkaToBigQueryFlexOptions options) {
    if (!(options.getConfluentAvroSchemaPath() != null && options.getOutputTableSpec() != null)) {
      // TODO: Add error.
      throw new RuntimeException("");
    }
    WriteResult writeResult;
    BigQueryWriteUtils.BigQueryWrite bigQueryWrite;
    if (useErrorHandler(options)) {
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryWrite.of(
              SchemaUtils.getAvroSchema(options.getConfluentAvroSchemaPath()),
              options.getOutputTableSpec(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding(),
              errorHandler);
    } else {
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryWrite.of(
              SchemaUtils.getAvroSchema(options.getConfluentAvroSchemaPath()),
              options.getOutputTableSpec(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding());
    }
    writeResult =
        kafkaRecords
            .apply(
                AvroTransform.of(
                    options.getMessageFormat(),
                    options.getConfluentAvroSchemaPath(),
                    errorHandler,
                    badRecordRouter))
            .apply(bigQueryWrite);
    return writeResult;
  }

  private static WriteResult handleSchemaRegistryFormat(
      PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords, KafkaToBigQueryFlexOptions options) {
    if (!(options.getSchemaRegistryConnectionUrl() != null && options.getOutputDataset() != null)) {
      throw new RuntimeException(
          "Missing required parameters: Schema Registry URL and/or Output Dataset");
    }
    WriteResult writeResult;
    BigQueryWriteUtils.BigQueryDynamicWrite bigQueryWrite;
    if (useErrorHandler(options)) {
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryDynamicWrite.of(
              options.getOutputProject(),
              options.getOutputDataset(),
              options.getBqTableNamePrefix(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding(),
              errorHandler);
    } else {
      bigQueryWrite =
          BigQueryWriteUtils.BigQueryDynamicWrite.of(
              options.getOutputProject(),
              options.getOutputDataset(),
              options.getBqTableNamePrefix(),
              options.getWriteDisposition(),
              options.getCreateDisposition(),
              options.getNumStorageWriteApiStreams(),
              options.getStorageWriteApiTriggeringFrequencySec(),
              options.getPersistKafkaKey(),
              options.getUseAutoSharding());
    }
    writeResult =
        kafkaRecords
            .apply(
                AvroDynamicTransform.of(
                    options.getSchemaRegistryConnectionUrl(),
                    KafkaConfig.fromSchemaRegistryOptions(options),
                    errorHandler,
                    badRecordRouter))
            .apply(bigQueryWrite);
    return writeResult;
  }

  public static Pipeline runAvroPipeline(
      Pipeline pipeline,
      KafkaToBigQueryFlexOptions options,
      List<String> topicsList,
      String bootstrapServers,
      Map<String, Object> kafkaConfig)
      throws Exception {

    if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_BINARY_ENCODING)
        && options.getBinaryAvroSchemaPath() == null) {
      throw new IllegalArgumentException(
          "Avro schema is needed in order to read non confluent wire format messages.");
    }
    if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_CONFLUENT_WIRE_FORMAT)
        && options.getSchemaRegistryConnectionUrl() == null
        && options.getConfluentAvroSchemaPath() == null) {
      throw new IllegalArgumentException(
          "Schema Registry Connection URL or Avro schema is needed in order to read confluent wire format messages.");
    }
    if (!Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath())
        && !Strings.isNullOrEmpty(options.getJavascriptTextTransformFunctionName())) {
      LOG.warn(
          "JavaScript UDF parameters are set while using Avro message format. "
              + "UDFs are supported for JSON format only. No UDF transformation will be applied.");
    }

    PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords;

    kafkaRecords =
        pipeline
            /*
             * Step #1: Read messages in from Kafka and convert to GenericRecords wrap in FailsafeElement
             */
            .apply(
                "ReadBytesFromKafka",
                KafkaTransform.readBytesFromKafka(
                    bootstrapServers, topicsList, kafkaConfig, options.getEnableCommitOffsets()))
            .setCoder(
                KafkaRecordCoder.of(NullableCoder.of(ByteArrayCoder.of()), ByteArrayCoder.of()));

    WriteResult writeResult = processKafkaRecords(kafkaRecords, options);
    return pipeline;
  }

  public static Pipeline runJsonPipeline(
      Pipeline pipeline,
      KafkaToBigQueryFlexOptions options,
      List<String> topicsList,
      String bootstrapServers,
      Map<String, Object> kafkaConfig) {

    // Register the coder for pipeline
    FailsafeElementCoder<KafkaRecord<String, String>, String> coder =
        FailsafeElementCoder.of(
            KafkaRecordCoder.of(
                NullableCoder.of(StringUtf8Coder.of()), NullableCoder.of(StringUtf8Coder.of())),
            NullableCoder.of(StringUtf8Coder.of()));

    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);

    PCollectionTuple convertedTableRows;
    convertedTableRows =
        pipeline
            /*
             * Step #1: Read messages in from Kafka
             */
            .apply(
                "ReadFromKafka",
                KafkaTransform.readStringFromKafka(
                    bootstrapServers, topicsList, kafkaConfig, options.getEnableCommitOffsets()))

            /*
             * Step #2: Transform the Kafka Messages into TableRows
             */
            .apply(
                "ConvertMessageToTableRow",
                StringMessageToTableRow.newBuilder()
                    .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                    .setFunctionName(options.getJavascriptTextTransformFunctionName())
                    .setReloadIntervalMinutes(
                        options.getJavascriptTextTransformReloadIntervalMinutes())
                    .setSuccessTag(TRANSFORM_OUT)
                    .setFailureTag(TRANSFORM_DEADLETTER_OUT)
                    .build());
    /*
     * Step #3: Write the successful records out to BigQuery
     */
    WriteResult writeResult =
        convertedTableRows
            .get(TRANSFORM_OUT)
            .apply(
                "WriteSuccessfulRecords",
                BigQueryIO.writeTableRows()
                    .withoutValidation()
                    .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .withExtendedErrorInfo()
                    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                    .to(options.getOutputTableSpec()));

    /*
     * Step 3 Contd.
     * Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
     */
    PCollection<FailsafeElement<String, String>> failedInserts =
        BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
            .apply(
                "WrapInsertionErrors",
                MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                    .via(KafkaToBigQueryFlex::wrapBigQueryInsertError))
            .setCoder(FAILSAFE_ELEMENT_CODER);

    if (options.getOutputDeadletterTable() != null) {
      /*
       * Step #4: Write failed records out to BigQuery
       */
      PCollectionList.of(convertedTableRows.get(TRANSFORM_DEADLETTER_OUT))
          .apply("Flatten", Flatten.pCollections())
          .apply(
              "WriteTransformationFailedRecords",
              WriteKafkaRecordMessageErrors.newBuilder()
                  .setErrorRecordsTable(
                      ObjectUtils.firstNonNull(options.getOutputDeadletterTable()))
                  .setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
                  .build());
    } else {
      PCollectionList.of(convertedTableRows.get(TRANSFORM_DEADLETTER_OUT))
          .apply("Flatten", Flatten.pCollections())
          .apply("PrintInsertionFailedRecords", ParDo.of(new ThrowErrorFn<>()));
    }

    if (options.getOutputDeadletterTable() != null) {
      /*
       * Step #5: Insert records that failed BigQuery inserts into a dead-letter table.
       */
      failedInserts.apply(
          "WriteInsertionFailedRecords",
          ErrorConverters.WriteStringMessageErrors.newBuilder()
              .setErrorRecordsTable(ObjectUtils.firstNonNull(options.getOutputDeadletterTable()))
              .setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
              .build());
    } else {
      failedInserts.apply(
          "PrintInsertionFailedRecords", ParDo.of(new ThrowErrorFn<String, String>()));
    }

    return pipeline;
  }

  /**
   * Method to wrap a {@link BigQueryInsertError} into a {@link FailsafeElement}.
   *
   * @param insertError BigQueryInsert error.
   * @return FailsafeElement object.
   */
  protected static FailsafeElement<String, String> wrapBigQueryInsertError(
      BigQueryInsertError insertError) {

    FailsafeElement<String, String> failsafeElement;
    try {

      failsafeElement =
          FailsafeElement.of(
              insertError.getRow().toPrettyString(), insertError.getRow().toPrettyString());
      failsafeElement.setErrorMessage(insertError.getError().toPrettyString());

    } catch (IOException e) {
      LOG.error("Failed to wrap BigQuery insert error.");
      throw new RuntimeException(e);
    }
    return failsafeElement;
  }

  static class ThrowErrorFn<T, W> extends DoFn<FailsafeElement<T, W>, FailsafeElement<T, W>> {

    @ProcessElement
    public void processElement(ProcessContext context) {
      FailsafeElement<T, W> element = context.element();
      // TODO: Logging every exception might overload Google Cloud Logging API. Find a better way to
      // log these errors.
      LOG.error(element.toString() + element.getErrorMessage() + element.getStacktrace());
      context.output(element);
    }
  }
}

Passaggi successivi