Pub/Sub Lite-Nachrichten mit Apache Spark schreiben
<ph type="x-smartling-placeholder">Die Pub/Sub Lite Spark-Connector ist eine Open-Source-Java-Client-Bibliothek, die die Verwendung von Pub/Sub Lite als Eingabe- und Ausgabequelle für Apache Spark Structured Streaming . Der Connector funktioniert in allen Apache Spark-Distributionen, einschließlich Dataproc.
In dieser Schnellstartanleitung werden folgende Verfahren erläutert:
- Nachrichten aus Pub/Sub Lite lesen
- Nachrichten in Pub/Sub Lite schreiben
mit PySpark über einen Dataproc Spark-Cluster.
Hinweise
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
Einrichten
Variablen für Ihr Projekt erstellen
export PROJECT_ID=$(gcloud config get-value project)
export PROJECT_NUMBER=$(gcloud projects list \ --filter="projectId:$PROJECT_ID" \ --format="value(PROJECT_NUMBER)")
Cloud Storage-Bucket erstellen Cloud Storage-Bucket-Namen müssen global eindeutig sein.
export BUCKET=your-bucket-name
gcloud storage buckets create gs://$BUCKET
Erstellen Sie ein Pub/Sub Lite-Thema und -Abo an einem unterstützten Standort. Weitere Informationen finden Sie unter Thema erstellen. wenn Sie eine Pub/Sub Lite-Reservierung verwenden.
export TOPIC=your-lite-topic-id
export SUBSCRIPTION=your-lite-subscription-id
export PUBSUBLITE_LOCATION=your-lite-location
gcloud pubsub lite-topics create $TOPIC \ --location=$PUBSUBLITE_LOCATION \ --partitions=2 \ --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \ --location=$PUBSUBLITE_LOCATION \ --topic=$TOPIC
Erstellen Sie einen Dataproc-Cluster.
export DATAPROC_REGION=your-dataproc-region
export CLUSTER_ID=your-dataproc-cluster-id
gcloud dataproc clusters create $CLUSTER_ID \ --region $DATAPROC_REGION \ --image-version 2.1 \ --scopes 'https://www.googleapis.com/auth/cloud-platform' \ --enable-component-gateway \ --bucket $BUCKET
--region
: Eine unterstützte Dataproc-Region, in der sich Ihr Pub/Sub Lite-Thema und -Abo befinden.--image-version
: Die Image-Version des Clusters, die die auf dem Cluster installierte Apache Spark-Version bestimmt. Auswählen Release-Versionen von 2.x.x-Images da der Spark-Connector für Pub/Sub Lite derzeit Apache Spark 3.x.x unterstützt.--scopes
: Aktiviert den API-Zugriff auf Google Cloud-Dienste im selben Projekt.--enable-component-gateway
: Zugriff auf die Apache Spark-Web-UI aktivieren.--bucket
: Ein Cloud Storage-Staging-Bucket, in dem Clusterjobabhängigkeiten, Treiberausgaben und Clusterkonfigurationsdateien gespeichert werden.
Klonen Sie das Kurzanleitungs-Repository und gehen Sie zum Beispielcodeverzeichnis:
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
cd python-docs-samples/pubsublite/spark-connector/
In Pub/Sub Lite schreiben
Im folgenden Beispiel werden folgende Aufgaben ausgeführt:
- erstelle ein
Preisquelle
generiert fortlaufende Zahlen und Zeitstempel im Format
spark.sql.Row
- Daten so transformieren, dass sie mit den
Tabellenschema
des Pub/Sub Lite Spark Connectors
writeStream
API - Daten in ein vorhandenes Pub/Sub Lite-Thema schreiben
So senden Sie den Schreibjob an Dataproc:
Console
- Laden Sie das PySpark-Skript in Ihren Cloud Storage-Bucket hoch.
- Gehen Sie zur Cloud Storage Console.
- Wählen Sie Ihren Bucket aus.
- Wählen Sie unter Dateien hochladen das PySpark-Skript hoch, das Sie verwenden möchten.
- Senden Sie den Job an Ihren Dataproc-Cluster:
- Rufen Sie die Dataproc-Konsole auf.
- Wechseln Sie zu "Jobs".
- Klicken Sie auf Job senden.
- Geben Sie die Jobdetails ein.
- Wählen Sie unter Cluster Ihren Cluster aus.
- Geben Sie unter Job einen Namen für die Job-ID ein.
- Wählen Sie als Jobtyp die Option "PySpark" aus.
- Geben Sie unter Python-Hauptdatei den gcloud-Speicher-URI des
hat ein PySpark-Skript hochgeladen, das mit
gs://
beginnt. - Wählen Sie unter Jar-Dateien die neueste Version des Spark-Connectors aus Maven suchen Sie in den Downloadoptionen nach der JAR-Datei mit Abhängigkeiten. kopieren Sie seinen Link.
- Wenn Sie das vollständige PySpark-Skript von GitHub verwenden, geben Sie unter Argumente die Werte
--project_number=
PROJECT_NUMBER,--location=
PUBSUBLITE_LOCATION,--topic_id=
TOPIC_ID ein. Wenn Sie das PySpark-Skript oben kopieren und die Aufgaben abgeschlossen ist, lassen Sie das Feld leer. - Geben Sie unter Properties den Schlüssel
spark.master
und den Wertyarn
ein. - Klicken Sie auf Senden.
gcloud
Verwenden Sie den Befehl gcloud dataproc jobs submit pyspark, um den Job an Dataproc zu senden:
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
--region
: die vorab ausgewählte Dataproc-Region.--cluster
: der Name des Dataproc-Clusters.--jars
: die Uber-JAR-Datei des Pub/Sub Lite Spark-Connectors mit Abhängigkeiten in einem öffentlichen Cloud Storage-Bucket. Weitere Informationen finden Sie hier: Link um die Uber-JAR-Datei mit Abhängigkeiten von Maven herunterzuladen.--driver-log-levels
: Setzen Sie die Logging-Ebene auf der Stammebene auf INFO.--properties
: Verwenden Sie den YARN-Ressourcenmanager für den Spark-Master.--
: Geben Sie die für das Skript erforderlichen Argumente an.
Wenn der writeStream
-Vorgang erfolgreich ist, sollten Sie Logeinträge sehen
lokal sowie auf der Seite mit den Jobdetails im
Google Cloud Console:
INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..
Aus Pub/Sub Lite lesen
Im folgenden Beispiel werden Nachrichten aus einer vorhandenen
Pub/Sub Lite-Abo mit der
readStream
der API erstellen. Der Connector gibt Nachrichten aus, die den korrigierten
Tabellenschema
formatiert als
spark.sql.Row
.
So senden Sie den Lesejob an Dataproc:
Console
- Laden Sie das PySpark-Skript in Ihren Cloud Storage-Bucket hoch.
- Gehen Sie zur Cloud Storage Console.
- Wählen Sie Ihren Bucket aus.
- Wählen Sie unter Dateien hochladen das PySpark-Skript hoch, das Sie verwenden möchten.
- Senden Sie den Job an Ihren Dataproc-Cluster:
- Rufen Sie die Dataproc-Konsole auf.
- Wechseln Sie zu "Jobs".
- Klicken Sie auf Job senden.
- Geben Sie die Jobdetails ein.
- Wählen Sie unter Cluster Ihren Cluster aus.
- Geben Sie unter Job einen Namen für die Job-ID ein.
- Wählen Sie als Jobtyp die Option "PySpark" aus.
- Geben Sie unter Python-Hauptdatei den gcloud-Speicher-URI des
hat ein PySpark-Skript hochgeladen, das mit
gs://
beginnt. - Wählen Sie unter Jar-Dateien die neueste Version des Spark-Connectors aus Maven suchen Sie in den Downloadoptionen nach der JAR-Datei mit Abhängigkeiten. kopieren Sie seinen Link.
- Wenn Sie das vollständige PySpark-Skript von GitHub verwenden, geben Sie unter Argumente die Werte
--project_number=
PROJECT_NUMBER,--location=
PUBSUBLITE_LOCATION,--subscription_id=
SUBSCRIPTION_ID ein. Wenn Sie das PySpark-Skript oben kopieren und die Aufgaben abgeschlossen ist, lassen Sie das Feld leer. - Geben Sie unter Properties den Schlüssel
spark.master
und den Wertyarn
ein. - Klicken Sie auf Senden.
gcloud
Verwenden Sie den Befehl gcloud dataproc jobs submit pyspark noch einmal, um den Job an Dataproc zu senden:
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
--region
: die vorab ausgewählte Dataproc-Region.--cluster
: der Name des Dataproc-Clusters.--jars
: die Uber-JAR-Datei des Pub/Sub Lite Spark-Connectors mit Abhängigkeiten in einem öffentlichen Cloud Storage-Bucket. Weitere Informationen finden Sie hier: Link um die Uber-JAR-Datei mit Abhängigkeiten von Maven herunterzuladen.--driver-log-levels
: Setzen Sie die Logging-Ebene auf der Stammebene auf INFO.--properties
: Verwenden Sie den YARN-Ressourcenmanager für den Spark-Master.--
: Geben Sie die erforderlichen Argumente für das Skript an.
Wenn der readStream
-Vorgang erfolgreich ist, sollten Sie Logeinträge sehen
lokal sowie auf der Seite mit den Jobdetails im
Google Cloud Console:
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
| subscription|partition|offset|key|data| publish_timestamp| event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...| 0| 89523| 0| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89524| 1| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89525| 2| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
Nachrichten aus Pub/Sub Lite noch einmal wiedergeben und dauerhaft löschen
Suchvorgänge funktionieren beim Lesen aus Pub/Sub Lite mit dem Spark-Connector von Pub/Sub Lite aus, Apache Spark-Systeme bieten die Offsets innerhalb von Partitionen verfolgen. Um das Problem zu umgehen, zu suchen und die Workflows neu zu starten.
Bereinigen
Mit den folgenden Schritten vermeiden Sie, dass Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen in Rechnung gestellt werden:
Löschen Sie das Thema und das Abo.
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
Löschen Sie den Dataproc-Cluster.
gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
Entfernen Sie den Cloud Storage-Bucket.
gcloud storage rm gs://$BUCKET
Nächste Schritte
In der Beispiel für die Wortzahl in Java für den Spark-Connector von Pub/Sub Lite.
Weitere Spark-Connectors von Google Cloud-Produkten: BigQuery-Connector Bigtable-Connector, Cloud Storage-Connector.