Modello Apache Kafka a Kafka

Il modello Apache Kafka to Apache Kafka crea una pipeline di inserimento flussi che importa i dati sotto forma di byte da un'origine Apache Kafka e poi li scrive in un destinazione Apache Kafka.

Requisiti della pipeline

  • L'argomento di origine Apache Kafka deve esistere.
  • I server broker di origine e di destinazione Apache Kafka devono essere in esecuzione ed essere raggiungibili dalle macchine worker di Dataflow.
  • Se utilizzi il servizio gestito Google Cloud per Apache Kafka come origine o destinazione, l'argomento deve esistere prima del lancio del modello.

Formato dei messaggi Kafka

I messaggi di origine Apache Kafka vengono letti come byte e i byte vengono scritti nello scopo Apache Kafka.

Autenticazione

Il modello da Apache Kafka ad Apache Kafka supporta l'autenticazione SASL/PLAIN e TLS ai broker Kafka.

Parametri del modello

Parametri obbligatori

  • readBootstrapServerAndTopic: il server bootstrap Kafka e l'argomento da cui leggere l'input. Ad esempio, localhost:9092;topic1,topic2.
  • kafkaReadAuthenticationMode: la modalità di autenticazione da utilizzare con il cluster Kafka. Utilizza KafkaAuthenticationMethod.NONE per l'autenticazione non richiesta, KafkaAuthenticationMethod.SASL_PLAIN per nome utente e password SASL/PLAIN e KafkaAuthenticationMethod.TLS per l'autenticazione basata su certificato. KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS deve essere utilizzato solo per il cluster Apache Kafka per BigQuery di Google Cloud, in quanto consente di eseguire l'autenticazione utilizzando le credenziali predefinite dell'applicazione.
  • writeBootstrapServerAndTopic: l'argomento Kafka in cui scrivere l'output.
  • kafkaWriteAuthenticationMethod: la modalità di autenticazione da utilizzare con il cluster Kafka. Utilizza NONE per l'autenticazione senza password, SASL_PLAIN per il nome utente e la password SASL/PLAIN e TLS per l'autenticazione basata su certificato. Valore predefinito: APPLICATION_DEFAULT_CREDENTIALS.

