Écrire des données de Kafka vers BigQuery avec Dataflow

Ce document fournit des conseils généraux sur la création et le déploiement d'un pipeline Dataflow qui transite les données depuis Apache Kafka vers BigQuery.

Apache Kafka est une plate-forme Open Source pour les événements de traitement par flux. Kafka est couramment utilisé dans les architectures distribuées pour permettre la communication entre des composants faiblement couplés. Vous pouvez utiliser Dataflow pour lire des événements à partir de Kafka, les traiter et écrire les résultats dans une table BigQuery pour une analyse plus approfondie.

Lire des événements Kafka dans BigQuery

Google fournit un modèle Dataflow qui configure un pipeline Kafka vers BigQuery. Le modèle utilise le connecteur BigQueryIO fourni dans le SDK Apache Beam.

Pour utiliser ce modèle, procédez comme suit :

  1. Déployez Kafka dans Google Cloud ou ailleurs.
  2. Configurez la mise en réseau.
  3. Définissez les autorisations IAM (Identity and Access Management).
  4. Écrivez une fonction permettant de transformer les données d'événement.
  5. Créez la table de sortie BigQuery.
  6. Déployez le modèle Dataflow.

Déployer Kafka

Dans Google Cloud, vous pouvez déployer un cluster Kafka sur des instances de machine virtuelle (VM) Compute Engine ou utiliser un service Kafka géré tiers. Pour en savoir plus sur les options de déploiement sur Google Cloud, consultez la page Qu'est-ce que Apache Kafka ?. Vous pouvez trouver des solutions Kafka tierces sur Google Cloud Marketplace.

Vous pouvez également disposer d'un cluster Kafka existant se trouvant en dehors de Google Cloud. Par exemple, vous pouvez avoir une charge de travail existante déployée sur site ou dans un autre cloud public.

Configurer la mise en réseau

Par défaut, Dataflow lance les instances au sein de votre réseau cloud privé virtuel (VPC) par défaut. Selon votre configuration Kafka, vous devrez peut-être configurer un réseau et un sous-réseau différents pour Dataflow. Pour plus d'informations, consultez la section Spécifier un réseau et un sous-réseau. Lors de la configuration de votre réseau, créez des règles de pare-feu permettant aux machines de nœud de calcul Dataflow d'atteindre les agents Kafka.

Si vous utilisez VPC Service Controls, placez le cluster Kafka dans le périmètre VPC Service Controls, ou étendez les périmètres au VPN autorisé ou à Cloud Interconnect.

Si votre cluster Kafka est déployé en dehors de Google Cloud, vous devez créer une connexion réseau entre Dataflow et le cluster Kafka. Il existe plusieurs options de mise en réseau avec différents compromis :

Dedicated Interconnect est la meilleure option pour obtenir des performances et une fiabilité prévisibles. Toutefois, sa mise en place peut prendre plus de temps que pour les autres options, car des organisations tierces doivent approvisionner les nouveaux circuits. Avec une topologie basée sur une adresse IP publique, vous pouvez commencer rapidement à travailler, car la mise en réseau ne nécessite que peu d'efforts.

Les deux sections suivantes décrivent ces options plus en détail.

Espace d'adressage RFC 1918 partagé

Dedicated Interconnect et le VPN IPsec vous donnent tous deux un accès direct aux adresses IP RFC 1918 de votre cloud privé virtuel (VPC), ce qui peut simplifier la configuration de Kafka. Si vous utilisez une topologie basée sur un VPN, envisagez de configurer un VPN à haut débit.

Par défaut, Dataflow lance les instances sur votre réseau VPC par défaut. Dans la topologie d'un réseau privé avec des routes explicitement définies dans Cloud Router qui connectent des sous-réseaux de Google Cloud à ce cluster Kafka, vous avez besoin de contrôler davantage l'emplacement des instances Dataflow. Vous pouvez utiliser Dataflow pour configurer les paramètres d'exécution de network et subnetwork.

Assurez-vous que le sous-réseau correspondant dispose de suffisamment d'adresses IP disponibles pour permettre à Dataflow de lancer des instances lorsqu'il tente d'effectuer un scaling horizontal. De même, lorsque vous créez un réseau distinct pour le lancement de vos instances Dataflow, assurez-vous qu'une règle de pare-feu active le trafic TCP entre toutes les machines virtuelles du projet. Cette règle de pare-feu est déjà configurée sur le réseau par défaut.

Espace d'adressage IP public

