Pub/Sub Lite mit Apache Kafka verbinden

In diesem Dokument wird beschrieben, wie Sie Apache Kafka und Pub/Sub Lite mithilfe des Kafka-Connectors für Pub/Sub-Gruppen integrieren.

Informationen zum Kafka-Connector der Pub/Sub-Gruppe

Apache Kafka ist eine Open-Source-Plattform für das Streaming von Ereignissen. Es wird häufig in verteilten Architekturen verwendet, um die Kommunikation zwischen lose gekoppelten Komponenten zu ermöglichen. Pub/Sub Lite ist ein verwalteter Dienst zum asynchronen Senden und Empfangen von Nachrichten. Wie bei Kafka können Sie Pub/Sub Lite für die Kommunikation zwischen Komponenten in Ihrer Cloudarchitektur verwenden.

Mit dem Kafka-Connector für Pub/Sub-Gruppen können Sie diese beiden Systeme einbinden. Die folgenden Connectors sind in der Connector-JAR-Datei verpackt:

  • Der Senken-Connector liest Einträge aus einem oder mehreren Kafka-Themen und veröffentlicht sie in Pub/Sub Lite.
  • Der Quell-Connector liest Nachrichten aus einem Pub/Sub Lite-Thema und veröffentlicht sie in Kafka.

Im Folgenden finden Sie einige Szenarien, in denen Sie den Kafka-Connector für Pub/Sub-Gruppen verwenden können:

  • Sie migrieren eine Kafka-basierte Architektur zu Google Cloud.
  • Sie haben ein Frontend-System, das Ereignisse in Kafka außerhalb von Google Cloud speichert, verwenden aber auch Google Cloud, um einige Ihrer Backend-Dienste auszuführen, die die Kafka-Ereignisse empfangen müssen.
  • Sie erfassen Logs aus einer lokalen Kafka-Lösung und senden sie zur Datenanalyse an Google Cloud.
  • Sie haben ein Front-End-System, das Google Cloud verwendet, aber Sie speichern Daten auch lokal mit Kafka.

Für den Connector ist Kafka Connect erforderlich, ein Framework für das Streaming von Daten zwischen Kafka und anderen Systemen. Wenn Sie den Connector verwenden möchten, müssen Sie Kafka Connect zusammen mit Ihrem Kafka-Cluster ausführen.

In diesem Dokument wird davon ausgegangen, dass Sie sowohl mit Kafka als auch mit Pub/Sub Lite vertraut sind. Informationen zum Einstieg in Pub/Sub Lite finden Sie unter Nachrichten mit der Google Cloud Console in Pub/Sub Lite veröffentlichen und empfangen.

Erste Schritte mit dem Kafka-Connector für Pub/Sub-Gruppen

In diesem Abschnitt werden Sie durch die folgenden Aufgaben geführt:

  1. Konfigurieren Sie den Kafka-Connector der Pub/Sub-Gruppe.
  2. Ereignisse von Kafka an Pub/Sub Lite senden.
  3. Nachrichten von Pub/Sub Lite an Kafka senden.

Vorbereitung

Kafka installieren

Folgen Sie der Kurzanleitung für Apache Kafka, um eine Kafka-Datei mit einem einzelnen Knoten auf Ihrem lokalen Computer zu installieren. Führen Sie die folgenden Schritte in der Kurzanleitung aus:

  1. Laden Sie den neuesten Kafka-Release herunter und extrahieren Sie ihn.
  2. Starten Sie die Kafka-Umgebung.
  3. Erstellen Sie ein Kafka-Thema.

Authentifizieren

