Scrivere messaggi Pub/Sub Lite utilizzando Apache Spark

Il connettore Spark di Pub/Sub Lite è una libreria client Java open source che supporta l'utilizzo di Pub/Sub Lite come origine di input e output per il flusso strutturato di Apache Spark. Il connettore funziona in tutte le distribuzioni Apache Spark, incluso Dataproc.

Questa guida rapida illustra come:

  • lettura dei messaggi da Pub/Sub Lite
  • scrivere messaggi in Pub/Sub Lite

utilizzando PySpark da un cluster Dataproc Spark.

Prima di iniziare

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  4. Abilita le API Pub/Sub Lite, Dataproc, Cloud Storage, Logging .

    Abilita le API

  5. Installa Google Cloud CLI.
  6. Per initialize gcloud CLI, esegui questo comando:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. Assicurati che la fatturazione sia attivata per il tuo progetto Google Cloud.

  9. Abilita le API Pub/Sub Lite, Dataproc, Cloud Storage, Logging .

    Abilita le API

  10. Installa Google Cloud CLI.
  11. Per initialize gcloud CLI, esegui questo comando:

    gcloud init

Configura

  1. Crea variabili per il progetto.

    export PROJECT_ID=$(gcloud config get-value project)
    export PROJECT_NUMBER=$(gcloud projects list \
        --filter="projectId:$PROJECT_ID" \
        --format="value(PROJECT_NUMBER)")
    
  2. Creare un bucket Cloud Storage. I nomi dei bucket Cloud Storage devono essere univoci a livello globale.

    export BUCKET=your-bucket-name
    gsutil mb gs://$BUCKET
    
  3. Crea un argomento e una sottoscrizione Pub/Sub Lite in una località supportata. Consulta Creare un argomento se utilizzi una prenotazione Pub/Sub Lite.

    export TOPIC=your-lite-topic-id
    export SUBSCRIPTION=your-lite-subscription-id
    export PUBSUBLITE_LOCATION=your-lite-location
    gcloud pubsub lite-topics create $TOPIC \
        --location=$PUBSUBLITE_LOCATION \
        --partitions=2 \
        --per-partition-bytes=30GiB
    gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
        --location=$PUBSUBLITE_LOCATION \
        --topic=$TOPIC
    
  4. Creare un cluster Dataproc.

    export DATAPROC_REGION=your-dataproc-region
    export CLUSTER_ID=your-dataproc-cluster-id
    gcloud dataproc clusters create $CLUSTER_ID \
       --region $DATAPROC_REGION \
       --image-version 2.1 \
       --scopes 'https://www.googleapis.com/auth/cloud-platform' \
       --enable-component-gateway \
       --bucket $BUCKET
    
    • --region: una regione Dataproc supportata in cui si trovano l'argomento e l'abbonamento Pub/Sub Lite.
    • --image-version: la versione immagine del cluster, che determina la versione di Apache Spark installata nel cluster. Scegli versioni di rilascio delle immagini 2.x.x perché il connettore Spark Pub/Sub Lite attualmente supporta Apache Spark 3.x.x.
    • --scopes: abilita l'accesso API ai servizi Google Cloud nello stesso progetto.
    • --enable-component-gateway: abilita l'accesso alla UI web di Apache Spark.
    • --bucket: un bucket Cloud Storage gestione temporanea utilizzato per archiviare le dipendenze del job del cluster, l'output del driver e i file di configurazione del cluster.
  5. Clona il repository della guida rapida e vai alla directory del codice campione:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsublite/spark-connector/
    

Scrittura in Pub/Sub Lite

Nell'esempio seguente:

from pyspark.sql import SparkSession
from pyspark.sql.functions import array, create_map, col, lit, when
from pyspark.sql.types import BinaryType, StringType
import uuid

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# topic_id = "your-topic-id"

spark = SparkSession.builder.appName("write-app").getOrCreate()

# Create a RateStreamSource that generates consecutive numbers with timestamps:
# |-- timestamp: timestamp (nullable = true)
# |-- value: long (nullable = true)
sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Transform the dataframe to match the required data fields and data types:
# https://github.com/googleapis/java-pubsublite-spark#data-schema
sdf = (
    sdf.withColumn("key", lit("example").cast(BinaryType()))
    .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
    .withColumnRenamed("timestamp", "event_timestamp")
    # Populate the attributes field. For example, an even value will
    # have {"key1", [b"even"]}.
    .withColumn(
        "attributes",
        create_map(
            lit("key1"),
            array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
        ),
    )
    .drop("value")
)

# After the transformation, the schema of the dataframe should look like:
# |-- key: binary (nullable = false)
# |-- data: binary (nullable = true)
# |-- event_timestamp: timestamp (nullable = true)
# |-- attributes: map (nullable = false)
# |    |-- key: string
# |    |-- value: array (valueContainsNull = false)
# |    |    |-- element: binary (containsNull = false)
sdf.printSchema()

