Pub/Sub Lite-Nachrichten mit Apache Spark schreiben

Der Pub/Sub Lite Spark Connector ist eine Open-Source-Java-Clientbibliothek, die die Verwendung von Pub/Sub Lite als Eingabe- und Ausgabequelle für Apache Spark Structured Streaming unterstützt. Der Connector funktioniert in allen Apache Spark-Distributionen, einschließlich Dataproc.

In dieser Schnellstartanleitung werden folgende Verfahren erläutert:

  • Nachrichten aus Pub/Sub Lite lesen
  • Nachrichten in Pub/Sub Lite schreiben

PySpark in einem Dataproc Spark-Cluster verwenden.

Hinweise

  1. Melden Sie sich bei Ihrem Google Cloud-Konto an. Wenn Sie mit Google Cloud noch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  4. Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs aktivieren.

    Aktivieren Sie die APIs

  5. Installieren Sie die Google Cloud CLI.
  6. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  9. Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs aktivieren.

    Aktivieren Sie die APIs

  10. Installieren Sie die Google Cloud CLI.
  11. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init

Einrichten

  1. Variablen für Ihr Projekt erstellen

    export PROJECT_ID=$(gcloud config get-value project)
    export PROJECT_NUMBER=$(gcloud projects list \
        --filter="projectId:$PROJECT_ID" \
        --format="value(PROJECT_NUMBER)")
    
  2. Cloud Storage-Bucket erstellen Cloud Storage-Bucket-Namen müssen global eindeutig sein.

    export BUCKET=your-bucket-name
    gsutil mb gs://$BUCKET
    
  3. Erstellen Sie ein Pub/Sub Lite-Thema und -Abo an einem unterstützten Standort. Weitere Informationen finden Sie unter Thema erstellen, wenn Sie eine Pub/Sub Lite-Reservierung verwenden.

    export TOPIC=your-lite-topic-id
    export SUBSCRIPTION=your-lite-subscription-id
    export PUBSUBLITE_LOCATION=your-lite-location
    gcloud pubsub lite-topics create $TOPIC \
        --location=$PUBSUBLITE_LOCATION \
        --partitions=2 \
        --per-partition-bytes=30GiB
    gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
        --location=$PUBSUBLITE_LOCATION \
        --topic=$TOPIC
    
  4. Erstellen Sie einen Dataproc-Cluster.

    export DATAPROC_REGION=your-dataproc-region
    export CLUSTER_ID=your-dataproc-cluster-id
    gcloud dataproc clusters create $CLUSTER_ID \
       --region $DATAPROC_REGION \
       --image-version 2.1 \
       --scopes 'https://www.googleapis.com/auth/cloud-platform' \
       --enable-component-gateway \
       --bucket $BUCKET
    
    • --region: Eine unterstützte Dataproc-Region, in der sich Ihr Pub/Sub Lite-Thema und -Abo befinden.
    • --image-version: Die Image-Version des Clusters, die die auf dem Cluster installierte Apache Spark-Version bestimmt. Wählen Sie Release-Versionen (2.x.x) aus, da der Spark-Connector für Pub/Sub Lite derzeit Apache Spark 3.x.x unterstützt.
    • --scopes: Aktiviert den API-Zugriff auf Google Cloud-Dienste im selben Projekt.
    • --enable-component-gateway: Zugriff auf die Apache Spark-Web-UI aktivieren.
    • --bucket: Ein Cloud Storage-Staging-Bucket, in dem Clusterjobabhängigkeiten, Treiberausgaben und Clusterkonfigurationsdateien gespeichert werden.
  5. Klonen Sie das Kurzanleitungs-Repository und gehen Sie zum Beispielcodeverzeichnis:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsublite/spark-connector/
    

In Pub/Sub Lite schreiben

Im folgenden Beispiel werden folgende Aufgaben ausgeführt:

  • Erstellen Sie eine Preisquelle, die fortlaufende Zahlen und Zeitstempel im Format spark.sql.Row generiert.
  • Transformieren Sie die Daten so, dass sie dem erforderlichen Tabellenschema von der writeStream API des Pub/Sub Lite-Spark-Connectors entsprechen.
  • Daten in ein vorhandenes Pub/Sub Lite-Thema schreiben
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, create_map, col, lit, when
from pyspark.sql.types import BinaryType, StringType
import uuid

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# topic_id = "your-topic-id"

