Cómo escribir mensajes de Pub/Sub Lite con Apache Spark

El conector de Spark de Pub/Sub Lite es una biblioteca cliente de Java de código abierto que admite el uso de Pub/Sub Lite como fuente de entrada y salida para la Transmisión estructurada de Apache Spark. El conector funciona en todas las distribuciones de Apache Spark, incluido Dataproc.

Esta guía de inicio rápido te muestra cómo hacer lo siguiente:

  • leer mensajes de Pub/Sub Lite
  • escribir mensajes en Pub/Sub Lite

usar PySpark desde un clúster de Spark de Dataproc

Antes de comenzar

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.
  6. To initialize the gcloud CLI, run the following command:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. Make sure that billing is enabled for your Google Cloud project.

  9. Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.

    Enable the APIs

  10. Install the Google Cloud CLI.
  11. To initialize the gcloud CLI, run the following command:

    gcloud init

Configurar

  1. Crea variables para tu proyecto.

    export PROJECT_ID=$(gcloud config get-value project)
    export PROJECT_NUMBER=$(gcloud projects list \
        --filter="projectId:$PROJECT_ID" \
        --format="value(PROJECT_NUMBER)")
  2. Crea un bucket de Cloud Storage Los nombres de depósitos de Cloud Storage deben ser únicos a nivel global.

    export BUCKET=your-bucket-name
    gcloud storage buckets create gs://$BUCKET
    
  3. Crea un tema y una suscripción de Pub/Sub Lite en una ubicación compatible. Consulta Cómo crear un tema si usas una reserva de Pub/Sub Lite.

    export TOPIC=your-lite-topic-id
    export SUBSCRIPTION=your-lite-subscription-id
    export PUBSUBLITE_LOCATION=your-lite-location
    gcloud pubsub lite-topics create $TOPIC \
        --location=$PUBSUBLITE_LOCATION \
        --partitions=2 \
        --per-partition-bytes=30GiB
    gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
        --location=$PUBSUBLITE_LOCATION \
        --topic=$TOPIC
  4. Crea un clúster de Dataproc.

    export DATAPROC_REGION=your-dataproc-region
    export CLUSTER_ID=your-dataproc-cluster-id
    gcloud dataproc clusters create $CLUSTER_ID \
       --region $DATAPROC_REGION \
       --image-version 2.1 \
       --scopes 'https://www.googleapis.com/auth/cloud-platform' \
       --enable-component-gateway \
       --bucket $BUCKET
    • --region: una región de Dataproc compatible en la que se encuentran la suscripción y el tema de Pub/Sub Lite.
    • --image-version: Es la versión con imágenes del clúster, que determina la versión de Apache Spark instalada en el clúster. Elige versiones de actualización con imágenes 2.x.x porque, por el momento, el conector de Spark para Pub/Sub Lite es compatible con Apache Spark 3.x.x.
    • --scopes: Habilita el acceso a la API a los servicios de Google Cloud en el mismo proyecto.
    • --enable-component-gateway: Habilita el acceso a la IU web de Apache Spark.
    • --bucket: Es un bucket de etapa de pruebas de Cloud Storage que se usa para almacenar las dependencias de trabajos del clúster, los resultados del controlador y los archivos de configuración del clúster.
  5. Clona el repositorio de la guía de inicio rápido y navega al directorio de código de muestra:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsublite/spark-connector/
    

Escribe en Pub/Sub Lite

En el siguiente ejemplo, se realizan las siguientes acciones:

  • crear una fuente de tasa que genere números y marcas de tiempo consecutivos con formato spark.sql.Row
  • transformar los datos para que coincidan con el esquema de la tabla requerido por la API writeStream del conector de Spark de Pub/Sub Lite
  • escribir los datos en un tema existente de Pub/Sub Lite
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, create_map, col, lit, when
from pyspark.sql.types import BinaryType, StringType
import uuid

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# topic_id = "your-topic-id"

spark = SparkSession.builder.appName("write-app").getOrCreate()

# Create a RateStreamSource that generates consecutive numbers with timestamps:
# |-- timestamp: timestamp (nullable = true)
# |-- value: long (nullable = true)
sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Transform the dataframe to match the required data fields and data types:
# https://github.com/googleapis/java-pubsublite-spark#data-schema
sdf = (
    sdf.withColumn("key", lit("example").cast(BinaryType()))
    .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
    .withColumnRenamed("timestamp", "event_timestamp")
    # Populate the attributes field. For example, an even value will
    # have {"key1", [b"even"]}.
    .withColumn(
        "attributes",
        create_map(
            lit("key1"),
            array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
        ),
    )
    .drop("value")
)

# After the transformation, the schema of the dataframe should look like:
# |-- key: binary (nullable = false)
# |-- data: binary (nullable = true)
# |-- event_timestamp: timestamp (nullable = true)
# |-- attributes: map (nullable = false)
# |    |-- key: string
# |    |-- value: array (valueContainsNull = false)
# |    |    |-- element: binary (containsNull = false)
sdf.printSchema()

