Modello di generatore di dati in modalità flusso in Pub/Sub, BigQuery e Cloud Storage

Il modello di Generatore di dati in modalità flusso viene utilizzato per generare un numero illimitato o fisso di record sintetici o messaggi in base allo schema fornito dall'utente alla frequenza specificata. Le destinazioni compatibili includono argomenti Pub/Sub, tabelle BigQuery e bucket Cloud Storage.

Di seguito sono riportati alcuni casi d'uso possibili:

  • Simula la pubblicazione di eventi in tempo reale su larga scala in un argomento Pub/Sub per misurare e determinare il numero e le dimensioni dei consumer richiesti per elaborare gli eventi pubblicati.
  • Genera dati sintetici in una tabella BigQuery o in un bucket Cloud Storage per valutare i benchmark delle prestazioni o servire come proof of concept.

Formati di codifica e sink supportati

Nella tabella seguente vengono descritti i formati di sink e di codifica supportati da questo modello:
JSON Avro Parquet
Pub/Sub No
BigQuery No No
Cloud Storage Yes

Requisiti della pipeline

  • L'account di servizio worker richiede il ruolo assegnato Worker Dataflow (roles/dataflow.worker). Per ulteriori informazioni, consulta Introduzione a IAM.
  • Crea un file di schema contenente un modello JSON per i dati generati. Questo modello utilizza la libreria JSON Data Generator, per consentirti di fornire varie funzioni faker per ogni campo dello schema. Per maggiori informazioni, consulta la documentazione relativa al file json-data-generator.

    Ad esempio:

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • Carica il file di schema in un bucket Cloud Storage.
  • La destinazione di output deve esistere prima dell'esecuzione. Il target deve essere un argomento Pub/Sub, una tabella BigQuery o un bucket Cloud Storage, a seconda del tipo di sink.
  • Se la codifica di output è Avro o Parquet, crea un file di schema Avro e archivialo in un percorso di Cloud Storage.
  • Assegna all'account di servizio worker un ruolo IAM aggiuntivo a seconda della destinazione desiderata.
    Destinazione Ruolo IAM necessario aggiuntivo Applica a quale risorsa
    Pub/Sub Publisher Pub/Sub (roles/pubsub.publisher)
    (per ulteriori informazioni, consulta Controllo dell'controllo dell'accesso a Pub/Sub con IAM)
    Argomento Pub/Sub
    BigQuery Editor dati BigQuery (roles/bigquery.dataEditor)
    (per saperne di più, consulta Controllo dell'controllo dell'accesso BigQuery con IAM)
    Set di dati BigQuery
    Cloud Storage Amministratore oggetti Cloud Storage (roles/storage.objectAdmin)
    (per ulteriori informazioni, consulta Controllo dell'controllo dell'accesso a Cloud Storage con IAM)
    Bucket Cloud Storage

Parametri del modello

Parametro Descrizione
schemaLocation Percorso del file di schema. Ad esempio: gs://mybucket/filename.json.
qps Numero di messaggi da pubblicare al secondo. Ad esempio: 100.
sinkType (Facoltativo) Tipo di sink di output. I valori possibili sono PUBSUB, BIGQUERY e GCS. Il valore predefinito è PUBSUB.
outputType (Facoltativo) Tipo di codifica di output. I valori possibili sono JSON, AVRO e PARQUET. Il valore predefinito è JSON.
avroSchemaLocation (Facoltativo) Posizione del file di schema AVRO. Obbligatorio quando outputType è AVRO o PARQUET. Ad esempio: gs://mybucket/filename.avsc.
topic (Facoltativo) Nome dell'argomento Pub/Sub in cui la pipeline deve pubblicare i dati. Obbligatorio quando sinkType è Pub/Sub. Ad esempio: projects/my-project-id/topics/my-topic-id.
outputTableSpec (Facoltativo) Nome della tabella BigQuery di output. Obbligatorio quando sinkType è BigQuery. Ad esempio: my-project-ID:my_dataset_name.my-table-name.
writeDisposition (Facoltativo) Disposizione di scrittura BigQuery. I valori possibili sono WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. Il valore predefinito è WRITE_APPEND.
outputDeadletterTable (Facoltativo) Nome della tabella BigQuery di output in cui conservare i record non riusciti. Se non viene specificato, la pipeline crea una tabella durante l'esecuzione con il nome {output_table_name}_error_records. Ad esempio: my-project-ID:my_dataset_name.my-table-name.
outputDirectory (Facoltativo) Percorso del percorso Cloud Storage di output. Obbligatorio quando sinkType è Cloud Storage. Ad esempio: gs://mybucket/pathprefix/.
outputFilenamePrefix (Facoltativo) Il prefisso del nome file dei file di output scritti in Cloud Storage. Il valore predefinito è output-.
windowDuration (Facoltativo) Intervallo della finestra in cui l'output viene scritto in Cloud Storage. Il valore predefinito è 1 min (in altre parole, 1 minuto).
numShards (Facoltativo) Numero massimo di shard di output. Obbligatorio quando sinkType è Cloud Storage e deve essere impostato su 1 o su un numero superiore.
messagesLimit (Facoltativo) Numero massimo di messaggi di output. Il valore predefinito è 0 e indica illimitato.
autoscalingAlgorithm (Facoltativo) Algoritmo utilizzato per la scalabilità automatica dei worker. I valori possibili sono THROUGHPUT_BASED per abilitare la scalabilità automatica o NONE per disabilitare.
maxNumWorkers (Facoltativo) Numero massimo di macchine worker. Ad esempio: 10.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco di regioni in cui è possibile eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Streaming Data Generator template.
  6. Inserisci i valori parametro negli appositi campi.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • JOB_NAME: un nome job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella padre non con data del bucket: gs://dataflow-templates-REGION_NAME/latest/
    • il nome della versione, ad esempio 2023-09-12-00_RC00, per utilizzare una versione specifica del modello, che è possibile trovare nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
  • SCHEMA_LOCATION: il percorso del file di schema in Cloud Storage. Ad esempio: gs://mybucket/filename.json.
  • QPS: il numero di messaggi da pubblicare al secondo
  • PUBSUB_TOPIC: l'argomento Pub/Sub di output. Ad esempio: projects/my-project-id/topics/my-topic-id.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Streaming_Data_Generator",
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • JOB_NAME: un nome job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella padre non con data del bucket: gs://dataflow-templates-REGION_NAME/latest/
    • il nome della versione, ad esempio 2023-09-12-00_RC00, per utilizzare una versione specifica del modello, che è possibile trovare nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
  • SCHEMA_LOCATION: il percorso del file di schema in Cloud Storage. Ad esempio: gs://mybucket/filename.json.
  • QPS: il numero di messaggi da pubblicare al secondo
  • PUBSUB_TOPIC: l'argomento Pub/Sub di output. Ad esempio: projects/my-project-id/topics/my-topic-id.

Passaggi successivi