Modello di sottoscrizione Pub/Sub a BigQuery

Il modello di sottoscrizione Pub/Sub a BigQuery è una pipeline di inserimento flussi che legge i messaggi in formato JSON da una sottoscrizione Pub/Sub e li scrive in una tabella BigQuery. Puoi utilizzare il modello come soluzione rapida per spostare i dati Pub/Sub in BigQuery. Il modello legge i messaggi in formato JSON da Pub/Sub e li converte in elementi BigQuery.

Requisiti della pipeline

  • Il campo data dei messaggi Pub/Sub deve utilizzare il formato JSON, descritto in questa guida a JSON. Ad esempio, i messaggi con valori nel campo data formattato come {"k1":"v1", "k2":"v2"} possono essere inseriti in una tabella BigQuery con due colonne, denominate k1 e k2, con un tipo di dati stringa.
  • La tabella di output deve esistere prima dell'esecuzione della pipeline. Lo schema della tabella deve corrispondere agli oggetti JSON di input.

Parametri del modello

Parametri obbligatori

  • outputTableSpec : il percorso della tabella di output BigQuery nel formato <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • inputSubscription : la sottoscrizione di input Pub/Sub da cui leggere, nel formato projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION>.

Parametri facoltativi

  • outputDeadletterTable : la tabella BigQuery da utilizzare per i messaggi che non riescono a raggiungere la tabella di output, nel formato <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>. Se la tabella non esiste, viene creata durante l'esecuzione della pipeline. Se non specificato, viene utilizzato OUTPUT_TABLE_SPEC_error_records.
  • javascriptTextTransformGcsPath : l'URI Cloud Storage del file .js che definisce la funzione JavaScript definita dall'utente da utilizzare. Ad esempio, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName : il nome della funzione definita dall'utente di JavaScript da utilizzare. Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript di esempio, consulta gli esempi di funzioni definite dall'utente (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes : definisci l'intervallo con cui i worker possono controllare le modifiche alla funzione JavaScript definita dall'utente per ricaricare i file. Il valore predefinito è 0.

Funzione definita dall'utente

Facoltativamente, puoi estendere questo modello scrivendo una funzione definita dall'utente (UDF). Il modello chiama la funzione definita dall'utente per ogni elemento di input. I payload degli elementi sono serializzati come stringhe JSON. Per ulteriori informazioni, consulta Creare funzioni definite dall'utente per i modelli Dataflow.

Specifica della funzione

La funzione definita dall'utente ha le seguenti specifiche:

  • Input: il campo dei dati del messaggio Pub/Sub, serializzato come stringa JSON.
  • Output: una stringa JSON corrispondente allo schema della tabella di destinazione BigQuery.
  • Esegui il modello

    Console

    1. Vai alla pagina Crea job da modello di Dataflow.
    2. Vai a Crea job da modello
    3. Nel campo Nome job, inserisci un nome univoco per il job.
    4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

      Per un elenco delle regioni in cui puoi eseguire un job Dataflow, vedi Località Dataflow.

    5. Nel menu a discesa Modello Dataflow, seleziona the Pub/Sub Subscription to BigQuery template.
    6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
    7. (Facoltativo) Per passare dall'elaborazione "exactly-once" alla modalità di streaming "at-least-once", seleziona Almeno una volta.
    8. Fai clic su Esegui job.

    gcloud

    Nella shell o nel terminale, esegui il modello:

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/PubSub_Subscription_to_BigQuery \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
    outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
    

    Sostituisci quanto segue:

    • JOB_NAME: un nome job univoco a tua scelta
    • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
    • VERSION: la versione del modello che vuoi utilizzare

      Puoi utilizzare i seguenti valori:

    • STAGING_LOCATION: la posizione per la gestione temporanea dei file locali (ad esempio, gs://your-bucket/staging)
    • SUBSCRIPTION_NAME: nome della sottoscrizione Pub/Sub
    • DATASET: il tuo set di dati BigQuery
    • TABLE_NAME: nome della tua tabella BigQuery

    API

    Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/PubSub_Subscription_to_BigQuery
    {
       "jobName": "JOB_NAME",
       "parameters": {
           "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
       },
       "environment": {
           "ipConfiguration": "WORKER_IP_UNSPECIFIED",
           "additionalExperiments": []
       },
    }
    

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
    • JOB_NAME: un nome job univoco a tua scelta
    • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
    • VERSION: la versione del modello che vuoi utilizzare

      Puoi utilizzare i seguenti valori:

    • STAGING_LOCATION: la posizione per la gestione temporanea dei file locali (ad esempio, gs://your-bucket/staging)
    • SUBSCRIPTION_NAME: nome della sottoscrizione Pub/Sub
    • DATASET: il tuo set di dati BigQuery
    • TABLE_NAME: nome della tua tabella BigQuery

    Passaggi successivi