Como usar o Pub/Sub com o Dataflow

O Cloud Dataflow é um serviço totalmente gerenciado para transformar e enriquecer dados em stream (em tempo real) e modos de lote com a mesma confiabilidade e expressividade. Ele fornece um ambiente simplificado de desenvolvimento de pipeline usando o SDK do Apache Beam, que tem um conjunto avançado de primitivos de análise de sessões e janelas, além de um ecossistema de conectores de origem e de coletor. Este guia de início rápido mostra como usar o Dataflow para:

  • ler mensagens publicadas em um tópico do Pub/Sub;
  • colocar as mensagens em janelas ou buffers como lotes;
  • gravar as mensagens no Cloud Storage.

Este guia de início rápido apresenta o uso do Dataflow em Java e Python. O SQL também é compatível.

Comece usando os modelos do Dataflow baseados na IU se não pretende fazer o processamento de dados personalizado.

Antes de começar

Configurar o ambiente do Cloud Storage

  1. Siga as instruções para instalar e inicializar o SDK do Cloud.
  2. Ative o faturamento do projeto.
  3. Para concluir este guia de início rápido, ative as seguintes APIs: Compute Engine, Stackdriver, Cloud Storage, Cloud Storage JSON, Pub/Sub, Cloud Scheduler, Resource Manager e App Engine.

    Ativar as APIs

    Pode levar alguns instantes para que as APIs apareçam no console.

  4. Crie uma chave de conta de serviço:

    Criar uma chave de conta de serviço

    1. Na lista Conta de serviço, selecione Nova conta de serviço.
    2. Digite um nome no campo Nome da conta de serviço.
    3. Na lista Papel, selecione Projeto > Proprietário.
    4. Clique em Criar.

    A chave é enviada para a pasta de downloads padrão do navegador.

  5. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS para apontar para a chave da conta de serviço.

    export GOOGLE_APPLICATION_CREDENTIALS=path/to/my/credentials.json
    
  6. Crie variáveis para o intervalo e o projeto. Os nomes dos intervalos do Cloud Storage precisam ser globalmente exclusivos.

    BUCKET_NAME=bucket-name
    PROJECT_NAME=$(gcloud config get-value project)
    
  7. Crie um intervalo do Cloud Storage que pertença a este projeto:

    gsutil mb gs://$BUCKET_NAME
    
  8. Crie um tópico do Pub/Sub neste projeto:

    gcloud pubsub topics create cron-topic
    
  9. Crie um job do Cloud Scheduler neste projeto. O job publica uma mensagem em um tópico do Cloud Pub/Sub em intervalos de um minuto.

    Esta etapa criará um aplicativo do App Engine para o projeto, se já não houver um.

    gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
     --topic=cron-topic --message-body="Hello!"
    

    Inicie o job.

    gcloud scheduler jobs run publisher-job
    
  10. Use o seguinte comando para clonar o repositório do guia de início rápido e navegar até o diretório do código de amostra:

    java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    cd java-docs-samples/pubsub/streaming-analytics
    

    python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsub/streaming-analytics
    pip install -r requirements.txt  # Install Apache Beam dependencies
    

Fazer streaming de mensagens do Pub/Sub para o Cloud Storage

Exemplo de código

Este exemplo de código usa o Dataflow para:

  • Leia as mensagens do Pub/Sub.
  • Mensagens de janela (ou grupo) por carimbo de data/hora em intervalos de dois minutos.
  • Salve as mensagens em cada janela como arquivos no Cloud Storage.

java

import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

import java.io.IOException;

public class PubSubToGCS {
  /*
  * Define your own configuration options. Add your own arguments to be processed
  * by the command-line parser, and specify default values for them.
  */
  public interface PubSubToGCSOptions extends PipelineOptions, StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    String getInputTopic();
    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    @Default.Integer(1)
    Integer getWindowSize();
    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    @Required
    String getOutput();
    void setOutput(String value);
  }

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGCSOptions options = PipelineOptionsFactory
      .fromArgs(args)
      .withValidation()
      .as(PubSubToGCSOptions.class);

    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
      // 1) Read string messages from a Pub/Sub topic.
      .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
      // 2) Group the messages into fixed-sized minute intervals.
      .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
      // 3) Write one file to GCS for every window of messages.
      .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
  }
}

python