query = (
    sdf.writeStream.format("pubsublite")
    .option(
        "pubsublite.topic",
        f"projects/{project_number}/locations/{location}/topics/{topic_id}",
    )
    # Required. Use a unique checkpoint location for each job.
    .option("checkpointLocation", "/tmp/app" + uuid.uuid4().hex)
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 60 seconds to terminate the query.
query.awaitTermination(60)
query.stop()

Para enviar el trabajo de escritura a Dataproc, sigue estos pasos:

Console

  1. Sube la secuencia de comandos de PySpark a tu bucket de Cloud Storage.
    1. Ve a la consola de Cloud Storage.
    2. Selecciona tu bucket.
    3. Usa Upload files para subir la secuencia de comandos de PySpark que deseas usar.
  2. Envía el trabajo a tu clúster de Dataproc:
    1. Ve a la consola de Dataproc.
    2. Navega a trabajos.
    3. Haga clic en Enviar trabajo.
    4. Completa los detalles del trabajo.
    5. En Clúster, selecciona tu clúster.
    6. En Trabajo, asigna un nombre al ID del trabajo.
    7. En Tipo de trabajo, elige PySpark.
    8. En Archivo principal de Python, proporciona el URI de almacenamiento de gcloud de la secuencia de comandos de PySpark subida que comienza con gs://.
    9. En el caso de los archivos Jar, elige la versión más reciente del conector de Spark de Maven, busca el Jar con dependencias en las opciones de descarga y copia su vínculo.
    10. En el caso de los Argumentos, si usas la secuencia de comandos completa de PySpark desde GitHub, ingresa --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --topic_id=TOPIC_ID. Si copias la secuencia de comandos de PySpark anterior con las tareas pendientes completadas, déjala en blanco.
    11. En Propiedades, ingresa la clave spark.master y el valor yarn.
    12. Haga clic en Enviar.

gcloud

Usa el comando gcloud dataproc jobs submit pyspark para enviar el trabajo a Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
  • --region: la región de Dataproc preseleccionada.
  • --cluster: el nombre del clúster de Dataproc.
  • --jars: el jar de uber del conector de Spark de Pub/Sub Lite con dependencias en un bucket público de Cloud Storage. También puedes visitar este vínculo para descargar el Jar de Uber con dependencias de Maven.
  • --driver-log-levels: establece el nivel de registro en INFO en el nivel raíz.
  • --properties: usa el administrador de recursos de YARN para la instancia principal de Spark.
  • --: proporciona los argumentos que requiere la secuencia de comandos.

Si la operación writeStream se realiza de forma exitosa, deberías ver mensajes de registro como los siguientes de forma local y en la página de detalles del trabajo en la consola de Google Cloud:

INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..

Lee desde Pub/Sub Lite

En el siguiente ejemplo, se leerán mensajes de una suscripción existente de Pub/Sub Lite con la API de readStream. El conector producirá mensajes que se ajusten al esquema de tabla fijo con formato spark.sql.Row.

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()

Para enviar el trabajo de lectura a Dataproc, sigue estos pasos:

Console

  1. Sube la secuencia de comandos de PySpark a tu bucket de Cloud Storage.
    1. Ve a la consola de Cloud Storage.
    2. Selecciona tu bucket.
    3. Usa Upload files para subir la secuencia de comandos de PySpark que deseas usar.
  2. Envía el trabajo a tu clúster de Dataproc:
    1. Ve a la consola de Dataproc.
    2. Navega a trabajos.
    3. Haga clic en Enviar trabajo.
    4. Completa los detalles del trabajo.
    5. En Clúster, selecciona tu clúster.
    6. En Trabajo, asigna un nombre al ID del trabajo.
    7. En Tipo de trabajo, elige PySpark.
    8. En Archivo principal de Python, proporciona el URI de almacenamiento de gcloud de la secuencia de comandos de PySpark subida que comienza con gs://.
    9. En el caso de los archivos Jar, elige la versión más reciente del conector de Spark de Maven, busca el Jar con dependencias en las opciones de descarga y copia su vínculo.
    10. En el caso de los Argumentos, si usas la secuencia de comandos completa de PySpark desde GitHub, ingresa --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --subscription_id=SUBSCRIPTION_ID. Si copias la secuencia de comandos de PySpark anterior con las tareas pendientes completadas, déjala en blanco.
    11. En Propiedades, ingresa la clave spark.master y el valor yarn.
    12. Haga clic en Enviar.

gcloud

Usa el comando gcloud dataproc jobs submit pyspark otra vez para enviar el trabajo a Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
  • --region: la región de Dataproc preseleccionada.
  • --cluster: el nombre del clúster de Dataproc.
  • --jars: el jar de uber del conector de Spark de Pub/Sub Lite con dependencias en un bucket público de Cloud Storage. También puedes visitar este vínculo para descargar el Jar de Uber con dependencias de Maven.
  • --driver-log-levels: establece el nivel de registro en INFO en el nivel raíz.
  • --properties: usa el administrador de recursos de YARN para la instancia principal de Spark.
  • --: Proporciona argumentos obligatorios para la secuencia de comandos.

Si la operación readStream se realiza de forma exitosa, deberías ver mensajes de registro como los siguientes de forma local y en la página de detalles del trabajo en la consola de Google Cloud:

+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|        subscription|partition|offset|key|data|   publish_timestamp|     event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...|        0| 89523|  0|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89524|  1|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89525|  2|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|

Vuelve a reproducir y borra definitivamente mensajes de Pub/Sub Lite

Las operaciones de salto no funcionan cuando se lee de Pub/Sub Lite con el conector de Spark de Pub/Sub Lite porque los sistemas de Apache Spark realizan su propio seguimiento de los desplazamientos dentro de las particiones. La solución alternativa es agotar, buscar y reiniciar los flujos de trabajo.

Limpia

Sigue estos pasos para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que usaste en esta página.

  1. Borra el tema y la suscripción.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  2. Borra el clúster de Dataproc.

    gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
    
  3. Quita el bucket de Cloud Storage.

    gcloud storage rm gs://$BUCKET
    

¿Qué sigue?