Écrire des messages Pub/Sub Lite à l'aide d'Apache Spark

La Connecteur Spark pour Pub/Sub Lite est une bibliothèque cliente Java Open Source qui prend en charge l'utilisation Pub/Sub Lite en tant que source d'entrée et de sortie pour Apache Spark Structured Streaming pour en savoir plus. Le connecteur fonctionne dans toutes les distributions Apache Spark, y compris Dataproc.

Ce guide de démarrage rapide vous montre comment :

  • Lire les messages de Pub/Sub Lite
  • Écrire des messages dans Pub/Sub Lite

avec PySpark à partir d'un cluster Dataproc Spark.

Avant de commencer

  1. Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Vérifiez que la facturation est activée pour votre projet Google Cloud.

  4. Activer les API Pub/Sub Lite, Dataproc, Cloud Storage, Logging .

    Activer les API

  5. Installez Google Cloud CLI.
  6. Pour initialiser gcloudCLI, exécutez la commande suivante :

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. Vérifiez que la facturation est activée pour votre projet Google Cloud.

  9. Activer les API Pub/Sub Lite, Dataproc, Cloud Storage, Logging .

    Activer les API

  10. Installez Google Cloud CLI.
  11. Pour initialiser gcloudCLI, exécutez la commande suivante :

    gcloud init

Prérequis

  1. Créez des variables pour votre projet.

    export PROJECT_ID=$(gcloud config get-value project)
    export PROJECT_NUMBER=$(gcloud projects list \
        --filter="projectId:$PROJECT_ID" \
        --format="value(PROJECT_NUMBER)")
    
  2. Créez un bucket Cloud Storage. Les noms des buckets Cloud Storage doivent être uniques.

    export BUCKET=your-bucket-name
    gcloud storage buckets create gs://$BUCKET
    
  3. Créez un sujet et un abonnement Pub/Sub Lite dans un emplacement compatible. Consultez Créer un sujet. si vous utilisez une réservation Pub/Sub Lite.

    export TOPIC=your-lite-topic-id
    export SUBSCRIPTION=your-lite-subscription-id
    export PUBSUBLITE_LOCATION=your-lite-location
    gcloud pubsub lite-topics create $TOPIC \
        --location=$PUBSUBLITE_LOCATION \
        --partitions=2 \
        --per-partition-bytes=30GiB
    gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
        --location=$PUBSUBLITE_LOCATION \
        --topic=$TOPIC
    
  4. Créez un cluster Dataproc.

    export DATAPROC_REGION=your-dataproc-region
    export CLUSTER_ID=your-dataproc-cluster-id
    gcloud dataproc clusters create $CLUSTER_ID \
       --region $DATAPROC_REGION \
       --image-version 2.1 \
       --scopes 'https://www.googleapis.com/auth/cloud-platform' \
       --enable-component-gateway \
       --bucket $BUCKET
    
    • --region : région Dataproc compatible avec votre sujet et votre abonnement Pub/Sub Lite
    • --image-version : version d'image du cluster, qui détermine la version d'Apache Spark installée sur le cluster. Choisir Versions des images 2.x.x car le connecteur Spark pour Pub/Sub Lite est actuellement compatible avec Apache Spark 3.x.x.
    • --scopes: active l'accès API aux services Google Cloud dans le même projet.
    • --enable-component-gateway : permet d'accéder à l'interface utilisateur Web d'Apache Spark.
    • --bucket : bucket Cloud Storage de préproduction utilisé pour stocker les dépendances de tâches du cluster, les résultats du pilote et les fichiers de configuration du cluster.
  5. Clonez le dépôt du guide de démarrage rapide et accédez au répertoire de l'exemple de code :

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsublite/spark-connector/
    

Écrire dans Pub/Sub Lite

