Connettere Pub/Sub ad Apache Kafka

Questo documento descrive come integrare Apache Kafka e Pub/Sub tramite usando il connettore Kafka del gruppo Pub/Sub.

Informazioni sul connettore Kafka del gruppo Pub/Sub

Apache Kafka è una piattaforma open source per l'inserimento di flussi di eventi. Di solito utilizzata in architetture distribuite per consentire la comunicazione componenti accoppiati. Pub/Sub è un servizio gestito per l'invio ricevere i messaggi in modo asincrono. Come con Kafka, puoi utilizzare Pub/Sub per comunicare tra i componenti nel cloud dell'architettura.

Il connettore Kafka del gruppo Pub/Sub consente di integrare questi due sistemi. I seguenti connettori sono pacchettizzati nel JAR del connettore:

  • Il connettore sink legge i record da uno o più argomenti Kafka e le pubblica in Pub/Sub.
  • Il connettore di origine legge i messaggi da un argomento Pub/Sub e le pubblica su Kafka.

Ecco alcuni scenari in cui potresti utilizzare il connettore Kafka del gruppo Pub/Sub:

  • Stai eseguendo la migrazione di un'architettura basata su Kafka a Google Cloud.
  • Hai un sistema di frontend che archivia gli eventi in Kafka all'esterno ma utilizzi anche Google Cloud per eseguire alcuni dei tuoi backend che devono ricevere gli eventi Kafka.
  • Puoi raccogliere i log da una soluzione Kafka on-premise e inviarli a Google Cloud per l'analisi dei dati.
  • Hai un sistema frontend che utilizza Google Cloud, ma archivi anche i dati on-premise con Kafka.

Il connettore richiede Kafka Connect che è un framework per trasmettere dati in modalità flusso tra Kafka e altri sistemi. Per utilizzare devi eseguire Kafka Connect insieme al tuo cluster Kafka.

In questo documento si presuppone che tu abbia familiarità sia con Kafka, sia in Pub/Sub. Prima di leggere questo documento, è buona norma completare una delle Guide rapide di Pub/Sub.

Il connettore Pub/Sub non supporta alcuna integrazione tra gli ACL Google Cloud IAM e Kafka Connect.

Inizia a utilizzare il connettore

Questa sezione illustra le seguenti attività:

  1. Configurare il connettore Kafka del gruppo Pub/Sub.
  2. Inviare eventi da Kafka a Pub/Sub.
  3. Inviare messaggi da Pub/Sub a Kafka.

Prerequisiti

Installa Kafka

Segui le Guida rapida di Apache Kafka per installare Kafka a nodo singolo sulla tua macchina locale. Completa questi passaggi in della guida rapida:

  1. Scarica l'ultima release di Kafka ed estraila.
  2. Avvia l'ambiente Kafka.
  3. Creare un argomento Kafka.

Autentica

Il connettore Kafka del gruppo Pub/Sub deve eseguire l'autenticazione con Pub/Sub nell'ordine per inviare e ricevere messaggi Pub/Sub. Per configurare l'autenticazione, segui questi passaggi:

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  4. Create local authentication credentials for your user account:

    gcloud auth application-default login
  5. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.

Scarica il JAR del connettore

Scarica il file JAR del connettore sul computer locale. Per ulteriori informazioni, vedi Acquista il connettore nel file Leggimi di GitHub.