Parametri facoltativi

  • enableCommitOffsets: esegui il commit degli offset dei messaggi elaborati in Kafka. Se questa opzione è attivata, le lacune o l'elaborazione duplicata dei messaggi vengono ridotte al minimo al riavvio della pipeline. È necessario specificare l'ID gruppo di consumatori. Il valore predefinito è false.
  • consumerGroupId: l'identificatore univoco del gruppo di consumer a cui appartiene questa pipeline. Obbligatorio se è attivata l'opzione Commit Offsets to Kafka. Il valore predefinito è vuoto.
  • kafkaReadOffset: il punto di partenza per la lettura dei messaggi quando non esistono offset di cui è stato eseguito il commit. Il più recente è quello iniziale, mentre il meno recente viene calcolato partendo dal messaggio più recente. Il valore predefinito è latest.
  • kafkaReadUsernameSecretId: l'ID segreto di Secret Manager di Google Cloud che contiene il nome utente Kafka da utilizzare con l'autenticazione SASL_PLAIN. Ad esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Il valore predefinito è vuoto.
  • kafkaReadPasswordSecretId: l'ID segreto di Google Cloud Secret Manager contenente la password di Kafka da utilizzare con l'autenticazione SASL_PLAIN. Ad esempio: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Il valore predefinito è vuoto.
  • kafkaReadKeystoreLocation: il percorso di Google Cloud Storage del file dell'archivio chiavi Java (JKS) contenente il certificato TLS e la chiave privata da utilizzare per l'autenticazione con il cluster Kafka. Ad esempio, gs://your-bucket/keystore.jks.
  • kafkaReadTruststoreLocation: il percorso di Google Cloud Storage del file dell'archivio attendibilità Java (JKS) contenente i certificati attendibili da utilizzare per verificare l'identità del broker Kafka.
  • kafkaReadTruststorePasswordSecretId: l'ID segreto di Secret Manager di Google Cloud che contiene la password da utilizzare per accedere al file Java TrustStore (JKS) per l'autenticazione TLS di Kafka. Ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeystorePasswordSecretId: l'ID secret di Secret Manager di Google Cloud che contiene la password da utilizzare per accedere al file Java KeyStore (JKS) per l'autenticazione TLS di Kafka. Ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeyPasswordSecretId: l'ID secret di Secret Manager di Google Cloud che contiene la password da utilizzare per accedere alla chiave privata all'interno del file dell'archivio chiavi Java (JKS) per l'autenticazione TLS di Kafka. Ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaWriteUsernameSecretId: l'ID segreto di Secret Manager di Google Cloud che contiene il nome utente Kafka per l'autenticazione SASL_PLAIN con il cluster Kafka di destinazione. Ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Il valore predefinito è vuoto.
  • kafkaWritePasswordSecretId: l'ID segreto di Secret Manager di Google Cloud che contiene la password Kafka da utilizzare per l'autenticazione SASL_PLAIN con il cluster Kafka di destinazione. Ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Il valore predefinito è vuoto.
  • kafkaWriteKeystoreLocation: il percorso di Google Cloud Storage del file dell'archivio chiavi Java (JKS) contenente il certificato TLS e la chiave privata per l'autenticazione con il cluster Kafka di destinazione. Ad esempio, gs://<BUCKET>/<KEYSTORE>.jks.
  • kafkaWriteTruststoreLocation: il percorso di Google Cloud Storage del file JKS (Java TrustStore) contenente i certificati attendibili da utilizzare per verificare l'identità del broker Kafka di destinazione.
  • kafkaWriteTruststorePasswordSecretId: l'ID segreto di Secret Manager di Google Cloud che contiene la password da utilizzare per accedere al file Java TrustStore (JKS) per l'autenticazione TLS con il cluster Kafka di destinazione. Ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaWriteKeystorePasswordSecretId: l'ID del secret di Secret Manager di Google Cloud contenente la password per accedere al file dell'archivio chiavi Java (JKS) da utilizzare per l'autenticazione TLS con il cluster Kafka di destinazione. Ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaWriteKeyPasswordSecretId: l'ID secret di Secret Manager di Google Cloud contenente la password da utilizzare per accedere alla chiave privata all'interno del file dell'archivio chiavi Java (JKS) per l'autenticazione TLS con il cluster Kafka di destinazione. Ad esempio, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.

Esegui il modello

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the Kafka to Cloud Storage template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. (Facoltativo) Per passare dall'elaborazione exactly-once alla modalità di streaming Almeno una volta, seleziona Almeno una volta.
  8. Fai clic su Esegui job.

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Kafka \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • BIGQUERY_TABLE: il nome della tabella Cloud Storage
  • KAFKA_TOPICS: l'elenco di argomenti Apache Kafka. Se vengono forniti più argomenti, devi eseguire l'escapismo delle virgole. Consulta gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE: l'URI Cloud Storage del file .js che definisce la funzione JavaScript definita dall'utente (UDF) che vuoi utilizzare, ad esempio gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: il nome della funzione definita dall'utente (UDF) JavaScript che vuoi utilizzare

    Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni UDF.

  • KAFKA_SERVER_ADDRESSES: l'elenco di indirizzi IP del server broker Apache Kafka. Ogni indirizzo IP deve avere il numero di porta da cui è accessibile il server. Ad esempio: 35.70.252.199:9092. Se vengono forniti più indirizzi, devi eseguire l'escapismo delle virgole. Consulta gcloud topic escaping.

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Kafka",
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • BIGQUERY_TABLE: il nome della tabella Cloud Storage
  • KAFKA_TOPICS: l'elenco di argomenti Apache Kafka. Se vengono forniti più argomenti, devi eseguire l'escapismo delle virgole. Consulta gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE: l'URI Cloud Storage del file .js che definisce la funzione JavaScript definita dall'utente (UDF) che vuoi utilizzare, ad esempio gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: il nome della funzione definita dall'utente (UDF) JavaScript che vuoi utilizzare

    Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni UDF.

  • KAFKA_SERVER_ADDRESSES: l'elenco di indirizzi IP del server broker Apache Kafka. Ogni indirizzo IP deve avere il numero di porta da cui è accessibile il server. Ad esempio: 35.70.252.199:9092. Se vengono forniti più indirizzi, devi eseguire l'escapismo delle virgole. Consulta gcloud topic escaping.

