Modelli di flussi di modifiche di Bigtable a BigQuery

Il modello di flussi di modifiche di Bigtable a BigQuery è una pipeline di streaming che esegue il flusso dei record delle modifiche dei dati di Bigtable e li scrive in tabelle BigQuery utilizzando Dataflow.

Un flusso di modifiche Bigtable ti consente di iscriverti alle mutazioni dei dati in base alla tabella. Quando ti abboni agli stream di variazioni della tabella, si applicano i seguenti vincoli:

  • Vengono restituite solo le celle modificate e i descrittori delle operazioni di eliminazione.
  • Viene restituito solo il nuovo valore di una cella modificata.

Quando i record delle modifiche dei dati vengono scritti in BigQuery, le righe potrebbero essere inserite fuori ordine rispetto all'ordinamento del timestamp del commit di Bigtable originale.

Le righe della tabella del log delle modifiche che non possono essere scritte in BigQuery a causa di un errore persistente sono inserito in modo permanente in una directory di messaggi non recapitabili (coda di messaggi non elaborati) in Cloud Storage per la revisione umana o ulteriore elaborazione da parte dell'utente.

Se la tabella BigQuery necessaria non esiste, la pipeline la crea. Altrimenti, viene utilizzata la tabella BigQuery esistente. Lo schema delle tabelle BigQuery esistenti deve contenere le colonne della tabella seguente.

Ogni nuova riga BigQuery include un record di modifica dei dati restituito dallo stream di variazioni dalla riga corrispondente nella tabella Bigtable.

Schema della tabella di output BigQuery

Nome colonna Tipo Ammette valori Null Descrizione
row_key STRING o BYTES No La chiave riga della riga modificata. Quando l'opzione pipeline writeRowkeyAsBytes è impostata su true, il tipo di colonna deve essere BYTES. In caso contrario, utilizza il tipo STRING.
mod_type STRING No Il tipo di mutazione della riga. Utilizza uno dei seguenti valori: SET_CELL, DELETE_CELLS o DELETE_FAMILY.
column_family STRING No La famiglia di colonne interessata dalla mutazione della riga.
column STRING Il qualificatore di colonna interessato dalla mutazione della riga. Per il tipo di mutazione DELETE_FAMILY, imposta NULL.
commit_timestamp TIMESTAMP No Il momento in cui Bigtable applica la mutazione.
big_query_commit_timestamp TIMESTAMP (Facoltativo) Specifica l'ora in cui BigQuery scrive la riga in una tabella di output. Il campo non viene compilato se il nome della colonna è presente nel valore dell'opzione pipeline bigQueryChangelogTableFieldsToIgnore.
timestamp TIMESTAMP o INT64 Il valore del timestamp della cella interessata dalla mutazione. Se l'opzione pipeline writeNumericTimestamps è impostata su true, il tipo di colonna deve essere INT64. In caso contrario, usa il tipo TIMESTAMP. Per i tipi di mutazione DELETE_CELLS e DELETE_FAMILY, imposta su NULL.
timestamp_from TIMESTAMP o INT64 Descrive un inizio inclusivo dell'intervallo timestamp per tutte le celle eliminate dalla mutazione DELETE_CELLS. Per altri tipi di mutazione, imposta su NULL.
timestamp_to TIMESTAMP o INT64 Descrive una fine esclusiva dell'intervallo timestamp per tutte le celle eliminate dalla mutazione DELETE_CELLS. Per altri tipi di mutazione, imposta su NULL.
is_gc BOOL No (Facoltativo) Quando la mutazione viene attivata da un criterio di garbage collection, imposta il valore su true. In tutti gli altri casi, imposta su false. Il campo non viene compilato quando il nome della colonna è presente nel valore dell'opzione della pipeline bigQueryChangelogTableFieldsToIgnore.
source_instance STRING No (Facoltativo) Descrive il nome dell'istanza Bigtable da cui proviene la mutazione. Il campo non viene compilato se è presente il nome della colonna nel valore dell'opzione pipeline bigQueryChangelogTableFieldsToIgnore.
source_cluster STRING No (Facoltativo) Descrive il nome del cluster Bigtable da cui proviene la mutazione. Il campo non viene compilato quando il nome della colonna è presente nel valore dell'opzione della pipeline bigQueryChangelogTableFieldsToIgnore.
source_table STRING No (Facoltativo) Descrive il nome della tabella Bigtable a cui si applica la mutazione. Il valore in questa colonna potrebbe essere utile se più tabelle Bigtable trasmettono modifiche alla stessa tabella BigQuery. Il campo non viene compilato se è presente il nome della colonna nel valore dell'opzione pipeline bigQueryChangelogTableFieldsToIgnore.
tiebreaker INT64 No (Facoltativo) Quando due mutazioni vengono registrate contemporaneamente da cluster Bigtable diversi, alla tabella di origine viene applicata la mutazione con il valore tiebreaker più alto. Le mutazioni con valori tiebreaker più bassi vengono ignorate. Il campo non viene compilato se è presente il nome della colonna nel valore dell'opzione pipeline bigQueryChangelogTableFieldsToIgnore.
value STRING o BYTES Il nuovo valore impostato dalla mutazione. Se l'opzione pipeline writeValuesAsBytes è impostata su true, il tipo di colonna deve essere BYTES. In caso contrario, utilizza il tipo STRING. Il valore è impostato per SET_CELL mutazioni. Per gli altri tipi di mutazione, il valore è impostato su NULL.

