Modèle Apache Kafka vers Cloud Storage

Le modèle Apache Kafka vers Cloud Storage est un pipeline de flux de données qui ingère les données textuelles de Google Cloud Managed Service pour Apache Kafka et génère les enregistrements dans Cloud Storage.

Vous pouvez également utiliser le modèle Apache Kafka vers BigQuery avec Kafka autogéré ou externe.

Conditions requises pour ce pipeline

  • Le bucket Cloud Storage de sortie doit exister.
  • Le serveur de courtiers Apache Kafka doit être en cours d'exécution et joignable depuis les machines de nœud de calcul Dataflow.
  • Les sujets Apache Kafka doivent exister.

Format de message Kafka

Le modèle Apache Kafka vers Cloud Storage permet de lire des messages de Kafka aux formats suivants : CONFLUENT_AVRO_WIRE_FORMAT et JSON.

Format du fichier de sortie

Le format du fichier de sortie est identique à celui du message Kafka d'entrée. Par exemple, si vous sélectionnez JSON pour le format de message Kafka, les fichiers JSON sont écrits dans le bucket Cloud Storage de sortie.

Authentification

Le modèle Apache Kafka vers Cloud Storage est compatible avec l'authentification SASL/PLAIN pour les courtiers Kafka.

Paramètres de modèle

Paramètres obligatoires

  • readBootstrapServerAndTopic: sujet Kafka à partir duquel lire l'entrée.
  • outputDirectory: chemin d'accès et préfixe du nom de fichier pour l'écriture des fichiers de sortie. Doit se terminer par une barre oblique. Exemple :gs://your-bucket/your-path/
  • kafkaReadAuthenticationMode: mode d'authentification à utiliser avec le cluster Kafka. Utilisez KafkaAuthenticationMethod.NONE pour désactiver l'authentification, KafkaAuthenticationMethod.SASL_PLAIN pour le nom d'utilisateur et le mot de passe SASL/PLAIN, et KafkaAuthenticationMethod.TLS pour l'authentification basée sur un certificat. KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS ne doit être utilisé que pour le cluster Google Cloud Apache Kafka pour BigQuery. Il permet de s'authentifier à l'aide des identifiants par défaut de l'application.
  • messageFormat: format des messages Kafka à lire. Les valeurs acceptées sont AVRO_CONFLUENT_WIRE_FORMAT (Avro encodé par Confluent Schema Registry), AVRO_BINARY_ENCODING (Avro binaire simple) et JSON. Valeur par défaut : AVRO_CONFLUENT_WIRE_FORMAT.
  • useBigQueryDLQ: si la valeur est "true", les messages ayant échoué sont écrits dans BigQuery avec des informations d'erreur supplémentaires. La valeur par défaut est "false".

