Fazer streaming de mensagens do Pub/Sub Lite usando o Dataflow

Como alternativa à gravação e execução dos seus próprios programas de processamento de dados, é possível usar o Dataflow com o conector de E/S do Pub/Sub Lite para Apache Beam. de dados. O Dataflow é um serviço totalmente gerenciado para transformar e enriquecer dados em stream (em tempo real) e modos de lote com a mesma confiabilidade e expressividade. Ele executa programas desenvolvidos de maneira confiável usando o SDK do Apache Beam, que tem um conjunto extensível de abstrações avançadas de processamento com estado e conectores de E/S para outros sistemas de streaming e em lote.

Neste guia de início rápido, mostramos como escrever um pipeline do Apache Beam que:

  • Ler mensagens do Pub/Sub Lite
  • Organizar em janelas (ou agrupar) as mensagens por carimbo de data/hora
  • gravar as mensagens no Cloud Storage.

Ela também mostra como:

  • Enviar o pipeline para ser executado no Dataflow
  • Criar um modelo flexível do Dataflow a partir do pipeline

Este tutorial requer o Maven, mas também é possível converter o projeto de exemplo de Maven para o Gradle. Para saber mais, consulte Opcional: converter do Maven para o Gradle.

Antes de começar

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. Instale a CLI do Google Cloud.
  3. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  4. Crie ou selecione um projeto do Google Cloud.

    • Crie um projeto do Google Cloud:

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o projeto do Google Cloud que você está criando.

    • Selecione o projeto do Google Cloud que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do projeto do Google Cloud.

  5. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  6. Ative as APIs Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging:

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  7. Configure a autenticação:

    1. Crie a conta de serviço:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Substitua SERVICE_ACCOUNT_NAME por um nome para a conta de serviço.

    2. Conceda papéis à conta de serviço. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Substitua:

      • SERVICE_ACCOUNT_NAME: o nome da conta de serviço.
      • PROJECT_ID: o ID do projeto em que você criou a conta de serviço
      • ROLE: o papel a ser concedido
    3. Conceda à sua Conta do Google um papel que permita que você use os papéis da conta de serviço e anexe a conta de serviço a outros recursos:

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Substitua:

      • SERVICE_ACCOUNT_NAME: o nome da conta de serviço.
      • PROJECT_ID: o ID do projeto em que você criou a conta de serviço
      • USER_EMAIL: o endereço de e-mail da sua Conta do Google
  8. Instale a CLI do Google Cloud.
  9. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  10. Crie ou selecione um projeto do Google Cloud.

    • Crie um projeto do Google Cloud:

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o projeto do Google Cloud que você está criando.

    • Selecione o projeto do Google Cloud que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do projeto do Google Cloud.

  11. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  12. Ative as APIs Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging:

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  13. Configure a autenticação:

    1. Crie a conta de serviço:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Substitua SERVICE_ACCOUNT_NAME por um nome para a conta de serviço.

    2. Conceda papéis à conta de serviço. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Substitua:

      • SERVICE_ACCOUNT_NAME: o nome da conta de serviço.
      • PROJECT_ID: o ID do projeto em que você criou a conta de serviço
      • ROLE: o papel a ser concedido
    3. Conceda à sua Conta do Google um papel que permita que você use os papéis da conta de serviço e anexe a conta de serviço a outros recursos:

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Substitua:

      • SERVICE_ACCOUNT_NAME: o nome da conta de serviço.
      • PROJECT_ID: o ID do projeto em que você criou a conta de serviço
      • USER_EMAIL: o endereço de e-mail da sua Conta do Google
  14. Crie as credenciais de autenticação para sua Conta do Google:

    gcloud auth application-default login

Configurar seu projeto do Pub/Sub Lite

  1. Crie variáveis para o bucket, o projeto e a região do Dataflow do Cloud Storage. Os nomes dos intervalos do Cloud Storage precisam ser globalmente exclusivos. A região do Dataflow precisa ser válida para executar o job. Para mais informações sobre regiões e locais, consulte Locais do Dataflow.

    export PROJECT_ID=$(gcloud config get-value project)
    export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
    export BUCKET=BUCKET_NAME
    export DATAFLOW_REGION=DATAFLOW_REGION
    
  2. Crie um bucket do Cloud Storage que pertença a este projeto:

       gsutil mb gs://$BUCKET
    