Requisiti della pipeline

  • L'istanza di origine Bigtable specificata.
  • La tabella di origine Bigtable specificata. Per la tabella devono essere abilitati i flussi di modifiche.
  • Il profilo di applicazione Bigtable specificato.
  • Il set di dati di destinazione BigQuery specificato.

Parametri del modello

Parametri obbligatori

  • bigQueryDataset : il nome del set di dati della tabella BigQuery di destinazione.
  • bigtableChangeStreamAppProfile: l'ID profilo dell'applicazione Bigtable. Il profilo di applicazione deve utilizzare il routing a cluster singolo e consentire transazioni su riga singola.
  • bigtableReadInstanceId : l'ID dell'istanza Bigtable di origine.
  • bigtableReadTableId : l'ID tabella Bigtable di origine.

Parametri facoltativi

  • writeRowkeyAsBytes : se scrivere chiavi-riga come BigQuery BYTES. Se impostato su true, le chiavi di riga vengono scritte nella colonna BYTES. In caso contrario, i valori rowkey vengono scritti nella colonna STRING. Il valore predefinito è false.
  • writeValuesAsBytes : se viene impostato un valore true, i valori veri vengono scritti nella colonna BYTES, altrimenti nella colonna STRING. Il valore predefinito è false.
  • writeNumericTimestamps : se scrivere il timestamp di Bigtable come BigQuery INT64. Se il criterio è impostato su true, i valori vengono scritti nella colonna INT64. In caso contrario, i valori vengono scritti nella colonna TIMESTAMP. Colonne interessate: timestamp, timestamp_from e timestamp_to. Il valore predefinito è false. Se impostato su true, il tempo viene misurato in microsecondi dall'epoca di Unix (1° gennaio 1970 UTC).
  • bigQueryProjectId : l'ID del progetto del set di dati BigQuery. Il valore predefinito è il progetto per il job Dataflow.
  • bigQueryChangelogTableName : nome della tabella BigQuery di destinazione. Se non specificato, viene utilizzato il valore bigtableReadTableId + "_changelog". Il campo predefinito è vuoto.
  • bigQueryChangelogTablePartitionGranularity: specifica una granularità per il partizionamento della tabella del log delle modifiche. Se impostato, la tabella viene partizionata. Utilizza uno dei seguenti valori supportati: HOUR, DAY, MONTH o YEAR. Per impostazione predefinita, la tabella non è partizionata.
  • bigQueryChangelogTablePartitionExpirationMs : imposta la scadenza della partizione della tabella del log delle modifiche, in millisecondi. Se impostato su true, le partizioni precedenti al numero di millisecondi specificato vengono eliminate. Per impostazione predefinita, non viene impostata alcuna scadenza.
  • bigQueryChangelogTableFieldsToIgnore : un elenco separato da virgole di colonne del log delle modifiche che, se specificate, non vengono create né compilate. Utilizza uno dei seguenti valori supportati: is_gc, source_instance, source_cluster, source_table, tiebreaker o big_query_commit_timestamp. Per impostazione predefinita, tutte le colonne sono compilate.
  • dlqDirectory: la directory da utilizzare per la coda delle email inutilizzate. I record che non vengono elaborati vengono archiviati in questa directory. L'impostazione predefinita è una directory sotto la località temporanea del job Dataflow. Nella maggior parte dei casi, puoi utilizzare il percorso predefinito.
  • bigtableChangeStreamMetadataInstanceId: l'ID istanza dei metadati delle modifiche in tempo reale di Bigtable. Il valore predefinito è vuoto.
  • bigtableChangeStreamMetadataTableTableId: l'ID della tabella dei metadati del connettore delle modifiche in tempo reale di Bigtable. Se non viene fornita, una tabella dei metadati del connettore dei flussi di modifiche Bigtable viene creata automaticamente durante l'esecuzione della pipeline. Il valore predefinito è vuoto.
  • bigtableChangeStreamCharset : il nome del set di caratteri modifiche in tempo reale di Bigtable. Il valore predefinito è: UTF-8.
  • bigtableChangeStreamStartTimestamp : il timestamp iniziale (https://tools.ietf.org/html/rfc3339), incluso, da utilizzare per la lettura delle modifiche in tempo reale. Ad esempio: 2022-05-05T07:59:59Z. Il valore predefinito è il timestamp dell'ora di inizio della pipeline.
  • bigtableChangeStreamIgnoreColumnFamilies: un elenco separato da virgole di modifiche ai nomi delle famiglie di colonne da ignorare. Il valore predefinito è vuoto.
  • bigtableChangeStreamIgnoreColumns: un elenco separato da virgole di modifiche ai nomi di colonna da ignorare. Il valore predefinito è vuoto.
  • bigtableChangeStreamName : un nome univoco per la pipeline del client. Ti consente di riprendere l'elaborazione dal punto in cui è stata interrotta una pipeline in esecuzione in precedenza. Il nome predefinito è generato automaticamente. Controlla i log del job Dataflow per il valore utilizzato.
  • bigtableChangeStreamResume : se impostato su true, l'elaborazione di una nuova pipeline riprende dal punto in cui è stata arrestata una pipeline precedentemente in esecuzione con lo stesso valore bigtableChangeStreamName. Se la pipeline con il valore bigtableChangeStreamName specificato non è mai stata eseguita, non ne viene avviata una nuova. Se impostato su false, viene avviata una nuova pipeline. Se è già stata eseguita una pipeline con lo stesso valore bigtableChangeStreamName per l'origine specificata, non ne viene avviata una nuova. Il valore predefinito è false.
  • bigtableReadProjectId : l'ID progetto Bigtable. Il valore predefinito è il progetto per il job Dataflow.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. Il valore predefinito è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Bigtable change streams to BigQuery template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
bigQueryDataset=BIGQUERY_DESTINATION_DATASET

Sostituisci quanto segue:

  • PROJECT_ID: L'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco di tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • BIGTABLE_INSTANCE_ID: l'ID istanza Bigtable.
  • BIGTABLE_TABLE_ID: l'ID della tua tabella Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID: l'ID del profilo dell'applicazione Bigtable.
  • BIGQUERY_DESTINATION_DATASET: il nome del set di dati di destinazione BigQuery

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sul API e i relativi ambiti di autorizzazione, consulta projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "bigQueryDataset": "BIGQUERY_DESTINATION_DATASET"
    }
  }
}

Sostituisci quanto segue:

  • PROJECT_ID: L'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco di tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • LOCATION: la regione in cui vuoi di eseguire il deployment del job Dataflow, ad esempio us-central1
  • BIGTABLE_INSTANCE_ID: l'ID dell'istanza Bigtable.
  • BIGTABLE_TABLE_ID: l'ID della tua tabella Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID: l'ID del profilo dell'applicazione Bigtable.
  • BIGQUERY_DESTINATION_DATASET: il nome del set di dati di destinazione BigQuery

Passaggi successivi