Per ulteriori informazioni, consulta Scrivere dati da Kafka in Cloud Storage con Dataflow.

Java
/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.kafka.options.KafkaReadOptions;
import com.google.cloud.teleport.v2.kafka.options.KafkaWriteOptions;
import com.google.cloud.teleport.v2.kafka.transforms.KafkaTransform;
import com.google.cloud.teleport.v2.kafka.utils.FileAwareProducerFactoryFn;
import com.google.cloud.teleport.v2.kafka.utils.KafkaConfig;
import com.google.cloud.teleport.v2.kafka.utils.KafkaTopicUtils;
import com.google.cloud.teleport.v2.kafka.values.KafkaAuthenticationMethod;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.ByteArraySerializer;

@Template(
    name = "Kafka_to_Kafka",
    category = TemplateCategory.STREAMING,
    displayName = "Kafka to Kafka",
    description = "A pipeline that writes data to a kafka destination from another kafka source",
    optionsClass = KafkaToKafka.KafkaToKafkaOptions.class,
    flexContainerName = "kafka-to-kafka",
    contactInformation = "https://cloud.google.com/support")
public class KafkaToKafka {
  public interface KafkaToKafkaOptions
      extends PipelineOptions, KafkaReadOptions, KafkaWriteOptions, DataflowPipelineOptions {}

