Fazer streaming de mensagens do Pub/Sub Lite usando o Dataflow
Como alternativa à gravação e execução dos seus próprios programas de processamento de dados, é possível usar o Dataflow com o conector de E/S do Pub/Sub Lite para Apache Beam. de dados. O Dataflow é um serviço totalmente gerenciado para transformar e enriquecer dados em stream (em tempo real) e modos de lote com a mesma confiabilidade e expressividade. Ele executa programas desenvolvidos de maneira confiável usando o SDK do Apache Beam, que tem um conjunto extensível de abstrações avançadas de processamento com estado e conectores de E/S para outros sistemas de streaming e em lote.
Neste guia de início rápido, mostramos como escrever um pipeline do Apache Beam que:
- Ler mensagens do Pub/Sub Lite
- Organizar em janelas (ou agrupar) as mensagens por carimbo de data/hora
- gravar as mensagens no Cloud Storage.
Ela também mostra como:
- Enviar o pipeline para ser executado no Dataflow
- Criar um modelo flexível do Dataflow a partir do pipeline
Este tutorial requer o Maven, mas também é possível converter o projeto de exemplo do Maven para o Gradle. Para saber mais, consulte Opcional: converter do Maven para o Gradle.
Antes de começar
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:
gcloud services enable pubsublite.googleapis.com
dataflow.googleapis.com storage-api.googleapis.com logging.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:
gcloud services enable pubsublite.googleapis.com
dataflow.googleapis.com storage-api.googleapis.com logging.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
Configurar seu projeto do Pub/Sub Lite
Crie variáveis para o bucket, o projeto e a região do Dataflow do Cloud Storage. Os nomes dos intervalos do Cloud Storage precisam ser globalmente exclusivos. A região do Dataflow precisa ser válida para executar o job. Para mais informações sobre regiões e locais, consulte Locais do Dataflow.
export PROJECT_ID=$(gcloud config get-value project)
export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
export BUCKET=BUCKET_NAME
export DATAFLOW_REGION=DATAFLOW_REGION
Crie um bucket do Cloud Storage que pertença a este projeto:
gcloud storage buckets create gs://$BUCKET
Criar um tópico e uma assinatura do Pub/Sub Lite
Crie um tópico e uma assinatura do Pub/Sub Lite.
Para o local do Lite, escolha um
local do Pub/Sub Lite com suporte. Você também precisa especificar uma zona para a região. Por exemplo, us-central1-a
.
export TOPIC=LITE_TOPIC_ID
export SUBSCRIPTION=LITE_SUBSCRIPTION_ID
export LITE_LOCATION=LITE_LOCATION
gcloud pubsub lite-topics create $TOPIC \ --location=$LITE_LOCATION \ --partitions=1 \ --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \ --location=$LITE_LOCATION \ --topic=$TOPIC \ --starting-offset=beginning
Transmitir mensagens para o Dataflow
Fazer o download do código de exemplo do início rápido
Clone o repositório do guia de início rápido e navegue até o diretório do código de amostra.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics
Código de amostra
Este exemplo de código usa o Dataflow para:
- Leia as mensagens de uma assinatura do Pub/Sub Lite como fonte ilimitada.
- Agrupe mensagens com base nos carimbos de data/hora de publicação usando janelas de tempo fixas e o acionador padrão.
Gravar as mensagens agrupadas em arquivos no Cloud Storage.
Java
Antes de executar este exemplo, siga as instruções de configuração do Java nas bibliotecas de cliente do Pub/Sub Lite.
Iniciar o pipeline do Dataflow
Para iniciar o pipeline no Dataflow, execute o seguinte comando:
mvn compile exec:java \
-Dexec.mainClass=examples.PubsubliteToGcs \
-Dexec.args=" \
--subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
--output=gs://$BUCKET/samples/output \
--windowSize=1 \
--project=$PROJECT_ID \
--region=$DATAFLOW_REGION \
--tempLocation=gs://$BUCKET/temp \
--runner=DataflowRunner \
--serviceAccount=$SERVICE_ACCOUNT"
O comando anterior inicia um job do Dataflow. Acesse o link na saída do console para acessar o job no console de monitoramento do Dataflow.
Observar o andamento do job
Observe o andamento do job no console do Dataflow.
Abra a visualização de detalhes do job para ver:
- Gráfico do job
- Detalhes da execução
- Métricas do job
Publique algumas mensagens no seu tópico do Lite.
gcloud pubsub lite-topics publish $TOPIC \
--location=$LITE_LOCATION \
--message="Hello World!"
Talvez seja necessário aguardar alguns minutos para ver as mensagens nos registros do worker.
Use o comando abaixo para verificar quais arquivos foram gravados no Cloud Storage.
gcloud storage ls "gs://$BUCKET/samples/"
A saída será semelhante a esta:
gs://$BUCKET/samples/output-19:41-19:42-0-of-1
gs://$BUCKET/samples/output-19:47-19:48-0-of-1
gs://$BUCKET/samples/output-19:48-19:49-0-of-1
Use o comando abaixo para analisar o conteúdo de um arquivo:
gcloud storage cat "gs://$BUCKET/samples/your-filename"
Opcional: criar um modelo do Dataflow
Você tem a opção de criar um modelo flexível do Dataflow personalizado com base no seu pipeline. Os modelos do Dataflow permitem executar jobs com diferentes parâmetros de entrada no console do Google Cloud ou na linha de comando sem a necessidade de configurar um ambiente de desenvolvimento Java completo.
Crie um JAR multiarquitetura que inclua todas as dependências do pipeline. Você verá
target/pubsublite-streaming-bundled-1.0.jar
após a execução do comando.mvn clean package -DskipTests=true
Forneça nomes e locais para o arquivo de modelo e para a imagem do contêiner do modelo.
export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
Criar um modelo flexível personalizado. Um arquivo
metadata.json
obrigatório, que contém a especificação necessária para executar o job, foi fornecido com o exemplo.gcloud dataflow flex-template build $TEMPLATE_PATH \ --image-gcr-path $TEMPLATE_IMAGE \ --sdk-language "JAVA" \ --flex-template-base-image "JAVA11" \ --metadata-file "metadata.json" \ --jar "target/pubsublite-streaming-bundled-1.0.jar" \ --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
Execute um job usando o modelo flexível personalizado.
Console
Digite um Nome do job.
Insira sua região do Dataflow.
Escolha seu Modelo personalizado.
Digite o caminho do modelo.
Insira os parâmetros obrigatórios.
Cliquem em Executar job.
gcloud
gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
--template-file-gcs-location $TEMPLATE_PATH \
--parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
--parameters output="gs://$BUCKET/samples/template-output" \
--parameters windowSize=1 \
--region $DATAFLOW_REGION \
--serviceAccount=$SERVICE_ACCOUNT
Limpar
Para evitar cobranças na sua conta do Google Cloud pelos recursos usados nesta página, exclua o projeto do Google Cloud com esses recursos.
No console do Dataflow, interrompa o job. Cancele o pipeline em vez de drená-lo.
Exclua o tópico e a assinatura.
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
Exclua os arquivos criados pelo pipeline.
gcloud storage rm "gs://$BUCKET/samples/*" --recursive --continue-on-error
gcloud storage rm "gs://$BUCKET/temp/*" --recursive --continue-on-error
Exclua a imagem e o arquivo de modelo, se houver.
gcloud container images delete $TEMPLATE_IMAGE
gcloud storage rm $TEMPLATE_PATH
Remova o bucket do Cloud Storage.
gcloud storage rm gs://$BUCKET --recursive
-
Exclua a conta de serviço:
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
A seguir
Leia mais sobre a configuração de modelos flexíveis do Dataflow.
Entenda os pipelines de streaming do Dataflow.