Guia de início rápido: processamento de stream com Dataflow

O Dataflow é um serviço totalmente gerenciado para transformar e enriquecer dados em modos de stream (em tempo real) e em lote com a mesma confiabilidade e expressividade. Ele fornece um ambiente simplificado de desenvolvimento de pipeline usando o SDK do Apache Beam, que tem um conjunto avançado de primitivos de análise de sessões e janelas, além de um ecossistema de conectores de origem e de coletor. Este guia de início rápido mostra como usar o Dataflow para:

  • ler mensagens publicadas em um tópico do Pub/Sub;
  • organizar em janelas (ou agrupar) as mensagens por carimbo de data/hora;
  • gravar as mensagens no Cloud Storage.

Este guia de início rápido apresenta o uso do Dataflow em Java e Python. O SQL também é compatível. Este guia de início rápido também é oferecido como um tutorial do Qwiklabs, que oferece credenciais temporárias para você começar.

Comece usando os modelos do Dataflow baseados na IU se não pretende fazer o processamento de dados personalizado.

Antes de começar

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. No Console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  3. Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como confirmar se o faturamento está ativado para o projeto.

  4. Ative as APIs Compute Engine, Google Cloud's operations suite, Cloud Storage, Cloud Storage JSON, Pub/Sub, Cloud Scheduler, Resource Manager, and App Engine.

    Ative as APIs

  5. Crie uma conta de serviço:

    1. No Console do Cloud, acesse a página Criar conta de serviço.

      Acesse Criar conta de serviço
    2. Selecione um projeto.
    3. No campo Nome da conta de serviço, insira um nome. O Console do Cloud preenche o campo ID da conta de serviço com base nesse nome.

      No campo Descrição da conta de serviço, insira uma descrição. Por exemplo, Service account for quickstart.

    4. Clique em Criar e continuar.
    5. Clique no campo Selecionar um papel.

      Em Acesso rápido, clique em Básico e em Proprietário.

    6. Clique em Continuar.
    7. Clique em Concluído para terminar a criação da conta de serviço.

      Não feche a janela do navegador. Você vai usá-lo na próxima etapa.

  6. Crie uma chave de conta de serviço:

    1. No Console do Cloud, clique no endereço de e-mail da conta de serviço que você criou.
    2. Clique em Chaves.
    3. Clique em Adicionar chave e em Criar nova chave.
    4. Clique em Criar. O download de um arquivo de chave JSON é feito no seu computador.
    5. Clique em Fechar.
  7. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS como o caminho do arquivo JSON que contém a chave da conta de serviço. Essa variável só se aplica à sessão de shell atual. Dessa maneira, se você abrir uma nova sessão, defina a variável novamente.

  8. Instale e inicialize o SDK do Cloud..
  9. No Console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  10. Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como confirmar se o faturamento está ativado para o projeto.

  11. Ative as APIs Compute Engine, Google Cloud's operations suite, Cloud Storage, Cloud Storage JSON, Pub/Sub, Cloud Scheduler, Resource Manager, and App Engine.

    Ative as APIs

  12. Crie uma conta de serviço:

    1. No Console do Cloud, acesse a página Criar conta de serviço.

      Acesse Criar conta de serviço
    2. Selecione um projeto.
    3. No campo Nome da conta de serviço, insira um nome. O Console do Cloud preenche o campo ID da conta de serviço com base nesse nome.

      No campo Descrição da conta de serviço, insira uma descrição. Por exemplo, Service account for quickstart.

    4. Clique em Criar e continuar.
    5. Clique no campo Selecionar um papel.

      Em Acesso rápido, clique em Básico e em Proprietário.

    6. Clique em Continuar.
    7. Clique em Concluído para terminar a criação da conta de serviço.

      Não feche a janela do navegador. Você vai usá-lo na próxima etapa.

  13. Crie uma chave de conta de serviço:

    1. No Console do Cloud, clique no endereço de e-mail da conta de serviço que você criou.
    2. Clique em Chaves.
    3. Clique em Adicionar chave e em Criar nova chave.
    4. Clique em Criar. O download de um arquivo de chave JSON é feito no seu computador.
    5. Clique em Fechar.
  14. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS como o caminho do arquivo JSON que contém a chave da conta de serviço. Essa variável só se aplica à sessão de shell atual. Dessa maneira, se você abrir uma nova sessão, defina a variável novamente.

  15. Instale e inicialize o SDK do Cloud..
  16. Crie variáveis para o bucket, o projeto e a região. Os nomes dos buckets do Cloud Storage precisam ser globalmente exclusivos. Selecione uma região do Dataflow perto de onde executar os comandos neste guia de início rápido.

    BUCKET_NAME=your-bucket-name
    PROJECT_ID=$(gcloud config get-value project)
    TOPIC_ID=your-topic-id
    REGION=dataflow-region
  17. Crie um bucket do Cloud Storage que pertença a este projeto:

    gsutil mb gs://$BUCKET_NAME
  18. Crie um tópico do Pub/Sub neste projeto:

    gcloud pubsub topics create $TOPIC_ID
  19. Crie um job do Cloud Scheduler neste projeto. O job publica uma mensagem em um tópico do Pub/Sub em intervalos de um minuto.

    Se não houver um aplicativo do App Engine para o projeto, esta etapa criará um.

    gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
        --topic=$TOPIC_ID --message-body="Hello!"

    Inicie o job.

    gcloud scheduler jobs run publisher-job
  20. Use os seguintes comandos para clonar o repositório do guia de início rápido e navegar para o diretório do código de amostra:

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    cd java-docs-samples/pubsub/streaming-analytics

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsub/streaming-analytics
    pip install -r requirements.txt  # Install Apache Beam dependencies

