Questo documento descrive come integrare Apache Kafka e Pub/Sub tramite usando il connettore Kafka del gruppo Pub/Sub.
Informazioni sul connettore Kafka del gruppo Pub/Sub
Apache Kafka è una piattaforma open source per l'inserimento di flussi di eventi. Di solito utilizzata in architetture distribuite per consentire la comunicazione componenti accoppiati. Pub/Sub è un servizio gestito per l'invio ricevere i messaggi in modo asincrono. Come con Kafka, puoi utilizzare Pub/Sub per comunicare tra i componenti nel cloud dell'architettura.
Il connettore Kafka del gruppo Pub/Sub consente di integrare questi due sistemi. I seguenti connettori sono pacchettizzati nel JAR del connettore:
- Il connettore sink legge i record da uno o più argomenti Kafka e le pubblica in Pub/Sub.
- Il connettore di origine legge i messaggi da un argomento Pub/Sub e le pubblica su Kafka.
Ecco alcuni scenari in cui potresti utilizzare il connettore Kafka del gruppo Pub/Sub:
- Stai eseguendo la migrazione di un'architettura basata su Kafka a Google Cloud.
- Hai un sistema di frontend che archivia gli eventi in Kafka all'esterno ma utilizzi anche Google Cloud per eseguire alcuni dei tuoi backend che devono ricevere gli eventi Kafka.
- Puoi raccogliere i log da una soluzione Kafka on-premise e inviarli a Google Cloud per l'analisi dei dati.
- Hai un sistema frontend che utilizza Google Cloud, ma archivi anche i dati on-premise con Kafka.
Il connettore richiede Kafka Connect che è un framework per trasmettere dati in modalità flusso tra Kafka e altri sistemi. Per utilizzare devi eseguire Kafka Connect insieme al tuo cluster Kafka.
In questo documento si presuppone che tu abbia familiarità sia con Kafka, sia in Pub/Sub. Prima di leggere questo documento, è buona norma completare una delle Guide rapide di Pub/Sub.
Il connettore Pub/Sub non supporta alcuna integrazione tra gli ACL Google Cloud IAM e Kafka Connect.
Inizia a utilizzare il connettore
Questa sezione illustra le seguenti attività:- Configurare il connettore Kafka del gruppo Pub/Sub.
- Inviare eventi da Kafka a Pub/Sub.
- Inviare messaggi da Pub/Sub a Kafka.
Prerequisiti
Installa Kafka
Segui le Guida rapida di Apache Kafka per installare Kafka a nodo singolo sulla tua macchina locale. Completa questi passaggi in della guida rapida:
- Scarica l'ultima release di Kafka ed estraila.
- Avvia l'ambiente Kafka.
- Creare un argomento Kafka.
Autentica
Il connettore Kafka del gruppo Pub/Sub deve eseguire l'autenticazione con Pub/Sub nell'ordine per inviare e ricevere messaggi Pub/Sub. Per configurare l'autenticazione, segui questi passaggi:
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Scarica il JAR del connettore
Scarica il file JAR del connettore sul computer locale. Per ulteriori informazioni, vedi Acquista il connettore nel file Leggimi di GitHub.
Copia i file di configurazione del connettore
Clona o scarica il Repository GitHub per il connettore.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
Copia il contenuto della directory
config
nella sottodirectoryconfig
di dell'installazione di Kafka.cp config/* [path to Kafka installation]/config/
Questi file contengono le impostazioni di configurazione per il connettore.
Aggiorna la configurazione di Kafka Connect
- Vai alla directory che contiene il programma binario Kafka Connect che scaricato.
- Nella directory binaria Kafka Connect, apri il file denominato
config/connect-standalone.properties
in un editor di testo. - Se
plugin.path property
ha un commento, rimuovi il commento. Aggiorna
plugin.path property
in modo da includere il percorso del JAR del connettore.Esempio:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
Imposta la proprietà
offset.storage.file.filename
sul nome di un file locale. Nella modalità autonoma, Kafka usa questo file per archiviare i dati di offset.Esempio:
offset.storage.file.filename=/tmp/connect.offsets
Inoltra eventi da Kafka a Pub/Sub
Questa sezione descrive come avviare il connettore sink, pubblicare eventi in Kafka, leggere i messaggi inoltrati da Pub/Sub.
Utilizza Google Cloud CLI per creare un argomento Pub/Sub con un abbonamento.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Sostituisci quanto segue:
- PUBSUB_TOPIC: il nome di un argomento Pub/Sub in ricevono i messaggi da Kafka.
- PUBSUB_SUBSCRIPTION: il nome di un Pub/Sub abbonamento per l'argomento.
Apri il file
/config/cps-sink-connector.properties
in un editor di testo. Aggiungi per le seguenti proprietà, contrassegnate come"TODO"
nel commenti:topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
Sostituisci quanto segue:
- KAFKA_TOPICS: un elenco separato da virgole di argomenti Kafka da leggere da cui proviene.
- PROJECT_ID: il progetto Google Cloud che contiene Pub/Sub.
- PUBSUB_TOPIC: l'argomento Pub/Sub per ricevere messaggi da Kafka.
Dalla directory Kafka, esegui questo comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties
Segui i passaggi nella Guida rapida di Apache Kafka per scrivere alcuni eventi nel tuo argomento Kafka.
Utilizza gcloud CLI per leggere gli eventi da Pub/Sub.
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
Inoltrare messaggi da Pub/Sub a Kafka
Questa sezione descrive come avviare il connettore di origine e pubblicare messaggi in Pub/Sub e leggere i messaggi inoltrati da Kafka.
Utilizza gcloud CLI per creare un argomento Pub/Sub con un abbonamento.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Sostituisci quanto segue:
- PUBSUB_TOPIC: il nome di un argomento Pub/Sub.
- PUBSUB_SUBSCRIPTION: il nome di un Pub/Sub abbonamento.
Apri il file denominato
/config/cps-source-connector.properties
in un testo dell'editor. Aggiungi valori per le seguenti proprietà, contrassegnate come"TODO"
in i commenti:kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
Sostituisci quanto segue:
- KAFKA_TOPIC: gli argomenti Kafka per ricevere i messaggi Pub/Sub.
- PROJECT_ID: il progetto Google Cloud che contiene Pub/Sub.
- PUBSUB_TOPIC: l'argomento Pub/Sub.
Dalla directory Kafka, esegui questo comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties
Utilizza gcloud CLI per pubblicare un messaggio in Pub/Sub.
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
Leggere il messaggio di Kafka. Segui i passaggi nella Guida rapida di Apache Kafka per leggere i messaggi dall'argomento Kafka.
Conversione dei messaggi
Un record Kafka contiene una chiave e un valore, che sono array di byte a lunghezza variabile. Facoltativamente, Il record Kafka può anche avere intestazioni, che sono coppie chiave/valore. R Messaggio Pub/Sub è composto da due parti principali: il corpo del messaggio e zero o più attributi chiave-valore.
Kafka Connect utilizza i convertitori per serializzare chiavi e valori da e verso Kafka. Per controllare la serializzazione, imposta le seguenti proprietà nel connettore di configurazione YAML:
key.converter
: il convertitore utilizzato per serializzare le chiavi di registrazione.value.converter
: il convertitore utilizzato per serializzare i valori dei record.
Il corpo di un messaggio Pub/Sub è un oggetto ByteString
, quindi
La conversione più efficiente consiste nel copiare direttamente il payload. Per questo motivo,
consigliamo di utilizzare un convertitore che generi tipi di dati primitivi (numero intero, numero intero,
stringa o schema di byte), ove possibile, per evitare la deserializzazione e
rializziamo lo stesso corpo del messaggio.
Conversione da Kafka a Pub/Sub
Il connettore sink converte i record Kafka in messaggi Pub/Sub come che segue:
- La chiave di record Kafka viene archiviata come attributo denominato
"key"
nel un messaggio Pub/Sub. - Per impostazione predefinita, il connettore elimina qualsiasi intestazione nel record Kafka. Tuttavia, se
imposti l'opzione di configurazione
headers.publish
sutrue
, il connettore scrive le intestazioni come attributi Pub/Sub. Il connettore salta qualsiasi intestazione che superi Pub/Sub limiti degli attributi dei messaggi. - Per gli schemi con numeri interi, in virgola mobile, stringa e byte, il connettore passa i byte del valore del record Kafka direttamente nel messaggio Pub/Sub del testo.
- Per gli schemi struct, il connettore scrive ogni campo come attributo del
un messaggio Pub/Sub. Ad esempio, se il campo è
{ "id"=123 }
, il messaggio Pub/Sub risultante ha l'attributo"id"="123"
. La viene sempre convertito in una stringa. I tipi di mappa e struct non sono supportati come tipi di campo all'interno di uno struct. - Per gli schemi di mappa, il connettore scrive ogni coppia chiave-valore come attributo di
il messaggio Pub/Sub. Ad esempio, se la mappa è
{"alice"=1,"bob"=2}
, il messaggio Pub/Sub risultante ha due"alice"="1"
e"bob"="2"
. Le chiavi e i valori vengono convertiti alle stringhe.
Gli schemi di struttura e mappa presentano alcuni comportamenti aggiuntivi:
Facoltativamente, puoi specificare un particolare campo di struct o una chiave di mappa come campo corpo del messaggio impostando la proprietà di configurazione
messageBodyName
. La del campo o della chiave viene archiviato comeByteString
nel corpo del messaggio. Se se non impostimessageBodyName
, il corpo del messaggio è vuoto per struct e schemi delle mappe.Per i valori degli array, il connettore supporta solo tipi di array primitivi. La sequenza di valori nell'array è concatenata in un unico
ByteString
.
Conversione da Pub/Sub a Kafka
Il connettore di origine converte i messaggi Pub/Sub in record Kafka come segue:
Chiave di registrazione Kafka: per impostazione predefinita, la chiave è impostata su
null
. Facoltativamente, puoi specificare un attributo di messaggio Pub/Sub da usare come chiave, impostare l'opzione di configurazionekafka.key.attribute
. In questo caso, cerca un attributo con quel nome e imposta la chiave di record sul valore dell'attributo. Se l'attributo specificato non è presente, la chiave di record è impostato sunull
.Valore del record Kafka. Il connettore scrive il valore del record come segue:
Se il messaggio Pub/Sub non ha attributi personalizzati, scrive il corpo del messaggio Pub/Sub direttamente nel record Kafka come tipo
byte[]
, utilizzando il convertitore specificatovalue.converter
.Se il messaggio Pub/Sub contiene attributi personalizzati e
kafka.record.headers
èfalse
, il connettore scrive uno struct nella e un valore record. Lo struct contiene un campo per ogni attributo e un campo denominato"message"
il cui valore è il corpo del messaggio Pub/Sub (archiviati come byte):{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }
In questo caso, devi utilizzare un
value.converter
che sia compatibile construct
schemi, ad esempioorg.apache.kafka.connect.json.JsonConverter
.Se il messaggio Pub/Sub contiene attributi personalizzati e
kafka.record.headers
ètrue
, il connettore scrive gli attributi come Intestazioni dei record Kafka. Scrive il corpo del messaggio Pub/Sub direttamente al valore del record Kafka come tipobyte[]
, utilizzando il convertitore specificato davalue.converter
.
Intestazioni dei record Kafka. Per impostazione predefinita, le intestazioni sono vuote, a meno che non imposti Da
kafka.record.headers
atrue
.
Opzioni di configurazione
Oltre alle configurazioni fornite dall'API Kafka Connect, Il connettore Kafka del gruppo Pub/Sub supporta le seguenti configurazioni.
Opzioni di configurazione del connettore sink
Il connettore sink supporta le seguenti opzioni di configurazione.
Impostazione | Tipo di dati | Descrizione |
---|---|---|
connector.class |
String |
Obbligatorio. La classe Java per il connettore. Per
del connettore sink Pub/Sub, il valore deve essere
com.google.pubsub.kafka.sink.CloudPubSubSinkConnector .
|
cps.endpoint |
String |
L'endpoint Pub/Sub da utilizzare. Valore predefinito: |
cps.project |
String |
Obbligatorio. Google Cloud che contiene Pub/Sub. |
cps.topic |
String |
Obbligatorio. L'argomento Pub/Sub da pubblicare Record Kafka in |
gcp.credentials.file.path |
String |
Facoltativo. Il percorso di un file in cui sono archiviate le credenziali di Google Cloud per l'autenticazione di Pub/Sub Lite. |
gcp.credentials.json |
String |
Facoltativo. Un blob JSON che contiene Google Cloud per l'autenticazione di Pub/Sub Lite. |
headers.publish |
Boolean |
Se Valore predefinito: |
maxBufferBytes |
Long |
Il numero massimo di byte da ricevere su una partizione Kafka dell'argomento prima di pubblicarle in Pub/Sub. Valore predefinito: 10000000. |
maxBufferSize |
Integer |
Il numero massimo di record da ricevere su una partizione di argomento Kafka prima di pubblicarle in Pub/Sub. Valore predefinito: 100. |
maxDelayThresholdMs |
Integer |
Il tempo massimo di attesa per raggiungere
Valore predefinito: 100. |
maxOutstandingMessages |
Long |
Il numero massimo di record che possono essere in sospeso, inclusi batch incompleti e in attesa, prima che il publisher blocchi ulteriormente pubblicazione. Valore predefinito: |
maxOutstandingRequestBytes |
Long |
Il numero massimo di byte totali che possono essere in sospeso, inclusi batch incompleti e in attesa, prima che il publisher blocchi ulteriormente pubblicazione. Valore predefinito: |
maxRequestTimeoutMs |
Integer |
Il timeout per le singole richieste di pubblicazione in Pub/Sub, in millisecondi. Valore predefinito: 10000. |
maxTotalTimeoutMs |
Integer |
Il timeout totale, in millisecondi, per una chiamata in cui pubblicare Pub/Sub, inclusi i nuovi tentativi. Valore predefinito: 60.000. |
metadata.publish |
Boolean |
Quando Valore predefinito: |
messageBodyName |
String |
Quando utilizzi uno schema di valori di struct o mappa, specifica il nome di un campo o chiave da utilizzare come corpo del messaggio Pub/Sub. Consulta Conversione da Kafka a Pub/Sub. Valore predefinito: |
orderingKeySource |
String |
Specifica come impostare la chiave di ordinamento in Pub/Sub per creare un nuovo messaggio email. Può essere uno dei seguenti valori:
Valore predefinito: |
topics |
String |
Obbligatorio. Un elenco separato da virgole di argomenti Kafka su per la lettura. |
Opzioni di configurazione del connettore di origine
Il connettore di origine supporta le seguenti opzioni di configurazione.
Impostazione | Tipo di dati | Descrizione |
---|---|---|
connector.class |
String |
Obbligatorio. La classe Java per il connettore. Per
al connettore di origine Pub/Sub, il valore deve essere
com.google.pubsub.kafka.source.CloudPubSubSourceConnector .
|
cps.endpoint |
String |
L'endpoint Pub/Sub da utilizzare. Valore predefinito: |
cps.makeOrderingKeyAttribute |
Boolean |
Quando Valore predefinito: |
cps.maxBatchSize |
Integer |
Il numero massimo di messaggi da raggruppare per richiesta di pull a in Pub/Sub. Valore predefinito: 100 |
cps.project |
String |
Obbligatorio. Il progetto Google Cloud che contiene Pub/Sub. |
cps.subscription |
String |
Obbligatorio. il nome del bucket Pub/Sub sottoscrizione da cui eseguire il pull dei messaggi. |
gcp.credentials.file.path |
String |
Facoltativo. Il percorso di un file in cui sono archiviate le credenziali di Google Cloud per l'autenticazione di Pub/Sub Lite. |
gcp.credentials.json |
String |
Facoltativo. Un blob JSON che contiene Google Cloud per l'autenticazione di Pub/Sub Lite. |
kafka.key.attribute |
String |
L'attributo del messaggio Pub/Sub da utilizzare come chiave
dei messaggi pubblicati in Kafka. Se impostato su Valore predefinito: |
kafka.partition.count |
Integer |
Il numero di partizioni Kafka per l'argomento Kafka in cui i messaggi
vengono pubblicati. Questo parametro viene ignorato se lo schema di partizione è
Valore predefinito: 1. |
kafka.partition.scheme |
String |
Lo schema per assegnare un messaggio a una partizione in Kafka. Può essere uno dei seguenti valori:
Valore predefinito: |
kafka.record.headers |
Boolean |
Se |
kafka.topic |
String |
Obbligatorio. L'argomento Kafka che riceve messaggi da in Pub/Sub. |
Richiedere assistenza
Se hai bisogno di aiuto, crea un ticket di assistenza. Per domande e discussioni generali, crea un problema nel repository GitHub.
Passaggi successivi
- Comprendere le differenze tra Kafka e Pub/Sub.
- Scopri di più sul connettore Kafka del gruppo Pub/Sub.
- Visualizza il connettore Kafka del gruppo Pub/Sub Repository GitHub.