L'exemple ci-dessous permet d'effectuer les opérations suivantes :

  • créer un source des tarifs qui génère des nombres consécutifs et des horodatages au format suivant : spark.sql.Row
  • les données pour qu'elles correspondent schéma de table par le connecteur Spark du connecteur Pub/Sub Lite writeStream API
  • Écrire les données dans un sujet Pub/Sub Lite existant
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, create_map, col, lit, when
from pyspark.sql.types import BinaryType, StringType
import uuid

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# topic_id = "your-topic-id"

spark = SparkSession.builder.appName("write-app").getOrCreate()

# Create a RateStreamSource that generates consecutive numbers with timestamps:
# |-- timestamp: timestamp (nullable = true)
# |-- value: long (nullable = true)
sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Transform the dataframe to match the required data fields and data types:
# https://github.com/googleapis/java-pubsublite-spark#data-schema
sdf = (
    sdf.withColumn("key", lit("example").cast(BinaryType()))
    .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
    .withColumnRenamed("timestamp", "event_timestamp")
    # Populate the attributes field. For example, an even value will
    # have {"key1", [b"even"]}.
    .withColumn(
        "attributes",
        create_map(
            lit("key1"),
            array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
        ),
    )
    .drop("value")
)

# After the transformation, the schema of the dataframe should look like:
# |-- key: binary (nullable = false)
# |-- data: binary (nullable = true)
# |-- event_timestamp: timestamp (nullable = true)
# |-- attributes: map (nullable = false)
# |    |-- key: string
# |    |-- value: array (valueContainsNull = false)
# |    |    |-- element: binary (containsNull = false)
sdf.printSchema()

