Plantilla de Apache Kafka a la planilla de Kafka

La plantilla de Apache Kafka a Apache Kafka crea una canalización de transmisión que transmite datos como bytes desde una fuente de Apache Kafka y, luego, escribe los bytes en un receptor de Apache Kafka.

Requisitos de la canalización

  • El tema de origen de Apache Kafka debe existir.
  • Los servidores del agente de origen y receptor de Apache Kafka deben estar en ejecución y se debe poder acceder a ellos desde las máquinas de trabajador de Dataflow.
  • Si usas Google Cloud Managed Service para Apache Kafka como fuente o receptor, el tema debe existir antes de iniciar la plantilla.

Formato del mensaje de Kafka

Los mensajes fuente de Apache Kafka se leen como bytes, y los bytes se escriben en el receptor de Apache Kafka.

Autenticación

La plantilla de Apache Kafka a Apache Kafka admite la autenticación SASL/PLAIN y TLS para los agentes de Kafka.

Parámetros de la plantilla

Parámetros obligatorios

  • readBootstrapServerAndTopic: El servidor de arranque y el tema de Kafka desde los que se lee la entrada. Por ejemplo, localhost:9092;topic1,topic2
  • kafkaReadAuthenticationMode: Es el modo de autenticación que se usará con el clúster de Kafka. Usa KafkaAuthenticationMethod.NONE para no autenticación, KafkaAuthenticationMethod.SASL_PLAIN para nombre de usuario y contraseña de SASL/PLAIN y KafkaAuthenticationMethod.TLS para autenticación basada en certificados. KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS solo debe usarse para el clúster de Apache Kafka para BigQuery de Google Cloud, ya que permite autenticarse con credenciales predeterminadas de la aplicación.
  • writeBootstrapServerAndTopic: El tema de Kafka en el que se escribirá el resultado.
  • kafkaWriteAuthenticationMethod: Es el modo de autenticación que se usará con el clúster de Kafka. Usa NONE para la no autenticación, SASL_PLAIN para nombre de usuario y contraseña de SASL/PLAIN y TLS para autenticación basada en certificados. El valor predeterminado es APPLICATION_DEFAULT_CREDENTIALS.