query = (
    sdf.writeStream.format("pubsublite")
    .option(
        "pubsublite.topic",
        f"projects/{project_number}/locations/{location}/topics/{topic_id}",
    )
    # Required. Use a unique checkpoint location for each job.
    .option("checkpointLocation", "/tmp/app" + uuid.uuid4().hex)
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 60 seconds to terminate the query.
query.awaitTermination(60)
query.stop()

Per inviare il job di scrittura a Dataproc:

Console

  1. Carica lo script PySpark nel bucket Cloud Storage.
    1. Vai alla console di Cloud Storage.
    2. Seleziona il bucket.
    3. Utilizza Carica file per caricare lo script PySpark che vuoi utilizzare.
  2. Invia il job al cluster Dataproc:
    1. Vai alla console Dataproc.
    2. Vai ai job.
    3. Fai clic su Invia job.
    4. Inserisci i dettagli del job.
    5. In Cluster, scegli il cluster.
    6. In Job, assegna un nome all'ID job.
    7. In Tipo di job, scegli PySpark.
    8. Per il file Python principale, fornisci l'URI gsutil dello script PySpark caricato che inizia con gs://.
    9. Per i file jar, scegli la versione più recente del connettore Spark da Maven, cerca il jar con dipendenze nelle opzioni di download e copia il relativo link.
    10. In Argomenti, se utilizzi lo script PySpark completo di GitHub, inserisci --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --topic_id=TOPIC_ID ; se copi lo script PySpark riportato sopra con l'attività completata, lascia vuoto il campo.
    11. In Proprietà, inserisci la chiave spark.master e il valore yarn.
    12. Fai clic su Invia.

gcloud

Usa il comando gcloud dataproc job send pyspark per inviare il job a Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
  • --region: la regione Dataproc preselezionata.
  • --cluster: il nome del cluster Dataproc.
  • --jars: jar uber del connettore Spark del connettore Pub/Sub Lite con dipendenze in un bucket Cloud Storage pubblico. Puoi anche visitare questo link per scaricare il file jar uber con dipendenze di Maven.
  • --driver-log-levels: imposta il livello di logging su INFO a livello della directory principale.
  • --properties: utilizza il gestore di risorse YARN per il master Spark.
  • --: fornisce gli argomenti richiesti dallo script.

Se l'operazione writeStream ha esito positivo, dovresti visualizzare messaggi di log come i seguenti a livello locale e nella pagina dei dettagli del job nella console Google Cloud:

INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..

Lettura da Pub/Sub Lite

L'esempio seguente leggerà i messaggi di una sottoscrizione Pub/Sub Lite esistente utilizzando l'API readStream. Il connettore restituirà messaggi conformi allo schema della tabella fisso formattato come spark.sql.Row.

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()

Per inviare il job di lettura a Dataproc:

Console

  1. Carica lo script PySpark nel bucket Cloud Storage.
    1. Vai alla console di Cloud Storage.
    2. Seleziona il bucket.
    3. Utilizza Carica file per caricare lo script PySpark che vuoi utilizzare.
  2. Invia il job al cluster Dataproc:
    1. Vai alla console Dataproc.
    2. Vai ai job.
    3. Fai clic su Invia job.
    4. Inserisci i dettagli del job.
    5. In Cluster, scegli il cluster.
    6. In Job, assegna un nome all'ID job.
    7. In Tipo di job, scegli PySpark.
    8. Per il file Python principale, fornisci l'URI gsutil dello script PySpark caricato che inizia con gs://.
    9. Per i file jar, scegli la versione più recente del connettore Spark da Maven, cerca il jar con dipendenze nelle opzioni di download e copia il relativo link.
    10. In Argomenti, se utilizzi lo script PySpark completo di GitHub, inserisci --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --subscription_id=SUBSCRIPTION_ID ; se copi lo script PySpark riportato sopra con l'attività completata, lascia vuoto il campo.
    11. In Proprietà, inserisci la chiave spark.master e il valore yarn.
    12. Fai clic su Invia.

gcloud

Utilizza di nuovo il comando gcloud dataproc job send pyspark per inviare il job a Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
  • --region: la regione Dataproc preselezionata.
  • --cluster: il nome del cluster Dataproc.
  • --jars: jar uber del connettore Spark del connettore Pub/Sub Lite con dipendenze in un bucket Cloud Storage pubblico. Puoi anche visitare questo link per scaricare il file jar uber con dipendenze di Maven.
  • --driver-log-levels: imposta il livello di logging su INFO a livello della directory principale.
  • --properties: utilizza il gestore di risorse YARN per il master Spark.
  • --: fornisce gli argomenti richiesti per lo script.

Se l'operazione readStream ha esito positivo, dovresti visualizzare messaggi di log come i seguenti a livello locale e nella pagina dei dettagli del job nella console Google Cloud:

+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|        subscription|partition|offset|key|data|   publish_timestamp|     event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...|        0| 89523|  0|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89524|  1|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89525|  2|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|

Riproduci di nuovo ed elimina definitivamente i messaggi da Pub/Sub Lite

Le operazioni di ricerca non funzionano durante la lettura da Pub/Sub Lite utilizzando il connettore Spark di Pub/Sub Lite, perché i sistemi Apache Spark eseguono il monitoraggio degli offset all'interno delle partizioni. La soluzione alternativa consiste nello svuotare, cercare e riavviare i flussi di lavoro.

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, segui questi passaggi.

  1. Elimina l'argomento e la sottoscrizione.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  2. Elimina il cluster Dataproc.

    gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
    
  3. Rimuovi il bucket Cloud Storage.

    gsutil rb gs://$BUCKET
    

Passaggi successivi