Questo documento descrive come integrare Apache Kafka e Pub/Sub utilizzando il connettore Kafka per gruppi Pub/Sub.
Informazioni sul connettore Kafka per gruppi Pub/Sub
Apache Kafka è una piattaforma open source per lo streaming di eventi. È spesso utilizzato nelle architetture distribuite per abilitare la comunicazione tra componenti a basso accoppiamento. Pub/Sub è un servizio gestito per l'invio e la ricezione di messaggi in modo asincrono. Come per Kafka, puoi utilizzare Pub/Sub per comunicare tra i componenti della tua architettura cloud.
Il connettore Kafka per gruppi Pub/Sub ti consente di integrare questi due sistemi. I seguenti connettori sono pacchettizzati nel file JAR del connettore:
- Il connettore sink legge i record da uno o più argomenti Kafka e li pubblica su Pub/Sub.
- Il connettore di origine legge i messaggi da un argomento Pub/Sub e li pubblica in Kafka.
Di seguito sono riportati alcuni scenari in cui potresti utilizzare il connettore Kafka di gruppo Pub/Sub:
- Stai eseguendo la migrazione di un'architettura basata su Kafka a Google Cloud.
- Hai un sistema frontend che archivia gli eventi in Kafka al di fuori di Google Cloud, ma utilizzi anche Google Cloud per eseguire alcuni dei tuoi servizi di backend, che devono ricevere gli eventi Kafka.
- Raccogli i log da una soluzione Kafka on-premise e li invii a Google Cloud per l'analisi dei dati.
- Hai un sistema frontend che utilizza Google Cloud, ma archivi anche i dati on-premise utilizzando Kafka.
Il connettore richiede Kafka Connect, un framework per lo streaming di dati tra Kafka e altri sistemi. Per utilizzare il connettore, devi eseguire Kafka Connect insieme al tuo cluster Kafka.
Questo documento presuppone che tu abbia familiarità sia con Kafka sia con Pub/Sub. Prima di leggere questo documento, ti consigliamo di completare una delle guide rapide di Pub/Sub.
Il connettore Pub/Sub non supporta alcuna integrazione tra le ACL di Google Cloud IAM e Kafka Connect.
Inizia a utilizzare il connettore
In questa sezione vengono illustrate le seguenti attività:- Configura il connettore Kafka di gruppo Pub/Sub.
- Invia gli eventi da Kafka a Pub/Sub.
- Invia messaggi da Pub/Sub a Kafka.
Prerequisiti
Installa Kafka
Segui la guida rapida di Apache Kafka per installare un singolo nodo Kafka sulla tua macchina locale. Completa i seguenti passaggi nella guida introduttiva:
- Scarica la release Kafka più recente ed estraila.
- Avvia l'ambiente Kafka.
- Crea un argomento Kafka.
Autentica
Il connettore Kafka per gruppi Pub/Sub deve autenticarsi con Pub/Sub per poter inviare e ricevere messaggi Pub/Sub. Per configurare l'autenticazione, segui questi passaggi:
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Scaricare il file JAR del connettore
Scarica il file JAR del connettore nella macchina locale. Per maggiori informazioni, consulta la sezione Acquista il connettore nel file readme di GitHub.
Copia i file di configurazione del connettore
Clona o scarica il repository GitHub per il connettore.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
Copia i contenuti della directory
config
nella sottodirectoryconfig
della tua installazione di Kafka.cp config/* [path to Kafka installation]/config/
Questi file contengono le impostazioni di configurazione per il connettore.
Aggiorna la configurazione di Kafka Connect
- Vai alla directory contenente il file binario di Kafka Connect che hai scaricato.
- Nella directory dei file binari di Kafka Connect, apri il file
config/connect-standalone.properties
in un editor di testo. - Se
plugin.path property
è commentato, rimuovi il commento. Aggiorna
plugin.path property
in modo da includere il percorso del file JAR del connettore.Esempio:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
Imposta la proprietà
offset.storage.file.filename
su un nome file locale. In modalità autonoma, Kafka utilizza questo file per archiviare i dati di offset.Esempio:
offset.storage.file.filename=/tmp/connect.offsets
Inoltra gli eventi da Kafka a Pub/Sub
Questa sezione descrive come avviare il connettore di destinazione, pubblicare gli eventi in Kafka e leggere i messaggi inoltrati da Pub/Sub.
Utilizza Google Cloud CLI per creare un argomento Pub/Sub con una sottoscrizione.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Sostituisci quanto segue:
- PUBSUB_TOPIC: il nome di un argomento Pub/Sub per ricevere i messaggi da Kafka.
- PUBSUB_SUBSCRIPTION: il nome di un abbonamento Pub/Sub per l'argomento.
Apri il file
/config/cps-sink-connector.properties
in un editor di testo. Aggiungi i valori per le seguenti proprietà, contrassegnate da"TODO"
nei commenti:topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
Sostituisci quanto segue:
- KAFKA_TOPICS: un elenco separato da virgole di argomenti Kafka da cui leggere.
- PROJECT_ID: il progetto Google Cloud che contiene lo argomento Pub/Sub.
- PUBSUB_TOPIC: l'argomento Pub/Sub per ricevere i messaggi da Kafka.
Dalla directory Kafka, esegui il seguente comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties
Segui i passaggi descritti nella guida introduttiva ad Apache Kafka per scrivere alcuni eventi nell'argomento Kafka.
Utilizza l'interfaccia a riga di comando gcloud per leggere gli eventi da Pub/Sub.
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
Inoltra i messaggi da Pub/Sub a Kafka
Questa sezione descrive come avviare il connettore di origine, pubblicare i messaggi su Pub/Sub e leggere i messaggi inoltrati da Kafka.
Utilizza gcloud CLI per creare un argomento Pub/Sub con una sottoscrizione.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Sostituisci quanto segue:
- PUBSUB_TOPIC: il nome di un argomento Pub/Sub.
- PUBSUB_SUBSCRIPTION: il nome di un abbonamento Pub/Sub.
Apri il file denominato
/config/cps-source-connector.properties
in un editor di testo. Aggiungi i valori per le seguenti proprietà, contrassegnate da"TODO"
nei commenti:kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
Sostituisci quanto segue:
- KAFKA_TOPIC: gli argomenti Kafka per ricevere i messaggi Pub/Sub.
- PROJECT_ID: il progetto Google Cloud che contiene lo argomento Pub/Sub.
- PUBSUB_TOPIC: l'argomento Pub/Sub.
Dalla directory Kafka, esegui il seguente comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties
Utilizza l'interfaccia a riga di comando gcloud per pubblicare un messaggio in Pub/Sub.
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
Leggi il messaggio da Kafka. Segui i passaggi descritti nella guida introduttiva ad Apache Kafka per leggere i messaggi dall'argomento Kafka.
Conversione dei messaggi
Un record Kafka contiene una chiave e un valore, che sono array di byte di lunghezza variabile. Se vuoi, un record Kafka può avere anche intestazioni, che sono coppie chiave/valore. Un messaggio Pub/Sub è composto da due parti principali: il corpo del messaggio e zero o più attributi chiave-valore.
Kafka Connect utilizza i convertitori per serializzare chiavi e valori verso e da Kafka. Per controllare la serializzazione, imposta le seguenti proprietà nei file di configurazione del connettore:
key.converter
: il convertitore utilizzato per serializzare le chiavi dei record.value.converter
: il convertitore utilizzato per serializzare i valori dei record.
Il corpo di un messaggio Pub/Sub è un oggetto ByteString
, pertanto la conversione più efficiente consiste nel copiare direttamente il payload. Per questo motivo, se possibile, consigliamo di utilizzare un convertitore che produca tipi di dati primitivi (int, float, stringa o schema di byte) per evitare la deserializzazione e la reserializzazione dello stesso corpo del messaggio.
Conversione da Kafka a Pub/Sub
Il connettore sink converte i record Kafka in messaggi Pub/Sub come segue:
- La chiave del record Kafka viene archiviata come attributo denominato
"key"
nel messaggio Pub/Sub. - Per impostazione predefinita, il connettore elimina tutte le intestazioni nel record Kafka. Tuttavia, se impostate l'opzione di configurazione
headers.publish
sutrue
, il connettore scrive le intestazioni come attributi Pub/Sub. Il connettore ignora tutte le intestazioni che superano i limiti di Pub/Sub per gli attributi dei messaggi. - Per gli schemi di tipo intero, float, stringa e byte, il connettore passa i byte del valore del record Kafka direttamente nel corpo del messaggio Pub/Sub.
- Per gli schemi struct, il connettore scrive ogni campo come attributo del messaggio Pub/Sub. Ad esempio, se il campo è
{ "id"=123 }
, Il messaggio Pub/Sub risultante ha un attributo"id"="123"
. Il valore del campo viene sempre convertito in una stringa. I tipi di mappe e struct non sono supportati come tipi di campo all'interno di uno struct. - Per gli schemi mappa, il connettore scrive ogni coppia chiave-valore come attributo del messaggio Pub/Sub. Ad esempio, se la mappa è
{"alice"=1,"bob"=2}
, il messaggio Pub/Sub risultante ha due attributi,"alice"="1"
e"bob"="2"
. Le chiavi e i valori vengono convertiti in stringhe.
Gli schemi di struct e map hanno alcuni comportamenti aggiuntivi:
Se vuoi, puoi specificare un determinato campo della struttura o una chiave della mappa come corpo del messaggio impostando la proprietà di configurazione
messageBodyName
. Il valore del campo o della chiave viene archiviato comeByteString
nel corpo del messaggio. Se non impostimessageBodyName
, il corpo del messaggio è vuoto per gli schemi struct e map.Per i valori array, il connettore supporta solo i tipi di array primitivi. La sequenza di valori nell'array viene concatenata in un singolo oggetto
ByteString
.
Conversione da Pub/Sub a Kafka
Il connettore di origine converte i messaggi Pub/Sub in record Kafka come segue:
Chiave del record Kafka: per impostazione predefinita, la chiave è impostata su
null
. Facoltativamente, puoi specificare un attributo del messaggio Pub/Sub da utilizzare come chiave impostando l'opzione di configurazionekafka.key.attribute
. In questo caso, il connettore cerca un attributo con quel nome e imposta la chiave del record sul valore dell'attributo. Se l'attributo specificato non è presente, la chiave del record viene impostata sunull
.Valore del record Kafka. Il connettore scrive il valore del record come segue:
Se il messaggio Pub/Sub non ha attributi personalizzati, il connettore scrive il corpo del messaggio Pub/Sub direttamente nel valore del record Kafka come tipo
byte[]
, utilizzando il convertitore specificato davalue.converter
.Se il messaggio Pub/Sub ha attributi personalizzati e
kafka.record.headers
èfalse
, il connettore scrive una struct nel valore del record. Lo struct contiene un campo per ogni attributo e un campo chiamato"message"
il cui valore è il corpo del messaggio Pub/Sub (memorizzato come byte):{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }
In questo caso, devi utilizzare un
value.converter
compatibile con gli schemistruct
, ad esempioorg.apache.kafka.connect.json.JsonConverter
.Se il messaggio Pub/Sub ha attributi personalizzati e
kafka.record.headers
ètrue
, il connettore scrive gli attributi come intestazioni dei record Kafka. Scrive il corpo del messaggio Pub/Sub direttamente nel valore del record Kafka come tipobyte[]
, utilizzando il convertitore specificato davalue.converter
.
Intestazioni dei record Kafka. Per impostazione predefinita, le intestazioni sono vuote, a meno che non imposti
kafka.record.headers
sutrue
.
Opzioni di configurazione
Oltre alle configurazioni fornite dall'API Kafka Connect, il connettore Kafka per gruppi Pub/Sub supporta la configurazione di sink e origini come descritto in Configurazioni del connettore Pub/Sub.
Richiedere assistenza
Se hai bisogno di aiuto, crea un ticket di assistenza. Per domande e discussioni generali, crea un problema nel repository GitHub.
Passaggi successivi
- Comprendi le differenze tra Kafka e Pub/Sub.
- Scopri di più sul connettore Kafka di gruppo Pub/Sub.
- Consulta il repository GitHub del connettore Kafka di gruppo Pub/Sub.