Scrivere messaggi Pub/Sub Lite utilizzando Apache Spark

La Connettore Spark Pub/Sub Lite è una libreria client Java open source che supporta l'utilizzo di Pub/Sub Lite come origine di input e output Streaming strutturato Apache Spark . Il connettore funziona in tutte le distribuzioni Apache Spark, tra cui Dataproc.

Questa guida rapida illustra come:

  • lettura dei messaggi da Pub/Sub Lite
  • scrivere messaggi in Pub/Sub Lite

utilizzando PySpark da un cluster Dataproc Spark.

Prima di iniziare

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  4. Abilita le API Pub/Sub Lite, Dataproc, Cloud Storage, Logging .

    Abilita le API

  5. Installa Google Cloud CLI.
  6. Per initialize gcloud CLI, esegui questo comando:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  9. Abilita le API Pub/Sub Lite, Dataproc, Cloud Storage, Logging .

    Abilita le API

  10. Installa Google Cloud CLI.
  11. Per initialize gcloud CLI, esegui questo comando:

    gcloud init

Configura

  1. Crea variabili per il progetto.

    export PROJECT_ID=$(gcloud config get-value project)
    export PROJECT_NUMBER=$(gcloud projects list \
        --filter="projectId:$PROJECT_ID" \
        --format="value(PROJECT_NUMBER)")
    
  2. Creare un bucket Cloud Storage. Bucket Cloud Storage i nomi devono essere univoci a livello globale.

    export BUCKET=your-bucket-name
    gsutil mb gs://$BUCKET
    
  3. Crea un argomento e una sottoscrizione Pub/Sub Lite in un ambiente location. Consulta la sezione Creare un argomento. se utilizzi una prenotazione Pub/Sub Lite.

    export TOPIC=your-lite-topic-id
    export SUBSCRIPTION=your-lite-subscription-id
    export PUBSUBLITE_LOCATION=your-lite-location
    gcloud pubsub lite-topics create $TOPIC \
        --location=$PUBSUBLITE_LOCATION \
        --partitions=2 \
        --per-partition-bytes=30GiB
    gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
        --location=$PUBSUBLITE_LOCATION \
        --topic=$TOPIC
    
  4. Creare un cluster Dataproc.

    export DATAPROC_REGION=your-dataproc-region
    export CLUSTER_ID=your-dataproc-cluster-id
    gcloud dataproc clusters create $CLUSTER_ID \
       --region $DATAPROC_REGION \
       --image-version 2.1 \
       --scopes 'https://www.googleapis.com/auth/cloud-platform' \
       --enable-component-gateway \
       --bucket $BUCKET
    
    • --region: un progetto Dataproc supportato regione in cui si trovano l'argomento e la sottoscrizione Pub/Sub Lite.
    • --image-version: il valore versione immagine del cluster , che determina la versione di Apache Spark installata nel cluster. Scegli Versioni di release delle immagini 2.x.x poiché il connettore Spark di Pub/Sub Lite attualmente supporta Apache Spark 3.x.x.
    • --scopes: abilita l'accesso API ai servizi Google Cloud nello stesso progetto.
    • --enable-component-gateway: abilita l'accesso alla UI web di Apache Spark.
    • --bucket: un bucket Cloud Storage gestione temporanea utilizzato per archiviare il cluster le dipendenze dei job, l'output del driver e i file di configurazione del cluster.
  5. Clona il repository della guida rapida e vai alla directory del codice campione:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsublite/spark-connector/
    

Scrittura in Pub/Sub Lite

Nell'esempio seguente:

from pyspark.sql import SparkSession
from pyspark.sql.functions import array, create_map, col, lit, when
from pyspark.sql.types import BinaryType, StringType
import uuid

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# topic_id = "your-topic-id"

spark = SparkSession.builder.appName("write-app").getOrCreate()

# Create a RateStreamSource that generates consecutive numbers with timestamps:
# |-- timestamp: timestamp (nullable = true)
# |-- value: long (nullable = true)
sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Transform the dataframe to match the required data fields and data types:
# https://github.com/googleapis/java-pubsublite-spark#data-schema
sdf = (
    sdf.withColumn("key", lit("example").cast(BinaryType()))
    .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
    .withColumnRenamed("timestamp", "event_timestamp")
    # Populate the attributes field. For example, an even value will
    # have {"key1", [b"even"]}.
    .withColumn(
        "attributes",
        create_map(
            lit("key1"),
            array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
        ),
    )
    .drop("value")
)

# After the transformation, the schema of the dataframe should look like:
# |-- key: binary (nullable = false)
# |-- data: binary (nullable = true)
# |-- event_timestamp: timestamp (nullable = true)
# |-- attributes: map (nullable = false)
# |    |-- key: string
# |    |-- value: array (valueContainsNull = false)
# |    |    |-- element: binary (containsNull = false)
sdf.printSchema()

