Modelo de tópico ou subscrição do Pub/Sub para ficheiros de texto no Cloud Storage

O tópico do Pub/Sub ou a subscrição do modelo de texto do Cloud Storage é um pipeline de streaming que lê registos do Pub/Sub e guarda-os como uma série de ficheiros do Cloud Storage no formato de texto. Pode usar o modelo como uma forma rápida de guardar dados no Pub/Sub para utilização futura. Por predefinição, o modelo gera um novo ficheiro a cada 5 minutos.

Requisitos do pipeline

  • O tópico ou a subscrição do Pub/Sub tem de existir antes da execução.
  • As mensagens publicadas no tópico têm de estar no formato de texto.
  • As mensagens publicadas no tópico não podem conter novas linhas. Tenha em atenção que cada mensagem do Pub/Sub é guardada como uma única linha no ficheiro de saída.

Parâmetros de modelos

Parâmetros obrigatórios

  • outputDirectory: o caminho e o prefixo do nome do ficheiro para escrever ficheiros de saída. Este valor tem de terminar com uma barra. Por exemplo, gs://your-bucket/your-path/.

Parâmetros opcionais

  • inputTopic: o tópico Pub/Sub a partir do qual ler a entrada. Se este parâmetro for fornecido, não use inputSubscription. Por exemplo, projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • inputSubscription: a subscrição do Pub/Sub a partir da qual ler a entrada. Se este parâmetro for fornecido, não use inputTopic. Por exemplo, projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_NAME>.
  • userTempLocation: o diretório fornecido pelo utilizador para gerar ficheiros temporários. Tem de terminar com uma barra.
  • outputFilenamePrefix: o prefixo a colocar em cada ficheiro dividido em janelas. Por exemplo, output-. A predefinição é: output.
  • outputFilenameSuffix: o sufixo a colocar em cada ficheiro dividido em janelas, normalmente uma extensão de ficheiro, como .txt ou .csv. Por exemplo, .txt. A predefinição é vazio.
  • outputShardTemplate: o modelo de fragmento define a parte dinâmica de cada ficheiro dividido em janelas. Por predefinição, o pipeline usa um único fragmento para a saída para o sistema de ficheiros em cada janela. Isto significa que todos os dados são enviados para um único ficheiro por janela. O outputShardTemplate é predefinido como W-P-SS-of-NN, em que W é o intervalo de datas da janela, P são as informações do painel, S é o número de fragmentos e N é o número de fragmentos. No caso de um único ficheiro, a parte SS-of-NN do outputShardTemplate é 00-of-01.
  • numShards: o número máximo de fragmentos de saída produzidos durante a escrita. Um número mais elevado de fragmentos significa um débito mais elevado para a escrita no Cloud Storage, mas um custo de agregação de dados potencialmente mais elevado entre fragmentos ao processar ficheiros do Cloud Storage de saída. A predefinição é: 0.
  • windowDuration: a duração da janela é o intervalo no qual os dados são escritos no diretório de saída. Configure a duração com base no débito do pipeline. Por exemplo, um débito mais elevado pode exigir tamanhos de janelas mais pequenos para que os dados caibam na memória. A predefinição é 5m (5 minutos), com um mínimo de 1s (1 segundo). Os formatos permitidos são: [int]s (para segundos, exemplo: 5s), [int]m (para minutos, exemplo: 12m), [int]h (para horas, exemplo: 2h). Por exemplo, 5m.
  • yearPattern: padrão para formatar o ano. Tem de ser um ou mais de y ou Y. A caixa não faz diferença no ano. O padrão pode ser opcionalmente envolvido por carateres que não sejam alfanuméricos nem o caráter de diretório (/). A predefinição é YYYY.
  • monthPattern: padrão para formatar o mês. Tem de ser um ou mais carateres M. O padrão pode ser opcionalmente envolvido por carateres que não sejam alfanuméricos ou o caráter de diretório (/). A predefinição é MM.
  • dayPattern: padrão para formatar o dia. Tem de ser um ou mais dos seguintes valores: d para o dia do mês ou D para o dia do ano. A caixa não faz diferença no ano. O padrão pode ser opcionalmente envolvido por carateres que não sejam alfanuméricos nem o caráter de diretório (/). A predefinição é dd.
  • hourPattern: padrão para formatar a hora. Tem de ser um ou mais carateres H. O padrão pode ser opcionalmente envolvido por carateres que não sejam alfanuméricos ou o caráter de diretório (/). A predefinição é HH.
  • minutePattern: padrão para formatar o minuto. Tem de ser um ou mais carateres m. O padrão pode ser opcionalmente envolvido por carateres que não sejam alfanuméricos ou o caráter de diretório (/). A predefinição é mm.

Execute o modelo

Consola

  1. Aceda à página do fluxo de dados Criar tarefa a partir de um modelo.
  2. Aceda a Criar tarefa a partir de modelo
  3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
  4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

    Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.

  5. No menu pendente Modelo do fluxo de dados, selecione the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template.
  6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
  7. Clique em Executar tarefa.

gcloud

Na shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region REGION_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Substitua o seguinte:

  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • SUBSCRIPTION_NAME: o nome da sua subscrição do Pub/Sub
  • BUCKET_NAME: o nome do seu contentor do Cloud Storage

API

Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex",
  }
}

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • SUBSCRIPTION_NAME: o nome da sua subscrição do Pub/Sub
  • BUCKET_NAME: o nome do seu contentor do Cloud Storage

O que se segue?