Argomento Pub/Sub in file di testo su Cloud Storage

Il modello di testo da Pub/Sub a Cloud Storage è una pipeline di flusso che legge dall'argomento Pub/Sub e li salva come serie di file Cloud Storage formato di testo. Il modello può essere utilizzato come metodo rapido per salvare i dati in Pub/Sub per per usi futuri. Per impostazione predefinita, il modello genera un nuovo file ogni 5 minuti.

Requisiti della pipeline

  • L'argomento Pub/Sub deve esistere prima dell'esecuzione.
  • I messaggi pubblicati nell'argomento devono essere in formato testo.
  • I messaggi pubblicati nell'argomento non devono contenere caratteri di fine riga. Tieni presente che ogni Il messaggio Pub/Sub viene salvato come singola riga nel file di output.

Parametri del modello

Parametri obbligatori

  • outputDirectory: il percorso e il prefisso del nome file per la scrittura dei file di output. Ad esempio, gs://bucket-name/path/. Questo valore deve terminare con una barra.
  • outputFilenamePrefix: il prefisso da inserire in ogni file a finestra. Ad esempio, output-. Il valore predefinito è: output.

Parametri facoltativi

  • inputTopic: l'argomento Pub/Sub da cui leggere l'input. Il nome dell'argomento deve essere nel formato projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • userTempLocation: la directory fornita dall'utente in cui verranno visualizzati i file temporanei. Deve terminare con una barra.
  • outputFilenameSuffix: il suffisso da inserire in ogni file a finestra. In genere un'estensione di file come .txt o .csv. Il campo predefinito è vuoto.
  • outputShardTemplate : il modello di shard definisce la parte dinamica di ogni file con finestre. Per impostazione predefinita, la pipeline utilizza un singolo shard per l'output al file system all'interno di ogni finestra. Di conseguenza, tutti i dati vengono inseriti in un unico file per finestra. Il valore predefinito di outputShardTemplate è to W-P-SS-of-NN, dove W è l'intervallo di date della finestra, P sono le informazioni del riquadro, S è il numero dello shard e N è il numero di shard. In caso di un singolo file, la parte SS-of-NN di outputShardTemplate è 00-of-01.
  • yearPattern: pattern per la formattazione dell'anno. Deve essere uno o più di y o Y. La custodia non fa alcuna differenza durante l'anno. Facoltativamente, puoi aggregare il pattern con caratteri non alfanumerici o il carattere della directory ('/'). Il valore predefinito è YYYY.
  • monthPattern : la sequenza per la formattazione del mese. Deve contenere uno o più caratteri M. Facoltativamente, puoi aggregare il pattern con caratteri non alfanumerici o il carattere della directory ('/'). Il valore predefinito è MM.
  • dayPattern : sequenza per la formattazione del giorno. Deve essere uno o più di d per il giorno del mese o D per il giorno dell'anno. Se vuoi, racchiudi il pattern con caratteri non alfanumerici o con il carattere della directory ("/"). Il valore predefinito è dd.
  • hourPattern: pattern per la formattazione dell'ora. Deve essere costituito da uno o più caratteri H. Se vuoi, racchiudi il pattern con caratteri non alfanumerici o con il carattere della directory ("/"). Il valore predefinito è HH.
  • minutePattern: pattern per la formattazione del minuto. Deve essere costituito da uno o più caratteri m. Facoltativamente, puoi aggregare il pattern con caratteri non alfanumerici o il carattere della directory ('/'). Il valore predefinito è mm.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the Pub/Sub to Text Files on Cloud Storage template.
  6. Inserisci i valori parametro negli appositi campi.
  7. (Facoltativo) Passare dall'elaborazione "exactly-once" all'impostazione modalità flusso di dati almeno una volta, seleziona Almeno una volta.
  8. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_GCS_Text \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Sostituisci quanto segue:

  • JOB_NAME: un nome di job univoco a tua scelta
  • REGION_NAME: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • STAGING_LOCATION: la posizione per i file locali di gestione temporanea (ad esempio, gs://your-bucket/staging)
  • TOPIC_NAME: il nome dell'argomento Pub/Sub
  • BUCKET_NAME: il nome del bucket Cloud Storage

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Sostituisci quanto segue:

  • PROJECT_ID: L'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • STAGING_LOCATION: la posizione per i file locali di gestione temporanea (ad esempio, gs://your-bucket/staging)
  • TOPIC_NAME: il nome dell'argomento Pub/Sub
  • BUCKET_NAME: il nome del bucket Cloud Storage

Passaggi successivi