Modello di generatore di dati in modalità flusso in Pub/Sub, BigQuery e Cloud Storage

Il modello di Generatore di dati in modalità flusso viene utilizzato per generare un numero illimitato o fisso di record sintetici o messaggi in base allo schema fornito dall'utente all'indirizzo la tariffa specificata. Le destinazioni compatibili includono argomenti Pub/Sub, tabelle BigQuery e bucket Cloud Storage.

Di seguito sono riportati alcuni casi d'uso possibili:

  • Simula la pubblicazione di eventi in tempo reale su larga scala in un argomento Pub/Sub per misurare e determinare il numero e le dimensioni dei consumer richiesti per elaborare gli eventi pubblicati.
  • Genera dati sintetici in una tabella BigQuery o in un bucket Cloud Storage per valutare i benchmark delle prestazioni o servire come proof of concept.

Formati di codifica e sink supportati

Nella tabella seguente vengono descritti i formati di sink e di codifica supportati da questo modello:
JSON Avro Parquet
Pub/Sub No
BigQuery No No
Cloud Storage

Requisiti della pipeline

  • L'account di servizio worker richiede il ruolo assegnato Worker Dataflow (roles/dataflow.worker). Per per saperne di più, consulta Introduzione a IAM.
  • Crea un file di schema contenente un modello JSON per i dati generati. Questo modello utilizza il formato JSON Data Generator, in modo da poter fornire varie funzioni faker per ogni campo nel . Per ulteriori informazioni, consulta documentazione json-data-generator.

    Ad esempio:

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • Carica il file di schema in un bucket Cloud Storage.
  • La destinazione di output deve esistere prima dell'esecuzione. La destinazione deve essere un argomento Pub/Sub, da una tabella BigQuery o da un bucket Cloud Storage, a seconda del tipo di sink.
  • Se la codifica di output è Avro o Parquet, crea un file di schema Avro e archivialo in un percorso di Cloud Storage.
  • Assegna all'account di servizio worker un ruolo IAM aggiuntivo a seconda della destinazione desiderata.
    Destinazione Ruolo IAM necessario aggiuntivo Applica a quale risorsa
    Pub/Sub Publisher Pub/Sub (roles/pubsub.publisher)
    (per ulteriori informazioni, consulta Controllo dell'accesso a Pub/Sub con IAM)
    Argomento Pub/Sub
    BigQuery Editor dati BigQuery (roles/bigquery.dataEditor)
    (per saperne di più, consulta Controllo dell'accesso BigQuery con IAM)
    Set di dati BigQuery
    Cloud Storage Amministratore oggetti Cloud Storage (roles/storage.objectAdmin)
    (per ulteriori informazioni, consulta Controllo dell'accesso a Cloud Storage con IAM)
    Bucket Cloud Storage

Parametri del modello

Parametro Descrizione
schemaLocation Percorso del file di schema. Ad esempio: gs://mybucket/filename.json.
qps Numero di messaggi da pubblicare al secondo. Ad esempio: 100.
sinkType (Facoltativo) Tipo di sink di output. I valori possibili sono PUBSUB, BIGQUERY e GCS. Il valore predefinito è PUBSUB.
outputType (Facoltativo) Tipo di codifica di output. I valori possibili sono JSON, AVRO e PARQUET. Il valore predefinito è JSON.
avroSchemaLocation (Facoltativo) Posizione del file di schema AVRO. Obbligatorio quando outputType è AVRO o PARQUET. Ad esempio: gs://mybucket/filename.avsc.
topic (Facoltativo) Nome dell'argomento Pub/Sub in cui la pipeline deve pubblicare i dati. Obbligatorio quando sinkType è Pub/Sub. Ad esempio: projects/my-project-id/topics/my-topic-id.
outputTableSpec (Facoltativo) Nome della tabella BigQuery di output. Obbligatorio quando sinkType è BigQuery. Ad esempio: my-project-ID:my_dataset_name.my-table-name.
writeDisposition (Facoltativo) Disposizione di scrittura BigQuery. I valori possibili sono WRITE_APPEND, WRITE_EMPTY o WRITE_TRUNCATE. Il valore predefinito è WRITE_APPEND.
outputDeadletterTable (Facoltativo) Nome della tabella BigQuery di output in cui conservare i record non riusciti. Se non viene specificato, la pipeline crea una tabella durante l'esecuzione con il nome {output_table_name}_error_records. Ad esempio: my-project-ID:my_dataset_name.my-table-name.
outputDirectory (Facoltativo) Percorso del percorso Cloud Storage di output. Obbligatorio quando sinkType è Cloud Storage. Ad esempio: gs://mybucket/pathprefix/.
outputFilenamePrefix (Facoltativo) Il prefisso del nome file dei file di output scritti in Cloud Storage. Il valore predefinito è output-.
windowDuration (Facoltativo) Intervallo della finestra in cui l'output viene scritto in Cloud Storage. Il valore predefinito è 1 min (in altre parole, 1 minuto).
numShards (Facoltativo) Numero massimo di shard di output. Obbligatorio quando sinkType è Cloud Storage e deve essere impostato su 1 o su un numero superiore.
messagesLimit (Facoltativo) Numero massimo di messaggi di output. Il valore predefinito è 0 e indica illimitato.
autoscalingAlgorithm (Facoltativo) Algoritmo utilizzato per la scalabilità automatica dei worker. I valori possibili sono THROUGHPUT_BASED per abilitare la scalabilità automatica o NONE per disabilitare.
maxNumWorkers (Facoltativo) Numero massimo di macchine worker. Ad esempio: 10.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. Il valore predefinito è us-central1.

    Per un elenco di regioni in cui è possibile eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Streaming Data Generator template.
  6. Inserisci i valori parametro negli appositi campi.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Sostituisci quanto segue:

  • PROJECT_ID: L'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • REGION_NAME: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempio us-central1
  • JOB_NAME: un nome job univoco di tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • SCHEMA_LOCATION: il percorso del file di schema in Cloud Storage. Ad esempio: gs://mybucket/filename.json.
  • QPS: il numero di messaggi da pubblicare al secondo
  • PUBSUB_TOPIC: l'argomento Pub/Sub di output. Ad esempio: projects/my-project-id/topics/my-topic-id.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sul API e i relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Streaming_Data_Generator",
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: L'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • LOCATION: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempio us-central1
  • JOB_NAME: un nome job univoco di tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • SCHEMA_LOCATION: il percorso del file di schema in Cloud Storage. Ad esempio: gs://mybucket/filename.json.
  • QPS: il numero di messaggi da pubblicare al secondo
  • PUBSUB_TOPIC: l'argomento Pub/Sub di output. Ad esempio: projects/my-project-id/topics/my-topic-id.

Passaggi successivi