Utilizzare Datastream e Dataflow per trasmettere dati in BigQuery

In questa pagina troverai le best practice per l'utilizzo di Datastream e Dataflow per trasmettere dati in streaming a BigQuery.

Partizionare i set di dati delle repliche in base alle chiavi definite dall'utente

Il set di dati di staging in BigQuery viene partizionato automaticamente. Tuttavia, per impostazione predefinita, il set di dati di replica non è partizionato perché le chiavi di partizione nelle tabelle di replica devono essere definite in base a una logica aziendale specifica, anziché essere applicate da Datastream e Dataflow.

Per ogni tabella nel set di dati di replica che deve essere partizionata:

  1. Arresta e svuota il job Dataflow

  2. Utilizza l'editor SQL in BigQuery per eseguire il seguente script SQL per ogni tabella nel set di dati di replica. Per questo esempio, la tabella actor nel set di dati datastream_cdc ha una colonna last_update che vogliamo impostare come chiave di partizionamento. Eseguendo lo script, ricrei la tabella con la chiave di partizione corretta.

    create table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' partition by date(last_update)
    as SELECT * FROM '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    drop table '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    alter table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' rename to 'actor'
  3. Utilizza il modello Datastream a BigQuery per ricreare un job Dataflow.

Esegui funzioni definite dall'utente per manipolare i dati degli eventi

Puoi utilizzare il modello Datastream a BigQuery per eseguire una funzione definita dall'utente JavaScript. Per farlo, innanzitutto posiziona un file contenente la funzione in una posizione specifica all'interno di Cloud Storage. Poi, procedi nel seguente modo:

  • Utilizza il parametro javascriptTextTransformGcsPath nel modello per specificare la posizione del file in Cloud Storage che contiene la funzione definita dall'utente#39;utente.
  • Utilizza il parametro javascriptTextTransformFunctionName per specificare il nome della funzione JavaScript che vuoi chiamare come funzione definita dall'utente.

Ad esempio, puoi eseguire una funzione definita dall'utente#39;utente per conservare i record eliminati nelle tabelle del set di dati di replica in BigQuery. Questa procedura è nota come eliminazione temporanea.

Per farlo, crea una funzione che copi il valore della colonna _metadata_deleted in una nuova colonna denominata is_deleted e poi reimposti il valore della colonna _metadata_deleted su false. In questo modo, il job Dataflow ignora gli eventi di eliminazione e conserva i record eliminati quando aggiorna il set di dati di replica in BigQuery.

Ecco il codice campione per questa funzione definita dall'utente dall'utente:

/**
* This function changes the behavior of the Datastream to
* BigQuery template to allow soft deletes.
* @param {string} messageString from DatastreamIO data
* @return {string} same as an input message with an added property
*/
function transform(messageString) {
   // messageString is a JSON object as a string
   var messageJson = JSON.parse(messageString);
    // Moving the deleted flag to a new column will cause the pipeline to soft delete data.
   messageJson['is_deleted'] = messageJson['_metadata_deleted'];
   messageJson['_metadata_deleted'] = false;
    return JSON.stringify(messageJson);
 }

Impostare la frequenza di unione

Utilizza il parametro mergeFrequencyMinutes del modello Datastream a BigQuery per impostare la frequenza di unione. È il numero di minuti tra le unioni per una determinata tabella nel set di dati di replica in BigQuery. Durante il riempimento dei dati storici, ti consigliamo di mantenere bassa la frequenza di unione (12 o 24 ore) per tenere sotto controllo i costi.

Ad esempio, se imposti il valore di questo parametro su 10 minuti, Dataflow eseguirà il job che utilizza il modello ogni 10 minuti. Tuttavia, la prima volta che viene eseguito il job, si verificherà un ritardo di 5 minuti. Per questo esempio, se il job viene eseguito alle 9:14, il primo merge verrà eseguito alle 9:29 (10 minuti per il merge e 5 minuti per il ritardo). La seconda unione avverrà alle 9:39 e tutte le unioni successive avverranno a intervalli di 10 minuti (9:49, 9:59, 10:09 e così via).

Se imposti la frequenza di unione su 60 minuti, il job verrà eseguito ogni ora, dopo un ritardo di 5 minuti per l'esecuzione iniziale. Se l'esecuzione del job è pianificata per le 10:00, verrà eseguita alle 10:05 a causa del ritardo di 5 minuti. Tutte le unioni successive avverranno a intervalli di 60 minuti (11:05, 12:05, 13:05 e così via).

Che sia per controllare i costi o per altri motivi, potresti non essere in grado di eseguire un'unione con una frequenza che soddisfi le esigenze della tua attività. Potresti non avere i dati più recenti. Per accedere ai dati più recenti, crea una vista sopra le tabelle dei set di dati di staging e di replica in BigQuery, in cui la vista simula l'unione. Questa vista viene creata come una tabella logica (sia per i set di dati di staging che per quelli di replica). Se la frequenza di unione è bassa e hai bisogno di accedere più rapidamente ai dati, utilizza la visualizzazione.