Gravar mensagens do Pub/Sub Lite usando o Apache Spark

O conector do Pub/Sub Lite do Spark é uma biblioteca de cliente Java de código aberto compatível com o uso do Pub/Sub Lite como uma fonte de entrada e saída para o Streaming estruturado do Apache Spark. O conector funciona em todas as distribuições do Apache Spark, incluindo o Dataproc.

Neste guia de início rápido, você aprende a:

  • ler mensagens do Pub/Sub Lite
  • gravar mensagens no Pub/Sub Lite

usando o PySpark (em inglês) em um cluster do Dataproc Spark.

Antes de começar

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.

    Enable the APIs

  5. Install the Google Cloud CLI.
  6. To initialize the gcloud CLI, run the following command:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. Make sure that billing is enabled for your Google Cloud project.

  9. Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.

    Enable the APIs

  10. Install the Google Cloud CLI.
  11. To initialize the gcloud CLI, run the following command:

    gcloud init

Configurar

  1. Crie variáveis para o projeto.

    export PROJECT_ID=$(gcloud config get-value project)
    export PROJECT_NUMBER=$(gcloud projects list \
        --filter="projectId:$PROJECT_ID" \
        --format="value(PROJECT_NUMBER)")
  2. Criar um bucket do Cloud Storage Os nomes dos intervalos do Cloud Storage precisam ser globalmente exclusivos.

    export BUCKET=your-bucket-name
    gcloud storage buckets create gs://$BUCKET
    
  3. Crie um tópico e uma assinatura do Pub/Sub Lite em um local compatível. Consulte Criar um tópico se você usa uma reserva do Pub/Sub Lite.

    export TOPIC=your-lite-topic-id
    export SUBSCRIPTION=your-lite-subscription-id
    export PUBSUBLITE_LOCATION=your-lite-location
    gcloud pubsub lite-topics create $TOPIC \
        --location=$PUBSUBLITE_LOCATION \
        --partitions=2 \
        --per-partition-bytes=30GiB
    gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
        --location=$PUBSUBLITE_LOCATION \
        --topic=$TOPIC
  4. Criar um cluster de Dataproc.

    export DATAPROC_REGION=your-dataproc-region
    export CLUSTER_ID=your-dataproc-cluster-id
    gcloud dataproc clusters create $CLUSTER_ID \
       --region $DATAPROC_REGION \
       --image-version 2.1 \
       --scopes 'https://www.googleapis.com/auth/cloud-platform' \
       --enable-component-gateway \
       --bucket $BUCKET
    • --region: uma região compatível do Dataproc onde o tópico e a assinatura do Pub/Sub Lite residem.
    • --image-version: a versão de imagem do cluster, que determina a versão do Apache Spark instalada no cluster; Escolha versões de lançamento de imagem 2.x.x porque o conector Spark do Pub/Sub Lite atualmente é compatível com o Apache Spark 3.x.x.
    • --scopes: permite o acesso da API aos serviços do Google Cloud no mesmo projeto.
    • --enable-component-gateway: ativar o acesso à IU da Web do Apache Spark.
    • --bucket: um bucket de preparo do Cloud Storage usado para armazenar dependências de job do cluster, saídas de driver e arquivos de configuração do cluster.
  5. Clone o repositório do guia de início rápido e navegue até o diretório do código de amostra:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsublite/spark-connector/
    

Como gravar no Pub/Sub Lite

O exemplo a seguir:

  • Crie uma origem da taxa que gere números e carimbos de data/hora consecutivos formatados como spark.sql.Row.
  • transformar os dados para corresponder ao esquema de tabela necessário para a API writeStream do Conector do Pub/Sub Lite Spark
  • gravar os dados em um tópico existente do Pub/Sub Lite
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, create_map, col, lit, when
from pyspark.sql.types import BinaryType, StringType
import uuid

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# topic_id = "your-topic-id"

spark = SparkSession.builder.appName("write-app").getOrCreate()

# Create a RateStreamSource that generates consecutive numbers with timestamps:
# |-- timestamp: timestamp (nullable = true)
# |-- value: long (nullable = true)
sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Transform the dataframe to match the required data fields and data types:
# https://github.com/googleapis/java-pubsublite-spark#data-schema
sdf = (
    sdf.withColumn("key", lit("example").cast(BinaryType()))
    .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
    .withColumnRenamed("timestamp", "event_timestamp")
    # Populate the attributes field. For example, an even value will
    # have {"key1", [b"even"]}.
    .withColumn(
        "attributes",
        create_map(
            lit("key1"),
            array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
        ),
    )
    .drop("value")
)

# After the transformation, the schema of the dataframe should look like:
# |-- key: binary (nullable = false)
# |-- data: binary (nullable = true)
# |-- event_timestamp: timestamp (nullable = true)
# |-- attributes: map (nullable = false)
# |    |-- key: string
# |    |-- value: array (valueContainsNull = false)
# |    |    |-- element: binary (containsNull = false)
sdf.printSchema()

