Connetti Pub/Sub ad Apache Kafka

Mantieni tutto organizzato con le raccolte Salva e classifica i contenuti in base alle tue preferenze.

Questo documento descrive come integrare Apache Kafka e Pub/Sub utilizzando il connettore Kafka di Pub/Sub Group.

Informazioni sul connettore Kafka di Pub/Sub Group

Apache Kafka è una piattaforma open source per eventi in streaming. È comunemente usato nelle architetture distribuite per consentire la comunicazione tra componenti a basso accoppiamento. Pub/Sub è un servizio gestito per l'invio e la ricezione di messaggi in modo asincrono. Come con Kafka, puoi utilizzare Pub/Sub per comunicare tra i componenti della tua architettura cloud.

Il connettore Kafka del gruppo Pub/Sub ti consente di integrare questi due sistemi. I seguenti connettori sono contenuti nella JAR Connector:

  • Il connettore sink legge i record da uno o più argomenti Kafka e li pubblica su Pub/Sub.
  • Il connettore di origine legge i messaggi da un argomento Pub/Sub e li pubblica su Kafka.

Ecco alcuni scenari in cui potresti utilizzare il connettore Kafka del gruppo Pub/Sub:

  • Stai eseguendo la migrazione di un'architettura basata su Kafka a Google Cloud.
  • Hai un sistema frontend che archivia gli eventi in Kafka al di fuori di Google Cloud, ma utilizzi anche Google Cloud per eseguire alcuni dei tuoi servizi di backend, i quali devono ricevere gli eventi Kafka.
  • Raccogli i log da una soluzione Kafka on-premise e li invii a Google Cloud per l'analisi dei dati.
  • Disponi di un sistema frontend che utilizza Google Cloud, ma archivi i dati anche on-premise utilizzando Kafka.

Il connettore richiede Kafka Connect, che è un framework per lo streaming di dati tra Kafka e altri sistemi. Per utilizzare il connettore, devi eseguire Kafka Connect insieme al cluster Kafka.

Questo documento presuppone che tu conosca sia Kafka sia Pub/Sub. Prima di leggere questo documento, è consigliabile eseguire una delle guide rapide su Pub/Sub.

Inizia a utilizzare il connettore

In questa sezione troverai le seguenti attività:

  1. Configura il connettore Kafka di Pub/Sub Group.
  2. Invia eventi da Kafka a Pub/Sub.
  3. Inviare messaggi da Pub/Sub a Kafka.

Prerequisiti

Installa Kafka

Segui la guida rapida di Apache Kafka per installare un nodo singolo Kafka sulla tua macchina locale. Completa questi passaggi nella guida rapida:

  1. Scarica l'ultima release di Kafka ed estraila.
  2. Avvia l'ambiente Kafka.
  3. Creare un argomento Kafka.

Autentica

Il connettore Kafka di Pub/Sub Group deve autenticarsi con Pub/Sub per inviare e ricevere messaggi Pub/Sub. Per configurare l'autenticazione, esegui i seguenti passaggi:

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. Installa Google Cloud CLI.
  3. Per inizializzare l'interfaccia a riga di comando gcloud, esegui il comando seguente:

    gcloud init
  4. Crea o seleziona un progetto Google Cloud.

    • Creare un progetto Cloud:

      gcloud projects create PROJECT_ID
    • Seleziona il progetto Cloud che hai creato:

      gcloud config set project PROJECT_ID
  5. Crea le credenziali di autenticazione per il tuo Account Google:

    gcloud auth application-default login
  6. Concedi i ruoli al tuo Account Google. Esegui il comando seguente una volta per ciascuno dei seguenti ruoli IAM: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Sostituisci PROJECT_ID con l'ID progetto.
    • Sostituisci EMAIL_ADDRESS con il tuo indirizzo email.
    • Sostituisci ROLE con ogni ruolo.
  7. Installa Google Cloud CLI.
  8. Per inizializzare l'interfaccia a riga di comando gcloud, esegui il comando seguente:

    gcloud init
  9. Crea o seleziona un progetto Google Cloud.

    • Creare un progetto Cloud:

      gcloud projects create PROJECT_ID
    • Seleziona il progetto Cloud che hai creato:

      gcloud config set project PROJECT_ID
  10. Crea le credenziali di autenticazione per il tuo Account Google:

    gcloud auth application-default login
  11. Concedi i ruoli al tuo Account Google. Esegui il comando seguente una volta per ciascuno dei seguenti ruoli IAM: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Sostituisci PROJECT_ID con l'ID progetto.
    • Sostituisci EMAIL_ADDRESS con il tuo indirizzo email.
    • Sostituisci ROLE con ogni ruolo.

