Utilizzare Datastream e Dataflow per trasmettere dati in BigQuery

In questa pagina sono riportate le best practice per l'utilizzo di Datastream e Dataflow per inviare dati in streaming a BigQuery.

Suddividi i set di dati delle repliche in base a chiavi definite dall'utente

Il set di dati gestione temporanea in BigQuery viene partizionato automaticamente. Tuttavia, per impostazione predefinita, il set di dati della replica non è partizionato perché le chiavi di partizione nelle tabelle della replica devono essere definite in base a una logica aziendale specifica, anziché essere applicate da Datastream e Dataflow.

Per ogni tabella del set di dati della replica che deve essere partizionata:

  1. Arresta e svuota il job Dataflow.

  2. Utilizza l'editor SQL in BigQuery per eseguire il seguente script SQL per ogni tabella nel set di dati della replica. Per questo esempio, la tabella actor nel set di dati datastream_cdc ha una colonna last_update che vogliamo impostare come chiave di partizione. Eseguendo lo script, la tabella viene ricreata con la chiave di partizione corretta.

    create table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' partition by date(last_update)
    as SELECT * FROM '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    drop table '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    alter table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' rename to 'actor'
  3. Utilizza il modello da Datastream a BigQuery per ricreare un job Dataflow.

Eseguire funzioni definite dall'utente per manipolare i dati sugli eventi

Puoi utilizzare il modello da Datastream a BigQuery per eseguire una funzione JavaScript definita dall'utente. Per farlo, devi innanzitutto inserire un file contenente la funzione in una posizione specifica all'interno di Cloud Storage. Poi:

  • Utilizza il parametro javascriptTextTransformGcsPath nel modello per specificare la posizione del file in Cloud Storage che contiene la funzione definita dall'utente.
  • Utilizza il parametro javascriptTextTransformFunctionName per specificare il nome della funzione JavaScript che vuoi chiamare come funzione definita dall'utente.

Ad esempio, puoi eseguire una funzione definita dall'utente per conservare i record eliminati nelle tabelle del set di dati della replica in BigQuery. Questo processo è noto come eliminazione temporanea.

A questo scopo, crea una funzione che copi il valore della colonna _metadata_deleted in una nuova colonna denominata is_deleted, quindi reimposta il valore della colonna _metadata_deleted su false. In questo modo il job Dataflow ignora gli eventi di eliminazione e conserva i record eliminati durante l'aggiornamento del set di dati della replica in BigQuery.

Ecco il codice campione per questa funzione definita dall'utente:

/**
* This function changes the behavior of the Datastream to
* BigQuery template to allow soft deletes.
* @param {string} messageString from DatastreamIO data
* @return {string} same as an input message with an added property
*/
function transform(messageString) {
   // messageString is a JSON object as a string
   var messageJson = JSON.parse(messageString);
    // Moving the deleted flag to a new column will cause the pipeline to soft delete data.
   messageJson['is_deleted'] = messageJson['_metadata_deleted'];
   messageJson['_metadata_deleted'] = false;
    return JSON.stringify(messageJson);
 }

Imposta la frequenza di unione

Utilizza il parametro mergeFrequencyMinutes del modello Datastream to BigQuery per impostare la frequenza di unione. Si tratta del numero di minuti tra le unioni per una determinata tabella nel set di dati della replica in BigQuery. Durante il backfill dei dati storici, ti consigliamo di mantenere bassa la frequenza di unione (12 o 24 ore) per tenere sotto controllo i costi.

Ad esempio, se imposti il valore di questo parametro su 10 minuti, Dataflow eseguirà il job che utilizza il modello ogni 10 minuti. Tuttavia, la prima volta che viene eseguito il job, verrà applicato un ritardo di 5 minuti. In questo esempio, se il job viene eseguito alle 09:14, la prima unione verrà eseguita alle 09:29 (10 minuti per l'unione e 5 minuti per il ritardo). La seconda unione verrà eseguita alle ore 09:39 e tutte le unioni successive verranno eseguite a intervalli di 10 minuti (09:49, 09:59, 10:09 e così via).

Se imposti la frequenza di unione su 60 minuti, il job verrà eseguito all'ora, dopo un ritardo di 5 minuti dall'esecuzione iniziale del job. Se il job è pianificato per l'esecuzione alle 10:00, verrà effettivamente eseguito alle 10:05 a causa del ritardo di 5 minuti. Tutte le unioni successive verranno eseguite a intervalli di 60 minuti (11:05, 12:05, 13:05 e così via).

Che si tratti di un controllo dei costi o di altri motivi, potresti non essere in grado di eseguire un'unione con una frequenza che soddisfi le esigenze della tua attività. Potresti non disporre dei dati più recenti. Per accedere ai dati più recenti, crea una vista sopra le tabelle dei set di dati di gestione temporanea e replica in BigQuery, dove la vista imita l'unione. Questa vista viene creata come un'unica tabella logica (sia per il set di dati gestione temporanea che per quello di replica). Se la frequenza di unione è bassa e hai bisogno di un accesso più rapido ai dati, utilizza la vista.