Este modelo cria um pipeline de streaming que procura continuamente novos ficheiros de texto carregados para o Cloud Storage, lê cada ficheiro linha a linha e publica strings num tópico do Pub/Sub. O modelo publica registos num ficheiro delimitado por newline que contém registos JSON ou num ficheiro CSV num tópico do Pub/Sub para processamento em tempo real. Pode usar este modelo para repetir dados no Pub/Sub.
O pipeline é executado indefinidamente e tem de ser terminado manualmente através de um "cancel" e não de um "drain", devido à sua utilização da transformação "Watch", que é um "SplittableDoFn" que não suporta a drenagem.
Atualmente, o intervalo de sondagem é fixo e está definido para 10 segundos. Este modelo não define nenhuma data/hora nos registos individuais, pelo que a hora do evento é igual à hora de publicação durante a execução. Se o seu pipeline depender de uma hora do evento precisa para o processamento, não deve usar este pipeline.
Requisitos do pipeline
- Os ficheiros de entrada têm de estar no formato JSON ou CSV delimitado por newline. Os registos que abrangem várias linhas nos ficheiros de origem podem causar problemas a jusante, porque cada linha nos ficheiros é publicada como uma mensagem no Pub/Sub.
- O tópico Pub/Sub tem de existir antes da execução.
- O pipeline é executado indefinidamente e tem de ser terminado manualmente.
Parâmetros de modelos
Parâmetros obrigatórios
- inputFilePattern: o padrão do ficheiro de entrada a partir do qual ler. Por exemplo,
gs://bucket-name/files/*.json
. - outputTopic: o tópico de entrada do Pub/Sub para o qual escrever. O nome tem de estar no formato
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
. Por exemplo,projects/your-project-id/topics/your-topic-name
.
Execute o modelo
Consola
- Aceda à página do fluxo de dados Criar tarefa a partir de um modelo. Aceda a Criar tarefa a partir de modelo
- No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
- Opcional: para Ponto final regional, selecione um valor no menu pendente. A região
predefinida é
us-central1
.Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.
- No menu pendente Modelo do fluxo de dados, selecione the Text Files on Cloud Storage to Pub/Sub (Stream) template.
- Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
- Opcional: para mudar do processamento exatamente uma vez para o modo de streaming pelo menos uma vez, selecione Pelo menos uma vez.
- Clique em Executar tarefa.
gcloud
Na shell ou no terminal, execute o modelo:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Stream_GCS_Text_to_Cloud_PubSub \ --region REGION_NAME\ --staging-location STAGING_LOCATION\ --parameters \ inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME
Substitua o seguinte:
JOB_NAME
: um nome de tarefa exclusivo à sua escolhaREGION_NAME
: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1
STAGING_LOCATION
: a localização para organizar ficheiros locais (por exemplo,gs://your-bucket/staging
)TOPIC_NAME
: o nome do seu tópico do Pub/SubBUCKET_NAME
: o nome do seu contentor do Cloud StorageFILE_PATTERN
: o padrão glob do ficheiro a ler no contentor do Cloud Storage (por exemplo,path/*.csv
)
API
Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Stream_GCS_Text_to_Cloud_PubSub { "jobName": "JOB_NAME", "environment": { "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" } }
Substitua o seguinte:
PROJECT_ID
: o ID do projeto onde quer executar a tarefa do Dataflow Google CloudJOB_NAME
: um nome de tarefa exclusivo à sua escolhaLOCATION
: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1
STAGING_LOCATION
: a localização para organizar ficheiros locais (por exemplo,gs://your-bucket/staging
)TOPIC_NAME
: o nome do seu tópico do Pub/SubBUCKET_NAME
: o nome do seu contentor do Cloud StorageFILE_PATTERN
: o padrão glob do ficheiro a ler no contentor do Cloud Storage (por exemplo,path/*.csv
)
O que se segue?
- Saiba mais sobre os modelos do Dataflow.
- Consulte a lista de modelos fornecidos pela Google.