Streams de alteração do Bigtable para modelo do Pub/Sub

O modelo de stream do Bigtable para stream do Pub/Sub é um pipeline de streaming que transmite registros de alteração de dados do Bigtable e os publica em um tópico do Pub/Sub usando o Dataflow.

Um fluxo de alterações do Bigtable permite a inscrição em mutações de dados por tabela. Quando você se inscreve em streams de alteração de tabela, as seguintes restrições se aplicam:

  • Somente células modificadas e descritores de operações de exclusão são retornados.
  • Somente o novo valor de uma célula modificada é retornado.

Quando os registros de alteração de dados são publicados em um tópico do Pub/Sub, as mensagens podem ser inseridas fora de ordem em comparação com a ordem original do carimbo de data/hora de confirmação do Bigtable.

Os registros de alteração de dados do Bigtable que não podem ser publicados em tópicos do Pub/Sub são colocados temporariamente em um diretório de fila de mensagens inativas (fila de mensagens não processadas) no Cloud Storage. Após o número máximo de tentativas malsucedidas, esses registros são colocados indefinidamente no mesmo diretório de fila de mensagens inativas para revisão humana ou processamento adicional pelo usuário.

O pipeline requer que o tópico de destino do Pub/Sub exista. O tópico de destino pode ser configurado para validar mensagens usando um esquema. Quando um tópico do Pub/Sub especifica um esquema, o pipeline só começa se o esquema for válido. Dependendo do tipo de esquema, use uma das seguintes definições de esquema para o tópico de destino:

Buffers de protocolo

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangeLogEntryProto";

