O Dataflow é um serviço totalmente gerenciado para transformar e enriquecer dados em modos de stream (em tempo real) e em lote com a mesma confiabilidade e expressividade. Ele fornece um ambiente simplificado de desenvolvimento de pipeline usando o SDK do Apache Beam, que tem um conjunto avançado de primitivos de análise de sessões e janelas, além de um ecossistema de conectores de origem e de coletor. Este guia de início rápido mostra como usar o Dataflow para:
- ler mensagens publicadas em um tópico do Pub/Sub;
- organizar em janelas (ou agrupar) as mensagens por carimbo de data/hora;
- gravar as mensagens no Cloud Storage.
Este guia de início rápido apresenta o uso do Dataflow em Java e Python. O SQL também é compatível.
Comece usando os modelos do Dataflow baseados na IU se não pretende fazer o processamento de dados personalizado.
Antes de começar
- Siga as instruções para instalar e inicializar o SDK do Cloud.
- Ative o faturamento do projeto.
Para concluir este guia de início rápido, você precisa ativar as seguintes APIs: Compute Engine, pacote de operações do Google Cloud, Cloud Storage, Cloud Storage JSON, Pub/Sub, Cloud Scheduler, Resource Manager e App Engine.
Pode levar alguns instantes para que as APIs apareçam no console.
Crie uma chave de conta de serviço:
Crie uma chave de conta de serviço
- Na lista Conta de serviço, selecione Nova conta de serviço.
- Digite um nome no campo Nome da conta de serviço.
- Na lista Papel, selecione Projeto > Proprietário.
- Clique em Criar.
A chave é enviada para a pasta de downloads padrão do navegador.
Defina a variável de ambiente
GOOGLE_APPLICATION_CREDENTIALS
para apontar para a chave da conta de serviço.export GOOGLE_APPLICATION_CREDENTIALS=path/to/my/credentials.json
Crie variáveis para o bucket, o projeto e a região. Os nomes dos intervalos do Cloud Storage precisam ser globalmente exclusivos. Selecione uma região do Dataflow perto de onde você executa os comandos neste guia de início rápido.
BUCKET_NAME=BUCKET_NAME PROJECT_NAME=$(gcloud config get-value project) REGION=DATAFLOW_REGION
Crie um bucket do Cloud Storage que pertença a este projeto:
gsutil mb gs://$BUCKET_NAME
Crie um tópico do Pub/Sub neste projeto:
gcloud pubsub topics create cron-topic
Crie um job do Cloud Scheduler neste projeto. O job publica uma mensagem em um tópico do Cloud Pub/Sub em intervalos de um minuto.
Esta etapa criará um aplicativo do App Engine para o projeto, se já não houver um.
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=cron-topic --message-body="Hello!"
Inicie o job.
gcloud scheduler jobs run publisher-job
Use o seguinte comando para clonar o repositório do guia de início rápido e navegar até o diretório do código de amostra:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
Fazer streaming de mensagens do Pub/Sub para o Cloud Storage
Exemplo de código
Este exemplo de código usa o Dataflow para:
- Leia as mensagens do Pub/Sub.
- Janela (ou grupo) de mensagens em intervalos de tamanho fixo por carimbos de data/hora de publicação.
Grave as mensagens em cada janela nos arquivos no Cloud Storage.
Java
Python
Iniciar o pipeline
Para iniciar o pipeline, execute o seguinte comando:
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_NAME \ --region=$REGION \ --inputTopic=projects/$PROJECT_NAME/topics/cron-topic \ --output=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --windowSize=2"
Python
python PubSubToGCS.py \ --project=$PROJECT_NAME \ --region=$REGION \ --input_topic=projects/$PROJECT_NAME/topics/cron-topic \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --temp_location=gs://$BUCKET_NAME/temp
O comando anterior é executado localmente e inicia um job do Dataflow executado na nuvem. Quando o comando retornar JOB_MESSAGE_DETAILED: Workers
have started successfully
, saia do programa local usando Ctrl+C
.
Observar o andamento do job e do pipeline
Observe o progresso do job no console do Dataflow.
Abra a visualização de detalhes do job para ver:
- a estrutura do job;
- os registros da tarefa;
- as métricas do cenário.
Talvez seja necessário aguardar alguns minutos para ver os arquivos de saída no Cloud Storage.
Como alternativa, use a linha de comando abaixo para verificar quais arquivos foram gravados.
gsutil ls gs://${BUCKET_NAME}/samples/
A saída será semelhante a esta:
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32 gs://{$BUCKET_NAME}/samples/output-22:32-22:34 gs://{$BUCKET_NAME}/samples/output-22:34-22:36 gs://{$BUCKET_NAME}/samples/output-22:36-22:38
Limpar
Exclua o job do Cloud Scheduler.
gcloud scheduler jobs delete publisher-job
No console do Dataflow, interrompa o job. Cancele o pipeline sem esvaziá-lo.
Exclua o tópico.
gcloud pubsub topics delete cron-topic
Exclua os arquivos criados pelo pipeline.
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*" gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
Remova o bucket do Cloud Storage.
gsutil rb gs://${BUCKET_NAME}
A seguir
Se você quiser organizar em janelas as mensagens do Pub/Sub por um carimbo de data/hora personalizado, especifique esse carimbo como um atributo na mensagem do Pub/Sub e use-o com PubsubIO's
withTimestampAttribute
.Confira os modelos do Dataflow de código aberto projetados para streaming.
Leia mais sobre como o Dataflow se integra ao Pub/Sub.
Para saber mais sobre janelas, consulte o exemplo Pipeline de jogos para dispositivos móveis do Apache Beam (em inglês).