Modifiche in tempo reale di Bigtable per il modello Pub/Sub

Le modifiche in tempo reale di Bigtable al modello Pub/Sub è una pipeline di inserimento flussi che trasmette i flussi di record delle modifiche dei dati di Bigtable e li pubblica in un argomento Pub/Sub utilizzando Dataflow.

Una modifica in tempo reale di Bigtable consente di sottoscrivere mutazioni dei dati in base alla tabella. Quando ti abboni alle modifiche in tempo reale delle tabelle, si applicano i seguenti vincoli:

  • Vengono restituite solo le celle modificate e i descrittori delle operazioni di eliminazione.
  • Viene restituito solo il nuovo valore di una cella modificata.

Quando i record delle modifiche dei dati vengono pubblicati in un argomento Pub/Sub, i messaggi possono essere inseriti in ordine non corretto rispetto all'ordine originale del timestamp di commit di Bigtable.

I record delle modifiche dei dati di Bigtable che non possono essere pubblicati negli argomenti Pub/Sub vengono temporaneamente inseriti in una directory di code di messaggi non recapitabili (coda di messaggi non elaborati) in Cloud Storage. Dopo il numero massimo di nuovi tentativi non riusciti, questi record vengono inseriti in modo permanente nella stessa directory della coda dei messaggi non recapitabili per la revisione umana o un'ulteriore elaborazione da parte dell'utente.

La pipeline richiede l'esistenza dell'argomento Pub/Sub di destinazione. L'argomento di destinazione potrebbe essere configurato per convalidare i messaggi utilizzando uno schema. Quando un argomento Pub/Sub specifica uno schema, la pipeline viene avviata solo se lo schema è valido. A seconda del tipo di schema, utilizza una delle seguenti definizioni dello schema per l'argomento di destinazione:

Buffer di protocollo

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangeLogEntryProto";