Der Kafka-Connector der Pub/Sub-Gruppe muss sich bei Pub/Sub authentifizieren, um Pub/Sub-Nachrichten zu senden und zu empfangen. So richten Sie die Authentifizierung ein:

  1. Melden Sie sich bei Ihrem Google Cloud-Konto an. Wenn Sie mit Google Cloud noch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. Installieren Sie die Google Cloud CLI.
  3. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  4. Google Cloud-Projekt erstellen oder auswählen.

    • Erstellen Sie ein Google Cloud-Projekt:

      gcloud projects create PROJECT_ID

      Ersetzen Sie PROJECT_ID durch einen Namen für das Google Cloud-Projekt, das Sie erstellen.

    • Wählen Sie das von Ihnen erstellte Google Cloud-Projekt aus:

      gcloud config set project PROJECT_ID

      Ersetzen Sie PROJECT_ID durch den Namen Ihres Google Cloud-Projekts.

  5. Erstellen Sie lokale Anmeldedaten zur Authentifizierung für Ihr Google-Konto:

    gcloud auth application-default login
  6. Gewähren Sie Ihrem Google-Konto Rollen. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus: roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Ersetzen Sie PROJECT_ID durch Ihre Projekt-ID.
    • Ersetzen Sie EMAIL_ADDRESS durch Ihre E-Mail-Adresse.
    • Ersetzen Sie ROLE durch jede einzelne Rolle.
  7. Installieren Sie die Google Cloud CLI.
  8. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  9. Google Cloud-Projekt erstellen oder auswählen.

    • Erstellen Sie ein Google Cloud-Projekt:

      gcloud projects create PROJECT_ID

      Ersetzen Sie PROJECT_ID durch einen Namen für das Google Cloud-Projekt, das Sie erstellen.

    • Wählen Sie das von Ihnen erstellte Google Cloud-Projekt aus:

      gcloud config set project PROJECT_ID

      Ersetzen Sie PROJECT_ID durch den Namen Ihres Google Cloud-Projekts.

  10. Erstellen Sie lokale Anmeldedaten zur Authentifizierung für Ihr Google-Konto:

    gcloud auth application-default login
  11. Gewähren Sie Ihrem Google-Konto Rollen. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus: roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Ersetzen Sie PROJECT_ID durch Ihre Projekt-ID.
    • Ersetzen Sie EMAIL_ADDRESS durch Ihre E-Mail-Adresse.
    • Ersetzen Sie ROLE durch jede einzelne Rolle.

Connector-JAR-Datei herunterladen

Laden Sie die Connector-JAR-Datei auf Ihren lokalen Computer herunter. Weitere Informationen finden Sie in der GitHub-Readme-Datei unter Connector erwerben.

