Escribe mensajes de Pub/Sub Lite con Apache Spark

El conector de Spark de Pub/Sub Lite es una biblioteca cliente de Java de código abierto que admite el uso de Pub/Sub Lite como fuente de entrada y salida para la transmisión estructurada de Apache Spark. El conector funciona en todas las distribuciones de Apache Spark, incluido Dataproc.

Esta guía de inicio rápido te muestra cómo hacer lo siguiente:

  • leer mensajes de Pub/Sub Lite
  • escribir mensajes en Pub/Sub Lite

con PySpark desde un clúster de Spark de Dataproc.

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  4. Habilita las API de Pub/Sub Lite, Dataproc, Cloud Storage, Logging .

    Habilita las API

  5. Instala Google Cloud CLI.
  6. Para inicializar la CLI de gcloud, ejecuta el siguiente comando:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  9. Habilita las API de Pub/Sub Lite, Dataproc, Cloud Storage, Logging .

    Habilita las API

  10. Instala Google Cloud CLI.
  11. Para inicializar la CLI de gcloud, ejecuta el siguiente comando:

    gcloud init

Configurar

  1. Crea variables para tu proyecto.

    export PROJECT_ID=$(gcloud config get-value project)
    export PROJECT_NUMBER=$(gcloud projects list \
        --filter="projectId:$PROJECT_ID" \
        --format="value(PROJECT_NUMBER)")
    
  2. Crea un bucket de Cloud Storage Los nombres de depósitos de Cloud Storage deben ser únicos a nivel global.

    export BUCKET=your-bucket-name
    gsutil mb gs://$BUCKET
    
  3. Crea un tema y una suscripción de Pub/Sub Lite en una ubicación compatible. Consulta Crea un tema si usas una reserva de Pub/Sub Lite.

    export TOPIC=your-lite-topic-id
    export SUBSCRIPTION=your-lite-subscription-id
    export PUBSUBLITE_LOCATION=your-lite-location
    gcloud pubsub lite-topics create $TOPIC \
        --location=$PUBSUBLITE_LOCATION \
        --partitions=2 \
        --per-partition-bytes=30GiB
    gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
        --location=$PUBSUBLITE_LOCATION \
        --topic=$TOPIC
    
  4. Crea un clúster de Dataproc.

    export DATAPROC_REGION=your-dataproc-region
    export CLUSTER_ID=your-dataproc-cluster-id
    gcloud dataproc clusters create $CLUSTER_ID \
       --region $DATAPROC_REGION \
       --image-version 2.1 \
       --scopes 'https://www.googleapis.com/auth/cloud-platform' \
       --enable-component-gateway \
       --bucket $BUCKET
    
    • --region: una región de Dataproc compatible en la que se encuentran la suscripción y el tema de Pub/Sub Lite.
    • --image-version: Es la versión con imágenes del clúster, que determina la versión de Apache Spark instalada en el clúster. Elige las versiones de actualización de la imagen 2.x.x porque, en la actualidad, el conector de Spark de Pub/Sub Lite es compatible con Apache Spark 3.x.x.
    • --scopes: Habilita el acceso a la API a los servicios de Google Cloud en el mismo proyecto.
    • --enable-component-gateway: Habilita el acceso a la IU web de Apache Spark.
    • --bucket: Es un bucket de etapa de pruebas de Cloud Storage que se usa para almacenar las dependencias de trabajos del clúster, los resultados del controlador y los archivos de configuración del clúster.
  5. Clona el repositorio de la guía de inicio rápido y navega al directorio de código de muestra:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsublite/spark-connector/
    

Escribe en Pub/Sub Lite

En el siguiente ejemplo, se realizan las siguientes acciones:

  • crear una fuente de tasas que genere números consecutivos y marcas de tiempo con el formato spark.sql.Row
  • transformar los datos para que coincidan con el esquema de tabla requerido por la API de writeStream del conector de Spark de Pub/Sub Lite
  • escribir los datos en un tema existente de Pub/Sub Lite
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, create_map, col, lit, when
from pyspark.sql.types import BinaryType, StringType
import uuid

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# topic_id = "your-topic-id"

spark = SparkSession.builder.appName("write-app").getOrCreate()

# Create a RateStreamSource that generates consecutive numbers with timestamps:
# |-- timestamp: timestamp (nullable = true)
# |-- value: long (nullable = true)
sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Transform the dataframe to match the required data fields and data types:
# https://github.com/googleapis/java-pubsublite-spark#data-schema
sdf = (
    sdf.withColumn("key", lit("example").cast(BinaryType()))
    .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
    .withColumnRenamed("timestamp", "event_timestamp")
    # Populate the attributes field. For example, an even value will
    # have {"key1", [b"even"]}.
    .withColumn(
        "attributes",
        create_map(
            lit("key1"),
            array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
        ),
    )
    .drop("value")
)

# After the transformation, the schema of the dataframe should look like:
# |-- key: binary (nullable = false)
# |-- data: binary (nullable = true)
# |-- event_timestamp: timestamp (nullable = true)
# |-- attributes: map (nullable = false)
# |    |-- key: string
# |    |-- value: array (valueContainsNull = false)
# |    |    |-- element: binary (containsNull = false)
sdf.printSchema()

