Esta página foi traduzida pela API Cloud Translation.
Switch to English

Guia de início rápido: processamento de stream com Dataflow

O Dataflow é um serviço totalmente gerenciado para transformar e enriquecer dados em modos de stream (em tempo real) e em lote com a mesma confiabilidade e expressividade. Ele fornece um ambiente simplificado de desenvolvimento de pipeline usando o SDK do Apache Beam, que tem um conjunto avançado de primitivos de análise de sessões e janelas, além de um ecossistema de conectores de origem e de coletor. Este guia de início rápido mostra como usar o Dataflow para:

  • ler mensagens publicadas em um tópico do Pub/Sub;
  • organizar em janelas (ou agrupar) as mensagens por carimbo de data/hora;
  • gravar as mensagens no Cloud Storage.

Este guia de início rápido apresenta o uso do Dataflow em Java e Python. O SQL também é compatível.

Comece usando os modelos do Dataflow baseados na IU se não pretende fazer o processamento de dados personalizado.

Antes de começar

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. No Console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  3. Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como confirmar se o faturamento está ativado para o projeto.

  4. Ative as APIs Compute Engine, Google Cloud's operations suite, Cloud Storage, Cloud Storage JSON, Pub/Sub, Cloud Scheduler, Resource Manager, and App Engine.

    Ative as APIs

  5. Crie uma conta de serviço:

    1. No Console do Cloud, acesse a página Criar conta de serviço.

      Acesse Criar conta de serviço
    2. Selecione um projeto.
    3. No campo Nome da conta de serviço, insira um nome. O Console do Cloud preenche o campo ID da conta de serviço com base nesse nome.

      No campo Descrição da conta de serviço, insira uma descrição. Por exemplo, Service account for quickstart.

    4. Clique em Criar.
    5. Clique no campo Selecionar um papel.

      Em Acesso rápido, clique em Básico e em Proprietário.

    6. Clique em Continuar.
    7. Clique em Concluído para terminar a criação da conta de serviço.

      Não feche a janela do navegador. Você vai usá-lo na próxima etapa.

  6. Crie uma chave de conta de serviço:

    1. No Console do Cloud, clique no endereço de e-mail da conta de serviço que você criou.
    2. Clique em Chaves.
    3. Clique em Adicionar chave e em Criar nova chave.
    4. Clique em Criar. O download de um arquivo de chave JSON é feito no seu computador.
    5. Clique em Fechar.
  7. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS como o caminho do arquivo JSON que contém a chave da conta de serviço. Essa variável só se aplica à sessão de shell atual. Dessa maneira, se você abrir uma nova sessão, defina a variável novamente.

  8. Instale e inicialize o SDK do Cloud..
  9. Crie variáveis para o bucket, o projeto e a região. Os nomes dos intervalos do Cloud Storage precisam ser globalmente exclusivos. Selecione uma região do Dataflow próxima à execução dos comandos neste guia de início rápido.

    BUCKET_NAME=your-bucket-name
    PROJECT_ID=$(gcloud config get-value project)
    TOPIC_ID=your-topic-id
    REGION=dataflow-region
  10. Crie um bucket do Cloud Storage que pertença a este projeto:

    gsutil mb gs://$BUCKET_NAME
  11. Crie um tópico do Pub/Sub neste projeto:

    gcloud pubsub topics create $TOPIC_ID
  12. Crie um job do Cloud Scheduler neste projeto. O job publica uma mensagem em um tópico do Pub/Sub em intervalos de um minuto.

    Esta etapa criará um aplicativo do App Engine para o projeto, se já não houver um.

    gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
        --topic=$TOPIC_ID --message-body="Hello!"

    Inicie o job.

    gcloud scheduler jobs run publisher-job
  13. Use os seguintes comandos para clonar o repositório do guia de início rápido e navegar até o diretório do código de amostra:

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    cd java-docs-samples/pubsub/streaming-analytics

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsub/streaming-analytics
    pip install -r requirements.txt  # Install Apache Beam dependencies

Fazer streaming de mensagens do Pub/Sub para o Cloud Storage

