Cómo escribir mensajes de Pub/Sub Lite con Apache Spark
El Conector de Spark de Pub/Sub Lite es una biblioteca cliente de código abierto de Java que admite Pub/Sub Lite como fuente de entrada y salida para Transmisión estructurada de Apache Spark de Google Cloud. El conector funciona en todas las distribuciones de Apache Spark, incluido Dataproc.
Esta guía de inicio rápido te muestra cómo hacer lo siguiente:
- leer mensajes de Pub/Sub Lite
- escribir mensajes en Pub/Sub Lite
usar PySpark desde un clúster de Spark de Dataproc
Antes de comenzar
- Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.
-
Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.
-
Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
Configurar
Crea variables para tu proyecto.
export PROJECT_ID=$(gcloud config get-value project)
export PROJECT_NUMBER=$(gcloud projects list \ --filter="projectId:$PROJECT_ID" \ --format="value(PROJECT_NUMBER)")
Crea un bucket de Cloud Storage Los nombres de depósitos de Cloud Storage deben ser únicos a nivel global.
export BUCKET=your-bucket-name
gcloud storage buckets create gs://$BUCKET
Crea un tema y una suscripción de Pub/Sub Lite en una ubicación compatible. Consulta Cómo crear un tema. si usas una reserva de Pub/Sub Lite.
export TOPIC=your-lite-topic-id
export SUBSCRIPTION=your-lite-subscription-id
export PUBSUBLITE_LOCATION=your-lite-location
gcloud pubsub lite-topics create $TOPIC \ --location=$PUBSUBLITE_LOCATION \ --partitions=2 \ --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \ --location=$PUBSUBLITE_LOCATION \ --topic=$TOPIC
Crea un clúster de Dataproc.
export DATAPROC_REGION=your-dataproc-region
export CLUSTER_ID=your-dataproc-cluster-id
gcloud dataproc clusters create $CLUSTER_ID \ --region $DATAPROC_REGION \ --image-version 2.1 \ --scopes 'https://www.googleapis.com/auth/cloud-platform' \ --enable-component-gateway \ --bucket $BUCKET
--region
: una región de Dataproc compatible en la que se encuentran la suscripción y el tema de Pub/Sub Lite.--image-version
: Es la versión con imágenes del clúster, que determina la versión de Apache Spark instalada en el clúster. Elige versiones de actualización con imágenes 2.x.x porque, por el momento, el conector de Spark para Pub/Sub Lite es compatible con Apache Spark 3.x.x.--scopes
: Habilita el acceso a la API a los servicios de Google Cloud en el mismo proyecto.--enable-component-gateway
: Habilita el acceso a la IU web de Apache Spark.--bucket
: Es un bucket de etapa de pruebas de Cloud Storage que se usa para almacenar las dependencias de trabajos del clúster, los resultados del controlador y los archivos de configuración del clúster.
Clona el repositorio de la guía de inicio rápido y navega al directorio de código de muestra:
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
cd python-docs-samples/pubsublite/spark-connector/
Escribe en Pub/Sub Lite
En el siguiente ejemplo, se realizan las siguientes acciones:
- crear una fuente de tasa que genere números y marcas de tiempo consecutivos con formato
spark.sql.Row
- transformar los datos para que coincidan
esquema de tabla
por el conector de Spark de Pub/Sub Lite
writeStream
API - escribir los datos en un tema existente de Pub/Sub Lite
Para enviar el trabajo de escritura a Dataproc, sigue estos pasos:
Console
- Sube la secuencia de comandos de PySpark a tu bucket de Cloud Storage.
- Ve a la consola de Cloud Storage.
- Selecciona tu bucket.
- Usa Upload files para subir la secuencia de comandos de PySpark que deseas usar.
- Envía el trabajo a tu clúster de Dataproc:
- Ve a la consola de Dataproc.
- Navega a trabajos.
- Haga clic en Enviar trabajo.
- Completa los detalles del trabajo.
- En Clúster, selecciona tu clúster.
- En Trabajo, asigna un nombre al ID del trabajo.
- En Tipo de trabajo, elige PySpark.
- En Archivo principal de Python, proporciona el URI de almacenamiento de gcloud de la secuencia de comandos de PySpark subida que comienza con
gs://
. - Para los archivos JAR, elige la versión más reciente del conector de Spark en Maven busca el jar con dependencias en las opciones de descarga. copiar su vínculo.
- En el caso de los Argumentos, si usas la secuencia de comandos completa de PySpark desde GitHub, ingresa
--project_number=
PROJECT_NUMBER,--location=
PUBSUBLITE_LOCATION,--topic_id=
TOPIC_ID. Si copias la secuencia de comandos de PySpark anterior con las tareas pendientes completadas, déjala en blanco. - En Propiedades, ingresa la clave
spark.master
y el valoryarn
. - Haga clic en Enviar.
gcloud
Usa el comando gcloud dataproc jobs submit pyspark para enviar el trabajo a Dataproc:
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
--region
: la región de Dataproc preseleccionada.--cluster
: el nombre del clúster de Dataproc.--jars
: el jar de uber del conector de Spark de Pub/Sub Lite con dependencias en un bucket público de Cloud Storage. También puedes visitar este vínculo para descargar el Jar de Uber con dependencias de Maven.--driver-log-levels
: establece el nivel de registro en INFO en el nivel raíz.--properties
: usa el administrador de recursos de YARN para la instancia principal de Spark.--
: proporciona los argumentos que requiere la secuencia de comandos.
Si la operación writeStream
se realiza de forma exitosa, deberías ver mensajes de registro como los siguientes de forma local y en la página de detalles del trabajo en la consola de Google Cloud:
INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..
Lee desde Pub/Sub Lite
En el siguiente ejemplo, se leerán mensajes de una suscripción existente de Pub/Sub Lite con la API de readStream
. El conector generará mensajes que se ajusten al
esquema de tabla
formateado como
spark.sql.Row
de Google Cloud.
Para enviar el trabajo de lectura a Dataproc, sigue estos pasos:
Console
- Sube la secuencia de comandos de PySpark a tu bucket de Cloud Storage.
- Ve a la consola de Cloud Storage.
- Selecciona tu bucket.
- Usa Upload files para subir la secuencia de comandos de PySpark que deseas usar.
- Envía el trabajo a tu clúster de Dataproc:
- Ve a la consola de Dataproc.
- Navega a trabajos.
- Haga clic en Enviar trabajo.
- Completa los detalles del trabajo.
- En Clúster, selecciona tu clúster.
- En Trabajo, asigna un nombre al ID del trabajo.
- En Tipo de trabajo, elige PySpark.
- En Archivo principal de Python, proporciona el URI de almacenamiento de gcloud del
Se subió una secuencia de comandos de PySpark que comienza con
gs://
. - Para los archivos JAR, elige la versión más reciente del conector de Spark en Maven busca el jar con dependencias en las opciones de descarga. copiar su vínculo.
- En el caso de los Argumentos, si usas la secuencia de comandos completa de PySpark desde GitHub, ingresa
--project_number=
PROJECT_NUMBER,--location=
PUBSUBLITE_LOCATION,--subscription_id=
SUBSCRIPTION_ID. Si copias la secuencia de comandos de PySpark anterior con las tareas pendientes completadas, déjala en blanco. - En Propiedades, ingresa la clave
spark.master
y el valoryarn
. - Haga clic en Enviar.
gcloud
Usa el comando gcloud dataproc jobs submit pyspark otra vez para enviar el trabajo a Dataproc:
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
--region
: la región de Dataproc preseleccionada.--cluster
: el nombre del clúster de Dataproc.--jars
: el jar de uber del conector de Spark de Pub/Sub Lite con dependencias en un bucket público de Cloud Storage. También puedes visitar este vínculo para descargar el archivo uber jar con dependencias de Maven.--driver-log-levels
: establece el nivel de registro en INFO en el nivel raíz.--properties
: usa el administrador de recursos de YARN para la instancia principal de Spark.--
: Proporciona argumentos obligatorios para la secuencia de comandos.
Si la operación readStream
se realiza de forma exitosa, deberías ver mensajes de registro como los siguientes de forma local y en la página de detalles del trabajo en la consola de Google Cloud:
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
| subscription|partition|offset|key|data| publish_timestamp| event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...| 0| 89523| 0| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89524| 1| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89525| 2| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
Volver a reproducir y borrar definitivamente mensajes de Pub/Sub Lite
Las operaciones de salto no funcionan cuando se lee de Pub/Sub Lite con el conector de Spark de Pub/Sub Lite porque los sistemas de Apache Spark realizan su propio seguimiento de los desplazamientos dentro de las particiones. La solución alternativa es agotar, buscar y reiniciar los flujos de trabajo.
Limpia
Sigue estos pasos para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que usaste en esta página.
Borra el tema y la suscripción.
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
Borra el clúster de Dataproc.
gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
Quita el bucket de Cloud Storage.
gcloud storage rm gs://$BUCKET
¿Qué sigue?
Consulta el ejemplo de recuento de palabras en Java para el conector de Spark de Pub/Sub Lite.
Aprende a acceder al resultado del controlador del trabajo de Dataproc.
Otros conectores de Spark de productos de Google Cloud: Conector de BigQuery, el conector de Bigtable, Conector de Cloud Storage.