Modello di file di testo su Cloud Storage in Pub/Sub (flusso)

Questo modello crea una pipeline di inserimento flussi che esegue il polling continuo per i nuovi file di testo caricati su Cloud Storage, legge ogni file riga per riga e pubblica stringhe in un argomento Pub/Sub. Il modello pubblica i record in un file delimitato da nuova riga contenente record JSON o file CSV in un argomento Pub/Sub per l'elaborazione in tempo reale. Puoi utilizzare questo modello per riprodurre i dati in Pub/Sub.

La pipeline viene eseguita all'infinito e deve essere terminata manualmente tramite un comando "cancel" e non "drain", a causa dell'utilizzo della trasformazione "Watch", ovvero di tipo "SplittableDoFn" che non supporta lo svuotamento.

Attualmente, l'intervallo di polling è fisso e impostato su 10 secondi. Questo modello non imposta alcun timestamp per i singoli record, quindi l'ora dell'evento corrisponde a quella della pubblicazione durante l'esecuzione. Se la tua pipeline si basa su un'ora precisa dell'evento per l'elaborazione, non devi utilizzare questa pipeline.

Requisiti della pipeline

  • I file di input devono essere in formato JSON o CSV delimitato da nuova riga. I record che si estendono su più righe nei file di origine possono causare problemi a valle, poiché ogni riga all'interno dei file viene pubblicata come messaggio per Pub/Sub.
  • L'argomento Pub/Sub deve esistere prima dell'esecuzione.
  • La pipeline viene eseguita all'infinito e deve essere terminata manualmente.

Parametri del modello

Parametri obbligatori

  • inputFilePattern : il pattern del file di input da cui leggere. Esempio: gs://bucket-name/files/*.json.
  • outputTopic : l'argomento di input Pub/Sub in cui scrivere. Il nome deve essere nel formato projects/<PROJECT_ID>/topics/<TOPIC_NAME>. ad esempio projects/your-project-id/topics/your-topic-name.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco di regioni in cui è possibile eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Text Files on Cloud Storage to Pub/Sub (Stream) template.
  6. Inserisci i valori parametro negli appositi campi.
  7. (Facoltativo) Per passare dall'elaborazione "exactly-once" alla modalità flusso di dati Almeno una volta, seleziona Almeno una volta.
  8. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME\
    --staging-location STAGING_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

Sostituisci quanto segue:

  • JOB_NAME: un nome job univoco a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • STAGING_LOCATION: la posizione per i file locali di gestione temporanea (ad esempio, gs://your-bucket/staging)
  • TOPIC_NAME: nome dell'argomento Pub/Sub
  • BUCKET_NAME: il nome del tuo bucket Cloud Storage
  • FILE_PATTERN: il glob del pattern del file da cui leggere nel bucket Cloud Storage (ad esempio, path/*.csv)

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • STAGING_LOCATION: la posizione per i file locali di gestione temporanea (ad esempio, gs://your-bucket/staging)
  • TOPIC_NAME: nome dell'argomento Pub/Sub
  • BUCKET_NAME: il nome del tuo bucket Cloud Storage
  • FILE_PATTERN: il glob del pattern del file da cui leggere nel bucket Cloud Storage (ad esempio, path/*.csv)

Passaggi successivi