Streams de alteração do Bigtable para modelo do Pub/Sub

O modelo de stream do Bigtable para stream do Pub/Sub é um pipeline de streaming que transmite registros de alteração de dados do Bigtable e os publica em um tópico do Pub/Sub usando o Dataflow.

Um fluxo de alterações do Bigtable permite a inscrição em mutações de dados por tabela. Quando você se inscreve em streams de alteração de tabela, as seguintes restrições se aplicam:

  • Somente células modificadas e descritores de operações de exclusão são retornados.
  • Somente o novo valor de uma célula modificada é retornado.

Quando os registros de alteração de dados são publicados em um tópico do Pub/Sub, as mensagens podem ser inseridas fora de ordem em comparação com a ordem original do carimbo de data/hora de confirmação do Bigtable.

Os registros de alteração de dados do Bigtable que não podem ser publicados em tópicos do Pub/Sub são colocados temporariamente em um diretório de fila de mensagens inativas (fila de mensagens não processadas) no Cloud Storage. Após o número máximo de tentativas malsucedidas, esses registros são colocados indefinidamente no mesmo diretório de fila de mensagens inativas para revisão humana ou processamento adicional pelo usuário.

O pipeline requer que o tópico de destino do Pub/Sub exista. O tópico de destino pode ser configurado para validar mensagens usando um esquema. Quando um tópico do Pub/Sub especifica um esquema, o pipeline só começa se o esquema for válido. Dependendo do tipo de esquema, use uma das seguintes definições de esquema para o tópico de destino:

Buffers de protocolo

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangeLogEntryProto";

