In diesem Dokument wird beschrieben, wie Sie Apache Kafka und Pub/Sub Lite mithilfe des Pub/Sub-Gruppen-Kafka-Connectors integrieren.
Pub/Sub-Gruppen-Kafka-Connector
Apache Kafka ist eine Open-Source-Plattform für Streamingereignisse. Es wird häufig in verteilten Architekturen verwendet, um die Kommunikation zwischen lose gekoppelten Komponenten zu ermöglichen. Pub/Sub Lite ist ein verwalteter Dienst zum asynchronen Senden und Empfangen von Nachrichten. Wie bei Kafka können Sie Pub/Sub Lite für die Kommunikation zwischen Komponenten in Ihrer Cloud-Architektur verwenden.
Mit dem Pub/Sub-Gruppen-Kafka-Connector können Sie diese beiden Systeme integrieren. Die folgenden Connectors sind in der Connector-JAR-Datei enthalten:
- Der Senke-Connector liest Datensätze aus einem oder mehreren Kafka-Themen und veröffentlicht sie in Pub/Sub Lite.
- Der Quell-Connector liest Nachrichten aus einem Pub/Sub Lite-Thema und veröffentlicht sie in Kafka.
Hier sind einige Szenarien, in denen Sie den Pub/Sub-Gruppen-Kafka-Connector verwenden können:
- Sie migrieren eine Kafka-basierte Architektur zu Google Cloud.
- Sie haben ein Frontendsystem, das Ereignisse außerhalb von Google Cloud in Kafka speichert. Sie verwenden Google Cloud jedoch auch, um einige Ihrer Backenddienste auszuführen, die die Kafka-Ereignisse empfangen müssen.
- Sie erfassen Logs aus einer lokalen Kafka-Lösung und senden sie zur Datenanalyse an Google Cloud.
- Sie haben ein Frontendsystem, das Google Cloud verwendet, aber Sie speichern auch Daten lokal mit Kafka.
Der Connector erfordert Kafka Connect, ein Framework zum Streamen von Daten zwischen Kafka und anderen Systemen. Wenn Sie den Connector verwenden möchten, müssen Sie Kafka Connect zusammen mit Ihrem Kafka-Cluster ausführen.
In diesem Dokument wird davon ausgegangen, dass Sie mit Kafka und Pub/Sub Lite vertraut sind. Informationen zum Einstieg in Pub/Sub Lite finden Sie unter Mit der Google Cloud Console Nachrichten in Pub/Sub Lite veröffentlichen und empfangen.
Erste Schritte mit dem Pub/Sub-Gruppen-Kafka-Connector
In diesem Abschnitt werden die folgenden Aufgaben beschrieben:- Konfigurieren Sie den Pub/Sub-Gruppen-Kafka-Connector.
- Ereignisse von Kafka an Pub/Sub Lite senden
- Nachrichten von Pub/Sub Lite an Kafka senden.
Vorbereitung
Kafka installieren
Folgen Sie dem Schnellstart für Apache Kafka, um einen einzelnen Kafka-Knoten auf Ihrem lokalen Computer zu installieren. Führen Sie die folgenden Schritte im Schnellstart aus:
- Laden Sie die neueste Kafka-Version herunter und extrahieren Sie sie.
- Starten Sie die Kafka-Umgebung.
- Erstellen Sie ein Kafka-Thema.
Authentifizieren
Der Pub/Sub-Gruppen-Kafka-Connector muss sich bei Pub/Sub authentifizieren, um Pub/Sub-Nachrichten senden und empfangen zu können. So richten Sie die Authentifizierung ein:
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsublite.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsublite.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
JAR-Datei des Connectors herunterladen
Laden Sie die JAR-Datei des Connectors auf Ihren lokalen Computer herunter. Weitere Informationen finden Sie in der GitHub-Readme unter Connector herunterladen.
Connector-Konfigurationsdateien kopieren
Klonen Sie das GitHub-Repository für den Connector oder laden Sie es herunter.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
Kopieren Sie den Inhalt des Verzeichnisses
config
in das Unterverzeichnisconfig
Ihrer Kafka-Installation.cp config/* [path to Kafka installation]/config/
Diese Dateien enthalten Konfigurationseinstellungen für den Connector.
Kafka Connect-Konfiguration aktualisieren
- Rufen Sie das Verzeichnis auf, das die heruntergeladene Kafka Connect-Binärdatei enthält.
- Öffnen Sie im Binärverzeichnis von Kafka Connect die Datei
config/connect-standalone.properties
in einem Texteditor. - Entfernen Sie das Kommentarzeichen bei
plugin.path property
, falls es vorhanden ist. Aktualisieren Sie die
plugin.path property
, um den Pfad zum JAR-File des Connectors anzugeben.Beispiel:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
Legen Sie für die Eigenschaft
offset.storage.file.filename
einen lokalen Dateinamen fest. Im Standalone-Modus verwendet Kafka diese Datei, um Offset-Daten zu speichern.Beispiel:
offset.storage.file.filename=/tmp/connect.offsets
Ereignisse von Kafka an Pub/Sub Lite weiterleiten
In diesem Abschnitt wird beschrieben, wie Sie den Senken-Connector starten, Ereignisse in Kafka veröffentlichen und dann die weitergeleiteten Nachrichten aus Pub/Sub Lite lesen.
Erstellen Sie mit der Google Cloud CLI eine Pub/Sub Lite-Reservierung.
gcloud pubsub lite-reservations create RESERVATION_NAME \ --location=LOCATION \ --throughput-capacity=4
Ersetzen Sie Folgendes:
- RESERVATION_NAME: Der Name der Pub/Sub Lite-Reservierung.
- LOCATION: Der Standort der Reservierung.
Erstellen Sie mit der Google Cloud CLI ein Pub/Sub Lite-Thema mit einem Abo.
gcloud pubsub lite-topics create LITE_TOPIC \ --location=LOCATION \ --partitions=2 \ --per-partition-bytes=30GiB \ --throughput-reservation=RESERVATION_NAME gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \ --location=LOCATION \ --topic=LITE_TOPIC
Ersetzen Sie Folgendes:
- LITE_TOPIC: Der Name des Pub/Sub Lite-Themas, über das Nachrichten von Kafka empfangen werden sollen.
- LOCATION: Der Speicherort des Themas. Der Wert muss mit dem Standort der Reservierung übereinstimmen.
- RESERVATION_NAME: Der Name der Pub/Sub Lite-Reservierung.
- LITE_SUBSCRIPTION: Der Name eines Pub/Sub Lite-Abos für das Thema.
Öffnen Sie die Datei
/config/pubsub-lite-sink-connector.properties
in einem Texteditor. Fügen Sie Werte für die folgenden Properties hinzu, die in den Kommentaren mit"TODO"
gekennzeichnet sind:topics=KAFKA_TOPICS pubsublite.project=PROJECT_ID pubsublite.location=LOCATION pubsublite.topic=LITE_TOPIC
Ersetzen Sie Folgendes:
- KAFKA_TOPICS: Eine durch Kommas getrennte Liste von Kafka-Themen, aus denen gelesen werden soll.
- PROJECT_ID: Das Google Cloud-Projekt, das Ihr Pub/Sub Lite-Thema enthält.
- LOCATION: Der Speicherort des Pub/Sub Lite-Themas.
- LITE_TOPIC: Das Pub/Sub Lite-Thema, über das Nachrichten von Kafka empfangen werden sollen.
Führen Sie im Kafka-Verzeichnis den folgenden Befehl aus:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/pubsub-lite-sink-connector.properties
Folgen Sie der Anleitung im Schnellstart für Apache Kafka, um einige Ereignisse in Ihr Kafka-Thema zu schreiben.
Sie können das Pub/Sub Lite-Abo mit einer der Methoden abonnieren, die unter Nachrichten von Lite-Abos empfangen beschrieben werden.
Nachrichten von Pub/Sub Lite an Kafka weiterleiten
In diesem Abschnitt wird beschrieben, wie Sie den Quell-Connector starten, Nachrichten in Pub/Sub Lite veröffentlichen und die weitergeleiteten Nachrichten aus Kafka lesen.
Erstellen Sie mit der Google Cloud CLI eine Pub/Sub Lite-Reservierung.
gcloud pubsub lite-reservations create RESERVATION_NAME \ --location=LOCATION \ --throughput-capacity=4
Ersetzen Sie Folgendes:
- RESERVATION_NAME: Der Name der Pub/Sub Lite-Reservierung.
- LOCATION: Der Standort der Reservierung.
Erstellen Sie mit der Google Cloud CLI ein Pub/Sub Lite-Thema mit einem Abo.
gcloud pubsub lite-topics create LITE_TOPIC \ --location=LOCATION \ --partitions=2 \ --per-partition-bytes=30GiB \ --throughput-reservation=RESERVATION_NAME gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \ --location=LOCATION \ --topic=LITE_TOPIC
Ersetzen Sie Folgendes:
- LITE_TOPIC: Der Name des Pub/Sub Lite-Themas.
- LOCATION: Der Speicherort des Themas. Der Wert muss mit dem Standort der Reservierung übereinstimmen.
- RESERVATION_NAME: Der Name der Pub/Sub Lite-Reservierung.
- LITE_SUBSCRIPTION: Der Name eines Pub/Sub Lite-Abos für das Thema.
Öffnen Sie die Datei
/config/pubsub-lite-source-connector.properties
in einem Texteditor. Fügen Sie Werte für die folgenden Properties hinzu, die in den Kommentaren mit"TODO"
gekennzeichnet sind:topic=KAFKA_TOPIC pubsublite.project=PROJECT_ID pubsublite.location=LOCATION pubsublite.subscription=LITE_SUBSCRIPTION
Ersetzen Sie Folgendes:
- KAFKA_TOPIC: Die Kafka-Themen, auf denen die Pub/Sub-Nachrichten empfangen werden sollen.
- PROJECT_ID: Das Google Cloud-Projekt, das Ihr Pub/Sub-Thema enthält.
- LOCATION: Der Speicherort des Pub/Sub Lite-Themas.
- LITE_SUBSCRIPTION: Das Pub/Sub Lite-Thema.
Führen Sie im Kafka-Verzeichnis den folgenden Befehl aus:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/pubsub-lite-source-connector.properties
Veröffentlichen Sie Nachrichten im Pub/Sub Lite-Thema mit einer der Methoden unter Nachrichten in Lite-Themen veröffentlichen.
Nachricht aus Kafka lesen Folgen Sie der Anleitung im Schnellstart für Apache Kafka, um die Nachrichten aus dem Kafka-Thema zu lesen.
Nachrichten-Conversion
Ein Kafka-Eintrag enthält einen Schlüssel und einen Wert, die Byte-Arrays mit variabler Länge sind. Optional kann ein Kafka-Eintrag auch Header enthalten, also Schlüssel/Wert-Paare. Eine Pub/Sub Lite-Nachricht enthält die folgenden Felder:
key
: Nachrichtenschlüssel (bytes
)data
: Nachrichtendaten (bytes
)attributes
: Null oder mehrere Attribute. Jedes Attribut ist eine(key,values[])
-Zuordnung. Ein einzelnes Attribut kann mehrere Werte haben.event_time
: Ein optionaler vom Nutzer angegebener Ereigniszeitstempel.
Kafka Connect verwendet Konverter, um Schlüssel und Werte von und nach Kafka zu serialisieren. Legen Sie zum Steuern der Serialization die folgenden Eigenschaften in den Konfigurationsdateien des Connectors fest:
key.converter
: Der Konverter, der zum Serialisieren von Datensatzschlüsseln verwendet wird.value.converter
: Der Konverter, der zum Serialisieren von Datensatzwerten verwendet wird.
Umstellung von Kafka auf Pub/Sub Lite
Der Senken-Connector konvertiert Kafka-Datensätze so in Pub/Sub Lite-Nachrichten:
Kafka-Eintrag
(SinkRecord ) |
Pub/Sub Lite-Nachricht |
---|---|
Schlüssel | key |
Wert | data |
Header | attributes |
Zeitstempel | eventTime |
Zeitstempeltyp | attributes["x-goog-pubsublite-source-kafka-event-time-type"] |
Thema | attributes["x-goog-pubsublite-source-kafka-topic"] |
Partition | attributes["x-goog-pubsublite-source-kafka-offset"] |
Offset | attributes["x-goog-pubsublite-source-kafka-partition"] |
Schlüssel, Werte und Header werden so codiert:
- Null-Schemas werden als String-Schemas behandelt.
- Byte-Nutzlast wird ohne Konvertierung direkt geschrieben.
- String-, Ganzzahl- und Gleitkomman-Nutzlasten werden in eine Sequenz von UTF-8-Byte codiert.
- Alle anderen Nutzlasten werden in einen Protokollbuffer vom Typ
Value
codiert und dann in einen Byte-String umgewandelt.- Verschachtelte Stringfelder werden in ein protobuf-
Value
codiert. - Verschachtelte Byte-Felder werden in ein Protobuf
Value
codiert, das die base64-codierten Bytes enthält. - Verschachtelte numerische Felder werden als Gleitkommazahl mit doppelter Genauigkeit in ein Protobuf-
Value
codiert. - Zuordnungen mit Array-, Zuordnungs- oder Strukturschlüsseln werden nicht unterstützt.
- Verschachtelte Stringfelder werden in ein protobuf-
Umstellung von Pub/Sub Lite auf Kafka
Der Quell-Connector konvertiert Pub/Sub Lite-Nachrichten so in Kafka-Eintragsobjekte:
Pub/Sub Lite-Nachricht | Kafka-Eintrag
(SourceRecord )
|
---|---|
key |
Schlüssel |
data |
Wert |
attributes |
Header |
event_time |
Timestamp. Wenn event_time nicht vorhanden ist, wird die Veröffentlichungszeit verwendet. |
Konfigurationsoptionen
Zusätzlich zu den von der Kafka Connect API bereitgestellten Konfigurationen unterstützt der Connector die folgenden Pub/Sub Lite-Konfigurationen.
Konfigurationsoptionen für den Sink-Connector
Der Sink-Connector unterstützt die folgenden Konfigurationsoptionen.
Einstellung | Datentyp | Beschreibung |
---|---|---|
connector.class |
String |
Erforderlich. Die Java-Klasse für den Connector. Für den Pub/Sub Lite-Sink-Connector muss der Wert com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector sein.
|
gcp.credentials.file.path |
String |
Optional. Der Pfad zu einer Datei, in der Google Cloud-Anmeldedaten zur Authentifizierung von Pub/Sub Lite gespeichert sind. |
gcp.credentials.json |
String |
Optional. Ein JSON-Blob, der Google Cloud für die Authentifizierung von Pub/Sub Lite enthält. |
pubsublite.location |
String |
Erforderlich. Der Speicherort des Pub/Sub Lite-Themas. |
pubsublite.project |
String |
Erforderlich. Die Google Cloud, die das Pub/Sub Lite-Thema enthält. |
pubsublite.topic |
String |
Erforderlich. Das Pub/Sub Lite-Thema, in dem Kafka-Datensätze veröffentlicht werden sollen. |
topics |
String |
Erforderlich. Eine durch Kommas getrennte Liste von Kafka-Themen, aus denen gelesen werden soll. |
Konfigurationsoptionen für Quell-Connectors
Der Quell-Connector unterstützt die folgenden Konfigurationsoptionen.
Einstellung | Datentyp | Beschreibung |
---|---|---|
connector.class |
String |
Erforderlich. Die Java-Klasse für den Connector. Für den Pub/Sub Lite-Quell-Connector muss der Wert com.google.pubsublite.kafka.source.PubSubLiteSourceConnector sein.
|
gcp.credentials.file.path |
String |
Optional. Der Pfad zu einer Datei, in der Google Cloud-Anmeldedaten zur Authentifizierung von Pub/Sub Lite gespeichert sind. |
gcp.credentials.json |
String |
Optional. Ein JSON-Blob, der Google Cloud für die Authentifizierung von Pub/Sub Lite enthält. |
kafka.topic |
String |
Erforderlich. Das Kafka-Thema, das Nachrichten von Pub/Sub Lite empfängt. |
pubsublite.location |
String |
Erforderlich. Der Speicherort des Pub/Sub Lite-Themas. |
pubsublite.partition_flow_control.bytes |
Long |
Die maximale Anzahl ausstehender Byte pro Pub/Sub Lite-Partition. Standard: 20.000.000 |
pubsublite.partition_flow_control.messages |
Long |
Die maximale Anzahl ausstehender Nachrichten pro Pub/Sub Lite-Partition. Standardwert: |
pubsublite.project |
String |
Erforderlich. Das Google Cloud-Projekt, das das Pub/Sub Lite-Thema enthält. |
pubsublite.subscription |
String |
Erforderlich. Der Name des Pub/Sub Lite-Abos, aus dem Nachrichten abgerufen werden sollen. |
Nächste Schritte
- Unterschiede zwischen Kafka und Pub/Sub
- Weitere Informationen zum Pub/Sub-Gruppen-Kafka-Connector
- GitHub-Repository für den Pub/Sub-Gruppen-Kafka-Connector