Il modello da Pub/Sub Avro a BigQuery è una pipeline di inserimento flussi che importa dati Avro da una sottoscrizione Pub/Sub in una tabella BigQuery. Eventuali errori che si verificano durante la scrittura nella tabella BigQuery vengono inseriti in modalità flusso in un argomento Pub/Sub non elaborato.
Requisiti della pipeline
- Deve esistere la sottoscrizione Pub/Sub di input.
- Il file di schema per i record Avro deve esistere in Cloud Storage.
- L'argomento Pub/Sub non elaborato deve esistere.
- Deve esistere il set di dati BigQuery di output.
Parametri del modello
Parametri obbligatori
- schemaPath : il percorso Cloud Storage del file di schema Avro. Ad esempio,
gs://path/to/my/schema.avsc
. - inputSubscription : la sottoscrizione di input Pub/Sub da cui leggere. ad esempio projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_ID>.
- outputTableSpec : la posizione della tabella di output BigQuery in cui scrivere l'output. Ad esempio,
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
.In base al valorecreateDisposition
specificato, la tabella di output potrebbe essere creata automaticamente utilizzando lo schema Avro fornito dall'utente. - outputTopic : l'argomento Pub/Sub da utilizzare per i record non elaborati. (Esempio: projects/<PROJECT_ID>/topics/<TOPIC_NAME>).
Parametri facoltativi
- useStorageWriteApiAtLeastOnce : quando si utilizza l'API StorageWrite, specifica la semantica della scrittura. Per utilizzare la semantica "at-least-once" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), imposta questo parametro su true. Per utilizzare la semantica "exactly-once", imposta il parametro su
false
. Questo parametro si applica solo quandouseStorageWriteApi
ètrue
. Il valore predefinito èfalse
. - writeDisposition : il valore writeDisposition di BigQuery (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Ad esempio,
WRITE_APPEND
,WRITE_EMPTY
oWRITE_TRUNCATE
. Il valore predefinito èWRITE_APPEND
. - createDisposition : il valore CreateDisposition di BigQuery (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Ad esempio,
CREATE_IF_NEEDED
eCREATE_NEVER
. Il valore predefinito èCREATE_IF_NEEDED
. - useStorageWriteApi : se true, la pipeline utilizza l'API BigQuery Storage Scrivi (https://cloud.google.com/bigquery/docs/write-api). Il valore predefinito è
false
. Per ulteriori informazioni, consulta Utilizzo dell'API StorageWrite (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams : quando si utilizza l'API StorageWrite, specifica il numero di flussi di scrittura. Se
useStorageWriteApi
ètrue
euseStorageWriteApiAtLeastOnce
èfalse
, devi impostare questo parametro. Il valore predefinito è 0. - storageWriteApiTriggeringFrequencySec : quando utilizzi l'API StorageWrite, specifica la frequenza di attivazione in secondi. Se
useStorageWriteApi
ètrue
euseStorageWriteApiAtLeastOnce
èfalse
, devi impostare questo parametro.
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è
us-central1
.Per un elenco di regioni in cui è possibile eseguire un job Dataflow, consulta Località di Dataflow.
- Dal menu a discesa Modello Dataflow, seleziona the Pub/Sub Avro to BigQuery template.
- Inserisci i valori parametro negli appositi campi.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
Sostituisci quanto segue:
JOB_NAME
: un nome job univoco a tua sceltaREGION_NAME
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella padre non con data del bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che è possibile trovare nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: il percorso Cloud Storage del file di schema Avro (ad esempio,gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: il nome della sottoscrizione di input Pub/SubBIGQUERY_TABLE
: nome della tabella di output BigQueryDEADLETTER_TOPIC
: l'argomento Pub/Sub da utilizzare per la coda non elaborata
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni
sull'API e sui relativi ambiti di autorizzazione, consulta
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
Sostituisci quanto segue:
JOB_NAME
: un nome job univoco a tua sceltaLOCATION
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella padre non con data del bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che è possibile trovare nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: il percorso Cloud Storage del file di schema Avro (ad esempio,gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: il nome della sottoscrizione di input Pub/SubBIGQUERY_TABLE
: nome della tabella di output BigQueryDEADLETTER_TOPIC
: l'argomento Pub/Sub da utilizzare per la coda non elaborata
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.