Il modello da protocollo Pub/Sub a BigQuery è una pipeline in modalità flusso che importa un protocollo
da una sottoscrizione Pub/Sub a una tabella BigQuery.
Gli errori che si verificano durante la scrittura nella tabella BigQuery vengono inseriti in modalità flusso in un
Argomento non elaborato Pub/Sub.
Per trasformare i dati, è possibile fornire una funzione JavaScript definita dall'utente (UDF). Gli errori durante l'esecuzione
dell'UDF possono essere inviati a un argomento Pub/Sub separato o allo stesso argomento non elaborato degli
errori di BigQuery.
Requisiti della pipeline
- Deve esistere la sottoscrizione Pub/Sub di input.
- Il file di schema per i record Proto deve esistere in Cloud Storage.
- Deve esistere l'argomento Pub/Sub di output.
- Deve esistere il set di dati BigQuery di output.
- Se la tabella BigQuery esiste, deve avere uno schema corrispondente ai dati del protocollo, indipendentemente dal valore
createDisposition
.
Parametri del modello
Parametro | Descrizione |
---|---|
protoSchemaPath |
Il percorso di Cloud Storage del file di schema proto autonomo. Ad esempio, gs://path/to/my/file.pb .
Questo file può essere generato con il flag --descriptor_set_out del comando protoc .
Il flag --include_imports garantisce che il file sia autonomo. |
fullMessageName |
Il nome completo del messaggio di protocollo. Ad esempio, package.name.MessageName , dove package.name è il valore
specificato per l'istruzione package e non per l'istruzione java_package . |
inputSubscription |
La sottoscrizione Pub/Sub di input da cui leggere. Ad esempio, projects/<project>/subscriptions/<subscription> . |
outputTopic |
L'argomento Pub/Sub da utilizzare per i record non elaborati. Ad esempio, projects/<project-id>/topics/<topic-name> . |
outputTableSpec |
La posizione della tabella di output BigQuery. Ad esempio, my-project:my_dataset.my_table .
A seconda del valore createDisposition specificato, la tabella di output potrebbe essere creata automaticamente utilizzando il file dello schema di input. |
preserveProtoFieldNames |
(Facoltativo) true per conservare il nome del campo Proto originale in JSON. false per utilizzare nomi JSON più standard.
Ad esempio, false cambierebbe field_name in fieldName . (Valore predefinito: false ) |
bigQueryTableSchemaPath |
(Facoltativo) Percorso Cloud Storage del percorso dello schema BigQuery. Ad esempio, gs://path/to/my/schema.json . Se non viene fornito, lo schema viene dedotto dallo schema Proto. |
javascriptTextTransformGcsPath |
(Facoltativo)
L'URI Cloud Storage del file .js che definisce la funzione JavaScript definita dall'utente (UDF) che vuoi utilizzare. Ad esempio: gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
(Facoltativo)
Il nome della funzione definita dall'utente (UDF) JavaScript che vuoi utilizzare.
Ad esempio, se il codice della funzione JavaScript è
myTransform(inJson) { /*...do stuff...*/ } , il nome della funzione è
myTransform . Per esempi di funzioni JavaScript definite dall'utente, consulta
Esempi di funzioni definite dall'utente.
|
javascriptTextTransformReloadIntervalMinutes |
(Facoltativo) Specifica la frequenza di ricarica della funzione definita dall'utente, in minuti. Se il valore è maggiore di 0, Dataflow controlla periodicamente il file delle funzioni definite dall'utente in Cloud Storage e ricarica funzione definita dall'utente se il file viene modificato. Questo parametro ti consente di aggiornare la funzione definita dall'utente mentre la pipeline è senza dover riavviare il job. Se il valore è 0, il ricaricamento della funzione definita dall'utente viene disabilitato. La il valore predefinito è 0. |
udfOutputTopic |
(Facoltativo) L'argomento Pub/Sub in cui sono archiviati gli errori delle funzioni definite dall'utente. Ad esempio:
projects/<project-id>/topics/<topic-name> . Se non viene fornito, si verificano errori delle funzioni definite dall'utente
vengono inviati allo stesso argomento di outputTopic . |
writeDisposition |
(Facoltativo) WriteDisposition di BigQuery.
Ad esempio, WRITE_APPEND , WRITE_EMPTY o WRITE_TRUNCATE . Valore predefinito: WRITE_APPEND . |
createDisposition |
(Facoltativo) Il CreateDisposition BigQuery.
Ad esempio, CREATE_IF_NEEDED , CREATE_NEVER . Valore predefinito: CREATE_IF_NEEDED . |
useStorageWriteApi |
(Facoltativo)
Se true , la pipeline utilizza
API BigQuery StorageWrite. Il valore predefinito è false . Per ulteriori informazioni, vedi
Utilizzo dell'API StorageWrite.
|
useStorageWriteApiAtLeastOnce |
(Facoltativo)
Quando utilizzi l'API StorageWrite, specifica la semantica della scrittura. Per utilizzare
semantica "at-least-once", imposta questo parametro su true . Per utilizzare la semantica esattamente una volta, imposta il parametro su false . Questo parametro si applica solo quando
useStorageWriteApi è true . Il valore predefinito è false .
|
numStorageWriteApiStreams |
(Facoltativo)
Quando utilizzi l'API StorageWrite, specifica il numero di flussi di scrittura. Se
useStorageWriteApi è true e useStorageWriteApiAtLeastOnce
è false , questo parametro deve essere impostato.
|
storageWriteApiTriggeringFrequencySec |
(Facoltativo)
Quando utilizzi l'API Storage Write, specifica la frequenza di attivazione in secondi. Se
useStorageWriteApi è true e useStorageWriteApiAtLeastOnce
è false , questo parametro deve essere impostato.
|
Funzione definita dall'utente
Se vuoi, puoi estendere questo modello scrivendo una funzione definita dall'utente (UDF). Il modello chiama la funzione definita dall'utente per ogni elemento di input. I payload degli elementi vengono serializzati come stringhe JSON. Per ulteriori informazioni, consulta Creare funzioni predefinite dall'utente per i modelli Dataflow.
Specifica della funzione
La funzione definita dall'utente ha la seguente specifica:
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. Il valore predefinito
è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.
- Nel menu a discesa Modello di flusso di dati, seleziona the Pub/Sub Proto to BigQuery template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex \ --parameters \ schemaPath=SCHEMA_PATH,\ fullMessageName=PROTO_MESSAGE_NAME,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=UNPROCESSED_TOPIC
Sostituisci quanto segue:
JOB_NAME
: un nome di job univoco a tua sceltaREGION_NAME
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile in cartella principale non-dated nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale datata nel bucket: gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: il percorso Cloud Storage del file di schema Proto (ad esempiogs://MyBucket/file.pb
)PROTO_MESSAGE_NAME
: il nome del messaggio Proto (ad esempio,package.name.MessageName
)SUBSCRIPTION_NAME
: il nome dell'abbonamento Pub/Sub di inputBIGQUERY_TABLE
: il nome della tabella di output BigQueryUNPROCESSED_TOPIC
: l'argomento Pub/Sub da utilizzare per la coda non elaborata
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sul
API e i relativi ambiti di autorizzazione, consulta
projects.templates.launch
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex", "parameters": { "schemaPath": "SCHEMA_PATH", "fullMessageName": "PROTO_MESSAGE_NAME", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "UNPROCESSED_TOPIC" } } }
Sostituisci quanto segue:
PROJECT_ID
: L'ID progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome job univoco di tua sceltaLOCATION
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale datata nel bucket: gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: il percorso Cloud Storage del file di schema Proto (ad esempiogs://MyBucket/file.pb
)PROTO_MESSAGE_NAME
: il nome del messaggio Proto (ad esempio,package.name.MessageName
)SUBSCRIPTION_NAME
: il nome dell'abbonamento Pub/Sub di inputBIGQUERY_TABLE
: il nome della tabella di output BigQueryUNPROCESSED_TOPIC
: l'argomento Pub/Sub da utilizzare per la coda non elaborata
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.