Parámetros opcionales

  • enableCommitOffsets: Confirma los desplazamientos de los mensajes procesados en Kafka. Si se habilita, esto minimizará las brechas o el procesamiento duplicado de los mensajes cuando se reinicie la canalización. Requiere especificar el ID del grupo de consumidores. La configuración predeterminada es "false".
  • consumerGroupId: Es el identificador único del grupo de consumidores al que pertenece esta canalización. Obligatorio si la confirmación de desplazamientos a Kafka está habilitada. La configuración predeterminada es vacía.
  • kafkaReadOffset: El punto de partida para leer mensajes cuando no existen compensaciones confirmadas. El primero comienza desde el principio y el más reciente desde el mensaje más reciente. La configuración predeterminada es: la más reciente.
  • kafkaReadUsernameSecretId: El ID del secreto de Google Cloud Secret Manager que contiene el nombre de usuario de Kafka que se usará con la autenticación SASL_PLAIN. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. La configuración predeterminada es vacía.
  • kafkaReadPasswordSecretId: El ID del secreto de Google Cloud Secret Manager que contiene la contraseña de Kafka que se usará con la autenticación SASL_PLAIN. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. La configuración predeterminada es vacía.
  • kafkaReadKeystoreLocation: Es la ruta de acceso de Google Cloud Storage al archivo Java KeyStore (JKS) que contiene el certificado TLS y la clave privada que se usará para la autenticación con el clúster de Kafka. Por ejemplo, gs://your-bucket/keystore.jks
  • kafkaReadTruststoreLocation: Es la ruta de acceso de Google Cloud Storage al archivo Java TrustStore (JKS) que contiene los certificados de confianza que se usarán para verificar la identidad del agente de Kafka.
  • kafkaReadTruststorePasswordSecretId: El ID del secreto de Google Cloud Secret Manager que contiene la contraseña que se usará para acceder al archivo Java TrustStore (JKS) para la autenticación TLS de Kafka. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
  • kafkaReadKeystorePasswordSecretId: El ID del secreto de Google Cloud Secret Manager que contiene la contraseña que se usará para acceder al archivo Java KeyStore (JKS) para la autenticación TLS de Kafka. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadKeyPasswordSecretId: El ID del secreto de Google Cloud Secret Manager que contiene la contraseña que se usará para acceder a la clave privada dentro del archivo Java KeyStore (JKS) para la autenticación TLS de Kafka. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaWriteUsernameSecretId: El ID del secreto de Google Cloud Secret Manager que contiene el nombre de usuario de Kafka para la autenticación SASL_PLAIN con el clúster de Kafka de destino. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION> La configuración predeterminada es vacía.
  • kafkaWritePasswordSecretId: El ID del secreto de Google Cloud Secret Manager que contiene la contraseña de Kafka que se usará para la autenticación SASL_PLAIN con el clúster de Kafka de destino. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION> La configuración predeterminada es vacía.
  • kafkaWriteKeystoreLocation: Es la ruta de acceso de Google Cloud Storage al archivo Java KeyStore (JKS) que contiene el certificado TLS y la clave privada para la autenticación con el clúster de Kafka de destino. Por ejemplo, gs://<BUCKET>/<KEYSTORE>.jks
  • kafkaWriteTruststoreLocation: Es la ruta de acceso de Google Cloud Storage al archivo Java TrustStore (JKS) que contiene los certificados de confianza que se usarán para verificar la identidad del agente de Kafka de destino.
  • kafkaWriteTruststorePasswordSecretId: El ID del secreto de Google Cloud Secret Manager que contiene la contraseña que se usará para acceder al archivo Java TrustStore (JKS) para la autenticación de TLS con el clúster de Kafka de destino. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaWriteKeystorePasswordSecretId: El ID de secreto de Google Cloud Secret Manager que contiene la contraseña para acceder al archivo Java KeyStore (JKS) que se usará en la autenticación TLS con el clúster de Kafka de destino. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaWriteKeyPasswordSecretId: El ID del secreto de Google Cloud Secret Manager que contiene la contraseña que se usará para acceder a la clave privada dentro del archivo Java KeyStore (JKS) para la autenticación TLS con el clúster de Kafka de destino. Por ejemplo, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>

Ejecuta la plantilla

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Kafka to Cloud Storage template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Opcional: Para cambiar del procesamiento “exactamente una vez” al modo de transmisión al menos una vez, selecciona Al menos una vez.
  8. Haz clic en Ejecutar trabajo.

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Kafka \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • BIGQUERY_TABLE: Es el nombre de la tabla de Cloud Storage
  • KAFKA_TOPICS: Es la lista de temas de Apache Kkafa. Si se proporcionan varios temas, debes escapar las comas. Consulta gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE: El URI de Cloud Storage de .js archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.

    Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

  • KAFKA_SERVER_ADDRESSES: La lista de direcciones IP del servidor del agente de Apache Kafka. Cada dirección IP debe tener el número de puerto desde el que se puede acceder al servidor. Por ejemplo: 35.70.252.199:9092. Si se proporcionan varias direcciones, debes escapar las comas. Consulta gcloud topic escaping.

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Kafka",
   }
}
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • BIGQUERY_TABLE: Es el nombre de la tabla de Cloud Storage
  • KAFKA_TOPICS: Es la lista de temas de Apache Kkafa. Si se proporcionan varios temas, debes escapar las comas. Consulta gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE: El URI de Cloud Storage de .js archivo que define la función definida por el usuario (UDF) de JavaScript que deseas usar, por ejemplo:gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION es el nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar.

    Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.

  • KAFKA_SERVER_ADDRESSES: La lista de direcciones IP del servidor del agente de Apache Kafka. Cada dirección IP debe tener el número de puerto desde el que se puede acceder al servidor. Por ejemplo: 35.70.252.199:9092. Si se proporcionan varias direcciones, debes escapar las comas. Consulta gcloud topic escaping.

