Modelo de tópico ou assinatura do Pub/Sub para arquivos de texto no Cloud Storage

O modelo de tópico ou assinatura do Pub/Sub para o Cloud Storage Text é um pipeline de streaming que lê registros do Pub/Sub e os salva como uma série de arquivos do Cloud Storage em formato de texto. O modelo pode ser usado como uma maneira rápida de salvar dados em Pub/Sub para uso futuro. Por padrão, o modelo gera um novo arquivo a cada cinco minutos.

Requisitos de pipeline

  • O tópico ou a assinatura do Pub/Sub precisa ter sido criado antes da execução.
  • As mensagens publicadas no tópico precisam estar em formato de texto.
  • As mensagens publicadas no tópico não podem conter novas linhas. Observe que cada mensagem do Pub/Sub é salva como uma linha única no arquivo de saída.

Parâmetros do modelo

Parâmetros obrigatórios

  • outputDirectory: o caminho e o prefixo do nome de arquivo em que os arquivos de saída serão gravados. Esse valor precisa terminar com uma barra. (Exemplo: gs://your-bucket/your-path).

Parâmetros opcionais

  • inputTopic: o tópico do Pub/Sub em que a entrada será lida. O nome do tópico precisa estar no formato projects/<PROJECT_ID>/topics/<TOPIC_NAME>. Se esse parâmetro for fornecido, não use inputSubscription. (Exemplo: projects/your-project-id/topics/your-topic-name).
  • inputSubscription: o tópico do Pub/Sub em que a entrada será lida. O nome da assinatura usa o formato projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_NAME>. Se esse parâmetro for fornecido, não use inputTopic. Por exemplo: projects/your-project-id/subscriptions/your-subscription-name.
  • userTempLocation: o diretório fornecido pelo usuário para enviar arquivos temporários. Precisa terminar com uma barra.
  • outputFilenamePrefix: o prefixo a ser colocado em cada arquivo em janela. (Exemplo: output-). O padrão é: saída.
  • outputFilenameSuffix: o sufixo a ser colocado em cada arquivo em janela, normalmente uma extensão de arquivo, como .txt ou .csv. Exemplo: .txt. O padrão é vazio.
  • outputShardTemplate: o modelo de fragmento define a parte dinâmica de cada arquivo em janela. Por padrão, o pipeline usa um único fragmento para saída para o sistema de arquivos em cada janela. Isso significa que todos os dados são gerados em um único arquivo por janela. O padrão de outputShardTemplate é W-P-SS-of-NN, em que W é o intervalo de datas da janela, P são as informações do painel, S é o número do fragmento e N é a quantidade de fragmentos. No caso de um único arquivo, a parte SS-of-NN de outputShardTemplate é 00-of-01.
  • numShards: o número máximo de fragmentos de saída produzidos durante a gravação. Um número maior de fragmentos significa maior capacidade de gravação no Cloud Storage, mas um custo de agregação de dados potencialmente maior entre os fragmentos ao processar os arquivos de saída do Cloud Storage. Padrão: 0.
  • windowDuration : a duração da janela é o intervalo em que os dados são gravados no diretório de saída. Configure a duração com base na capacidade de processamento do pipeline. Por exemplo, uma capacidade de processamento mais alta pode exigir tamanhos de janela menores para que os dados se encaixem na memória. O padrão é de 5 min (5 minutos), com um mínimo de 1 s (1 segundo). Os formatos permitidos são: [int]s (para segundos; exemplo: 5 s), [int]m (para minutos; exemplo: 12 min), [int]h (para horas; exemplo: 2h). (Exemplo: 5 min).
  • yearPattern : padrão para formatar o ano. Precisa ser um ou mais "y" ou "Y". O uso de maiúsculas e minúsculas não faz diferença no ano. O padrão pode ser unido por caracteres que não são alfanuméricos ou o caractere de diretório ("/"). O padrão é "AAAA".
  • monthPattern (padrão): padrão para formatar o mês. Precisa ser um ou mais do caractere "M". O padrão pode ser unido por caracteres não alfanuméricos ou pelo caractere de diretório ("/"). O padrão é "MM".
  • dayPattern : padrão para formatar o dia. Precisa ser um ou mais "d" para o dia do mês ou "D" para o dia do ano. O uso de maiúsculas e minúsculas não faz diferença no ano. O padrão pode ser unido por caracteres que não são alfanuméricos ou o caractere de diretório ("/"). O padrão é "dd".
  • hourPattern : padrão para formatar a hora. Precisa ser um ou mais do caractere "H". O padrão pode ser unido por caracteres não alfanuméricos ou pelo caractere de diretório ("/"). O padrão é "HH".
  • minutePattern : padrão para formatar os minutos. Precisa ser um ou mais do caractere "m". O padrão pode ser unido por caracteres não alfanuméricos ou pelo caractere de diretório ("/"). O padrão é "mm".

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region REGION_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex",
  }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • SUBSCRIPTION_NAME: o nome da sua assinatura de Pub/Sub
  • BUCKET_NAME: o nome do bucket do Cloud Storage

A seguir