Connetti Pub/Sub Lite ad Apache Kafka

Questo documento descrive come integrare Apache Kafka e Pub/Sub Lite tramite usando il connettore Kafka del gruppo Pub/Sub.

Informazioni sul connettore Kafka del gruppo Pub/Sub

Apache Kafka è una piattaforma open source per l'inserimento di flussi di eventi. Di solito utilizzata in architetture distribuite per consentire la comunicazione componenti accoppiati. Pub/Sub Lite è un servizio gestito per l'invio ricevere i messaggi in modo asincrono. Come con Kafka, puoi utilizzare Pub/Sub Lite per comunicare tra i componenti nel cloud dell'architettura.

Il connettore Kafka del gruppo Pub/Sub consente di integrare questi due sistemi. I seguenti connettori sono pacchettizzati nel JAR del connettore:

  • Il connettore sink legge i record da uno o più argomenti Kafka e le pubblica in Pub/Sub Lite.
  • Il connettore di origine legge i messaggi da un argomento Pub/Sub Lite e le pubblica su Kafka.

Ecco alcuni scenari in cui potresti utilizzare il connettore Kafka del gruppo Pub/Sub:

  • Stai eseguendo la migrazione di un'architettura basata su Kafka a Google Cloud.
  • Hai un sistema di frontend che archivia gli eventi in Kafka all'esterno ma utilizzi anche Google Cloud per eseguire alcuni dei tuoi backend che devono ricevere gli eventi Kafka.
  • Puoi raccogliere i log da una soluzione Kafka on-premise e inviarli a Google Cloud per l'analisi dei dati.
  • Hai un sistema frontend che utilizza Google Cloud, ma archivi anche i dati on-premise con Kafka.

Il connettore richiede Kafka Connect che è un framework per trasmettere dati in modalità flusso tra Kafka e altri sistemi. Per utilizzare devi eseguire Kafka Connect insieme al tuo cluster Kafka.

In questo documento si presuppone che tu abbia familiarità sia con Kafka, sia in Pub/Sub Lite. Per iniziare a utilizzare Pub/Sub Lite, consulta Pubblica e ricevi messaggi in Pub/Sub Lite utilizzando la console Google Cloud.

Inizia a utilizzare il connettore Kafka del gruppo Pub/Sub

Questa sezione illustra le seguenti attività:

  1. Configurare il connettore Kafka del gruppo Pub/Sub.
  2. Inviare eventi da Kafka a Pub/Sub Lite.
  3. Inviare messaggi da Pub/Sub Lite a Kafka.

Prerequisiti

Installa Kafka

Segui le Guida rapida di Apache Kafka per installare Kafka a nodo singolo sulla tua macchina locale. Completa questi passaggi in della guida rapida:

  1. Scarica l'ultima release di Kafka ed estraila.
  2. Avvia l'ambiente Kafka.
  3. Creare un argomento Kafka.

Autentica

Il connettore Kafka del gruppo Pub/Sub deve eseguire l'autenticazione con Pub/Sub nell'ordine per inviare e ricevere messaggi Pub/Sub. Per configurare l'autenticazione, segui questi passaggi:

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. Installa Google Cloud CLI.
  3. Per initialize gcloud CLI, esegui questo comando:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Crea credenziali di autenticazione locali per il tuo Account Google:

    gcloud auth application-default login
  6. Concedi i ruoli al tuo Account Google. Esegui questo comando una volta per ciascuno dei seguenti ruoli IAM: roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Sostituisci PROJECT_ID con l'ID progetto.
    • Sostituisci EMAIL_ADDRESS con il tuo indirizzo email.
    • Sostituisci ROLE con ogni singolo ruolo.
  7. Installa Google Cloud CLI.
  8. Per initialize gcloud CLI, esegui questo comando:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Crea credenziali di autenticazione locali per il tuo Account Google:

    gcloud auth application-default login
  11. Concedi i ruoli al tuo Account Google. Esegui questo comando una volta per ciascuno dei seguenti ruoli IAM: roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Sostituisci PROJECT_ID con l'ID progetto.
    • Sostituisci EMAIL_ADDRESS con il tuo indirizzo email.
    • Sostituisci ROLE con ogni singolo ruolo.