query = (
    sdf.writeStream.format("pubsublite")
    .option(
        "pubsublite.topic",
        f"projects/{project_number}/locations/{location}/topics/{topic_id}",
    )
    # Required. Use a unique checkpoint location for each job.
    .option("checkpointLocation", "/tmp/app" + uuid.uuid4().hex)
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 60 seconds to terminate the query.
query.awaitTermination(60)
query.stop()

Para enviar el trabajo de escritura a Dataproc, sigue estos pasos:

Console

  1. Sube la secuencia de comandos de PySpark a tu bucket de Cloud Storage.
    1. Ve a la consola de Cloud Storage.
    2. Selecciona tu bucket.
    3. Usa Upload files para subir la secuencia de comandos de PySpark que deseas usar.
  2. Envía el trabajo a tu clúster de Dataproc:
    1. Ve a la consola de Dataproc.
    2. Navega a trabajos.
    3. Haga clic en Enviar trabajo.
    4. Completa los detalles del trabajo.
    5. En Clúster, selecciona tu clúster.
    6. En Trabajo, asigna un nombre al ID del trabajo.
    7. En Tipo de trabajo, elige PySpark.
    8. En Archivo principal de Python, proporciona el URI de gsutil de la secuencia de comandos de PySpark subida que comienza con gs://.
    9. En el caso de los archivos JAR, elige la versión más reciente del conector de Spark en Maven, busca el archivo jar con dependencias en las opciones de descarga y copia el vínculo.
    10. En el caso de los Argumentos, si usas la secuencia de comandos completa de PySpark desde GitHub, ingresa --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --topic_id=TOPIC_ID. Si copias la secuencia de comandos de PySpark anterior con las tareas pendientes completadas, déjala en blanco.
    11. En Propiedades, ingresa la clave spark.master y el valor yarn.
    12. Haga clic en Enviar.

gcloud

Usa el comando gcloud dataproc jobs submit pyspark para enviar el trabajo a Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
  • --region: la región de Dataproc preseleccionada.
  • --cluster: el nombre del clúster de Dataproc.
  • --jars: el jar de uber del conector de Spark de Pub/Sub Lite con dependencias en un bucket público de Cloud Storage. También puedes visitar este vínculo para descargar el jar uber con dependencias de Maven.
  • --driver-log-levels: establece el nivel de registro en INFO en el nivel raíz.
  • --properties: usa el administrador de recursos de YARN para la instancia principal de Spark.
  • --: proporciona los argumentos que requiere la secuencia de comandos.

Si la operación writeStream se realiza de forma correcta, deberías ver mensajes de registro como el siguiente de forma local y en la página de detalles del trabajo en Google Cloud Console:

INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..

Lee desde Pub/Sub Lite

En el siguiente ejemplo, se leerán los mensajes de una suscripción a Pub/Sub Lite existente con la API de readStream. El conector generará mensajes que se ajusten al esquema de tabla fijo con el formato spark.sql.Row.

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()

Para enviar el trabajo de lectura a Dataproc, sigue estos pasos:

Console

  1. Sube la secuencia de comandos de PySpark a tu bucket de Cloud Storage.
    1. Ve a la consola de Cloud Storage.
    2. Selecciona tu bucket.
    3. Usa Upload files para subir la secuencia de comandos de PySpark que deseas usar.
  2. Envía el trabajo a tu clúster de Dataproc:
    1. Ve a la consola de Dataproc.
    2. Navega a trabajos.
    3. Haga clic en Enviar trabajo.
    4. Completa los detalles del trabajo.
    5. En Clúster, selecciona tu clúster.
    6. En Trabajo, asigna un nombre al ID del trabajo.
    7. En Tipo de trabajo, elige PySpark.
    8. En Archivo principal de Python, proporciona el URI de gsutil de la secuencia de comandos de PySpark subida que comienza con gs://.
    9. En el caso de los archivos JAR, elige la versión más reciente del conector de Spark en Maven, busca el archivo jar con dependencias en las opciones de descarga y copia el vínculo.
    10. En el caso de los Argumentos, si usas la secuencia de comandos completa de PySpark desde GitHub, ingresa --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --subscription_id=SUBSCRIPTION_ID. Si copias la secuencia de comandos de PySpark anterior con las tareas pendientes completadas, déjala en blanco.
    11. En Propiedades, ingresa la clave spark.master y el valor yarn.
    12. Haga clic en Enviar.

gcloud

Usa el comando gcloud dataproc jobs submit pyspark otra vez para enviar el trabajo a Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
  • --region: la región de Dataproc preseleccionada.
  • --cluster: el nombre del clúster de Dataproc.
  • --jars: el jar de uber del conector de Spark de Pub/Sub Lite con dependencias en un bucket público de Cloud Storage. También puedes visitar este vínculo para descargar el jar uber con dependencias de Maven.
  • --driver-log-levels: establece el nivel de registro en INFO en el nivel raíz.
  • --properties: usa el administrador de recursos de YARN para la instancia principal de Spark.
  • --: Proporciona argumentos obligatorios para la secuencia de comandos.

Si la operación readStream se realiza de forma correcta, deberías ver mensajes de registro como el siguiente de forma local y en la página de detalles del trabajo en Google Cloud Console:

+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|        subscription|partition|offset|key|data|   publish_timestamp|     event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...|        0| 89523|  0|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89524|  1|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89525|  2|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|

Volver a reproducir y borrar definitivamente mensajes de Pub/Sub Lite

Las operaciones de búsqueda no funcionan cuando se leen desde Pub/Sub Lite con el conector de Spark de Pub/Sub Lite porque los sistemas de Apache Spark realizan su propio seguimiento de los desplazamientos dentro de las particiones. La solución alternativa es desviar, buscar y reiniciar los flujos de trabajo.

Limpia

Sigue estos pasos para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que se usaron en esta página.

  1. Borra el tema y la suscripción.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  2. Borra el clúster de Dataproc.

    gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
    
  3. Quita el bucket de Cloud Storage.

    gsutil rb gs://$BUCKET
    

¿Qué sigue?