Cette architecture utilise Transport Layer Security (TLS) pour sécuriser le trafic entre les clients externes et Kafka, et utilise du trafic non chiffré pour les communications entre agents. Lorsque l'écouteur Kafka se lie à une interface réseau utilisée à la fois pour la communication interne et externe, la configuration de l'écouteur s'effectue facilement. Toutefois, dans de nombreux scénarios, les adresses présentées en externe des agents Kafka du cluster diffèrent des interfaces réseau internes utilisées par Kafka. Dans de tels scénarios, vous pouvez utiliser la propriété advertised.listeners :

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

Les clients externes se connectent à l'aide du port 9093 via un canal SSL, et les clients internes à l'aide du port 9092 via un canal de texte brut. Lorsque vous spécifiez une adresse sous advertised.listeners, utilisez des noms DNS (kafkabroker-n.mydomain.com, dans cet exemple) qui correspondent à la même instance pour le trafic externe et interne. L'utilisation d'adresses IP publiques peut ne pas fonctionner, car les adresses risquent de ne pas être résolues pour le trafic interne.

Définir les autorisations IAM

Les tâches Dataflow utilisent deux comptes de service IAM :

  • Le service Dataflow utilise un compte de service Dataflow pour manipuler les ressources Google Cloud, comme la création de VM.
  • Les VM de nœud de calcul Dataflow utilisent un compte de service de nœud de calcul pour accéder aux fichiers de votre pipeline et à d'autres ressources. Ce compte de service doit disposer d'un accès en écriture à la table de sortie BigQuery. Il doit également avoir accès à toutes les autres ressources auxquelles la tâche de pipeline fait référence.

Assurez-vous que ces deux comptes de service disposent des rôles appropriés. Pour en savoir plus, consultez la section Sécurité et autorisations pour Cloud Dataflow.

Transformer les données pour BigQuery

Le modèle Kafka vers BigQuery crée un pipeline qui lit les événements d'un ou de plusieurs sujets Kafka et les écrit dans une table BigQuery. Vous pouvez éventuellement fournir une fonction définie par l'utilisateur (UDF) en JavaScript qui transforme les données d'événement avant leur écriture dans BigQuery.

La sortie du pipeline doit consister en des données au format JSON correspondant au schéma de la table de sortie. Si les données d'événement Kafka sont déjà au format JSON, vous pouvez créer une table BigQuery avec un schéma correspondant et transmettre les événements directement à BigQuery. Sinon, créez une fonction définie par l'utilisateur qui prend les données d'événement en entrée et renvoie des données JSON correspondant à votre table BigQuery.

Supposons par exemple que les données d'événement contiennent deux champs :

  • name (chaîne)
  • customer_id (entier)

La sortie du pipeline Dataflow peut ressembler à ceci :

{ "name": "Alice", "customer_id": 1234 }

En supposant que les données d'événement ne sont pas déjà au format JSON, vous devez écrire une fonction définie par l'utilisateur qui transforme les données, comme suit :

// UDF
function process(eventData) {
  var name;
  var customer_id;

  // TODO Parse the event data to extract the name and customer_id fields.

  // Return a JSON payload.
  return JSON.stringify({ name: name, customer_id: customer_id });
}

Cette fonction permet d'effectuer un traitement supplémentaire sur les données d'événement, telles que le filtrage des événements, la suppression des informations personnelles ou l'enrichissement des données à l'aide de champs supplémentaires.

Pour en savoir plus sur l'écriture d'une fonction définie par l'utilisateur pour le modèle, consultez la page Étendre votre modèle Dataflow avec des fonctions définies par l'utilisateur. Importez le fichier JavaScript dans Cloud Storage.

Créer la table de sortie BigQuery

Créez la table de sortie BigQuery avant d'exécuter le modèle. Le schéma de la table doit être compatible avec la sortie JSON du pipeline. Pour chaque propriété de la charge utile JSON, le pipeline écrit la valeur dans la colonne de table BigQuery du même nom. Toutes les propriétés manquantes dans le fichier JSON sont interprétées comme des valeurs NULL.

Dans l'exemple précédent, la table BigQuery contiendra les colonnes suivantes :

Nom de la colonne Type de données
name STRING
customer_id INTEGER

Vous pouvez utiliser l'instruction SQL CREATE TABLE pour créer la table :

CREATE TABLE my_dataset.kafka_events (name STRING, customer_id INTEGER);

Vous pouvez également spécifier le schéma de la table à l'aide d'un fichier de définition JSON. Pour en savoir plus, consultez la page Spécifier un schéma dans la documentation de BigQuery.

Exécuter la tâche Dataflow

Après avoir créé la table BigQuery, exécutez le modèle Dataflow.

Console

