Il modello Datastream to BigQuery è una pipeline in modalità flusso che legge i dati di Datastream e li replica in BigQuery. Il modello legge i dati da Cloud Storage
utilizzando le notifiche Pub/Sub e li replica in una tabella intermedia BigQuery
partizionata in base al tempo. Dopo la replica, il modello esegue un MERGE
in BigQuery per eseguire l'upsert di tutte le modifiche di Change Data Capture (CDC) in una replica della tabella di origine.
Il modello gestisce la creazione e l'aggiornamento delle tabelle BigQuery gestite dalla replica. Quando è richiesto il linguaggio di definizione dei dati (DDL), un callback a Datastream estrae lo schema della tabella di origine e lo traduce in tipi di dati BigQuery. Le operazioni supportate includono:
- Le nuove tabelle vengono create man mano che i dati vengono inseriti.
- Alle tabelle BigQuery vengono aggiunte nuove colonne con valori iniziali null.
- Le colonne eliminate vengono ignorate in BigQuery e i valori futuri sono null.
- Le colonne rinominate vengono aggiunte a BigQuery come nuove colonne.
- Le modifiche ai tipi non vengono propagate a BigQuery.
Ti consigliamo di eseguire questa pipeline utilizzando la modalità di streaming almeno una volta, poiché il modello esegue la deduplica quando unisce i dati da una tabella BigQuery temporanea alla tabella BigQuery principale. Questo passaggio della pipeline indica che non è previsto alcun vantaggio aggiuntivo nell'utilizzo della modalità di streaming esattamente una volta.
Requisiti della pipeline
- Uno stream DataStream pronto per la replica dei dati o che la sta già eseguendo.
- Le notifiche Pub/Sub di Cloud Storage sono attivate per i dati di Datastream.
- I set di dati di destinazione BigQuery sono stati creati e all'account di servizio Compute Engine è stato concesso l'accesso amministrativo.
- Per creare la tabella della replica di destinazione è necessaria una chiave primaria nella tabella di origine.
- Un database di origine MySQL o Oracle. I database PostgreSQL e SQL Server non sono supportati.
Parametri del modello
Parametri obbligatori
- inputFilePattern: la posizione del file per l'output del file Datastream in Cloud Storage, nel formato
gs://<BUCKET_NAME>/<ROOT_PATH>/
. - inputFileFormat: il formato dei file di output prodotti da Datastream. I valori consentiti sono
avro
ejson
. Il valore predefinito èavro
. - gcsPubSubSubscription: l'abbonamento Pub/Sub utilizzato da Cloud Storage per notificare a Dataflow i nuovi file disponibili per l'elaborazione, nel formato:
projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>
. - outputStagingDatasetTemplate: il nome del set di dati contenente le tabelle di staging. Questo parametro supporta i modelli, ad esempio
{_metadata_dataset}_log
omy_dataset_log
. In genere, questo parametro è il nome di un set di dati. Il valore predefinito è{_metadata_dataset}
. - outputDatasetTemplate: il nome del set di dati contenente le tabelle di replica. Questo parametro supporta i modelli, ad esempio
{_metadata_dataset}
omy_dataset
. In genere, questo parametro è il nome di un set di dati. Il valore predefinito è{_metadata_dataset}
. - deadLetterQueueDirectory: il percorso utilizzato da Dataflow per scrivere l'output della coda dei messaggi non recapitabili. Questo percorso non deve essere lo stesso del percorso dell'output del file Datastream. Il valore predefinito è
empty
.
Parametri facoltativi
- streamName: il nome o il modello dello stream da sottoporre a polling per le informazioni sullo schema. Il valore predefinito è: {_metadata_stream}. In genere, il valore predefinito è sufficiente.
- rfcStartDateTime: la data e l'ora di inizio da utilizzare per recuperare i dati da Cloud Storage (https://tools.ietf.org/html/rfc3339). Il valore predefinito è
1970-01-01T00:00:00.00Z
. - fileReadConcurrency: il numero di file DataStream simultanei da leggere. Il valore predefinito è
10
. - outputProjectId: l'ID del progetto Google Cloud contenente i set di dati BigQuery in cui eseguire l'output dei dati. Il valore predefinito per questo parametro è il progetto in cui è in esecuzione la pipeline Dataflow.
- outputStagingTableNameTemplate: il modello da utilizzare per assegnare un nome alle tabelle di staging. Ad esempio,
{_metadata_table}
. Il valore predefinito è{_metadata_table}_log
. - outputTableNameTemplate: il modello da utilizzare per il nome delle tabelle di replica, ad esempio
{_metadata_table}
. Il valore predefinito è{_metadata_table}
. - ignoreFields: campi separati da virgole da ignorare in BigQuery. Il valore predefinito è
_metadata_stream,_metadata_schema,_metadata_table,_metadata_source,_metadata_tx_id,_metadata_dlq_reconsumed,_metadata_primary_keys,_metadata_error,_metadata_retry_count
. Ad esempio:_metadata_stream,_metadata_schema
. - mergeFrequencyMinutes: il numero di minuti tra le unioni per una determinata tabella. Il valore predefinito è
5
. - dlqRetryMinutes: il numero di minuti tra i tentativi di esecuzione della coda DLQ. Il valore predefinito è
10
. - dataStreamRootUrl: l'URL principale dell'API Datastream. Valore predefinito: https://datastream.googleapis.com/.
- applyMerge: indica se disattivare le query MERGE per il job. Il valore predefinito è
true
. - mergeConcurrency: il numero di query MERGE di BigQuery simultanee. Ha effetto solo se applyMerge è impostato su true. Il valore predefinito è
30
. - partitionRetentionDays: il numero di giorni da utilizzare per la conservazione delle partizioni durante l'esecuzione delle unioni BigQuery. Il valore predefinito è
1
. - useStorageWriteApiAtLeastOnce: questo parametro viene applicato solo se
Use BigQuery Storage Write API
è attivato. Setrue
, per l'API Storage Write vengono utilizzate le semantiche almeno una volta. In caso contrario, vengono utilizzate le semantiche di esecuzione esattamente una volta. Il valore predefinito èfalse
. - javascriptTextTransformGcsPath: l'URI Cloud Storage del file .js che definisce la funzione JavaScript definita dall'utente (UDF) da utilizzare. Ad esempio,
gs://my-bucket/my-udfs/my_file.js
. - javascriptTextTransformFunctionName: il nome della funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio, se il codice della funzione JavaScript è
myTransform(inJson) { /*...do stuff...*/ }
, il nome della funzione èmyTransform
. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). - javascriptTextTransformReloadIntervalMinutes: specifica la frequenza con cui ricaricare la UDF, in minuti. Se il valore è maggiore di 0, Dataflow controlla periodicamente il file UDF in Cloud Storage e lo ricarica se il file viene modificato. Questo parametro ti consente di aggiornare la UDF durante l'esecuzione della pipeline, senza dover riavviare il job. Se il valore è
0
, il ricaricamento delle funzioni definite dall'utente è disattivato. Il valore predefinito è0
. - pythonTextTransformGcsPath: il pattern del percorso Cloud Storage per il codice Python contenente le funzioni definite dall'utente. Ad esempio,
gs://your-bucket/your-transforms/*.py
. - pythonRuntimeVersion: la versione del runtime da utilizzare per questa UDF Python.
- pythonTextTransformFunctionName: il nome della funzione da chiamare dal file JavaScript. Utilizza solo lettere, cifre e trattini bassi. Ad esempio,
transform_udf1
. - runtimeRetries: il numero di volte che verrà eseguito un nuovo tentativo di runtime prima del fallimento. Valore predefinito: 5.
- useStorageWriteApi: se true, la pipeline utilizza l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). Il valore predefinito è
false
. Per ulteriori informazioni, consulta Utilizzo dell'API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams: quando utilizzi l'API Storage Write, specifica il numero di stream di scrittura. Se
useStorageWriteApi
ètrue
euseStorageWriteApiAtLeastOnce
èfalse
, devi impostare questo parametro. Il valore predefinito è 0. - storageWriteApiTriggeringFrequencySec: quando utilizzi l'API Storage Write, specifica la frequenza di attivazione in secondi. Se
useStorageWriteApi
ètrue
euseStorageWriteApiAtLeastOnce
èfalse
, devi impostare questo parametro.
Funzione definita dall'utente
Se vuoi, puoi estendere questo modello scrivendo una funzione definita dall'utente (UDF). Il modello chiama la UDF per ogni elemento di input. I payload degli elementi vengono serializzati come stringhe JSON. Per ulteriori informazioni, consulta Creare funzioni predefinite dall'utente per i modelli Dataflow.
Specifiche della funzione
La UDF ha la seguente specifica:
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.
- Nel menu a discesa Modello di flusso di dati, seleziona the Datastream to BigQuery template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- (Facoltativo) Per passare dall'elaborazione exactly-once alla modalità di streaming Almeno una volta, seleziona Almeno una volta.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --enable-streaming-engine \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \ --parameters \ inputFilePattern=GCS_FILE_PATH,\ gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\ outputStagingDatasetTemplate=BIGQUERY_DATASET,\ outputDatasetTemplate=BIGQUERY_DATASET,\ outputStagingTableNameTemplate=BIGQUERY_TABLE,\ outputTableNameTemplate=BIGQUERY_TABLE_log
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome di job univoco a tua sceltaREGION_NAME
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH
: il percorso Cloud Storage per i dati di Datastream. Ad esempio:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: l'abbonamento Pub/Sub da cui leggere i file modificati. Ad esempio:projects/my-project-id/subscriptions/my-subscription-id
.BIGQUERY_DATASET
: il nome del set di dati BigQuery.BIGQUERY_TABLE
: il modello di tabella BigQuery. Ad esempio,{_metadata_schema}_{_metadata_table}_log
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "GCS_FILE_PATH", "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME", "outputStagingDatasetTemplate": "BIGQUERY_DATASET", "outputDatasetTemplate": "BIGQUERY_DATASET", "outputStagingTableNameTemplate": "BIGQUERY_TABLE", "outputTableNameTemplate": "BIGQUERY_TABLE_log" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery", } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome di job univoco a tua sceltaLOCATION
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH
: il percorso Cloud Storage per i dati di Datastream. Ad esempio:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: l'abbonamento Pub/Sub da cui leggere i file modificati. Ad esempio:projects/my-project-id/subscriptions/my-subscription-id
.BIGQUERY_DATASET
: il nome del set di dati BigQuery.BIGQUERY_TABLE
: il modello di tabella BigQuery. Ad esempio,{_metadata_schema}_{_metadata_table}_log
Passaggi successivi
- Scopri come implementare Datastream e Dataflow per l'analisi.
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.