Pub/Sub mit Apache Kafka verbinden

In diesem Dokument wird beschrieben, wie Sie Apache Kafka und Pub/Sub mithilfe des Pub/Sub-Gruppen-Kafka-Connectors integrieren.

Pub/Sub-Gruppen-Kafka-Connector

Apache Kafka ist eine Open-Source-Plattform für Streamingereignisse. Es wird häufig in verteilten Architekturen verwendet, um die Kommunikation zwischen lose gekoppelten Komponenten zu ermöglichen. Pub/Sub ist ein verwalteter Dienst zum asynchronen Senden und Empfangen von Nachrichten. Wie bei Kafka können Sie Pub/Sub für die Kommunikation zwischen Komponenten in Ihrer Cloud-Architektur verwenden.

Mit dem Pub/Sub-Gruppen-Kafka-Connector können Sie diese beiden Systeme integrieren. Die folgenden Connectors sind in der Connector-JAR-Datei enthalten:

  • Der Senke-Connector liest Datensätze aus einem oder mehreren Kafka-Themen und veröffentlicht sie in Pub/Sub.
  • Der Quell-Connector liest Nachrichten aus einem Pub/Sub-Thema und veröffentlicht sie in Kafka.

Hier sind einige Szenarien, in denen Sie den Pub/Sub-Gruppen-Kafka-Connector verwenden können:

  • Sie migrieren eine Kafka-basierte Architektur zu Google Cloud.
  • Sie haben ein Frontendsystem, das Ereignisse außerhalb von Google Cloud in Kafka speichert. Sie verwenden Google Cloud jedoch auch, um einige Ihrer Backenddienste auszuführen, die die Kafka-Ereignisse empfangen müssen.
  • Sie erfassen Logs aus einer lokalen Kafka-Lösung und senden sie zur Datenanalyse an Google Cloud.
  • Sie haben ein Frontendsystem, das Google Cloud verwendet, aber Sie speichern auch Daten lokal mit Kafka.

Der Connector erfordert Kafka Connect, ein Framework zum Streamen von Daten zwischen Kafka und anderen Systemen. Wenn Sie den Connector verwenden möchten, müssen Sie Kafka Connect zusammen mit Ihrem Kafka-Cluster ausführen.

In diesem Dokument wird davon ausgegangen, dass Sie mit Kafka und Pub/Sub vertraut sind. Bevor Sie dieses Dokument lesen, sollten Sie eine der Pub/Sub-Kurzanleitungen durcharbeiten.

Der Pub/Sub-Connector unterstützt keine Integration zwischen Google Cloud IAM- und Kafka Connect-ACLs.

Erste Schritte mit dem Connector

In diesem Abschnitt werden die folgenden Aufgaben beschrieben:

  1. Konfigurieren Sie den Pub/Sub-Gruppen-Kafka-Connector.
  2. Ereignisse von Kafka an Pub/Sub senden
  3. Nachrichten von Pub/Sub an Kafka senden.

Vorbereitung

Kafka installieren

Folgen Sie dem Schnellstart für Apache Kafka, um einen einzelnen Kafka-Knoten auf Ihrem lokalen Computer zu installieren. Führen Sie die folgenden Schritte im Schnellstart aus:

  1. Laden Sie die neueste Kafka-Version herunter und extrahieren Sie sie.
  2. Starten Sie die Kafka-Umgebung.
  3. Erstellen Sie ein Kafka-Thema.

Authentifizieren

Der Pub/Sub-Gruppen-Kafka-Connector muss sich bei Pub/Sub authentifizieren, um Pub/Sub-Nachrichten senden und empfangen zu können. So richten Sie die Authentifizierung ein:

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Create local authentication credentials for your user account:

    gcloud auth application-default login
  6. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  7. Install the Google Cloud CLI.
  8. To initialize the gcloud CLI, run the following command:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Create local authentication credentials for your user account:

    gcloud auth application-default login
  11. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.

JAR-Datei des Connectors herunterladen

Laden Sie die JAR-Datei des Connectors auf Ihren lokalen Computer herunter. Weitere Informationen finden Sie in der GitHub-Readme unter Connector herunterladen.