query = (
    sdf.writeStream.format("pubsublite")
    .option(
        "pubsublite.topic",
        f"projects/{project_number}/locations/{location}/topics/{topic_id}",
    )
    # Required. Use a unique checkpoint location for each job.
    .option("checkpointLocation", "/tmp/app" + uuid.uuid4().hex)
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 60 seconds to terminate the query.
query.awaitTermination(60)
query.stop()

Para enviar o job de gravação ao Dataproc:

Console

  1. Faça upload do script do PySpark para o bucket do Cloud Storage.
    1. Acesse o Console do Cloud Storage.
    2. Selecione seu bucket.
    3. Use Upload files para fazer upload do script PySpark que você pretende usar.
  2. Envie o job para o cluster do Dataproc:
    1. Acesse o Console do Dataproc.
    2. Navegue até os jobs.
    3. Clique em Enviar job.
    4. Preencha os detalhes do job.
    5. Em Cluster, escolha o cluster.
    6. Em Job, atribua um nome ao ID do job.
    7. Em Tipo de job, escolha PySpark.
    8. Para o arquivo Python principal, forneça o URI do armazenamento do gcloud do script PySpark enviado que começa com gs://.
    9. Para arquivos Jar, escolha a versão mais recente do conector do Spark no Maven , procure o jar com dependências nas opções de download e copie o link dele.
    10. Em Arguments, se você usar o script PySpark completo do GitHub, digite --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --topic_id=TOPIC_ID Se você copiar o script PySpark acima com as tarefas concluídas, deixe-o em branco.
    11. Em Propriedades, insira a chave spark.master e o valor yarn.
    12. Clique em Enviar.

gcloud

Use o comando gcloud dataproc jobs submit pyspark para enviar o job ao Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
  • --region: a região do Dataproc pré-selecionada.
  • --cluster: o nome do cluster do Dataproc.
  • --jars: jar uber do conector Spark do Pub/Sub Lite com dependências em um bucket público do Cloud Storage. Também é possível acessar este link para fazer o download do jar uber com dependências do Maven.
  • --driver-log-levels: defina o nível de geração de registros como INFO no nível raiz.
  • --properties: usa o gerenciador de recursos YARN para o mestre do Spark.
  • --: fornece os argumentos exigidos pelo script.

Se a operação writeStream for bem-sucedida, você verá mensagens de registro como as seguintes no local, bem como na página de detalhes do job no console do Google Cloud:

INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..

Como ler do Pub/Sub Lite

O exemplo a seguir vai ler mensagens de uma assinatura existente do Pub/Sub Lite usando a API readStream. O conector vai gerar mensagens que estão em conformidade com o esquema de tabela fixo como spark.sql.Row .

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()

Para enviar o job de leitura para o Dataproc:

Console

  1. Faça upload do script do PySpark para o bucket do Cloud Storage.
    1. Acesse o Console do Cloud Storage.
    2. Selecione seu bucket.
    3. Use Upload files para fazer upload do script PySpark que você pretende usar.
  2. Envie o job para o cluster do Dataproc:
    1. Acesse o Console do Dataproc.
    2. Navegue até os jobs.
    3. Clique em Enviar job.
    4. Preencha os detalhes do job.
    5. Em Cluster, escolha o cluster.
    6. Em Job, atribua um nome ao ID do job.
    7. Em Tipo de job, escolha PySpark.
    8. Para o arquivo Python principal, forneça o URI do armazenamento do gcloud do script PySpark enviado que começa com gs://.
    9. Para arquivos Jar, escolha a versão mais recente do conector do Spark no Maven , procure o jar com dependências nas opções de download e copie o link dele.
    10. Em Arguments, se você usar o script PySpark completo do GitHub, digite --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --subscription_id=SUBSCRIPTION_ID Se você copiar o script PySpark acima com as tarefas concluídas, deixe-o em branco.
    11. Em Propriedades, insira a chave spark.master e o valor yarn.
    12. Clique em Enviar.

gcloud

Use o comando gcloud dataproc jobs submit pyspark novamente para enviar o job ao Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
  • --region: a região do Dataproc pré-selecionada.
  • --cluster: o nome do cluster do Dataproc.
  • --jars: jar uber do conector Spark do Pub/Sub Lite com dependências em um bucket público do Cloud Storage. Também é possível acessar este link para fazer o download do jar uber com dependências do Maven.
  • --driver-log-levels: defina o nível de geração de registros como INFO no nível raiz.
  • --properties: usa o gerenciador de recursos YARN para o mestre do Spark.
  • --: fornece os argumentos necessários para o script.

Se a operação readStream for bem-sucedida, você verá mensagens de registro como as seguintes no local, bem como na página de detalhes do job no console do Google Cloud:

+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|        subscription|partition|offset|key|data|   publish_timestamp|     event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...|        0| 89523|  0|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89524|  1|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89525|  2|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|

Reproduzir e limpar mensagens do Pub/Sub Lite

As operações de busca não funcionam ao ler do Pub/Sub Lite usando o conector do Pub/Sub Lite do Spark porque os sistemas do Apache Spark realizam o próprio rastreamento de deslocamentos nas partições. A solução é drenar, procurar e reiniciar os fluxos de trabalho.

Limpar

Para evitar cobranças na conta do Google Cloud pelos recursos usados nesta página, siga estas etapas.

  1. Exclua o tópico e a assinatura.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  2. Exclua o cluster do Dataproc.

    gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
    
  3. Remova o bucket do Cloud Storage.

    gcloud storage rm gs://$BUCKET
    

A seguir