Modèle Apache Kafka vers BigQuery

Le modèle Apache Kafka vers BigQuery est un pipeline de streaming qui ingère les données textuelles issues d'Apache Kafka, exécute une fonction définie par l'utilisateur et génère les enregistrements obtenus dans BigQuery. Les erreurs survenant lors de la transformation des données, de l'exécution de la fonction définie par l'utilisateur ou de l'insertion dans la table de sortie sont enregistrées dans une table d'erreurs distincte dans BigQuery. Si la table d'erreurs n'existe pas avant l'exécution, elle est créée.

Conditions requises pour ce pipeline

  • La table BigQuery de sortie doit exister.
  • Le serveur de courtiers Apache Kafka doit être en cours d'exécution et joignable depuis les machines de nœud de calcul Dataflow.
  • Les sujets Apache Kafka doivent exister et les messages doivent être encodés dans un format JSON valide.

Paramètres de modèle

Paramètres obligatoires

  • outputTableSpec : emplacement de la table de sortie BigQuery dans lequel écrire la sortie. Par exemple, <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>. Selon la propriété createDisposition spécifiée, la table de sortie peut être créée automatiquement à l'aide du schéma Avro fourni par l'utilisateur.

Paramètres facultatifs

  • bootstrapServers : adresse hôte des serveurs de courtiers Apache Kafka en cours d'exécution dans une liste d'éléments séparés par une virgule. Chaque adresse d'hôte doit être au format 35.70.252.199:9092. (par exemple, localhost:9092,127.0.0.1:9093).
  • inputTopics : Apache Kafka à lire dans une liste d'éléments séparés par une virgule. (exemple : topic1,topic2).
  • outputDeadletterTable : table BigQuery pour les messages ayant échoué. Les messages n'ayant pas pu atteindre la table de sortie pour différentes raisons (par exemple, schéma non concordant ou format JSON non valide) sont écrits dans cette table. Si elle n'existe pas, elle sera créée lors de l'exécution du pipeline. Si ce paramètre n'est pas spécifié, "outputTableSpec_error_records" est utilisé à la place. (Par exemple : your-project-id:your-dataset.your-table-name).
  • messageFormat : format du message. Il peut s'agir de AVRO ou JSON. La valeur par défaut est JSON.
  • avroSchemaPath : chemin d'accès Cloud Storage au fichier de schéma Avro. Par exemple, gs://MyBucket/file.avsc.
  • useStorageWriteApiAtLeastOnce : Ce paramètre ne prend effet que si l'option "Utiliser l'API BigQuery Storage Write" est activée. Si cette option est activée, la sémantique de type "au moins une fois" est utilisée pour l'API Storage Write. Sinon, la sémantique de type "exactement une fois" est utilisée. La valeur par défaut est "false".
  • readBootstrapServers : liste des serveurs d'amorçage Kafka, séparés par une virgule. (par exemple, localhost:9092,127.0.0.1:9093).
  • kafkaReadTopics : sujet(s) Kafka à partir du ou desquels lire l'entrée. (exemple : topic1,topic2).
  • kafkaReadOffset : décalage Kafka à lire. La valeur par défaut est la plus récente.
  • kafkaReadUsernameSecretId : ID du secret Secret Manager pour le nom d'utilisateur SASL_PLAIN. Doit être au format projects/{project}/secrets/{secret}/versions/{secret_version}. (par exemple, projects/votre-id-projet/secrets/votre-secret/versions/votre-version-secret).
  • kafkaReadPasswordSecretId : ID du secret Secret Manager pour le mot de passe SASL_PLAIN. Doit être au format projects/{project}/secrets/{secret}/versions/{version_secret} (Exemple: projects/votre-id-projet/secrets/votre-secret/versions/votre-version-secret).
  • javascriptTextTransformGcsPath : URI Cloud Storage du fichier .js qui définit la fonction JavaScript définie par l'utilisateur (UDF) à utiliser. (Exemple: gs://my-bucket/my-udfs/my_file.js).
  • javascriptTextTransformFunctionName : nom de la fonction JavaScript définie par l'utilisateur à utiliser. Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la section https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples.
  • javascriptTextTransformReloadIntervalMinutes : spécifie la fréquence d'actualisation de l'UDF, en minutes. Si la valeur est supérieure à 0, Dataflow vérifie régulièrement le fichier UDF dans Cloud Storage et actualise l'UDF si le fichier est modifié. Ce paramètre vous permet de mettre à jour l'UDF pendant l'exécution du pipeline, sans avoir à redémarrer le job. Si la valeur est 0, l'actualisation de l'UDF est désactivée. La valeur par défaut est 0.
  • writeDisposition : valeur BigQuery WriteDisposition (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Par exemple, WRITE_APPEND, WRITE_EMPTY ou WRITE_TRUNCATE. La valeur par défaut est WRITE_APPEND.
  • createDisposition : valeur CreateDisposition de BigQuery (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Par exemple, CREATE_IF_NEEDED et CREATE_NEVER. La valeur par défaut est CREATE_IF_NEEDED.
  • useStorageWriteApi : si la valeur est définie sur "true", le pipeline utilise l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). La valeur par défaut est false. Pour en savoir plus, consultez la page "Utiliser l'API Storage Write" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams : spécifie le nombre de flux d'écriture, lorsque vous utilisez l'API Storage Write. Si useStorageWriteApi est défini sur true et useStorageWriteApiAtLeastOnce sur false, vous devez définir ce paramètre. La valeur par défaut est 0.
  • storageWriteApiTriggeringFrequencySec : spécifie la fréquence de déclenchement, en secondes, lorsque vous utilisez l'API Storage Write. Si useStorageWriteApi est défini sur true et useStorageWriteApiAtLeastOnce sur false, vous devez définir ce paramètre.

Fonction définie par l'utilisateur

Vous pouvez éventuellement étendre ce modèle en écrivant une fonction définie par l'utilisateur (UDF). Le modèle appelle l'UDF pour chaque élément d'entrée. Les charges utiles des éléments sont sérialisées sous forme de chaînes JSON. Pour en savoir plus, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.

Spécification de la fonction

La spécification de l'UDF se présente comme suit :

  • Entrée : valeur de l'enregistrement Kafka, sérialisée en tant que chaîne JSON.
  • Résultat : chaîne JSON correspondant au schéma de la table de destination BigQuery.

Exécuter le modèle

Console

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Kafka to BigQuery template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Facultatif : Pour passer du traitement de type "exactement une fois" au mode de traitement par flux de type "au moins une fois", sélectionnez Au moins une fois.
  8. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • BIGQUERY_TABLE : nom de votre table BigQuery.
  • KAFKA_TOPICS : liste des sujets Apache Kakfa. Si plusieurs sujets sont proposés, veuillez suivre les instructions expliquant comment échapper les virgules.
  • PATH_TO_JAVASCRIPT_UDF_FILE : URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple, gs://my-bucket/my-udfs/my_file.js).
  • JAVASCRIPT_FUNCTION : Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.

    Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.

  • KAFKA_SERVER_ADDRESSES : liste d'adresses IP du serveur de courtiers Apache Kafka. Chaque adresse IP doit être associée au numéro de port à partir duquel le serveur est accessible. Exemple : 35.70.252.199:9092. Si plusieurs adresses sont fournies, veuillez suivre les instructions expliquant comment échapper les virgules.

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery",
   }
}
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • BIGQUERY_TABLE : nom de votre table BigQuery.
  • KAFKA_TOPICS : liste des sujets Apache Kakfa. Si plusieurs sujets sont proposés, veuillez suivre les instructions expliquant comment échapper les virgules.
  • PATH_TO_JAVASCRIPT_UDF_FILE : URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple, gs://my-bucket/my-udfs/my_file.js).
  • JAVASCRIPT_FUNCTION : Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.

    Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.

  • KAFKA_SERVER_ADDRESSES : liste d'adresses IP du serveur de courtiers Apache Kafka. Chaque adresse IP doit être associée au numéro de port à partir duquel le serveur est accessible. Exemple : 35.70.252.199:9092. Si plusieurs adresses sont fournies, veuillez suivre les instructions expliquant comment échapper les virgules.

Pour en savoir plus, consultez la page Écrire des données de Kafka vers BigQuery avec Dataflow.

Étapes suivantes