Criar um tópico e uma assinatura zonal do Pub/Sub Lite

Criar um tópico zonal do Pub/Sub Lite e uma assinatura do Lite.

Para o local do Lite, escolha um local do Pub/Sub Lite compatível. Você também precisa especificar uma zona para a região. Por exemplo, us-central1-a.

export TOPIC=LITE_TOPIC_ID
export SUBSCRIPTION=LITE_SUBSCRIPTION_ID
export LITE_LOCATION=LITE_LOCATION
gcloud pubsub lite-topics create $TOPIC \
      --location=$LITE_LOCATION \
      --partitions=1 \
      --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
      --location=$LITE_LOCATION \
      --topic=$TOPIC \
      --starting-offset=beginning

Fazer streaming de mensagens para o Dataflow

Faça o download do código de amostra do guia de início rápido

Clone o repositório do guia de início rápido e navegue até o diretório do exemplo de código.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics

Código de amostra

Este exemplo de código usa o Dataflow para:

  • Leia as mensagens de uma assinatura do Pub/Sub Lite como fonte ilimitada.
  • Agrupar mensagens com base nos carimbos de data/hora de publicação usando janelas de tempo fixo e o acionador padrão.
  • Gravar as mensagens agrupadas em arquivos no Cloud Storage.

Java

Antes de executar esta amostra, siga as instruções de configuração do Java em Bibliotecas de cliente do Pub/Sub Lite.


