In diesem Dokument wird beschrieben, wie Sie Apache Kafka und Pub/Sub mit dem Kafka-Connector für Pub/Sub-Gruppen.
Informationen zum Kafka-Connector für Pub/Sub-Gruppen
Apache Kafka ist eine Open-Source-Plattform für Streamingereignisse. Es wird häufig in verteilten Architekturen verwendet, um die Kommunikation zwischen lose gekoppelten Komponenten zu ermöglichen. Pub/Sub ist ein verwalteter Dienst zum Nachrichten asynchron empfangen. Wie bei Kafka können Sie Pub/Sub für die Kommunikation zwischen Komponenten in der Cloud Architektur.
Mit dem Kafka-Connector für Pub/Sub-Gruppen können Sie diese beiden Systeme einbinden. Die folgenden Connectors sind in der Connector-JAR-Datei enthalten:
- Der Senken-Connector liest Datensätze aus einem oder mehreren Kafka-Themen und veröffentlicht sie in Pub/Sub.
- Der Quell-Connector liest Nachrichten aus einem Pub/Sub-Thema. und veröffentlicht sie in Kafka.
Hier sind einige Szenarien, in denen Sie den Kafka-Connector für Pub/Sub-Gruppen verwenden können:
- Sie migrieren eine Kafka-basierte Architektur zu Google Cloud.
- Sie haben ein Frontend-System, das Ereignisse in Kafka außerhalb von Google Cloud, aber Sie verwenden auch Google Cloud, um einen Teil Ihres Back-Ends auszuführen. die die Kafka-Ereignisse empfangen müssen.
- Sie erfassen Logs aus einer lokalen Kafka-Lösung und senden sie an Google Cloud für Datenanalysen
- Sie haben ein Frontendsystem, das Google Cloud verwendet, aber Sie speichern auch Daten lokal mit Kafka.
Für den Connector ist Folgendes erforderlich: Kafka Connect ein Framework für das Streaming von Daten zwischen Kafka und anderen Systemen. Wenn Sie den Connector verwenden möchten, müssen Sie Kafka Connect zusammen mit Ihrem Kafka-Cluster ausführen.
In diesem Dokument wird davon ausgegangen, dass Sie mit Kafka und Pub/Sub vertraut sind. Bevor Sie dieses Dokument lesen, eine der Pub/Sub-Kurzanleitungen
Der Pub/Sub-Connector unterstützt keine Integration zwischen Google Cloud IAM- und Kafka Connect-ACLs.
Erste Schritte mit dem Connector
Dieser Abschnitt führt Sie durch die folgenden Aufgaben:- Konfigurieren Sie den Kafka-Connector für die Pub/Sub-Gruppe.
- Ereignisse von Kafka an Pub/Sub senden
- Nachrichten von Pub/Sub an Kafka senden
Vorbereitung
Kafka installieren
Folgen Sie Kurzanleitung zu Apache Kafka um ein Kafka mit einem einzelnen Knoten auf Ihrem lokalen Computer zu installieren. Führen Sie die folgenden Schritte im Schnellstart aus:
- Laden Sie die neueste Kafka-Version herunter und extrahieren Sie sie.
- Starten Sie die Kafka-Umgebung.
- Erstellen Sie ein Kafka-Thema.
Authentifizieren
Der Pub/Sub-Gruppen-Kafka-Connector muss sich bei Pub/Sub authentifizieren, um Pub/Sub-Nachrichten senden und empfangen zu können. So richten Sie die Authentifizierung ein: führen Sie die folgenden Schritte aus:
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
JAR-Datei des Connectors herunterladen
Laden Sie die Connector-JAR-Datei auf Ihren lokalen Computer herunter. Weitere Informationen finden Sie unter Connector beschaffen in der GitHub-Readme-Datei.
Connector-Konfigurationsdateien kopieren
Klonen Sie das GitHub-Repository für den Connector oder laden Sie es herunter.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
Kopieren Sie den Inhalt des Verzeichnisses
config
in das Unterverzeichnisconfig
Ihrer Kafka-Installation.cp config/* [path to Kafka installation]/config/
Diese Dateien enthalten Konfigurationseinstellungen für den Connector.
Kafka Connect-Konfiguration aktualisieren
- Wechseln Sie zu dem Verzeichnis, das die von Ihnen verwendete Kafka Connect-Binärdatei enthält. heruntergeladen.
- Öffnen Sie im Binärverzeichnis von Kafka Connect die Datei namens
config/connect-standalone.properties
in einem Texteditor. - Wenn
plugin.path property
auskommentiert ist, entfernen Sie die Kommentarzeichen. Aktualisieren Sie
plugin.path property
, um den Pfad zur Connector-JAR-Datei anzugeben.Beispiel:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
Legen Sie das Attribut
offset.storage.file.filename
auf einen lokalen Dateinamen fest. Im Standalone-Modus verwendet Kafka diese Datei, um Offset-Daten zu speichern.Beispiel:
offset.storage.file.filename=/tmp/connect.offsets
Ereignisse von Kafka an Pub/Sub weiterleiten
In diesem Abschnitt wird beschrieben, wie Sie den Senken-Connector starten, Ereignisse in Kafka veröffentlichen, und anschließend die weitergeleiteten Nachrichten aus Pub/Sub lesen.
Verwenden Sie die Google Cloud CLI, um ein Pub/Sub-Thema mit einem Abo.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Ersetzen Sie Folgendes:
- PUBSUB_TOPIC: Der Name eines Pub/Sub-Themas, über das die Nachrichten von Kafka empfangen werden.
- PUBSUB_SUBSCRIPTION: Der Name eines Pub/Sub-Abos für das Thema.
Öffnen Sie die Datei
/config/cps-sink-connector.properties
in einem Texteditor. Fügen Sie Werte für die folgenden Properties hinzu, die in den Kommentaren mit"TODO"
gekennzeichnet sind:topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
Ersetzen Sie Folgendes:
- KAFKA_TOPICS: Eine durch Kommas getrennte Liste von Kafka-Themen, aus denen gelesen werden soll.
- PROJECT_ID: Das Google Cloud-Projekt, das die Datei enthält. Pub/Sub-Thema.
- PUBSUB_TOPIC: Das Pub/Sub-Thema, über das die Nachrichten von Kafka empfangen werden.
Führen Sie im Kafka-Verzeichnis den folgenden Befehl aus:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties
Folgen Sie der Anleitung in der Kurzanleitung zu Apache Kafka um einige Ereignisse in Ihr Kafka-Thema zu schreiben.
Lesen Sie die Ereignisse mit der gcloud CLI aus Pub/Sub.
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
Nachrichten von Pub/Sub an Kafka weiterleiten
In diesem Abschnitt wird beschrieben, wie Sie den Quell-Connector starten, Pub/Sub und lesen Sie die weitergeleiteten Nachrichten aus Kafka.
Erstellen Sie mit der gcloud CLI ein Pub/Sub-Thema mit einem Abo.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Ersetzen Sie Folgendes:
- PUBSUB_TOPIC: Der Name eines Pub/Sub-Themas.
- PUBSUB_SUBSCRIPTION: Der Name eines Pub/Sub-Abos.
Öffnen Sie die Datei
/config/cps-source-connector.properties
in einem Texteditor. Fügen Sie Werte für die folgenden Attribute hinzu, die mit"TODO"
gekennzeichnet sind: die Kommentare:kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
Ersetzen Sie Folgendes:
- KAFKA_TOPIC: Die Kafka-Themen, auf denen die Pub/Sub-Nachrichten empfangen werden sollen.
- PROJECT_ID: Das Google Cloud-Projekt, das Ihr Pub/Sub-Thema.
- PUBSUB_TOPIC: Das Pub/Sub-Thema.
Führen Sie im Kafka-Verzeichnis den folgenden Befehl aus:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties
Veröffentlichen Sie eine Nachricht mit der gcloud CLI in Pub/Sub.
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
Nachricht aus Kafka lesen Folgen Sie der Anleitung im Schnellstart für Apache Kafka, um die Nachrichten aus dem Kafka-Thema zu lesen.
SMS-Conversion
Einen Kafka-Eintrag enthält einen Schlüssel und einen Wert, bei denen es sich um Byte-Arrays mit variabler Länge handelt. Optional kann ein Der Kafka-Eintrag kann auch Header haben, bei denen es sich um Schlüssel/Wert-Paare handelt. A Pub/Sub-Nachricht besteht aus zwei Hauptteilen: dem Nachrichtentext und null oder mehr Schlüssel/Wert-Attributen.
Kafka Connect verwendet Konverter, um Schlüssel und Werte von und nach Kafka zu serialisieren. Legen Sie im Connector die folgenden Eigenschaften fest, um die Serialisierung zu steuern Konfigurationsdateien:
key.converter
: Der Konverter, der zum Serialisieren von Datensatzschlüsseln verwendet wird.value.converter
: Der Converter, der zum Serialisieren von Datensatzwerten verwendet wird.
Der Text einer Pub/Sub-Nachricht ist ein ByteString
-Objekt.
ist das direkte Kopieren der Nutzlast. Aus diesem Grund empfehlen wir, nach Möglichkeit einen Konverter zu verwenden, der primitive Datentypen (Integer, Float, String oder Bytes-Schema) erzeugt, um die De- und Reserialisierung desselben Nachrichtentexts zu vermeiden.
Konvertierung von Kafka zu Pub/Sub
Der Senken-Connector konvertiert Kafka-Eintragsdaten so in Pub/Sub-Nachrichten:
- Der Kafka-Eintragsschlüssel wird als Attribut mit dem Namen
"key"
in der Pub/Sub-Nachricht gespeichert. - Standardmäßig entfernt der Connector alle Header im Kafka-Datensatz. Wenn jedoch
legen Sie für die Konfigurationsoption
headers.publish
den Connectortrue
fest. schreibt die Header als Pub/Sub-Attribute. Der Connector überspringt alle Header, die die Pub/Sub-Limits für Nachrichtenattribute überschreiten. - Bei Ganzzahl-, Gleitkomma-, String- und Byte-Schemas übergibt der Connector die Bytes des Kafka-Eintrags direkt in den Pub/Sub-Nachrichtenkörper.
- Bei Strukturschemas schreibt der Connector jedes Feld als Attribut des
Pub/Sub-Nachricht. Wenn das Feld beispielsweise
{ "id"=123 }
ist, hat die resultierende Pub/Sub-Nachricht das Attribut"id"="123"
. Die -Feldwert wird immer in einen String konvertiert. Map- und Struct-Typen sind nicht werden als Feldtypen innerhalb einer Struktur unterstützt. - Bei Kartenschemas schreibt der Connector jedes Schlüssel/Wert-Paar als Attribut
der Pub/Sub-Nachricht. Wenn die Karte beispielsweise
{"alice"=1,"bob"=2}
ist, hat die resultierende Pub/Sub-Nachricht zwei Attribute:"alice"="1"
und"bob"="2"
. Die Schlüssel und Werte werden konvertiert. in Strings umwandeln.
Für Struktur- und Kartenschemata gelten einige zusätzliche Verhaltensweisen:
Optional können Sie ein bestimmtes Strukturfeld oder einen bestimmten Kartenschlüssel als Nachrichtentext angeben, indem Sie die Konfigurationseigenschaft
messageBodyName
festlegen. Die des Felds oder Schlüssels wird alsByteString
im Nachrichtentext gespeichert. WennmessageBodyName
nicht festlegen, ist der Nachrichtentext für Struktur und Kartenschemas.Bei Arraywerten unterstützt der Connector nur primitive Arraytypen. Die Wertesequenz im Array wird zu einem einzelnen
ByteString
-Objekt zusammengefügt.
Konvertierung von Pub/Sub zu Kafka
Der Quell-Connector konvertiert Pub/Sub-Nachrichten in Kafka-Einträge wie folgt:
Kafka-Eintragsschlüssel: Standardmäßig ist der Schlüssel auf
null
festgelegt. Optional können Sie ein Pub/Sub-Nachrichtenattribut als Schlüssel angeben, indem Sie die Konfigurationsoptionkafka.key.attribute
festlegen. In diesem Fall sucht der Connector nach einem Attribut mit diesem Namen und legt den Datensatzschlüssel auf den Attributwert fest. Wenn das angegebene Attribut nicht vorhanden ist, lautet der Eintragsschlüssel aufnull
festgelegt.Kafka-Eintragswert Der Connector schreibt den Eintragswert so:
Wenn die Pub/Sub-Nachricht keine benutzerdefinierten Attribute hat, schreibt der Connector den Pub/Sub-Nachrichtentext mithilfe des von
value.converter
angegebenen Converters direkt alsbyte[]
-Typ in den Kafka-Eintragswert.Wenn die Pub/Sub-Nachricht benutzerdefinierte Attribute hat und
kafka.record.headers
=false
ist, schreibt der Connector einen Datensatz in den Datensatzwert. Die Struktur enthält ein Feld für jedes Attribut und ein Feld namens"message"
, dessen Wert der Pub/Sub-Nachrichtentext ist (gespeichert als Byte):{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }
In diesem Fall müssen Sie ein
value.converter
verwenden, das mitstruct
-Schemas kompatibel ist, z. B.org.apache.kafka.connect.json.JsonConverter
.Wenn die Pub/Sub-Nachricht benutzerdefinierte Attribute hat und
kafka.record.headers
=true
ist, schreibt der Connector die Attribute als Kafka-Eintragsheader. Schreibt den Pub/Sub-Nachrichtentext direkt in den Kafka-Eintragswert alsbyte[]
-Typ, indem Sie den Converter verwenden. angegeben durchvalue.converter
.
Kafka-Eintragsheader Die Header sind standardmäßig leer, es sei denn, Sie legen
kafka.record.headers
auftrue
fest.
Konfigurationsoptionen
Zusätzlich zu den Konfigurationen, die von der Kafka Connect API bereitgestellt werden, unterstützt der Pub/Sub-Gruppen-Kafka-Connector die Senke- und Quellkonfiguration wie in den Pub/Sub-Connector-Konfigurationen beschrieben.
Support
Wenn Sie Hilfe benötigen, erstellen Sie ein Support-Ticket. Wenn du allgemeine Fragen hast oder eine Diskussion starten möchtest, erstelle ein Problem im GitHub-Repository.
Nächste Schritte
- Unterschiede zwischen Kafka und Pub/Sub
- Weitere Informationen zum Pub/Sub-Gruppen-Kafka-Connector
- Kafka-Connector für Pub/Sub-Gruppen ansehen GitHub-Repository.