Argomento Pub/Sub o sottoscrizione a file di testo in Cloud Storage

Il modello Argomento Pub/Sub o Sottoscrizione al testo di Cloud Storage è una pipeline di inserimento flussi che legge i record da Pub/Sub e li salva come serie di file Cloud Storage in formato testo. Il modello può essere utilizzato come metodo rapido per salvare i dati in Pub/Sub per uso futuro. Per impostazione predefinita, il modello genera un nuovo file ogni 5 minuti.

Requisiti della pipeline

  • L'argomento o la sottoscrizione Pub/Sub deve esistere prima dell'esecuzione.
  • I messaggi pubblicati nell'argomento devono essere in formato testo.
  • I messaggi pubblicati nell'argomento non devono contenere caratteri di fine riga. Tieni presente che ogni messaggio Pub/Sub viene salvato come singola riga nel file di output.

Parametri del modello

Parametro Descrizione
inputTopic L'argomento Pub/Sub da cui leggere l'input. Il nome dell'argomento deve essere nel formato projects/<project-id>/topics/<topic-name>. Se questo parametro viene fornito, inputSubscription non deve essere fornito.
inputSubscription La sottoscrizione Pub/Sub da cui leggere l'input. Il nome della sottoscrizione deve essere in formato projects/<project-id>/subscription/<subscription-name>. Se questo parametro viene fornito, inputTopic non deve essere fornito.
outputDirectory Il prefisso del percorso e del nome file per la scrittura dei file di output. Ad esempio, gs://bucket-name/path/. Questo valore deve terminare con una barra.
outputFilenamePrefix Il prefisso da inserire in ogni file con la finestra. Ad esempio, output-.
outputFilenameSuffix Il suffisso da inserire in ogni file con finestre, in genere un'estensione del file come .txt o .csv.
outputShardTemplate Il modello shard definisce la parte dinamica di ogni file con finestre. Per impostazione predefinita, la pipeline utilizza un singolo shard per l'output al file system all'interno di ogni finestra. Ciò significa che tutti i dati vengono inseriti in un unico file per finestra. Il valore predefinito di outputShardTemplate è W-P-SS-of-NN, dove W è l'intervallo di date della finestra, P è le informazioni del riquadro, S è il numero di shard e N è il numero di shard. In caso di un singolo file, la parte SS-of-NN di outputShardTemplate è 00-of-01.
windowDuration (Facoltativo) La durata della finestra è l'intervallo in cui i dati vengono scritti nella directory di output. Configura la durata in base alla velocità effettiva della pipeline. Ad esempio, una velocità effettiva più elevata potrebbe richiedere finestre di dimensioni inferiori in modo che i dati rientrino in memoria. Il valore predefinito è 5 min, con un minimo di 1 s. I formati consentiti sono: [int]s (per i secondi, ad esempio 5s), [int]m (per i minuti, ad esempio 12m), [int]h (per le ore, ad esempio 2h).

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco di regioni in cui è possibile eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template.
  6. Inserisci i valori parametro negli appositi campi.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region REGION_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Sostituisci quanto segue:

  • JOB_NAME: un nome job univoco a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella padre non con data del bucket: gs://dataflow-templates-REGION_NAME/latest/
    • il nome della versione, ad esempio 2023-09-12-00_RC00, per utilizzare una versione specifica del modello, che è possibile trovare nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
  • SUBSCRIPTION_NAME: nome della sottoscrizione Pub/Sub
  • BUCKET_NAME: il nome del tuo bucket Cloud Storage

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex",
  }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella padre non con data del bucket: gs://dataflow-templates-REGION_NAME/latest/
    • il nome della versione, ad esempio 2023-09-12-00_RC00, per utilizzare una versione specifica del modello, che è possibile trovare nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
  • SUBSCRIPTION_NAME: nome della sottoscrizione Pub/Sub
  • BUCKET_NAME: il nome del tuo bucket Cloud Storage

Passaggi successivi