Questo modello crea una pipeline di inserimento flussi che esegue il polling continuo per i nuovi file di testo caricati Cloud Storage legge ogni file riga per riga e pubblica le stringhe in un Pub/Sub per ogni argomento. Il modello pubblica i record in un file delimitato da nuova riga contenente JSON record o file CSV in un argomento Pub/Sub per l'elaborazione in tempo reale. Puoi utilizzare questo modello per riprodurre i dati in Pub/Sub.
La pipeline viene eseguita a tempo indeterminato e deve essere terminata manualmente tramite un "annullamento" e non un "drain", per via dell'uso della funzione "Watch" transform, che è uno strumento 'SplittableDoFn' che non per supportare lo svuotamento.
Attualmente, l'intervallo di polling è fisso e impostato su 10 secondi. Questo modello non impostare un timestamp qualsiasi sui singoli record, in modo che l'ora dell'evento corrisponda a quella della pubblicazione tempo durante l'esecuzione. Se la tua pipeline si basa su un'ora precisa dell'evento per l'elaborazione, non dovrebbero usare questa pipeline.
Requisiti della pipeline
- I file di input devono essere in formato JSON o CSV delimitato da nuova riga. Record che coprono più righe nei file di origine possono causare problemi a valle, poiché ogni riga all'interno dei file è pubblicata come messaggio in Pub/Sub.
- L'argomento Pub/Sub deve esistere prima dell'esecuzione.
- La pipeline viene eseguita all'infinito e deve essere terminata manualmente.
Parametri del modello
Parametri obbligatori
- inputFilePattern : il pattern del file di input da cui leggere. Esempio: gs://bucket-name/files/*.json.
- outputTopic: l'argomento di input Pub/Sub in cui scrivere. Il nome deve essere nel formato
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
. (ad es. projects/your-project-id/topics/your-topic-name).
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. Il valore predefinito
è
us-central1
.Per un elenco di regioni in cui è possibile eseguire un job Dataflow, consulta Località di Dataflow.
- Nel menu a discesa Modello di flusso di dati, seleziona the Text Files on Cloud Storage to Pub/Sub (Stream) template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- (Facoltativo) Per passare dall'elaborazione exactly-once alla modalità flusso di dati Almeno una volta, seleziona Almeno una volta.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Stream_GCS_Text_to_Cloud_PubSub \ --region REGION_NAME\ --staging-location STAGING_LOCATION\ --parameters \ inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME
Sostituisci quanto segue:
JOB_NAME
: un nome job univoco di tua sceltaREGION_NAME
: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempious-central1
STAGING_LOCATION
: la posizione per i file locali di gestione temporanea (ad esempio,gs://your-bucket/staging
)TOPIC_NAME
: il nome dell'argomento Pub/SubBUCKET_NAME
: il nome del tuo bucket Cloud StorageFILE_PATTERN
: il pattern glob dei file da leggere nel bucket Cloud Storage (ad esempiopath/*.csv
)
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Stream_GCS_Text_to_Cloud_PubSub { "jobName": "JOB_NAME", "environment": { "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome di job univoco a tua sceltaLOCATION
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
STAGING_LOCATION
: la posizione per i file locali di gestione temporanea (ad esempio,gs://your-bucket/staging
)TOPIC_NAME
: il nome dell'argomento Pub/SubBUCKET_NAME
: il nome del tuo bucket Cloud StorageFILE_PATTERN
: il glob del pattern del file da cui leggere nel bucket Cloud Storage (ad es.path/*.csv
)
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.