Ce document explique comment intégrer Apache Kafka et Pub/Sub en à l'aide du connecteur Kafka de groupe Pub/Sub.
À propos du connecteur Kafka de groupe Pub/Sub
Apache Kafka est une plate-forme Open Source de streaming d'événements. Il est généralement utilisées dans des architectures distribuées pour permettre la communication les composants couplés. Pub/Sub est un service géré permettant d'envoyer de recevoir des messages de manière asynchrone. Comme avec Kafka, vous pouvez utiliser Pub/Sub pour communiquer entre les composants de votre cloud de l'architecture.
Le connecteur Kafka de groupe Pub/Sub vous permet d'intégrer ces deux systèmes. Les connecteurs suivants sont inclus dans le fichier JAR du connecteur:
- Le connecteur de récepteur lit les enregistrements d'un ou de plusieurs sujets Kafka. et les publie sur Pub/Sub.
- Le connecteur source lit les messages d'un sujet Pub/Sub. et les publie sur Kafka.
Voici quelques scénarios dans lesquels vous pouvez utiliser le connecteur Kafka de groupe Pub/Sub:
- Vous migrez une architecture basée sur Kafka vers Google Cloud.
- Vous disposez d'un système frontend qui stocke des événements dans Kafka en dehors Google Cloud, mais vous utilisez aussi Google Cloud pour exécuter une partie de votre backend qui doivent recevoir les événements Kafka.
- Vous collectez les journaux d'une solution Kafka sur site et les envoyez Google Cloud pour l'analyse de données.
- Vous disposez d'un système frontend qui utilise Google Cloud, mais vous stockez également des données sur site à l'aide de Kafka.
Le connecteur requiert Kafka Connect qui est un framework de flux de données entre Kafka et d'autres systèmes. Pour utiliser vous devez exécuter Kafka Connect en même temps que votre cluster Kafka.
Dans ce document, nous partons du principe que vous connaissez à la fois Kafka et Pub/Sub. Avant de lire ce document, il est bon de effectuer l'une des Guides de démarrage rapide de Pub/Sub
Le connecteur Pub/Sub ne permet aucune intégration entre les LCA de Google Cloud IAM et les LCA de Kafka Connect.
Premiers pas avec le connecteur
Cette section vous guide tout au long des tâches suivantes:- Configurer le connecteur Kafka du groupe Pub/Sub
- Envoyer des événements depuis Kafka vers Pub/Sub
- Envoyer des messages depuis Pub/Sub vers Kafka
Prérequis
Installer Kafka
Suivez le Guide de démarrage rapide d'Apache Kafka pour installer un Kafka à nœud unique sur votre machine locale. Effectuez ces étapes dans dans le guide de démarrage rapide:
- Téléchargez la dernière version de Kafka et extrayez-la.
- Démarrez l'environnement Kafka.
- Créez un sujet Kafka.
Authentifier
Le connecteur Kafka du groupe Pub/Sub doit s'authentifier auprès de Pub/Sub pour envoyer et recevoir des messages Pub/Sub. Pour configurer l'authentification, procédez comme suit:
- Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Attribuez des rôles à votre compte Google. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants :
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- en remplaçant
PROJECT_ID
par l'ID de votre projet : - Remplacez
EMAIL_ADDRESS
par votre adresse e-mail. - Remplacez
ROLE
par chaque rôle individuel.
- en remplaçant
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Attribuez des rôles à votre compte Google. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants :
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- en remplaçant
PROJECT_ID
par l'ID de votre projet : - Remplacez
EMAIL_ADDRESS
par votre adresse e-mail. - Remplacez
ROLE
par chaque rôle individuel.
- en remplaçant
Télécharger le fichier JAR du connecteur
Téléchargez le fichier JAR du connecteur sur votre ordinateur local. Pour en savoir plus, consultez Procurez-vous le connecteur. dans le fichier README de GitHub.
Copier les fichiers de configuration du connecteur
Clonez ou téléchargez le Dépôt GitHub pour le connecteur.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
Copiez le contenu du répertoire
config
dans le sous-répertoireconfig
de votre installation Kafka.cp config/* [path to Kafka installation]/config/
Ces fichiers contiennent les paramètres de configuration du connecteur.
Mettre à jour votre configuration Kafka Connect
- Accédez au répertoire contenant le binaire Kafka Connect que vous téléchargée.
- Dans le répertoire binaire Kafka Connect, ouvrez le fichier nommé
config/connect-standalone.properties
dans un éditeur de texte. - Si l'élément
plugin.path property
est mis en commentaire, annulez la mise en commentaire. Mettez à jour
plugin.path property
pour inclure le chemin d'accès au fichier JAR du connecteur.Exemple :
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
Définissez la propriété
offset.storage.file.filename
sur un nom de fichier local. Dans en mode autonome, Kafka utilise ce fichier pour stocker les données de décalage.Exemple :
offset.storage.file.filename=/tmp/connect.offsets
Transférer des événements de Kafka vers Pub/Sub
Cette section explique comment démarrer le connecteur de récepteur, publier des événements sur Kafka, puis de lire les messages transférés depuis Pub/Sub.
Utilisez la Google Cloud CLI pour créer un sujet Pub/Sub avec une abonnement.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Remplacez les éléments suivants :
- PUBSUB_TOPIC: nom d'un sujet Pub/Sub à reçoivent les messages de Kafka.
- PUBSUB_SUBSCRIPTION: nom d'un objet Pub/Sub pour le sujet.
Ouvrez le fichier
/config/cps-sink-connector.properties
dans un éditeur de texte. Ajouter des valeurs pour les propriétés suivantes, qui sont marquées"TODO"
dans commentaires:topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
Remplacez les éléments suivants :
- KAFKA_TOPICS: liste de sujets Kafka à lire, séparés par une virgule
- PROJECT_ID: projet Google Cloud contenant vos sujet Pub/Sub.
- PUBSUB_TOPIC: sujet Pub/Sub qui doit recevoir le de Kafka.
À partir du répertoire Kafka, exécutez la commande suivante:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties
Suivez les étapes décrites dans le Guide de démarrage rapide d'Apache Kafka pour écrire des événements dans votre sujet Kafka.
Utilisez la gcloud CLI pour lire les événements à partir de Pub/Sub.
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
Transférer des messages de Pub/Sub vers Kafka
Cette section explique comment démarrer le connecteur source, publier des messages dans Pub/Sub et lire les messages transférés depuis Kafka.
Utilisez la gcloud CLI pour créer un sujet Pub/Sub avec une abonnement.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Remplacez les éléments suivants :
- PUBSUB_TOPIC: nom d'un sujet Pub/Sub.
- PUBSUB_SUBSCRIPTION: nom d'un objet Pub/Sub abonnement.
Ouvrez le fichier nommé
/config/cps-source-connector.properties
dans un fichier texte. éditeur. Ajoutez des valeurs pour les propriétés suivantes, qui sont marquées"TODO"
dans les commentaires:kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
Remplacez les éléments suivants :
- KAFKA_TOPIC: sujets Kafka qui reçoivent le les messages Pub/Sub.
- PROJECT_ID: projet Google Cloud contenant vos sujet Pub/Sub.
- PUBSUB_TOPIC: sujet Pub/Sub
À partir du répertoire Kafka, exécutez la commande suivante:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties
Utilisez la gcloud CLI pour publier un message sur Pub/Sub.
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
Lisez le message de Kafka. Suivez les étapes décrites dans le Guide de démarrage rapide d'Apache Kafka pour lire les messages du sujet Kafka.
Conversion par SMS
Un enregistrement Kafka contient une clé et une valeur, qui sont des tableaux d'octets de longueur variable. Éventuellement, un L'enregistrement Kafka peut également avoir des en-têtes, qui sont des paires clé-valeur. A Message Pub/Sub comporte deux parties principales: le corps du message et zéro, un ou plusieurs attributs clé-valeur.
Kafka Connect utilise des convertisseurs pour sérialiser les clés et les valeurs vers et depuis Kafka. Pour contrôler la sérialisation, définissez les propriétés suivantes dans le connecteur de configuration:
key.converter
: convertisseur utilisé pour sérialiser les clés d'enregistrement.value.converter
: convertisseur utilisé pour sérialiser les valeurs d'enregistrement.
Le corps d'un message Pub/Sub est un objet ByteString
.
la conversion la plus efficace consiste
à copier directement la charge utile. C'est pourquoi nous
il est recommandé d'utiliser un convertisseur produisant des types de données primitifs (entier, nombre à virgule flottante,
ou un schéma d'octets) dans la mesure du possible, pour éviter de désérialiser et
la sérialisation du même corps de message.
Conversion de Kafka vers Pub/Sub
Le connecteur de récepteur convertit les enregistrements Kafka en messages Pub/Sub en tant que ce qui suit:
- La clé d'enregistrement Kafka est stockée en tant qu'attribut nommé
"key"
dans Message Pub/Sub. - Par défaut, le connecteur supprime tous les en-têtes de l'enregistrement Kafka. Toutefois, si
vous définissez l'option de configuration
headers.publish
surtrue
, le connecteur écrit les en-têtes en tant qu'attributs Pub/Sub. Le connecteur ignore les en-têtes qui dépassent les Pub/Sub limites sur les attributs des messages. - Pour les schémas de type entier, nombre à virgule flottante, chaîne et octets, le connecteur transmet les octets de la valeur de l'enregistrement Kafka directement dans le message Pub/Sub .
- Pour les schémas de structure, le connecteur écrit chaque champ en tant qu'attribut de la
Message Pub/Sub. Par exemple, si le champ est
{ "id"=123 }
, le message Pub/Sub résultant possède un attribut"id"="123"
. La la valeur du champ est toujours convertie en chaîne. Les types Map et STRUCT ne sont pas comme types de champs dans un struct. - Pour les schémas de mappage, le connecteur écrit chaque paire clé-valeur en tant qu'attribut de
le message Pub/Sub. Par exemple, si la carte est
{"alice"=1,"bob"=2}
, le message Pub/Sub résultant comporte deux les attributs"alice"="1"
et"bob"="2"
. Les clés et les valeurs sont converties aux chaînes.
Les schémas struct et map ont des comportements supplémentaires:
Vous pouvez éventuellement spécifier un champ de structure ou une clé de mappage spécifique en tant que dans le corps du message en définissant la propriété de configuration
messageBodyName
. La du champ ou de la clé est stockée en tant queByteString
dans le corps du message. Si vous ne définissez pasmessageBodyName
, le corps du message est vide pour struct et des schémas de carte.Pour les valeurs de tableau, le connecteur n'accepte que les types de tableaux primitifs. La séquence de valeurs du tableau est concaténée en un seul élément
ByteString
.
Conversion de Pub/Sub vers Kafka
Le connecteur source convertit les messages Pub/Sub en enregistrements Kafka. comme suit:
Clé d'enregistrement Kafka: par défaut, la clé est définie sur
null
. Vous pouvez éventuellement spécifier un attribut de message Pub/Sub à utiliser comme clé en définissant l'option de configurationkafka.key.attribute
. Dans ce cas, le connecteur recherche un attribut portant ce nom et définit la clé d'enregistrement sur le . Si l'attribut spécifié n'est pas présent, la clé d'enregistrement est définie surnull
.Valeur de l'enregistrement Kafka. Le connecteur écrit la valeur de l'enregistrement comme suit:
Si le message Pub/Sub ne comporte aucun attribut personnalisé, le connecteur écrit le corps du message Pub/Sub directement dans l'enregistrement Kafka en tant que type
byte[]
, à l'aide du convertisseur spécifié parvalue.converter
Si le message Pub/Sub comporte des attributs personnalisés
kafka.record.headers
est défini surfalse
, le connecteur écrit un struct dans le record (valeur d'enregistrement). La structure contient un champ pour chaque attribut et un champ nommée"message"
, dont la valeur est le corps du message Pub/Sub ; (stockées en octets):{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }
Dans ce cas, vous devez utiliser un
value.converter
compatible avec Schémasstruct
, tels queorg.apache.kafka.connect.json.JsonConverter
.Si le message Pub/Sub comporte des attributs personnalisés
kafka.record.headers
est défini surtrue
, le connecteur écrit les attributs sous la forme En-têtes d'enregistrement Kafka. Il écrit le corps du message Pub/Sub. directement à la valeur de l'enregistrement Kafka en tant que typebyte[]
, à l'aide du convertisseur spécifié parvalue.converter
.
En-têtes d'enregistrement Kafka. Par défaut, les en-têtes sont vides, sauf si vous définissez De
kafka.record.headers
àtrue
.
Options de configuration
En plus des configurations fournies par l'API Kafka Connect, Le connecteur Kafka de groupe Pub/Sub est compatible avec les configurations suivantes.
Options de configuration du connecteur de récepteur
Le connecteur de récepteur accepte les options de configuration suivantes.
Paramètre | Type de données | Description |
---|---|---|
connector.class |
String |
Obligatoire. Classe Java du connecteur. Pour
connecteur de récepteur Pub/Sub, la valeur doit être
com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
|
cps.endpoint |
String |
Point de terminaison Pub/Sub à utiliser. Valeur par défaut : |
cps.project |
String |
Obligatoire. La ressource Google Cloud contenant sujet Pub/Sub. |
cps.topic |
String |
Obligatoire. Le sujet Pub/Sub à publier des enregistrements Kafka. |
gcp.credentials.file.path |
String |
Facultatif. Chemin d'accès à un fichier qui stocke les identifiants Google Cloud pour authentifier Pub/Sub Lite. |
gcp.credentials.json |
String |
Facultatif. Un blob JSON contenant Google Cloud pour pour authentifier Pub/Sub Lite. |
headers.publish |
Boolean |
Si la valeur est Valeur par défaut : |
maxBufferBytes |
Long |
Nombre maximal d'octets à recevoir sur une partition Kafka de sujet avant de les publier sur Pub/Sub. Valeur par défaut: 10 000 000. |
maxBufferSize |
Integer |
Nombre maximal d'enregistrements à recevoir sur une partition de sujet Kafka avant de les publier sur Pub/Sub. Valeur par défaut: 100. |
maxDelayThresholdMs |
Integer |
Délai d'attente maximal avant d'atteindre
Valeur par défaut: 100. |
maxOutstandingMessages |
Long |
Le nombre maximal d'enregistrements pouvant être en attente, dont : les lots incomplets et en attente, avant que l'éditeur ne bloque à nouveau publication. Valeur par défaut : |
maxOutstandingRequestBytes |
Long |
Nombre maximal total d'octets pouvant être en attente, y compris les lots incomplets et en attente, avant que l'éditeur ne bloque à nouveau publication. Valeur par défaut : |
maxRequestTimeoutMs |
Integer |
Le délai avant expiration des requêtes de publication individuelles sur Pub/Sub en millisecondes. Valeur par défaut: 10 000. |
maxTotalTimeoutMs |
Integer |
Délai total, en millisecondes, pour un appel à publier dans Pub/Sub, y compris les nouvelles tentatives. Valeur par défaut: 60 000. |
metadata.publish |
Boolean |
Si la valeur est Valeur par défaut : |
messageBodyName |
String |
Lorsque vous utilisez un schéma de valeur struct ou map, spécifie le nom d'un ou une clé à utiliser comme corps du message Pub/Sub. Voir Conversion de Kafka vers Pub/Sub. Valeur par défaut : |
orderingKeySource |
String |
Spécifie comment définir la clé de tri dans Pub/Sub . Les valeurs possibles sont les suivantes :
Valeur par défaut : |
topics |
String |
Obligatoire. Une liste de sujets Kafka séparés par une virgule à à lire. |
Options de configuration du connecteur source
Le connecteur source accepte les options de configuration suivantes.
Paramètre | Type de données | Description |
---|---|---|
connector.class |
String |
Obligatoire. Classe Java du connecteur. Pour
connecteur source Pub/Sub, la valeur doit être
com.google.pubsub.kafka.source.CloudPubSubSourceConnector
|
cps.endpoint |
String |
Point de terminaison Pub/Sub à utiliser. Valeur par défaut : |
cps.makeOrderingKeyAttribute |
Boolean |
Lorsque la valeur est Valeur par défaut : |
cps.maxBatchSize |
Integer |
Nombre maximal de messages à traiter par lot par demande d'extraction Pub/Sub. Par défaut: 100 |
cps.project |
String |
Obligatoire. Le projet Google Cloud contenant le sujet Pub/Sub. |
cps.subscription |
String |
Obligatoire. Le nom de l'Pub/Sub à partir duquel extraire les messages. |
gcp.credentials.file.path |
String |
Facultatif. Chemin d'accès à un fichier qui stocke les identifiants Google Cloud pour authentifier Pub/Sub Lite. |
gcp.credentials.json |
String |
Facultatif. Un blob JSON contenant Google Cloud pour pour authentifier Pub/Sub Lite. |
kafka.key.attribute |
String |
Attribut de message Pub/Sub à utiliser comme clé pour
les messages publiés dans Kafka. Si elle est définie sur Valeur par défaut : |
kafka.partition.count |
Integer |
Nombre de partitions Kafka pour le sujet Kafka dans lequel les messages
sont publiés. Ce paramètre est ignoré si le schéma de partition est
Par défaut : 1. |
kafka.partition.scheme |
String |
Le schéma permettant d'attribuer un message à une partition dans Kafka. Peut être l'une des valeurs suivantes:
Valeur par défaut : |
kafka.record.headers |
Boolean |
Si la valeur est |
kafka.topic |
String |
Obligatoire. Le sujet Kafka qui reçoit les messages de Pub/Sub. |
Assistance
Si vous avez besoin d'aide, créez une demande d'assistance. Pour les questions et discussions générales, créez un problème dans le dépôt GitHub.
Étape suivante
- Découvrez les différences entre Kafka et Pub/Sub.
- En savoir plus sur le connecteur Kafka de groupe Pub/Sub
- Voir le connecteur Kafka de groupe Pub/Sub Dépôt GitHub