message ChangelogEntryProto{
  required bytes rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional bytes column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional bytes value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
  

Avro

{
    "name" : "ChangelogEntryMessage",
    "type" : "record",
    "namespace" : "com.google.cloud.teleport.bigtable",
    "fields" : [
      { "name" : "rowKey", "type" : "bytes"},
      {
        "name" : "modType",
        "type" : {
          "name": "ModType",
          "type": "enum",
          "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]}
      },
      { "name": "isGC", "type": "boolean" },
      { "name": "tieBreaker", "type": "int"},
      { "name": "columnFamily", "type": "string"},
      { "name": "commitTimestamp", "type" : "long"},
      { "name" : "sourceInstance", "type" : "string"},
      { "name" : "sourceCluster", "type" : "string"},
      { "name" : "sourceTable", "type" : "string"},
      { "name": "column", "type" : ["null", "bytes"]},
      { "name": "timestamp", "type" : ["null", "long"]},
      { "name": "timestampFrom", "type" : ["null", "long"]},
      { "name": "timestampTo", "type" : ["null", "long"]},
      { "name" : "value", "type" : ["null", "bytes"]}
   ]
}
    

JSON

Use o seguinte esquema Protobuf com a codificação de mensagem JSON:

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangelogEntryMessageText";

message ChangelogEntryText{
  required string rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional string column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional string value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
    

Cada nova mensagem do Pub/Sub inclui uma entrada de um registro de alteração de dados retornado pelo fluxo de alterações da linha correspondente na tabela do Bigtable. O modelo do Pub/Sub nivela as entradas em cada registro de alteração de dados em alterações individuais no nível da célula.

Descrição da mensagem de saída do Pub/Sub

Nome do campo Descrição
rowKey A chave da linha alterada. Chega na forma de uma matriz de bytes. Quando a codificação de mensagem JSON é configurada, as chaves de linha são retornadas como strings. Quando useBase64Rowkeys é especificado, as chaves de linha são codificadas em Base64. Caso contrário, um charset especificado por bigtableChangeStreamCharset será usado para decodificar bytes da chave de linha em uma string.
modType O tipo de mutação de linha. Use um dos seguintes valores: SET_CELL, DELETE_CELLS ou DELETE_FAMILY.
columnFamily O grupo de colunas afetado pela mutação de linha.
column O qualificador de coluna afetado pela mutação da linha. Para o tipo de mutação DELETE_FAMILY, o campo de coluna não está definido. Chega na forma de uma matriz de bytes. Quando a codificação de mensagem JSON é configurada, as colunas são retornadas como strings. Quando useBase64ColumnQualifier é especificado, o campo da coluna é codificado em Base64. Caso contrário, um charset especificado por bigtableChangeStreamCharset será usado para decodificar bytes da chave de linha em uma string.
commitTimestamp A hora em que o Bigtable aplica a mutação. A hora é medida em microssegundos desde a época do Unix (1 de janeiro de 1970 no UTC).
timestamp O valor do carimbo de data/hora da célula afetada pela mutação. Para tipos de mutação DELETE_CELLS e DELETE_FAMILY, o carimbo de data/hora não é definido. A hora é medida em microssegundos desde a época do Unix (1 de janeiro de 1970 no UTC).
timestampFrom Descreve um início inclusivo do intervalo do carimbo de data/hora para todas as células excluídas pela mutação DELETE_CELLS. Para outros tipos de mutação, timestampFrom não é definido. A hora é medida em microssegundos desde a época do Unix (1 de janeiro de 1970 no UTC).
timestampTo Descreve um fim exclusivo do intervalo de carimbo de data/hora para todas as células excluídas pela mutação DELETE_CELLS. Para outros tipos de mutação, timestampTo não é definido.
isGC Um valor booleano que indica se a mutação é gerada por um mecanismo de coleta de lixo do Bigtable.
tieBreaker Quando duas mutações são registradas ao mesmo tempo por diferentes clusters do Bigtable, a mutação com o maior valor tiebreaker é aplicada à tabela de origem. Mutações com valores tiebreaker menores são descartadas.
value O novo valor definido pela mutação. A menos que a opção de pipeline stripValues esteja definida, o valor será definido para mutações SET_CELL. Para outros tipos de mutação, o valor não é definido. Chega na forma de uma matriz de bytes. Quando a codificação de mensagem JSON é configurada, os valores são retornados como strings. Quando useBase64Values é especificado, o valor é codificado em Base64. Caso contrário, um charset especificado por bigtableChangeStreamCharset será usado para decodificar bytes do valor em uma string.
sourceInstance O nome da instância do Bigtable que registrou a mutação. Pode ser quando vários pipelines fazem streaming de alterações de instâncias diferentes para o mesmo tópico do Pub/Sub.
sourceCluster O nome do cluster do Bigtable que registrou a mutação. Pode ser usado quando vários pipelines fazem streaming de alterações de instâncias diferentes para o mesmo tópico do Pub/Sub.
sourceTable O nome da tabela do Bigtable que recebeu a mutação. Pode ser usado no caso de um fluxo de vários pipelines mudar de tabelas diferentes para o mesmo tópico do Pub/Sub.

Requisitos de pipeline

  • A instância de origem especificada do Bigtable.
  • A tabela de origem do Bigtable especificada. A tabela precisa ter fluxos de alterações ativados.
  • O perfil de aplicativo do Bigtable especificado.
  • O tópico especificado do Pub/Sub precisa existir.

Parâmetros do modelo

Parâmetros obrigatórios

  • pubSubTopic: o nome do tópico do Pub/Sub de destino.
  • bigtableChangeStreamAppProfile: o código do perfil do aplicativo do Bigtable. O perfil do aplicativo precisa usar roteamento de cluster único e permitir transações de linha única.
  • bigtableReadInstanceId: o ID da instância de origem do Bigtable.
  • bigtableReadTableId: o ID da tabela de origem do Bigtable.

Parâmetros opcionais

  • messageEncoding: a codificação das mensagens a serem publicadas no tópico do Pub/Sub. Quando o esquema do tópico de destino é configurado, a codificação da mensagem é determinada pelas configurações do tópico. Os seguintes valores são aceitos: BINARY e JSON. O padrão é JSON.
  • messageFormat: a codificação das mensagens a serem publicadas no tópico do Pub/Sub. Quando o esquema do tópico de destino é configurado, a codificação da mensagem é determinada pelas configurações do tópico. Os seguintes valores são aceitos: AVRO, PROTOCOL_BUFFERS e JSON. O valor padrão é JSON. Quando o formato JSON é usado, os campos rowKey, coluna e valor da mensagem são strings, com conteúdo determinado pelas opções de pipeline useBase64Rowkeys, useBase64ColumnQualifiers, useBase64Values e bigtableChangeStreamCharset.
  • stripValues: quando definido como verdadeiro, as mutações SET_CELL são retornadas sem novos valores definidos. O padrão é "false". Esse parâmetro é útil quando você não precisa de um novo valor, também conhecido como invalidação de cache, ou quando os valores são extremamente grandes e excedem os limites de tamanho de mensagens do Pub/Sub.
  • dlqDirectory: o diretório da fila de mensagens inativas. Os registros que não forem processados são armazenados nesse diretório. O padrão é um diretório no local temporário do job do Dataflow. Na maioria dos casos, é possível usar o caminho padrão.
  • dlqRetryMinutes: número de minutos entre novas tentativas de fila de mensagens inativas (DLQ). O padrão é 10.
  • dlqMaxRetries: o máximo de tentativas de mensagens inativas. O padrão é 5.
  • useBase64Rowkeys: usado com a codificação de mensagens JSON. Quando definido como true, o campo rowKey é uma string codificada em Base64. Caso contrário, o rowKey é produzido usando bigtableChangeStreamCharset para decodificar bytes em uma string. O padrão é false.
  • pubSubProjectId: o ID do projeto do Bigtable. O padrão é o projeto do job do Dataflow.
  • useBase64ColumnQualifiers: usado com a codificação de mensagens JSON. Quando definido como true, o campo column é uma string codificada em Base64. Caso contrário, a coluna será produzida usando bigtableChangeStreamCharset para decodificar bytes em uma string. O padrão é false.
  • useBase64Values: usado com a codificação de mensagens JSON. Quando definido como true, o campo de valor é uma string codificada em Base64. Caso contrário, o valor será produzido usando bigtableChangeStreamCharset para decodificar bytes em uma string. O padrão é false.
  • disableDlqRetries: se as tentativas do DLQ serão desativadas ou não. O padrão é: falso.
  • bigtableChangeStreamMetadataInstanceId: a alteração do Bigtable faz streaming do ID da instância de metadados. O padrão é vazio.
  • bigtableChangeStreamMetadataTableTableId: o ID da tabela de metadados do conector de fluxos de alteração do Bigtable. Se não for fornecida, uma tabela de metadados do conector de streams de alterações do Bigtable será criada automaticamente durante a execução do pipeline. O padrão é vazio.
  • bigtableChangeStreamCharset: o nome do conjunto de caracteres dos fluxos de alterações do Bigtable. O padrão é UTF-8.
  • bigtableChangeStreamStartTimestamp: o carimbo de data/hora inicial (https://tools.ietf.org/html/rfc3339, link em inglês), inclusive, para uso na leitura de fluxos de alterações. Por exemplo, 2022-05-05T07:59:59Z. O padrão é o carimbo de data/hora do horário de início do pipeline.
  • bigtableChangeStreamIgnoreColumnFamilies: uma lista separada por vírgulas com o nome do grupo de colunas é ignorado. O padrão é vazio.
  • bigtableChangeStreamIgnoreColumns: uma lista separada por vírgulas do nome da coluna é ignorada. O padrão é vazio.
  • bigtableChangeStreamName: um nome exclusivo para o pipeline do cliente. Permite retomar o processamento a partir do ponto em que um pipeline anteriormente em execução foi interrompido. O padrão é um nome gerado automaticamente. Consulte os registros do job do Dataflow para ver o valor usado.
  • bigtableChangeStreamResume: quando definido como true, um novo pipeline retoma o processamento a partir do ponto em que um pipeline anteriormente em execução com o mesmo valor de bigtableChangeStreamName foi interrompido. Se o pipeline com o valor bigtableChangeStreamName fornecido nunca tiver sido executado, um novo pipeline não será iniciado. Quando definido como false, um novo pipeline é iniciado. Se um pipeline com o mesmo valor bigtableChangeStreamName já tiver sido executado para a origem especificada, um novo pipeline não será iniciado. O padrão é false.
  • bigtableReadProjectId: o código do projeto do Bigtable. O padrão é o projeto do job do Dataflow.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Bigtable change streams to Pub/Sub template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
pubSubTopic=PUBSUB_TOPIC

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • BIGTABLE_INSTANCE_ID: o ID da instância do Bigtable.
  • BIGTABLE_TABLE_ID: o ID da tabela do Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID: ID do perfil do aplicativo Bigtable.
  • PUBSUB_TOPIC: o nome do tópico de destino do Pub/Sub

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "pubSubTopic": "PUBSUB_TOPIC"
    }
  }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • BIGTABLE_INSTANCE_ID: o ID da instância do Bigtable.
  • BIGTABLE_TABLE_ID: o ID da tabela do Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID: ID do perfil do aplicativo Bigtable.
  • PUBSUB_TOPIC: o nome do tópico de destino do Pub/Sub

A seguir