Pub/Sub Lite mit Apache Kafka verbinden

In diesem Dokument wird beschrieben, wie Sie Apache Kafka und Pub/Sub Lite mithilfe des Pub/Sub-Gruppen-Kafka-Connectors integrieren.

Pub/Sub-Gruppen-Kafka-Connector

Apache Kafka ist eine Open-Source-Plattform für Streamingereignisse. Es wird häufig in verteilten Architekturen verwendet, um die Kommunikation zwischen lose gekoppelten Komponenten zu ermöglichen. Pub/Sub Lite ist ein verwalteter Dienst zum Nachrichten asynchron empfangen. Wie bei Kafka können Sie Pub/Sub Lite für die Kommunikation zwischen Komponenten in der Cloud Architektur.

Mit dem Pub/Sub-Gruppen-Kafka-Connector können Sie diese beiden Systeme integrieren. Die folgenden Connectors sind in der Connector-JAR-Datei enthalten:

  • Der Senke-Connector liest Datensätze aus einem oder mehreren Kafka-Themen und veröffentlicht sie in Pub/Sub Lite.
  • Der Quell-Connector liest Nachrichten aus einem Pub/Sub Lite-Thema. und veröffentlicht sie in Kafka.

Hier sind einige Szenarien, in denen Sie den Pub/Sub-Gruppen-Kafka-Connector verwenden können:

  • Sie migrieren eine Kafka-basierte Architektur zu Google Cloud.
  • Sie haben ein Frontend-System, das Ereignisse in Kafka außerhalb von Google Cloud, aber Sie verwenden auch Google Cloud, um einen Teil Ihres Back-Ends auszuführen. die die Kafka-Ereignisse empfangen müssen.
  • Sie erfassen Logs aus einer lokalen Kafka-Lösung und senden sie zur Datenanalyse an Google Cloud.
  • Sie haben ein Front-End-System, das Google Cloud verwendet, speichern aber auch Daten. lokal mit Kafka ausgeführt werden.

Für den Connector ist Folgendes erforderlich: Kafka Connect ein Framework für das Streaming von Daten zwischen Kafka und anderen Systemen. Wenn Sie den Connector verwenden möchten, müssen Sie Kafka Connect zusammen mit Ihrem Kafka-Cluster ausführen.

In diesem Dokument wird davon ausgegangen, dass Sie mit Kafka und Pub/Sub Lite Informationen zu den ersten Schritten mit Pub/Sub Lite finden Sie unter Nachrichten in Pub/Sub Lite mit der Google Cloud Console veröffentlichen und empfangen

Erste Schritte mit dem Pub/Sub-Gruppen-Kafka-Connector

Dieser Abschnitt führt Sie durch die folgenden Aufgaben:

  1. Konfigurieren Sie den Kafka-Connector für die Pub/Sub-Gruppe.
  2. Ereignisse von Kafka an Pub/Sub Lite senden
  3. Nachrichten von Pub/Sub Lite an Kafka senden.

Vorbereitung

Kafka installieren

Folgen Sie Kurzanleitung zu Apache Kafka um ein Kafka mit einem einzelnen Knoten auf Ihrem lokalen Computer zu installieren. Führen Sie diese Schritte aus in der Kurzanleitung:

  1. Laden Sie die neueste Kafka-Version herunter und extrahieren Sie sie.
  2. Starten Sie die Kafka-Umgebung.
  3. Erstellen Sie ein Kafka-Thema.

Authentifizieren

Der Kafka-Connector für die Pub/Sub-Gruppe muss sich bei Pub/Sub authentifizieren, um Pub/Sub-Nachrichten senden und empfangen. So richten Sie die Authentifizierung ein:

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Create local authentication credentials for your user account:

    gcloud auth application-default login
  6. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  7. Install the Google Cloud CLI.
  8. To initialize the gcloud CLI, run the following command:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Create local authentication credentials for your user account:

    gcloud auth application-default login
  11. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.

Connector-JAR-Datei herunterladen

Laden Sie die JAR-Datei des Connectors auf Ihren lokalen Computer herunter. Weitere Informationen finden Sie unter Connector beschaffen in der GitHub-Readme-Datei.