Para obtener más información, consulta Escribe datos de Kafka en Cloud Storage con Dataflow.

Java
/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.kafka.options.KafkaReadOptions;
import com.google.cloud.teleport.v2.kafka.options.KafkaWriteOptions;
import com.google.cloud.teleport.v2.kafka.transforms.KafkaTransform;
import com.google.cloud.teleport.v2.kafka.utils.FileAwareProducerFactoryFn;
import com.google.cloud.teleport.v2.kafka.utils.KafkaConfig;
import com.google.cloud.teleport.v2.kafka.utils.KafkaTopicUtils;
import com.google.cloud.teleport.v2.kafka.values.KafkaAuthenticationMethod;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.ByteArraySerializer;

@Template(
    name = "Kafka_to_Kafka",
    category = TemplateCategory.STREAMING,
    displayName = "Kafka to Kafka",
    description = "A pipeline that writes data to a kafka destination from another kafka source",
    optionsClass = KafkaToKafka.KafkaToKafkaOptions.class,
    flexContainerName = "kafka-to-kafka",
    contactInformation = "https://cloud.google.com/support")
public class KafkaToKafka {
  public interface KafkaToKafkaOptions
      extends PipelineOptions, KafkaReadOptions, KafkaWriteOptions, DataflowPipelineOptions {}

