Da Pub/Sub a BigQuery con modello di funzione definita dall'utente Python

Il modello da Pub/Sub a BigQuery con funzione definita dall'utente Python è una pipeline di inserimento flussi che legge i messaggi in formato JSON da Pub/Sub e li scrive in una tabella BigQuery. Facoltativamente, puoi fornire una funzione definita dall'utente;utente (UDF) scritta in Python per elaborare i messaggi in arrivo.

Requisiti della pipeline

  • La tabella BigQuery deve esistere e avere uno schema.
  • I dati dei messaggi Pub/Sub devono utilizzare il formato JSON oppure devi fornire una funzione definita dall'utente che converti i dati dei messaggi in formato JSON. I dati JSON devono corrispondere allo schema della tabella BigQuery. Ad esempio, se i payload JSON sono formattati come {"k1":"v1", "k2":"v2"}, la tabella BigQuery deve avere due colonne di tipo stringa denominate k1 e k2.
  • Specifica il parametro inputSubscription o inputTopic, ma non entrambi.

Parametri del modello

Parametro Descrizione
outputTableSpec La tabella BigQuery in cui scrivere, formattata come "PROJECT_ID:DATASET_NAME.TABLE_NAME".
inputSubscription (Facoltativo) La sottoscrizione Pub/Sub da cui eseguire la lettura, formattata come "projects/PROJECT_ID/subscriptions/SUBCRIPTION_NAME".
inputTopic (Facoltativo) L'argomento Pub/Sub da cui eseguire la lettura, nel formato "projects/PROJECT_ID/topics/TOPIC_NAME".
outputDeadletterTable La tabella BigQuery per i messaggi che non sono riusciti a raggiungere la tabella di output, formattata come "PROJECT_ID:DATASET_NAME.TABLE_NAME". Se la tabella non esiste, viene creata all'esecuzione della pipeline. Se questo parametro non viene specificato, viene utilizzato il valore "OUTPUT_TABLE_SPEC_error_records".
pythonExternalTextTransformGcsPath (Facoltativo) L'URI Cloud Storage del file di codice Python che definisce la funzione definita dall'utente;utente (UDF) che vuoi utilizzare. Ad esempio, gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName Facoltativo: il nome della funzione definita dall'utente (UDF) Python che vuoi utilizzare.
useStorageWriteApi Facoltativo: se true, la pipeline utilizza l' API BigQuery Storage Write. Il valore predefinito è false. Per ulteriori informazioni, consulta la pagina relativa all' utilizzo dell'API Storage Write.
useStorageWriteApiAtLeastOnce Facoltativo: quando si utilizza l'API Storage Write, specifica la semantica di scrittura. Per utilizzare la semantica at-least-once, imposta questo parametro su true. Per usare la semantica "exactly-once", imposta il parametro su false. Questo parametro si applica solo quando useStorageWriteApi è true. Il valore predefinito è false.
numStorageWriteApiStreams Facoltativo: quando si utilizza l'API Storage Write, specifica il numero di flussi di scrittura. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro.
storageWriteApiTriggeringFrequencySec Facoltativo: quando utilizzi l'API Storage Write, specifica la frequenza di attivazione in secondi. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro.

Funzione definita dall'utente

Facoltativamente, puoi estendere questo modello scrivendo una funzione definita dall'utente (UDF). Il modello chiama la funzione definita dall'utente per ogni elemento di input. I payload degli elementi sono serializzati come stringhe JSON. Per ulteriori informazioni, consulta Creare funzioni definite dall'utente per i modelli Dataflow.

Specifica della funzione

La funzione definita dall'utente ha le seguenti specifiche:

  • Input: il campo dei dati del messaggio Pub/Sub, serializzato come stringa JSON.
  • Output: una stringa JSON corrispondente allo schema della tabella di destinazione BigQuery.
  • Esegui il modello

    Console

    1. Vai alla pagina Crea job da modello di Dataflow.
    2. Vai a Crea job da modello
    3. Nel campo Nome job, inserisci un nome univoco per il job.
    4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

      Per un elenco delle regioni in cui puoi eseguire un job Dataflow, vedi Località Dataflow.

    5. Nel menu a discesa Modello Dataflow, seleziona the Pub/Sub to BigQuery with Python UDF template.
    6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
    7. (Facoltativo) Per passare dall'elaborazione "exactly-once" alla modalità di streaming "at-least-once", seleziona Almeno una volta.
    8. Fai clic su Esegui job.

    gcloud

    Nella shell o nel terminale, esegui il modello:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_BigQuery_Xlang \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME
    

    Sostituisci quanto segue:

    • JOB_NAME: un nome job univoco a tua scelta
    • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
    • VERSION: la versione del modello che vuoi utilizzare

      Puoi utilizzare i seguenti valori:

    • STAGING_LOCATION: la posizione per la gestione temporanea dei file locali (ad esempio, gs://your-bucket/staging)
    • TOPIC_NAME: nome dell'argomento Pub/Sub
    • DATASET: il tuo set di dati BigQuery
    • TABLE_NAME: nome della tua tabella BigQuery

    API

    Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
           "inputTopic": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_BigQuery_Xlang",
       }
    }
    

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
    • JOB_NAME: un nome job univoco a tua scelta
    • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
    • VERSION: la versione del modello che vuoi utilizzare

      Puoi utilizzare i seguenti valori:

    • STAGING_LOCATION: la posizione per la gestione temporanea dei file locali (ad esempio, gs://your-bucket/staging)
    • TOPIC_NAME: nome dell'argomento Pub/Sub
    • DATASET: il tuo set di dati BigQuery
    • TABLE_NAME: nome della tua tabella BigQuery

    Passaggi successivi