Connector-Konfigurationsdateien kopieren

  1. Klonen Sie das GitHub-Repository für den Connector oder laden Sie es herunter.

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. Kopieren Sie den Inhalt des Verzeichnisses config in das Unterverzeichnis config von Ihre Kafka-Installation.

    cp config/* [path to Kafka installation]/config/
    

Diese Dateien enthalten Konfigurationseinstellungen für den Connector.

Kafka Connect-Konfiguration aktualisieren

  1. Rufen Sie das Verzeichnis auf, das die heruntergeladene Kafka Connect-Binärdatei enthält.
  2. Öffnen Sie im Binärverzeichnis von Kafka Connect die Datei namens config/connect-standalone.properties in einem Texteditor.
  3. Entfernen Sie das Kommentarzeichen bei plugin.path property, falls es vorhanden ist.
  4. Aktualisieren Sie plugin.path property, um den Pfad zur Connector-JAR-Datei anzugeben.

    Beispiel:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. Legen Sie das Attribut offset.storage.file.filename auf einen lokalen Dateinamen fest. In im eigenständigen Modus verwendet wird, verwendet Kafka diese Datei zum Speichern von Offset-Daten.

    Beispiel:

    offset.storage.file.filename=/tmp/connect.offsets
    

Ereignisse von Kafka an Pub/Sub Lite weiterleiten

In diesem Abschnitt wird beschrieben, wie Sie den Senken-Connector starten, Ereignisse in Kafka veröffentlichen und dann die weitergeleiteten Nachrichten aus Pub/Sub Lite lesen.

  1. Erstellen Sie mit der Google Cloud CLI eine Pub/Sub Lite-Reservierung.

    gcloud pubsub lite-reservations create RESERVATION_NAME \
    --location=LOCATION \
    --throughput-capacity=4

    Ersetzen Sie Folgendes:

    • RESERVATION_NAME: Der Name von Pub/Sub Lite. Reservierung.
    • LOCATION: Der Standort des Reservierung.
  2. Verwenden Sie die Google Cloud CLI, um ein Pub/Sub Lite-Thema mit einem Abo.

    gcloud pubsub lite-topics create LITE_TOPIC \
    --location=LOCATION \
    --partitions=2 \
    --per-partition-bytes=30GiB \
    --throughput-reservation=RESERVATION_NAME
    
    gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
    --location=LOCATION \
    --topic=LITE_TOPIC

    Ersetzen Sie Folgendes:

    • LITE_TOPIC: Der Name des Pub/Sub Lite-Themas, das zu aktualisieren ist. Nachrichten von Kafka empfangen.
    • LOCATION: Der Speicherort des Themas. Der Wert muss mit dem Standort der Reservierung übereinstimmen.
    • RESERVATION_NAME: Der Name der Pub/Sub Lite-Reservierung.
    • LITE_SUBSCRIPTION: Der Name von Pub/Sub Lite. Abo für das Thema.
  3. Öffnen Sie die Datei /config/pubsub-lite-sink-connector.properties in einem Texteditor. Fügen Sie Werte für die folgenden Eigenschaften hinzu, die in den Kommentaren mit "TODO" gekennzeichnet sind:

    topics=KAFKA_TOPICS
    pubsublite.project=PROJECT_ID
    pubsublite.location=LOCATION
    pubsublite.topic=LITE_TOPIC

    Ersetzen Sie Folgendes:

    • KAFKA_TOPICS: eine durch Kommas getrennte Liste von zu lesenden Kafka-Themen aus.
    • PROJECT_ID: Das Google Cloud-Projekt, das die Datei enthält. Pub/Sub Lite-Thema.
    • LOCATION: Der Speicherort des Pub/Sub Lite-Themas.
    • LITE_TOPIC: Das zu empfangende Pub/Sub Lite-Thema Nachrichten von Kafka gesendet.
  4. Führen Sie im Kafka-Verzeichnis den folgenden Befehl aus:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/pubsub-lite-sink-connector.properties
    
  5. Folgen Sie der Anleitung in der Kurzanleitung zu Apache Kafka um einige Ereignisse in Ihr Kafka-Thema zu schreiben.

  6. Abonnieren Sie das Pub/Sub Lite-Abo mit einer der folgenden Methoden: Methoden aus Nachrichten von Lite-Abos empfangen

Nachrichten von Pub/Sub Lite an Kafka weiterleiten

In diesem Abschnitt wird beschrieben, wie Sie den Quell-Connector starten, Pub/Sub Lite und lesen Sie die weitergeleiteten Nachrichten von Kafka.

  1. Verwenden Sie die Google Cloud CLI, um eine Pub/Sub Lite-Reservierung zu erstellen.

    gcloud pubsub lite-reservations create RESERVATION_NAME \
    --location=LOCATION \
    --throughput-capacity=4

    Ersetzen Sie Folgendes:

    • RESERVATION_NAME: Der Name von Pub/Sub Lite. Reservierung.
    • LOCATION: Der Standort des Reservierung.
  2. Verwenden Sie die Google Cloud CLI, um ein Pub/Sub Lite-Thema mit einem Abo.

    gcloud pubsub lite-topics create LITE_TOPIC \
    --location=LOCATION \
    --partitions=2 \
    --per-partition-bytes=30GiB \
    --throughput-reservation=RESERVATION_NAME
    
    gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
    --location=LOCATION \
    --topic=LITE_TOPIC

    Ersetzen Sie Folgendes:

    • LITE_TOPIC: Der Name des Pub/Sub Lite-Themas.
    • LOCATION: Der Speicherort des Themas. Der Wert muss mit dem Standort der Reservierung übereinstimmen.
    • RESERVATION_NAME: Der Name der Pub/Sub Lite-Reservierung.
    • LITE_SUBSCRIPTION: Der Name eines Pub/Sub Lite-Abos für das Thema.
  3. Öffnen Sie die Datei /config/pubsub-lite-source-connector.properties in einem Texteditor. Fügen Sie Werte für die folgenden Properties hinzu, die in den Kommentaren mit "TODO" gekennzeichnet sind:

    topic=KAFKA_TOPIC
    pubsublite.project=PROJECT_ID
    pubsublite.location=LOCATION
    pubsublite.subscription=LITE_SUBSCRIPTION

    Ersetzen Sie Folgendes:

    • KAFKA_TOPIC: Die Kafka-Themen, an die die Pub/Sub-Nachrichten gesendet werden sollen.
    • PROJECT_ID: Das Google Cloud-Projekt, das die Datei enthält. Pub/Sub-Thema.
    • LOCATION: Der Speicherort des Pub/Sub Lite-Themas.
    • LITE_SUBSCRIPTION: Das Pub/Sub Lite-Thema.
  4. Führen Sie im Kafka-Verzeichnis den folgenden Befehl aus:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/pubsub-lite-source-connector.properties
    
  5. Veröffentlichen Sie Nachrichten in dem Pub/Sub Lite-Thema mit einer der Methoden in Nachrichten in Lite-Themen veröffentlichen

  6. Nachricht aus Kafka lesen Folgen Sie der Anleitung in der Kurzanleitung zu Apache Kafka um die Nachrichten aus dem Kafka-Thema zu lesen.

SMS-Conversion

Einen Kafka-Eintrag enthält einen Schlüssel und einen Wert, bei denen es sich um Byte-Arrays mit variabler Länge handelt. Optional kann ein Der Kafka-Eintrag kann auch Header haben, bei denen es sich um Schlüssel/Wert-Paare handelt. Eine Pub/Sub Lite-Nachricht enthält die folgenden Felder:

  • key: Nachrichtenschlüssel (bytes)
  • data: Nachrichtendaten (bytes)
  • attributes: Null oder mehrere Attribute. Jedes Attribut ist ein (key,values[]) Ein einzelnes Attribut kann mehrere Werte haben.
  • event_time: Ein optionaler vom Nutzer angegebener Ereigniszeitstempel.

Kafka Connect verwendet Konverter, um Schlüssel und Werte von und nach Kafka zu serialisieren. Legen Sie zum Steuern der Serialization die folgenden Eigenschaften in den Konfigurationsdateien des Connectors fest:

  • key.converter: Der Converter, der zum Serialisieren von Datensatzschlüsseln verwendet wird.
  • value.converter: Der Converter, der zum Serialisieren von Datensatzwerten verwendet wird.

Konvertierung von Kafka zu Pub/Sub Lite

Der Senken-Connector konvertiert Kafka-Datensätze so in Pub/Sub Lite-Nachrichten:

Kafka-Eintrag (SinkRecord) Pub/Sub Lite-Nachricht
Schlüssel key
Wert data
Header attributes
Zeitstempel eventTime
Zeitstempeltyp attributes["x-goog-pubsublite-source-kafka-event-time-type"]
Thema attributes["x-goog-pubsublite-source-kafka-topic"]
Partition attributes["x-goog-pubsublite-source-kafka-offset"]
Offset attributes["x-goog-pubsublite-source-kafka-partition"]

Schlüssel, Werte und Header werden so codiert:

  • Nullschemas werden als Stringschemas behandelt.
  • Bytenutzlasten werden direkt und ohne Konvertierung geschrieben.
  • String-, Ganzzahl- und Gleitkomman-Nutzlasten werden in eine Sequenz von UTF-8-Byte codiert.
  • Alle anderen Nutzlasten werden in einem Protokollzwischenspeicher codiert. Value und dann in einen Byte-String konvertiert.
    • Verschachtelte Stringfelder werden in einen protobuf-Value codiert.
    • Verschachtelte Bytes-Felder werden in einen Protobuf-Value codiert, der die base64-codierten Bytes enthält.
    • Verschachtelte numerische Felder werden als Double in einem protobuf-Value codiert.
    • Zuordnungen mit Array-, Zuordnungs- oder Strukturschlüsseln werden nicht unterstützt.

Konvertierung von Pub/Sub Lite zu Kafka

Der Quell-Connector konvertiert Pub/Sub Lite-Nachrichten in Kafka-Eintragsdaten auf folgende Weise:

Pub/Sub Lite-Nachricht Kafka-Eintrag (SourceRecord)
key Schlüssel
data Wert
attributes Header
event_time Timestamp. Wenn event_time nicht vorhanden ist, wird die Veröffentlichungszeit verwendet.

Konfigurationsoptionen

Zusätzlich zu den von der Kafka Connect API bereitgestellten Konfigurationen unterstützt der Connector die folgenden Pub/Sub Lite-Konfigurationen.

Konfigurationsoptionen für den Sink-Connector

Der Senken-Connector unterstützt die folgenden Konfigurationsoptionen.

Einstellung Datentyp Beschreibung
connector.class String Erforderlich. Die Java-Klasse für den Connector. Für Pub/Sub Lite-Senken-Connector ist, muss der Wert com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector
gcp.credentials.file.path String Optional. Der Pfad zu einer Datei, in der Google Cloud-Anmeldedaten gespeichert sind zur Authentifizierung von Pub/Sub Lite.
gcp.credentials.json String Optional. Ein JSON-Blob, der Google Cloud für die Authentifizierung von Pub/Sub Lite enthält.
pubsublite.location String Erforderlich. Der Speicherort des Pub/Sub Lite-Themas.
pubsublite.project String Erforderlich. Die Google Cloud, die die Pub/Sub Lite-Thema.
pubsublite.topic String Erforderlich. Das Pub/Sub Lite-Thema, in dem Kafka-Datensätze veröffentlicht werden sollen.
topics String Erforderlich. Eine durch Kommas getrennte Liste von Kafka-Themen, aus denen Sie ablesen können.

Konfigurationsoptionen für den Quell-Connector

Der Quell-Connector unterstützt die folgenden Konfigurationsoptionen.

Einstellung Datentyp Beschreibung
connector.class String Erforderlich. Die Java-Klasse für den Connector. Für Pub/Sub Lite-Quell-Connector nutzt, muss der Wert com.google.pubsublite.kafka.source.PubSubLiteSourceConnector
gcp.credentials.file.path String Optional. Der Pfad zu einer Datei, in der Google Cloud-Anmeldedaten gespeichert sind zur Authentifizierung von Pub/Sub Lite.
gcp.credentials.json String Optional. Ein JSON-Blob, der Google Cloud für die Authentifizierung von Pub/Sub Lite enthält.
kafka.topic String Erforderlich. Das Kafka-Thema, das Nachrichten von Pub/Sub Lite empfängt.
pubsublite.location String Erforderlich. Der Speicherort des Pub/Sub Lite-Themas.
pubsublite.partition_flow_control.bytes Long

Die maximale Anzahl ausstehender Byte pro Pub/Sub Lite-Partition.

Standardeinstellung: 20.000.000

pubsublite.partition_flow_control.messages Long

Die maximale Anzahl ausstehender Nachrichten pro Pub/Sub Lite-Partition.

Standardwert: Long.MAX_VALUE

pubsublite.project String Erforderlich. Das Google Cloud-Projekt, das das Pub/Sub Lite-Thema enthält.
pubsublite.subscription String Erforderlich. Der Name von Pub/Sub Lite Abo, aus dem Nachrichten abgerufen werden sollen.

Nächste Schritte