Gravar mensagens do Pub/Sub Lite usando o Apache Spark

O conector do Pub/Sub Lite do Spark é uma biblioteca de cliente Java de código aberto compatível com o uso do Pub/Sub Lite como uma fonte de entrada e saída para Streaming estruturado do Apache Spark O conector funciona em todas as distribuições do Apache Spark, incluindo o Dataproc.

Neste guia de início rápido, você aprende a:

  • ler mensagens do Pub/Sub Lite
  • gravar mensagens no Pub/Sub Lite

usando PySpark (em inglês) em um cluster do Dataproc Spark.

Antes de começar

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  3. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  4. Ative as APIs Pub/Sub Lite, Dataproc, Cloud Storage, Logging .

    Ative as APIs

  5. Instale a CLI do Google Cloud.
  6. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  7. No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  8. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  9. Ative as APIs Pub/Sub Lite, Dataproc, Cloud Storage, Logging .

    Ative as APIs

  10. Instale a CLI do Google Cloud.
  11. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init

Configurar

  1. Crie variáveis para o projeto.

    export PROJECT_ID=$(gcloud config get-value project)
    export PROJECT_NUMBER=$(gcloud projects list \
        --filter="projectId:$PROJECT_ID" \
        --format="value(PROJECT_NUMBER)")
    
  2. Criar um bucket do Cloud Storage Os nomes dos intervalos do Cloud Storage precisam ser globalmente exclusivos.

    export BUCKET=your-bucket-name
    gsutil mb gs://$BUCKET
    
  3. Crie um tópico e uma assinatura do Pub/Sub Lite em um local compatível. Consulte Criar um tópico se você usa uma reserva do Pub/Sub Lite.

    export TOPIC=your-lite-topic-id
    export SUBSCRIPTION=your-lite-subscription-id
    export PUBSUBLITE_LOCATION=your-lite-location
    gcloud pubsub lite-topics create $TOPIC \
        --location=$PUBSUBLITE_LOCATION \
        --partitions=2 \
        --per-partition-bytes=30GiB
    gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
        --location=$PUBSUBLITE_LOCATION \
        --topic=$TOPIC
    
  4. Criar um cluster de Dataproc.

    export DATAPROC_REGION=your-dataproc-region
    export CLUSTER_ID=your-dataproc-cluster-id
    gcloud dataproc clusters create $CLUSTER_ID \
       --region $DATAPROC_REGION \
       --image-version 2.1 \
       --scopes 'https://www.googleapis.com/auth/cloud-platform' \
       --enable-component-gateway \
       --bucket $BUCKET
    
    • --region: uma região compatível do Dataproc onde o tópico e a assinatura do Pub/Sub Lite residem.
    • --image-version: a versão de imagem do cluster, que determina a versão do Apache Spark instalada no cluster; Escolha versões de lançamento de imagem 2.x.x porque o conector do Spark do Pub/Sub Lite é compatível atualmente com o Apache Spark 3.x.x.
    • --scopes: ativa o acesso da API aos serviços do Google Cloud no mesmo projeto.
    • --enable-component-gateway: ativar o acesso à IU da Web do Apache Spark.
    • --bucket: um bucket de preparo do Cloud Storage usado para armazenar dependências de job do cluster, saídas de driver e arquivos de configuração do cluster.
  5. Clone o repositório do guia de início rápido e navegue até o diretório do código de amostra:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsublite/spark-connector/
    

Como gravar no Pub/Sub Lite

O exemplo a seguir:

  • Crie uma origem da taxa que gere números e carimbos de data/hora consecutivos formatados como spark.sql.Row.
  • transformar os dados para corresponder ao esquema de tabela necessário para a API writeStream do Conector do Pub/Sub Lite Spark
  • gravar os dados em um tópico existente do Pub/Sub Lite
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, create_map, col, lit, when
from pyspark.sql.types import BinaryType, StringType
import uuid

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# topic_id = "your-topic-id"

spark = SparkSession.builder.appName("write-app").getOrCreate()

# Create a RateStreamSource that generates consecutive numbers with timestamps:
# |-- timestamp: timestamp (nullable = true)
# |-- value: long (nullable = true)
sdf = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Transform the dataframe to match the required data fields and data types:
# https://github.com/googleapis/java-pubsublite-spark#data-schema
sdf = (
    sdf.withColumn("key", lit("example").cast(BinaryType()))
    .withColumn("data", col("value").cast(StringType()).cast(BinaryType()))
    .withColumnRenamed("timestamp", "event_timestamp")
    # Populate the attributes field. For example, an even value will
    # have {"key1", [b"even"]}.
    .withColumn(
        "attributes",
        create_map(
            lit("key1"),
            array(when(col("value") % 2 == 0, b"even").otherwise(b"odd")),
        ),
    )
    .drop("value")
)

# After the transformation, the schema of the dataframe should look like:
# |-- key: binary (nullable = false)
# |-- data: binary (nullable = true)
# |-- event_timestamp: timestamp (nullable = true)
# |-- attributes: map (nullable = false)
# |    |-- key: string
# |    |-- value: array (valueContainsNull = false)
# |    |    |-- element: binary (containsNull = false)
sdf.printSchema()