Connector-Konfigurationsdateien kopieren

  1. Klonen Sie das GitHub-Repository für den Connector oder laden Sie es herunter.

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. Kopieren Sie den Inhalt des Verzeichnisses config in das Unterverzeichnis config Ihrer Kafka-Installation.

    cp config/* [path to Kafka installation]/config/
    

Diese Dateien enthalten Konfigurationseinstellungen für den Connector.

Kafka Connect-Konfiguration aktualisieren

  1. Rufen Sie das Verzeichnis auf, das die heruntergeladene Kafka Connect-Binärdatei enthält.
  2. Öffnen Sie im Binärverzeichnis von Kafka Connect die Datei config/connect-standalone.properties in einem Texteditor.
  3. Entfernen Sie das Kommentarzeichen bei plugin.path property, falls es vorhanden ist.
  4. Aktualisieren Sie die plugin.path property, um den Pfad zum JAR-File des Connectors anzugeben.

    Beispiel:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. Legen Sie für die Eigenschaft offset.storage.file.filename einen lokalen Dateinamen fest. Im Standalone-Modus verwendet Kafka diese Datei, um Offset-Daten zu speichern.

    Beispiel:

    offset.storage.file.filename=/tmp/connect.offsets
    

Ereignisse von Kafka an Pub/Sub weiterleiten

In diesem Abschnitt wird beschrieben, wie Sie den Senken-Connector starten, Ereignisse in Kafka veröffentlichen und dann die weitergeleiteten Nachrichten aus Pub/Sub lesen.

  1. Erstellen Sie mit der Google Cloud CLI ein Pub/Sub-Thema mit einem Abo.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

    Ersetzen Sie Folgendes:

    • PUBSUB_TOPIC: Der Name eines Pub/Sub-Themas, über das die Nachrichten von Kafka empfangen werden.
    • PUBSUB_SUBSCRIPTION: Der Name eines Pub/Sub-Abos für das Thema.
  2. Öffnen Sie die Datei /config/cps-sink-connector.properties in einem Texteditor. Fügen Sie Werte für die folgenden Properties hinzu, die in den Kommentaren mit "TODO" gekennzeichnet sind:

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC

    Ersetzen Sie Folgendes:

    • KAFKA_TOPICS: Eine durch Kommas getrennte Liste von Kafka-Themen, aus denen gelesen werden soll.
    • PROJECT_ID: Das Google Cloud-Projekt, das Ihr Pub/Sub-Thema enthält.
    • PUBSUB_TOPIC: Das Pub/Sub-Thema, über das die Nachrichten von Kafka empfangen werden.
  3. Führen Sie im Kafka-Verzeichnis den folgenden Befehl aus:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. Folgen Sie der Anleitung im Schnellstart für Apache Kafka, um einige Ereignisse in Ihr Kafka-Thema zu schreiben.

  5. Lesen Sie die Ereignisse mit der gcloud CLI aus Pub/Sub.

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack

Nachrichten von Pub/Sub an Kafka weiterleiten

In diesem Abschnitt wird beschrieben, wie Sie den Quell-Connector starten, Nachrichten in Pub/Sub veröffentlichen und die weitergeleiteten Nachrichten aus Kafka lesen.

  1. Erstellen Sie mit der gcloud CLI ein Pub/Sub-Thema mit einem Abo.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

    Ersetzen Sie Folgendes:

    • PUBSUB_TOPIC: Der Name eines Pub/Sub-Themas.
    • PUBSUB_SUBSCRIPTION: Der Name eines Pub/Sub-Abos.
  2. Öffnen Sie die Datei /config/cps-source-connector.properties in einem Texteditor. Fügen Sie Werte für die folgenden Properties hinzu, die in den Kommentaren mit "TODO" gekennzeichnet sind:

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION

    Ersetzen Sie Folgendes:

    • KAFKA_TOPIC: Die Kafka-Themen, auf denen die Pub/Sub-Nachrichten empfangen werden sollen.
    • PROJECT_ID: Das Google Cloud-Projekt, das Ihr Pub/Sub-Thema enthält.
    • PUBSUB_TOPIC: Das Pub/Sub-Thema.
  3. Führen Sie im Kafka-Verzeichnis den folgenden Befehl aus:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. Veröffentlichen Sie eine Nachricht mit der gcloud CLI in Pub/Sub.

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
  5. Nachricht aus Kafka lesen Folgen Sie der Anleitung im Schnellstart für Apache Kafka, um die Nachrichten aus dem Kafka-Thema zu lesen.

Nachrichten-Conversion

Ein Kafka-Eintrag enthält einen Schlüssel und einen Wert, die Byte-Arrays mit variabler Länge sind. Optional kann ein Kafka-Eintrag auch Header enthalten, also Schlüssel/Wert-Paare. Eine Pub/Sub-Nachricht besteht aus zwei Hauptteilen: dem Nachrichtentext und null oder mehr Schlüssel/Wert-Attributen.

Kafka Connect verwendet Konverter, um Schlüssel und Werte von und nach Kafka zu serialisieren. Legen Sie zum Steuern der Serialization die folgenden Eigenschaften in den Konfigurationsdateien des Connectors fest:

  • key.converter: Der Konverter, der zum Serialisieren von Datensatzschlüsseln verwendet wird.
  • value.converter: Der Konverter, der zum Serialisieren von Datensatzwerten verwendet wird.

Der Text einer Pub/Sub-Nachricht ist ein ByteString-Objekt. Die effizienteste Umwandlung besteht daher darin, die Nutzlast direkt zu kopieren. Aus diesem Grund empfehlen wir, nach Möglichkeit einen Konverter zu verwenden, der primitive Datentypen (Integer, Float, String oder Bytes-Schema) erzeugt, um die De- und Reserialisierung desselben Nachrichtentexts zu vermeiden.

Umstellung von Kafka auf Pub/Sub

Der Senken-Connector konvertiert Kafka-Eintragsdaten so in Pub/Sub-Nachrichten:

  • Der Kafka-Eintragsschlüssel wird als Attribut mit dem Namen "key" in der Pub/Sub-Nachricht gespeichert.
  • Standardmäßig entfernt der Connector alle Header im Kafka-Datensatz. Wenn Sie die Konfigurationsoption headers.publish jedoch auf true festlegen, schreibt der Connector die Header als Pub/Sub-Attribute. Der Connector überspringt alle Header, die die Pub/Sub-Limits für Nachrichtenattribute überschreiten.
  • Bei Ganzzahl-, Gleitkomma-, String- und Byte-Schemas übergibt der Connector die Bytes des Kafka-Eintrags direkt in den Pub/Sub-Nachrichtenkörper.
  • Bei Strukturschemata schreibt der Connector jedes Feld als Attribut der Pub/Sub-Nachricht. Wenn das Feld beispielsweise { "id"=123 } ist, hat die resultierende Pub/Sub-Nachricht das Attribut "id"="123". Der Feldwert wird immer in einen String konvertiert. Zuordnungs- und Strukturtypen werden nicht als Feldtypen innerhalb einer Struktur unterstützt.
  • Bei Kartenschemata schreibt der Connector jedes Schlüssel/Wert-Paar als Attribut der Pub/Sub-Nachricht. Wenn die Karte beispielsweise {"alice"=1,"bob"=2} ist, hat die resultierende Pub/Sub-Nachricht zwei Attribute: "alice"="1" und "bob"="2". Die Schlüssel und Werte werden in Strings umgewandelt.

Struct- und Kartenschemata haben einige zusätzliche Verhaltensweisen:

  • Optional können Sie ein bestimmtes Strukturfeld oder einen bestimmten Kartenschlüssel als Nachrichtentext angeben. Dazu müssen Sie die Konfigurationseigenschaft messageBodyName festlegen. Der Wert des Felds oder Schlüssels wird als ByteString im Nachrichtentext gespeichert. Wenn Sie messageBodyName nicht festlegen, ist der Nachrichtentext für Struktur- und Kartenschemata leer.

  • Für Arraywerte werden vom Connector nur primitive Arraytypen unterstützt. Die Wertesequenz im Array wird zu einem einzelnen ByteString-Objekt zusammengefügt.

Umstellung von Pub/Sub auf Kafka

Der Quell-Connector konvertiert Pub/Sub-Nachrichten so in Kafka-Eintragsobjekte:

  • Kafka-Eintragsschlüssel: Standardmäßig ist der Schlüssel auf null festgelegt. Optional können Sie ein Pub/Sub-Nachrichtenattribut als Schlüssel angeben, indem Sie die Konfigurationsoption kafka.key.attribute festlegen. In diesem Fall sucht der Connector nach einem Attribut mit diesem Namen und setzt den Datensatzschlüssel auf den Attributwert. Wenn das angegebene Attribut nicht vorhanden ist, wird der Datensatzschlüssel auf null gesetzt.

  • Kafka-Eintragswert Der Connector schreibt den Datensatzwert so:

    • Wenn die Pub/Sub-Nachricht keine benutzerdefinierten Attribute hat, schreibt der Connector den Pub/Sub-Nachrichtentext mithilfe des von value.converter angegebenen Converters direkt als byte[]-Typ in den Kafka-Eintragswert.

    • Wenn die Pub/Sub-Nachricht benutzerdefinierte Attribute hat und kafka.record.headers = false ist, schreibt der Connector einen Datensatz in den Datensatzwert. Die Struktur enthält ein Feld für jedes Attribut und ein Feld namens "message", dessen Wert der Pub/Sub-Nachrichtenkörper ist (als Bytes gespeichert):

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      In diesem Fall müssen Sie ein value.converter verwenden, das mit struct-Schemas kompatibel ist, z. B. org.apache.kafka.connect.json.JsonConverter.

    • Wenn die Pub/Sub-Nachricht benutzerdefinierte Attribute hat und kafka.record.headers = true ist, schreibt der Connector die Attribute als Kafka-Eintragsheader. Der Pub/Sub-Nachrichtentext wird mithilfe des von value.converter angegebenen Converters direkt als byte[]-Typ in den Kafka-Eintragswert geschrieben.

  • Kafka-Eintragsheader Die Header sind standardmäßig leer, es sei denn, Sie legen kafka.record.headers auf true fest.

Konfigurationsoptionen

Zusätzlich zu den Konfigurationen, die von der Kafka Connect API bereitgestellt werden, unterstützt der Pub/Sub-Gruppen-Kafka-Connector die Senke- und Quellkonfiguration wie in den Pub/Sub-Connector-Konfigurationen beschrieben.

Support

Wenn Sie Hilfe benötigen, erstellen Sie ein Support-Ticket. Wenn du allgemeine Fragen hast oder eine Diskussion beginnen möchtest, erstelle ein Problem im GitHub-Repository.

Nächste Schritte