Copia i file di configurazione del connettore

  1. Clona o scarica il Repository GitHub per il connettore.

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. Copia il contenuto della directory config nella sottodirectory config di dell'installazione di Kafka.

    cp config/* [path to Kafka installation]/config/
    

Questi file contengono le impostazioni di configurazione per il connettore.

Aggiorna la configurazione di Kafka Connect

  1. Vai alla directory che contiene il programma binario Kafka Connect che scaricato.
  2. Nella directory binaria Kafka Connect, apri il file denominato config/connect-standalone.properties in un editor di testo.
  3. Se plugin.path property ha un commento, rimuovi il commento.
  4. Aggiorna plugin.path property in modo da includere il percorso del JAR del connettore.

    Esempio:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. Imposta la proprietà offset.storage.file.filename sul nome di un file locale. Nella modalità autonoma, Kafka usa questo file per archiviare i dati di offset.

    Esempio:

    offset.storage.file.filename=/tmp/connect.offsets
    

Inoltra eventi da Kafka a Pub/Sub

Questa sezione descrive come avviare il connettore sink, pubblicare eventi in Kafka, leggere i messaggi inoltrati da Pub/Sub.

  1. Utilizza Google Cloud CLI per creare un argomento Pub/Sub con un abbonamento.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    Sostituisci quanto segue:

    • PUBSUB_TOPIC: il nome di un argomento Pub/Sub in ricevono i messaggi da Kafka.
    • PUBSUB_SUBSCRIPTION: il nome di un Pub/Sub abbonamento per l'argomento.
  2. Apri il file /config/cps-sink-connector.properties in un editor di testo. Aggiungi per le seguenti proprietà, contrassegnate come "TODO" nel commenti:

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC
    

    Sostituisci quanto segue:

    • KAFKA_TOPICS: un elenco separato da virgole di argomenti Kafka da leggere da cui proviene.
    • PROJECT_ID: il progetto Google Cloud che contiene Pub/Sub.
    • PUBSUB_TOPIC: l'argomento Pub/Sub per ricevere messaggi da Kafka.
  3. Dalla directory Kafka, esegui questo comando:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. Segui i passaggi nella Guida rapida di Apache Kafka per scrivere alcuni eventi nel tuo argomento Kafka.

  5. Utilizza gcloud CLI per leggere gli eventi da Pub/Sub.

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
    

Inoltrare messaggi da Pub/Sub a Kafka

Questa sezione descrive come avviare il connettore di origine e pubblicare messaggi in Pub/Sub e leggere i messaggi inoltrati da Kafka.

  1. Utilizza gcloud CLI per creare un argomento Pub/Sub con un abbonamento.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    Sostituisci quanto segue:

    • PUBSUB_TOPIC: il nome di un argomento Pub/Sub.
    • PUBSUB_SUBSCRIPTION: il nome di un Pub/Sub abbonamento.
  2. Apri il file denominato /config/cps-source-connector.properties in un testo dell'editor. Aggiungi valori per le seguenti proprietà, contrassegnate come "TODO" in i commenti:

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION
    

    Sostituisci quanto segue:

    • KAFKA_TOPIC: gli argomenti Kafka per ricevere i messaggi Pub/Sub.
    • PROJECT_ID: il progetto Google Cloud che contiene Pub/Sub.
    • PUBSUB_TOPIC: l'argomento Pub/Sub.
  3. Dalla directory Kafka, esegui questo comando:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. Utilizza gcloud CLI per pubblicare un messaggio in Pub/Sub.

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
    
  5. Leggere il messaggio di Kafka. Segui i passaggi nella Guida rapida di Apache Kafka per leggere i messaggi dall'argomento Kafka.

Conversione dei messaggi

Un record Kafka contiene una chiave e un valore, che sono array di byte a lunghezza variabile. Facoltativamente, Il record Kafka può anche avere intestazioni, che sono coppie chiave/valore. R Messaggio Pub/Sub è composto da due parti principali: il corpo del messaggio e zero o più attributi chiave-valore.

Kafka Connect utilizza i convertitori per serializzare chiavi e valori da e verso Kafka. Per controllare la serializzazione, imposta le seguenti proprietà nel connettore di configurazione YAML:

  • key.converter: il convertitore utilizzato per serializzare le chiavi di registrazione.
  • value.converter: il convertitore utilizzato per serializzare i valori dei record.

Il corpo di un messaggio Pub/Sub è un oggetto ByteString, quindi La conversione più efficiente consiste nel copiare direttamente il payload. Per questo motivo, consigliamo di utilizzare un convertitore che generi tipi di dati primitivi (numero intero, numero intero, stringa o schema di byte), ove possibile, per evitare la deserializzazione e rializziamo lo stesso corpo del messaggio.

Conversione da Kafka a Pub/Sub

Il connettore sink converte i record Kafka in messaggi Pub/Sub come che segue:

  • La chiave di record Kafka viene archiviata come attributo denominato "key" nel un messaggio Pub/Sub.
  • Per impostazione predefinita, il connettore elimina qualsiasi intestazione nel record Kafka. Tuttavia, se imposti l'opzione di configurazione headers.publish su true, il connettore scrive le intestazioni come attributi Pub/Sub. Il connettore salta qualsiasi intestazione che superi Pub/Sub limiti degli attributi dei messaggi.
  • Per gli schemi con numeri interi, in virgola mobile, stringa e byte, il connettore passa i byte del valore del record Kafka direttamente nel messaggio Pub/Sub del testo.
  • Per gli schemi struct, il connettore scrive ogni campo come attributo del un messaggio Pub/Sub. Ad esempio, se il campo è { "id"=123 }, il messaggio Pub/Sub risultante ha l'attributo "id"="123". La viene sempre convertito in una stringa. I tipi di mappa e struct non sono supportati come tipi di campo all'interno di uno struct.
  • Per gli schemi di mappa, il connettore scrive ogni coppia chiave-valore come attributo di il messaggio Pub/Sub. Ad esempio, se la mappa è {"alice"=1,"bob"=2}, il messaggio Pub/Sub risultante ha due "alice"="1" e "bob"="2". Le chiavi e i valori vengono convertiti alle stringhe.

Gli schemi di struttura e mappa presentano alcuni comportamenti aggiuntivi:

  • Facoltativamente, puoi specificare un particolare campo di struct o una chiave di mappa come campo corpo del messaggio impostando la proprietà di configurazione messageBodyName. La del campo o della chiave viene archiviato come ByteString nel corpo del messaggio. Se se non imposti messageBodyName, il corpo del messaggio è vuoto per struct e schemi delle mappe.

  • Per i valori degli array, il connettore supporta solo tipi di array primitivi. La sequenza di valori nell'array è concatenata in un unico ByteString .

Conversione da Pub/Sub a Kafka

Il connettore di origine converte i messaggi Pub/Sub in record Kafka come segue:

  • Chiave di registrazione Kafka: per impostazione predefinita, la chiave è impostata su null. Facoltativamente, puoi specificare un attributo di messaggio Pub/Sub da usare come chiave, impostare l'opzione di configurazione kafka.key.attribute. In questo caso, cerca un attributo con quel nome e imposta la chiave di record sul valore dell'attributo. Se l'attributo specificato non è presente, la chiave di record è impostato su null.

  • Valore del record Kafka. Il connettore scrive il valore del record come segue:

    • Se il messaggio Pub/Sub non ha attributi personalizzati, scrive il corpo del messaggio Pub/Sub direttamente nel record Kafka come tipo byte[], utilizzando il convertitore specificato value.converter.

    • Se il messaggio Pub/Sub contiene attributi personalizzati e kafka.record.headers è false, il connettore scrive uno struct nella e un valore record. Lo struct contiene un campo per ogni attributo e un campo denominato "message" il cui valore è il corpo del messaggio Pub/Sub (archiviati come byte):

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      In questo caso, devi utilizzare un value.converter che sia compatibile con struct schemi, ad esempio org.apache.kafka.connect.json.JsonConverter.

    • Se il messaggio Pub/Sub contiene attributi personalizzati e kafka.record.headers è true, il connettore scrive gli attributi come Intestazioni dei record Kafka. Scrive il corpo del messaggio Pub/Sub direttamente al valore del record Kafka come tipo byte[], utilizzando il convertitore specificato da value.converter.

  • Intestazioni dei record Kafka. Per impostazione predefinita, le intestazioni sono vuote, a meno che non imposti Da kafka.record.headers a true.

Opzioni di configurazione

Oltre alle configurazioni fornite dall'API Kafka Connect, Il connettore Kafka del gruppo Pub/Sub supporta le seguenti configurazioni.

Opzioni di configurazione del connettore sink

Il connettore sink supporta le seguenti opzioni di configurazione.

Impostazione Tipo di dati Descrizione
connector.class String Obbligatorio. La classe Java per il connettore. Per del connettore sink Pub/Sub, il valore deve essere com.google.pubsub.kafka.sink.CloudPubSubSinkConnector.
cps.endpoint String

L'endpoint Pub/Sub da utilizzare.

Valore predefinito: "pubsub.googleapis.com:443".

cps.project String Obbligatorio. Google Cloud che contiene Pub/Sub.
cps.topic String Obbligatorio. L'argomento Pub/Sub da pubblicare Record Kafka in
gcp.credentials.file.path String Facoltativo. Il percorso di un file in cui sono archiviate le credenziali di Google Cloud per l'autenticazione di Pub/Sub Lite.
gcp.credentials.json String Facoltativo. Un blob JSON che contiene Google Cloud per l'autenticazione di Pub/Sub Lite.
headers.publish Boolean

Se true, includi tutte le intestazioni di record Kafka come degli attributi dei messaggi Pub/Sub.

Valore predefinito: false.

maxBufferBytes Long

Il numero massimo di byte da ricevere su una partizione Kafka dell'argomento prima di pubblicarle in Pub/Sub.

Valore predefinito: 10000000.

maxBufferSize Integer

Il numero massimo di record da ricevere su una partizione di argomento Kafka prima di pubblicarle in Pub/Sub.

Valore predefinito: 100.

maxDelayThresholdMs Integer

Il tempo massimo di attesa per raggiungere maxBufferSize o maxBufferBytes prima della pubblicazione record in sospeso in Pub/Sub, in millisecondi.

Valore predefinito: 100.

maxOutstandingMessages Long

Il numero massimo di record che possono essere in sospeso, inclusi batch incompleti e in attesa, prima che il publisher blocchi ulteriormente pubblicazione.

Valore predefinito: Long.MAX_VALUE.

maxOutstandingRequestBytes Long

Il numero massimo di byte totali che possono essere in sospeso, inclusi batch incompleti e in attesa, prima che il publisher blocchi ulteriormente pubblicazione.

Valore predefinito: Long.MAX_VALUE.

maxRequestTimeoutMs Integer

Il timeout per le singole richieste di pubblicazione in Pub/Sub, in millisecondi.

Valore predefinito: 10000.

maxTotalTimeoutMs Integer

Il timeout totale, in millisecondi, per una chiamata in cui pubblicare Pub/Sub, inclusi i nuovi tentativi.

Valore predefinito: 60.000.

metadata.publish Boolean

Quando true, includi l'argomento, la partizione, l'offset, e timestamp come attributi dei messaggi Pub/Sub.

Valore predefinito: false.

messageBodyName String

Quando utilizzi uno schema di valori di struct o mappa, specifica il nome di un campo o chiave da utilizzare come corpo del messaggio Pub/Sub. Consulta Conversione da Kafka a Pub/Sub.

Valore predefinito: "cps_message_body".

orderingKeySource String

Specifica come impostare la chiave di ordinamento in Pub/Sub per creare un nuovo messaggio email. Può essere uno dei seguenti valori:

  • none: non impostare la chiave di ordinamento.
  • key: utilizza la chiave di record Kafka come chiave di ordinamento.
  • partition: usa il numero di partizione, convertito in un stringa, come chiave di ordinamento. Utilizza questa impostazione solo per velocità effettiva bassa o argomenti con migliaia di partizioni.

Valore predefinito: none.

topics String Obbligatorio. Un elenco separato da virgole di argomenti Kafka su per la lettura.

Opzioni di configurazione del connettore di origine

Il connettore di origine supporta le seguenti opzioni di configurazione.

Impostazione Tipo di dati Descrizione
connector.class String Obbligatorio. La classe Java per il connettore. Per al connettore di origine Pub/Sub, il valore deve essere com.google.pubsub.kafka.source.CloudPubSubSourceConnector.
cps.endpoint String

L'endpoint Pub/Sub da utilizzare.

Valore predefinito: "pubsub.googleapis.com:443".

cps.makeOrderingKeyAttribute Boolean

Quando true, scrivi la chiave di ordinamento nel record Kafka, usando lo stesso formato degli attributi dei messaggi Pub/Sub. Vedi Conversione da Pub/Sub a Record Kafka.

Valore predefinito: false.

cps.maxBatchSize Integer

Il numero massimo di messaggi da raggruppare per richiesta di pull a in Pub/Sub.

Valore predefinito: 100

cps.project String Obbligatorio. Il progetto Google Cloud che contiene Pub/Sub.
cps.subscription String Obbligatorio. il nome del bucket Pub/Sub sottoscrizione da cui eseguire il pull dei messaggi.
gcp.credentials.file.path String Facoltativo. Il percorso di un file in cui sono archiviate le credenziali di Google Cloud per l'autenticazione di Pub/Sub Lite.
gcp.credentials.json String Facoltativo. Un blob JSON che contiene Google Cloud per l'autenticazione di Pub/Sub Lite.
kafka.key.attribute String

L'attributo del messaggio Pub/Sub da utilizzare come chiave dei messaggi pubblicati in Kafka. Se impostato su "orderingKey", utilizza chiave di ordinamento del messaggio. Se null, i record Kafka non hanno un chiave.

Valore predefinito: null.

kafka.partition.count Integer

Il numero di partizioni Kafka per l'argomento Kafka in cui i messaggi vengono pubblicati. Questo parametro viene ignorato se lo schema di partizione è "kafka_partitioner".

Valore predefinito: 1.

kafka.partition.scheme String

Lo schema per assegnare un messaggio a una partizione in Kafka. Può essere uno dei seguenti valori:

  • round_robin: assegna partizioni in un round robin moda.
  • hash_key: trova la partizione eseguendo l'hashing del record chiave.
  • hash_value: trova la partizione eseguendo l'hashing del record valore.
  • kafka_partitioner: delega la logica di partizionamento al produttore Kafka. Per impostazione predefinita, il producer Kafka rileva automaticamente numero di partizioni ed esegue una partizione basata su hash con omicidio o round robin, a seconda che venga fornita o meno una chiave di registrazione.
  • ordering_key: utilizza il codice hash dell'elemento chiave di ordinamento. Se non è presente alcuna chiave di ordinamento, utilizza round_robin.

Valore predefinito: round_robin.

kafka.record.headers Boolean

Se true, scrivi gli attributi del messaggio Pub/Sub come intestazioni Kafka.

kafka.topic String Obbligatorio. L'argomento Kafka che riceve messaggi da in Pub/Sub.

Richiedere assistenza

Se hai bisogno di aiuto, crea un ticket di assistenza. Per domande e discussioni generali, crea un problema nel repository GitHub.

Passaggi successivi