message ChangelogEntryProto{
  required bytes rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional bytes column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional bytes value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
  

Avro

{
    "name" : "ChangelogEntryMessage",
    "type" : "record",
    "namespace" : "com.google.cloud.teleport.bigtable",
    "fields" : [
      { "name" : "rowKey", "type" : "bytes"},
      {
        "name" : "modType",
        "type" : {
          "name": "ModType",
          "type": "enum",
          "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]}
      },
      { "name": "isGC", "type": "boolean" },
      { "name": "tieBreaker", "type": "int"},
      { "name": "columnFamily", "type": "string"},
      { "name": "commitTimestamp", "type" : "long"},
      { "name" : "sourceInstance", "type" : "string"},
      { "name" : "sourceCluster", "type" : "string"},
      { "name" : "sourceTable", "type" : "string"},
      { "name": "column", "type" : ["null", "bytes"]},
      { "name": "timestamp", "type" : ["null", "long"]},
      { "name": "timestampFrom", "type" : ["null", "long"]},
      { "name": "timestampTo", "type" : ["null", "long"]},
      { "name" : "value", "type" : ["null", "bytes"]}
   ]
}
    

JSON

Use o seguinte esquema Protobuf com a codificação de mensagem JSON:

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangelogEntryMessageText";

message ChangelogEntryText{
  required string rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional string column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional string value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
    

Cada nova mensagem do Pub/Sub inclui um registro de alteração de dados retornado pelo stream de alteração da linha correspondente na tabela do Bigtable.

Descrição da mensagem de saída do Pub/Sub

Nome do campo Descrição
rowKey A chave da linha alterada. Chega na forma de uma matriz de bytes. Quando a codificação de mensagem JSON é configurada, as chaves de linha são retornadas como strings. Quando useBase64Rowkeys é especificado, as chaves de linha são codificadas em Base64. Caso contrário, um charset especificado por bigtableChangeStreamCharset será usado para decodificar bytes da chave de linha em uma string.
modType O tipo de mutação de linha. Use um dos seguintes valores: SET_CELL, DELETE_CELLS ou DELETE_FAMILY.
columnFamily O grupo de colunas afetado pela mutação de linha.
column O qualificador de coluna afetado pela mutação da linha. Para o tipo de mutação DELETE_FAMILY, o campo de coluna não está definido. Chega na forma de uma matriz de bytes. Quando a codificação de mensagem JSON é configurada, as colunas são retornadas como strings. Quando useBase64ColumnQualifier é especificado, o campo da coluna é codificado em Base64. Caso contrário, um charset especificado por bigtableChangeStreamCharset será usado para decodificar bytes da chave de linha em uma string.
commitTimestamp A hora em que o Bigtable aplica a mutação. A hora é medida em microssegundos desde a época do Unix (1 de janeiro de 1970 no UTC).
timestamp O valor do carimbo de data/hora da célula afetada pela mutação. Para tipos de mutação DELETE_CELLS e DELETE_FAMILY, o carimbo de data/hora não é definido. A hora é medida em microssegundos desde a época do Unix (1 de janeiro de 1970 no UTC).
timestampFrom Descreve um início inclusivo do intervalo do carimbo de data/hora para todas as células excluídas pela mutação DELETE_CELLS. Para outros tipos de mutação, timestampFrom não é definido. A hora é medida em microssegundos desde a época do Unix (1 de janeiro de 1970 no UTC).
timestampTo Descreve um fim exclusivo do intervalo de carimbo de data/hora para todas as células excluídas pela mutação DELETE_CELLS. Para outros tipos de mutação, timestampTo não é definido.
isGC Um valor booleano que indica se a mutação é gerada por um mecanismo de coleta de lixo do Bigtable.
tieBreaker Quando duas mutações são registradas ao mesmo tempo por diferentes clusters do Bigtable, a mutação com o maior valor tiebreaker é aplicada à tabela de origem. Mutações com valores tiebreaker menores são descartadas.
value O novo valor definido pela mutação. A menos que a opção de pipeline stripValues esteja definida, o valor será definido para mutações SET_CELL. Para outros tipos de mutação, o valor não é definido. Chega na forma de uma matriz de bytes. Quando a codificação de mensagem JSON é configurada, os valores são retornados como strings. Quando useBase64Values é especificado, o valor é codificado em Base64. Caso contrário, um charset especificado por bigtableChangeStreamCharset será usado para decodificar bytes do valor em uma string.
sourceInstance O nome da instância do Bigtable que registrou a mutação. Pode ser quando vários pipelines fazem streaming de alterações de instâncias diferentes para o mesmo tópico do Pub/Sub.
sourceCluster O nome do cluster do Bigtable que registrou a mutação. Pode ser usado quando vários pipelines fazem streaming de alterações de instâncias diferentes para o mesmo tópico do Pub/Sub.
sourceTable O nome da tabela do Bigtable que recebeu a mutação. Pode ser usado no caso de um fluxo de vários pipelines mudar de tabelas diferentes para o mesmo tópico do Pub/Sub.

Requisitos de pipeline

  • A instância de origem especificada do Bigtable.
  • A tabela de origem do Bigtable especificada. A tabela precisa ter fluxos de alterações ativados.
  • O perfil de aplicativo do Bigtable especificado.
  • O tópico especificado do Pub/Sub precisa existir.

Parâmetros do modelo

Parâmetro Descrição
bigtableReadInstanceId O ID da instância do Bigtable de origem.
bigtableReadTableId O ID da tabela de origem do Bigtable.
bigtableChangeStreamAppProfile O ID do perfil do aplicativo Bigtable. O perfil do aplicativo precisa usar roteamento de cluster único e permitir transações de linha única.
pubSubTopic O nome do tópico do Pub/Sub de destino.
messageFormat Opcional: quando o tópico de destino tem um esquema configurado, o formato da mensagem é determinado pelo esquema e pela codificação configurados. O formato das mensagens a serem publicadas no tópico do Pub/Sub. Valores aceitos: AVRO, PROTOCOL_BUFFERS e JSON. O padrão é JSON. Quando o formato JSON é usado, os campos rowKey, column e value da mensagem são strings, cujo conteúdo é determinado pelas opções de pipeline useBase64Rowkeys, useBase64ColumnQualifiers, useBase64Values e bigtableChangeStreamCharset.
messageEncoding Opcional: quando o tópico de destino tiver um esquema configurado, a codificação da mensagem será determinada pelas configurações do tópico. A codificação das mensagens a serem publicadas no tópico do Pub/Sub. Valores aceitos: BINARY, JSON e JSON. O padrão é JSON.
stripValues Opcional: quando definido como true, as mutações SET_CELL são retornadas sem novos valores definidos. O padrão é "false". Esse parâmetro é útil quando você não precisa de um novo valor, também conhecido como invalidação de cache, ou quando os valores são extremamente grandes e excedem os limites de tamanho de mensagens do Pub/Sub.
bigtableReadProjectId Opcional: o ID do projeto do Bigtable. O padrão é o projeto do job do Dataflow.
pubSubProjectId Opcional: o ID do projeto do Bigtable. O padrão é o projeto do job do Dataflow.
bigtableChangeStreamMetadataInstanceId Opcional: a alteração do Bigtable faz streaming do ID da instância de metadados.
bigtableChangeStreamMetadataTableTableId Opcional: a alteração do Bigtable faz streaming do ID da tabela de metadados.
bigtableChangeStreamCharset Opcional: a alteração do Bigtable faz o stream do nome do conjunto de caracteres ao ler as chaves de linha, os valores e os qualificadores de coluna. Essa opção é usada quando a codificação da mensagem é JSON.
bigtableChangeStreamStartTimestamp Opcional: o carimbo de data/hora de início, inclusive, para usar em fluxos de alterações de leitura. Por exemplo, 2022-05-05T07:59:59Z. O padrão é o carimbo de data/hora do horário de início do pipeline.
bigtableChangeStreamIgnoreColumnFamilies Opcional: uma lista separada por vírgulas com o nome do grupo de colunas é ignorado.
bigtableChangeStreamIgnoreColumns Opcional: uma lista separada por vírgulas do nome da coluna é ignorada.
bigtableChangeStreamName Opcional: um nome exclusivo para o pipeline do cliente. Permite retomar o processamento a partir do ponto em que um pipeline anteriormente em execução foi interrompido. O padrão é o nome gerado automaticamente. Para encontrar o valor, consulte os registros do job do Dataflow.
bigtableChangeStreamResume Opcional: quando definido como true, um novo pipeline retoma o processamento a partir do ponto em que um pipeline em execução anteriormente com o mesmo valor de bigtableChangeStreamName é interrompido. Se o pipeline com o valor bigtableChangeStreamName fornecido nunca tiver sido executado, um novo pipeline não será iniciado. Quando definido como false, um novo pipeline é iniciado. Se um pipeline com o mesmo valor bigtableChangeStreamName já tiver sido executado para a origem especificada, um novo pipeline não será iniciado. O padrão é false.
useBase64Rowkeys Opcional: usado com codificação de mensagem JSON. Quando definido como true, o campo rowKey é uma string codificada em Base64. Caso contrário, o rowKey é produzido usando bigtableChangeStreamCharset para decodificar bytes em uma string. O padrão é false.
useBase64ColumnQualifiers Opcional: usado com codificação de mensagem JSON. Quando definido como true, o campo column é uma string codificada em Base64. Caso contrário, a coluna será produzida usando bigtableChangeStreamCharset para decodificar bytes em uma string. O padrão é false.
useBase64Values Opcional: usado com codificação de mensagem JSON. Quando definido como true, o campo value é uma string codificada em Base64. Caso contrário, o valor será produzido usando bigtableChangeStreamCharset para decodificar bytes em uma string. O padrão é false.
dlqMaxRetries Opcional: o máximo de novas mensagens inativas. O valor padrão é 5.
dlqRetryMinutes Opcional: o número de minutos entre novas tentativas de fila de mensagens inativas (DLQ) O valor padrão é 10.
dlqDirectory Opcional: o diretório da fila de mensagens inativas. Os registros que não forem processados são armazenados nesse diretório. O padrão é um diretório no local temporário do job do Dataflow. Na maioria dos casos, é possível usar o caminho padrão.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Bigtable change streams to Pub/Sub template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
pubSubTopic=PUBSUB_TOPIC

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1.
  • BIGTABLE_INSTANCE_ID: o ID da instância do Bigtable.
  • BIGTABLE_TABLE_ID: o ID da tabela do Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID: ID do perfil do aplicativo Bigtable.
  • PUBSUB_TOPIC: o nome do tópico de destino do Pub/Sub

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "pubSubTopic": "PUBSUB_TOPIC"
    }
  }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1.
  • BIGTABLE_INSTANCE_ID: o ID da instância do Bigtable.
  • BIGTABLE_TABLE_ID: o ID da tabela do Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID: ID do perfil do aplicativo Bigtable.
  • PUBSUB_TOPIC: o nome do tópico de destino do Pub/Sub

A seguir