Questo documento descrive come integrare Apache Kafka e Pub/Sub utilizzando il connettore Kafka del gruppo Pub/Sub.
Informazioni sul connettore Kafka del gruppo Pub/Sub
Apache Kafka è una piattaforma open source per il flusso di eventi. È comunemente utilizzato in architetture distribuite per consentire la comunicazione tra componenti a basso accoppiamento. Pub/Sub è un servizio gestito per l'invio e la ricezione di messaggi in modo asincrono. Come con Kafka, puoi usare Pub/Sub per comunicare tra i componenti della tua architettura cloud.
Il connettore Kafka del gruppo Pub/Sub consente di integrare questi due sistemi. I seguenti connettori sono pacchettizzati nel JAR del connettore:
- Il connettore sink legge i record da uno o più argomenti Kafka e li pubblica in Pub/Sub.
- Il connettore di origine legge i messaggi da un argomento Pub/Sub e li pubblica in Kafka.
Ecco alcuni scenari in cui potresti utilizzare il connettore Kafka del gruppo Pub/Sub:
- Stai eseguendo la migrazione di un'architettura basata su Kafka a Google Cloud.
- Disponi di un sistema frontend che archivia gli eventi in Kafka al di fuori di Google Cloud, ma utilizzi anche Google Cloud per eseguire alcuni dei tuoi servizi di backend, che devono ricevere gli eventi Kafka.
- Puoi raccogliere i log da una soluzione Kafka on-premise e inviarli a Google Cloud per l'analisi dei dati.
- Disponi di un sistema frontend che utilizza Google Cloud, ma archivi i dati on-premise utilizzando Kafka.
Il connettore richiede Kafka Connect, che è un framework per lo streaming di dati tra Kafka e altri sistemi. Per utilizzare il connettore, devi eseguire Kafka Connect insieme al cluster Kafka.
Questo documento presuppone la conoscenza di Kafka e Pub/Sub. Prima di leggere questo documento, ti consigliamo di completare una delle guide rapide su Pub/Sub.
Il connettore Pub/Sub non supporta alcuna integrazione tra gli ACL di Google Cloud IAM e Kafka Connect.
Inizia a utilizzare il connettore
Questa sezione illustra le seguenti attività:- Configurare il connettore Kafka del gruppo Pub/Sub.
- Invia eventi da Kafka a Pub/Sub.
- Inviare messaggi da Pub/Sub a Kafka.
Prerequisiti
Installa Kafka
Segui la guida rapida di Apache Kafka per installare un Kafka a nodo singolo sulla tua macchina locale. Completa questi passaggi nella guida rapida:
- Scarica l'ultima release di Kafka ed estraila.
- Avvia l'ambiente Kafka.
- Creare un argomento Kafka.
Autentica
Il connettore Kafka del gruppo Pub/Sub deve eseguire l'autenticazione con Pub/Sub per inviare e ricevere messaggi Pub/Sub. Per configurare l'autenticazione, svolgi i seguenti passaggi:
- Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
- Installa Google Cloud CLI.
-
Per initialize gcloud CLI, esegui questo comando:
gcloud init
-
Crea o seleziona un progetto Google Cloud.
-
Crea un progetto Google Cloud:
gcloud projects create PROJECT_ID
Sostituisci
PROJECT_ID
con un nome per il progetto Google Cloud che stai creando. -
Seleziona il progetto Google Cloud che hai creato:
gcloud config set project PROJECT_ID
Sostituisci
PROJECT_ID
con il nome del tuo progetto Google Cloud.
-
-
Crea credenziali di autenticazione locali per il tuo Account Google:
gcloud auth application-default login
-
Concedi i ruoli al tuo Account Google. Esegui questo comando una volta per ciascuno dei seguenti ruoli IAM:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Sostituisci
PROJECT_ID
con l'ID progetto. - Sostituisci
EMAIL_ADDRESS
con il tuo indirizzo email. - Sostituisci
ROLE
con ogni singolo ruolo.
- Sostituisci
- Installa Google Cloud CLI.
-
Per initialize gcloud CLI, esegui questo comando:
gcloud init
-
Crea o seleziona un progetto Google Cloud.
-
Crea un progetto Google Cloud:
gcloud projects create PROJECT_ID
Sostituisci
PROJECT_ID
con un nome per il progetto Google Cloud che stai creando. -
Seleziona il progetto Google Cloud che hai creato:
gcloud config set project PROJECT_ID
Sostituisci
PROJECT_ID
con il nome del tuo progetto Google Cloud.
-
-
Crea credenziali di autenticazione locali per il tuo Account Google:
gcloud auth application-default login
-
Concedi i ruoli al tuo Account Google. Esegui questo comando una volta per ciascuno dei seguenti ruoli IAM:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Sostituisci
PROJECT_ID
con l'ID progetto. - Sostituisci
EMAIL_ADDRESS
con il tuo indirizzo email. - Sostituisci
ROLE
con ogni singolo ruolo.
- Sostituisci
Scarica il JAR del connettore
Scarica il file JAR del connettore sulla tua macchina locale. Per maggiori informazioni, consulta Acquisire il connettore nel file Leggimi di GitHub.
Copia i file di configurazione del connettore
Clona o scarica il repository di GitHub per il connettore.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
Copia i contenuti della directory
config
nella sottodirectoryconfig
della tua installazione di Kafka.cp config/* [path to Kafka installation]/config/
Questi file contengono le impostazioni di configurazione per il connettore.
Aggiorna la configurazione di Kafka Connect
- Vai alla directory che contiene il programma binario di Kafka Connect che hai scaricato.
- Nella directory binaria di Kafka Connect, apri il file denominato
config/connect-standalone.properties
in un editor di testo. - Se
plugin.path property
ha ricevuto un commento, rimuovilo. Aggiorna
plugin.path property
in modo da includere il percorso del JAR del connettore.Esempio:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
Imposta la proprietà
offset.storage.file.filename
su un nome file locale. In modalità autonoma, Kafka utilizza questo file per archiviare i dati di offset.Esempio:
offset.storage.file.filename=/tmp/connect.offsets
Inoltrare eventi da Kafka a Pub/Sub
Questa sezione descrive come avviare il connettore sink, pubblicare eventi in Kafka e quindi leggere i messaggi inoltrati da Pub/Sub.
Utilizza Google Cloud CLI per creare un argomento Pub/Sub con una sottoscrizione.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Sostituisci quanto segue:
- PUBSUB_TOPIC: il nome di un argomento Pub/Sub per ricevere i messaggi da Kafka.
- PUBSUB_SUBSCRIPTION: il nome di una sottoscrizione Pub/Sub per l'argomento.
Apri il file
/config/cps-sink-connector.properties
in un editor di testo. Aggiungi valori per le seguenti proprietà, contrassegnate con"TODO"
nei commenti:topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
Sostituisci quanto segue:
- KAFKA_TOPICS: un elenco separato da virgole di argomenti Kafka da cui leggere.
- PROJECT_ID: il progetto Google Cloud che contiene l'argomento Pub/Sub.
- PUBSUB_TOPIC: l'argomento Pub/Sub per ricevere i messaggi da Kafka.
Dalla directory Kafka, esegui questo comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties
Segui i passaggi nella guida rapida di Apache Kafka per scrivere alcuni eventi nel tuo argomento Kafka.
Utilizza gcloud CLI per leggere gli eventi da Pub/Sub.
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
Inoltrare messaggi da Pub/Sub a Kafka
Questa sezione descrive come avviare il connettore di origine, pubblicare messaggi in Pub/Sub e leggere i messaggi inoltrati da Kafka.
Utilizza gcloud CLI per creare un argomento Pub/Sub con una sottoscrizione.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Sostituisci quanto segue:
- PUBSUB_TOPIC: il nome di un argomento Pub/Sub.
- PUBSUB_SUBSCRIPTION: il nome di una sottoscrizione Pub/Sub.
Apri il file denominato
/config/cps-source-connector.properties
in un editor di testo. Aggiungi i valori delle seguenti proprietà, contrassegnate come"TODO"
nei commenti:kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
Sostituisci quanto segue:
- KAFKA_TOPIC: gli argomenti Kafka per la ricezione dei messaggi Pub/Sub.
- PROJECT_ID: il progetto Google Cloud che contiene l'argomento Pub/Sub.
- PUBSUB_TOPIC: l'argomento Pub/Sub.
Dalla directory Kafka, esegui questo comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties
Utilizza gcloud CLI per pubblicare un messaggio in Pub/Sub.
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
Leggi il messaggio di Kafka. Segui i passaggi nella guida rapida di Apache Kafka per leggere i messaggi dell'argomento Kafka.
Conversione dei messaggi
Un record Kafka contiene una chiave e un valore, ovvero array di byte a lunghezza variabile. Facoltativamente, un record Kafka può anche includere intestazioni, che sono coppie chiave-valore. Un messaggio Pub/Sub ha due parti principali: il corpo del messaggio e zero o più attributi chiave-valore.
Kafka Connect utilizza convertitori per serializzare chiavi e valori da e verso Kafka. Per controllare la serializzazione, imposta le seguenti proprietà nei file di configurazione del connettore:
key.converter
: il convertitore utilizzato per serializzare le chiavi di registrazione.value.converter
: il convertitore utilizzato per serializzare i valori dei record.
Il corpo di un messaggio Pub/Sub è un oggetto ByteString
, quindi la conversione più efficiente consiste nel copiare direttamente il payload. Per questo motivo, consigliamo di utilizzare un convertitore che produca tipi di dati primitivi (schema intero, decimale, stringa o byte), ove possibile, per evitare la deserializzazione e
riserializzazione dello stesso corpo del messaggio.
Conversione da Kafka a Pub/Sub
Il connettore sink converte i record Kafka in messaggi Pub/Sub come segue:
- La chiave del record Kafka viene archiviata come attributo denominato
"key"
nel messaggio Pub/Sub. - Per impostazione predefinita, il connettore elimina qualsiasi intestazione nel record Kafka. Tuttavia, se imposti l'opzione di configurazione
headers.publish
sutrue
, il connettore scrive le intestazioni come attributi Pub/Sub. Il connettore ignora tutte le intestazioni che superano i limiti di Pub/Sub per gli attributi dei messaggi. - Per gli schemi con numeri interi, valori in virgola mobile, stringhe e byte, il connettore trasmette i byte del valore del record Kafka direttamente al corpo del messaggio Pub/Sub.
- Per gli schemi struct, il connettore scrive ogni campo come attributo del messaggio Pub/Sub. Ad esempio, se il campo è
{ "id"=123 }
, il messaggio Pub/Sub risultante avrà un attributo"id"="123"
. Il valore del campo viene sempre convertito in una stringa. I tipi di mappa e struct non sono supportati come tipi di campo all'interno di uno struct. - Per gli schemi di mappa, il connettore scrive ogni coppia chiave-valore come attributo del messaggio Pub/Sub. Ad esempio, se la mappa è
{"alice"=1,"bob"=2}
, il messaggio Pub/Sub risultante avrà due attributi:"alice"="1"
e"bob"="2"
. Le chiavi e i valori vengono convertiti in stringhe.
Gli schemi di struttura e mappa presentano alcuni comportamenti aggiuntivi:
Facoltativamente, puoi specificare un particolare campo di struct o chiave della mappa come corpo del messaggio impostando la proprietà di configurazione
messageBodyName
. Il valore del campo o della chiave è archiviato comeByteString
nel corpo del messaggio. Se non impostimessageBodyName
, il corpo del messaggio è vuoto per gli schemi di struct e mappa.Per i valori array, il connettore supporta solo i tipi array primitivi. La sequenza dei valori nell'array è concatenata in un singolo oggetto
ByteString
.
Conversione da Pub/Sub a Kafka
Il connettore di origine converte i messaggi Pub/Sub in record Kafka nel seguente modo:
Chiave di registrazione Kafka: per impostazione predefinita, la chiave è impostata su
null
. Facoltativamente, puoi specificare un attributo del messaggio Pub/Sub da utilizzare come chiave impostando l'opzione di configurazionekafka.key.attribute
. In questo caso, il connettore cerca un attributo con quel nome e imposta la chiave del record sul valore dell'attributo. Se l'attributo specificato non è presente, la chiave di record è impostata sunull
.Valore del record Kafka: Il connettore scrive il valore del record come segue:
Se il messaggio Pub/Sub non contiene attributi personalizzati, il connettore scrive il corpo del messaggio Pub/Sub direttamente nel valore del record Kafka come tipo
byte[]
, utilizzando il convertitore specificato davalue.converter
.Se il messaggio Pub/Sub ha attributi personalizzati e
kafka.record.headers
èfalse
, il connettore scrive uno struct nel valore del record. Lo struct contiene un campo per ogni attributo e un campo denominato"message"
il cui valore è il corpo del messaggio Pub/Sub (archiviato come byte):{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }
In questo caso, devi utilizzare un elemento
value.converter
compatibile con gli schemistruct
, comeorg.apache.kafka.connect.json.JsonConverter
.Se il messaggio Pub/Sub ha attributi personalizzati e
kafka.record.headers
ètrue
, il connettore scrive gli attributi come intestazioni di record Kafka. Scrive il corpo del messaggio Pub/Sub direttamente nel valore del record Kafka come tipobyte[]
, utilizzando il convertitore specificato davalue.converter
.
Intestazioni dei record Kafka. Per impostazione predefinita, le intestazioni sono vuote, a meno che non imposti
kafka.record.headers
sutrue
.
Opzioni di configurazione
Oltre alle configurazioni fornite dall'API Kafka Connect, il connettore Kafka del gruppo Pub/Sub supporta le seguenti configurazioni.
Opzioni di configurazione del connettore sink
Il connettore sink supporta le seguenti opzioni di configurazione.
Impostazione | Tipo di dati | Descrizione |
---|---|---|
connector.class |
String |
Obbligatorio. La classe Java per il connettore. Per il connettore sink di Pub/Sub, il valore deve essere com.google.pubsub.kafka.sink.CloudPubSubSinkConnector .
|
cps.endpoint |
String |
L'endpoint Pub/Sub da utilizzare. Valore predefinito: |
cps.project |
String |
Obbligatorio. La risorsa Google Cloud che contiene l'argomento Pub/Sub. |
cps.topic |
String |
Obbligatorio. L'argomento Pub/Sub in cui pubblicare i record Kafka. |
gcp.credentials.file.path |
String |
Facoltativo. Il percorso di un file in cui sono archiviate le credenziali Google Cloud per l'autenticazione di Pub/Sub Lite. |
gcp.credentials.json |
String |
Facoltativo. Un BLOB JSON contenente Google Cloud per l'autenticazione di Pub/Sub Lite. |
headers.publish |
Boolean |
Quando Valore predefinito: |
maxBufferBytes |
Long |
Il numero massimo di byte da ricevere su una partizione Kafka di un argomento prima di pubblicarli in Pub/Sub. Valore predefinito: 10000000. |
maxBufferSize |
Integer |
Il numero massimo di record da ricevere su una partizione di argomento Kafka prima di pubblicarli in Pub/Sub. Valore predefinito: 100. |
maxDelayThresholdMs |
Integer |
Il tempo massimo di attesa per raggiungere Valore predefinito: 100. |
maxOutstandingMessages |
Long |
Il numero massimo di record che possono essere in sospeso, inclusi batch incompleti e in attesa, prima che l'editore blocchi l'ulteriore pubblicazione. Valore predefinito: |
maxOutstandingRequestBytes |
Long |
Il numero massimo di byte totali che possono essere in sospeso, inclusi batch incompleti e in attesa, prima che l'editore blocchi l'ulteriore pubblicazione. Valore predefinito: |
maxRequestTimeoutMs |
Integer |
Il timeout in millisecondi per le singole richieste di pubblicazione in Pub/Sub. Valore predefinito: 10.000. |
maxTotalTimeoutMs |
Integer |
Il timeout totale in millisecondi per la pubblicazione di una chiamata in Pub/Sub, inclusi i nuovi tentativi. Valore predefinito: 60.000. |
metadata.publish |
Boolean |
Quando Valore predefinito: |
messageBodyName |
String |
Quando utilizzi uno schema di valori struct o mappa, specifica il nome di un campo o di una chiave da utilizzare come corpo del messaggio Pub/Sub. Consulta Conversione da Kafka a Pub/Sub. Valore predefinito: |
orderingKeySource |
String |
Specifica come impostare la chiave di ordinamento nel messaggio Pub/Sub. Può avere uno dei seguenti valori:
Valore predefinito: |
topics |
String |
Obbligatorio. Un elenco separato da virgole di argomenti Kafka da cui leggere. |
Opzioni di configurazione del connettore di origine
Il connettore di origine supporta le seguenti opzioni di configurazione.
Impostazione | Tipo di dati | Descrizione |
---|---|---|
connector.class |
String |
Obbligatorio. La classe Java per il connettore. Per
il connettore di origine Pub/Sub, il valore deve essere
com.google.pubsub.kafka.source.CloudPubSubSourceConnector .
|
cps.endpoint |
String |
L'endpoint Pub/Sub da utilizzare. Valore predefinito: |
cps.makeOrderingKeyAttribute |
Boolean |
Quando Valore predefinito: |
cps.maxBatchSize |
Integer |
Il numero massimo di messaggi da raggruppare per richiesta di pull a Pub/Sub. Valore predefinito: 100 |
cps.project |
String |
Obbligatorio. Il progetto Google Cloud che contiene l'argomento Pub/Sub. |
cps.subscription |
String |
Obbligatorio. Il nome della sottoscrizione Pub/Sub da cui eseguire il pull dei messaggi. |
gcp.credentials.file.path |
String |
Facoltativo. Il percorso di un file in cui sono archiviate le credenziali Google Cloud per l'autenticazione di Pub/Sub Lite. |
gcp.credentials.json |
String |
Facoltativo. Un BLOB JSON contenente Google Cloud per l'autenticazione di Pub/Sub Lite. |
kafka.key.attribute |
String |
L'attributo del messaggio Pub/Sub da utilizzare come chiave per i messaggi pubblicati in Kafka. Se è impostata su Valore predefinito: |
kafka.partition.count |
Integer |
Il numero di partizioni Kafka per l'argomento Kafka in cui vengono pubblicati i messaggi. Questo parametro viene ignorato se lo schema di partizione è
Valore predefinito: 1. |
kafka.partition.scheme |
String |
Lo schema per l'assegnazione di un messaggio a una partizione in Kafka. Può essere uno dei seguenti valori:
Valore predefinito: |
kafka.record.headers |
Boolean |
Se |
kafka.topic |
String |
Obbligatorio. L'argomento Kafka che riceve messaggi da Pub/Sub. |
Richiedere assistenza
Se hai bisogno di aiuto, crea un ticket di assistenza. Per discussioni e domande generali, crea un problema nel repository di GitHub.
Passaggi successivi
- Differenze tra Kafka e Pub/Sub.
- Scopri di più sul connettore Kafka del gruppo Pub/Sub.
- Consulta il Repository GitHub del connettore Kafka del gruppo Pub/Sub.