Scarica il connettore JAR

Scarica il file JAR del connettore sulla tua macchina locale. Per ulteriori informazioni, consulta Acquisire il connettore nel file Leggimi di GitHub.

Copia i file di configurazione del connettore

  1. Clona o scarica il repository GitHub per il connettore.

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. Copia il contenuto della directory config nella sottodirectory config dell'installazione di Kafka.

    cp config/* [path to Kafka installation]/config/
    

Questi file contengono impostazioni di configurazione per il connettore.

Aggiornare la configurazione di Kafka Connect

  1. Vai alla directory di Kafka.
  2. Apri il file denominato config/connect-standalone.properties in un editor di testo.
  3. Se plugin.path property viene commentato, rimuovi il commento.
  4. Aggiorna plugin.path property per includere il percorso della JAR del connettore.

    Esempio:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. Imposta la proprietà offset.storage.file.filename su un nome file locale. In modalità autonoma, Kafka utilizza questo file per memorizzare i dati di offset.

    Esempio:

    offset.storage.file.filename=/tmp/connect.offsets
    

Inoltrare eventi da Kafka a Pub/Sub

Questa sezione descrive come avviare il connettore sink, pubblicare eventi su Kafka e leggere i messaggi inoltrati da Pub/Sub.

  1. Utilizzare Google Cloud CLI per creare un argomento Pub/Sub con una sottoscrizione.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    Sostituisci quanto segue:

    • PUBSUB_TOPIC: il nome di un argomento Pub/Sub per la ricezione dei messaggi da Kafka.
    • PUBSUB_SUBSCRIPTION: il nome di una sottoscrizione Pub/Sub per l'argomento.
  2. Apri il file /config/cps-sink-connector.properties in un editor di testo. Aggiungi valori per le seguenti proprietà, che sono contrassegnate con "TODO" nei commenti:

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC
    

    Sostituisci quanto segue:

    • KAFKA_TOPICS: un elenco di argomenti Kafka separati da virgole da cui leggere.
    • PROJECT_ID: il progetto Google Cloud che contiene l'argomento Pub/Sub.
    • PUBSUB_TOPIC: l'argomento Pub/Sub per ricevere i messaggi da Kafka.
  3. Nella directory Kafka, esegui il comando seguente:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. Segui i passaggi nella guida rapida di Apache Kafka per scrivere alcuni eventi nell'argomento Kafka.

  5. Utilizza l'interfaccia a riga di comando gcloud per leggere gli eventi da Pub/Sub.

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
    

Inoltrare messaggi da Pub/Sub a Kafka

Questa sezione descrive come avviare il connettore di origine, pubblicare messaggi su Pub/Sub e leggere i messaggi inoltrati da Kafka.

  1. Utilizza l'interfaccia a riga di comando gcloud per creare un argomento Pub/Sub con una sottoscrizione.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    Sostituisci quanto segue:

    • PUBSUB_TOPIC: il nome di un argomento Pub/Sub.
    • PUBSUB_SUBSCRIPTION: il nome di una sottoscrizione Pub/Sub.
  2. Apri il file denominato /config/cps-source-connector.properties in un editor di testo. Aggiungi valori per le seguenti proprietà, che sono contrassegnate con "TODO" nei commenti:

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION
    

    Sostituisci quanto segue:

    • KAFKA_TOPIC: gli argomenti di Kafka per ricevere i messaggi Pub/Sub.
    • PROJECT_ID: il progetto Google Cloud che contiene l'argomento Pub/Sub.
    • PUBSUB_TOPIC: l'argomento Pub/Sub.
  3. Nella directory Kafka, esegui il comando seguente:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. Utilizza l'interfaccia a riga di comando gcloud per pubblicare un messaggio in Pub/Sub.

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
    
  5. Leggi il messaggio di Kafka. Segui i passaggi della guida rapida di Apache Kafka per leggere i messaggi dall'argomento Kafka.

Conversione del messaggio

Un record Kafka contiene una chiave e un valore, che sono array di byte a lunghezza variabile. Facoltativamente, un record Kafka può avere anche intestazioni, che sono coppie chiave/valore. Un messaggio Pub/Sub è composto da due parti principali: il corpo del messaggio e zero o più attributi chiave-valore.

Kafka Connect utilizza i convertitori per serializzare chiavi e valori da e verso Kafka. Per controllare la serializzazione, imposta le seguenti proprietà nei file di configurazione del connettore:

  • key.converter: il convertitore utilizzato per serializzare le chiavi di registrazione.
  • value.converter: il convertitore utilizzato per serializzare i valori dei record.

Il corpo di un messaggio Pub/Sub è un oggetto ByteString, quindi la conversione più efficiente è copiare direttamente il payload. Per questo motivo, consigliamo di utilizzare un convertitore che produca i tipi di dati primitivi (intero, Float, Stringa o Schema di byte) ove possibile, per evitare la deserializzazione e la reserizzazione dello stesso corpo del messaggio.

Conversione da Kafka a Pub/Sub

Il connettore sink converti i record Kafka in messaggi Pub/Sub nel seguente modo:

  • La chiave di record Kafka viene archiviata come attributo denominato "key" nel messaggio Pub/Sub.
  • Per impostazione predefinita, il connettore elimina tutte le intestazioni nel record Kafka. Tuttavia, se imposti l'opzione di configurazione headers.publish su true, il connettore scrive le intestazioni come attributi Pub/Sub. Il connettore ignora le intestazioni che superano i limiti relativi agli attributi di messaggio di Pub/Sub.
  • Per gli schemi con numero intero, numero in virgola mobile, stringa e byte, il connettore trasmette i byte del valore del record Kafka direttamente nel corpo del messaggio Pub/Sub.
  • Per gli schemi struct, il connettore scrive ogni campo come attributo del messaggio Pub/Sub. Ad esempio, se il campo è { "id"=123 }, il messaggio Pub/Sub risultante ha un attributo "id"="123". Il valore del campo viene sempre convertito in stringa.
  • Per gli schemi mappa, il connettore scrive ogni coppia chiave-valore come attributo del messaggio Pub/Sub. Ad esempio, se la mappa è {"alice"=1,"bob"=2}, il messaggio Pub/Sub risultante ha due attributi, "alice"="1" e "bob"="2". Le chiavi e i valori vengono convertiti in stringhe.

Gli schemi e gli schemi hanno alcuni comportamenti aggiuntivi:

  • Facoltativamente, puoi specificare un particolare campo struct o chiave mappa come corpo del messaggio, impostando la proprietà di configurazione messageBodyName. Il valore del campo o della chiave viene memorizzato come ByteString nel corpo del messaggio. Se non imposti messageBodyName, il corpo del messaggio è vuoto per gli schemi e le mappe.

  • Per i valori dell'array, il connettore supporta solo tipi di array primitivi. La sequenza di valori nell'array è concatenata in un singolo oggetto ByteString.

Conversione da Pub/Sub a Kafka

Il connettore di origine converte i messaggi Pub/Sub in record Kafka nel seguente modo:

  • Chiave di record Kafka: per impostazione predefinita, la chiave è impostata su null. Facoltativamente, puoi specificare un attributo del messaggio Pub/Sub da utilizzare come chiave impostando l'opzione di configurazione kafka.key.attribute. In questo caso, il connettore cerca un attributo con questo nome e imposta la chiave di registrazione sul valore dell'attributo. Se l'attributo specificato non è presente, la chiave di record viene impostata su null.

  • Valore del record Kafka. Il connettore scrive il valore del record nel seguente modo:

    • Se il messaggio Pub/Sub non ha attributi personalizzati, il connettore scrive il corpo del messaggio Pub/Sub direttamente nel valore del record Kafka come tipo byte[], utilizzando il convertitore specificato da value.converter.

    • Se il messaggio Pub/Sub ha attributi personalizzati e kafka.record.headers è false, il connettore scrive una struct nel valore record. Lo struct contiene un campo per ogni attributo e un campo denominato "message" il cui valore è il corpo del messaggio Pub/Sub (archiviato come byte):

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      In questo caso, devi utilizzare un elemento value.converter compatibile con gli schemi struct, ad esempio org.apache.kafka.connect.json.JsonConverter.

    • Se il messaggio Pub/Sub ha attributi personalizzati e kafka.record.headers è true, il connettore scrive gli attributi come intestazioni del record Kafka. scrive il corpo del messaggio Pub/Sub direttamente nel valore del record Kafka come tipo byte[], utilizzando il convertitore specificato da value.converter.

  • Intestazioni dei record Kafka. Per impostazione predefinita, le intestazioni sono vuote, a meno che non imposti kafka.record.headers su true.

Opzioni di configurazione

Oltre alle configurazioni fornite dall'API Kafka Connect, il connettore Kafka per Pub/Sub Group supporta le seguenti configurazioni.

Opzioni di configurazione del connettore sink

Il connettore sink supporta le seguenti opzioni di configurazione.

Impostazione Tipo di dati Descrizione
connector.class String Obbligatorio. La classe Java del connettore. Per il connettore sink Pub/Sub, il valore deve essere com.google.pubsub.kafka.sink.CloudPubSubSinkConnector.
cps.endpoint String

L'endpoint Pub/Sub da utilizzare.

Valore predefinito: "pubsub.googleapis.com:443".

cps.project String Obbligatorio. Google Cloud che contiene l'argomento Pub/Sub.
cps.topic String Obbligatorio. L'argomento Pub/Sub in cui pubblicare i record Kafka.
gcp.credentials.file.path String Campo facoltativo. Il percorso di un file in cui sono archiviate le credenziali di Google Cloud per l'autenticazione di Pub/Sub Lite.
gcp.credentials.json String Campo facoltativo. Un blob JSON contenente Google Cloud per l'autenticazione di Pub/Sub Lite.
headers.publish Boolean

Quando true, includi le intestazioni dei record Kafka come attributi dei messaggi Pub/Sub.

Valore predefinito: false.

maxBufferBytes Long

Il numero massimo di byte da ricevere su una partizione Kafka dell'argomento prima di pubblicarli in Pub/Sub.

Valore predefinito: 10000000.

maxBufferSize Integer

Il numero massimo di record da ricevere su una partizione di un argomento Kafka prima della pubblicazione in Pub/Sub.

Valore predefinito: 100.

maxDelayThresholdMs Integer

Il tempo di attesa massimo per raggiungere maxBufferSize o maxBufferBytes prima di pubblicare record in sospeso su Pub/Sub, in millisecondi.

Valore predefinito: 100.

maxOutstandingMessages Long

Il numero massimo di record che possono essere in sospeso, inclusi batch in sospeso e in attesa, prima che l'editore blocchi ulteriormente la pubblicazione.

Valore predefinito: Long.MAX_VALUE.

maxOutstandingRequestBytes Long

Il numero massimo di byte totali che possono essere in sospeso, inclusi i batch incompleti e in attesa, prima che l'editore blocchi ulteriormente la pubblicazione.

Valore predefinito: Long.MAX_VALUE.

maxRequestTimeoutMs Integer

Il timeout per singole richieste di pubblicazione su Pub/Sub, in millisecondi.

Valore predefinito: 10.000.

maxTotalTimeoutMs Integer

Il timeout totale, in millisecondi, per una chiamata a pubblicazione su Pub/Sub, inclusi i nuovi tentativi.

Valore predefinito: 60.000.

metadata.publish Boolean

Quando true, includi l'argomento, la partizione, l'offset e il timestamp di Kafka come attributi del messaggio Pub/Sub.

Valore predefinito: false.

messageBodyName String

Quando utilizzi uno schema di valori struct o mappa, specifica il nome di un campo o di una chiave da utilizzare come corpo del messaggio Pub/Sub. Consulta Conversione da Kafka a Pub/Sub.

Valore predefinito: "cps_message_body".

orderingKeySource String

Specifica come impostare la chiave di ordinamento nel messaggio Pub/Sub. Può avere uno dei seguenti valori:

  • none: non impostare la chiave per effettuare gli ordini.
  • key: usa la chiave di record Kafka come chiave di ordinamento.
  • partition: utilizza il numero di partizione convertito in una stringa come chiave di ordinamento. Utilizza questa impostazione solo per gli argomenti a bassa velocità effettiva o per gli argomenti con migliaia di partizioni.

Valore predefinito: none.

topics String Obbligatorio. Un elenco di argomenti Kafka separati da virgole da cui leggere.

Opzioni di configurazione del connettore di origine

Il connettore di origine supporta le opzioni di configurazione riportate di seguito.

Impostazione Tipo di dati Descrizione
connector.class String Obbligatorio. La classe Java del connettore. Per il connettore di origine Pub/Sub, il valore deve essere com.google.pubsub.kafka.source.CloudPubSubSourceConnector.
cps.endpoint String

L'endpoint Pub/Sub da utilizzare.

Valore predefinito: "pubsub.googleapis.com:443".

cps.makeOrderingKeyAttribute Boolean

Quando true, scrivi la chiave di ordinamento nel record Kafka utilizzando lo stesso formato degli attributi del messaggio Pub/Sub. Consulta la pagina Conversione da Pub/Sub a record Kafka.

Valore predefinito: false.

cps.maxBatchSize Integer

Il numero massimo di messaggi da inviare in batch per richiesta di pull a Pub/Sub.

Valore predefinito: 100

cps.project String Obbligatorio. Il progetto Google Cloud che contiene l'argomento Pub/Sub.
cps.subscription String Obbligatorio. Il nome della sottoscrizione Pub/Sub da cui eseguire il pull dei messaggi.
gcp.credentials.file.path String Campo facoltativo. Il percorso di un file in cui sono archiviate le credenziali di Google Cloud per l'autenticazione di Pub/Sub Lite.
gcp.credentials.json String Campo facoltativo. Un blob JSON contenente Google Cloud per l'autenticazione di Pub/Sub Lite.
kafka.key.attribute String

L'attributo del messaggio Pub/Sub da utilizzare come chiave per i messaggi pubblicati su Kafka. Se è impostato su "orderingKey", utilizza la chiave di ordinamento del messaggio. Se null, i record Kafka non hanno una chiave.

Valore predefinito: null.

kafka.partition.count Integer

Il numero di partizioni Kafka per l'argomento Kafka in cui vengono pubblicati i messaggi. Questo parametro viene ignorato se lo schema di partizione è "kafka_partitioner".

Valore predefinito: 1.

kafka.partition.scheme String

Lo schema di assegnazione di un messaggio a una partizione in Kafka. Può essere uno dei seguenti valori:

  • round_robin: assegna le partizioni in modo arrotondato.
  • hash_key: trova la partizione eseguendo l'hashing della chiave di record.
  • hash_value: trova la partizione eseguendo l'hashing del valore del record.
  • kafka_partitioner: delega la logica di partizionamento al producer Kafka. Per impostazione predefinita, il produttore di Kafka rileva automaticamente il numero di partizioni ed esegue la mappatura delle partizioni basata su hash murmur o il round robin, a seconda che venga fornita una chiave di record o meno.
  • ordering_key: utilizza il codice hash della chiave di ordinamento di un messaggio. Se non è presente alcuna chiave di ordinamento, utilizza round_robin.

Valore predefinito: round_robin.

kafka.record.headers Boolean

Se true, scrivi gli attributi del messaggio Pub/Sub come intestazioni Kafka.

kafka.topic String Obbligatorio. L'argomento Kafka che riceve i messaggi da Pub/Sub.

Richiedere assistenza

Se hai bisogno di aiuto, crea un ticket di assistenza. Per domande e discussioni generali, crea un problema nel repository di GitHub.

Passaggi successivi