Escribe mensajes de Pub/Sub Lite con Apache Spark

El Conector de Spark de Pub/Sub Lite es una biblioteca cliente de código abierto de Java que admite Pub/Sub Lite como fuente de entrada y salida para Transmisión estructurada de Apache Spark . El conector funciona en todas las distribuciones de Apache Spark, incluido Dataproc.

Esta guía de inicio rápido te muestra cómo hacer lo siguiente:

  • leer mensajes de Pub/Sub Lite
  • escribir mensajes en Pub/Sub Lite

mediante PySpark desde un clúster de Spark de Dataproc.

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  4. Habilita las API de Pub/Sub Lite, Dataproc, Cloud Storage, Logging .

    Habilita las API

  5. Instala Google Cloud CLI.
  6. Para inicializar la CLI de gcloud, ejecuta el siguiente comando:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  9. Habilita las API de Pub/Sub Lite, Dataproc, Cloud Storage, Logging .

    Habilita las API

  10. Instala Google Cloud CLI.
  11. Para inicializar la CLI de gcloud, ejecuta el siguiente comando:

    gcloud init

Configurar

  1. Crea variables para tu proyecto.

    export PROJECT_ID=$(gcloud config get-value project)
    export PROJECT_NUMBER=$(gcloud projects list \
        --filter="projectId:$PROJECT_ID" \
        --format="value(PROJECT_NUMBER)")
    
  2. Crea un bucket de Cloud Storage Los nombres de depósitos de Cloud Storage deben ser únicos a nivel global.

    export BUCKET=your-bucket-name
    gsutil mb gs://$BUCKET
    
  3. Crea un tema y una suscripción de Pub/Sub Lite en una ubicación compatible. Consulta Cómo crear un tema. si usas una reserva de Pub/Sub Lite.

    export TOPIC=your-lite-topic-id
    export SUBSCRIPTION=your-lite-subscription-id
    export PUBSUBLITE_LOCATION=your-lite-location
    gcloud pubsub lite-topics create $TOPIC \
        --location=$PUBSUBLITE_LOCATION \
        --partitions=2 \
        --per-partition-bytes=30GiB
    gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
        --location=$PUBSUBLITE_LOCATION \
        --topic=$TOPIC
    
  4. Crea un clúster de Dataproc.

    export DATAPROC_REGION=your-dataproc-region
    export CLUSTER_ID=your-dataproc-cluster-id
    gcloud dataproc clusters create $CLUSTER_ID \
       --region $DATAPROC_REGION \
       --image-version 2.1 \
       --scopes 'https://www.googleapis.com/auth/cloud-platform' \
       --enable-component-gateway \
       --bucket $BUCKET
    
    • --region: una región de Dataproc compatible en la que se encuentran la suscripción y el tema de Pub/Sub Lite.
    • --image-version: Es la versión con imágenes del clúster, que determina la versión de Apache Spark instalada en el clúster. Elegir Versiones de actualización de imágenes 2.x.x ya que, actualmente, el conector de Spark de Pub/Sub Lite es compatible con Apache Spark 3.x.x.
    • --scopes: Habilita el acceso a la API a los servicios de Google Cloud en el mismo proyecto.
    • --enable-component-gateway: Habilita el acceso a la IU web de Apache Spark.
    • --bucket: Es un bucket de etapa de pruebas de Cloud Storage que se usa para almacenar las dependencias de trabajos del clúster, los resultados del controlador y los archivos de configuración del clúster.
  5. Clona el repositorio de la guía de inicio rápido y navega al directorio de código de muestra:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsublite/spark-connector/
    

Escribe en Pub/Sub Lite

En el siguiente ejemplo, se realizan las siguientes acciones:

from pyspark.sql import SparkSession
from pyspark.sql.functions import array, create_map, col, lit, when
from pyspark.sql.types import BinaryType, StringType
import uuid

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# topic_id = "your-topic-id"

spark = SparkSession.builder.appName("write-app").getOrCreate()

# Create a RateStreamSource that generates consecutive numbers with timestamps:
# |-- timestamp: timestamp (nullable = true)
# |-- value: long (nullable = true)
sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Transform the dataframe to match the required data fields and data types:
# https://github.com/googleapis/java-pubsublite-spark#data-schema
sdf = (
    sdf.withColumn("key", lit("example").cast(BinaryType()))
    .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
    .withColumnRenamed("timestamp", "event_timestamp")
    # Populate the attributes field. For example, an even value will
    # have {"key1", [b"even"]}.
    .withColumn(
        "attributes",
        create_map(
            lit("key1"),
            array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
        ),
    )
    .drop("value")
)

# After the transformation, the schema of the dataframe should look like:
# |-- key: binary (nullable = false)
# |-- data: binary (nullable = true)
# |-- event_timestamp: timestamp (nullable = true)
# |-- attributes: map (nullable = false)
# |    |-- key: string
# |    |-- value: array (valueContainsNull = false)
# |    |    |-- element: binary (containsNull = false)
sdf.printSchema()

