Modello di file da Pub/Sub ad Avro in Cloud Storage

I file da Pub/Sub ad Avro sul modello di Cloud Storage sono una pipeline di inserimento flussi che legge i dati da un argomento Pub/Sub e scrive file Avro nel bucket Cloud Storage specificato.

Requisiti della pipeline

  • L'argomento Pub/Sub di input deve esistere prima dell'esecuzione della pipeline.

Parametri del modello

Parametri obbligatori

  • inputTopic : l'argomento Pub/Sub a cui eseguire la sottoscrizione per il consumo dei messaggi. Il nome dell'argomento deve essere nel formato projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • outputDirectory : la directory di output in cui vengono archiviati i file Avro di output. Deve contenere una barra (/) alla fine. Ad esempio: gs://example-bucket/example-directory/.
  • avroTempDirectory : la directory per i file Avro temporanei. Deve contenere una barra (/) alla fine. Ad esempio: gs://example-bucket/example-directory/.

Parametri facoltativi

  • outputFilenamePrefix : il prefisso del nome file di output per i file Avro. Il valore predefinito è: output.
  • outputFilenameSuffix : il suffisso del nome file di output per i file Avro. Il campo predefinito è vuoto.
  • outputShardTemplate : il modello di shard definisce la parte dinamica di ogni file con finestre. Per impostazione predefinita, la pipeline utilizza un singolo shard per l'output al file system all'interno di ogni finestra. Di conseguenza, tutti i dati vengono inseriti in un unico file per finestra. I valori predefiniti di outputShardTemplate sono to W-P-SS-of-NN, dove W è l'intervallo di date della finestra, P è le informazioni del riquadro, S è il numero di shard e N è il numero di shard. In caso di un singolo file, la parte SS-of-NN di outputShardTemplate è 00-of-01.
  • yearPattern : Sequenza per la formattazione dell'anno. Deve essere uno o più di y o Y. La custodia non fa alcuna differenza durante l'anno. Facoltativamente, puoi aggregare il pattern con caratteri non alfanumerici o il carattere della directory ('/'). Il valore predefinito è YYYY.
  • monthPattern : la sequenza per la formattazione del mese. Deve contenere uno o più caratteri M. Facoltativamente, puoi aggregare il pattern con caratteri non alfanumerici o il carattere della directory ('/'). Il valore predefinito è MM.
  • dayPattern : sequenza per la formattazione del giorno. Deve essere uno o più di d per il giorno del mese o D per il giorno dell'anno. Facoltativamente, puoi aggregare il pattern con caratteri non alfanumerici o il carattere della directory ('/'). Il valore predefinito è dd.
  • hourPattern : sequenza per formattare l'ora. Deve contenere uno o più caratteri H. Facoltativamente, puoi aggregare il pattern con caratteri non alfanumerici o il carattere della directory ('/'). Il valore predefinito è HH.
  • minutePattern : sequenza per la formattazione dei minuti. Deve contenere uno o più caratteri m. Facoltativamente, puoi aggregare il pattern con caratteri non alfanumerici o il carattere della directory ('/'). Il valore predefinito è mm.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco di regioni in cui è possibile eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Pub/Sub to Avro Files on Cloud Storage template.
  6. Inserisci i valori parametro negli appositi campi.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_Avro \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
avroTempDirectory=gs://BUCKET_NAME/temp/

Sostituisci quanto segue:

  • JOB_NAME: un nome job univoco a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella padre non con data del bucket: gs://dataflow-templates-REGION_NAME/latest/
    • il nome della versione, ad esempio 2023-09-12-00_RC00, per utilizzare una versione specifica del modello, che è possibile trovare nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
  • STAGING_LOCATION: la posizione per i file locali di gestione temporanea (ad esempio, gs://your-bucket/staging)
  • TOPIC_NAME: il nome dell'argomento Pub/Sub
  • BUCKET_NAME: il nome del tuo bucket Cloud Storage
  • FILENAME_PREFIX: il prefisso preferito del nome file di output
  • FILENAME_SUFFIX: il suffisso preferito del nome file di output
  • SHARD_TEMPLATE: il modello di shard di output preferito

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella padre non con data del bucket: gs://dataflow-templates-REGION_NAME/latest/
    • il nome della versione, ad esempio 2023-09-12-00_RC00, per utilizzare una versione specifica del modello, che è possibile trovare nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
  • STAGING_LOCATION: la posizione per i file locali di gestione temporanea (ad esempio, gs://your-bucket/staging)
  • TOPIC_NAME: il nome dell'argomento Pub/Sub
  • BUCKET_NAME: il nome del tuo bucket Cloud Storage
  • FILENAME_PREFIX: il prefisso preferito del nome file di output
  • FILENAME_SUFFIX: il suffisso preferito del nome file di output
  • SHARD_TEMPLATE: il modello di shard di output preferito

Passaggi successivi