Modello Streaming Data Generator a Pub/Sub, BigQuery e Cloud Storage

Il modello Generatore di dati in streaming viene utilizzato per generare un numero illimitato o fisso di record o messaggi sintetici in base allo schema fornito dall'utente alla frequenza specificata. Le destinazioni compatibili includono argomenti Pub/Sub, tabelle BigQuery e bucket Cloud Storage.

Di seguito è riportato un insieme di alcuni possibili casi d'uso:

  • Simula la pubblicazione di eventi in tempo reale su larga scala in un argomento Pub/Sub per misurare e determinare il numero e le dimensioni dei consumer necessari per elaborare gli eventi pubblicati.
  • Genera dati sintetici in una tabella BigQuery o in un bucket Cloud Storage per valutare i benchmark sul rendimento o come proof of concept.

Destinazioni e formati di codifica supportati

La tabella seguente descrive i canali di destinazione e i formati di codifica supportati da questo modello:
JSON Avro Parquet
Pub/Sub No
BigQuery No No
Cloud Storage

Requisiti della pipeline

  • L'account di servizio worker deve disporre del ruolo assegnato Dataflow Worker (roles/dataflow.worker). Per maggiori informazioni, consulta Introduzione a IAM.
  • Crea un file schema contenente un modello JSON per i dati generati. Questo modello utilizza la libreria JSON Data Generator, quindi puoi fornire varie funzioni di simulazione per ogni campo dello schema. Per saperne di più, consulta la documentazione di json-data-generator.

    Ad esempio:

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • Carica il file dello schema in un bucket Cloud Storage.
  • La destinazione di output deve esistere prima dell'esecuzione. La destinazione deve essere un argomento Pub/Sub, una tabella BigQuery o un bucket Cloud Storage, a seconda del tipo di destinazione.
  • Se la codifica di output è Avro o Parquet, crea un file dello schema Avro e memorizzalo in una posizione Cloud Storage.
  • Assegna all'account di servizio di lavoro un ruolo IAM aggiuntivo a seconda della destinazione desiderata.
    Destinazione Ruolo IAM necessario aggiuntivo A quale risorsa applicare
    Pub/Sub Publisher Pub/Sub (roles/pubsub.publisher)
    (per ulteriori informazioni, consulta Controllo dell'controllo dell'accesso a Pub/Sub con IAM)
    Argomento Pub/Sub
    BigQuery BigQuery Data Editor (roles/bigquery.dataEditor)
    (per ulteriori informazioni, consulta Controllo dell'controllo dell'accesso a BigQuery con IAM)
    Set di dati BigQuery
    Cloud Storage Amministratore oggetti Cloud Storage (roles/storage.objectAdmin)
    (per ulteriori informazioni, consulta Controllo dell'controllo dell'accesso a Cloud Storage con IAM)
    Bucket Cloud Storage

Parametri del modello

Parametro Descrizione
schemaLocation Posizione del file dello schema. Ad esempio: gs://mybucket/filename.json.
qps Numero di messaggi da pubblicare al secondo. Ad esempio: 100.
sinkType (Facoltativo) Tipo di destinazione di output. I valori possibili sono PUBSUB, BIGQUERY, GCS. Il valore predefinito è PUBSUB.
outputType (Facoltativo) Tipo di codifica di output. I valori possibili sono JSON, AVRO, PARQUET. Il valore predefinito è JSON.
avroSchemaLocation (Facoltativo) Posizione del file dello schema AVRO. Obbligatorio quando outputType è AVRO o PARQUET. Ad esempio: gs://mybucket/filename.avsc.
topic (Facoltativo) Nome dell'argomento Pub/Sub a cui la pipeline deve pubblicare i dati. Obbligatorio quando sinkType è Pub/Sub. Ad esempio: projects/my-project-id/topics/my-topic-id.
outputTableSpec (Facoltativo) Nome della tabella BigQuery di output. Obbligatorio quando sinkType è BigQuery. Ad esempio: my-project-ID:my_dataset_name.my-table-name.
writeDisposition (Facoltativo) Istruzione di scrittura BigQuery. I valori possibili sono WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. Il valore predefinito è WRITE_APPEND.
outputDeadletterTable (Facoltativo) Nome della tabella BigQuery di output per contenere i record con errori. Se non viene fornito, la pipeline crea la tabella durante l'esecuzione con il nome {output_table_name}_error_records. Ad esempio: my-project-ID:my_dataset_name.my-table-name.
outputDirectory (Facoltativo) Percorso della posizione Cloud Storage di output. Obbligatorio quando sinkType è Cloud Storage. Ad esempio: gs://mybucket/pathprefix/.
outputFilenamePrefix (Facoltativo) Il prefisso del nome dei file di output scritti in Cloud Storage. Il valore predefinito è output-.
windowDuration (Facoltativo) Intervallo della finestra in cui l'output viene scritto in Cloud Storage. Il valore predefinito è 1m (ovvero 1 minuto).
numShards (Facoltativo) Numero massimo di shard di output. Obbligatorio quando sinkType è Cloud Storage e deve essere impostato su un numero maggiore o uguale a 1.
messagesLimit (Facoltativo) Numero massimo di messaggi di output. Il valore predefinito è 0, che indica illimitato.
autoscalingAlgorithm (Facoltativo) Algoritmo utilizzato per la scalabilità automatica dei worker. I valori possibili sono THROUGHPUT_BASED per attivare la scalabilità automatica o NONE per disattivarla.
maxNumWorkers (Facoltativo) Numero massimo di macchine worker. Ad esempio: 10.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the Streaming Data Generator template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • SCHEMA_LOCATION: il percorso del file dello schema in Cloud Storage. Ad esempio: gs://mybucket/filename.json.
  • QPS: il numero di messaggi da pubblicare al secondo
  • PUBSUB_TOPIC: l'argomento Pub/Sub di output. Ad esempio: projects/my-project-id/topics/my-topic-id.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Streaming_Data_Generator",
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • SCHEMA_LOCATION: il percorso del file dello schema in Cloud Storage. Ad esempio: gs://mybucket/filename.json.
  • QPS: il numero di messaggi da pubblicare al secondo
  • PUBSUB_TOPIC: l'argomento Pub/Sub di output. Ad esempio: projects/my-project-id/topics/my-topic-id.

Passaggi successivi