Da Pub/Sub a BigQuery con modello di funzione definita dall'utente Python

La funzione definita dall'utente da Pub/Sub a BigQuery con Python è una pipeline in modalità flusso che legge i messaggi in formato JSON e li scrive in una tabella BigQuery. Facoltativamente, puoi fornire una funzione definita dall'utente (UDF) scritta in Python per elaborare i messaggi in arrivo.

Requisiti della pipeline

  • La tabella BigQuery deve esistere e avere uno schema.
  • I dati dei messaggi Pub/Sub devono utilizzare il formato JSON, una funzione definita dall'utente che converte i dati dei messaggi in JSON. I dati JSON devono corrispondere allo schema della tabella BigQuery. Ad esempio, se i payload JSON sono formattati come {"k1":"v1", "k2":"v2"}, la tabella BigQuery deve avere due colonne di stringhe denominate k1 e k2.
  • Specifica il parametro inputSubscription o inputTopic, ma non entrambi.

Parametri del modello

Parametro Descrizione
outputTableSpec La tabella BigQuery in cui scrivere, formattata come "PROJECT_ID:DATASET_NAME.TABLE_NAME".
inputSubscription (Facoltativo) L'abbonamento Pub/Sub da cui leggere, formattato come "projects/PROJECT_ID/subscriptions/SUBCRIPTION_NAME".
inputTopic (Facoltativo) L'argomento Pub/Sub da cui leggere, formattato come "projects/PROJECT_ID/topics/TOPIC_NAME".
outputDeadletterTable La tabella BigQuery per i messaggi che non hanno raggiunto la della tabella di output, formattata come "PROJECT_ID:DATASET_NAME.TABLE_NAME". Se la tabella non esiste, viene creata quando viene eseguita la pipeline. Se questo non è specificato, il valore Al suo posto viene usato "OUTPUT_TABLE_SPEC_error_records".
pythonExternalTextTransformGcsPath (Facoltativo) L'URI Cloud Storage del file di codice Python che definisce la funzione definita dall'utente (UDF) che vuoi utilizzare. Ad esempio, gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName (Facoltativo) Il nome della funzione definita dall'utente (UDF) Python che vuoi utilizzare.
useStorageWriteApi (Facoltativo) Se true, la pipeline utilizza API BigQuery StorageWrite. Il valore predefinito è false. Per ulteriori informazioni, consulta Utilizzare l'API Storage Write.
useStorageWriteApiAtLeastOnce (Facoltativo) Quando utilizzi l'API Storage Write, specifica la semantica di scrittura. Per utilizzare semantica "at-least-once", imposta questo parametro su true. Per utilizzare la semantica "exactly-once", imposta il parametro su false. Questo parametro si applica solo quando useStorageWriteApi è true. Il valore predefinito è false.
numStorageWriteApiStreams (Facoltativo) Quando utilizzi l'API Storage Write, specifica il numero di stream di scrittura. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, questo parametro deve essere impostato.
storageWriteApiTriggeringFrequencySec (Facoltativo) Quando utilizzi l'API Storage Scrivi, specifica la frequenza di attivazione, in secondi. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, questo parametro deve essere impostato.

Funzione definita dall'utente

Se vuoi, puoi estendere questo modello scrivendo una funzione definita dall'utente (UDF). Il modello chiama la UDF per ogni elemento di input. I payload degli elementi serializzate come stringhe JSON. Per ulteriori informazioni, vedi Crea funzioni definite dall'utente per i modelli Dataflow.

Specifica della funzione

La UDF ha la seguente specifica:

  • Input: il campo dei dati del messaggio Pub/Sub, serializzato come stringa JSON.
  • Output: una stringa JSON che corrisponde allo schema del Tabella di destinazione BigQuery.
  • Esegui il modello

    Console

    1. Vai alla pagina Crea job da modello di Dataflow.
    2. Vai a Crea job da modello
    3. Nel campo Nome job, inserisci un nome univoco per il job.
    4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

      Per un elenco di regioni in cui è possibile eseguire un job Dataflow, consulta Località di Dataflow.

    5. Nel menu a discesa Modello di flusso di dati, seleziona the Pub/Sub to BigQuery with Python UDF template.
    6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
    7. (Facoltativo) Per passare dall'elaborazione exactly-once alla modalità flusso di dati Almeno una volta, seleziona Almeno una volta.
    8. Fai clic su Esegui job.

    gcloud

    Nella shell o nel terminale, esegui il modello:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_BigQuery_Xlang \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

    Sostituisci quanto segue:

    • JOB_NAME: un nome job univoco di tua scelta
    • REGION_NAME: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempio us-central1
    • VERSION: la versione del modello che vuoi utilizzare

      Puoi utilizzare i seguenti valori:

    • STAGING_LOCATION: la posizione per i file locali di gestione temporanea (ad esempio, gs://your-bucket/staging)
    • TOPIC_NAME: nome dell'argomento Pub/Sub
    • DATASET: il tuo set di dati BigQuery
    • TABLE_NAME: il nome della tabella BigQuery

    API

    Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sul API e i relativi ambiti di autorizzazione, consulta projects.templates.launch

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
           "inputTopic": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_BigQuery_Xlang",
       }
    }

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
    • JOB_NAME: un nome di job univoco a tua scelta
    • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
    • VERSION: la versione del modello che vuoi utilizzare

      Puoi utilizzare i seguenti valori:

    • STAGING_LOCATION: la posizione per i file locali di gestione temporanea (ad esempio, gs://your-bucket/staging)
    • TOPIC_NAME: nome dell'argomento Pub/Sub
    • DATASET: il tuo set di dati BigQuery
    • TABLE_NAME: il nome della tua tabella BigQuery

    Passaggi successivi