Modèle Apache Kafka vers Kafka

Le modèle Apache Kafka vers Apache Kafka crée un pipeline de streaming qui ingère les données sous forme d'octets d'une source Apache Kafka, puis écrit les octets dans un récepteur Apache Kafka.

Conditions requises pour ce pipeline

  • Le sujet source Apache Kafka doit exister.
  • Les serveurs de courtiers source et de destination Apache Kafka doivent être en cours d'exécution et joignables depuis les machines de nœud de calcul Dataflow.
  • Si vous utilisez Google Cloud Managed Service pour Apache Kafka comme source ou comme récepteur, le sujet doit exister avant de lancer le modèle.

Format de message Kafka

Les messages sources Apache Kafka sont lus en tant qu'octets, et les octets sont écrits dans le récepteur Apache Kafka.

Authentification

Le modèle Apache Kafka vers Apache Kafka est compatible avec l'authentification SASL/PLAIN et TLS auprès des courtiers Kafka.

Paramètres de modèle

Paramètres obligatoires

  • readBootstrapServerAndTopic: serveur d'amorçage Kafka et sujet à partir duquel lire l'entrée. Exemple :localhost:9092;topic1,topic2
  • kafkaReadAuthenticationMode: mode d'authentification à utiliser avec le cluster Kafka. Utilisez NONE pour désactiver l'authentification, SASL_PLAIN pour le nom d'utilisateur et le mot de passe SASL/PLAIN, et TLS pour l'authentification basée sur un certificat. Apache Kafka pour BigQuery n'est compatible qu'avec le mode d'authentification SASL_PLAIN. Valeur par défaut : SASL_PLAIN.
  • writeBootstrapServerAndTopic: sujet Kafka dans lequel écrire la sortie.
  • kafkaWriteAuthenticationMethod: mode d'authentification à utiliser avec le cluster Kafka. Utilisez "NONE" pour désactiver l'authentification, "SASL_PLAIN" pour le nom d'utilisateur et le mot de passe SASL/PLAIN, et "TLS" pour l'authentification basée sur un certificat. La valeur par défaut est APPLICATION_DEFAULT_CREDENTIALS.