spark = SparkSession.builder.appName("write-app").getOrCreate()

# Create a RateStreamSource that generates consecutive numbers with timestamps:
# |-- timestamp: timestamp (nullable = true)
# |-- value: long (nullable = true)
sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Transform the dataframe to match the required data fields and data types:
# https://github.com/googleapis/java-pubsublite-spark#data-schema
sdf = (
    sdf.withColumn("key", lit("example").cast(BinaryType()))
    .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
    .withColumnRenamed("timestamp", "event_timestamp")
    # Populate the attributes field. For example, an even value will
    # have {"key1", [b"even"]}.
    .withColumn(
        "attributes",
        create_map(
            lit("key1"),
            array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
        ),
    )
    .drop("value")
)

# After the transformation, the schema of the dataframe should look like:
# |-- key: binary (nullable = false)
# |-- data: binary (nullable = true)
# |-- event_timestamp: timestamp (nullable = true)
# |-- attributes: map (nullable = false)
# |    |-- key: string
# |    |-- value: array (valueContainsNull = false)
# |    |    |-- element: binary (containsNull = false)
sdf.printSchema()

query = (
    sdf.writeStream.format("pubsublite")
    .option(
        "pubsublite.topic",
        f"projects/{project_number}/locations/{location}/topics/{topic_id}",
    )
    # Required. Use a unique checkpoint location for each job.
    .option("checkpointLocation", "/tmp/app" + uuid.uuid4().hex)
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 60 seconds to terminate the query.
query.awaitTermination(60)
query.stop()

So senden Sie den Schreibjob an Dataproc:

Console

  1. Laden Sie das PySpark-Skript in Ihren Cloud Storage-Bucket hoch.
    1. Gehen Sie zur Cloud Storage Console.
    2. Wählen Sie Ihren Bucket aus.
    3. Wählen Sie unter Dateien hochladen das PySpark-Skript hoch, das Sie verwenden möchten.
  2. Senden Sie den Job an Ihren Dataproc-Cluster:
    1. Rufen Sie die Dataproc-Konsole auf.
    2. Wechseln Sie zu "Jobs".
    3. Klicken Sie auf Job senden.
    4. Geben Sie die Jobdetails ein.
    5. Wählen Sie unter Cluster Ihren Cluster aus.
    6. Geben Sie unter Job einen Namen für die Job-ID ein.
    7. Wählen Sie als Jobtyp die Option "PySpark" aus.
    8. Geben Sie für die Python-Hauptdatei den gsutil-URI des hochgeladenen PySpark-Skripts an, der mit gs:// beginnt.
    9. Wählen Sie unter Jar-Dateien die neueste Version des Spark-Connectors von Maven aus, suchen Sie in den Downloadoptionen nach der JAR-Datei mit Abhängigkeiten und kopieren Sie den Link.
    10. Wenn Sie das vollständige PySpark-Skript von GitHub verwenden, geben Sie unter Argumente die Werte --project_number=PROJECT_NUMBER,--location=PUBSUBLITE_LOCATION, --topic_id=TOPIC_ID ein. Wenn Sie das PySpark-Skript oben kopieren und die Aufgaben abgeschlossen ist, lassen Sie das Feld leer.
    11. Geben Sie unter Properties den Schlüssel spark.master und den Wert yarn ein.
    12. Klicken Sie auf Senden.

gcloud

Verwenden Sie den Befehl gcloud dataproc jobs submit pyspark, um den Job an Dataproc zu senden:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
  • --region: die vorab ausgewählte Dataproc-Region.
  • --cluster: der Name des Dataproc-Clusters.
  • --jars: die Uber-JAR-Datei des Pub/Sub Lite Spark-Connectors mit Abhängigkeiten in einem öffentlichen Cloud Storage-Bucket. Sie können auch diesen Link aufrufen, um die Uber-JAR-Datei mit Abhängigkeiten von Maven herunterzuladen.
  • --driver-log-levels: Setzen Sie die Logging-Ebene auf der Stammebene auf INFO.
  • --properties: Verwenden Sie den YARN-Ressourcenmanager für den Spark-Master.
  • --: Geben Sie die für das Skript erforderlichen Argumente an.

Wenn der Vorgang writeStream erfolgreich ist, sollten Sie lokal sowie auf der Seite mit den Jobdetails in der Google Cloud Console Logeinträge wie die folgenden sehen:

INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..

Aus Pub/Sub Lite lesen

Im folgenden Beispiel werden Nachrichten aus einem vorhandenen Pub/Sub Lite-Abo mithilfe der readStream API gelesen. Der Connector gibt Nachrichten aus, die dem festen Tabellenschema entsprechen und als spark.sql.Row formatiert sind.

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()

So senden Sie den Lesejob an Dataproc:

Console

  1. Laden Sie das PySpark-Skript in Ihren Cloud Storage-Bucket hoch.
    1. Gehen Sie zur Cloud Storage Console.
    2. Wählen Sie Ihren Bucket aus.
    3. Wählen Sie unter Dateien hochladen das PySpark-Skript hoch, das Sie verwenden möchten.
  2. Senden Sie den Job an Ihren Dataproc-Cluster:
    1. Rufen Sie die Dataproc-Konsole auf.
    2. Wechseln Sie zu "Jobs".
    3. Klicken Sie auf Job senden.
    4. Geben Sie die Jobdetails ein.
    5. Wählen Sie unter Cluster Ihren Cluster aus.
    6. Geben Sie unter Job einen Namen für die Job-ID ein.
    7. Wählen Sie als Jobtyp die Option "PySpark" aus.
    8. Geben Sie für die Python-Hauptdatei den gsutil-URI des hochgeladenen PySpark-Skripts an, der mit gs:// beginnt.
    9. Wählen Sie unter Jar-Dateien die neueste Version des Spark-Connectors von Maven aus, suchen Sie in den Downloadoptionen nach der JAR-Datei mit Abhängigkeiten und kopieren Sie den Link.
    10. Wenn Sie das vollständige PySpark-Skript von GitHub verwenden, geben Sie unter Argumente die Werte --project_number=PROJECT_NUMBER,--location=PUBSUBLITE_LOCATION, --subscription_id=SUBSCRIPTION_ID ein. Wenn Sie das PySpark-Skript oben kopieren und die Aufgaben abgeschlossen ist, lassen Sie das Feld leer.
    11. Geben Sie unter Properties den Schlüssel spark.master und den Wert yarn ein.
    12. Klicken Sie auf Senden.

gcloud

Verwenden Sie den Befehl gcloud dataproc jobs submit pyspark noch einmal, um den Job an Dataproc zu senden:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
  • --region: die vorab ausgewählte Dataproc-Region.
  • --cluster: der Name des Dataproc-Clusters.
  • --jars: die Uber-JAR-Datei des Pub/Sub Lite Spark-Connectors mit Abhängigkeiten in einem öffentlichen Cloud Storage-Bucket. Sie können auch diesen Link aufrufen, um die Uber-JAR-Datei mit Abhängigkeiten von Maven herunterzuladen.
  • --driver-log-levels: Setzen Sie die Logging-Ebene auf der Stammebene auf INFO.
  • --properties: Verwenden Sie den YARN-Ressourcenmanager für den Spark-Master.
  • --: Geben Sie die erforderlichen Argumente für das Skript an.

Wenn der Vorgang readStream erfolgreich ist, sollten Sie lokal sowie auf der Seite mit den Jobdetails in der Google Cloud Console Logeinträge wie die folgenden sehen:

+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|        subscription|partition|offset|key|data|   publish_timestamp|     event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...|        0| 89523|  0|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89524|  1|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89525|  2|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|

Nachrichten aus Pub/Sub Lite noch einmal wiedergeben und dauerhaft löschen

Suchvorgänge funktionieren nicht, wenn Daten aus Pub/Sub Lite mit dem Spark-Connector von Pub/Sub Lite gelesen werden, da Apache Spark-Systeme ihr eigenes Tracking von Offsets innerhalb von Partitionen ausführen. Sie können das Problem umgehen, indem Sie die Workflows per Drain beenden, danach suchen und neu starten.

Bereinigen

Mit den folgenden Schritten vermeiden Sie, dass Ihrem Google Cloud-Konto die auf dieser Seite verwendeten Ressourcen in Rechnung gestellt werden.

  1. Löschen Sie das Thema und das Abo.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  2. Löschen Sie den Dataproc-Cluster.

    gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
    
  3. Entfernen Sie den Cloud Storage-Bucket.

    gsutil rb gs://$BUCKET
    

Nächste Schritte