import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubsubliteToGcs {
  /*
   * Define your own configuration options. Add your arguments to be processed
   * by the command-line parser.
   */
  public interface PubsubliteToGcsOptions extends StreamingOptions {
    @Description("Your Pub/Sub Lite subscription.")
    @Required
    String getSubscription();

    void setSubscription(String value);

    @Description("Window size of output files in minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Filename prefix of output files.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  private static final Logger LOG = LoggerFactory.getLogger(PubsubliteToGcs.class);

  public static void main(String[] args) throws InterruptedException {
    // The maximum number of shards when writing output files.
    int numShards = 1;

    PubsubliteToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubliteToGcsOptions.class);

    options.setStreaming(true);

    SubscriberOptions subscriberOptions =
        SubscriberOptions.newBuilder()
            .setSubscriptionPath(SubscriptionPath.parse(options.getSubscription()))
            .build();

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("Read From Pub/Sub Lite", PubsubLiteIO.read(subscriberOptions))
        .apply(
            "Convert messages",
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (SequencedMessage sequencedMessage) -> {
                      String data = sequencedMessage.getMessage().getData().toStringUtf8();
                      LOG.info("Received: " + data);
                      long publishTime = sequencedMessage.getPublishTime().getSeconds();
                      return data + "\t" + publishTime;
                    }))
        .apply(
            "Apply windowing function",
            Window
                // Group the elements using fixed-sized time intervals based on the element
                // timestamp (using the default event time trigger). The element timestamp
                // is the publish timestamp associated with a message.
                //
                // NOTE: If data is not being continuously ingested, such as with a batch or
                // intermittent publisher, the final window will never close as the watermark
                // will not advance. If this is a possibility with your pipeline, you should
                // add an additional processing time trigger to force window closure after
                // enough time has passed. See
                // https://beam.apache.org/documentation/programming-guide/#triggers
                // for more information.
                .<String>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        .apply("Write elements to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline. You may add `.waitUntilFinish()` to observe logs in your console, but
    // `waitUntilFinish()` will not work in Dataflow Flex Templates.
    pipeline.run();
  }
}

Iniciar o pipeline do Dataflow

Para iniciar o pipeline no Dataflow, execute o seguinte comando:

mvn compile exec:java \
    -Dexec.mainClass=examples.PubsubliteToGcs \
    -Dexec.args=" \
        --subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
        --output=gs://$BUCKET/samples/output \
        --windowSize=1 \
        --project=$PROJECT_ID \
        --region=$DATAFLOW_REGION \
        --tempLocation=gs://$BUCKET/temp \
        --runner=DataflowRunner \
        --serviceAccount=$SERVICE_ACCOUNT"

O comando anterior inicia um job do Dataflow. Acesse o link na saída do console para acessar o job no console de monitoramento do Dataflow.

Observar o andamento do job

Observe o andamento do job no console do Dataflow.

Acessar o console do Dataflow

Abra a visualização de detalhes do job para ver:

  • Gráfico do job
  • Detalhes da execução
  • Métricas do job

Publique algumas mensagens no seu tópico do Lite.

gcloud pubsub lite-topics publish $TOPIC \
    --location=$LITE_LOCATION \
    --message="Hello World!"

Talvez seja necessário aguardar alguns minutos para ver as mensagens nos registros do worker.

Use o comando abaixo para verificar quais arquivos foram gravados no Cloud Storage.

gsutil ls "gs://$BUCKET/samples/"

A saída será semelhante a esta:

 gs://$BUCKET/samples/output-19:41-19:42-0-of-1
 gs://$BUCKET/samples/output-19:47-19:48-0-of-1
 gs://$BUCKET/samples/output-19:48-19:49-0-of-1

Use o comando abaixo para analisar o conteúdo de um arquivo:

gsutil cat "gs://$BUCKET/samples/your-filename"

Opcional: criar um modelo do Dataflow

Você tem a opção de criar um modelo flexível do Dataflow personalizado com base no seu pipeline. Os modelos do Dataflow permitem executar jobs com diferentes parâmetros de entrada no console do Google Cloud ou na linha de comando sem a necessidade de configurar um ambiente de desenvolvimento completo em Java.

  1. Crie um JAR multiarquitetura que inclua todas as dependências do pipeline. Você verá target/pubsublite-streaming-bundled-1.0.jar após a execução do comando.

    mvn clean package -DskipTests=true
    
  2. Forneça nomes e locais para o arquivo de modelo e para a imagem do contêiner do modelo.

    export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
    export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
    
  3. Criar um modelo flexível personalizado. Um arquivo metadata.json obrigatório, que contém a especificação necessária para executar o job, foi fornecido com o exemplo.

    gcloud dataflow flex-template build $TEMPLATE_PATH \
        --image-gcr-path $TEMPLATE_IMAGE \
        --sdk-language "JAVA" \
        --flex-template-base-image "JAVA11" \
        --metadata-file "metadata.json" \
        --jar "target/pubsublite-streaming-bundled-1.0.jar" \
        --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
    
  4. Execute um job usando o modelo flexível personalizado.

Console

  1. Criar job com base no modelo.

  2. Digite um Nome do job.

  3. Insira sua região do Dataflow.

  4. Escolha seu Modelo personalizado.

  5. Digite o caminho do modelo.

  6. Insira os parâmetros obrigatórios.

  7. Cliquem em Executar job.

gcloud

gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
     --template-file-gcs-location $TEMPLATE_PATH \
     --parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
     --parameters output="gs://$BUCKET/samples/template-output" \
     --parameters windowSize=1 \
     --region $DATAFLOW_REGION \
     --serviceAccount=$SERVICE_ACCOUNT

Limpar

Para evitar cobranças na sua conta do Google Cloud pelos recursos usados nesta página, exclua o projeto do Google Cloud com esses recursos.

  1. No console do Dataflow, interrompa o job. Cancele o pipeline em vez de drená-lo.

  2. Exclua o tópico e a assinatura.

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  3. Exclua os arquivos criados pelo pipeline.

    gsutil -m rm -rf "gs://$BUCKET/samples/*"
    gsutil -m rm -rf "gs://$BUCKET/temp/*"
    
  4. Exclua a imagem e o arquivo de modelo, se houver.

    gcloud container images delete $TEMPLATE_IMAGE
    gsutil rm $TEMPLATE_PATH
    
  5. Remova o bucket do Cloud Storage.

    gsutil rb gs://$BUCKET
    

  6. Exclua a conta de serviço:
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. Opcional: revogue as credenciais de autenticação que você criou e exclua o arquivo de credenciais local:

    gcloud auth application-default revoke
  8. Opcional: revogar credenciais da CLI gcloud.

    gcloud auth revoke

A seguir