Questo documento descrive come integrare Apache Kafka e Pub/Sub utilizzando il connettore Kafka di Pub/Sub Group.
Informazioni sul connettore Kafka di Pub/Sub Group
Apache Kafka è una piattaforma open source per eventi in streaming. È comunemente usato nelle architetture distribuite per consentire la comunicazione tra componenti a basso accoppiamento. Pub/Sub è un servizio gestito per l'invio e la ricezione di messaggi in modo asincrono. Come con Kafka, puoi utilizzare Pub/Sub per comunicare tra i componenti della tua architettura cloud.
Il connettore Kafka del gruppo Pub/Sub ti consente di integrare questi due sistemi. I seguenti connettori sono contenuti nella JAR Connector:
- Il connettore sink legge i record da uno o più argomenti Kafka e li pubblica su Pub/Sub.
- Il connettore di origine legge i messaggi da un argomento Pub/Sub e li pubblica su Kafka.
Ecco alcuni scenari in cui potresti utilizzare il connettore Kafka del gruppo Pub/Sub:
- Stai eseguendo la migrazione di un'architettura basata su Kafka a Google Cloud.
- Hai un sistema frontend che archivia gli eventi in Kafka al di fuori di Google Cloud, ma utilizzi anche Google Cloud per eseguire alcuni dei tuoi servizi di backend, i quali devono ricevere gli eventi Kafka.
- Raccogli i log da una soluzione Kafka on-premise e li invii a Google Cloud per l'analisi dei dati.
- Disponi di un sistema frontend che utilizza Google Cloud, ma archivi i dati anche on-premise utilizzando Kafka.
Il connettore richiede Kafka Connect, che è un framework per lo streaming di dati tra Kafka e altri sistemi. Per utilizzare il connettore, devi eseguire Kafka Connect insieme al cluster Kafka.
Questo documento presuppone che tu conosca sia Kafka sia Pub/Sub. Prima di leggere questo documento, è consigliabile eseguire una delle guide rapide su Pub/Sub.
Inizia a utilizzare il connettore
In questa sezione troverai le seguenti attività:- Configura il connettore Kafka di Pub/Sub Group.
- Invia eventi da Kafka a Pub/Sub.
- Inviare messaggi da Pub/Sub a Kafka.
Prerequisiti
Installa Kafka
Segui la guida rapida di Apache Kafka per installare un nodo singolo Kafka sulla tua macchina locale. Completa questi passaggi nella guida rapida:
- Scarica l'ultima release di Kafka ed estraila.
- Avvia l'ambiente Kafka.
- Creare un argomento Kafka.
Autentica
Il connettore Kafka di Pub/Sub Group deve autenticarsi con Pub/Sub per inviare e ricevere messaggi Pub/Sub. Per configurare l'autenticazione, esegui i seguenti passaggi:
- Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
- Installa Google Cloud CLI.
-
Per inizializzare l'interfaccia a riga di comando gcloud, esegui il comando seguente:
gcloud init
-
Crea o seleziona un progetto Google Cloud.
-
Creare un progetto Cloud:
gcloud projects create PROJECT_ID
-
Seleziona il progetto Cloud che hai creato:
gcloud config set project PROJECT_ID
-
-
Crea le credenziali di autenticazione per il tuo Account Google:
gcloud auth application-default login
-
Concedi i ruoli al tuo Account Google. Esegui il comando seguente una volta per ciascuno dei seguenti ruoli IAM:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Sostituisci
PROJECT_ID
con l'ID progetto. - Sostituisci
EMAIL_ADDRESS
con il tuo indirizzo email. - Sostituisci
ROLE
con ogni ruolo.
- Sostituisci
- Installa Google Cloud CLI.
-
Per inizializzare l'interfaccia a riga di comando gcloud, esegui il comando seguente:
gcloud init
-
Crea o seleziona un progetto Google Cloud.
-
Creare un progetto Cloud:
gcloud projects create PROJECT_ID
-
Seleziona il progetto Cloud che hai creato:
gcloud config set project PROJECT_ID
-
-
Crea le credenziali di autenticazione per il tuo Account Google:
gcloud auth application-default login
-
Concedi i ruoli al tuo Account Google. Esegui il comando seguente una volta per ciascuno dei seguenti ruoli IAM:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Sostituisci
PROJECT_ID
con l'ID progetto. - Sostituisci
EMAIL_ADDRESS
con il tuo indirizzo email. - Sostituisci
ROLE
con ogni ruolo.
- Sostituisci
Scarica il connettore JAR
Scarica il file JAR del connettore sulla tua macchina locale. Per ulteriori informazioni, consulta Acquisire il connettore nel file Leggimi di GitHub.
Copia i file di configurazione del connettore
Clona o scarica il repository GitHub per il connettore.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
Copia il contenuto della directory
config
nella sottodirectoryconfig
dell'installazione di Kafka.cp config/* [path to Kafka installation]/config/
Questi file contengono impostazioni di configurazione per il connettore.
Aggiornare la configurazione di Kafka Connect
- Vai alla directory di Kafka.
- Apri il file denominato
config/connect-standalone.properties
in un editor di testo. - Se
plugin.path property
viene commentato, rimuovi il commento. Aggiorna
plugin.path property
per includere il percorso della JAR del connettore.Esempio:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
Imposta la proprietà
offset.storage.file.filename
su un nome file locale. In modalità autonoma, Kafka utilizza questo file per memorizzare i dati di offset.Esempio:
offset.storage.file.filename=/tmp/connect.offsets
Inoltrare eventi da Kafka a Pub/Sub
Questa sezione descrive come avviare il connettore sink, pubblicare eventi su Kafka e leggere i messaggi inoltrati da Pub/Sub.
Utilizzare Google Cloud CLI per creare un argomento Pub/Sub con una sottoscrizione.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Sostituisci quanto segue:
- PUBSUB_TOPIC: il nome di un argomento Pub/Sub per la ricezione dei messaggi da Kafka.
- PUBSUB_SUBSCRIPTION: il nome di una sottoscrizione Pub/Sub per l'argomento.
Apri il file
/config/cps-sink-connector.properties
in un editor di testo. Aggiungi valori per le seguenti proprietà, che sono contrassegnate con"TODO"
nei commenti:topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
Sostituisci quanto segue:
- KAFKA_TOPICS: un elenco di argomenti Kafka separati da virgole da cui leggere.
- PROJECT_ID: il progetto Google Cloud che contiene l'argomento Pub/Sub.
- PUBSUB_TOPIC: l'argomento Pub/Sub per ricevere i messaggi da Kafka.
Nella directory Kafka, esegui il comando seguente:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties
Segui i passaggi nella guida rapida di Apache Kafka per scrivere alcuni eventi nell'argomento Kafka.
Utilizza l'interfaccia a riga di comando gcloud per leggere gli eventi da Pub/Sub.
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
Inoltrare messaggi da Pub/Sub a Kafka
Questa sezione descrive come avviare il connettore di origine, pubblicare messaggi su Pub/Sub e leggere i messaggi inoltrati da Kafka.
Utilizza l'interfaccia a riga di comando gcloud per creare un argomento Pub/Sub con una sottoscrizione.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Sostituisci quanto segue:
- PUBSUB_TOPIC: il nome di un argomento Pub/Sub.
- PUBSUB_SUBSCRIPTION: il nome di una sottoscrizione Pub/Sub.
Apri il file denominato
/config/cps-source-connector.properties
in un editor di testo. Aggiungi valori per le seguenti proprietà, che sono contrassegnate con"TODO"
nei commenti:kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
Sostituisci quanto segue:
- KAFKA_TOPIC: gli argomenti di Kafka per ricevere i messaggi Pub/Sub.
- PROJECT_ID: il progetto Google Cloud che contiene l'argomento Pub/Sub.
- PUBSUB_TOPIC: l'argomento Pub/Sub.
Nella directory Kafka, esegui il comando seguente:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties
Utilizza l'interfaccia a riga di comando gcloud per pubblicare un messaggio in Pub/Sub.
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
Leggi il messaggio di Kafka. Segui i passaggi della guida rapida di Apache Kafka per leggere i messaggi dall'argomento Kafka.
Conversione del messaggio
Un record Kafka contiene una chiave e un valore, che sono array di byte a lunghezza variabile. Facoltativamente, un record Kafka può avere anche intestazioni, che sono coppie chiave/valore. Un messaggio Pub/Sub è composto da due parti principali: il corpo del messaggio e zero o più attributi chiave-valore.
Kafka Connect utilizza i convertitori per serializzare chiavi e valori da e verso Kafka. Per controllare la serializzazione, imposta le seguenti proprietà nei file di configurazione del connettore:
key.converter
: il convertitore utilizzato per serializzare le chiavi di registrazione.value.converter
: il convertitore utilizzato per serializzare i valori dei record.
Il corpo di un messaggio Pub/Sub è un oggetto ByteString
, quindi la conversione più efficiente è copiare direttamente il payload. Per questo motivo, consigliamo di utilizzare un convertitore che produca i tipi di dati primitivi (intero, Float, Stringa o Schema di byte) ove possibile, per evitare la deserializzazione e la reserizzazione dello stesso corpo del messaggio.
Conversione da Kafka a Pub/Sub
Il connettore sink converti i record Kafka in messaggi Pub/Sub nel seguente modo:
- La chiave di record Kafka viene archiviata come attributo denominato
"key"
nel messaggio Pub/Sub. - Per impostazione predefinita, il connettore elimina tutte le intestazioni nel record Kafka. Tuttavia, se imposti l'opzione di configurazione
headers.publish
sutrue
, il connettore scrive le intestazioni come attributi Pub/Sub. Il connettore ignora le intestazioni che superano i limiti relativi agli attributi di messaggio di Pub/Sub. - Per gli schemi con numero intero, numero in virgola mobile, stringa e byte, il connettore trasmette i byte del valore del record Kafka direttamente nel corpo del messaggio Pub/Sub.
- Per gli schemi struct, il connettore scrive ogni campo come attributo del messaggio Pub/Sub. Ad esempio, se il campo è
{ "id"=123 }
, il messaggio Pub/Sub risultante ha un attributo"id"="123"
. Il valore del campo viene sempre convertito in stringa. - Per gli schemi mappa, il connettore scrive ogni coppia chiave-valore come attributo del messaggio Pub/Sub. Ad esempio, se la mappa è
{"alice"=1,"bob"=2}
, il messaggio Pub/Sub risultante ha due attributi,"alice"="1"
e"bob"="2"
. Le chiavi e i valori vengono convertiti in stringhe.
Gli schemi e gli schemi hanno alcuni comportamenti aggiuntivi:
Facoltativamente, puoi specificare un particolare campo struct o chiave mappa come corpo del messaggio, impostando la proprietà di configurazione
messageBodyName
. Il valore del campo o della chiave viene memorizzato comeByteString
nel corpo del messaggio. Se non impostimessageBodyName
, il corpo del messaggio è vuoto per gli schemi e le mappe.Per i valori dell'array, il connettore supporta solo tipi di array primitivi. La sequenza di valori nell'array è concatenata in un singolo oggetto
ByteString
.
Conversione da Pub/Sub a Kafka
Il connettore di origine converte i messaggi Pub/Sub in record Kafka nel seguente modo:
Chiave di record Kafka: per impostazione predefinita, la chiave è impostata su
null
. Facoltativamente, puoi specificare un attributo del messaggio Pub/Sub da utilizzare come chiave impostando l'opzione di configurazionekafka.key.attribute
. In questo caso, il connettore cerca un attributo con questo nome e imposta la chiave di registrazione sul valore dell'attributo. Se l'attributo specificato non è presente, la chiave di record viene impostata sunull
.Valore del record Kafka. Il connettore scrive il valore del record nel seguente modo:
Se il messaggio Pub/Sub non ha attributi personalizzati, il connettore scrive il corpo del messaggio Pub/Sub direttamente nel valore del record Kafka come tipo
byte[]
, utilizzando il convertitore specificato davalue.converter
.Se il messaggio Pub/Sub ha attributi personalizzati e
kafka.record.headers
èfalse
, il connettore scrive una struct nel valore record. Lo struct contiene un campo per ogni attributo e un campo denominato"message"
il cui valore è il corpo del messaggio Pub/Sub (archiviato come byte):{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }
In questo caso, devi utilizzare un elemento
value.converter
compatibile con gli schemistruct
, ad esempioorg.apache.kafka.connect.json.JsonConverter
.Se il messaggio Pub/Sub ha attributi personalizzati e
kafka.record.headers
ètrue
, il connettore scrive gli attributi come intestazioni del record Kafka. scrive il corpo del messaggio Pub/Sub direttamente nel valore del record Kafka come tipobyte[]
, utilizzando il convertitore specificato davalue.converter
.
Intestazioni dei record Kafka. Per impostazione predefinita, le intestazioni sono vuote, a meno che non imposti
kafka.record.headers
sutrue
.
Opzioni di configurazione
Oltre alle configurazioni fornite dall'API Kafka Connect, il connettore Kafka per Pub/Sub Group supporta le seguenti configurazioni.
Opzioni di configurazione del connettore sink
Il connettore sink supporta le seguenti opzioni di configurazione.
Impostazione | Tipo di dati | Descrizione |
---|---|---|
connector.class |
String |
Obbligatorio. La classe Java del connettore. Per
il connettore sink Pub/Sub, il valore deve essere
com.google.pubsub.kafka.sink.CloudPubSubSinkConnector .
|
cps.endpoint |
String |
L'endpoint Pub/Sub da utilizzare. Valore predefinito: |
cps.project |
String |
Obbligatorio. Google Cloud che contiene l'argomento Pub/Sub. |
cps.topic |
String |
Obbligatorio. L'argomento Pub/Sub in cui pubblicare i record Kafka. |
gcp.credentials.file.path |
String |
Campo facoltativo. Il percorso di un file in cui sono archiviate le credenziali di Google Cloud per l'autenticazione di Pub/Sub Lite. |
gcp.credentials.json |
String |
Campo facoltativo. Un blob JSON contenente Google Cloud per l'autenticazione di Pub/Sub Lite. |
headers.publish |
Boolean |
Quando Valore predefinito: |
maxBufferBytes |
Long |
Il numero massimo di byte da ricevere su una partizione Kafka dell'argomento prima di pubblicarli in Pub/Sub. Valore predefinito: 10000000. |
maxBufferSize |
Integer |
Il numero massimo di record da ricevere su una partizione di un argomento Kafka prima della pubblicazione in Pub/Sub. Valore predefinito: 100. |
maxDelayThresholdMs |
Integer |
Il tempo di attesa massimo per raggiungere Valore predefinito: 100. |
maxOutstandingMessages |
Long |
Il numero massimo di record che possono essere in sospeso, inclusi batch in sospeso e in attesa, prima che l'editore blocchi ulteriormente la pubblicazione. Valore predefinito: |
maxOutstandingRequestBytes |
Long |
Il numero massimo di byte totali che possono essere in sospeso, inclusi i batch incompleti e in attesa, prima che l'editore blocchi ulteriormente la pubblicazione. Valore predefinito: |
maxRequestTimeoutMs |
Integer |
Il timeout per singole richieste di pubblicazione su Pub/Sub, in millisecondi. Valore predefinito: 10.000. |
maxTotalTimeoutMs |
Integer |
Il timeout totale, in millisecondi, per una chiamata a pubblicazione su Pub/Sub, inclusi i nuovi tentativi. Valore predefinito: 60.000. |
metadata.publish |
Boolean |
Quando Valore predefinito: |
messageBodyName |
String |
Quando utilizzi uno schema di valori struct o mappa, specifica il nome di un campo o di una chiave da utilizzare come corpo del messaggio Pub/Sub. Consulta Conversione da Kafka a Pub/Sub. Valore predefinito: |
orderingKeySource |
String |
Specifica come impostare la chiave di ordinamento nel messaggio Pub/Sub. Può avere uno dei seguenti valori:
Valore predefinito: |
topics |
String |
Obbligatorio. Un elenco di argomenti Kafka separati da virgole da cui leggere. |
Opzioni di configurazione del connettore di origine
Il connettore di origine supporta le opzioni di configurazione riportate di seguito.
Impostazione | Tipo di dati | Descrizione |
---|---|---|
connector.class |
String |
Obbligatorio. La classe Java del connettore. Per il connettore di origine Pub/Sub, il valore deve essere com.google.pubsub.kafka.source.CloudPubSubSourceConnector .
|
cps.endpoint |
String |
L'endpoint Pub/Sub da utilizzare. Valore predefinito: |
cps.makeOrderingKeyAttribute |
Boolean |
Quando Valore predefinito: |
cps.maxBatchSize |
Integer |
Il numero massimo di messaggi da inviare in batch per richiesta di pull a Pub/Sub. Valore predefinito: 100 |
cps.project |
String |
Obbligatorio. Il progetto Google Cloud che contiene l'argomento Pub/Sub. |
cps.subscription |
String |
Obbligatorio. Il nome della sottoscrizione Pub/Sub da cui eseguire il pull dei messaggi. |
gcp.credentials.file.path |
String |
Campo facoltativo. Il percorso di un file in cui sono archiviate le credenziali di Google Cloud per l'autenticazione di Pub/Sub Lite. |
gcp.credentials.json |
String |
Campo facoltativo. Un blob JSON contenente Google Cloud per l'autenticazione di Pub/Sub Lite. |
kafka.key.attribute |
String |
L'attributo del messaggio Pub/Sub da utilizzare come chiave per i messaggi pubblicati su Kafka. Se è impostato su Valore predefinito: |
kafka.partition.count |
Integer |
Il numero di partizioni Kafka per l'argomento Kafka in cui vengono pubblicati i messaggi. Questo parametro viene ignorato se lo schema di partizione è
Valore predefinito: 1. |
kafka.partition.scheme |
String |
Lo schema di assegnazione di un messaggio a una partizione in Kafka. Può essere uno dei seguenti valori:
Valore predefinito: |
kafka.record.headers |
Boolean |
Se |
kafka.topic |
String |
Obbligatorio. L'argomento Kafka che riceve i messaggi da Pub/Sub. |
Richiedere assistenza
Se hai bisogno di aiuto, crea un ticket di assistenza. Per domande e discussioni generali, crea un problema nel repository di GitHub.
Passaggi successivi
- Differenze tra Kafka e Pub/Sub.
- Scopri di più sul connettore Kafka di Pub/Sub Group.
- Visualizza il repository di GitHub del connettore Kafka di Pub/Sub Group.