Fazer streaming de mensagens do Pub/Sub usando o Dataflow
O Dataflow é um serviço totalmente gerenciado para transformar e enriquecer dados em modos de stream (em tempo real) e em lote com a mesma confiabilidade e expressividade. Ele fornece um ambiente simplificado de desenvolvimento de pipeline usando o SDK do Apache Beam, que tem um conjunto avançado de primitivos de análise de sessões e janelas, além de um ecossistema de conectores de origem e de coletor. Este guia de início rápido mostra como usar o Dataflow para:
- ler mensagens publicadas em um tópico do Pub/Sub;
- organizar em janelas (ou agrupar) as mensagens por carimbo de data/hora;
- gravar as mensagens no Cloud Storage.
Este guia de início rápido apresenta o uso do Dataflow em Java e Python. O SQL também é compatível. Este guia de início rápido também é oferecido como um tutorial do Google Cloud Ensina, que oferece credenciais temporárias para você começar.
Comece usando os modelos do Dataflow baseados na IU se não pretende fazer o processamento de dados personalizado.
Antes de começar
- Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
-
No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.
-
Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como verificar se o faturamento está ativado em um projeto.
-
Ative as APIs Compute Engine, Google Cloud's operations suite, Cloud Storage, Cloud Storage JSON, Pub/Sub, Cloud Scheduler, Resource Manager, and App Engine.
-
Crie uma conta de serviço:
-
No console do Cloud, acesse a página Criar conta de serviço.
Acesse "Criar conta de serviço" - Selecione o projeto.
-
No campo Nome da conta de serviço, insira um nome. O console do Cloud preenche o campo ID da conta de serviço com base nesse nome.
No campo Descrição da conta de serviço, insira uma descrição. Por exemplo,
Service account for quickstart
. - Clique em Criar e continuar.
-
Para fornecer acesso ao projeto, conceda os seguintes papéis à conta de serviço: Dataflow Admin, Pub/Sub Admin, Cloud Storage Object Admin .
Na lista Selecionar um papel, escolha um.
Para papéis adicionais, clique em
Adicionar outro papel e adicione cada papel adicional. - Clique em Continuar.
-
Clique em Concluído para terminar a criação da conta de serviço.
Não feche a janela do navegador. Você vai usá-la na próxima etapa.
-
-
Crie uma chave de conta de serviço:
- No console do Cloud, clique no endereço de e-mail da conta de serviço que você criou.
- Clique em Chaves.
- Clique em Adicionar chave e em Criar nova chave.
- Clique em Criar. O download de um arquivo de chave JSON é feito no seu computador.
- Clique em Fechar.
-
Defina a variável de ambiente
GOOGLE_APPLICATION_CREDENTIALS
como o caminho do arquivo JSON que contém a chave da conta de serviço. Essa variável só se aplica à sessão de shell atual. Dessa maneira, se você abrir uma nova sessão, defina a variável novamente. - Instale e inicialize a Google Cloud CLI.
-
No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.
-
Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como verificar se o faturamento está ativado em um projeto.
-
Ative as APIs Compute Engine, Google Cloud's operations suite, Cloud Storage, Cloud Storage JSON, Pub/Sub, Cloud Scheduler, Resource Manager, and App Engine.
-
Crie uma conta de serviço:
-
No console do Cloud, acesse a página Criar conta de serviço.
Acesse "Criar conta de serviço" - Selecione o projeto.
-
No campo Nome da conta de serviço, insira um nome. O console do Cloud preenche o campo ID da conta de serviço com base nesse nome.
No campo Descrição da conta de serviço, insira uma descrição. Por exemplo,
Service account for quickstart
. - Clique em Criar e continuar.
-
Para fornecer acesso ao projeto, conceda os seguintes papéis à conta de serviço: Dataflow Admin, Pub/Sub Admin, Cloud Storage Object Admin .
Na lista Selecionar um papel, escolha um.
Para papéis adicionais, clique em
Adicionar outro papel e adicione cada papel adicional. - Clique em Continuar.
-
Clique em Concluído para terminar a criação da conta de serviço.
Não feche a janela do navegador. Você vai usá-la na próxima etapa.
-
-
Crie uma chave de conta de serviço:
- No console do Cloud, clique no endereço de e-mail da conta de serviço que você criou.
- Clique em Chaves.
- Clique em Adicionar chave e em Criar nova chave.
- Clique em Criar. O download de um arquivo de chave JSON é feito no seu computador.
- Clique em Fechar.
-
Defina a variável de ambiente
GOOGLE_APPLICATION_CREDENTIALS
como o caminho do arquivo JSON que contém a chave da conta de serviço. Essa variável só se aplica à sessão de shell atual. Dessa maneira, se você abrir uma nova sessão, defina a variável novamente. - Instale e inicialize a Google Cloud CLI.
-
Crie variáveis para o bucket, o projeto e a região. Os nomes dos intervalos do Cloud Storage precisam ser globalmente exclusivos. Selecione uma região do Dataflow perto de onde você executa os comandos neste guia de início rápido. O valor da variável
REGION
precisa ser um nome de região válido. Para mais informações sobre regiões e locais, consulte Locais do Dataflow.BUCKET_NAME=your-bucket-name PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=your-topic-id REGION=dataflow-region
-
Crie um bucket do Cloud Storage que pertença a este projeto:
gsutil mb gs://$BUCKET_NAME
-
Crie um tópico do Pub/Sub neste projeto:
gcloud pubsub topics create $TOPIC_ID
-
Crie um job do Cloud Scheduler neste projeto. O job publica uma mensagem em um tópico do Pub/Sub em intervalos de um minuto.
Esta etapa criará um aplicativo do App Engine para o projeto, se já não houver um.
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
Inicie o job.
gcloud scheduler jobs run publisher-job --location=$REGION
-
Use os seguintes comandos para clonar o repositório do guia de início rápido e navegar até o diretório do código de amostra:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
Fazer streaming de mensagens do Pub/Sub para o Cloud Storage
Exemplo de código
Este exemplo de código usa o Dataflow para:
- Leia as mensagens do Pub/Sub.
- Janela (ou grupo) de mensagens em intervalos de tamanho fixo por carimbos de data/hora de publicação.
Grave as mensagens em cada janela nos arquivos no Cloud Storage.
Java
Python
Iniciar o pipeline
Para iniciar o pipeline, execute o seguinte comando:
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp
O comando anterior é executado localmente e inicia um job do Dataflow executado na nuvem. Quando o comando retornar JOB_MESSAGE_DETAILED: Workers
have started successfully
, saia do programa local usando Ctrl+C
.
Observar o andamento do job e do pipeline
Observe o progresso do job no console do Dataflow.
Abra a visualização de detalhes do job para ver:
- a estrutura do job;
- os registros da tarefa;
- as métricas do cenário.
Talvez seja necessário aguardar alguns minutos para ver os arquivos de saída no Cloud Storage.
Como alternativa, use a linha de comando abaixo para verificar quais arquivos foram gravados.
gsutil ls gs://${BUCKET_NAME}/samples/
A saída será semelhante a esta:
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
Limpar
Para evitar cobranças na sua conta do Google Cloud pelos recursos usados nesta página, siga as etapas abaixo.
Exclua o job do Cloud Scheduler.
gcloud scheduler jobs delete publisher-job --location=$REGION
No console do Dataflow, interrompa o job. Cancele o pipeline sem esvaziá-lo.
Exclua o tópico.
gcloud pubsub topics delete $TOPIC_ID
Exclua os arquivos criados pelo pipeline.
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*" gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
Remova o bucket do Cloud Storage.
gsutil rb gs://${BUCKET_NAME}
A seguir
- Se você quiser exibir as mensagens do Pub/Sub por um carimbo de data/hora personalizado, especifique esse carimbo como um atributo na mensagem do Pub/Sub e use esse carimbo com PubsubIO'swithTimestampAttribute
.
Confira os modelos do Dataflow de código aberto projetados para streaming.
Leia mais sobre como o Dataflow se integra ao Pub/Sub.
Confira este tutorial que lê do Pub/Sub e grava no BigQuery usando modelos Flex do Dataflow.
Para saber mais sobre janelas, consulte o exemplo Pipeline de jogos para dispositivos móveis do Apache Beam (em inglês).