query = (
    sdf.writeStream.format("pubsublite")
    .option(
        "pubsublite.topic",
        f"projects/{project_number}/locations/{location}/topics/{topic_id}",
    )
    # Required. Use a unique checkpoint location for each job.
    .option("checkpointLocation", "/tmp/app" + uuid.uuid4().hex)
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 60 seconds to terminate the query.
query.awaitTermination(60)
query.stop()

Para enviar o job de gravação ao Dataproc:

Console

  1. Faça upload do script do PySpark para o bucket do Cloud Storage.
    1. Acesse o Console do Cloud Storage.
    2. Selecione seu bucket.
    3. Use Upload files para fazer upload do script PySpark que você pretende usar.
  2. Envie o job para o cluster do Dataproc:
    1. Acesse o Console do Dataproc.
    2. Navegue até os jobs.
    3. Clique em Enviar job.
    4. Preencha os detalhes do job.
    5. Em Cluster, escolha o cluster.
    6. Em Job, atribua um nome ao ID do job.
    7. Em Tipo de job, escolha PySpark.
    8. Para o arquivo Python principal, forneça o URI da gsutil do script PySpark enviado que começa com gs://.
    9. Para arquivos jar, escolha a versão mais recente do conector do Spark no Maven, procure o jar com dependências nas opções de download e copie o link.
    10. Em Arguments, se você usar o script PySpark completo do GitHub, digite --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --topic_id=TOPIC_ID Se você copiar o script PySpark acima com as tarefas concluídas, deixe-o em branco.
    11. Em Propriedades, insira a chave spark.master e o valor yarn.
    12. Clique em Enviar.

gcloud

Use o comando gcloud dataproc jobs submit pyspark para enviar o job ao Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
  • --region: a região do Dataproc pré-selecionada.
  • --cluster: o nome do cluster do Dataproc.
  • --jars: jar uber do conector Spark do Pub/Sub Lite com dependências em um bucket público do Cloud Storage. Também é possível acessar este link para fazer o download do jar uber com dependências do Maven.
  • --driver-log-levels: defina o nível de geração de registros como INFO no nível raiz.
  • --properties: usa o gerenciador de recursos YARN para o mestre do Spark.
  • --: fornece os argumentos exigidos pelo script.

Se a operação writeStream for bem-sucedida, você verá mensagens de registro como as seguintes no local, bem como na página de detalhes do job no console do Google Cloud:

INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..

Como ler do Pub/Sub Lite

O exemplo a seguir lerá mensagens de uma assinatura existente do Pub/Sub Lite usando a API readStream. O conector gerará mensagens que estão em conformidade com o esquema de tabela fixo como spark.sql.Row .

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()

Para enviar o job de leitura para o Dataproc:

Console

  1. Faça upload do script do PySpark para o bucket do Cloud Storage.
    1. Acesse o Console do Cloud Storage.
    2. Selecione seu bucket.
    3. Use Upload files para fazer upload do script PySpark que você pretende usar.
  2. Envie o job para o cluster do Dataproc:
    1. Acesse o Console do Dataproc.
    2. Navegue até os jobs.
    3. Clique em Enviar job.
    4. Preencha os detalhes do job.
    5. Em Cluster, escolha o cluster.
    6. Em Job, atribua um nome ao ID do job.
    7. Em Tipo de job, escolha PySpark.
    8. Para o arquivo Python principal, forneça o URI da gsutil do script PySpark enviado que começa com gs://.
    9. Para arquivos jar, escolha a versão mais recente do conector do Spark no Maven, procure o jar com dependências nas opções de download e copie o link.
    10. Em Arguments, se você usar o script PySpark completo do GitHub, digite --project_number=PROJECT_NUMBER, --location=PUBSUBLITE_LOCATION, --subscription_id=SUBSCRIPTION_ID Se você copiar o script PySpark acima com as tarefas concluídas, deixe-o em branco.
    11. Em Propriedades, insira a chave spark.master e o valor yarn.
    12. Clique em Enviar.

gcloud

Use o comando gcloud dataproc jobs submit pyspark novamente para enviar o job ao Dataproc:

gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
    --region=$DATAPROC_REGION \
    --cluster=$CLUSTER_ID \
    --jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
    --driver-log-levels=root=INFO \
    --properties=spark.master=yarn \
    -- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
  • --region: a região do Dataproc pré-selecionada.
  • --cluster: o nome do cluster do Dataproc.
  • --jars: jar uber do conector Spark do Pub/Sub Lite com dependências em um bucket público do Cloud Storage. Também é possível acessar este link para fazer o download do jar uber com dependências do Maven.
  • --driver-log-levels: defina o nível de geração de registros como INFO no nível raiz.
  • --properties: usa o gerenciador de recursos YARN para o mestre do Spark.
  • --: fornece os argumentos necessários para o script.

Se a operação readStream for bem-sucedida, você verá mensagens de registro como as seguintes no local, bem como na página de detalhes do job no console do Google Cloud:

+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|        subscription|partition|offset|key|data|   publish_timestamp|     event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...|        0| 89523|  0|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89524|  1|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|
|projects/50200928...|        0| 89525|  2|   .|2021-09-03 23:01:...|2021-09-03 22:56:...|        []|

Repetir e limpar mensagens do Pub/Sub Lite

As operações de busca não funcionam na leitura do Pub/Sub Lite usando o conector do Spark do Pub/Sub Lite porque os sistemas do Apache Spark executam o próprio rastreamento de deslocamentos dentro de partições. A solução é drenar, procurar e reiniciar os fluxos de trabalho.

Limpar

Para evitar cobranças na sua conta do Google Cloud pelos recursos usados nesta página, siga estas etapas.

  1. Exclua o tópico e a assinatura.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  2. Exclua o cluster do Dataproc.

    gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
    
  3. Remova o bucket do Cloud Storage.

    gsutil rb gs://$BUCKET
    

A seguir