Modello di sottoscrizione Pub/Sub a BigQuery

Il modello Pub/Sub Subscription to BigQuery è una pipeline di flusso che legge i messaggi in formato JSON da una sottoscrizione Pub/Sub e li scrive in una tabella BigQuery. Puoi utilizzare il modello come soluzione rapida per spostare i dati Pub/Sub in BigQuery. Il modello legge i messaggi in formato JSON da Pub/Sub e li converte in elementi BigQuery.

Requisiti della pipeline

  • Il campo data dei messaggi Pub/Sub deve utilizzare il formato JSON, descritto in questa guida JSON. Ad esempio, i messaggi con valori nel campo data formattati come {"k1":"v1", "k2":"v2"} possono essere inseriti in una tabella BigQuery con due colonne denominate k1 e k2, con un tipo di dati stringa.
  • La tabella di output deve esistere prima dell'esecuzione della pipeline. Lo schema della tabella deve corrispondere agli oggetti JSON di input.

Parametri del modello

Parametri obbligatori

  • outputTableSpec : la posizione della tabella di output BigQuery, nel formato <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • inputSubscription : la sottoscrizione di input Pub/Sub da cui leggere, nel formato projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION>.

Parametri facoltativi

  • outputDeadletterTable : la tabella BigQuery da utilizzare per i messaggi che non riescono a raggiungere la tabella di output, nel formato di <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>. Se la tabella non esiste, viene creata durante l'esecuzione della pipeline. Se non specificato, viene utilizzato OUTPUT_TABLE_SPEC_error_records.
  • javascriptTextTransformGcsPath : l'URI Cloud Storage del file .js che definisce la funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName : il nome della funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes : definisci l'intervallo in cui i worker possono verificare la presenza di modifiche alle funzioni definite dall'utente JavaScript per ricaricare i file. Il valore predefinito è 0.

Funzione definita dall'utente

Se vuoi, puoi estendere questo modello scrivendo una funzione definita dall'utente (UDF). Il modello chiama la UDF per ogni elemento di input. I payload degli elementi vengono serializzati come stringhe JSON. Per ulteriori informazioni, consulta Creare funzioni predefinite dall'utente per i modelli Dataflow.

Specifiche della funzione

La UDF ha la seguente specifica:

  • Input: il campo dei dati del messaggio Pub/Sub, serializzato come stringa JSON.
  • Output: una stringa JSON che corrisponde allo schema della tabella di destinazione BigQuery.
  • Esegui il modello

    Console

    1. Vai alla pagina Crea job da modello di Dataflow.
    2. Vai a Crea job da modello
    3. Nel campo Nome job, inserisci un nome univoco per il job.
    4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

      Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

    5. Nel menu a discesa Modello di flusso di dati, seleziona the Pub/Sub Subscription to BigQuery template.
    6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
    7. (Facoltativo) Per passare dall'elaborazione exactly-once alla modalità flusso di dati Almeno una volta, seleziona Almeno una volta.
    8. Fai clic su Esegui job.

    gcloud

    Nella shell o nel terminale, esegui il modello:

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/PubSub_Subscription_to_BigQuery \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
    outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

    Sostituisci quanto segue:

    • JOB_NAME: un nome di job univoco a tua scelta
    • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
    • VERSION: la versione del modello che vuoi utilizzare

      Puoi utilizzare i seguenti valori:

    • STAGING_LOCATION: la posizione per l'organizzazione in anteprima dei file locali (ad esempio gs://your-bucket/staging)
    • SUBSCRIPTION_NAME: il nome della sottoscrizione Pub/Sub
    • DATASET: il tuo set di dati BigQuery
    • TABLE_NAME: il nome della tabella BigQuery

    API

    Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/PubSub_Subscription_to_BigQuery
    {
       "jobName": "JOB_NAME",
       "parameters": {
           "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
       },
       "environment": {
           "ipConfiguration": "WORKER_IP_UNSPECIFIED",
           "additionalExperiments": []
       },
    }

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
    • JOB_NAME: un nome di job univoco a tua scelta
    • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
    • VERSION: la versione del modello che vuoi utilizzare

      Puoi utilizzare i seguenti valori:

    • STAGING_LOCATION: la posizione per l'organizzazione in anteprima dei file locali (ad esempio gs://your-bucket/staging)
    • SUBSCRIPTION_NAME: il nome della sottoscrizione Pub/Sub
    • DATASET: il tuo set di dati BigQuery
    • TABLE_NAME: il nome della tabella BigQuery

    Passaggi successivi