Konfigurationsdateien des Connectors kopieren

  1. Klonen Sie das GitHub-Repository für den Connector oder laden Sie es herunter.

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. Kopieren Sie den Inhalt des Verzeichnisses config in das Unterverzeichnis config Ihrer Kafka-Installation.

    cp config/* [path to Kafka installation]/config/
    

Diese Dateien enthalten Konfigurationseinstellungen für den Connector.

Kafka Connect-Konfiguration aktualisieren

  1. Wechseln Sie zu dem Verzeichnis, das die heruntergeladene Kafka Connect-Binärdatei enthält.
  2. Öffnen Sie im Binärverzeichnis von Kafka Connect die Datei config/connect-standalone.properties in einem Texteditor.
  3. Wenn das plugin.path property auskommentiert ist, entfernen Sie das Kommentarzeichen.
  4. Aktualisieren Sie plugin.path property, um den Pfad zur Connector-JAR-Datei aufzunehmen.

    Beispiel:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. Legen Sie für das Attribut offset.storage.file.filename einen lokalen Dateinamen fest. Im eigenständigen Modus verwendet Kafka diese Datei zum Speichern von Offset-Daten.

    Beispiel:

    offset.storage.file.filename=/tmp/connect.offsets
    

Ereignisse von Kafka an Pub/Sub Lite weiterleiten

In diesem Abschnitt wird beschrieben, wie Sie den Senken-Connector starten, Ereignisse in Kafka veröffentlichen und dann die weitergeleiteten Nachrichten von Pub/Sub Lite lesen.

  1. Verwenden Sie die Google Cloud CLI, um eine Pub/Sub Lite-Reservierung zu erstellen.

    gcloud pubsub lite-reservations create RESERVATION_NAME \
    --location=LOCATION \
    --throughput-capacity=4
    

    Ersetzen Sie Folgendes:

    • RESERVATION_NAME: Der Name der Pub/Sub Lite-Reservierung.
    • LOCATION: Der Standort der Reservierung.
  2. Verwenden Sie die Google Cloud CLI, um ein Pub/Sub Lite-Thema mit einem Abo zu erstellen.

    gcloud pubsub lite-topics create LITE_TOPIC \
    --location=LOCATION \
    --partitions=2 \
    --per-partition-bytes=30GiB \
    --throughput-reservation=RESERVATION_NAME
    
    gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
    --location=LOCATION \
    --topic=LITE_TOPIC
    

    Ersetzen Sie Folgendes:

    • LITE_TOPIC: Der Name des Pub/Sub Lite-Themas, in dem Nachrichten von Kafka empfangen werden sollen.
    • LOCATION: Der Speicherort des Themas. Der Wert muss mit dem Standort der Reservierung übereinstimmen.
    • RESERVATION_NAME: Der Name der Pub/Sub Lite-Reservierung.
    • LITE_SUBSCRIPTION: Der Name eines Pub/Sub Lite-Abos für das Thema.
  3. Öffnen Sie die Datei /config/pubsub-lite-sink-connector.properties in einem Texteditor. Fügen Sie Werte für die folgenden Attribute hinzu, die in den Kommentaren mit "TODO" gekennzeichnet sind:

    topics=KAFKA_TOPICS
    pubsublite.project=PROJECT_ID
    pubsublite.location=LOCATION
    pubsublite.topic=LITE_TOPIC
    

    Ersetzen Sie Folgendes:

    • KAFKA_TOPICS: Eine durch Kommas getrennte Liste von Kafka-Themen, aus denen gelesen werden soll.
    • PROJECT_ID: Das Google Cloud-Projekt, das Ihr Pub/Sub Lite-Thema enthält.
    • LOCATION: Der Speicherort des Pub/Sub Lite-Themas.
    • LITE_TOPIC: Das Pub/Sub Lite-Thema, das Nachrichten von Kafka empfangen soll.
  4. Führen Sie im Kafka-Verzeichnis den folgenden Befehl aus:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/pubsub-lite-sink-connector.properties
    
  5. Folgen Sie den Schritten in der Kurzanleitung zu Apache Kafka, um einige Ereignisse in Ihr Kafka-Thema zu schreiben.

  6. Abonnieren Sie das Pub/Sub Lite-Abo mit einer der Methoden, die unter Nachrichten von Lite-Abos empfangen beschrieben werden.

Nachrichten von Pub/Sub Lite an Kafka weiterleiten

In diesem Abschnitt wird beschrieben, wie Sie den Quell-Connector starten, Nachrichten in Pub/Sub Lite veröffentlichen und die weitergeleiteten Nachrichten von Kafka lesen.

  1. Verwenden Sie die Google Cloud CLI, um eine Pub/Sub Lite-Reservierung zu erstellen.

    gcloud pubsub lite-reservations create RESERVATION_NAME \
    --location=LOCATION \
    --throughput-capacity=4
    

    Ersetzen Sie Folgendes:

    • RESERVATION_NAME: Der Name der Pub/Sub Lite-Reservierung.
    • LOCATION: Der Standort der Reservierung.
  2. Verwenden Sie die Google Cloud CLI, um ein Pub/Sub Lite-Thema mit einem Abo zu erstellen.

    gcloud pubsub lite-topics create LITE_TOPIC \
    --location=LOCATION \
    --partitions=2 \
    --per-partition-bytes=30GiB \
    --throughput-reservation=RESERVATION_NAME
    
    gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
    --location=LOCATION \
    --topic=LITE_TOPIC
    

    Ersetzen Sie Folgendes:

    • LITE_TOPIC: Der Name des Pub/Sub Lite-Themas.
    • LOCATION: Der Speicherort des Themas. Der Wert muss mit dem Standort der Reservierung übereinstimmen.
    • RESERVATION_NAME: Der Name der Pub/Sub Lite-Reservierung.
    • LITE_SUBSCRIPTION: Der Name eines Pub/Sub Lite-Abos für das Thema.
  3. Öffnen Sie die Datei /config/pubsub-lite-source-connector.properties in einem Texteditor. Fügen Sie Werte für die folgenden Attribute hinzu, die in den Kommentaren mit "TODO" gekennzeichnet sind:

    topic=KAFKA_TOPIC
    pubsublite.project=PROJECT_ID
    pubsublite.location=LOCATION
    pubsublite.subscription=LITE_SUBSCRIPTION
    

    Ersetzen Sie Folgendes:

    • KAFKA_TOPIC: Die Kafka-Themen, die die Pub/Sub-Nachrichten empfangen sollen.
    • PROJECT_ID: Das Google Cloud-Projekt, das Ihr Pub/Sub-Thema enthält.
    • LOCATION: Der Speicherort des Pub/Sub Lite-Themas.
    • LITE_SUBSCRIPTION: Das Pub/Sub Lite-Thema.
  4. Führen Sie im Kafka-Verzeichnis den folgenden Befehl aus:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/pubsub-lite-source-connector.properties
    
  5. Veröffentlichen Sie Nachrichten im Pub/Sub Lite-Thema mit einer der Methoden, die unter Nachrichten für Lite-Themen veröffentlichen beschrieben werden.

  6. Lesen Sie die Nachricht von Kafka. Folgen Sie den Schritten in der Kurzanleitung zu Apache Kafka, um die Nachrichten zum Kafka-Thema zu lesen.

SMS-Conversion

Ein Kafka-Eintrag enthält einen Schlüssel und einen Wert, bei denen es sich um Bytearrays mit variabler Länge handelt. Optional kann ein Kafka-Eintrag auch Header enthalten, bei denen es sich um Schlüssel/Wert-Paare handelt. Eine Pub/Sub Lite-Nachricht enthält die folgenden Felder:

  • key: Nachrichtenschlüssel (bytes)
  • data: Nachrichtendaten (bytes)
  • attributes: null oder mehr Attribute. Jedes Attribut ist eine (key,values[])-Zuordnung. Ein einzelnes Attribut kann mehrere Werte haben.
  • event_time: Ein optionaler, vom Nutzer bereitgestellter Ereigniszeitstempel.

Kafka Connect verwendet Converter, um Schlüssel und Werte zu und von Kafka zu synchronisieren. Legen Sie die folgenden Attribute in den Konfigurationsdateien des Connectors fest, um die Serialisierung zu steuern:

  • key.converter: Der Converter, der zum Serialisieren von Datensatzschlüsseln verwendet wird.
  • value.converter: Der Converter, der zum Serialisieren von Datensatzwerten verwendet wird.

Konvertierung von Kafka zu Pub/Sub Lite

Der Senken-Connector konvertiert Kafka-Einträge so in Pub/Sub Lite-Nachrichten.

Kafka-Eintrag (SinkRecord) Pub/Sub Lite-Nachricht
Schlüssel key
Wert data
Header attributes
Zeitstempel eventTime
Zeitstempeltyp attributes["x-goog-pubsublite-source-kafka-event-time-type"]
Thema attributes["x-goog-pubsublite-source-kafka-topic"]
Partition attributes["x-goog-pubsublite-source-kafka-offset"]
Offset attributes["x-goog-pubsublite-source-kafka-partition"]

Schlüssel, Werte und Header werden wie folgt codiert:

  • Nullschemas werden als Stringschemas behandelt.
  • Die Nutzlasten von Byte werden direkt und ohne Konvertierung geschrieben.
  • String-, Ganzzahl- und Gleitkommautzlasten werden in einer Sequenz von UTF-8-Byte codiert.
  • Alle anderen Nutzlasten werden in einen Protokollzwischenspeicher vom Typ Value codiert und dann in einen Bytestring konvertiert.
    • Verschachtelte Stringfelder werden in einem protobuf-Value codiert.
    • Verschachtelte Bytefelder werden in einem Protokollpuffer Value codiert, der die base64-codierten Byte enthält.
    • Verschachtelte numerische Felder werden als Double in einem protobuf-Value codiert.
    • Karten mit Array-, Map- oder Struct-Schlüsseln werden nicht unterstützt.

Konvertierung von Pub/Sub Lite zu Kafka

Der Quell-Connector konvertiert Pub/Sub Lite-Nachrichten so in Kafka-Einträge:

Pub/Sub Lite-Nachricht Kafka-Eintrag (SourceRecord)
key Schlüssel
data Wert
attributes Header
event_time Timestamp. Wenn event_time nicht vorhanden ist, wird der Veröffentlichungszeitpunkt verwendet.

Konfigurationsoptionen

Zusätzlich zu den von der Kafka Connect API bereitgestellten Konfigurationen unterstützt der Connector die folgenden Pub/Sub Lite-Konfigurationen.

Konfigurationsoptionen für Senken-Connectors

Der Senken-Connector unterstützt die folgenden Konfigurationsoptionen.

Einstellung Datentyp Beschreibung
connector.class String Erforderlich. Die Java-Klasse für den Connector. Für den Pub/Sub Lite-Senken-Connector muss der Wert com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector sein.
gcp.credentials.file.path String Optional. Der Pfad zu einer Datei, in der Google Cloud-Anmeldedaten zur Authentifizierung von Pub/Sub Lite gespeichert sind.
gcp.credentials.json String Optional. Ein JSON-Blob, das Google Cloud zur Authentifizierung von Pub/Sub Lite enthält.
pubsublite.location String Erforderlich. Der Speicherort des Pub/Sub Lite-Themas.
pubsublite.project String Erforderlich. Die Google Cloud, die das Pub/Sub Lite-Thema enthält.
pubsublite.topic String Erforderlich. Das Pub/Sub Lite-Thema, in dem Kafka-Einträge veröffentlicht werden sollen.
topics String Erforderlich. Eine durch Kommas getrennte Liste von Kafka-Themen, aus denen gelesen werden soll.

Konfigurationsoptionen für Quell-Connector

Der Quell-Connector unterstützt die folgenden Konfigurationsoptionen.

Einstellung Datentyp Beschreibung
connector.class String Erforderlich. Die Java-Klasse für den Connector. Für den Pub/Sub Lite-Quell-Connector muss der Wert com.google.pubsublite.kafka.source.PubSubLiteSourceConnector sein.
gcp.credentials.file.path String Optional. Der Pfad zu einer Datei, in der Google Cloud-Anmeldedaten zur Authentifizierung von Pub/Sub Lite gespeichert sind.
gcp.credentials.json String Optional. Ein JSON-Blob, das Google Cloud zur Authentifizierung von Pub/Sub Lite enthält.
kafka.topic String Erforderlich. Das Kafka-Thema, das Nachrichten von Pub/Sub Lite empfängt.
pubsublite.location String Erforderlich. Der Speicherort des Pub/Sub Lite-Themas.
pubsublite.partition_flow_control.bytes Long

Die maximale Anzahl ausstehender Byte pro Pub/Sub Lite-Partition.

Standard: 20.000.000

pubsublite.partition_flow_control.messages Long

Die maximale Anzahl ausstehender Nachrichten pro Pub/Sub Lite-Partition.

Standardwert: Long.MAX_VALUE

pubsublite.project String Erforderlich. Das Google Cloud-Projekt, das das Pub/Sub Lite-Thema enthält.
pubsublite.subscription String Erforderlich. Der Name des Pub/Sub Lite-Abos, aus dem Nachrichten abgerufen werden sollen.

Nächste Schritte