Paramètres facultatifs

  • windowDuration: durée/taille de la fenêtre dans laquelle les données seront écrites dans Cloud Storage. Les formats autorisés sont les suivants : Ns (pour les secondes, exemple : 5s), Nm (pour les minutes, exemple : 12m), Nh (pour les heures, exemple : 2h). Exemple :5m La valeur par défaut est "5m".
  • outputFilenamePrefix: préfixe à placer sur chaque fichier ciblé sur une fenêtre. Exemple :output- La valeur par défaut est : "output".
  • numShards: nombre maximal de partitions de sortie générées lors de l'écriture. Un nombre plus élevé de segments entraîne un débit plus élevé pour l'écriture dans Cloud Storage, mais potentiellement un coût d'agrégation de données plus élevé entre les partitions lors du traitement des fichiers Cloud Storage de sortie. La valeur par défaut est déterminée par Dataflow.
  • enableCommitOffsets: commit des décalages des messages traités vers Kafka. Si cette option est activée, les écarts ou le traitement en double des messages seront minimisés lors du redémarrage du pipeline. L'ID du groupe de consommateurs doit être spécifié. La valeur par défaut est "false".
  • consumerGroupId: identifiant unique du groupe de consommateurs auquel ce pipeline appartient. Obligatoire si l'option "Commit Offsets to Kafka" (Enregistrer les décalages dans Kafka) est activée. La valeur par défaut est vide.
  • kafkaReadOffset: point de départ de la lecture des messages lorsqu'il n'existe aucun décalage validé. Le premier commence au début, le dernier à partir du message le plus récent. Valeur par défaut : le plus récent.
  • kafkaReadUsernameSecretId: ID du secret Google Cloud Secret Manager contenant le nom d'utilisateur Kafka à utiliser avec l'authentification SASL_PLAIN. Par exemple, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. La valeur par défaut est vide.
  • kafkaReadPasswordSecretId: ID de secret Google Cloud Secret Manager contenant le mot de passe Kafka à utiliser avec l'authentification SASL_PLAIN. Par exemple, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. La valeur par défaut est vide.
  • kafkaReadKeystoreLocation: chemin d'accès Google Cloud Storage au fichier Java KeyStore (JKS) contenant le certificat TLS et la clé privée à utiliser lors de l'authentification avec le cluster Kafka. Exemple :gs://your-bucket/keystore.jks
  • kafkaReadTruststoreLocation: chemin d'accès Google Cloud Storage au fichier Java TrustStore (JKS) contenant les certificats approuvés à utiliser pour vérifier l'identité du courtier Kafka.
  • kafkaReadTruststorePasswordSecretId: ID de secret Google Cloud Secret Manager contenant le mot de passe à utiliser pour accéder au fichier Java TrustStore (JKS) pour l'authentification TLS de Kafka (par exemple, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
  • kafkaReadKeystorePasswordSecretId: ID de secret Google Cloud Secret Manager contenant le mot de passe à utiliser pour accéder au fichier Java KeyStore (JKS) pour l'authentification TLS Kafka. Exemple :projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadKeyPasswordSecretId: ID de secret Google Cloud Secret Manager contenant le mot de passe à utiliser pour accéder à la clé privée dans le fichier Java KeyStore (JKS) pour l'authentification TLS Kafka. Exemple :projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • schemaFormat: format de schéma de Kafka. Peut être fourni sous la forme SINGLE_SCHEMA_FILE ou SCHEMA_REGISTRY. Si SINGLE_SCHEMA_FILE est spécifié, utilisez le schéma mentionné dans le fichier de schéma Avro pour tous les messages. Si SCHEMA_REGISTRY est spécifié, les messages peuvent avoir un seul schéma ou plusieurs. La valeur par défaut est SINGLE_schema_FILE.
  • confluentAvroSchemaPath: chemin d'accès Google Cloud Storage au fichier de schéma Avro unique utilisé pour décoder tous les messages d'un sujet. La valeur par défaut est vide.
  • schemaRegistryConnectionUrl: URL de l'instance Confluent Schema Registry utilisée pour gérer les schémas Avro pour le décodage des messages. La valeur par défaut est vide.
  • binaryAvroSchemaPath: chemin d'accès Google Cloud Storage au fichier de schéma Avro utilisé pour décoder les messages Avro encodés en binaire. La valeur par défaut est vide.
  • schemaRegistryAuthenticationMode: mode d'authentification du schéma Registry. Peut être NONE, TLS ou OAUTH. La valeur par défaut est "NONE".
  • schemaRegistryTruststoreLocation: emplacement du certificat SSL où le truststore pour l'authentification au Schema Registry est stocké. Exemple :/your-bucket/truststore.jks
  • schemaRegistryTruststorePasswordSecretId: SecretId dans Secret Manager où le mot de passe permettant d'accéder au secret dans le truststore est stocké. Exemple :projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
  • schemaRegistryKeystoreLocation: emplacement du keystore contenant le certificat SSL et la clé privée. Exemple :/your-bucket/keystore.jks
  • schemaRegistryKeystorePasswordSecretId: SecretId dans Secret Manager où le mot de passe permet d'accéder au fichier de clés (par exemple, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version).
  • schemaRegistryKeyPasswordSecretId: SecretId du mot de passe requis pour accéder à la clé privée du client stockée dans le keystore (par exemple, projects/your-project-number/secrets/your-secret-name/versions/your-secret-version).
  • schemaRegistryOauthClientId: ID client utilisé pour authentifier le client Schema Registry en mode OAUTH. Obligatoire pour le format de message AVRO_CONFLUENT_WIRE_FORMAT.
  • schemaRegistryOauthClientSecretId: ID du secret Google Cloud Secret Manager contenant le secret client à utiliser pour authentifier le client Schema Registry en mode OAUTH. Obligatoire pour le format de message AVRO_CONFLUENT_WIRE_FORMAT. Exemple :projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • schemaRegistryOauthScope: champ qui spécifie la portée du jeton d'accès utilisée pour authentifier le client Schema Registry en mode OAuth. Ce champ est facultatif, car la requête peut être effectuée sans paramètre de portée. Exemple :openid
  • schemaRegistryOauthTokenEndpointUrl: URL basée sur HTTP(S) du fournisseur d'identité OAuth/OIDC utilisé pour authentifier le client Schema Registry en mode OAUTH. Obligatoire pour le format de message AVRO_CONFLUENT_WIRE_FORMAT.
  • outputDeadletterTable: nom complet de la table BigQuery pour les messages ayant échoué. Les messages n'ayant pas pu atteindre la table de sortie pour différentes raisons (par exemple, schéma non concordant ou format JSON non valide) sont écrits dans cette table. La table sera créée par le modèle. Exemple :your-project-id:your-dataset.your-table-name