  public static void main(String[] args) throws IOException {
    UncaughtExceptionLogger.register();
    KafkaToKafkaOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaToKafkaOptions.class);
    run(options);
  }

  public static PipelineResult run(KafkaToKafkaOptions options) {

    if (options.getKafkaReadAuthenticationMode().equals(KafkaAuthenticationMethod.SASL_PLAIN)) {
      checkArgument(
          options.getKafkaReadUsernameSecretId().trim().length() > 0,
          "KafkaReadUsernameSecretId required to access username for source Kafka");
      checkArgument(
          options.getKafkaReadPasswordSecretId().trim().length() > 0,
          "KafkaReadPasswordSecretId required to access password for source kafka");
    } else if (options.getKafkaReadAuthenticationMode().equals(KafkaAuthenticationMethod.TLS)) {
      checkArgument(
          options.getKafkaReadTruststoreLocation().trim().length() > 0,
          "KafkaReadTruststoreLocation for trust store certificate required for ssl authentication");
      checkArgument(
          options.getKafkaReadTruststorePasswordSecretId().trim().length() > 0,
          "KafkaReadTruststorePassword for trust store password required for accessing truststore");
      checkArgument(
          options.getKafkaReadKeystoreLocation().trim().length() > 0,
          "KafkaReadKeystoreLocation for key store location required for ssl authentication");
      checkArgument(
          options.getKafkaReadKeystorePasswordSecretId().trim().length() > 0,
          "KafkaReadKeystorePassword for key store password required to access key store");
      checkArgument(
          options.getKafkaReadKeyPasswordSecretId().trim().length() > 0,
          "KafkaReadKeyPasswordSecretId version for key password required for SSL authentication");
    } else if (options.getKafkaReadAuthenticationMode().equals(KafkaAuthenticationMethod.NONE)
        || (options
            .getKafkaReadAuthenticationMode()
            .equals(KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS))) {
    } else {
      throw new UnsupportedOperationException(
          "Authentication method not supported: " + options.getKafkaReadAuthenticationMode());
    }

    if (options.getKafkaWriteAuthenticationMethod().equals(KafkaAuthenticationMethod.SASL_PLAIN)) {
      checkArgument(
          options.getKafkaWriteUsernameSecretId().trim().length() > 0,
          "KafkaWriteUsernameSecretId required to access username for source Kafka");
      checkArgument(
          options.getKafkaWritePasswordSecretId().trim().length() > 0,
          "KafkaWritePasswordSecretId required to access password for destination Kafka");
    } else if (options.getKafkaWriteAuthenticationMethod().equals(KafkaAuthenticationMethod.TLS)) {
      checkArgument(
          options.getKafkaWriteTruststoreLocation().trim().length() > 0,
          "KafkaWriteTruststoreLocation for trust store certificate required for ssl authentication");
      checkArgument(
          options.getKafkaWriteTruststorePasswordSecretId().trim().length() > 0,
          "KafkaWriteTruststorePasswordSecretId for trust store password required for accessing truststore");
      checkArgument(
          options.getKafkaWriteKeystoreLocation().trim().length() > 0,
          "KafkaWriteKeystoreLocation for key store location required for ssl authentication");
      checkArgument(
          options.getKafkaWriteKeystorePasswordSecretId().trim().length() > 0,
          "KafkaWriteKeystorePasswordSecretId for key store password required to access key store");
      checkArgument(
          options.getKafkaWriteKeyPasswordSecretId().trim().length() > 0,
          "KafkaWriteKeyPasswordSecretId for source key password secret id version required for SSL authentication");
    } else if (options.getKafkaWriteAuthenticationMethod().equals(KafkaAuthenticationMethod.NONE)
        || options
            .getKafkaWriteAuthenticationMethod()
            .equals(KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS)) {
      // No additional validation is required for these auth mechanisms since they don't depend on
      // any specific pipeline options.
    } else {
      throw new UnsupportedOperationException(
          "Authentication method not supported: " + options.getKafkaWriteAuthenticationMethod());
    }

    String sourceTopic;
    String sourceBootstrapServers;
    if (options.getReadBootstrapServerAndTopic() != null) {
      List<String> sourceBootstrapServerAndTopicList =
          KafkaTopicUtils.getBootstrapServerAndTopic(
              options.getReadBootstrapServerAndTopic(), options.getProject());
      sourceTopic = sourceBootstrapServerAndTopicList.get(1);
      sourceBootstrapServers = sourceBootstrapServerAndTopicList.get(0);
    } else {
      throw new IllegalArgumentException(
          "Please provide a valid bootstrap server which matches `[,:a-zA-Z0-9._-]+` and a topic which matches `[,a-zA-Z0-9._-]+`");
    }

    String destinationTopic;
    String destinationBootstrapServers;
    if (options.getWriteBootstrapServerAndTopic() != null) {
      List<String> destinationBootstrapServerAndTopicList =
          KafkaTopicUtils.getBootstrapServerAndTopic(
              options.getWriteBootstrapServerAndTopic(), options.getProject());
      destinationBootstrapServers = destinationBootstrapServerAndTopicList.get(0);
      destinationTopic = destinationBootstrapServerAndTopicList.get(1);
    } else {
      throw new IllegalArgumentException(
          "Please provide a valid bootstrap server which matches `[,:a-zA-Z0-9._-]+` and a topic which matches `[,a-zA-Z0-9._-]+`");
    }

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(
            "Read from Kafka",
            KafkaTransform.readBytesFromKafka(
                    sourceBootstrapServers,
                    Collections.singletonList(sourceTopic),
                    KafkaConfig.fromReadOptions(options),
                    options.getEnableCommitOffsets())
                .withoutMetadata())
        .apply(
            "Write to Kafka",
            KafkaIO.<byte[], byte[]>write()
                .withBootstrapServers(destinationBootstrapServers)
                .withTopic(destinationTopic)
                .withKeySerializer(ByteArraySerializer.class)
                .withValueSerializer(ByteArraySerializer.class)
                .withProducerConfigUpdates(KafkaConfig.fromWriteOptions(options))
                .withProducerFactoryFn(new FileAwareProducerFactoryFn()));

    return pipeline.run();
  }
}

¿Qué sigue?