Da Pub/Sub a modello BigQuery

Il modello Da Pub/Sub a BigQuery è una pipeline di inserimento flussi che legge i messaggi in formato JSON da Pub/Sub e li scrive in una tabella BigQuery. Facoltativamente, puoi fornire una funzione definita dall'utente;utente (UDF) scritta in JavaScript per elaborare i messaggi in arrivo.

Requisiti della pipeline

  • La tabella BigQuery deve esistere e avere uno schema.
  • I dati dei messaggi Pub/Sub devono utilizzare il formato JSON oppure devi fornire una funzione definita dall'utente che converti i dati dei messaggi in formato JSON. I dati JSON devono corrispondere allo schema della tabella BigQuery. Ad esempio, se i payload JSON sono formattati come {"k1":"v1", "k2":"v2"}, la tabella BigQuery deve avere due colonne di tipo stringa denominate k1 e k2.
  • Specifica il parametro inputSubscription o inputTopic, ma non entrambi.

Parametri del modello

Parametro Descrizione
outputTableSpec La tabella BigQuery in cui scrivere, formattata come "PROJECT_ID:DATASET_NAME.TABLE_NAME".
inputSubscription (Facoltativo) La sottoscrizione Pub/Sub da cui eseguire la lettura, formattata come "projects/PROJECT_ID/subscriptions/SUBCRIPTION_NAME".
inputTopic (Facoltativo) L'argomento Pub/Sub da cui eseguire la lettura, nel formato "projects/PROJECT_ID/topics/TOPIC_NAME".
outputDeadletterTable La tabella BigQuery per i messaggi che non sono riusciti a raggiungere la tabella di output, formattata come "PROJECT_ID:DATASET_NAME.TABLE_NAME". Se la tabella non esiste, viene creata all'esecuzione della pipeline. Se questo parametro non viene specificato, viene utilizzato il valore "OUTPUT_TABLE_SPEC_error_records".
javascriptTextTransformGcsPath Facoltativo: L'URI Cloud Storage del file .js che definisce la funzione JavaScript definita dall'utente che vuoi utilizzare. Ad esempio: gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Facoltativo) Il nome della funzione definita dall'utente di JavaScript che vuoi utilizzare. Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript definite, consulta gli esempi di funzioni definite dall'utente.
javascriptTextTransformReloadIntervalMinutes (Facoltativo) Specifica la frequenza di ricarica della funzione definita dall'utente, in minuti. Se il valore è maggiore di 0, Dataflow controlla periodicamente il file UDF in Cloud Storage e ricarica la funzione definita dall'utente se il file viene modificato. Questo parametro consente di aggiornare la funzione definita dall'utente mentre la pipeline è in esecuzione, senza dover riavviare il job. Se il valore è 0, il ricaricamento delle funzioni definite dall'utente è disabilitato. Il valore predefinito è 0.
useStorageWriteApi (Facoltativo) Se true, la pipeline utilizza l' API BigQuery Storage Write. Il valore predefinito è false. Per ulteriori informazioni, consulta la pagina relativa all' utilizzo dell'API Storage Write.
useStorageWriteApiAtLeastOnce (Facoltativo) Quando si utilizza l'API Storage Write, specifica la semantica di scrittura. Per utilizzare la semantica at-least-once, imposta questo parametro su true. Per usare la semantica "exactly-once", imposta il parametro su false. Questo parametro si applica solo quando useStorageWriteApi è true. Il valore predefinito è false.
numStorageWriteApiStreams (Facoltativo) Quando si utilizza l'API Storage Write, specifica il numero di flussi di scrittura. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro.
storageWriteApiTriggeringFrequencySec Facoltativo: quando utilizzi l'API Storage Write, specifica la frequenza di attivazione in secondi. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro.

Funzione definita dall'utente

Facoltativamente, puoi estendere questo modello scrivendo una funzione definita dall'utente (UDF). Il modello chiama la funzione definita dall'utente per ogni elemento di input. I payload degli elementi sono serializzati come stringhe JSON. Per ulteriori informazioni, consulta Creare funzioni definite dall'utente per i modelli Dataflow.

Specifica della funzione

La funzione definita dall'utente ha le seguenti specifiche:

  • Input: il campo dei dati del messaggio Pub/Sub, serializzato come stringa JSON.
  • Output: una stringa JSON corrispondente allo schema della tabella di destinazione BigQuery.
  • Esegui il modello

    Console

    1. Vai alla pagina Crea job da modello di Dataflow.
    2. Vai a Crea job da modello
    3. Nel campo Nome job, inserisci un nome univoco per il job.
    4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

      Per un elenco delle regioni in cui puoi eseguire un job Dataflow, vedi Località Dataflow.

    5. Nel menu a discesa Modello Dataflow, seleziona the Pub/Sub to BigQuery template.
    6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
    7. (Facoltativo) Per passare dall'elaborazione "exactly-once" alla modalità di streaming "at-least-once", seleziona Almeno una volta.
    8. Fai clic su Esegui job.

    gcloud

    Nella shell o nel terminale, esegui il modello:

    gcloud dataflow flex-template run JOB_NAME \
        --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_BigQuery_Flex \
        --template-file-gcs-location REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME
    

    Sostituisci quanto segue:

    • JOB_NAME: un nome job univoco a tua scelta
    • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
    • VERSION: la versione del modello che vuoi utilizzare

      Puoi utilizzare i seguenti valori:

    • STAGING_LOCATION: la posizione per la gestione temporanea dei file locali (ad esempio, gs://your-bucket/staging)
    • TOPIC_NAME: nome dell'argomento Pub/Sub
    • DATASET: il tuo set di dati BigQuery
    • TABLE_NAME: nome della tua tabella BigQuery

    API

    Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
           "inputTopic": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_BigQuery_Flex",
       }
    }
    

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
    • JOB_NAME: un nome job univoco a tua scelta
    • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
    • VERSION: la versione del modello che vuoi utilizzare

      Puoi utilizzare i seguenti valori:

    • STAGING_LOCATION: la posizione per la gestione temporanea dei file locali (ad esempio, gs://your-bucket/staging)
    • TOPIC_NAME: nome dell'argomento Pub/Sub
    • DATASET: il tuo set di dati BigQuery
    • TABLE_NAME: nome della tua tabella BigQuery

    Passaggi successivi