Esse modelo cria um pipeline de streaming que pesquisa continuamente novos arquivos de texto carregados no Cloud Storage, lê cada arquivo linha por linha e publica strings em um tópico do Pub/Sub. O modelo publica registros em um arquivo delimitado por uma nova linha contendo registros JSON ou em um arquivo CSV em um tópico do Pub/Sub para processamento em tempo real. É possível usar esse modelo para reproduzir dados novamente no Pub/Sub.
O pipeline é executado indefinidamente e precisa ser encerrado manualmente com um "cancel", e não com um "drain", porque ele usa uma transformação "Watch" que é um "SplittableDoFn" não compatível com drenagem.
Atualmente, o intervalo de pesquisa é fixo e definido para 10 segundos. Esse modelo não configura carimbos de data/hora nos registros individuais, assim o horário do evento é igual ao da publicação durante a execução. Se o processamento do pipeline depender de um tempo de evento preciso, não utilize esse pipeline.
Requisitos de pipeline
- Os arquivos de entrada precisam estar no formato JSON ou CSV delimitado por nova linha. Registros que abrangem várias linhas nos arquivos de origem podem causar problemas no downstream, porque cada linha nos arquivos é publicada como uma mensagem para o Pub/Sub.
- O tópico do Pub/Sub precisa existir antes da execução.
- O pipeline é executado indefinidamente e precisa ser finalizado manualmente.
Parâmetros do modelo
Parâmetros obrigatórios
- inputFilePattern: o padrão do arquivo de entrada a ser lido. Exemplo: gs://bucket-name/files/*.json.
- outputTopic: o tópico de entrada do Pub/Sub a ser gravado. O nome precisa estar no formato
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
. (Exemplo: projects/your-project-id/topics/your-topic-name).
Executar o modelo
Console
- Acesse a página Criar job usando um modelo do Dataflow. Acesse Criar job usando um modelo
- No campo Nome do job, insira um nome exclusivo.
- Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é
us-central1
.Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.
- No menu suspenso Modelo do Dataflow, selecione the Text Files on Cloud Storage to Pub/Sub (Stream) template.
- Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
- Opcional: para alternar do processamento "Exatamente uma vez" para o modo de streaming "Pelo menos uma vez", selecione Pelo menos uma vez.
- Cliquem em Executar job.
gcloud
No shell ou no terminal, execute o modelo:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Stream_GCS_Text_to_Cloud_PubSub \ --region REGION_NAME\ --staging-location STAGING_LOCATION\ --parameters \ inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME
Substitua:
JOB_NAME
: um nome de job de sua escolhaREGION_NAME
: a região em que você quer implantar o job do Dataflow, por exemplo,us-central1
STAGING_LOCATION
: o local para fase de testes de arquivos locais (por exemplo,gs://your-bucket/staging
)TOPIC_NAME
: o nome do tópico do Pub/SubBUCKET_NAME
: o nome do bucket do Cloud StorageFILE_PATTERN
: o padrão de arquivo glob para ler no bucket do Cloud Storage (por exemplo,path/*.csv
).
API
Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a
API e os respectivos escopos de autorização, consulte
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Stream_GCS_Text_to_Cloud_PubSub { "jobName": "JOB_NAME", "environment": { "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" } }
Substitua:
PROJECT_ID
: o ID do projeto do Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaLOCATION
: a região em que você quer implantar o job do Dataflow, por exemplo,us-central1
STAGING_LOCATION
: o local para fase de testes de arquivos locais (por exemplo,gs://your-bucket/staging
)TOPIC_NAME
: o nome do tópico do Pub/SubBUCKET_NAME
: o nome do bucket do Cloud StorageFILE_PATTERN
: o padrão de arquivo glob para ler no bucket do Cloud Storage (por exemplo,path/*.csv
).
A seguir
- Saiba mais sobre os modelos do Dataflow.
- Confira a lista de modelos fornecidos pelo Google.