Tópico do Pub/Sub para arquivos de texto no Cloud Storage

O modelo Cloud Pub/Sub para Cloud Storage Text é um pipeline de streaming que lê registros do Cloud Pub/Sub e os salva como uma série de arquivos do Cloud Storage em formato de texto. O modelo pode ser usado como uma maneira rápida de salvar dados em Pub/Sub para uso futuro. Por padrão, o modelo gera um novo arquivo a cada cinco minutos.

Requisitos de pipeline

  • O tópico do Pub/Sub precisa existir antes da execução.
  • As mensagens publicadas no tópico precisam estar em formato de texto.
  • As mensagens publicadas no tópico não podem conter novas linhas. Observe que cada mensagem do Pub/Sub é salva como uma linha única no arquivo de saída.

Parâmetros do modelo

Parâmetros obrigatórios

  • outputDirectory: o caminho e o prefixo do nome do arquivo para gravar arquivos de saída. Por exemplo, gs://bucket-name/path/. Esse valor precisa terminar com uma barra.
  • outputFilenamePrefix: o prefixo a ser colocado em cada arquivo em janela. Por exemplo, output-. O padrão é: saída.

Parâmetros opcionais

  • inputTopic: o tópico do Pub/Sub em que a entrada será lida. O nome do tópico precisa estar no formato projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • userTempLocation: o diretório fornecido pelo usuário para enviar arquivos temporários. Precisa terminar com uma barra.
  • outputFilenameSuffix: o sufixo a ser colocado em cada arquivo em janela. Normalmente, uma extensão de arquivo, como .txt ou .csv. O padrão é vazio.
  • outputShardTemplate: o modelo de fragmento define a parte dinâmica de cada arquivo em janela. Por padrão, o pipeline usa um único fragmento para saída para o sistema de arquivos em cada janela. Portanto, todos os dados são enviados em um único arquivo por janela. O padrão de outputShardTemplate é to W-P-SS-of-NN, em que W é o intervalo de datas da janela, P são as informações do painel, S é o número do fragmento e N é a quantidade de fragmentos. No caso de um único arquivo, a parte SS-of-NN de outputShardTemplate é 00-of-01.
  • yearPattern: padrão para formatar o ano. Precisa ser y ou Y. O uso de maiúsculas e minúsculas não faz diferença no ano. Como opção, coloque o padrão entre caracteres não alfanuméricos ou use o caractere de diretório ("/"). O padrão é YYYY.
  • monthPattern: padrão para formatar o mês. Precisa ter um ou mais do caractere M. Como opção, coloque o padrão entre caracteres não alfanuméricos ou use o caractere de diretório ("/"). O padrão é MM.
  • dayPattern: padrão para formatar o dia. Precisa ser um ou mais do caractere d para o dia do mês ou D para o dia do ano. Como opção, coloque o padrão entre caracteres não alfanuméricos ou use o caractere de diretório ("/"). O padrão é dd.
  • hourPattern: padrão para formatar a hora. Precisa ter um ou mais do caractere H. Como opção, coloque o padrão entre caracteres não alfanuméricos ou use o caractere de diretório ("/"). O padrão é HH.
  • minutePattern: padrão para formatar os minutos. Precisa ter um ou mais do caractere m. Como opção, coloque o padrão entre caracteres não alfanuméricos ou use o caractere de diretório ("/"). O padrão é mm.

