Ce document explique comment intégrer Apache Kafka et Pub/Sub à l'aide du connecteur Kafka du groupe Pub/Sub.
À propos du connecteur Kafka du groupe Pub/Sub
Apache Kafka est une plate-forme Open Source pour les événements de streaming. Il est couramment utilisé dans les architectures distribuées pour permettre la communication entre des composants faiblement couplés. Pub/Sub est un service géré permettant d'envoyer et de recevoir des messages de manière asynchrone. Comme avec Kafka, vous pouvez utiliser Pub/Sub pour communiquer entre les composants de votre architecture cloud.
Le connecteur Kafka du groupe Pub/Sub vous permet d'intégrer ces deux systèmes. Le fichier JAR du connecteur contient les connecteurs suivants:
- Le connecteur de récepteur lit les enregistrements d'un ou plusieurs sujets Kafka et les publie dans Pub/Sub.
- Le connecteur source lit les messages d'un sujet Pub/Sub et les publie dans Kafka.
Voici quelques scénarios dans lesquels vous pouvez utiliser le connecteur Kafka du groupe Pub/Sub:
- Vous migrez une architecture basée sur Kafka vers Google Cloud.
- Vous disposez d'un système d'interface qui stocke les événements dans Kafka en dehors de Google Cloud, mais vous utilisez également Google Cloud pour exécuter certains de vos services de backend, qui doivent recevoir les événements Kafka.
- Vous collectez les journaux à partir d'une solution Kafka sur site et les envoyez à Google Cloud pour l'analyse de données.
- Vous disposez d'un système d'interface qui utilise Google Cloud, mais vous stockez également des données sur site à l'aide de Kafka.
Le connecteur nécessite Kafka Connect, un framework permettant de diffuser des données en flux continu entre Kafka et d'autres systèmes. Pour utiliser le connecteur, vous devez exécuter Kafka Connect en même temps que votre cluster Kafka.
Dans ce document, nous partons du principe que vous connaissez bien Kafka et Pub/Sub. Avant de lire ce document, il est recommandé de suivre l'un des guides de démarrage rapide de Pub/Sub.
Premiers pas avec le connecteur
Cette section décrit les tâches suivantes:- Configurez le connecteur Kafka du groupe Pub/Sub.
- Envoyer des événements de Kafka vers Pub/Sub
- Envoyez des messages de Pub/Sub à Kafka.
Prerequisites
Installer Kafka
Suivez le guide de démarrage rapide d'Apache Kafka pour installer un seul nœud Kafka sur votre ordinateur local. Effectuez les étapes suivantes dans le guide de démarrage rapide:
- Téléchargez la dernière version de Kafka et extrayez-la.
- Démarrez l'environnement Kafka.
- Créez un sujet Kafka.
Authentifier
Le connecteur Kafka du groupe Pub/Sub doit s'authentifier auprès de Pub/Sub afin d'envoyer et de recevoir des messages Pub/Sub. Pour configurer l'authentification, procédez comme suit:
- Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
- Installez Google Cloud CLI.
-
Pour initialiser gcloudCLI, exécutez la commande suivante :
gcloud init
-
Créez ou sélectionnez un projet Google Cloud.
-
Créez un projet Cloud :
gcloud projects create PROJECT_ID
-
Sélectionnez le projet Cloud que vous avez créé :
gcloud config set project PROJECT_ID
-
-
Créez des identifiants d'authentification pour votre compte Google :
gcloud auth application-default login
-
Attribuez des rôles à votre compte Google. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants :
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- en remplaçant
PROJECT_ID
par l'ID de votre projet : - Remplacez
EMAIL_ADDRESS
par votre adresse e-mail. - Remplacez
ROLE
par chaque rôle individuel.
- en remplaçant
- Installez Google Cloud CLI.
-
Pour initialiser gcloudCLI, exécutez la commande suivante :
gcloud init
-
Créez ou sélectionnez un projet Google Cloud.
-
Créez un projet Cloud :
gcloud projects create PROJECT_ID
-
Sélectionnez le projet Cloud que vous avez créé :
gcloud config set project PROJECT_ID
-
-
Créez des identifiants d'authentification pour votre compte Google :
gcloud auth application-default login
-
Attribuez des rôles à votre compte Google. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants :
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- en remplaçant
PROJECT_ID
par l'ID de votre projet : - Remplacez
EMAIL_ADDRESS
par votre adresse e-mail. - Remplacez
ROLE
par chaque rôle individuel.
- en remplaçant
Télécharger le fichier JAR du connecteur
Téléchargez le fichier JAR du connecteur sur votre ordinateur local. Pour en savoir plus, consultez la section Acquérir le connecteur dans le readme GitHub.
Copier les fichiers de configuration du connecteur
Clonez ou téléchargez le dépôt GitHub du connecteur.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
Copiez le contenu du répertoire
config
dans le sous-répertoireconfig
de votre installation Kafka.cp config/* [path to Kafka installation]/config/
Ces fichiers contiennent les paramètres de configuration du connecteur.
Mettre à jour votre configuration Kafka Connect
- Accédez à votre répertoire Kafka.
- Ouvrez le fichier
config/connect-standalone.properties
dans un éditeur de texte. - Si l'élément
plugin.path property
est commenté, annulez la mise en commentaire. Mettez à jour
plugin.path property
pour inclure le chemin d'accès au fichier JAR du connecteur.Exemple :
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
Définissez la propriété
offset.storage.file.filename
sur un nom de fichier local. En mode autonome, Kafka utilise ce fichier pour stocker les données de décalage.Exemple :
offset.storage.file.filename=/tmp/connect.offsets
Transférer des événements de Kafka vers Pub/Sub
Cette section explique comment démarrer le connecteur de récepteur, publier des événements sur Kafka, puis lire les messages transférés à partir de Pub/Sub.
Utilisez Google Cloud CLI pour créer un sujet Pub/Sub avec un abonnement.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Remplacez les éléments suivants :
- PUBSUB_TOPIC: nom d'un sujet Pub/Sub pour recevoir les messages de Kafka.
- PUBSUB_SUBSCRIPTION: nom d'un abonnement Pub/Sub associé au sujet.
Ouvrez le fichier
/config/cps-sink-connector.properties
dans un éditeur de texte. Ajoutez des valeurs pour les propriétés suivantes, qui sont marquées"TODO"
dans les commentaires:topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
Remplacez les éléments suivants :
- KAFKA_TOPICS: liste de sujets Kafka séparés par une virgule.
- PROJECT_ID: projet Google Cloud qui contient votre sujet Pub/Sub.
- PUBSUB_TOPIC: sujet Pub/Sub qui recevra les messages de Kafka.
Depuis le répertoire Kafka, exécutez la commande suivante:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties
Suivez les étapes du guide de démarrage rapide d'Apache Kafka pour écrire des événements dans votre sujet Kafka.
Utilisez gcloud CLI pour lire les événements à partir de Pub/Sub.
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
Transférer des messages de Pub/Sub vers Kafka
Cette section explique comment démarrer le connecteur source, publier des messages dans Pub/Sub et lire les messages transférés depuis Kafka.
Utilisez gcloud CLI pour créer un sujet Pub/Sub avec un abonnement.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Remplacez les éléments suivants :
- PUBSUB_TOPIC: nom d'un sujet Pub/Sub.
- PUBSUB_SUBSCRIPTION: nom d'un abonnement Pub/Sub.
Ouvrez le fichier nommé
/config/cps-source-connector.properties
dans un éditeur de texte. Ajoutez des valeurs pour les propriétés suivantes, qui sont marquées"TODO"
dans les commentaires:kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
Remplacez les éléments suivants :
- KAFKA_TOPIC: sujets Kafka pour recevoir les messages Pub/Sub.
- PROJECT_ID: projet Google Cloud qui contient votre sujet Pub/Sub.
- PUBSUB_TOPIC: sujet Pub/Sub.
Depuis le répertoire Kafka, exécutez la commande suivante:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties
Utilisez gcloud CLI pour publier un message sur Pub/Sub.
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
Lisez le message de Kafka. Suivez les étapes du guide de démarrage rapide d'Apache Kafka pour lire les messages du sujet Kafka.
Conversion par SMS
Un enregistrement Kafka contient une clé et une valeur, qui sont des tableaux d'octets de longueur variable. Un enregistrement Kafka peut également contenir des en-têtes, qui sont des paires clé/valeur. Un message Pub/Sub est composé de deux parties principales: le corps du message et zéro ou plusieurs attributs clé-valeur.
Kafka Connect utilise des convertisseurs pour sérialiser les clés et les valeurs depuis et vers Kafka. Pour contrôler la sérialisation, définissez les propriétés suivantes dans les fichiers de configuration du connecteur:
key.converter
: convertisseur utilisé pour sérialiser les clés d'enregistrement.value.converter
: convertisseur utilisé pour sérialiser les valeurs d'enregistrement.
Le corps d'un message Pub/Sub est un objet ByteString
. La conversion la plus efficace consiste donc à copier directement la charge utile. Pour cette raison, nous vous recommandons d'utiliser un convertisseur qui produit des types de données primitifs (entier, flottant, chaîne ou schéma d'octets) dans la mesure du possible, afin d'empêcher la désérialisation et la sérialisation du même corps de message.
Conversion de Kafka vers Pub/Sub
Le connecteur de récepteur convertit les enregistrements Kafka en messages Pub/Sub comme suit:
- La clé d'enregistrement Kafka est stockée sous la forme d'un attribut nommé
"key"
dans le message Pub/Sub. - Par défaut, le connecteur supprime les en-têtes de l'enregistrement Kafka. Toutefois, si vous définissez l'option de configuration
headers.publish
surtrue
, le connecteur écrit les en-têtes en tant qu'attributs Pub/Sub. Le connecteur ignore les en-têtes qui dépassent les limites applicables aux attributs de message Pub/Sub. - Pour les schémas d'entiers, de valeurs flottantes, de chaîne et d'octets, le connecteur transmet les octets de la valeur de l'enregistrement Kafka directement dans le corps du message Pub/Sub.
- Pour les schémas de structure, le connecteur écrit chaque champ en tant qu'attribut du message Pub/Sub. Par exemple, si le champ est
{ "id"=123 }
, le message Pub/Sub qui en résulte possède un attribut"id"="123"
. La valeur du champ est toujours convertie en chaîne. - Pour les schémas de mappage, le connecteur écrit chaque paire clé/valeur en tant qu'attribut du message Pub/Sub. Par exemple, si le mappage est
{"alice"=1,"bob"=2}
, le message Pub/Sub obtenu comporte deux attributs,"alice"="1"
et"bob"="2"
. Les clés et les valeurs sont converties en chaînes.
Les schémas de structure et de carte ont des comportements supplémentaires:
Vous pouvez éventuellement spécifier un champ de structure ou une clé de mappage particulière en tant que corps du message, en définissant la propriété de configuration
messageBodyName
. La valeur du champ ou de la clé est stockée en tant queByteString
dans le corps du message. Si vous ne définissez pasmessageBodyName
, le corps du message est vide pour les schémas de structure et de mappage.Pour les valeurs de tableau, le connecteur n'accepte que les types de tableaux primitifs. La séquence de valeurs du tableau est concaténée dans un seul objet
ByteString
.
Conversion de Pub/Sub vers Kafka
Le connecteur source convertit les messages Pub/Sub en enregistrements Kafka comme suit:
Clé d'enregistrement Kafka: par défaut, la clé est définie sur
null
. Si vous le souhaitez, vous pouvez spécifier un attribut de message Pub/Sub à utiliser comme clé, en définissant l'option de configurationkafka.key.attribute
. Dans ce cas, le connecteur recherche un attribut portant ce nom et définit la clé d'enregistrement sur la valeur de l'attribut. Si l'attribut spécifié n'est pas présent, la clé d'enregistrement est définie surnull
.Valeur d'enregistrement Kafka. Le connecteur écrit la valeur d'enregistrement comme suit:
Si le message Pub/Sub ne comporte aucun attribut personnalisé, le connecteur écrit le corps du message Pub/Sub directement dans la valeur de l'enregistrement Kafka en tant que type
byte[]
, à l'aide du convertisseur spécifié parvalue.converter
.Si le message Pub/Sub comporte des attributs personnalisés et que
kafka.record.headers
est défini surfalse
, le connecteur écrit une structure dans la valeur d'enregistrement. La structure contient un champ pour chaque attribut et un champ nommé"message"
, dont la valeur est le corps du message Pub/Sub (stocké en octets):{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }
Dans ce cas, vous devez utiliser un
value.converter
compatible avec les schémasstruct
, tel queorg.apache.kafka.connect.json.JsonConverter
.Si le message Pub/Sub comporte des attributs personnalisés et que
kafka.record.headers
est défini surtrue
, le connecteur les écrit en tant qu'en-têtes d'enregistrement Kafka. Elle écrit le corps du message Pub/Sub directement dans la valeur de l'enregistrement Kafka en tant que typebyte[]
, à l'aide du convertisseur spécifié parvalue.converter
.
En-têtes d'enregistrement Kafka. Par défaut, les en-têtes sont vides, sauf si vous définissez
kafka.record.headers
surtrue
.
Options de configuration
En plus des configurations fournies par l'API Kafka Connect, le connecteur Kafka du groupe Pub/Sub est compatible avec les configurations suivantes.
Options de configuration du connecteur de récepteur
Le connecteur de récepteur accepte les options de configuration suivantes.
Paramètre | Type de données | Description |
---|---|---|
connector.class |
String |
Obligatoire. Classe Java du connecteur. La valeur du connecteur de récepteur Pub/Sub doit être com.google.pubsub.kafka.sink.CloudPubSubSinkConnector .
|
cps.endpoint |
String |
Point de terminaison Pub/Sub à utiliser. Valeur par défaut : |
cps.project |
String |
Obligatoire. Google Cloud contenant le sujet Pub/Sub. |
cps.topic |
String |
Obligatoire. Sujet Pub/Sub dans lequel publier les enregistrements Kafka. |
gcp.credentials.file.path |
String |
Facultatif. Chemin d'accès à un fichier qui stocke les identifiants Google Cloud pour authentifier Pub/Sub Lite. |
gcp.credentials.json |
String |
Facultatif. Un blob JSON contenant Google Cloud pour authentifier Pub/Sub Lite. |
headers.publish |
Boolean |
Lorsque la valeur est Valeur par défaut : |
maxBufferBytes |
Long |
Nombre maximal d'octets à recevoir sur une partition du sujet Kafka avant de les publier dans Pub/Sub. Valeur par défaut: 10000000. |
maxBufferSize |
Integer |
Nombre maximal d'enregistrements à recevoir sur une partition du sujet Kafka avant de les publier dans Pub/Sub. Valeur par défaut: 100. |
maxDelayThresholdMs |
Integer |
Délai maximal d'attente avant d'atteindre Valeur par défaut: 100. |
maxOutstandingMessages |
Long |
Nombre maximal d'enregistrements en attente, y compris des lots incomplets et en attente, avant que l'éditeur ne bloque une autre publication. Valeur par défaut : |
maxOutstandingRequestBytes |
Long |
Nombre maximal d'octets pouvant être en attente, y compris des lots incomplets et en attente, avant que l'éditeur ne bloque une autre publication. Valeur par défaut : |
maxRequestTimeoutMs |
Integer |
Délai avant expiration des requêtes de publication individuelles vers Pub/Sub, en millisecondes. Valeur par défaut: 10 000. |
maxTotalTimeoutMs |
Integer |
Délai total, en millisecondes, pour un appel à publier dans Pub/Sub, y compris les tentatives. Valeur par défaut: 60 000. |
metadata.publish |
Boolean |
Lorsque la valeur est Valeur par défaut : |
messageBodyName |
String |
Lorsque vous utilisez un schéma de structure ou de mappage de valeur, spécifie le nom d'un champ ou d'une clé à utiliser comme corps de message Pub/Sub. Consultez la section Conversion depuis Kafka vers Pub/Sub. Valeur par défaut : |
orderingKeySource |
String |
Spécifie comment définir la clé de tri dans le message Pub/Sub. Les valeurs possibles sont les suivantes :
Valeur par défaut : |
topics |
String |
Obligatoire. Liste des sujets Kafka à lire, séparés par une virgule. |
Options de configuration du connecteur source
Le connecteur source est compatible avec les options de configuration suivantes.
Paramètre | Type de données | Description |
---|---|---|
connector.class |
String |
Obligatoire. Classe Java du connecteur. La valeur du connecteur source Pub/Sub doit être com.google.pubsub.kafka.source.CloudPubSubSourceConnector .
|
cps.endpoint |
String |
Point de terminaison Pub/Sub à utiliser. Valeur par défaut : |
cps.makeOrderingKeyAttribute |
Boolean |
Lorsque Valeur par défaut : |
cps.maxBatchSize |
Integer |
Nombre maximal de messages à regrouper par requête d'extraction envoyée à Pub/Sub. Valeur par défaut: 100 |
cps.project |
String |
Obligatoire. Projet Google Cloud contenant le sujet Pub/Sub. |
cps.subscription |
String |
Obligatoire. Nom de l'abonnement Pub/Sub à partir duquel extraire les messages. |
gcp.credentials.file.path |
String |
Facultatif. Chemin d'accès à un fichier qui stocke les identifiants Google Cloud pour authentifier Pub/Sub Lite. |
gcp.credentials.json |
String |
Facultatif. Un blob JSON contenant Google Cloud pour authentifier Pub/Sub Lite. |
kafka.key.attribute |
String |
Attribut de message Pub/Sub à utiliser comme clé pour les messages publiés sur Kafka. Si la valeur est Valeur par défaut : |
kafka.partition.count |
Integer |
Nombre de partitions Kafka pour le sujet Kafka dans lequel les messages sont publiés. Ce paramètre est ignoré si le schéma de partition est Par défaut: 1. |
kafka.partition.scheme |
String |
Schéma d'attribution d'un message à une partition dans Kafka. Il peut s'agir de l'une des valeurs suivantes:
Valeur par défaut : |
kafka.record.headers |
Boolean |
Si la valeur est |
kafka.topic |
String |
Obligatoire. Le sujet Kafka qui reçoit les messages de Pub/Sub |
Assistance
Si vous avez besoin d'aide, créez une demande d'assistance. Pour les questions générales et les discussions, créez un problème dans le dépôt GitHub.
Étapes suivantes
- Comprendre les différences entre Kafka et Pub/Sub
- Apprenez-en plus sur le connecteur Kafka du groupe Pub/Sub.
- Consultez le dépôt GitHub du connecteur Kafka du groupe Pub/Sub.