Fazer streaming de mensagens do Pub/Sub para o Cloud Storage

Exemplo de código

Este exemplo de código usa o Dataflow para:

  • Leia as mensagens do Pub/Sub.
  • Janela (ou grupo) de mensagens em intervalos de tamanho fixo por carimbos de data/hora de publicação.
  • Grave as mensagens em cada janela nos arquivos no Cloud Storage.

Java


import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGcs {
  /*
   * Define your own configuration options. Add your own arguments to be processed
   * by the command-line parser, and specify default values for them.
   */
  public interface PubSubToGcsOptions extends PipelineOptions, StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);

    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // 1) Read string messages from a Pub/Sub topic.
        .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        // 2) Group the messages into fixed-sized minute intervals.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        // 3) Write one file to GCS for every window of messages.
        .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
  }
}

Python

import argparse
from datetime import datetime
import logging
import random

from apache_beam import DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish time.
    """

    def __init__(self, window_size, num_shards=5):
        # Set window size to 60 seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals"
            >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
            # Assign a random key to each windowed element based on the number of shards.
            | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
            # Group windowed elements by key. All the elements in the same window must fit
            # memory for this. If not, you need to use `beam.util.BatchElements`.
            | "Group by key" >> GroupByKey()
        )

class AddTimestamp(DoFn):
    def process(self, element, publish_time=DoFn.TimestampParam):
        """Processes each windowed element by extracting the message body and its
        publish time into a tuple.
        """
        yield (
            element.decode("utf-8"),
            datetime.utcfromtimestamp(float(publish_time)).strftime(
                "%Y-%m-%d %H:%M:%S.%f"
            ),
        )

class WriteToGCS(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, key_value, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        shard_id, batch = key_value
        filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body, publish_time in batch:
                f.write(f"{message_body},{publish_time}\n".encode("utf-8"))

def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
            # binds the publish time returned by the Pub/Sub server for each message
            # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
            | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
            | "Write to GCS" >> ParDo(WriteToGCS(output_path))
        )

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from."
        '"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="Path of the output GCS file including the prefix.",
    )
    parser.add_argument(
        "--num_shards",
        type=int,
        default=5,
        help="Number of shards to use when writing windowed elements to GCS.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        known_args.num_shards,
        pipeline_args,
    )

Iniciar o pipeline

Para iniciar o pipeline, execute o seguinte comando:

Java

mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_ID \
    --region=$REGION \
    --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
    --output=gs://$BUCKET_NAME/samples/output \
    --gcpTempLocation=gs://$BUCKET_NAME/temp \
    --runner=DataflowRunner \
    --windowSize=2"

Python

python PubSubToGCS.py \
  --project=$PROJECT_ID \
  --region=$REGION \
  --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --num_shards=2 \
  --temp_location=gs://$BUCKET_NAME/temp

O comando anterior é executado localmente e inicia um job do Dataflow executado na nuvem. Quando o comando retornar JOB_MESSAGE_DETAILED: Workers have started successfully, saia do programa local usando Ctrl+C.

Observar o andamento do job e do pipeline

Observe o progresso do job no console do Dataflow.

Acessar o console do Dataflow

Observar o progresso do job

Abra a visualização de detalhes do job para ver:

  • a estrutura do job;
  • os registros da tarefa;
  • as métricas do cenário.

Observar o progresso do job

Talvez seja necessário aguardar alguns minutos para ver os arquivos de saída no Cloud Storage.

Observar o progresso do job

Como alternativa, use a linha de comando abaixo para verificar quais arquivos foram gravados.

gsutil ls gs://${BUCKET_NAME}/samples/

A saída será semelhante a esta:

Java

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1

Python

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1

Limpar

  1. Exclua o job do Cloud Scheduler.

    gcloud scheduler jobs delete publisher-job
    
  2. No console do Dataflow, interrompa o job. Cancele o pipeline sem esvaziá-lo.

  3. Exclua o tópico.

    gcloud pubsub topics delete $TOPIC_ID
    
  4. Exclua os arquivos criados pelo pipeline.

    gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
    gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
    
  5. Remova o bucket do Cloud Storage.

    gsutil rb gs://${BUCKET_NAME}
    

A seguir