Modelo de tópico ou assinatura do Pub/Sub para arquivos de texto no Cloud Storage

O modelo de tópico ou assinatura do Pub/Sub para o Cloud Storage Text é um pipeline de streaming que lê registros do Pub/Sub e os salva como uma série de arquivos do Cloud Storage em formato de texto. O modelo pode ser usado como uma maneira rápida de salvar dados em Pub/Sub para uso futuro. Por padrão, o modelo gera um novo arquivo a cada cinco minutos.

Requisitos de pipeline

  • O tópico ou a assinatura do Pub/Sub precisa ter sido criado antes da execução.
  • As mensagens publicadas no tópico precisam estar em formato de texto.
  • As mensagens publicadas no tópico não podem conter novas linhas. Observe que cada mensagem do Pub/Sub é salva como uma linha única no arquivo de saída.

Parâmetros do modelo

Parâmetro Descrição
inputTopic O tópico do Pub/Sub em que a entrada será lida. O nome do tópico precisa estar no formato projects/<project-id>/topics/<topic-name>. Se esse parâmetro for fornecido, inputSubscription não poderá ser informado.
inputSubscription O tópico do Pub/Sub em que a entrada será lida. O nome da assinatura precisa estar no formato projects/<project-id>/subscription/<subscription-name>. Se esse parâmetro for fornecido, inputTopic não deverá ser fornecido.
outputDirectory O caminho e o prefixo do nome do arquivo para gravar arquivos de saída. Por exemplo, gs://bucket-name/path/. Esse valor precisa terminar com uma barra.
outputFilenamePrefix O prefixo a ser colocado em cada arquivo em janela. Por exemplo, output-.
outputFilenameSuffix O sufixo a ser colocado em cada arquivo em janela, normalmente uma extensão de arquivo, como .txt ou .csv.
outputShardTemplate O modelo de fragmento define a parte dinâmica de cada arquivo em janela. Por padrão, o pipeline usa um único fragmento para saída para o sistema de arquivos em cada janela. Isso significa que todos os dados são gerados em um único arquivo por janela. O outputShardTemplate padrão é W-P-SS-of-NN, em que W é o intervalo de datas da janela, P são as informações do painel, S é o número do fragmento e N é a quantidade de fragmentos. No caso de um único arquivo, a parte SS-of-NN de outputShardTemplate é 00-of-01.
windowDuration (Opcional) A duração da janela é o intervalo em que os dados são gravados no diretório de saída. Configure a duração com base na capacidade de processamento do pipeline. Por exemplo, uma capacidade de processamento mais alta pode exigir tamanhos de janela menores para que os dados se encaixem na memória. O padrão é 5 min, com um mínimo de 1 s. Os formatos permitidos são: [int]s para segundos (5 s, por exemplo); [int]m para minutos (12 min, por exemplo); [int]h para horas (2h, por exemplo).

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region REGION_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex",
  }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage

A seguir