import argparse
import datetime
import json
import logging

import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions

class GroupWindowsIntoBatches(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message
    and its publish timestamp.
    """

    def __init__(self, window_size):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (
            pcoll
            # Assigns window info to each Pub/Sub message based on its
            # publish timestamp.
            | "Window into Fixed Intervals"
            >> beam.WindowInto(window.FixedWindows(self.window_size))
            | "Add timestamps to messages" >> beam.ParDo(AddTimestamps())
            # Use a dummy key to group the elements in the same window.
            # Note that all the elements in one window must fit into memory
            # for this. If the windowed elements do not fit into memory,
            # please consider using `beam.util.BatchElements`.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
            | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
            | "Groupby" >> beam.GroupByKey()
            | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
        )

class AddTimestamps(beam.DoFn):
    def process(self, element, publish_time=beam.DoFn.TimestampParam):
        """Processes each incoming windowed element by extracting the Pub/Sub
        message and its publish timestamp into a dictionary. `publish_time`
        defaults to the publish timestamp returned by the Pub/Sub server. It
        is bound to each element by Beam at runtime.
        """

        yield {
            "message_body": element.decode("utf-8"),
            "publish_time": datetime.datetime.utcfromtimestamp(
                float(publish_time)
            ).strftime("%Y-%m-%d %H:%M:%S.%f"),
        }

class WriteBatchesToGCS(beam.DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, batch, window=beam.DoFn.WindowParam):
        """Write one batch per file to a Google Cloud Storage bucket. """

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        filename = "-".join([self.output_path, window_start, window_end])

        with beam.io.gcp.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for element in batch:
                f.write("{}\n".format(json.dumps(element)).encode("utf-8"))

def run(input_topic, output_path, window_size=1.0, pipeline_args=None):
    # `save_main_session` is set to true because some DoFn's rely on
    # globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | "Read PubSub Messages"
            >> beam.io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupWindowsIntoBatches(window_size)
            | "Write to GCS" >> beam.ParDo(WriteBatchesToGCS(output_path))
        )

if __name__ == "__main__":  # noqa
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from.\n"
        '"projects/<PROJECT_NAME>/topics/<TOPIC_NAME>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in number of minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="GCS Path of the output file including filename prefix.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        pipeline_args,
    )

Iniciar o pipeline

Para iniciar o pipeline, use o seguinte comando:

java

mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGCS \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_NAME \
    --inputTopic=projects/$PROJECT_NAME/topics/cron-topic \
    --output=gs://$BUCKET_NAME/samples/output \
    --runner=DataflowRunner \
    --windowSize=2"

python

python PubSubToGCS.py \
  --project=$PROJECT_NAME \
  --input_topic=projects/$PROJECT_NAME/topics/cron-topic \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --temp_location=gs://$BUCKET_NAME/temp

Observar o andamento do job e do pipeline

Observe o progresso do job no console do Dataflow.

Acessar o console do Dataflow

Observar o progresso do job

Abra a visualização de detalhes do job para ver:

  • a estrutura do job;
  • os registros da tarefa;
  • as métricas do cenário.

Observar o progresso do job

Talvez seja necessário aguardar alguns minutos para ver os arquivos de saída no Cloud Storage.

Observar o progresso do job

Como alternativa, use a linha de comando abaixo para verificar quais arquivos foram gravados.

gsutil ls gs://${BUCKET_NAME}/samples/

A saída será semelhante a esta:

java

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1

python

gs://{$BUCKET_NAME}/samples/output-22:30-22:32
gs://{$BUCKET_NAME}/samples/output-22:32-22:34
gs://{$BUCKET_NAME}/samples/output-22:34-22:36
gs://{$BUCKET_NAME}/samples/output-22:36-22:38

Limpeza

  1. Exclua o job do Cloud Scheduler.

    gcloud scheduler jobs delete publisher-job
    
  2. Use Ctrl+C para interromper o programa no seu terminal.

  3. No console do Dataflow, interrompa o job. Cancele o pipeline sem esvaziá-lo.

  4. Exclua o tópico.

    gcloud pubsub topics delete cron-topic
    
  5. Exclua os arquivos criados pelo pipeline.

    gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
    
  6. Remova o intervalo do Cloud Storage.

    gsutil rb gs://${BUCKET_NAME}
    

A seguir