Executar o modelo

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub to Text Files on Cloud Storage template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Opcional: para alternar do processamento "Exatamente uma vez" para o modo de streaming "Pelo menos uma vez", selecione Pelo menos uma vez.
  8. Cliquem em Executar job.

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_GCS_Text \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • STAGING_LOCATION: o local para fase de testes de arquivos locais (por exemplo, gs://your-bucket/staging)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • STAGING_LOCATION: o local para fase de testes de arquivos locais (por exemplo, gs://your-bucket/staging)
  • TOPIC_NAME: o nome do tópico do Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.teleport.io.WindowedFilenamePolicy;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateCreationParameter;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.options.WindowedFilenamePolicyOptions;
import com.google.cloud.teleport.templates.PubsubToText.Options;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.cloud.teleport.util.DurationUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;

/**
 * This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into
 * windowed files at the specified output directory.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_PubSub_to_GCS_Text.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_PubSub_to_GCS_Text",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub to Text Files on Cloud Storage",
    description =
        "The Pub/Sub to Cloud Storage Text template is a streaming pipeline that reads records from Pub/Sub topic and "
            + "saves them as a series of Cloud Storage files in text format. "
            + "The template can be used as a quick way to save data in Pub/Sub for future use. "
            + "By default, the template generates a new file every 5 minutes.",
    optionsClass = Options.class,
    skipOptions = {"inputSubscription"},
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-topic-to-text",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Pub/Sub topic must exist prior to execution.",
      "The messages published to the topic must be in text format.",
      "The messages published to the topic must not contain any newlines. Note that each Pub/Sub message is saved as a single line in the output file."
    },
    streaming = true,
    supportsAtLeastOnce = true,
    supportsExactlyOnce = true)
public class PubsubToText {

  /**
   * Options supported by the pipeline.
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options
      extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions {

    @TemplateParameter.PubsubSubscription(
        order = 1,
        groupName = "Source",
        optional = true,
        description = "Pub/Sub input subscription",
        helpText =
            "Pub/Sub subscription to read the input from, in the format of"
                + " 'projects/your-project-id/subscriptions/your-subscription-name'",
        example = "projects/your-project-id/subscriptions/your-subscription-name")
    ValueProvider<String> getInputSubscription();

    void setInputSubscription(ValueProvider<String> value);

    @TemplateParameter.PubsubTopic(
        order = 2,
        groupName = "Source",
        optional = true,
        description = "Pub/Sub input topic",
        helpText =
            "The Pub/Sub topic to read the input from. The topic name should be in the format "
                + "`projects/<PROJECT_ID>/topics/<TOPIC_NAME>`.")
    ValueProvider<String> getInputTopic();

    void setInputTopic(ValueProvider<String> value);

    @TemplateCreationParameter(value = "false")
    @Description(
        "This determines whether the template reads from a Pub/Sub subscription or a topic")
    @Default.Boolean(false)
    Boolean getUseSubscription();

    void setUseSubscription(Boolean value);

    @TemplateParameter.GcsWriteFolder(
        order = 3,
        groupName = "Target",
        description = "Output file directory in Cloud Storage",
        helpText =
            "The path and filename prefix for writing output files. For example, `gs://bucket-name/path/`. This value must end in a slash.")
    @Required
    ValueProvider<String> getOutputDirectory();

    void setOutputDirectory(ValueProvider<String> value);

    @TemplateParameter.GcsWriteFolder(
        order = 4,
        optional = true,
        description = "User provided temp location",
        helpText =
            "The user provided directory to output temporary files to. Must end with a slash.")
    ValueProvider<String> getUserTempLocation();

    void setUserTempLocation(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 5,
        groupName = "Target",
        description = "Output filename prefix of the files to write",
        helpText = "The prefix to place on each windowed file. For example, `output-`.")
    @Default.String("output")
    @Required
    ValueProvider<String> getOutputFilenamePrefix();

    void setOutputFilenamePrefix(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 6,
        groupName = "Target",
        optional = true,
        description = "Output filename suffix of the files to write",
        helpText =
            "The suffix to place on each windowed file. Typically a file extension such as `.txt` or `.csv`.")
    @Default.String("")
    ValueProvider<String> getOutputFilenameSuffix();

    void setOutputFilenameSuffix(ValueProvider<String> value);
  }

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    options.setStreaming(true);

    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(Options options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> messages = null;

    /*
     * Steps:
     *   1) Read string messages from PubSub
     *   2) Window the messages into minute intervals specified by the executor.
     *   3) Output the windowed files to GCS
     */
    if (options.getUseSubscription()) {
      messages =
          pipeline.apply(
              "Read PubSub Events",
              PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));
    } else {
      messages =
          pipeline.apply(
              "Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic()));
    }
    messages
        .apply(
            options.getWindowDuration() + " Window",
            Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))

        // Apply windowed file writes. Use a NestedValueProvider because the filename
        // policy requires a resourceId generated from the input value at runtime.
        .apply(
            "Write File(s)",
            TextIO.write()
                .withWindowedWrites()
                .withNumShards(options.getNumShards())
                .to(
                    WindowedFilenamePolicy.writeWindowedFiles()
                        .withOutputDirectory(options.getOutputDirectory())
                        .withOutputFilenamePrefix(options.getOutputFilenamePrefix())
                        .withShardTemplate(options.getOutputShardTemplate())
                        .withSuffix(options.getOutputFilenameSuffix())
                        .withYearPattern(options.getYearPattern())
                        .withMonthPattern(options.getMonthPattern())
                        .withDayPattern(options.getDayPattern())
                        .withHourPattern(options.getHourPattern())
                        .withMinutePattern(options.getMinutePattern()))
                .withTempDirectory(
                    NestedValueProvider.of(
                        maybeUseUserTempLocation(
                            options.getUserTempLocation(), options.getOutputDirectory()),
                        (SerializableFunction<String, ResourceId>)
                            input -> FileBasedSink.convertToFileResourceIfPossible(input))));

    // Execute the pipeline and return the result.
    return pipeline.run();
  }

  /**
   * Utility method for using optional parameter userTempLocation as TempDirectory. This is useful
   * when output bucket is locked and temporary data cannot be deleted.
   *
   * @param userTempLocation user provided temp location
   * @param outputLocation user provided outputDirectory to be used as the default temp location
   * @return userTempLocation if available, otherwise outputLocation is returned.
   */
  private static ValueProvider<String> maybeUseUserTempLocation(
      ValueProvider<String> userTempLocation, ValueProvider<String> outputLocation) {
    return DualInputNestedValueProvider.of(
        userTempLocation,
        outputLocation,
        new SerializableFunction<TranslatorInput<String, String>, String>() {
          @Override
          public String apply(TranslatorInput<String, String> input) {
            return (input.getX() != null) ? input.getX() : input.getY();
          }
        });
  }
}

A seguir