message ChangelogEntryProto{
  required bytes rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional bytes column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional bytes value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
  

Avro

{
    "name" : "ChangelogEntryMessage",
    "type" : "record",
    "namespace" : "com.google.cloud.teleport.bigtable",
    "fields" : [
      { "name" : "rowKey", "type" : "bytes"},
      {
        "name" : "modType",
        "type" : {
          "name": "ModType",
          "type": "enum",
          "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]}
      },
      { "name": "isGC", "type": "boolean" },
      { "name": "tieBreaker", "type": "int"},
      { "name": "columnFamily", "type": "string"},
      { "name": "commitTimestamp", "type" : "long"},
      { "name" : "sourceInstance", "type" : "string"},
      { "name" : "sourceCluster", "type" : "string"},
      { "name" : "sourceTable", "type" : "string"},
      { "name": "column", "type" : ["null", "bytes"]},
      { "name": "timestamp", "type" : ["null", "long"]},
      { "name": "timestampFrom", "type" : ["null", "long"]},
      { "name": "timestampTo", "type" : ["null", "long"]},
      { "name" : "value", "type" : ["null", "bytes"]}
   ]
}
    

JSON

Utilizza il seguente schema Protobuf con codifica dei messaggi JSON:

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangelogEntryMessageText";

message ChangelogEntryText{
  required string rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional string column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional string value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
    

Ogni nuovo messaggio Pub/Sub include una voce di un record delle modifiche dei dati restituito dal flusso di modifiche dalla riga corrispondente nella tabella Bigtable. Il modello Pub/Sub riunisce le voci di ogni record delle modifiche ai dati in modifiche a livello di cella.

Descrizione del messaggio di output Pub/Sub

Nome campo Descrizione
rowKey La chiave di riga della riga modificata. Arriva sotto forma di array di byte. Quando è configurata la codifica dei messaggi JSON, le chiavi di riga vengono restituite come stringhe. Quando useBase64Rowkeys è specificato, le chiavi di riga hanno codifica Base64. In caso contrario, viene utilizzato un set di caratteri specificato da bigtableChangeStreamCharset per decodificare i byte chiave di riga in una stringa.
modType Il tipo di mutazione della riga. Utilizza uno dei seguenti valori: SET_CELL, DELETE_CELLS o DELETE_FAMILY.
columnFamily La famiglia di colonne interessata dalla mutazione della riga.
column Il qualificatore di colonna interessato dalla mutazione della riga. Per il tipo di mutazione DELETE_FAMILY, il campo della colonna non è impostato. Arriva sotto forma di array di byte. Quando viene configurata la codifica di messaggi JSON, le colonne vengono restituite come stringhe. Quando useBase64ColumnQualifier è specificato, il campo della colonna è con codifica Base64. In caso contrario, viene utilizzato un set di caratteri specificato da bigtableChangeStreamCharset per decodificare i byte chiave di riga in una stringa.
commitTimestamp Data e ora in cui Bigtable applica la mutazione. L'ora è misurata in microsecondi dall'epoca di Unix (1 gennaio 1970 alle UTC).
timestamp Il valore del timestamp della cella interessata dalla mutazione. Per i tipi di mutazione DELETE_CELLS e DELETE_FAMILY, il timestamp non è impostato. L'ora è misurata in microsecondi dall'epoca di Unix (1 gennaio 1970 alle UTC).
timestampFrom Descrive un inizio inclusivo dell'intervallo timestamp per tutte le celle eliminate dalla mutazione DELETE_CELLS. Per gli altri tipi di mutazione, timestampFrom non è impostato. L'ora è misurata in microsecondi dall'epoca di Unix (1 gennaio 1970 alle UTC).
timestampTo Descrive una fine esclusiva dell'intervallo timestamp per tutte le celle eliminate dalla mutazione DELETE_CELLS. Per gli altri tipi di mutazione, timestampTo non è impostato.
isGC Un valore booleano che indica se la mutazione è generata da un meccanismo di garbage collection di Bigtable.
tieBreaker Quando due mutazioni vengono registrate contemporaneamente da cluster Bigtable diversi, la mutazione con il valore tiebreaker più alto viene applicata alla tabella di origine. Le mutazioni con valori tiebreaker più bassi vengono ignorate.
value Il nuovo valore impostato dalla mutazione. A meno che non sia impostata l'opzione pipeline stripValues, il valore è impostato per le mutazioni SET_CELL. Per gli altri tipi di mutazione, il valore non è impostato. Arriva sotto forma di array di byte. Quando viene configurata la codifica dei messaggi JSON, i valori vengono restituiti come stringhe. Quando useBase64Values è specificato, il valore è codificato Base64. In caso contrario, viene utilizzato un set di caratteri specificato da bigtableChangeStreamCharset per decodificare i byte di valore in una stringa.
sourceInstance Il nome dell'istanza Bigtable in cui è stata registrata la mutazione. Potrebbe verificarsi quando più pipeline eseguono il flusso di modifiche da istanze diverse allo stesso argomento Pub/Sub.
sourceCluster Il nome del cluster Bigtable che ha registrato la mutazione. Può essere utilizzato quando più pipeline trasmettono modifiche in modalità flusso da istanze diverse allo stesso argomento Pub/Sub.
sourceTable Il nome della tabella Bigtable che ha ricevuto la mutazione. Può essere utilizzata nel caso in cui un flusso di più pipeline cambi da tabelle diverse allo stesso argomento Pub/Sub.

Requisiti della pipeline

  • L'istanza di origine Bigtable specificata.
  • La tabella di origine Bigtable specificata. Nella tabella devono essere abilitati modifiche in tempo reale.
  • Il profilo di applicazione Bigtable specificato.
  • L'argomento Pub/Sub specificato deve esistere.

Parametri del modello

Parametri obbligatori

  • pubSubTopic : il nome dell'argomento Pub/Sub di destinazione.
  • bigtableChangeStreamAppProfile : l'ID profilo dell'applicazione Bigtable. Il profilo di applicazione deve utilizzare il routing a cluster singolo e consentire transazioni su riga singola.
  • bigtableReadInstanceId : l'ID dell'istanza Bigtable di origine.
  • bigtableReadTableId : l'ID tabella Bigtable di origine.

Parametri facoltativi

  • messageEncoding : la codifica dei messaggi da pubblicare nell'argomento Pub/Sub. Quando viene configurato lo schema dell'argomento di destinazione, la codifica dei messaggi è determinata dalle impostazioni dell'argomento. Sono supportati i seguenti valori: BINARY e JSON. Il valore predefinito è JSON.
  • messageFormat : la codifica dei messaggi da pubblicare nell'argomento Pub/Sub. Quando viene configurato lo schema dell'argomento di destinazione, la codifica dei messaggi è determinata dalle impostazioni dell'argomento. Sono supportati i seguenti valori: AVRO, PROTOCOL_BUFFERS e JSON. Il valore predefinito è JSON. Quando viene utilizzato il formato JSON, i campi rowKey, colonna e valore del messaggio sono stringhe i cui contenuti sono determinati dalle opzioni della pipeline useBase64Rowkeys, useBase64ColumnQualifiers, useBase64Values e bigtableChangeStreamCharset.
  • stripValues : se impostato su true, le mutazioni di SET_CELL vengono restituite senza l'impostazione di nuovi valori. Il valore predefinito è false. Questo parametro è utile quando non è necessario un nuovo valore (noto anche come invalidazione della cache) o quando i valori sono estremamente grandi e superano i limiti delle dimensioni dei messaggi Pub/Sub.
  • dlqDirectory : la directory per la coda dei messaggi non recapitabili. I record che non vengono elaborati vengono archiviati in questa directory. Il valore predefinito è una directory nella località temporanea del job Dataflow. Nella maggior parte dei casi, puoi utilizzare il percorso predefinito.
  • dlqRetryMinutes : il numero di minuti tra i nuovi tentativi in una coda di messaggi non recapitabili. Il valore predefinito è 10.
  • dlqMaxRetries : il numero massimo di nuovi tentativi non recapitabili. Il valore predefinito è 5.
  • useBase64Rowkeys : utilizzata con la codifica dei messaggi JSON. Se impostato su true, il campo rowKey è una stringa con codifica Base64. In caso contrario, rowKey viene generato utilizzando bigtableChangeStreamCharset per decodificare i byte in una stringa. Il valore predefinito è false.
  • pubSubProjectId : l'ID progetto Bigtable. Il valore predefinito è il progetto del job Dataflow.
  • useBase64ColumnQualifiers : utilizzato con la codifica dei messaggi JSON. Se impostato su true, il campo column è una stringa con codifica Base64. In caso contrario, la colonna viene prodotta utilizzando bigtableChangeStreamCharset per decodificare i byte in una stringa. Il valore predefinito è false.
  • useBase64Values : utilizzato con la codifica dei messaggi JSON. Se impostato su true, il campo del valore è una stringa con codifica Base64. In caso contrario, il valore viene prodotto utilizzando bigtableChangeStreamCharset per decodificare i byte in una stringa. Il valore predefinito è false.
  • disableDlqRetries : indica se disabilitare o meno i nuovi tentativi per la DLQ. Il valore predefinito è false.
  • bigtableChangeStreamMetadataInstanceId : la modifica di Bigtable trasmette l'ID istanza dei metadati. Il campo predefinito è vuoto.
  • bigtableChangeStreamMetadataTableTableId : l'ID della tabella dei metadati del connettore delle modifiche in tempo reale di Bigtable. Se non viene specificata, durante l'esecuzione della pipeline viene creata automaticamente una tabella dei metadati del connettore delle modifiche in tempo reale di Bigtable. Il campo predefinito è vuoto.
  • bigtableChangeStreamCharset : il nome del set di caratteri modifiche in tempo reale di Bigtable. Il valore predefinito è: UTF-8.
  • bigtableChangeStreamStartTimestamp : il timestamp iniziale (https://tools.ietf.org/html/rfc3339), incluso, da utilizzare per la lettura delle modifiche in tempo reale. Ad esempio: 2022-05-05T07:59:59Z. Il valore predefinito è il timestamp dell'ora di inizio della pipeline.
  • bigtableChangeStreamIgnoreColumnFamilies : un elenco separato da virgole di modifiche al nome della famiglia di colonne da ignorare. Il campo predefinito è vuoto.
  • bigtableChangeStreamIgnoreColumns : un elenco separato da virgole di modifiche ai nomi delle colonne da ignorare. Il campo predefinito è vuoto.
  • bigtableChangeStreamName : un nome univoco per la pipeline del client. Consente di riprendere l'elaborazione dal momento in cui è stata arrestata una pipeline in esecuzione in precedenza. Il nome predefinito è generato automaticamente. Controlla i log del job Dataflow per il valore utilizzato.
  • bigtableChangeStreamResume : se impostato su true, l'elaborazione di una nuova pipeline riprende dal punto in cui è stata interrotta una pipeline precedentemente in esecuzione con lo stesso valore bigtableChangeStreamName. Se la pipeline con il valore bigtableChangeStreamName specificato non è mai stata eseguita, non ne viene avviata una nuova. Se il valore è impostato su false, viene avviata una nuova pipeline. Se è già stata eseguita una pipeline con lo stesso valore bigtableChangeStreamName per l'origine specificata, non ne viene avviata una nuova. Il valore predefinito è false.
  • bigtableReadProjectId : l'ID progetto Bigtable. Il valore predefinito è il progetto per il job Dataflow.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco di regioni in cui è possibile eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Bigtable change streams to Pub/Sub template.
  6. Inserisci i valori parametro negli appositi campi.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
pubSubTopic=PUBSUB_TOPIC

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella padre non con data del bucket: gs://dataflow-templates-REGION_NAME/latest/
    • il nome della versione, ad esempio 2023-09-12-00_RC00, per utilizzare una versione specifica del modello, che è possibile trovare nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • BIGTABLE_INSTANCE_ID: l'ID dell'istanza Bigtable.
  • BIGTABLE_TABLE_ID: l'ID della tua tabella Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID: l'ID del profilo della tua applicazione Bigtable.
  • PUBSUB_TOPIC: il nome dell'argomento di destinazione Pub/Sub

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "pubSubTopic": "PUBSUB_TOPIC"
    }
  }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

    • latest per utilizzare la versione più recente del modello, disponibile nella cartella padre non con data del bucket: gs://dataflow-templates-REGION_NAME/latest/
    • il nome della versione, ad esempio 2023-09-12-00_RC00, per utilizzare una versione specifica del modello, che è possibile trovare nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • BIGTABLE_INSTANCE_ID: l'ID dell'istanza Bigtable.
  • BIGTABLE_TABLE_ID: l'ID della tua tabella Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID: l'ID del profilo della tua applicazione Bigtable.
  • PUBSUB_TOPIC: il nome dell'argomento di destinazione Pub/Sub

Passaggi successivi