  public static void main(String[] args) throws IOException {
    UncaughtExceptionLogger.register();
    KafkaToKafkaOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaToKafkaOptions.class);
    run(options);
  }

  public static PipelineResult run(KafkaToKafkaOptions options) {

    if (options.getKafkaReadAuthenticationMode().equals(KafkaAuthenticationMethod.SASL_PLAIN)) {
      checkArgument(
          options.getKafkaReadUsernameSecretId().trim().length() > 0,
          "KafkaReadUsernameSecretId required to access username for source Kafka");
      checkArgument(
          options.getKafkaReadPasswordSecretId().trim().length() > 0,
          "KafkaReadPasswordSecretId required to access password for source kafka");
    } else if (options.getKafkaReadAuthenticationMode().equals(KafkaAuthenticationMethod.TLS)) {
      checkArgument(
          options.getKafkaReadTruststoreLocation().trim().length() > 0,
          "KafkaReadTruststoreLocation for trust store certificate required for ssl authentication");
      checkArgument(
          options.getKafkaReadTruststorePasswordSecretId().trim().length() > 0,
          "KafkaReadTruststorePassword for trust store password required for accessing truststore");
      checkArgument(
          options.getKafkaReadKeystoreLocation().trim().length() > 0,
          "KafkaReadKeystoreLocation for key store location required for ssl authentication");
      checkArgument(
          options.getKafkaReadKeystorePasswordSecretId().trim().length() > 0,
          "KafkaReadKeystorePassword for key store password required to access key store");
      checkArgument(
          options.getKafkaReadKeyPasswordSecretId().trim().length() > 0,
          "KafkaReadKeyPasswordSecretId version for key password required for SSL authentication");
    } else if (options.getKafkaReadAuthenticationMode().equals(KafkaAuthenticationMethod.NONE)
        || (options
            .getKafkaReadAuthenticationMode()
            .equals(KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS))) {
    } else {
      throw new UnsupportedOperationException(
          "Authentication method not supported: " + options.getKafkaReadAuthenticationMode());
    }

    if (options.getKafkaWriteAuthenticationMethod().equals(KafkaAuthenticationMethod.SASL_PLAIN)) {
      checkArgument(
          options.getKafkaWriteUsernameSecretId().trim().length() > 0,
          "KafkaWriteUsernameSecretId required to access username for source Kafka");
      checkArgument(
          options.getKafkaWritePasswordSecretId().trim().length() > 0,
          "KafkaWritePasswordSecretId required to access password for destination Kafka");
    } else if (options.getKafkaWriteAuthenticationMethod().equals(KafkaAuthenticationMethod.TLS)) {
      checkArgument(
          options.getKafkaWriteTruststoreLocation().trim().length() > 0,
          "KafkaWriteTruststoreLocation for trust store certificate required for ssl authentication");
      checkArgument(
          options.getKafkaWriteTruststorePasswordSecretId().trim().length() > 0,
          "KafkaWriteTruststorePasswordSecretId for trust store password required for accessing truststore");
      checkArgument(
          options.getKafkaWriteKeystoreLocation().trim().length() > 0,
          "KafkaWriteKeystoreLocation for key store location required for ssl authentication");
      checkArgument(
          options.getKafkaWriteKeystorePasswordSecretId().trim().length() > 0,
          "KafkaWriteKeystorePasswordSecretId for key store password required to access key store");
      checkArgument(
          options.getKafkaWriteKeyPasswordSecretId().trim().length() > 0,
          "KafkaWriteKeyPasswordSecretId for source key password secret id version required for SSL authentication");
    } else if (options.getKafkaWriteAuthenticationMethod().equals(KafkaAuthenticationMethod.NONE)
        || options
            .getKafkaWriteAuthenticationMethod()
            .equals(KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS)) {
      // No additional validation is required for these auth mechanisms since they don't depend on
      // any specific pipeline options.
    } else {
      throw new UnsupportedOperationException(
          "Authentication method not supported: " + options.getKafkaWriteAuthenticationMethod());
    }

    String sourceTopic;
    String sourceBootstrapServers;
    if (options.getReadBootstrapServerAndTopic() != null) {
      List<String> sourceBootstrapServerAndTopicList =
          KafkaTopicUtils.getBootstrapServerAndTopic(
              options.getReadBootstrapServerAndTopic(), options.getProject());
      sourceTopic = sourceBootstrapServerAndTopicList.get(1);
      sourceBootstrapServers = sourceBootstrapServerAndTopicList.get(0);
    } else {
      throw new IllegalArgumentException(
          "Please provide a valid bootstrap server which matches `[,:a-zA-Z0-9._-]+` and a topic which matches `[,a-zA-Z0-9._-]+`");
    }

    String destinationTopic;
    String destinationBootstrapServers;
    if (options.getWriteBootstrapServerAndTopic() != null) {
      List<String> destinationBootstrapServerAndTopicList =
          KafkaTopicUtils.getBootstrapServerAndTopic(
              options.getWriteBootstrapServerAndTopic(), options.getProject());
      destinationBootstrapServers = destinationBootstrapServerAndTopicList.get(0);
      destinationTopic = destinationBootstrapServerAndTopicList.get(1);
    } else {
      throw new IllegalArgumentException(
          "Please provide a valid bootstrap server which matches `[,:a-zA-Z0-9._-]+` and a topic which matches `[,a-zA-Z0-9._-]+`");
    }

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(
            "Read from Kafka",
            KafkaTransform.readBytesFromKafka(
                    sourceBootstrapServers,
                    Collections.singletonList(sourceTopic),
                    KafkaConfig.fromReadOptions(options),
                    options.getEnableCommitOffsets())
                .withoutMetadata())
        .apply(
            "Write to Kafka",
            KafkaIO.<byte[], byte[]>write()
                .withBootstrapServers(destinationBootstrapServers)
                .withTopic(destinationTopic)
                .withKeySerializer(ByteArraySerializer.class)
                .withValueSerializer(ByteArraySerializer.class)
                .withProducerConfigUpdates(KafkaConfig.fromWriteOptions(options))
                .withProducerFactoryFn(new FileAwareProducerFactoryFn()));

    return pipeline.run();
  }
}

Passaggi successivi