query = (
    sdf.writeStream.format("pubsublite")
    .option(
        "pubsublite.topic",
        f"projects/{project_number}/locations/{location}/topics/{topic_id}",
    )
    # Required. Use a unique checkpoint location for each job.
    .option("checkpointLocation", "/tmp/app" + uuid.uuid4().hex)
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 60 seconds to terminate the query.
query.awaitTermination(60)
query.stop()

Per inviare il job di scrittura a Dataproc:

Console

  1. Carica lo script PySpark nel bucket Cloud Storage.
    1. Vai alla console di Cloud Storage.
    2. Seleziona il bucket.
    3. Utilizza Carica file per caricare lo script PySpark che vuoi per l'utilizzo.
  2. Invia il job al cluster Dataproc:
    1. Vai alla console Dataproc.
    2. Vai ai job.
    3. Fai clic su Invia job.
    4. Inserisci i dettagli del job.
    5. In Cluster, scegli il cluster.
    6. In Job, assegna un nome all'ID job.
    7. In Tipo di job, scegli PySpark.
    8. Per il file Python principale, fornisci l'URI gsutil dell'elemento caricato Script PySpark che inizia con gs://.
    9. Per File jar, scegli la versione più recente del connettore Spark da Maven , cerca il jar con dipendenze nelle opzioni di download e copia il relativo link.
    10. In Argomenti, se utilizzi lo script PySpark completo da GitHub, inserisci --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --topic_id=TOPIC_ID ; se copi lo script PySpark precedente con l'attività completata, lascia il campo vuoto.
    11. In Proprietà, inserisci la chiave spark.master e il valore yarn.
    12. Fai clic su Invia.

gcloud

Utilizza la gcloud dataproc job send pyspark per inviare il job a Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
  • --region: il valore di Dataproc preselezionato region.
  • --cluster: il nome del cluster Dataproc.
  • --jars: jar uber del connettore Spark di Pub/Sub Lite con dipendenze in in un bucket Cloud Storage pubblico. Puoi anche visitare questo link per scaricare il jar uber con dipendenze di Maven.
  • --driver-log-levels: imposta il livello di logging su INFO a livello della directory principale.
  • --properties: utilizza il gestore di risorse YARN per il master Spark.
  • --: fornisce gli argomenti richiesti dallo script.

Se l'operazione writeStream ha esito positivo, dovresti visualizzare i messaggi di log come la seguente a livello locale e nella pagina dei dettagli dell'offerta di lavoro nel Console Google Cloud:

INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..

Lettura da Pub/Sub Lite

L'esempio seguente leggerà i messaggi da un modello esistente la sottoscrizione Pub/Sub Lite utilizzando readStream API. Il connettore restituirà messaggi conformi alle schema della tabella formattato come spark.sql.Row .

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()

Per inviare il job di lettura a Dataproc:

Console

  1. Carica lo script PySpark nel bucket Cloud Storage.
    1. Vai alla console di Cloud Storage.
    2. Seleziona il bucket.
    3. Utilizza Carica file per caricare lo script PySpark che vuoi per l'utilizzo.
  2. Invia il job al cluster Dataproc:
    1. Vai alla console Dataproc.
    2. Vai ai job.
    3. Fai clic su Invia job.
    4. Inserisci i dettagli del job.
    5. In Cluster, scegli il cluster.
    6. In Job, assegna un nome all'ID job.
    7. In Tipo di job, scegli PySpark.
    8. Per il file Python principale, fornisci l'URI gsutil dell'elemento caricato Script PySpark che inizia con gs://.
    9. Per File jar, scegli la versione più recente del connettore Spark da Maven , cerca il jar con dipendenze nelle opzioni di download e copia il relativo link.
    10. In Argomenti, se utilizzi lo script PySpark completo da GitHub, inserisci --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --subscription_id=SUBSCRIPTION_ID ; se copi lo script PySpark precedente con l'attività completata, lascia il campo vuoto.
    11. In Proprietà, inserisci la chiave spark.master e il valore yarn.
    12. Fai clic su Invia.

gcloud

Utilizza la gcloud dataproc job send pyspark il comando di nuovo per inviare il job a Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
  • --region: il valore di Dataproc preselezionato region.
  • --cluster: il nome del cluster Dataproc.
  • --jars: jar uber del connettore Spark di Pub/Sub Lite con dipendenze in in un bucket Cloud Storage pubblico. Puoi anche visitare questo link per scaricare il jar uber con dipendenze di Maven.
  • --driver-log-levels: imposta il livello di logging su INFO a livello della directory principale.
  • --properties: utilizza il gestore di risorse YARN per il master Spark.
  • --: fornisce gli argomenti richiesti per lo script.

Se l'operazione readStream ha esito positivo, dovresti visualizzare i messaggi di log come la seguente a livello locale e nella pagina dei dettagli dell'offerta di lavoro nel Console Google Cloud:

+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|        subscription|partition|offset|key|data|   publish_timestamp|     event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...|        0| 89523|  0|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89524|  1|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89525|  2|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|

Riproduci di nuovo ed elimina definitivamente i messaggi da Pub/Sub Lite

Le operazioni di ricerca non funzionano durante la lettura da Pub/Sub Lite utilizzando il connettore Spark Pub/Sub Lite I sistemi Apache Spark funzionano il proprio monitoraggio degli offset all'interno delle partizioni. La soluzione è svuotare, cercare e riavviare i flussi di lavoro.

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi per le risorse utilizzate in questa pagina, segui questi passaggi.

  1. Elimina l'argomento e la sottoscrizione.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  2. Elimina il cluster Dataproc.

    gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
    
  3. Rimuovi il bucket Cloud Storage.

    gsutil rb gs://$BUCKET
    

Passaggi successivi