Scarica il JAR del connettore

Scarica il file JAR del connettore sul computer locale. Per ulteriori informazioni, vedi Acquista il connettore nel file Leggimi di GitHub.

Copia i file di configurazione del connettore

  1. Clona o scarica il Repository GitHub per il connettore.

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. Copia il contenuto della directory config nella sottodirectory config di dell'installazione di Kafka.

    cp config/* [path to Kafka installation]/config/
    

Questi file contengono le impostazioni di configurazione per il connettore.

Aggiorna la configurazione di Kafka Connect

  1. Vai alla directory che contiene il programma binario Kafka Connect che scaricato.
  2. Nella directory binaria Kafka Connect, apri il file denominato config/connect-standalone.properties in un editor di testo.
  3. Se plugin.path property ha un commento, rimuovi il commento.
  4. Aggiorna plugin.path property in modo da includere il percorso del JAR del connettore.

    Esempio:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. Imposta la proprietà offset.storage.file.filename sul nome di un file locale. Nella modalità autonoma, Kafka usa questo file per archiviare i dati di offset.

    Esempio:

    offset.storage.file.filename=/tmp/connect.offsets
    

Inoltra eventi da Kafka a Pub/Sub Lite

Questa sezione descrive come avviare il connettore sink, pubblicare eventi in Kafka, e poi leggere i messaggi inoltrati da Pub/Sub Lite.

  1. Utilizza Google Cloud CLI per creare una prenotazione Pub/Sub Lite.

    gcloud pubsub lite-reservations create RESERVATION_NAME \
    --location=LOCATION \
    --throughput-capacity=4
    

    Sostituisci quanto segue:

    • RESERVATION_NAME: il nome del file Pub/Sub Lite prenotazione.
    • LOCATION: la località dell'evento prenotazione.
  2. Utilizza Google Cloud CLI per creare un argomento Pub/Sub Lite con un abbonamento.

    gcloud pubsub lite-topics create LITE_TOPIC \
    --location=LOCATION \
    --partitions=2 \
    --per-partition-bytes=30GiB \
    --throughput-reservation=RESERVATION_NAME
    
    gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
    --location=LOCATION \
    --topic=LITE_TOPIC
    

    Sostituisci quanto segue:

    • LITE_TOPIC: il nome dell'argomento Pub/Sub Lite in ricevono messaggi da Kafka.
    • LOCATION: la località dell'argomento. Il valore deve corrispondere al valore località della prenotazione.
    • RESERVATION_NAME: il nome del file Pub/Sub Lite prenotazione.
    • LITE_SUBSCRIPTION: il nome di un Pub/Sub Lite abbonamento per l'argomento.
  3. Apri il file /config/pubsub-lite-sink-connector.properties in un editor di testo. Aggiungi per le seguenti proprietà, contrassegnate come "TODO" nel commenti:

    topics=KAFKA_TOPICS
    pubsublite.project=PROJECT_ID
    pubsublite.location=LOCATION
    pubsublite.topic=LITE_TOPIC
    

    Sostituisci quanto segue:

    • KAFKA_TOPICS: un elenco separato da virgole di argomenti Kafka da leggere da cui proviene.
    • PROJECT_ID: il progetto Google Cloud che contiene Pub/Sub Lite.
    • LOCATION: la posizione dell'argomento Pub/Sub Lite.
    • LITE_TOPIC: l'argomento Pub/Sub Lite da ricevere messaggi da Kafka.
  4. Dalla directory Kafka, esegui questo comando:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/pubsub-lite-sink-connector.properties
    
  5. Segui i passaggi nella Guida rapida di Apache Kafka per scrivere alcuni eventi nel tuo argomento Kafka.

  6. Sottoscrivi la sottoscrizione Pub/Sub Lite utilizzando uno qualsiasi dei i metodi mostrati Ricezione di messaggi da sottoscrizioni Lite.

Inoltrare messaggi da Pub/Sub Lite a Kafka

Questa sezione descrive come avviare il connettore di origine e pubblicare messaggi in Pub/Sub Lite e leggere i messaggi inoltrati da Kafka.

  1. Utilizza Google Cloud CLI per creare una prenotazione Pub/Sub Lite.

    gcloud pubsub lite-reservations create RESERVATION_NAME \
    --location=LOCATION \
    --throughput-capacity=4
    

    Sostituisci quanto segue:

    • RESERVATION_NAME: il nome del file Pub/Sub Lite prenotazione.
    • LOCATION: la località dell'evento prenotazione.
  2. Utilizza Google Cloud CLI per creare un argomento Pub/Sub Lite con un abbonamento.

    gcloud pubsub lite-topics create LITE_TOPIC \
    --location=LOCATION \
    --partitions=2 \
    --per-partition-bytes=30GiB \
    --throughput-reservation=RESERVATION_NAME
    
    gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
    --location=LOCATION \
    --topic=LITE_TOPIC
    

    Sostituisci quanto segue:

    • LITE_TOPIC: il nome dell'argomento Pub/Sub Lite.
    • LOCATION: la località dell'argomento. Il valore deve corrispondere al valore località della prenotazione.
    • RESERVATION_NAME: il nome del file Pub/Sub Lite prenotazione.
    • LITE_SUBSCRIPTION: il nome di un Pub/Sub Lite abbonamento per l'argomento.
  3. Apri il file denominato /config/pubsub-lite-source-connector.properties in un editor di testo. Aggiungi valori per le seguenti proprietà, contrassegnate "TODO" nei commenti:

    topic=KAFKA_TOPIC
    pubsublite.project=PROJECT_ID
    pubsublite.location=LOCATION
    pubsublite.subscription=LITE_SUBSCRIPTION
    

    Sostituisci quanto segue:

    • KAFKA_TOPIC: gli argomenti Kafka per ricevere i messaggi Pub/Sub.
    • PROJECT_ID: il progetto Google Cloud che contiene Pub/Sub.
    • LOCATION: la posizione dell'argomento Pub/Sub Lite.
    • LITE_SUBSCRIPTION: l'argomento Pub/Sub Lite.
  4. Dalla directory Kafka, esegui questo comando:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/pubsub-lite-source-connector.properties
    
  5. Pubblica messaggi nell'argomento Pub/Sub Lite utilizzando uno dei illustrati in Pubblicazione di messaggi su argomenti Lite.

  6. Leggere il messaggio di Kafka. Segui i passaggi nella Guida rapida di Apache Kafka per leggere i messaggi dall'argomento Kafka.

Conversione dei messaggi

Un record Kafka contiene una chiave e un valore, che sono array di byte a lunghezza variabile. Facoltativamente, Il record Kafka può anche avere intestazioni, che sono coppie chiave/valore. R Messaggio Pub/Sub Lite contiene i seguenti campi:

  • key: chiave messaggio (bytes)
  • data: dati dei messaggi (bytes)
  • attributes: zero o più attributi. Ogni attributo è un (key,values[]) mappa. Un singolo attributo può avere più valori.
  • event_time: un timestamp facoltativo fornito dall'utente.

Kafka Connect utilizza i convertitori per serializzare chiavi e valori da e verso Kafka. Per controllare la serializzazione, imposta le seguenti proprietà nel connettore di configurazione YAML:

  • key.converter: il convertitore utilizzato per serializzare le chiavi di registrazione.
  • value.converter: il convertitore utilizzato per serializzare i valori dei record.

Conversione da Kafka a Pub/Sub Lite

Il connettore sink converte i record Kafka in messaggi Pub/Sub Lite come segue:

Record Kafka (SinkRecord) messaggio Pub/Sub Lite
Chiave key
Valore data
Intestazioni attributes
Timestamp eventTime
Tipo di timestamp attributes["x-goog-pubsublite-source-kafka-event-time-type"]
Argomento attributes["x-goog-pubsublite-source-kafka-topic"]
Partition attributes["x-goog-pubsublite-source-kafka-offset"]
Offset attributes["x-goog-pubsublite-source-kafka-partition"]

Le chiavi, i valori e le intestazioni sono codificati nel seguente modo:

  • Gli schemi null vengono trattati come schemi di stringhe.
  • I payload in byte vengono scritti direttamente senza conversioni.
  • I payload con stringhe, numeri interi e in virgola mobile sono codificati in una sequenza di Byte UTF-8.
  • Tutti gli altri payload sono codificati in un buffer di protocollo Value e poi convertita in stringa di byte.
    • I campi stringa nidificati sono codificati in un protobuf Value.
    • I campi di byte nidificati sono codificati in un protobuf Value che contiene con codifica Base64.
    • I campi numerici nidificati vengono codificati come doppio in un protobuf Value.
    • Le mappe con chiavi array, mappa o struct non sono supportate.

Conversione da Pub/Sub Lite a Kafka

Il connettore di origine converte i messaggi Pub/Sub Lite in record Kafka come segue:

messaggio Pub/Sub Lite Record Kafka (SourceRecord)
key Chiave
data Valore
attributes Intestazioni
event_time Timestamp. Se event_time non è presente, la pubblicazione dell'audiodescrizione.

Opzioni di configurazione

Oltre alle configurazioni fornite dall'API Kafka Connect, supporta le seguenti configurazioni Pub/Sub Lite.

Opzioni di configurazione del connettore sink

Il connettore sink supporta le seguenti opzioni di configurazione.

Impostazione Tipo di dati Descrizione
connector.class String Obbligatorio. La classe Java per il connettore. Per al connettore sink Pub/Sub Lite, il valore deve essere com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector.
gcp.credentials.file.path String Facoltativo. Il percorso di un file in cui sono archiviate le credenziali di Google Cloud per l'autenticazione di Pub/Sub Lite.
gcp.credentials.json String Facoltativo. Un blob JSON che contiene Google Cloud per l'autenticazione di Pub/Sub Lite.
pubsublite.location String Obbligatorio. La posizione Pub/Sub Lite.
pubsublite.project String Obbligatorio. Google Cloud che contiene Pub/Sub Lite.
pubsublite.topic String Obbligatorio. L'argomento Pub/Sub Lite da pubblicare Record Kafka in
topics String Obbligatorio. Un elenco separato da virgole di argomenti Kafka su per la lettura.

Opzioni di configurazione del connettore di origine

Il connettore di origine supporta le seguenti opzioni di configurazione.

Impostazione Tipo di dati Descrizione
connector.class String Obbligatorio. La classe Java per il connettore. Per al connettore di origine Pub/Sub Lite, il valore deve essere com.google.pubsublite.kafka.source.PubSubLiteSourceConnector.
gcp.credentials.file.path String Facoltativo. Il percorso di un file in cui sono archiviate le credenziali di Google Cloud per l'autenticazione di Pub/Sub Lite.
gcp.credentials.json String Facoltativo. Un blob JSON che contiene Google Cloud per l'autenticazione di Pub/Sub Lite.
kafka.topic String Obbligatorio. L'argomento Kafka che riceve messaggi da in Pub/Sub Lite.
pubsublite.location String Obbligatorio. La posizione Pub/Sub Lite.
pubsublite.partition_flow_control.bytes Long

Il numero massimo di byte in sospeso per partizione Pub/Sub Lite.

Valore predefinito: 20.000.000

pubsublite.partition_flow_control.messages Long

Il numero massimo di messaggi in sospeso per partizione Pub/Sub Lite.

Valore predefinito: Long.MAX_VALUE

pubsublite.project String Obbligatorio. Il progetto Google Cloud che contiene Pub/Sub Lite.
pubsublite.subscription String Obbligatorio. Il nome di Pub/Sub Lite sottoscrizione da cui eseguire il pull dei messaggi.

Passaggi successivi