Exécuter le modèle

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Kafka to Cloud Storage template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Facultatif : Pour passer du traitement de type "exactement une fois" au mode de traitement en flux continu de type "au moins une fois", sélectionnez Au moins une fois.
  8. Cliquez sur Run Job (Exécuter la tâche).

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Gcs_Flex \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • BIGQUERY_TABLE : nom de votre table Cloud Storage.
  • KAFKA_TOPICS : liste des sujets Apache Kakfa. Si plusieurs sujets sont fournis, vous devez échapper les virgules. Consultez gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE : URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple, gs://my-bucket/my-udfs/my_file.js).
  • JAVASCRIPT_FUNCTION : Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.

    Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.

  • KAFKA_SERVER_ADDRESSES : liste d'adresses IP du serveur de courtiers Apache Kafka. Chaque adresse IP doit comporter le numéro de port à partir duquel le serveur est accessible. Exemple : 35.70.252.199:9092. Si plusieurs adresses sont fournies, vous devez échapper les virgules. Consultez gcloud topic escaping.

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Gcs_Flex",
   }
}
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • BIGQUERY_TABLE : nom de votre table Cloud Storage.
  • KAFKA_TOPICS : liste des sujets Apache Kakfa. Si plusieurs sujets sont fournis, vous devez échapper les virgules. Consultez gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE : URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple, gs://my-bucket/my-udfs/my_file.js).
  • JAVASCRIPT_FUNCTION : Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.

    Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.

  • KAFKA_SERVER_ADDRESSES : liste d'adresses IP du serveur de courtiers Apache Kafka. Chaque adresse IP doit comporter le numéro de port à partir duquel le serveur est accessible. Exemple : 35.70.252.199:9092. Si plusieurs adresses sont fournies, vous devez échapper les virgules. Consultez gcloud topic escaping.

Pour en savoir plus, consultez la page Écrire des données de Kafka vers Cloud Storage avec Dataflow.

