Escrever mensagens do Pub/Sub Lite usando o Apache Spark
O conector do Pub/Sub Lite do Spark é uma biblioteca de cliente Java de código aberto compatível com o uso do Pub/Sub Lite como uma fonte de entrada e saída para o Streaming estruturado do Apache Spark. O conector funciona em todas as distribuições do Apache Spark, incluindo o Dataproc.
Neste guia de início rápido, você aprende a:
- ler mensagens do Pub/Sub Lite
- gravar mensagens no Pub/Sub Lite
usando PySpark em um cluster Spark do Dataproc.
Antes de começar
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Verifique se a cobrança está ativada para o seu projeto do Google Cloud.
-
Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Verifique se a cobrança está ativada para o seu projeto do Google Cloud.
-
Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
Configurar
Crie variáveis para o projeto.
export PROJECT_ID=$(gcloud config get-value project)
export PROJECT_NUMBER=$(gcloud projects list \ --filter="projectId:$PROJECT_ID" \ --format="value(PROJECT_NUMBER)")
Criar um bucket do Cloud Storage Os nomes dos intervalos do Cloud Storage precisam ser globalmente exclusivos.
export BUCKET=your-bucket-name
gcloud storage buckets create gs://$BUCKET
Crie um tópico e uma assinatura do Pub/Sub Lite em um local compatível. Consulte Criar um tópico. se você usar uma reserva do Pub/Sub Lite.
export TOPIC=your-lite-topic-id
export SUBSCRIPTION=your-lite-subscription-id
export PUBSUBLITE_LOCATION=your-lite-location
gcloud pubsub lite-topics create $TOPIC \ --location=$PUBSUBLITE_LOCATION \ --partitions=2 \ --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \ --location=$PUBSUBLITE_LOCATION \ --topic=$TOPIC
Criar um cluster de Dataproc.
export DATAPROC_REGION=your-dataproc-region
export CLUSTER_ID=your-dataproc-cluster-id
gcloud dataproc clusters create $CLUSTER_ID \ --region $DATAPROC_REGION \ --image-version 2.1 \ --scopes 'https://www.googleapis.com/auth/cloud-platform' \ --enable-component-gateway \ --bucket $BUCKET
--region
: uma região compatível do Dataproc onde o tópico e a assinatura do Pub/Sub Lite residem.--image-version
: a versão de imagem do cluster, que determina a versão do Apache Spark instalada no cluster; Escolha versões de lançamento de imagem 2.x.x porque o conector Spark do Pub/Sub Lite atualmente é compatível com o Apache Spark 3.x.x.--scopes
: ativa o acesso da API aos serviços do Google Cloud no mesmo projeto.--enable-component-gateway
: ativar o acesso à IU da Web do Apache Spark.--bucket
: um bucket de preparo do Cloud Storage usado para armazenar dependências de job do cluster, saídas de driver e arquivos de configuração do cluster.
Clone o repositório do guia de início rápido e navegue até o diretório do código de amostra:
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
cd python-docs-samples/pubsublite/spark-connector/
Como gravar no Pub/Sub Lite
O exemplo a seguir:
- crie um
origem da tarifa
que gera números e carimbos de data/hora consecutivos formatados como
spark.sql.Row
- transformar os dados para corresponder ao
esquema de tabela
necessário para a API
writeStream
do Conector do Pub/Sub Lite Spark - gravar os dados em um tópico existente do Pub/Sub Lite
Para enviar o job de gravação ao Dataproc:
Console
- Faça upload do script do PySpark para o bucket do Cloud Storage.
- Acesse o Console do Cloud Storage.
- Selecione seu bucket.
- Use Upload files para fazer upload do script PySpark que você pretende usar.
- Envie o job para o cluster do Dataproc:
- Acesse o Console do Dataproc.
- Navegue até os jobs.
- Clique em Enviar job.
- Preencha os detalhes do job.
- Em Cluster, escolha o cluster.
- Em Job, atribua um nome ao ID do job.
- Em Tipo de job, escolha PySpark.
- Para o arquivo Python principal, forneça o URI do armazenamento do gcloud do script PySpark enviado que começa com
gs://
. - Para arquivos Jar, escolha a versão mais recente do conector do Spark no Maven , procure o jar com dependências nas opções de download e copie o link dele.
- Em Arguments, se você usar o script PySpark completo do
GitHub, digite
--project_number=
PROJECT_NUMBER,--location=
PUBSUBLITE_LOCATION,--topic_id=
TOPIC_ID Se você copiar o script PySpark acima com as tarefas concluídas, deixe-o em branco. - Em Propriedades, insira a chave
spark.master
e o valoryarn
. - Clique em Enviar.
gcloud
Use o comando gcloud dataproc jobs submit pyspark para enviar o job ao Dataproc:
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
--region
: a região do Dataproc pré-selecionada.--cluster
: o nome do cluster do Dataproc.--jars
: jar uber do conector Spark do Pub/Sub Lite com dependências em um bucket público do Cloud Storage. Também é possível acessar este link para fazer o download do jar uber com dependências do Maven.--driver-log-levels
: defina o nível de geração de registros como INFO no nível raiz.--properties
: usa o gerenciador de recursos YARN para o mestre do Spark.--
: fornece os argumentos exigidos pelo script.
Se a operação writeStream
for bem-sucedida, você verá mensagens de registro
como as seguintes no local, bem como na página de detalhes do job no
console do Google Cloud:
INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..
Como ler do Pub/Sub Lite
O exemplo a seguir lerá mensagens de um
assinatura do Pub/Sub Lite usando o
readStream
API. O conector enviará mensagens de acordo com o
esquema de tabela
formatado como
spark.sql.Row
,
Para enviar o job de leitura para o Dataproc:
Console
- Faça upload do script do PySpark para o bucket do Cloud Storage.
- Acesse o Console do Cloud Storage.
- Selecione seu bucket.
- Use Upload files para fazer upload do script PySpark que você pretende usar.
- Envie o job para o cluster do Dataproc:
- Acesse o Console do Dataproc.
- Navegue até os jobs.
- Clique em Enviar job.
- Preencha os detalhes do job.
- Em Cluster, escolha o cluster.
- Em Job, atribua um nome ao ID do job.
- Em Tipo de job, escolha PySpark.
- Em Arquivo Python principal, forneça o URI de armazenamento da gcloud do
fez o upload de um script do PySpark que começa com
gs://
. - Para arquivos Jar, escolha a versão mais recente do conector do Spark no Maven (link em inglês), procure o jar com dependências nas opções de download e copie o link dele.
- Em Arguments, se você usar o script PySpark completo do
GitHub, digite
--project_number=
PROJECT_NUMBER,--location=
PUBSUBLITE_LOCATION,--subscription_id=
SUBSCRIPTION_ID Se você copiar o script PySpark acima com as tarefas concluídas, deixe-o em branco. - Em Propriedades, insira a chave
spark.master
e o valoryarn
. - Clique em Enviar.
gcloud
Use o comando gcloud dataproc jobs submit pyspark novamente para enviar o job ao Dataproc:
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
--region
: a região do Dataproc pré-selecionada.--cluster
: o nome do cluster do Dataproc.--jars
: jar uber do conector Spark do Pub/Sub Lite com dependências em um bucket público do Cloud Storage. Também é possível acessar este link para fazer o download do jar uber com dependências do Maven.--driver-log-levels
: defina o nível de geração de registros como INFO no nível raiz.--properties
: usa o gerenciador de recursos YARN para o mestre do Spark.--
: fornece os argumentos necessários para o script.
Se a operação readStream
for bem-sucedida, você vai receber mensagens de registro
como no exemplo a seguir localmente e na página de detalhes do job no
Console do Google Cloud:
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
| subscription|partition|offset|key|data| publish_timestamp| event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...| 0| 89523| 0| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89524| 1| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89525| 2| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
Reproduzir e limpar mensagens do Pub/Sub Lite
As operações de busca não funcionam durante a leitura o Pub/Sub Lite usa o conector do Spark do Pub/Sub Lite porque Os sistemas do Apache Spark realizam o próprio rastreamento de deslocamentos dentro das partições. A solução é drenar, buscar e reiniciar os fluxos de trabalho.
Limpar
Para evitar cobranças na conta do Google Cloud pelos recursos usados nesta página, siga estas etapas.
Exclua o tópico e a assinatura.
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
Exclua o cluster do Dataproc.
gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
Remova o bucket do Cloud Storage.
gcloud storage rm gs://$BUCKET
A seguir
Confira o exemplo de contagem de palavras em Java para o conector Spark do Pub/Sub Lite.
Saiba como acessar a saída do driver do job do Dataproc.
Outros conectores do Spark por produtos do Google Cloud: conector do BigQuery, conector do Bigtable, conector do Cloud Storage.