Modello Datastream a BigQuery (flusso)

Il modello Datastream to BigQuery è una pipeline in modalità flusso che legge i dati di Datastream e li replica in BigQuery. Il modello legge i dati da Cloud Storage utilizzando le notifiche Pub/Sub e li replica in una tabella intermedia BigQuery partizionata in base al tempo. Dopo la replica, il modello esegue un MERGE in BigQuery per eseguire l'upsert di tutte le modifiche di Change Data Capture (CDC) in una replica della tabella di origine.

Il modello gestisce la creazione e l'aggiornamento delle tabelle BigQuery gestite dalla replica. Quando è richiesto il linguaggio di definizione dei dati (DDL), un callback a Datastream estrae lo schema della tabella di origine e lo traduce in tipi di dati BigQuery. Le operazioni supportate includono:

  • Le nuove tabelle vengono create man mano che i dati vengono inseriti.
  • Alle tabelle BigQuery vengono aggiunte nuove colonne con valori iniziali null.
  • Le colonne eliminate vengono ignorate in BigQuery e i valori futuri sono null.
  • Le colonne rinominate vengono aggiunte a BigQuery come nuove colonne.
  • Le modifiche ai tipi non vengono propagate a BigQuery.

Ti consigliamo di eseguire questa pipeline utilizzando la modalità di streaming almeno una volta, poiché il modello esegue la deduplica quando unisce i dati da una tabella BigQuery temporanea alla tabella BigQuery principale. Questo passaggio della pipeline indica che non è previsto alcun vantaggio aggiuntivo nell'utilizzo della modalità di streaming esattamente una volta.

Requisiti della pipeline

  • Uno stream DataStream pronto per la replica dei dati o che la sta già eseguendo.
  • Le notifiche Pub/Sub di Cloud Storage sono abilitate per i dati di Datastream.
  • I set di dati di destinazione BigQuery sono stati creati e all'account di servizio Compute Engine è stato concesso l'accesso amministrativo.
  • Per creare la tabella della replica di destinazione è necessaria una chiave primaria nella tabella di origine.
  • Un database di origine MySQL o Oracle. I database PostgreSQL e SQL Server non sono supportati.

Parametri del modello

Parametri obbligatori

  • inputFilePattern : la posizione del file per l'output del file Datastream in Cloud Storage, nel formato: gs://<BUCKET_NAME>/<ROOT_PATH>/.
  • inputFileFormat : il formato dei file di output prodotti da Datastream. Il valore può essere "avro" o "json". Valore predefinito: avro.
  • gcsPubSubSubscription : l'abbonamento Pub/Sub utilizzato da Cloud Storage per notificare a Dataflow i nuovi file disponibili per l'elaborazione, nel formato: projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • outputStagingDatasetTemplate : il nome del set di dati contenente le tabelle di staging. Questo parametro supporta i modelli, ad esempio {_metadata_dataset}_log o my_dataset_log. In genere, questo parametro è il nome di un set di dati. Il valore predefinito è {_metadata_dataset}.
  • outputDatasetTemplate : il nome del set di dati contenente le tabelle di replica. Questo parametro supporta i modelli, ad esempio {_metadata_dataset} o my_dataset. In genere, questo parametro è il nome di un set di dati. Il valore predefinito è {_metadata_dataset}.
  • deadLetterQueueDirectory : il percorso utilizzato da Dataflow per scrivere l'output della coda dei messaggi non recapitabili. Questo percorso non deve essere lo stesso del percorso dell'output del file Datastream. Il valore predefinito è vuoto.