Pour créer la tâche Dataflow à l'aide de la console Google Cloud, procédez comme suit :

  1. Accédez à la page Dataflow dans la console Google Cloud.
  2. Cliquez sur Create job from template (Créer une tâche à partir d'un modèle).
  3. Dans le champ Nom du job, saisissez un nom de job.
  4. Pour Point de terminaison régional, sélectionnez une région.
  5. Sélectionnez le modèle "Kafka vers BigQuery".
  6. Sous Paramètres obligatoires, saisissez le nom de la table de sortie BigQuery. La table doit déjà exister et posséder un schéma valide.
  7. Cliquez sur Afficher les paramètres facultatifs et saisissez des valeurs pour au moins les paramètres suivants :

    • Sujet Kafka à partir duquel lire l'entrée.
    • Liste des serveurs d'amorçage Kafka, séparés par une virgule.
    • Adresse e-mail d'un compte de service.

    Saisissez des paramètres supplémentaires si nécessaire. Vous devrez peut-être spécifier les éléments suivants :

    • Mise en réseau : pour utiliser un réseau VPC autre que le réseau par défaut, spécifiez le réseau et le sous-réseau.
    • UDF : pour utiliser une fonction JavaScript définie par l'utilisateur, spécifiez l'emplacement Cloud Storage du script et le nom de la fonction JavaScript à appeler.

gcloud

Pour créer la tâche Dataflow à l'aide de Google Cloud CLI, exécutez la commande suivante :

gcloud dataflow flex-template run JOB_NAME \
--template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
--region LOCATION \
--parameters inputTopics=KAFKA_TOPICS \
--parameters bootstrapServers=BOOTSTRAP_SERVERS \
--parameters outputTableSpec=OUTPUT_TABLE \
--parameters serviceAccount=IAM_SERVICE_ACCOUNT \
--parameters javascriptTextTransformGcsPath=UDF_SCRIPT_PATH \
--parameters javascriptTextTransformFunctionName=UDF_FUNCTION_NAME \
--network VPC_NETWORK_NAME \
--subnetwork SUBNET_NAME

Remplacez les variables suivantes :

  • JOB_NAME : nom de la tâche de votre choix
  • LOCATION : région dans laquelle la tâche doit être exécutée. Pour en savoir plus sur les régions et les emplacements, consultez la page Emplacements Dataflow.
  • KAFKA_TOPICS : liste de sujets Kafka à lire, séparés par une virgule.
  • BOOTSTRAP_SERVERS : liste de serveurs d'amorçage Kafka, séparés par une virgule. Exemple : 127:9092,127.0.0.1:9093.
  • OUTPUT_TABLE : table de sortie BigQuery, spécifiée en tant que PROJECT_ID:DATASET_NAME.TABLE_NAME. Exemple : my_project:dataset1.table1.
  • IAM_SERVICE_ACCOUNT Facultatif. Adresse e-mail du compte de service associée à l'exécution de la tâche.
  • UDF_SCRIPT_PATH Facultatif. Chemin d'accès Cloud Storage à un fichier JavaScript contenant une fonction définie par l'utilisateur. Exemple : gs://your-bucket/your-function.js
  • UDF_FUNCTION_NAME Facultatif. Nom de la fonction JavaScript à appeler en tant que fonction définie par l'utilisateur.
  • VPC_NETWORK_NAME Facultatif. Réseau auquel les nœuds de calcul seront affectés.
  • SUBNET_NAME Facultatif. Sous-réseau auquel les nœuds de calcul seront affectés.

Types de données

Cette section explique comment gérer différents types de données dans le schéma de la table BigQuery.

En interne, les messages JSON sont convertis en objets TableRow et les valeurs des champs TableRow sont traduites en types BigQuery.

Types scalaires

L'exemple suivant permet de créer une table BigQuery avec différents types de données scalaires, y compris des types de chaîne, numériques, booléens, de date/heure, d'intervalle et de géographie :

CREATE TABLE  my_dataset.kafka_events (
    string_col STRING,
    integer_col INT64,
    float_col FLOAT64,
    decimal_col DECIMAL,
    bool_col BOOL,
    date_col DATE,
    dt_col DATETIME,
    ts_col TIMESTAMP,
    interval_col INTERVAL,
    geo_col GEOGRAPHY
);

Voici une charge utile JSON avec des champs compatibles :

{
  "string_col": "string_val",
  "integer_col": 10,
  "float_col": 3.142,
  "decimal_col": 5.2E11,
  "bool_col": true,
  "date_col": "2022-07-01",
  "dt_col": "2022-07-01 12:00:00.00",
  "ts_col": "2022-07-01T12:00:00.00Z",
  "interval_col": "0-13 370 48:61:61",
  "geo_col": "POINT(1 2)"
}

Remarques :

  • Pour une colonne TIMESTAMP, vous pouvez utiliser la méthode JavaScript Date.toJSON pour mettre en forme la valeur.
  • Pour la colonne GEOGRAPHY, vous pouvez spécifier la zone géographique à l'aide de texte connu (WKT) ou de GeoJSON, mis en forme en tant que chaîne. Pour en savoir plus, consultez la section Charger des données géospatiales.

Pour en savoir plus concernant les types de données dans BigQuery, consultez la page Types de données.

Tableaux

Vous pouvez stocker un tableau dans BigQuery à l'aide du type de données ARRAY. Dans l'exemple suivant, la charge utile JSON contient une propriété nommée scores dont la valeur est un tableau JSON :

{"name":"Emily","scores":[10,7,10,9]}

L'instruction SQL CREATE TABLE suivante crée une table BigQuery avec un schéma compatible :

CREATE TABLE my_dataset.kafka_events (name STRING, scores ARRAY<INTEGER>);

La table obtenue ressemble à ceci :

+-------+-------------+
| name  |   scores    |
+-------+-------------+
| Emily | [10,7,10,9] |
+-------+-------------+

Structures

Le type de données STRUCT dans BigQuery contient une liste numérotée de champs nommés. Vous pouvez utiliser un STRUCT pour conserver des objets JSON qui suivent un schéma cohérent.

Dans l'exemple suivant, la charge utile JSON contient une propriété nommée val dont la valeur est un objet JSON :

{"name":"Emily","val":{"a":"yes","b":"no"}}

L'instruction SQL CREATE TABLE suivante crée une table BigQuery avec un schéma compatible :

CREATE TABLE my_dataset.kafka_events (name STRING, val STRUCT<a STRING, b STRING>);

La table obtenue ressemble à ceci :

+-------+----------------------+
| name  |         val          |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+

Données d'événement semi-structurées

Si les données d'événement Kafka ne suivent pas un schéma strict, envisagez de les stocker dans BigQuery en tant que type de données JSON (Preview). En stockant des données JSON en tant que type de données JSON, vous n'avez pas besoin de définir le schéma d'événement à l'avance. Après l'ingestion des données, vous pouvez interroger la table de sortie en utilisant l'accès aux champs (notation par points) et les opérateurs d'accès aux tableaux.

Commencez par créer une table avec une colonne JSON :

-- Create the BigQuery table
CREATE TABLE my_dataset.kafka_events (event_data JSON);

Définissez ensuite une fonction JavaScript définie par l'utilisateur qui encapsule la charge utile de l'événement dans un objet JSON :

// UDF
function process(eventData) {
  var json;

  // TODO Convert the event data to JSON.

  return JSON.stringify({ "event_data": json });
}

Une fois les données écrites dans BigQuery, vous pouvez interroger les champs individuels à l'aide de l'opérateur d'accès aux champs. Par exemple, la requête suivante renvoie la valeur du champ name pour chaque enregistrement :

SELECT event_data.name FROM my_dataset1.kafka_events;

Pour en savoir plus sur l'utilisation de JSON dans BigQuery, consultez la page Utiliser des données JSON en langage SQL standard Google.

Erreurs et journalisation

Vous pouvez rencontrer des erreurs relatives à l'exécution du pipeline, ou des erreurs lors du traitement des événements Kafka individuels.

Pour en savoir plus sur la gestion des erreurs de pipeline, consultez la page Dépannage et débogage des pipelines.

Si la tâche s'exécute correctement, mais qu'une erreur se produit lors du traitement d'un événement Kafka individuel, la tâche de pipeline écrit un enregistrement d'erreur dans une table dans BigQuery. La tâche elle-même n'échoue pas et l'erreur au niveau de l'événement n'apparaît pas comme une erreur dans le journal de tâche Dataflow.

La tâche de pipeline crée automatiquement la table de façon à conserver les enregistrements d'erreurs. Par défaut, le nom de la table est "output_table_error_records", où output_table correspond au nom de la table de sortie. Par exemple, si la table de sortie est nommée kafka_events, la table d'erreur est nommée kafka_events_error_records. Vous pouvez spécifier un autre nom en définissant le paramètre de modèle outputDeadletterTable :

outputDeadletterTable=my_project:dataset1.errors_table

Voici les erreurs possibles :

  • Erreurs de sérialisation, y compris un format JSON mal formaté.
  • Erreurs de conversion de type, causées par une incohérence dans le schéma de la table et les données JSON.
  • Champs supplémentaires dans les données JSON qui ne sont pas présents dans le schéma de la table.

Exemples de messages d'erreur :

Type d'erreur Données d'événement errorMessage
Erreur de sérialisation "Hello world" Échec de la sérialisation de JSON vers la ligne de table : "Hello world"
Erreur de conversion du type {"name":"Emily","customer_id":"abc"} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 }
Champ inconnu {"name":"Zoe","age":34} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 }

Étapes suivantes