Java
/*
 * Copyright (C) 2024 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.kafka.dlq.BigQueryDeadLetterQueue;
import com.google.cloud.teleport.v2.kafka.dlq.BigQueryDeadLetterQueueOptions;
import com.google.cloud.teleport.v2.kafka.options.KafkaReadOptions;
import com.google.cloud.teleport.v2.kafka.options.SchemaRegistryOptions;
import com.google.cloud.teleport.v2.kafka.transforms.KafkaTransform;
import com.google.cloud.teleport.v2.kafka.utils.KafkaConfig;
import com.google.cloud.teleport.v2.kafka.utils.KafkaTopicUtils;
import com.google.cloud.teleport.v2.transforms.WriteTransform;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.values.PCollection;

@Template(
    name = "Kafka_to_Gcs_Flex",
    category = TemplateCategory.STREAMING,
    displayName = "Kafka to Cloud Storage",
    description =
        "A streaming pipeline which ingests data from Kafka and writes to a pre-existing Cloud"
            + " Storage bucket with a variety of file types.",
    optionsClass = KafkaToGcsFlex.KafkaToGcsOptions.class,
    flexContainerName = "kafka-to-gcs-flex",
    contactInformation = "https://cloud.google.com/support",
    requirements = {"The output Google Cloud Storage directory must exist."})
public class KafkaToGcsFlex {

  public interface KafkaToGcsOptions
      extends PipelineOptions,
          DataflowPipelineOptions,
          KafkaReadOptions,
          SchemaRegistryOptions,
          BigQueryDeadLetterQueueOptions {

    // This is a duplicate option that already exist in KafkaReadOptions but keeping it here
    // so the KafkaTopic appears above the authentication enum on the Templates UI.
    @TemplateParameter.KafkaReadTopic(
        order = 1,
        name = "readBootstrapServerAndTopic",
        groupName = "Source",
        description = "Source Kafka Topic",
        helpText = "Kafka Topic to read the input from.")
    String getReadBootstrapServerAndTopic();

    void setReadBootstrapServerAndTopic(String value);

    @TemplateParameter.Duration(
        order = 20,
        optional = true,
        groupName = "Destination",
        description = "Window duration",
        helpText =
            "The window duration/size in which data will be written to Cloud Storage. Allowed formats are: Ns (for "
                + "seconds, example: 5s), Nm (for minutes, example: 12m), Nh (for hours, example: 2h).",
        example = "5m")
    @Default.String("5m")
    String getWindowDuration();

    void setWindowDuration(String windowDuration);

    @TemplateParameter.GcsWriteFolder(
        order = 21,
        groupName = "Destination",
        description = "Output file directory in Cloud Storage",
        helpText = "The path and filename prefix for writing output files. Must end with a slash.",
        example = "gs://your-bucket/your-path/")
    String getOutputDirectory();

    void setOutputDirectory(String outputDirectory);

    @TemplateParameter.Text(
        order = 22,
        optional = true,
        groupName = "Destination",
        description = "Output filename prefix of the files to write",
        helpText = "The prefix to place on each windowed file.",
        example = "output-")
    @Default.String("output")
    String getOutputFilenamePrefix();

    void setOutputFilenamePrefix(String outputFilenamePrefix);

    @TemplateParameter.Integer(
        order = 23,
        optional = true,
        description = "Maximum output shards",
        groupName = "Destination",
        helpText =
            "The maximum number of output shards produced when writing. A higher number of "
                + "shards means higher throughput for writing to Cloud Storage, but potentially higher "
                + "data aggregation cost across shards when processing output Cloud Storage files. "
                + "Default value is decided by Dataflow.")
    @Default.Integer(0)
    Integer getNumShards();

    void setNumShards(Integer numShards);
  }

  public static PipelineResult run(KafkaToGcsOptions options) throws Exception {
    // Create the Pipeline
    Pipeline pipeline = Pipeline.create(options);
    String bootstrapServes;
    List<String> topicsList;
    if (options.getReadBootstrapServerAndTopic() != null) {
      List<String> bootstrapServerAndTopicList =
          KafkaTopicUtils.getBootstrapServerAndTopic(
              options.getReadBootstrapServerAndTopic(), options.getProject());
      topicsList = List.of(bootstrapServerAndTopicList.get(1));
      bootstrapServes = bootstrapServerAndTopicList.get(0);
    } else {
      throw new IllegalArgumentException(
          "Please provide a valid bootstrap server which matches `[,:a-zA-Z0-9._-]+` and a topic which matches `[,a-zA-Z0-9._-]+`");
    }

    options.setStreaming(true);

    Map<String, Object> kafkaConfig = new HashMap<>(KafkaConfig.fromReadOptions(options));

    // Configure dead letter queue params
    ErrorHandler<BadRecord, ?> errorHandler = new ErrorHandler.DefaultErrorHandler<>();
    // Throwing Router throws the error instead of sending it to the DLQ. This will be the case
    // when no DLQ is configured and the pipeline will retry the failed error.
    BadRecordRouter badRecordRouter = BadRecordRouter.THROWING_ROUTER;

    if (options.getUseBigQueryDLQ()) {
      if (options.getOutputDeadletterTable() == null
          || options.getOutputDeadletterTable().isBlank()) {
        throw new IllegalArgumentException(
            "Please provide a Fully Qualified BigQuery table name when BigQuery Dead Letter"
                + "Queue is enabled");
      }
      badRecordRouter = BadRecordRouter.RECORDING_ROUTER;
      errorHandler =
          pipeline.registerBadRecordErrorHandler(
              BigQueryDeadLetterQueue.newBuilder()
                  .setTableName(options.getOutputDeadletterTable())
                  .build());
    }

    PCollection<KafkaRecord<byte[], byte[]>> kafkaRecord;
    // Step 1: Read from Kafka as bytes.
    KafkaIO.Read<byte[], byte[]> kafkaTransform =
        KafkaTransform.readBytesFromKafka(
            bootstrapServes, topicsList, kafkaConfig, options.getEnableCommitOffsets());
    kafkaRecord = pipeline.apply(kafkaTransform);

    kafkaRecord.apply(
        WriteTransform.newBuilder()
            .setOptions(options)
            .setBadRecordErrorHandler(errorHandler)
            .setBadRecordRouter(badRecordRouter)
            .build());
    if (options.getUseBigQueryDLQ()) {
      errorHandler.close();
    }
    return pipeline.run();
  }

  public static void main(String[] args) throws Exception {
    KafkaToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaToGcsOptions.class);

    run(options);
  }
}

Étape suivante