Generatore di flussi di dati nel modello Pub/Sub, BigQuery e Cloud Storage

Il modello Streaming Data Builder viene utilizzato per generare un numero illimitato o fisso di record sintetici o messaggi in base allo schema fornito dall'utente alla frequenza specificata. Le destinazioni compatibili includono argomenti Pub/Sub, tabelle BigQuery e bucket Cloud Storage.

Di seguito sono riportati alcuni casi d'uso possibili:

  • Simula la pubblicazione di eventi in tempo reale su larga scala in un argomento Pub/Sub per misurare e determinare il numero e le dimensioni di consumer necessari per elaborare gli eventi pubblicati.
  • Genera dati sintetici in una tabella BigQuery o in un bucket Cloud Storage per valutare i benchmark delle prestazioni o fungere da proof of concept.

Sink e formati di codifica supportati

La tabella riportata di seguito descrive quali sink e formati di codifica sono supportati da questo modello:
JSON Avro Parquet
Pub/Sub No
BigQuery No No
Cloud Storage Yes

Requisiti della pipeline

  • L'account di servizio worker richiede il ruolo Worker Dataflow (roles/dataflow.worker) assegnato. Per maggiori informazioni, consulta Introduzione a IAM.
  • Crea un file di schema contenente un modello JSON per i dati generati. Questo modello utilizza la libreria JSON Data Builder, che ti consente di fornire varie funzioni faker per ogni campo nello schema. Per saperne di più, consulta la documentazione di JSON-data-generator.

    Ad esempio:

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • Caricare il file dello schema in un bucket Cloud Storage.
  • La destinazione di output deve esistere prima dell'esecuzione. La destinazione deve essere un argomento Pub/Sub, una tabella BigQuery o un bucket Cloud Storage a seconda del tipo di sink.
  • Se la codifica di output è Avro o Parquet, crea un file di schema Avro e archivialo in una posizione di Cloud Storage.
  • Assegna all'account di servizio worker un ruolo IAM aggiuntivo a seconda della destinazione desiderata.
    Destinazione È necessario anche il ruolo IAM aggiuntivo Applica a quale risorsa
    Pub/Sub Publisher Pub/Sub (roles/pubsub.publisher)
    (per saperne di più, consulta Controllo dell'controllo dell'accesso Pub/Sub con IAM)
    Argomento Pub/Sub
    BigQuery Editor dati BigQuery (roles/bigquery.dataEditor)
    (per saperne di più, consulta Controllo dell'controllo dell'accesso a BigQuery con IAM)
    Set di dati BigQuery
    Cloud Storage Amministratore oggetti Cloud Storage (roles/storage.objectAdmin)
    (per saperne di più, vedi Controllo dell'controllo dell'accesso di Cloud Storage con IAM)
    Bucket Cloud Storage

Parametri del modello

Parametro Descrizione
schemaLocation Posizione del file dello schema. Ad esempio: gs://mybucket/filename.json.
qps Numero di messaggi da pubblicare al secondo. Ad esempio: 100.
sinkType (Facoltativo) Tipo di sink di output. I valori possibili sono PUBSUB, BIGQUERY, GCS. Il valore predefinito è PUBSUB.
outputType (Facoltativo) Tipo di codifica dell'output. I valori possibili sono JSON, AVRO, PARQUET. Il valore predefinito è JSON.
avroSchemaLocation (Facoltativo) Posizione del file di schema AVRO. Obbligatorio quando outputType è AVRO o PARQUET. Ad esempio: gs://mybucket/filename.avsc.
topic (Facoltativo) Nome dell'argomento Pub/Sub in cui la pipeline deve pubblicare i dati. Obbligatorio quando sinkType è Pub/Sub. Ad esempio: projects/my-project-id/topics/my-topic-id.
outputTableSpec (Facoltativo) Nome della tabella BigQuery di output. Obbligatorio quando sinkType è BigQuery. Ad esempio: my-project-ID:my_dataset_name.my-table-name.
writeDisposition (Facoltativo) Disposizione scrittura BigQuery. I valori possibili sono WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. Il valore predefinito è WRITE_APPEND.
outputDeadletterTable (Facoltativo) Nome della tabella BigQuery di output in cui verranno conservati i record non riusciti. Se non viene specificato, la pipeline crea una tabella durante l'esecuzione con il nome {output_table_name}_error_records. Ad esempio: my-project-ID:my_dataset_name.my-table-name.
outputDirectory (Facoltativo) Percorso della località di Cloud Storage di output. Obbligatorio quando sinkType è Cloud Storage. Ad esempio: gs://mybucket/pathprefix/.
outputFilenamePrefix (Facoltativo) Il prefisso del nome file dei file di output scritti in Cloud Storage. Il valore predefinito è output-.
windowDuration (Facoltativo) Intervallo di finestra in cui l'output viene scritto in Cloud Storage. Il valore predefinito è 1 min (ovvero 1 minuto).
numShards (Facoltativo) Numero massimo di shard di output. Obbligatorio quando sinkType è Cloud Storage e deve essere impostato su un numero pari o superiore a 1.
messagesLimit (Facoltativo) Numero massimo di messaggi di output. Il valore predefinito è 0 che indica un numero illimitato.
autoscalingAlgorithm (Facoltativo) Algoritmo utilizzato per la scalabilità automatica dei worker. I valori possibili sono THROUGHPUT_BASED per abilitare la scalabilità automatica o NONE per disabilitare.
maxNumWorkers (Facoltativo) Numero massimo di macchine worker. Ad esempio: 10.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, vedi Località Dataflow.

  5. Nel menu a discesa Modello Dataflow, seleziona the Streaming Data Generator template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • JOB_NAME: un nome job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • SCHEMA_LOCATION: percorso del file dello schema in Cloud Storage. Ad esempio: gs://mybucket/filename.json.
  • QPS: numero di messaggi da pubblicare al secondo
  • PUBSUB_TOPIC: l'argomento Pub/Sub di output. Ad esempio: projects/my-project-id/topics/my-topic-id.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Streaming_Data_Generator",
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • JOB_NAME: un nome job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • SCHEMA_LOCATION: percorso del file dello schema in Cloud Storage. Ad esempio: gs://mybucket/filename.json.
  • QPS: numero di messaggi da pubblicare al secondo
  • PUBSUB_TOPIC: l'argomento Pub/Sub di output. Ad esempio: projects/my-project-id/topics/my-topic-id.

Passaggi successivi