Modello da Datastream a BigQuery (flusso)

Il modello da Datastream a BigQuery è una pipeline di inserimento flussi che legge i dati di Datastream e li replica in BigQuery. Il modello legge i dati da Cloud Storage utilizzando le notifiche Pub/Sub e li replica in una tabella temporanea di BigQuery partizionata in base all'ora. Dopo la replica, il modello esegue un'istruzione MERGE in BigQuery per eseguire l'upsert di tutte le modifiche CDC (Change Data Capture) in una replica della tabella di origine.

Il modello gestisce la creazione e l'aggiornamento delle tabelle BigQuery gestite dalla replica. Quando è richiesto Data Definition Language (DDL), un callback a Datastream estrae lo schema della tabella di origine e lo converte in tipi di dati BigQuery. Le operazioni supportate includono:

  • Vengono create nuove tabelle man mano che vengono inseriti i dati.
  • Alle tabelle BigQuery vengono aggiunte nuove colonne con valori iniziali null.
  • Le colonne eliminate vengono ignorate in BigQuery e i valori futuri sono nulli.
  • Le colonne rinominate vengono aggiunte a BigQuery come nuove colonne.
  • Le modifiche del tipo non vengono propagate a BigQuery.

Ti consigliamo di eseguire questa pipeline utilizzando la modalità flusso di dati Almeno una volta, perché il modello esegue la deduplicazione quando unisce i dati da una tabella BigQuery temporanea alla tabella BigQuery principale. Questo passaggio della pipeline non comporta ulteriori vantaggi nell'utilizzo della modalità flusso di dati "exactly-once".

Requisiti della pipeline

  • Un flusso Datastream pronto per la replica dei dati o lo sta già.
  • Le notifiche Pub/Sub di Cloud Storage sono abilitate per i dati Datastream.
  • Vengono creati set di dati di destinazione BigQuery e all'account di servizio Compute Engine è stato concesso l'accesso in qualità di amministratore.
  • Per creare la tabella di replica di destinazione, è necessaria una chiave primaria nella tabella di origine.
  • Un database di origine MySQL o Oracle. I database PostgreSQL non sono supportati.

Parametri del modello

Parametri obbligatori

  • inputFilePattern: la posizione del file per l'output del file Datastream in Cloud Storage, nel formato gs://<BUCKET_NAME>/<ROOT_PATH>/.
  • inputFileFormat : il formato dei file di output prodotti da Datastream. Il valore può essere "avro" o "json". Il valore predefinito è: avro.
  • gcsPubSubSubscription : la sottoscrizione Pub/Sub utilizzata da Cloud Storage per notificare a Dataflow la presenza di nuovi file disponibili per l'elaborazione nel formato: projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • outputStagingDatasetTemplate : il nome del set di dati che contiene tabelle temporanee. Questo parametro supporta i modelli, ad esempio {_metadata_dataset}_log o my_dataset_log. Normalmente, questo parametro è il nome di un set di dati. Il valore predefinito è: {_metadata_dataset}.
  • outputDatasetTemplate : il nome del set di dati che contiene le tabelle di replica. Questo parametro supporta i modelli, ad esempio {_metadata_dataset} o my_dataset. Normalmente, questo parametro è il nome di un set di dati. Il valore predefinito è: {_metadata_dataset}.
  • deadLetterQueueDirectory : il percorso utilizzato da Dataflow per scrivere l'output della coda dei messaggi non recapitabili. Questo percorso non deve trovarsi nello stesso percorso dell'output del file Datastream. Il campo predefinito è vuoto.

