Modelo de ficheiros de texto no Cloud Storage para o Pub/Sub (stream)

Este modelo cria um pipeline de streaming que procura continuamente novos ficheiros de texto carregados para o Cloud Storage, lê cada ficheiro linha a linha e publica strings num tópico do Pub/Sub. O modelo publica registos num ficheiro delimitado por newline que contém registos JSON ou num ficheiro CSV num tópico do Pub/Sub para processamento em tempo real. Pode usar este modelo para repetir dados no Pub/Sub.

O pipeline é executado indefinidamente e tem de ser terminado manualmente através de um "cancel" e não de um "drain", devido à sua utilização da transformação "Watch", que é um "SplittableDoFn" que não suporta a drenagem.

Atualmente, o intervalo de sondagem é fixo e está definido para 10 segundos. Este modelo não define nenhuma data/hora nos registos individuais, pelo que a hora do evento é igual à hora de publicação durante a execução. Se o seu pipeline depender de uma hora do evento precisa para o processamento, não deve usar este pipeline.

Requisitos do pipeline

  • Os ficheiros de entrada têm de estar no formato JSON ou CSV delimitado por newline. Os registos que abrangem várias linhas nos ficheiros de origem podem causar problemas a jusante, porque cada linha nos ficheiros é publicada como uma mensagem no Pub/Sub.
  • O tópico Pub/Sub tem de existir antes da execução.
  • O pipeline é executado indefinidamente e tem de ser terminado manualmente.

Parâmetros de modelos

Parâmetros obrigatórios

  • inputFilePattern: o padrão do ficheiro de entrada a partir do qual ler. Por exemplo, gs://bucket-name/files/*.json.
  • outputTopic: o tópico de entrada do Pub/Sub para o qual escrever. O nome tem de estar no formato projects/<PROJECT_ID>/topics/<TOPIC_NAME>. Por exemplo, projects/your-project-id/topics/your-topic-name.

Execute o modelo

Consola

  1. Aceda à página do fluxo de dados Criar tarefa a partir de um modelo.
  2. Aceda a Criar tarefa a partir de modelo
  3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
  4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

    Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.

  5. No menu pendente Modelo do fluxo de dados, selecione the Text Files on Cloud Storage to Pub/Sub (Stream) template.
  6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
  7. Opcional: para mudar do processamento exatamente uma vez para o modo de streaming pelo menos uma vez, selecione Pelo menos uma vez.
  8. Clique em Executar tarefa.

gcloud

Na shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME\
    --staging-location STAGING_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

Substitua o seguinte:

  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • STAGING_LOCATION: a localização para organizar ficheiros locais (por exemplo, gs://your-bucket/staging)
  • TOPIC_NAME: o nome do seu tópico do Pub/Sub
  • BUCKET_NAME: o nome do seu contentor do Cloud Storage
  • FILE_PATTERN: o padrão glob do ficheiro a ler no contentor do Cloud Storage (por exemplo, path/*.csv)

API

Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • STAGING_LOCATION: a localização para organizar ficheiros locais (por exemplo, gs://your-bucket/staging)
  • TOPIC_NAME: o nome do seu tópico do Pub/Sub
  • BUCKET_NAME: o nome do seu contentor do Cloud Storage
  • FILE_PATTERN: o padrão glob do ficheiro a ler no contentor do Cloud Storage (por exemplo, path/*.csv)

O que se segue?