query = (
    sdf.writeStream.format("pubsublite")
    .option(
        "pubsublite.topic",
        f"projects/{project_number}/locations/{location}/topics/{topic_id}",
    )
    # Required. Use a unique checkpoint location for each job.
    .option("checkpointLocation", "/tmp/app" + uuid.uuid4().hex)
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 60 seconds to terminate the query.
query.awaitTermination(60)
query.stop()

Para enviar el trabajo de escritura a Dataproc, sigue estos pasos:

Console

  1. Sube la secuencia de comandos de PySpark a tu bucket de Cloud Storage.
    1. Ve a la consola de Cloud Storage.
    2. Selecciona tu bucket.
    3. Usa Upload files para subir la secuencia de comandos de PySpark que deseas usar.
  2. Envía el trabajo a tu clúster de Dataproc:
    1. Ve a la consola de Dataproc.
    2. Navega a trabajos.
    3. Haga clic en Enviar trabajo.
    4. Completa los detalles del trabajo.
    5. En Clúster, selecciona tu clúster.
    6. En Trabajo, asigna un nombre al ID del trabajo.
    7. En Tipo de trabajo, elige PySpark.
    8. En Archivo principal de Python, proporciona el URI de gsutil de la secuencia de comandos de PySpark subida que comienza con gs://.
    9. Para los archivos JAR, elige la versión más reciente del conector de Spark en Maven busca el jar con dependencias en las opciones de descarga. copiar su vínculo.
    10. En el caso de los Argumentos, si usas la secuencia de comandos completa de PySpark desde GitHub, ingresa --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --topic_id=TOPIC_ID. Si copias la secuencia de comandos de PySpark anterior con las tareas pendientes completadas, déjala en blanco.
    11. En Propiedades, ingresa la clave spark.master y el valor yarn.
    12. Haga clic en Enviar.

gcloud

Usa el comando gcloud dataproc jobs submit pyspark para enviar el trabajo a Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
  • --region: la región de Dataproc preseleccionada.
  • --cluster: el nombre del clúster de Dataproc.
  • --jars: el jar de uber del conector de Spark de Pub/Sub Lite con dependencias en un bucket público de Cloud Storage. También puedes visitar este vínculo para descargar el archivo uber jar con dependencias de Maven.
  • --driver-log-levels: establece el nivel de registro en INFO en el nivel raíz.
  • --properties: usa el administrador de recursos de YARN para la instancia principal de Spark.
  • --: proporciona los argumentos que requiere la secuencia de comandos.

Si la operación writeStream se realiza correctamente, deberías ver mensajes de registro. de la siguiente manera tanto a nivel local como en la página de detalles del trabajo en la Consola de Google Cloud:

INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..

Lee desde Pub/Sub Lite

En el siguiente ejemplo, se leerán los mensajes de un suscripción a Pub/Sub Lite con el readStream API. El conector generará mensajes que se ajusten al esquema de tabla formateado como spark.sql.Row .

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()

Para enviar el trabajo de lectura a Dataproc, sigue estos pasos:

Console

  1. Sube la secuencia de comandos de PySpark a tu bucket de Cloud Storage.
    1. Ve a la consola de Cloud Storage.
    2. Selecciona tu bucket.
    3. Usa Upload files para subir la secuencia de comandos de PySpark que deseas usar.
  2. Envía el trabajo a tu clúster de Dataproc:
    1. Ve a la consola de Dataproc.
    2. Navega a trabajos.
    3. Haga clic en Enviar trabajo.
    4. Completa los detalles del trabajo.
    5. En Clúster, selecciona tu clúster.
    6. En Trabajo, asigna un nombre al ID del trabajo.
    7. En Tipo de trabajo, elige PySpark.
    8. En Archivo principal de Python, proporciona el URI de gsutil de la secuencia de comandos de PySpark subida que comienza con gs://.
    9. Para los archivos JAR, elige la versión más reciente del conector de Spark en Maven busca el jar con dependencias en las opciones de descarga. copiar su vínculo.
    10. En el caso de los Argumentos, si usas la secuencia de comandos completa de PySpark desde GitHub, ingresa --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --subscription_id=SUBSCRIPTION_ID. Si copias la secuencia de comandos de PySpark anterior con las tareas pendientes completadas, déjala en blanco.
    11. En Propiedades, ingresa la clave spark.master y el valor yarn.
    12. Haga clic en Enviar.

gcloud

Usa el comando gcloud dataproc jobs submit pyspark otra vez para enviar el trabajo a Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
  • --region: la región de Dataproc preseleccionada.
  • --cluster: el nombre del clúster de Dataproc.
  • --jars: el jar de uber del conector de Spark de Pub/Sub Lite con dependencias en un bucket público de Cloud Storage. También puedes visitar este vínculo para descargar el archivo uber jar con dependencias de Maven.
  • --driver-log-levels: establece el nivel de registro en INFO en el nivel raíz.
  • --properties: usa el administrador de recursos de YARN para la instancia principal de Spark.
  • --: Proporciona argumentos obligatorios para la secuencia de comandos.

Si la operación readStream se realiza correctamente, deberías ver mensajes de registro. de la siguiente manera tanto a nivel local como en la página de detalles del trabajo en la Consola de Google Cloud:

+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|        subscription|partition|offset|key|data|   publish_timestamp|     event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...|        0| 89523|  0|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89524|  1|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89525|  2|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|

Volver a reproducir y borrar definitivamente mensajes de Pub/Sub Lite

Las operaciones de búsqueda no funcionan cuando se lee desde Pub/Sub Lite con el conector de Spark de Pub/Sub Lite debido a lo siguiente: Los sistemas de Apache Spark realizan su propio seguimiento de desplazamientos dentro de particiones. La solución alternativa es desviar, buscar y reiniciar los flujos de trabajo.

Limpia

Sigue estos pasos para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que usaste en esta página.

  1. Borra el tema y la suscripción.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  2. Borra el clúster de Dataproc.

    gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
    
  3. Quita el bucket de Cloud Storage.

    gsutil rb gs://$BUCKET
    

¿Qué sigue?