query = (
    sdf.writeStream.format("pubsublite")
    .option(
        "pubsublite.topic",
        f"projects/{project_number}/locations/{location}/topics/{topic_id}",
    )
    # Required. Use a unique checkpoint location for each job.
    .option("checkpointLocation", "/tmp/app" + uuid.uuid4().hex)
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 60 seconds to terminate the query.
query.awaitTermination(60)
query.stop()

Pour envoyer la tâche d'écriture à Dataproc, procédez comme suit :

Console

  1. Importez le script PySpark dans votre bucket Cloud Storage.
    1. Accédez à la console Cloud Storage.
    2. Sélectionnez votre bucket.
    3. Utilisez l'option Importer des fichiers pour importer le script PySpark que vous souhaitez utiliser.
  2. Envoyez la tâche à votre cluster Dataproc  :
    1. Accédez à la console Dataproc.
    2. Accédez aux tâches.
    3. Cliquez sur Submit job (Envoyer la tâche).
    4. Renseignez les détails de la tâche.
    5. Sous Cluster, choisissez votre cluster.
    6. Sous Job (Tâche), attribuez un nom à l'ID de tâche.
    7. Dans le champ Job Type (Type de tâche), sélectionnez PySpark.
    8. Dans le champ Main python file (Fichier Python principal), indiquez l'URI de stockage gcloud du a importé un script PySpark commençant par gs://.
    9. Pour Fichiers JAR, sélectionnez la dernière version du connecteur Spark parmi Maven , recherchez le fichier jar avec les dépendances dans les options de téléchargement, et copiez son lien.
    10. Pour Arguments, si vous utilisez le script PySpark complet de GitHub, saisissez --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --topic_id=TOPIC_ID. Si vous copiez le script PySpark ci-dessus avec les tâches terminées, laissez le champ vide.
    11. Sous Properties (Propriétés), saisissez la clé spark.master et la valeur yarn.
    12. Cliquez sur Envoyer.

gcloud

Exécutez la commande gcloud dataproc jobs submit pyspark pour envoyer la tâche à Dataproc :

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
  • --region : région Dataproc présélectionnée
  • --cluster : nom du cluster Dataproc.
  • --jars: fichier Uber du connecteur Pub/Sub Spark Spark avec des dépendances dans un bucket Cloud Storage public. Vous pouvez aussi consulter cette lien pour télécharger le fichier Uber JAR avec les dépendances depuis Maven.
  • --driver-log-levels : définissez le niveau de journalisation sur INFO au niveau racine.
  • --properties : utilisez le gestionnaire de ressources YARN pour le maître Spark.
  • -- : fournit les arguments requis par le script.

Si l'opération writeStream aboutit, des messages de journal doivent s'afficher. comme dans l'exemple ci-dessous, en local, ainsi que sur la page "Détails du job" dans Console Google Cloud:

INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..

Lire à partir de Pub/Sub Lite

L'exemple suivant permet de lire les messages d'un un abonnement Pub/Sub Lite via readStream API. Le connecteur affichera des messages conformes aux schéma de table formaté comme suit : spark.sql.Row pour en savoir plus.

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()

Pour envoyer la tâche de lecture à Dataproc, procédez comme suit :

Console

  1. Importez le script PySpark dans votre bucket Cloud Storage.
    1. Accédez à la console Cloud Storage.
    2. Sélectionnez votre bucket.
    3. Utilisez l'option Importer des fichiers pour importer le script PySpark que vous souhaitez utiliser.
  2. Envoyez la tâche à votre cluster Dataproc  :
    1. Accédez à la console Dataproc.
    2. Accédez aux tâches.
    3. Cliquez sur Submit job (Envoyer la tâche).
    4. Renseignez les détails de la tâche.
    5. Sous Cluster, choisissez votre cluster.
    6. Sous Job (Tâche), attribuez un nom à l'ID de tâche.
    7. Dans le champ Job Type (Type de tâche), sélectionnez PySpark.
    8. Dans le champ Main python file (Fichier Python principal), indiquez l'URI de stockage gcloud du a importé un script PySpark commençant par gs://.
    9. Pour Fichiers JAR, sélectionnez la dernière version du connecteur Spark parmi Maven , recherchez le fichier jar avec les dépendances dans les options de téléchargement, et copiez son lien.
    10. Pour Arguments, si vous utilisez le script PySpark complet de GitHub, saisissez --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --subscription_id=SUBSCRIPTION_ID. Si vous copiez le script PySpark ci-dessus avec les tâches terminées, laissez le champ vide.
    11. Sous Properties (Propriétés), saisissez la clé spark.master et la valeur yarn.
    12. Cliquez sur Envoyer.

gcloud

Exécutez la commande gcloud dataproc jobs submit pyspark pour envoyer à nouveau la tâche à Dataproc :

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
  • --region : région Dataproc présélectionnée
  • --cluster : nom du cluster Dataproc.
  • --jars: fichier Uber du connecteur Pub/Sub Spark Spark avec des dépendances dans un bucket Cloud Storage public. Vous pouvez aussi consulter cette lien pour télécharger le fichier Uber JAR avec les dépendances depuis Maven.
  • --driver-log-levels : définissez le niveau de journalisation sur INFO au niveau racine.
  • --properties : utilisez le gestionnaire de ressources YARN pour le maître Spark.
  • -- : fournit les arguments requis pour le script.

Si l'opération readStream aboutit, des messages de journal doivent s'afficher. comme dans l'exemple ci-dessous, en local, ainsi que sur la page "Détails du job" dans Console Google Cloud:

+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|        subscription|partition|offset|key|data|   publish_timestamp|     event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...|        0| 89523|  0|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89524|  1|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89525|  2|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|

Relire et supprimer définitivement des messages de Pub/Sub Lite

Les opérations de recherche ne fonctionnent pas lors de la lecture à l'aide du connecteur Spark Pub/Sub Lite, Les systèmes Apache Spark effectuent son propre suivi des décalages dans les partitions. La solution consiste à drainer, rechercher et redémarrer les workflows.

Effectuer un nettoyage

Pour éviter que les ressources utilisées sur cette page soient facturées sur votre compte Google Cloud, procédez comme suit :

  1. Supprimez le sujet et l'abonnement.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  2. Supprimez le cluster Dataproc.

    gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
    
  3. Supprimez le bucket Cloud Storage.

    gcloud storage rm gs://$BUCKET
    

Étape suivante