Paramètres facultatifs

  • enableCommitOffsets: commit des décalages des messages traités vers Kafka. Si cette option est activée, les écarts ou le traitement en double des messages seront minimisés lors du redémarrage du pipeline. L'ID du groupe de consommateurs doit être spécifié. La valeur par défaut est "false".
  • consumerGroupId: identifiant unique du groupe de consommateurs auquel ce pipeline appartient. Obligatoire si l'option "Commit Offsets to Kafka" (Enregistrer les décalages dans Kafka) est activée. La valeur par défaut est vide.
  • kafkaReadOffset: point de départ de la lecture des messages lorsqu'il n'existe aucun décalage validé. Le premier commence au début, le dernier à partir du message le plus récent. Valeur par défaut : le plus récent.
  • kafkaReadUsernameSecretId: ID du secret Google Cloud Secret Manager contenant le nom d'utilisateur Kafka à utiliser avec l'authentification SASL_PLAIN. Par exemple, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. La valeur par défaut est vide.
  • kafkaReadPasswordSecretId: ID de secret Google Cloud Secret Manager contenant le mot de passe Kafka à utiliser avec l'authentification SASL_PLAIN. Par exemple, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. La valeur par défaut est vide.
  • kafkaReadKeystoreLocation: chemin d'accès Google Cloud Storage au fichier Java KeyStore (JKS) contenant le certificat TLS et la clé privée à utiliser lors de l'authentification avec le cluster Kafka. Exemple :gs://your-bucket/keystore.jks
  • kafkaReadTruststoreLocation: chemin d'accès Google Cloud Storage du fichier Java TrustStore (JKS) contenant les certificats approuvés à utiliser pour vérifier l'identité du courtier Kafka.
  • kafkaReadTruststorePasswordSecretId: ID de secret Google Cloud Secret Manager contenant le mot de passe à utiliser pour accéder au fichier Java TrustStore (JKS) pour l'authentification TLS de Kafka (par exemple, projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>).
  • kafkaReadKeystorePasswordSecretId: ID de secret Google Cloud Secret Manager contenant le mot de passe à utiliser pour accéder au fichier Java KeyStore (JKS) pour l'authentification TLS Kafka. Exemple :projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadKeyPasswordSecretId: ID de secret Google Cloud Secret Manager contenant le mot de passe à utiliser pour accéder à la clé privée dans le fichier Java KeyStore (JKS) pour l'authentification TLS Kafka. Exemple :projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaWriteUsernameSecretId: ID du secret Google Cloud Secret Manager contenant le nom d'utilisateur Kafka pour l'authentification SASL_PLAIN avec le cluster Kafka de destination. Exemple :projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION> La valeur par défaut est vide.
  • kafkaWritePasswordSecretId: ID de secret Google Cloud Secret Manager contenant le mot de passe Kafka à utiliser pour l'authentification SASL_PLAIN avec le cluster Kafka de destination. Exemple :projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION> La valeur par défaut est vide.
  • kafkaWriteKeystoreLocation: chemin d'accès Google Cloud Storage au fichier Java KeyStore (JKS) contenant le certificat TLS et la clé privée pour l'authentification avec le cluster Kafka de destination. Exemple :gs://<BUCKET>/<KEYSTORE>.jks
  • kafkaWriteTruststoreLocation: chemin d'accès Google Cloud Storage au fichier Java TrustStore (JKS) contenant les certificats approuvés à utiliser pour vérifier l'identité de l'agent Kafka de destination.
  • kafkaWriteTruststorePasswordSecretId: ID de secret Google Cloud Secret Manager contenant le mot de passe à utiliser pour accéder au fichier Java TrustStore (JKS) pour l'authentification TLS avec le cluster Kafka de destination. Exemple :projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaWriteKeystorePasswordSecretId: ID de secret Google Cloud Secret Manager contenant le mot de passe permettant d'accéder au fichier Java KeyStore (JKS) à utiliser pour l'authentification TLS avec le cluster Kafka de destination. Exemple :projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaWriteKeyPasswordSecretId: ID de secret Google Cloud Secret Manager contenant le mot de passe à utiliser pour accéder à la clé privée dans le fichier Java KeyStore (JKS) pour l'authentification TLS avec le cluster Kafka de destination. Exemple :projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>

Exécuter le modèle

Console

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Kafka to Cloud Storage template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Facultatif : Pour passer du traitement de type "exactement une fois" au mode de traitement en flux continu de type "au moins une fois", sélectionnez Au moins une fois.
  8. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Kafka \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • BIGQUERY_TABLE : nom de votre table Cloud Storage.
  • KAFKA_TOPICS : liste des sujets Apache Kakfa. Si plusieurs sujets sont fournis, vous devez échapper les virgules. Consultez gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE : URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple, gs://my-bucket/my-udfs/my_file.js).
  • JAVASCRIPT_FUNCTION : Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.

    Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.

  • KAFKA_SERVER_ADDRESSES : liste d'adresses IP du serveur de courtiers Apache Kafka. Chaque adresse IP doit comporter le numéro de port à partir duquel le serveur est accessible. Exemple : 35.70.252.199:9092. Si plusieurs adresses sont fournies, vous devez échapper les virgules. Consultez gcloud topic escaping.

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Kafka",
   }
}
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • BIGQUERY_TABLE : nom de votre table Cloud Storage.
  • KAFKA_TOPICS : liste des sujets Apache Kakfa. Si plusieurs sujets sont fournis, vous devez échapper les virgules. Consultez gcloud topic escaping.
  • PATH_TO_JAVASCRIPT_UDF_FILE : URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple, gs://my-bucket/my-udfs/my_file.js).
  • JAVASCRIPT_FUNCTION : Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.

    Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.

  • KAFKA_SERVER_ADDRESSES : liste d'adresses IP du serveur de courtiers Apache Kafka. Chaque adresse IP doit comporter le numéro de port à partir duquel le serveur est accessible. Exemple : 35.70.252.199:9092. Si plusieurs adresses sont fournies, vous devez échapper les virgules. Consultez gcloud topic escaping.

Pour en savoir plus, consultez la page Écrire des données de Kafka vers Cloud Storage avec Dataflow.

Étape suivante