In diesem Dokument wird beschrieben, wie Sie Apache Kafka und Pub/Sub mithilfe des Kafka-Connectors für Pub/Sub-Gruppen einbinden.
Informationen zum Kafka-Connector für Pub/Sub-Gruppen
Apache Kafka ist eine Open-Source-Plattform zum Streamen von Ereignissen. Es wird häufig in verteilten Architekturen verwendet, um die Kommunikation zwischen lose gekoppelten Komponenten zu ermöglichen. Pub/Sub ist ein verwalteter Dienst zum asynchronen Senden und Empfangen von Nachrichten. Wie bei Kafka können Sie Pub/Sub verwenden, um zwischen Komponenten in Ihrer Cloud-Architektur zu kommunizieren.
Mit dem Kafka-Connector für Pub/Sub-Gruppen können Sie diese beiden Systeme einbinden. Die folgenden Connectors sind in der Connector-JAR-Datei enthalten:
- Der Senken-Connector liest Datensätze aus einem oder mehreren Kafka-Themen und veröffentlicht sie in Pub/Sub.
- Der Quell-Connector liest Nachrichten aus einem Pub/Sub-Thema und veröffentlicht sie in Kafka.
Hier sind einige Szenarien, in denen Sie den Kafka-Connector für Pub/Sub-Gruppen verwenden können:
- Sie migrieren eine Kafka-basierte Architektur zu Google Cloud.
- Sie haben ein Frontend-System, das Ereignisse in Kafka außerhalb von Google Cloud speichert. Sie verwenden aber auch Google Cloud, um einige Ihrer Backend-Dienste auszuführen, die die Kafka-Ereignisse empfangen müssen.
- Sie erfassen Logs aus einer lokalen Kafka-Lösung und senden sie zur Datenanalyse an Google Cloud.
- Sie haben ein Front-End-System, das Google Cloud verwendet, speichern Daten aber auch lokal mit Kafka.
Für den Connector ist Kafka Connect erforderlich, ein Framework für das Streaming von Daten zwischen Kafka und anderen Systemen. Wenn Sie den Connector verwenden möchten, müssen Sie Kafka Connect parallel zu Ihrem Kafka-Cluster ausführen.
In diesem Dokument wird davon ausgegangen, dass Sie mit Kafka und Pub/Sub vertraut sind. Bevor Sie dieses Dokument lesen, sollten Sie einen der Pub/Sub-Kurzanleitungen durchgehen.
Der Pub/Sub-Connector unterstützt keine Integration zwischen Google Cloud IAM- und Kafka Connect-ACLs.
Erste Schritte mit dem Connector
In diesem Abschnitt werden die folgenden Aufgaben beschrieben:- Konfigurieren Sie den Kafka-Connector für die Pub/Sub-Gruppe.
- Ereignisse von Kafka an Pub/Sub senden.
- Nachrichten von Pub/Sub an Kafka senden
Vorbereitung
Kafka installieren
Folgen Sie der Kurzanleitung zu Apache Kafka, um ein Kafka mit einem einzelnen Knoten auf Ihrem lokalen Computer zu installieren. Führen Sie in der Kurzanleitung die folgenden Schritte aus:
- Laden Sie die neueste Kafka-Version herunter und extrahieren Sie sie.
- Starten Sie die Kafka-Umgebung.
- Erstellen Sie ein Kafka-Thema.
Authentifizieren
Der Kafka-Connector für die Pub/Sub-Gruppe muss sich bei Pub/Sub authentifizieren, um Pub/Sub-Nachrichten senden und empfangen zu können. So richten Sie die Authentifizierung ein:
- Melden Sie sich bei Ihrem Google Cloud-Konto an. Wenn Sie mit Google Cloud noch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
- Installieren Sie die Google Cloud CLI.
-
Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Erstellen Sie lokale Anmeldedaten zur Authentifizierung für Ihr Google-Konto:
gcloud auth application-default login
-
Gewähren Sie Ihrem Google-Konto Rollen. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Ersetzen Sie
PROJECT_ID
durch Ihre Projekt-ID. - Ersetzen Sie
EMAIL_ADDRESS
durch Ihre E-Mail-Adresse. - Ersetzen Sie
ROLE
durch jede einzelne Rolle.
- Ersetzen Sie
- Installieren Sie die Google Cloud CLI.
-
Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Erstellen Sie lokale Anmeldedaten zur Authentifizierung für Ihr Google-Konto:
gcloud auth application-default login
-
Gewähren Sie Ihrem Google-Konto Rollen. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Ersetzen Sie
PROJECT_ID
durch Ihre Projekt-ID. - Ersetzen Sie
EMAIL_ADDRESS
durch Ihre E-Mail-Adresse. - Ersetzen Sie
ROLE
durch jede einzelne Rolle.
- Ersetzen Sie
Connector-JAR-Datei herunterladen
Laden Sie die Connector-JAR-Datei auf Ihren lokalen Computer herunter. Weitere Informationen finden Sie in der GitHub-Readme-Datei unter Connector erwerben.
Connector-Konfigurationsdateien kopieren
Klonen Sie das GitHub-Repository für den Connector oder laden Sie es herunter.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
Kopieren Sie den Inhalt des Verzeichnisses
config
in das Unterverzeichnisconfig
Ihrer Kafka-Installation.cp config/* [path to Kafka installation]/config/
Diese Dateien enthalten Konfigurationseinstellungen für den Connector.
Kafka Connect-Konfiguration aktualisieren
- Wechseln Sie zu dem Verzeichnis, das die von Ihnen heruntergeladene Kafka Connect-Binärdatei enthält.
- Öffnen Sie im Binärverzeichnis von Kafka Connect die Datei
config/connect-standalone.properties
in einem Texteditor. - Wenn
plugin.path property
auskommentiert ist, entfernen Sie die Kommentarzeichen. Aktualisieren Sie
plugin.path property
, um den Pfad zur Connector-JAR-Datei anzugeben.Beispiel:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
Legen Sie das Attribut
offset.storage.file.filename
auf einen lokalen Dateinamen fest. Im eigenständigen Modus verwendet Kafka diese Datei zum Speichern von Offset-Daten.Beispiel:
offset.storage.file.filename=/tmp/connect.offsets
Ereignisse von Kafka an Pub/Sub weiterleiten
In diesem Abschnitt wird beschrieben, wie Sie den Senken-Connector starten, Ereignisse in Kafka veröffentlichen und dann die weitergeleiteten Nachrichten aus Pub/Sub lesen.
Erstellen Sie mit der Google Cloud CLI ein Pub/Sub-Thema mit einem Abo.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Ersetzen Sie Folgendes:
- PUBSUB_TOPIC: Der Name eines Pub/Sub-Themas, das die Nachrichten von Kafka empfangen soll.
- PUBSUB_SUBSCRIPTION: Der Name eines Pub/Sub-Abos für das Thema.
Öffnen Sie die Datei
/config/cps-sink-connector.properties
in einem Texteditor. Fügen Sie Werte für die folgenden Attribute hinzu, die in den Kommentaren mit"TODO"
gekennzeichnet sind:topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
Ersetzen Sie Folgendes:
- KAFKA_TOPICS: eine durch Kommas getrennte Liste von Kafka-Themen, aus denen gelesen werden soll.
- PROJECT_ID: Das Google Cloud-Projekt, das das Pub/Sub-Thema enthält.
- PUBSUB_TOPIC: Das Pub/Sub-Thema, das die Nachrichten von Kafka empfangen soll.
Führen Sie im Kafka-Verzeichnis den folgenden Befehl aus:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties
Führen Sie die Schritte in der Apache Kafka-Kurzanleitung aus, um einige Ereignisse in Ihr Kafka-Thema zu schreiben.
Verwenden Sie die gcloud CLI, um die Ereignisse aus Pub/Sub zu lesen.
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
Nachrichten von Pub/Sub an Kafka weiterleiten
In diesem Abschnitt wird beschrieben, wie Sie den Quell-Connector starten, Nachrichten in Pub/Sub veröffentlichen und die weitergeleiteten Nachrichten von Kafka lesen.
Verwenden Sie die gcloud CLI, um ein Pub/Sub-Thema mit einem Abo zu erstellen.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Ersetzen Sie Folgendes:
- PUBSUB_TOPIC: Der Name eines Pub/Sub-Themas.
- PUBSUB_SUBSCRIPTION: Der Name eines Pub/Sub-Abos.
Öffnen Sie die Datei mit dem Namen
/config/cps-source-connector.properties
in einem Texteditor. Fügen Sie Werte für die folgenden Attribute hinzu, die in den Kommentaren als"TODO"
gekennzeichnet sind:kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
Ersetzen Sie Folgendes:
- KAFKA_TOPIC: Die Kafka-Themen, die die Pub/Sub-Nachrichten empfangen sollen.
- PROJECT_ID: Das Google Cloud-Projekt, das das Pub/Sub-Thema enthält.
- PUBSUB_TOPIC: Das Pub/Sub-Thema.
Führen Sie im Kafka-Verzeichnis den folgenden Befehl aus:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties
Veröffentlichen Sie mit der gcloud CLI eine Nachricht in Pub/Sub.
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
Lesen Sie die Nachricht von Kafka. Führen Sie die Schritte in der Apache Kafka-Kurzanleitung aus, um die Nachrichten aus dem Kafka-Thema zu lesen.
SMS-Conversion
Ein Kafka-Eintrag enthält einen Schlüssel und einen Wert, bei denen es sich um Byte-Arrays mit variabler Länge handelt. Optional kann ein Kafka-Eintrag auch Header haben, bei denen es sich um Schlüssel/Wert-Paare handelt. Eine Pub/Sub-Nachricht besteht aus zwei Hauptteilen: dem Nachrichtentext und null oder mehr Schlüssel/Wert-Attributen.
Kafka Connect verwendet Converter, um Schlüssel und Werte zu und von Kafka zu serialisieren. Legen Sie die folgenden Attribute in den Konfigurationsdateien des Connectors fest, um die Serialisierung zu steuern:
key.converter
: Der Converter, der zum Serialisieren von Datensatzschlüsseln verwendet wird.value.converter
: Der Converter, der zum Serialisieren von Datensatzwerten verwendet wird.
Der Text einer Pub/Sub-Nachricht ist ein ByteString
-Objekt. Daher ist die effizienteste Konvertierung das direkte Kopieren der Nutzlast. Aus diesem Grund empfehlen wir die Verwendung eines Konverters, der nach Möglichkeit primitive Datentypen (Ganzzahl-, Gleitkomma-, String- oder Byte-Schema) erzeugt, um die Deserialisierung und Reserialisierung desselben Nachrichtentexts zu verhindern.
Konvertierung von Kafka zu Pub/Sub
Der Senken-Connector konvertiert Kafka-Einträge so in Pub/Sub-Nachrichten:
- Der Kafka-Eintragsschlüssel wird als Attribut mit dem Namen
"key"
in der Pub/Sub-Nachricht gespeichert. - Standardmäßig löscht der Connector alle Header im Kafka-Eintrag. Wenn Sie die Konfigurationsoption
headers.publish
jedoch auftrue
festlegen, schreibt der Connector die Header als Pub/Sub-Attribute. Der Connector überspringt alle Header, die die Pub/Sub-Limits für Nachrichtenattribute überschreiten. - Bei Integer-, Float-, String- und Byteschemas übergibt der Connector die Byte des Kafka-Eintragswerts direkt an den Pub/Sub-Nachrichtentext.
- Bei Strukturschemas schreibt der Connector jedes Feld als Attribut der Pub/Sub-Nachricht. Wenn das Feld beispielsweise
{ "id"=123 }
ist, hat die resultierende Pub/Sub-Nachricht das Attribut"id"="123"
. Der Feldwert wird immer in einen String umgewandelt. Map- und Struct-Typen werden innerhalb einer Struktur nicht als Feldtypen unterstützt. - Bei Kartenschemas schreibt der Connector jedes Schlüssel/Wert-Paar als Attribut der Pub/Sub-Nachricht. Wenn die Karte beispielsweise
{"alice"=1,"bob"=2}
ist, hat die resultierende Pub/Sub-Nachricht die beiden Attribute"alice"="1"
und"bob"="2"
. Die Schlüssel und Werte werden in Strings konvertiert.
Struktur- und Kartenschemas weisen einige zusätzliche Verhaltensweisen auf:
Optional können Sie ein bestimmtes Strukturfeld oder einen Zuordnungsschlüssel als Nachrichtentext angeben, indem Sie das Konfigurationsattribut
messageBodyName
festlegen. Der Wert des Felds oder Schlüssels wird alsByteString
im Nachrichtentext gespeichert. Wenn SiemessageBodyName
nicht festlegen, ist der Nachrichtentext für Struktur- und Map-Schemas leer.Bei Arraywerten unterstützt der Connector nur primitive Arraytypen. Die Folge der Werte im Array wird zu einem einzigen
ByteString
-Objekt verkettet.
Konvertierung von Pub/Sub zu Kafka
Der Quellconnector konvertiert Pub/Sub-Nachrichten in Kafka-Einträge:
Kafka-Eintragsschlüssel: Standardmäßig ist der Schlüssel auf
null
festgelegt. Optional können Sie mit der Konfigurationsoptionkafka.key.attribute
ein Pub/Sub-Nachrichtenattribut als Schlüssel angeben. In diesem Fall sucht der Connector nach einem Attribut mit diesem Namen und legt den Eintragsschlüssel auf den Attributwert fest. Wenn das angegebene Attribut nicht vorhanden ist, wird der Eintragsschlüssel aufnull
gesetzt.Kafka-Eintragswert. Der Connector schreibt den Eintragswert so:
Wenn die Pub/Sub-Nachricht keine benutzerdefinierten Attribute hat, schreibt der Connector den Pub/Sub-Nachrichtentext direkt als
byte[]
-Typ in den Kafka-Eintragswert. Dazu verwendet er den invalue.converter
angegebenen Konverter.Wenn die Pub/Sub-Nachricht benutzerdefinierte Attribute hat und
kafka.record.headers
auffalse
gesetzt ist, schreibt der Connector eine Struktur in den Datensatzwert. Die Struktur enthält ein Feld für jedes Attribut und ein Feld mit dem Namen"message"
, dessen Wert der (gespeicherte als Byte) Pub/Sub-Nachrichtentext ist:{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }
In diesem Fall müssen Sie einen
value.converter
verwenden, der mitstruct
-Schemas kompatibel ist, z. B.org.apache.kafka.connect.json.JsonConverter
.Wenn die Pub/Sub-Nachricht benutzerdefinierte Attribute hat und
kafka.record.headers
auftrue
gesetzt ist, schreibt der Connector die Attribute als Kafka-Eintragsheader. Der Pub/Sub-Nachrichtentext wird mithilfe des invalue.converter
angegebenen Konverters direkt alsbyte[]
-Typ in den Kafka-Eintragswert geschrieben.
Kafka-Eintragsheader. Standardmäßig sind die Header leer, es sei denn, Sie legen
kafka.record.headers
auftrue
fest.
Konfigurationsoptionen
Zusätzlich zu den von der Kafka Connect API bereitgestellten Konfigurationen unterstützt der Kafka-Connector für Pub/Sub-Gruppen die folgenden Konfigurationen.
Konfigurationsoptionen für Senken-Connectors
Der Senken-Connector unterstützt die folgenden Konfigurationsoptionen.
Einstellung | Datentyp | Beschreibung |
---|---|---|
connector.class |
String |
Erforderlich. Die Java-Klasse für den Connector. Für den Connector für die Pub/Sub-Senke muss der Wert com.google.pubsub.kafka.sink.CloudPubSubSinkConnector sein.
|
cps.endpoint |
String |
Der zu verwendende Pub/Sub-Endpunkt. Standardeinstellung: |
cps.project |
String |
Erforderlich. Die Google Cloud, die das Pub/Sub-Thema enthält. |
cps.topic |
String |
Erforderlich. Das Pub/Sub-Thema, in dem Kafka-Einträge veröffentlicht werden sollen. |
gcp.credentials.file.path |
String |
Optional. Der Pfad zu einer Datei, in der Google Cloud-Anmeldedaten zur Authentifizierung von Pub/Sub Lite gespeichert sind. |
gcp.credentials.json |
String |
Optional. Ein JSON-Blob, das Google Cloud zur Authentifizierung von Pub/Sub Lite enthält. |
headers.publish |
Boolean |
Wenn Standardeinstellung: |
maxBufferBytes |
Long |
Die maximale Anzahl von Byte, die für eine Kafka-Themenpartition vor der Veröffentlichung in Pub/Sub empfangen werden sollen. Standardwert: 10000000. |
maxBufferSize |
Integer |
Die maximale Anzahl der Datensätze, die in einer Kafka-Themenpartition empfangen werden sollen, bevor sie in Pub/Sub veröffentlicht werden. Der Standardwert is 100. |
maxDelayThresholdMs |
Integer |
Die maximale Wartezeit bis zum Erreichen von Der Standardwert is 100. |
maxOutstandingMessages |
Long |
Die maximale Anzahl der Datensätze, die ausstehen können, einschließlich unvollständiger und ausstehender Batches, bevor der Verlag oder Webpublisher die weitere Veröffentlichung blockiert. Standardeinstellung: |
maxOutstandingRequestBytes |
Long |
Die maximale Gesamtzahl der Byte, die ausstehen können, einschließlich unvollständiger und ausstehender Batches, bevor der Publisher eine weitere Veröffentlichung blockiert. Standardeinstellung: |
maxRequestTimeoutMs |
Integer |
Das Zeitlimit für einzelne Veröffentlichungsanfragen an Pub/Sub in Millisekunden. Standardwert: 10.000. |
maxTotalTimeoutMs |
Integer |
Das gesamte Zeitlimit in Millisekunden für einen Aufruf zur Veröffentlichung in Pub/Sub, einschließlich Wiederholungen. Standardwert: 60.000. |
metadata.publish |
Boolean |
Bei Standardeinstellung: |
messageBodyName |
String |
Gibt bei Verwendung eines Struktur- oder Zuordnungswertschemas den Namen eines Felds oder Schlüssels an, der als Pub/Sub-Nachrichtentext verwendet werden soll. Siehe Konvertierung von Kafka zu Pub/Sub. Standardeinstellung: |
orderingKeySource |
String |
Gibt an, wie der Sortierschlüssel in der Pub/Sub-Nachricht festgelegt wird. Kann einer der folgenden Werte sein:
Standardeinstellung: |
topics |
String |
Erforderlich. Eine durch Kommas getrennte Liste von Kafka-Themen, aus denen gelesen werden soll. |
Konfigurationsoptionen für den Quell-Connector
Der Quell-Connector unterstützt die folgenden Konfigurationsoptionen.
Einstellung | Datentyp | Beschreibung |
---|---|---|
connector.class |
String |
Erforderlich. Die Java-Klasse für den Connector. Für den Pub/Sub-Quellconnector muss der Wert com.google.pubsub.kafka.source.CloudPubSubSourceConnector sein.
|
cps.endpoint |
String |
Der zu verwendende Pub/Sub-Endpunkt. Standardeinstellung: |
cps.makeOrderingKeyAttribute |
Boolean |
Bei Standardeinstellung: |
cps.maxBatchSize |
Integer |
Die maximale Anzahl von Nachrichten, die pro Pull-Anfrage an Pub/Sub in Batches gestapelt werden sollen. Standardeinstellung: 100 |
cps.project |
String |
Erforderlich. Das Google Cloud-Projekt, das das Pub/Sub-Thema enthält. |
cps.subscription |
String |
Erforderlich. Der Name des Pub/Sub-Abos, aus dem Nachrichten abgerufen werden sollen. |
gcp.credentials.file.path |
String |
Optional. Der Pfad zu einer Datei, in der Google Cloud-Anmeldedaten zur Authentifizierung von Pub/Sub Lite gespeichert sind. |
gcp.credentials.json |
String |
Optional. Ein JSON-Blob, das Google Cloud zur Authentifizierung von Pub/Sub Lite enthält. |
kafka.key.attribute |
String |
Das Pub/Sub-Nachrichtenattribut, das als Schlüssel für in Kafka veröffentlichte Nachrichten verwendet werden soll. Wenn Standardeinstellung: |
kafka.partition.count |
Integer |
Die Anzahl der Kafka-Partitionen für das Kafka-Thema, in denen Nachrichten veröffentlicht werden. Dieser Parameter wird ignoriert, wenn das Partitionsschema Der Standardwert ist 1. |
kafka.partition.scheme |
String |
Das Schema zum Zuweisen einer Nachricht zu einer Partition in Kafka. Folgende Werte sind möglich:
Standardeinstellung: |
kafka.record.headers |
Boolean |
Wenn |
kafka.topic |
String |
Erforderlich. Das Kafka-Thema, das Nachrichten von Pub/Sub empfängt. |
Support
Wenn Sie Hilfe benötigen, erstellen Sie ein Support-Ticket. Bei allgemeinen Fragen und Diskussionen erstellen Sie ein Problem im GitHub-Repository.
Nächste Schritte
- Unterschiede zwischen Kafka und Pub/Sub
- Weitere Informationen zum Kafka-Connector für Pub/Sub-Gruppen
- Weitere Informationen finden Sie im GitHub-Repository zu Kafka-Connector für Pub/Sub-Gruppen.