Parametri facoltativi

  • streamName : il nome o il modello dello stream da sottoporre a polling per le informazioni sullo schema. Il valore predefinito è: {_metadata_stream}. In genere, il valore predefinito è sufficiente.
  • rfcStartDateTime : la data e l'ora di inizio da utilizzare per recuperare i dati da Cloud Storage (https://tools.ietf.org/html/rfc3339). Il valore predefinito è 1970-01-01T00:00:00.00Z.
  • fileReadConcurrency : il numero di file DataStream simultanei da leggere. Il valore predefinito è 10.
  • outputProjectId : l'ID del progetto Google Cloud contenente i set di dati BigQuery in cui eseguire l'output dei dati. Il valore predefinito per questo parametro è il progetto in cui è in esecuzione la pipeline Dataflow.
  • outputStagingTableNameTemplate : il modello da utilizzare per assegnare un nome alle tabelle di staging. Ad esempio, {_metadata_table}). Il valore predefinito è {_metadata_table}_log.
  • outputTableNameTemplate : il modello da utilizzare per il nome delle tabelle di replica, ad esempio {_metadata_table}. Il valore predefinito è: {_metadata_table}.
  • ignoreFields : campi separati da virgole da ignorare in BigQuery. Il valore predefinito è: _metadata_stream,_metadata_schema,_metadata_table,_metadata_source,_metadata_tx_id,_metadata_dlq_reconsumed,_metadata_primary_keys,_metadata_error,_metadata_retry_count. (esempio: _metadata_stream,_metadata_schema).
  • mergeFrequencyMinutes : il numero di minuti tra le unioni per una determinata tabella. Il valore predefinito è 5.
  • dlqRetryMinutes : il numero di minuti tra i tentativi di esecuzione della coda DLQ. Il valore predefinito è 10.
  • dataStreamRootUrl : l'URL principale dell'API Datastream. Valore predefinito: https://datastream.googleapis.com/.
  • applyMerge : indica se disattivare le query MERGE per il job. Il valore predefinito è true.
  • mergeConcurrency : il numero di query MERGE di BigQuery simultanee. Ha effetto solo se applyMerge è impostato su true. Il valore predefinito è 30.
  • partitionRetentionDays : il numero di giorni da utilizzare per la conservazione delle partizioni durante l'esecuzione delle unioni BigQuery. Valore predefinito: 1.
  • useStorageWriteApiAtLeastOnce : questo parametro viene applicato solo se è attivata l'opzione "Utilizza l'API BigQuery Storage Write". Se il valore è true, vengono utilizzate le semantiche almeno una volta per l'API Storage Write. In caso contrario, vengono utilizzate le semantiche di esecuzione esattamente una volta. Il valore predefinito è false.
  • javascriptTextTransformGcsPath : l'URI Cloud Storage del file .js che definisce la funzione definita dall'utente (UDF) JavaScript da utilizzare. (ad es. gs://my-bucket/my-udfs/my_file.js).
  • javascriptTextTransformFunctionName : il nome della funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes : specifica la frequenza con cui ricaricare la UDF, in minuti. Se il valore è maggiore di 0, Dataflow controlla periodicamente il file UDF in Cloud Storage e lo ricarica se il file viene modificato. Questo parametro ti consente di aggiornare la UDF durante l'esecuzione della pipeline, senza dover riavviare il job. Se il valore è 0, il ricaricamento dei DFF è disattivato. Il valore predefinito è 0.
  • pythonTextTransformGcsPath : il pattern del percorso Cloud Storage per il codice Python contenente le funzioni definite dall'utente. (ad esempio gs://your-bucket/your-transforms/*.py).
  • pythonRuntimeVersion : la versione del runtime da utilizzare per questa UDF Python.
  • pythonTextTransformFunctionName : il nome della funzione da chiamare dal file JavaScript. Utilizza solo lettere, cifre e trattini bassi. (esempio: transform_udf1).
  • runtimeRetries : il numero di volte che verrà eseguito un nuovo tentativo di runtime prima del fallimento. Il valore predefinito è 5.
  • useStorageWriteApi : se true, la pipeline utilizza l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). Il valore predefinito è false. Per ulteriori informazioni, consulta Utilizzo dell'API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams : quando utilizzi l'API Storage Write, specifica il numero di stream di scrittura. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro. Il valore predefinito è 0.
  • storageWriteApiTriggeringFrequencySec : quando utilizzi l'API Storage Write, specifica la frequenza di attivazione in secondi. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro.

Funzione definita dall'utente

Se vuoi, puoi estendere questo modello scrivendo una funzione definita dall'utente (UDF). Il modello chiama la UDF per ogni elemento di input. I payload degli elementi vengono serializzati come stringhe JSON. Per ulteriori informazioni, consulta Creare funzioni predefinite dall'utente per i modelli Dataflow.

Specifiche della funzione

La UDF ha la seguente specifica:

  • Input: i dati del CDC, serializzati come stringa JSON.
  • Output: una stringa JSON che corrisponde allo schema della tabella di destinazione BigQuery.
  • Esegui il modello

    Console

    1. Vai alla pagina Crea job da modello di Dataflow.
    2. Vai a Crea job da modello
    3. Nel campo Nome job, inserisci un nome univoco per il job.
    4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

      Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

    5. Nel menu a discesa Modello di flusso di dati, seleziona the Datastream to BigQuery template.
    6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
    7. (Facoltativo) Per passare dall'elaborazione exactly-once alla modalità flusso di dati Almeno una volta, seleziona Almeno una volta.
    8. Fai clic su Esegui job.

    gcloud

    Nella shell o nel terminale, esegui il modello:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
    inputFilePattern=GCS_FILE_PATH,\
    gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
    outputStagingDatasetTemplate=BIGQUERY_DATASET,\
    outputDatasetTemplate=BIGQUERY_DATASET,\
    outputStagingTableNameTemplate=BIGQUERY_TABLE,\
    outputTableNameTemplate=BIGQUERY_TABLE_log
      

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
    • JOB_NAME: un nome di job univoco a tua scelta
    • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: il percorso Cloud Storage per i dati di Datastream. Ad esempio: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: l'abbonamento Pub/Sub da cui leggere i file modificati. Ad esempio: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: il nome del set di dati BigQuery.
    • BIGQUERY_TABLE: il modello di tabella BigQuery. Ad esempio, {_metadata_schema}_{_metadata_table}_log

    API

    Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
    
              "inputFilePattern": "GCS_FILE_PATH",
              "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
              "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
              "outputDatasetTemplate": "BIGQUERY_DATASET",
              "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
              "outputTableNameTemplate": "BIGQUERY_TABLE_log"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery",
       }
    }
      

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
    • JOB_NAME: un nome di job univoco a tua scelta
    • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: il percorso Cloud Storage per i dati di Datastream. Ad esempio: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: l'abbonamento Pub/Sub da cui leggere i file modificati. Ad esempio: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: il nome del set di dati BigQuery.
    • BIGQUERY_TABLE: il modello di tabella BigQuery. Ad esempio, {_metadata_schema}_{_metadata_table}_log

    Passaggi successivi