Il modello Pub/Sub Avro to BigQuery è una pipeline in modalità flusso che importa dati Avro da una sottoscrizione Pub/Sub in una tabella BigQuery. Eventuali errori che si verificano durante la scrittura nella tabella BigQuery vengono inseriti in modalità flusso in un argomento Pub/Sub non elaborato.
Requisiti della pipeline
- Deve esistere la sottoscrizione Pub/Sub di input.
- In Cloud Storage deve esistere il file dello schema per i record Avro.
- L'argomento Pub/Sub non elaborato deve esistere.
- Deve esistere il set di dati BigQuery di output.
Parametri del modello
Parametri obbligatori
- schemaPath: la posizione in Cloud Storage del file dello schema Avro. Ad esempio,
gs://path/to/my/schema.avsc
. - inputSubscription : la sottoscrizione di input Pub/Sub da cui leggere. (ad esempio, projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_ID>).
- outputTableSpec: la posizione della tabella di output BigQuery in cui scrivere l'output. Ad esempio,
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
. A seconda delcreateDisposition
specificato, la tabella di output potrebbe essere creata automaticamente utilizzando lo schema Avro fornito dall'utente. - outputTopic : l'argomento Pub/Sub da utilizzare per i record non elaborati. (Esempio: projects/<PROJECT_ID>/topics/<TOPIC_NAME>).
Parametri facoltativi
- useStorageWriteApiAtLeastOnce: quando utilizzi l'API Storage Write, specifica la semantica di scrittura. Per utilizzare la semantica "at-least-once" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), imposta questo parametro su true. Per utilizzare la semantica "exactly-once", imposta il parametro su
false
. Questo parametro si applica solo quandouseStorageWriteApi
ètrue
. Il valore predefinito èfalse
. - writeDisposition: il valore di BigQuery WriteDisposition (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Ad esempio,
WRITE_APPEND
,WRITE_EMPTY
oWRITE_TRUNCATE
. Il valore predefinito èWRITE_APPEND
. - createDisposition: il valore CreateDisposition di BigQuery (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). Ad esempio,
CREATE_IF_NEEDED
eCREATE_NEVER
. Il valore predefinito èCREATE_IF_NEEDED
. - useStorageWriteApi: se true, la pipeline utilizza l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). Il valore predefinito è
false
. Per ulteriori informazioni, consulta Utilizzo dell'API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams : quando si utilizza l'API StorageWrite, specifica il numero di flussi di scrittura. Se
useStorageWriteApi
ètrue
euseStorageWriteApiAtLeastOnce
èfalse
, devi impostare questo parametro. Il valore predefinito è 0. - storageWriteApiTriggeringFrequencySec : quando utilizzi l'API StorageWrite, specifica la frequenza di attivazione in secondi. Se
useStorageWriteApi
ètrue
euseStorageWriteApiAtLeastOnce
èfalse
, devi impostare questo parametro.
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.
- Nel menu a discesa Modello di flusso di dati, seleziona the Pub/Sub Avro to BigQuery template.
- Inserisci i valori parametro negli appositi campi.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
Sostituisci quanto segue:
JOB_NAME
: un nome job univoco di tua sceltaREGION_NAME
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile in cartella principale non-dated nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale datata nel bucket: gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: il percorso Cloud Storage del file di schema Avro (ad esempio,gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: il nome dell'abbonamento Pub/Sub di inputBIGQUERY_TABLE
: il nome della tabella di output BigQueryDEADLETTER_TOPIC
: l'argomento Pub/Sub da utilizzare per la coda non elaborata
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sul
API e i relativi ambiti di autorizzazione, consulta
projects.templates.launch
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
Sostituisci quanto segue:
JOB_NAME
: un nome job univoco di tua sceltaLOCATION
: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempious-central1
VERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile in cartella principale non-dated nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica , che puoi trovare nidificata nella rispettiva cartella principale con data del bucket: gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: il percorso Cloud Storage del file di schema Avro (ad esempio,gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: il nome dell'abbonamento Pub/Sub di inputBIGQUERY_TABLE
: il nome della tabella di output BigQueryDEADLETTER_TOPIC
: l'argomento Pub/Sub da utilizzare per la coda non elaborata
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.