Parametri facoltativi

  • streamName : il nome o il modello dello stream da sottoporre a polling per ottenere le informazioni sullo schema. Il valore predefinito è: {_metadata_stream}. In genere il valore predefinito è sufficiente.
  • rfcStartDateTime : il valore DateTime di inizio da utilizzare per recuperare i dati da Cloud Storage (https://tools.ietf.org/html/rfc3339). Il valore predefinito è: 1970-01-01T00:00:00.00Z.
  • fileReadConcurrency : il numero di file DataStream simultanei da leggere. Il valore predefinito è 10.
  • outputProjectId : l'ID del progetto Google Cloud che contiene i set di dati BigQuery in cui inviare i dati. Il valore predefinito per questo parametro è il progetto in cui è in esecuzione la pipeline Dataflow.
  • outputStagingTableNameTemplate : il modello da utilizzare per assegnare un nome alle tabelle temporanee. ad esempio {_metadata_table}). Il valore predefinito è: {_metadata_table}_log.
  • outputTableNameTemplate : il modello da utilizzare per il nome delle tabelle di replica, ad esempio {_metadata_table}. Il valore predefinito è: {_metadata_table}.
  • ignoreFields : campi separati da virgole da ignorare in BigQuery. Il valore predefinito è: _metadata_stream,_metadata_schema,_metadata_table,_metadata_source,_metadata_tx_id,_metadata_dlq_reconsumed,_metadata_primary_keys,_metadata_error,_metadata_retry_count. Esempio: _metadata_stream,_metadata_schema.
  • mergeFrequencyMinutes : il numero di minuti tra le unioni di una determinata tabella. Il valore predefinito è 5.
  • dlqRetryMinutes : il numero di minuti tra i nuovi tentativi DLQ. Il valore predefinito è 10.
  • dataStreamRootUrl : l'URL principale dell'API Datastream. Il valore predefinito è: https://datastream.googleapis.com/.
  • applyMerge : se disabilitare le query MERGE per il job. Il valore predefinito è: true.
  • mergeConcurrency : il numero di query simultanee di BigQuery MERGE. Ha effetto solo quando applyMerge è impostato su true. Il valore predefinito è 30.
  • partitionRetentionDays : il numero di giorni da utilizzare per la conservazione della partizione durante l'esecuzione di unioni BigQuery. Il valore predefinito è: 1.
  • useStorageWriteApiAtLeastOnce : questo parametro viene applicato soltanto se è abilitata l'opzione "Usa API BigQuery Storage Scrivi". Se il valore è vero, per l'API Storage viene utilizzata la semantica "at-least-once". In caso contrario, viene usata la semantica "exactly-once". Il valore predefinito è false.
  • javascriptTextTransformGcsPath : l'URI Cloud Storage del file .js che definisce la funzione definita dall'utente (UDF) JavaScript da utilizzare. Esempio: gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName : il nome della funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript definite dall'utente, consulta gli esempi di funzioni definite dall'utente (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes : specifica la frequenza di ricarica della funzione definita dall'utente, in minuti. Se il valore è maggiore di 0, Dataflow controlla periodicamente il file delle funzioni definite dall'utente in Cloud Storage e ricarica la funzione definita dall'utente se il file viene modificato. Questo parametro consente di aggiornare la funzione definita dall'utente mentre la pipeline è in esecuzione, senza dover riavviare il job. Se il valore è 0, il ricaricamento della funzione definita dall'utente viene disabilitato. Il valore predefinito è 0.
  • pythonTextTransformGcsPath : il pattern del percorso di Cloud Storage per il codice Python contenente le funzioni definite dall'utente. ad esempio gs://your-bucket/your-transforms/*.py.
  • pythonRuntimeVersion : la versione del runtime da utilizzare per questa funzione Python.
  • pythonTextTransformFunctionName : il nome della funzione da chiamare dal file JavaScript. Utilizza solo lettere, numeri e trattini bassi. (ad esempio: transform_udf1).
  • runtimeRetries : il numero di volte in cui verrà eseguito un nuovo tentativo di runtime prima che non vada a buon fine. Il valore predefinito è 5.
  • useStorageWriteApi : se true, la pipeline utilizza l'API BigQuery Storage Scrivi (https://cloud.google.com/bigquery/docs/write-api). Il valore predefinito è false. Per ulteriori informazioni, consulta Utilizzo dell'API StorageWrite (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams : quando si utilizza l'API StorageWrite, specifica il numero di flussi di scrittura. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro. Il valore predefinito è 0.
  • storageWriteApiTriggeringFrequencySec : quando utilizzi l'API StorageWrite, specifica la frequenza di attivazione in secondi. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro.

Funzione definita dall'utente

Facoltativamente, puoi estendere questo modello scrivendo una funzione definita dall'utente dall'utente. Il modello chiama la funzione definita dall'utente per ogni elemento di input. I payload degli elementi sono serializzati come stringhe JSON. Per ulteriori informazioni, consulta Creare funzioni definite dall'utente per i modelli Dataflow.

Specifica della funzione

La funzione definita dall'utente ha la seguente specifica:

  • Input: i dati CDC, serializzati come stringa JSON.
  • Output: una stringa JSON che corrisponde allo schema della tabella di destinazione BigQuery.
  • Esegui il modello

    Console

    1. Vai alla pagina Crea job da modello di Dataflow.
    2. Vai a Crea job da modello
    3. Nel campo Nome job, inserisci un nome univoco per il job.
    4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

      Per un elenco di regioni in cui è possibile eseguire un job Dataflow, consulta Località di Dataflow.

    5. Dal menu a discesa Modello Dataflow, seleziona the Datastream to BigQuery template.
    6. Inserisci i valori parametro negli appositi campi.
    7. (Facoltativo) Per passare dall'elaborazione "exactly-once" alla modalità flusso di dati Almeno una volta, seleziona Almeno una volta.
    8. Fai clic su Esegui job.

    gcloud

    Nella shell o nel terminale, esegui il modello:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
    inputFilePattern=GCS_FILE_PATH,\
    gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
    outputStagingDatasetTemplate=BIGQUERY_DATASET,\
    outputDatasetTemplate=BIGQUERY_DATASET,\
    outputStagingTableNameTemplate=BIGQUERY_TABLE,\
    outputTableNameTemplate=BIGQUERY_TABLE_log
      

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
    • JOB_NAME: un nome job univoco a tua scelta
    • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: il percorso Cloud Storage dei dati Datastream. Ad esempio: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: la sottoscrizione Pub/Sub da cui leggere i file modificati. Ad esempio: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: il nome del set di dati BigQuery.
    • BIGQUERY_TABLE: il tuo modello di tabella BigQuery. Ad esempio, {_metadata_schema}_{_metadata_table}_log

    API

    Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
    
              "inputFilePattern": "GCS_FILE_PATH",
              "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
              "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
              "outputDatasetTemplate": "BIGQUERY_DATASET",
              "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
              "outputTableNameTemplate": "BIGQUERY_TABLE_log"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery",
       }
    }
      

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
    • JOB_NAME: un nome job univoco a tua scelta
    • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: il percorso Cloud Storage dei dati Datastream. Ad esempio: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: la sottoscrizione Pub/Sub da cui leggere i file modificati. Ad esempio: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: il nome del set di dati BigQuery.
    • BIGQUERY_TABLE: il tuo modello di tabella BigQuery. Ad esempio, {_metadata_schema}_{_metadata_table}_log

    Passaggi successivi