Exemplo de código

Este exemplo de código usa o Dataflow para:

  • Leia as mensagens do Pub/Sub.
  • Janela (ou grupo) de mensagens em intervalos de tamanho fixo por carimbos de data/hora de publicação.
  • Grave as mensagens em cada janela nos arquivos no Cloud Storage.

Java


import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGcs {
  /*
   * Define your own configuration options. Add your own arguments to be processed
   * by the command-line parser, and specify default values for them.
   */
  public interface PubSubToGcsOptions extends PipelineOptions, StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);

    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // 1) Read string messages from a Pub/Sub topic.
        .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        // 2) Group the messages into fixed-sized minute intervals.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        // 3) Write one file to GCS for every window of messages.
        .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
  }
}

Python

import argparse
from datetime import datetime
import logging
import random

from apache_beam import DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish time.
    """

    def __init__(self, window_size, num_shards=5):
        # Set window size to 60 seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals"
            >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
            # Assign a random key to each windowed element based on the number of shards.
            | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
            # Group windowed elements by key. All the elements in the same window must fit
            # memory for this. If not, you need to use `beam.util.BatchElements`.
            | "Group by key" >> GroupByKey()
        )

class AddTimestamp(DoFn):
    def process(self, element, publish_time=DoFn.TimestampParam):
        """Processes each windowed element by extracting the message body and its
        publish time into a tuple.
        """
        yield (
            element.decode("utf-8"),
            datetime.utcfromtimestamp(float(publish_time)).strftime(
                "%Y-%m-%d %H:%M:%S.%f"
            ),
        )

class WriteToGCS(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, key_value, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        shard_id, batch = key_value
        filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body, publish_time in batch:
                f.write(f"{message_body},{publish_time}\n".encode("utf-8"))

def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
            # binds the publish time returned by the Pub/Sub server for each message
            # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
            | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
            | "Write to GCS" >> ParDo(WriteToGCS(output_path))
        )

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from."
        '"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="Path of the output GCS file including the prefix.",
    )
    parser.add_argument(
        "--num_shards",
        type=int,
        default=5,
        help="Number of shards to use when writing windowed elements to GCS.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        known_args.num_shards,
        pipeline_args,
    )

Iniciar o pipeline

Para iniciar o pipeline, execute o seguinte comando:

Java

mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_ID \
    --region=$REGION \
    --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
    --output=gs://$BUCKET_NAME/samples/output \
    --runner=DataflowRunner \
    --windowSize=2"

Python

python PubSubToGCS.py \
  --project=$PROJECT_ID \
  --region=$REGION \
  --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --num_shards=2 \
  --temp_location=gs://$BUCKET_NAME/temp

O comando anterior é executado localmente e inicia um job do Dataflow executado na nuvem. Quando o comando retornar JOB_MESSAGE_DETAILED: Workers have started successfully, saia do programa local usando Ctrl+C.

Observar o andamento do job e do pipeline

Observe o progresso do job no console do Dataflow.

Acessar o console do Dataflow

Observar o progresso do job

Abra a visualização de detalhes do job para ver:

  • a estrutura do job;
  • os registros da tarefa;
  • as métricas do cenário.

Observar o progresso do job

Talvez seja necessário aguardar alguns minutos para ver os arquivos de saída no Cloud Storage.

Observar o progresso do job

Como alternativa, use a linha de comando abaixo para verificar quais arquivos foram gravados.

gsutil ls gs://${BUCKET_NAME}/samples/

A saída será semelhante a esta:

Java

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1

Python

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1

Limpar

  1. Exclua o job do Cloud Scheduler.

    gcloud scheduler jobs delete publisher-job
    
  2. No console do Dataflow, interrompa o job. Cancele o pipeline sem esvaziá-lo.

  3. Exclua o tópico.

    gcloud pubsub topics delete $TOPIC_ID
    
  4. Exclua os arquivos criados pelo pipeline.

    gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
    gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
    
  5. Remova o bucket do Cloud Storage.

    gsutil rb gs://${BUCKET_NAME}
    

A seguir