Questo documento descrive come integrare Apache Kafka e Pub/Sub Lite utilizzando il connettore Kafka del gruppo Pub/Sub.
Informazioni sul connettore Kafka del gruppo Pub/Sub
Apache Kafka è una piattaforma open source per il flusso di eventi. È comunemente utilizzato in architetture distribuite per consentire la comunicazione tra componenti a basso accoppiamento. Pub/Sub Lite è un servizio gestito per l'invio e la ricezione di messaggi in modo asincrono. Come con Kafka, puoi usare Pub/Sub Lite per comunicare tra i componenti della tua architettura cloud.
Il connettore Kafka del gruppo Pub/Sub consente di integrare questi due sistemi. I seguenti connettori sono pacchettizzati nel JAR del connettore:
- Il connettore sink legge i record da uno o più argomenti Kafka e li pubblica su Pub/Sub Lite.
- Il connettore di origine legge i messaggi da un argomento Pub/Sub Lite e li pubblica in Kafka.
Ecco alcuni scenari in cui potresti utilizzare il connettore Kafka del gruppo Pub/Sub:
- Stai eseguendo la migrazione di un'architettura basata su Kafka a Google Cloud.
- Disponi di un sistema frontend che archivia gli eventi in Kafka al di fuori di Google Cloud, ma utilizzi anche Google Cloud per eseguire alcuni dei tuoi servizi di backend, che devono ricevere gli eventi Kafka.
- Puoi raccogliere i log da una soluzione Kafka on-premise e inviarli a Google Cloud per l'analisi dei dati.
- Disponi di un sistema frontend che utilizza Google Cloud, ma archivi i dati on-premise utilizzando Kafka.
Il connettore richiede Kafka Connect, che è un framework per lo streaming di dati tra Kafka e altri sistemi. Per utilizzare il connettore, devi eseguire Kafka Connect insieme al cluster Kafka.
Questo documento presuppone la conoscenza di Kafka e Pub/Sub Lite. Per iniziare a utilizzare Pub/Sub Lite, consulta l'articolo Pubblicare e ricevere messaggi in Pub/Sub Lite utilizzando la console Google Cloud.
Inizia a utilizzare il connettore Kafka del gruppo Pub/Sub
Questa sezione illustra le seguenti attività:- Configurare il connettore Kafka del gruppo Pub/Sub.
- Invia eventi da Kafka a Pub/Sub Lite.
- Inviare messaggi da Pub/Sub Lite a Kafka.
Prerequisiti
Installa Kafka
Segui la guida rapida di Apache Kafka per installare un Kafka a nodo singolo sulla tua macchina locale. Completa questi passaggi nella guida rapida:
- Scarica l'ultima release di Kafka ed estraila.
- Avvia l'ambiente Kafka.
- Creare un argomento Kafka.
Autentica
Il connettore Kafka del gruppo Pub/Sub deve eseguire l'autenticazione con Pub/Sub per inviare e ricevere messaggi Pub/Sub. Per configurare l'autenticazione, svolgi i seguenti passaggi:
- Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
- Installa Google Cloud CLI.
-
Per initialize gcloud CLI, esegui questo comando:
gcloud init
-
Crea o seleziona un progetto Google Cloud.
-
Crea un progetto Google Cloud:
gcloud projects create PROJECT_ID
Sostituisci
PROJECT_ID
con un nome per il progetto Google Cloud che stai creando. -
Seleziona il progetto Google Cloud che hai creato:
gcloud config set project PROJECT_ID
Sostituisci
PROJECT_ID
con il nome del tuo progetto Google Cloud.
-
-
Crea credenziali di autenticazione locali per il tuo Account Google:
gcloud auth application-default login
-
Concedi i ruoli al tuo Account Google. Esegui questo comando una volta per ciascuno dei seguenti ruoli IAM:
roles/pubsublite.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Sostituisci
PROJECT_ID
con l'ID progetto. - Sostituisci
EMAIL_ADDRESS
con il tuo indirizzo email. - Sostituisci
ROLE
con ogni singolo ruolo.
- Sostituisci
- Installa Google Cloud CLI.
-
Per initialize gcloud CLI, esegui questo comando:
gcloud init
-
Crea o seleziona un progetto Google Cloud.
-
Crea un progetto Google Cloud:
gcloud projects create PROJECT_ID
Sostituisci
PROJECT_ID
con un nome per il progetto Google Cloud che stai creando. -
Seleziona il progetto Google Cloud che hai creato:
gcloud config set project PROJECT_ID
Sostituisci
PROJECT_ID
con il nome del tuo progetto Google Cloud.
-
-
Crea credenziali di autenticazione locali per il tuo Account Google:
gcloud auth application-default login
-
Concedi i ruoli al tuo Account Google. Esegui questo comando una volta per ciascuno dei seguenti ruoli IAM:
roles/pubsublite.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Sostituisci
PROJECT_ID
con l'ID progetto. - Sostituisci
EMAIL_ADDRESS
con il tuo indirizzo email. - Sostituisci
ROLE
con ogni singolo ruolo.
- Sostituisci
Scarica il JAR del connettore
Scarica il file JAR del connettore sulla tua macchina locale. Per maggiori informazioni, consulta Acquisire il connettore nel file Leggimi di GitHub.
Copia i file di configurazione del connettore
Clona o scarica il repository GitHub per il connettore.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
Copia i contenuti della directory
config
nella sottodirectoryconfig
della tua installazione di Kafka.cp config/* [path to Kafka installation]/config/
Questi file contengono le impostazioni di configurazione per il connettore.
Aggiorna la configurazione di Kafka Connect
- Vai alla directory che contiene il programma binario di Kafka Connect che hai scaricato.
- Nella directory binaria di Kafka Connect, apri il file denominato
config/connect-standalone.properties
in un editor di testo. - Se
plugin.path property
ha ricevuto un commento, rimuovilo. Aggiorna
plugin.path property
in modo da includere il percorso del JAR del connettore.Esempio:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
Imposta la proprietà
offset.storage.file.filename
su un nome file locale. In modalità autonoma, Kafka utilizza questo file per archiviare i dati di offset.Esempio:
offset.storage.file.filename=/tmp/connect.offsets
Inoltrare eventi da Kafka a Pub/Sub Lite
Questa sezione descrive come avviare il connettore sink, pubblicare eventi in Kafka e quindi leggere i messaggi inoltrati da Pub/Sub Lite.
Utilizza Google Cloud CLI per creare una prenotazione Pub/Sub Lite.
gcloud pubsub lite-reservations create RESERVATION_NAME \ --location=LOCATION \ --throughput-capacity=4
Sostituisci quanto segue:
- RESERVATION_NAME: il nome della prenotazione Pub/Sub Lite.
- LOCATION: la località della prenotazione.
Utilizza Google Cloud CLI per creare un argomento Pub/Sub Lite con una sottoscrizione.
gcloud pubsub lite-topics create LITE_TOPIC \ --location=LOCATION \ --partitions=2 \ --per-partition-bytes=30GiB \ --throughput-reservation=RESERVATION_NAME gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \ --location=LOCATION \ --topic=LITE_TOPIC
Sostituisci quanto segue:
- LITE_TOPIC: il nome dell'argomento Pub/Sub Lite per ricevere messaggi da Kafka.
- LOCATION: la località dell'argomento. Il valore deve corrispondere alla località della prenotazione.
- RESERVATION_NAME: il nome della prenotazione Pub/Sub Lite.
- LITE_SUBSCRIPTION: il nome di una sottoscrizione Pub/Sub Lite per l'argomento.
Apri il file
/config/pubsub-lite-sink-connector.properties
in un editor di testo. Aggiungi valori per le seguenti proprietà, contrassegnate con"TODO"
nei commenti:topics=KAFKA_TOPICS pubsublite.project=PROJECT_ID pubsublite.location=LOCATION pubsublite.topic=LITE_TOPIC
Sostituisci quanto segue:
- KAFKA_TOPICS: un elenco separato da virgole di argomenti Kafka da cui leggere.
- PROJECT_ID: il progetto Google Cloud che contiene l'argomento Pub/Sub Lite.
- LOCATION: la posizione dell'argomento Pub/Sub Lite.
- LITE_TOPIC: l'argomento Pub/Sub Lite per ricevere messaggi da Kafka.
Dalla directory Kafka, esegui questo comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/pubsub-lite-sink-connector.properties
Segui la procedura nella guida rapida di Apache Kafka per scrivere alcuni eventi nel tuo argomento Kafka.
Iscriviti alla sottoscrizione Pub/Sub Lite utilizzando uno dei metodi mostrati in Ricezione di messaggi dalle sottoscrizioni Lite.
Inoltrare messaggi da Pub/Sub Lite a Kafka
Questa sezione descrive come avviare il connettore di origine, pubblicare messaggi su Pub/Sub Lite e leggere i messaggi inoltrati da Kafka.
Utilizza Google Cloud CLI per creare una prenotazione Pub/Sub Lite.
gcloud pubsub lite-reservations create RESERVATION_NAME \ --location=LOCATION \ --throughput-capacity=4
Sostituisci quanto segue:
- RESERVATION_NAME: il nome della prenotazione Pub/Sub Lite.
- LOCATION: la località della prenotazione.
Utilizza Google Cloud CLI per creare un argomento Pub/Sub Lite con una sottoscrizione.
gcloud pubsub lite-topics create LITE_TOPIC \ --location=LOCATION \ --partitions=2 \ --per-partition-bytes=30GiB \ --throughput-reservation=RESERVATION_NAME gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \ --location=LOCATION \ --topic=LITE_TOPIC
Sostituisci quanto segue:
- LITE_TOPIC: il nome dell'argomento Pub/Sub Lite.
- LOCATION: la località dell'argomento. Il valore deve corrispondere alla località della prenotazione.
- RESERVATION_NAME: il nome della prenotazione Pub/Sub Lite.
- LITE_SUBSCRIPTION: il nome di una sottoscrizione Pub/Sub Lite per l'argomento.
Apri il file denominato
/config/pubsub-lite-source-connector.properties
in un editor di testo. Aggiungi i valori per le seguenti proprietà, contrassegnate"TODO"
nei commenti:topic=KAFKA_TOPIC pubsublite.project=PROJECT_ID pubsublite.location=LOCATION pubsublite.subscription=LITE_SUBSCRIPTION
Sostituisci quanto segue:
- KAFKA_TOPIC: gli argomenti Kafka per la ricezione dei messaggi Pub/Sub.
- PROJECT_ID: il progetto Google Cloud che contiene l'argomento Pub/Sub.
- LOCATION: la posizione dell'argomento Pub/Sub Lite.
- LITE_SUBSCRIPTION: l'argomento Pub/Sub Lite.
Dalla directory Kafka, esegui questo comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/pubsub-lite-source-connector.properties
Pubblicare messaggi nell'argomento Pub/Sub Lite utilizzando uno dei metodi indicati in Pubblicazione di messaggi in argomenti Lite.
Leggi il messaggio di Kafka. Segui i passaggi nella guida rapida di Apache Kafka per leggere i messaggi dell'argomento Kafka.
Conversione dei messaggi
Un record Kafka contiene una chiave e un valore, ovvero array di byte a lunghezza variabile. Facoltativamente, un record Kafka può anche includere intestazioni, che sono coppie chiave-valore. Un messaggio Pub/Sub Lite contiene i seguenti campi:
key
: chiave messaggio (bytes
)data
: dati dei messaggi (bytes
)attributes
: zero o più attributi. Ogni attributo è una mappa(key,values[])
. Un singolo attributo può avere più valori.event_time
: un timestamp facoltativo dell'evento fornito dall'utente.
Kafka Connect utilizza convertitori per serializzare chiavi e valori da e verso Kafka. Per controllare la serializzazione, imposta le seguenti proprietà nei file di configurazione del connettore:
key.converter
: il convertitore utilizzato per serializzare le chiavi di registrazione.value.converter
: il convertitore utilizzato per serializzare i valori dei record.
Conversione da Kafka a Pub/Sub Lite
Il connettore sink converte i record Kafka in messaggi Pub/Sub Lite nel modo seguente.
Record Kafka
(SinkRecord ) |
Messaggio Pub/Sub Lite |
---|---|
Chiave | key |
Valore | data |
Intestazioni | attributes |
Timestamp | eventTime |
Tipo di timestamp | attributes["x-goog-pubsublite-source-kafka-event-time-type"] |
Argomento | attributes["x-goog-pubsublite-source-kafka-topic"] |
Partizione | attributes["x-goog-pubsublite-source-kafka-offset"] |
Offset | attributes["x-goog-pubsublite-source-kafka-partition"] |
Le chiavi, i valori e le intestazioni vengono codificati come segue:
- Gli schemi null vengono trattati come schemi di stringhe.
- I payload di byte sono scritti direttamente senza alcuna conversione.
- I payload di tipo stringa, numero intero e in virgola mobile sono codificati in una sequenza di byte UTF-8.
- Tutti gli altri payload sono codificati in un tipo di buffer di protocollo
Value
e quindi convertiti in una stringa di byte.- I campi di stringhe nidificati sono codificati in un protobuf
Value
. - I campi di byte nidificati sono codificati in un protobuf
Value
che contiene i byte codificati in Base64. - I campi numerici nidificati sono codificati come doppio in un protobuf
Value
. - Le mappe con chiavi array, mappa o struct non sono supportate.
- I campi di stringhe nidificati sono codificati in un protobuf
Conversione da Pub/Sub Lite a Kafka
Il connettore di origine converte i messaggi Pub/Sub Lite in record Kafka nel seguente modo:
Messaggio Pub/Sub Lite | Record Kafka
(SourceRecord )
|
---|---|
key |
Chiave |
data |
Valore |
attributes |
Intestazioni |
event_time |
Timestamp. Se event_time non è presente, viene utilizzata
la data e l'ora di pubblicazione. |
Opzioni di configurazione
Oltre alle configurazioni fornite dall'API Kafka Connect, il connettore supporta le seguenti configurazioni di Pub/Sub Lite.
Opzioni di configurazione del connettore sink
Il connettore sink supporta le seguenti opzioni di configurazione.
Impostazione | Tipo di dati | Descrizione |
---|---|---|
connector.class |
String |
Obbligatorio. La classe Java per il connettore. Per il connettore sink di Pub/Sub Lite, il valore deve essere com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector .
|
gcp.credentials.file.path |
String |
Facoltativo. Il percorso di un file in cui sono archiviate le credenziali Google Cloud per l'autenticazione di Pub/Sub Lite. |
gcp.credentials.json |
String |
Facoltativo. Un BLOB JSON contenente Google Cloud per l'autenticazione di Pub/Sub Lite. |
pubsublite.location |
String |
Obbligatorio. La località dell'argomento Pub/Sub Lite. |
pubsublite.project |
String |
Obbligatorio. La risorsa Google Cloud che contiene l'argomento Pub/Sub Lite. |
pubsublite.topic |
String |
Obbligatorio. L'argomento Pub/Sub Lite in cui pubblicare i record Kafka. |
topics |
String |
Obbligatorio. Un elenco separato da virgole di argomenti Kafka da cui leggere. |
Opzioni di configurazione del connettore di origine
Il connettore di origine supporta le seguenti opzioni di configurazione.
Impostazione | Tipo di dati | Descrizione |
---|---|---|
connector.class |
String |
Obbligatorio. La classe Java per il connettore. Per
il connettore di origine Pub/Sub Lite, il valore deve essere
com.google.pubsublite.kafka.source.PubSubLiteSourceConnector .
|
gcp.credentials.file.path |
String |
Facoltativo. Il percorso di un file in cui sono archiviate le credenziali Google Cloud per l'autenticazione di Pub/Sub Lite. |
gcp.credentials.json |
String |
Facoltativo. Un BLOB JSON contenente Google Cloud per l'autenticazione di Pub/Sub Lite. |
kafka.topic |
String |
Obbligatorio. L'argomento Kafka che riceve messaggi da Pub/Sub Lite. |
pubsublite.location |
String |
Obbligatorio. La località dell'argomento Pub/Sub Lite. |
pubsublite.partition_flow_control.bytes |
Long |
Il numero massimo di byte in sospeso per partizione Pub/Sub Lite. Valore predefinito: 20.000.000 |
pubsublite.partition_flow_control.messages |
Long |
Il numero massimo di messaggi in sospeso per partizione Pub/Sub Lite. Valore predefinito: |
pubsublite.project |
String |
Obbligatorio. Il progetto Google Cloud che contiene l'argomento Pub/Sub Lite. |
pubsublite.subscription |
String |
Obbligatorio. Il nome della sottoscrizione Pub/Sub Lite da cui eseguire il pull dei messaggi. |
Passaggi successivi
- Differenze tra Kafka e Pub/Sub.
- Scopri di più sul connettore Kafka del gruppo Pub/Sub.
- Consulta il Repository GitHub del connettore Kafka del gruppo Pub/Sub.