O modelo de stream do Bigtable para stream do Pub/Sub é um pipeline de streaming que transmite registros de alteração de dados do Bigtable e os publica em um tópico do Pub/Sub usando o Dataflow.
Um fluxo de alterações do Bigtable permite a inscrição em mutações de dados por tabela. Quando você se inscreve em streams de alteração de tabela, as seguintes restrições se aplicam:
- Somente células modificadas e descritores de operações de exclusão são retornados.
- Somente o novo valor de uma célula modificada é retornado.
Quando os registros de alteração de dados são publicados em um tópico do Pub/Sub, as mensagens podem ser inseridas fora de ordem em comparação com a ordem original do carimbo de data/hora de confirmação do Bigtable.
Os registros de alteração de dados do Bigtable que não podem ser publicados em tópicos do Pub/Sub são colocados temporariamente em um diretório de fila de mensagens inativas (fila de mensagens não processadas) no Cloud Storage. Após o número máximo de tentativas malsucedidas, esses registros são colocados indefinidamente no mesmo diretório de fila de mensagens inativas para revisão humana ou processamento adicional pelo usuário.
O pipeline requer que o tópico de destino do Pub/Sub exista. O tópico de destino pode ser configurado para validar mensagens usando um esquema. Quando um tópico do Pub/Sub especifica um esquema, o pipeline só começa se o esquema for válido. Dependendo do tipo de esquema, use uma das seguintes definições de esquema para o tópico de destino:
Buffers de protocolo
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangeLogEntryProto"; message ChangelogEntryProto{ required bytes rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional bytes column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional bytes value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
Avro
{ "name" : "ChangelogEntryMessage", "type" : "record", "namespace" : "com.google.cloud.teleport.bigtable", "fields" : [ { "name" : "rowKey", "type" : "bytes"}, { "name" : "modType", "type" : { "name": "ModType", "type": "enum", "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]} }, { "name": "isGC", "type": "boolean" }, { "name": "tieBreaker", "type": "int"}, { "name": "columnFamily", "type": "string"}, { "name": "commitTimestamp", "type" : "long"}, { "name" : "sourceInstance", "type" : "string"}, { "name" : "sourceCluster", "type" : "string"}, { "name" : "sourceTable", "type" : "string"}, { "name": "column", "type" : ["null", "bytes"]}, { "name": "timestamp", "type" : ["null", "long"]}, { "name": "timestampFrom", "type" : ["null", "long"]}, { "name": "timestampTo", "type" : ["null", "long"]}, { "name" : "value", "type" : ["null", "bytes"]} ] }
JSON
Use o seguinte esquema Protobuf com a codificação de mensagem JSON
:
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangelogEntryMessageText"; message ChangelogEntryText{ required string rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional string column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional string value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
Cada nova mensagem do Pub/Sub inclui um registro de alteração de dados retornado pelo stream de alteração da linha correspondente na tabela do Bigtable.
Descrição da mensagem de saída do Pub/Sub
Nome do campo | Descrição |
---|---|
rowKey |
A chave da linha alterada. Chega na forma de uma matriz de bytes. Quando a codificação de mensagem JSON é configurada, as chaves de linha são retornadas como strings. Quando useBase64Rowkeys é especificado, as chaves de linha são codificadas em Base64. Caso contrário, um charset especificado por bigtableChangeStreamCharset será usado para decodificar bytes da chave de linha em uma string. |
modType |
O tipo de mutação de linha. Use um dos seguintes valores: SET_CELL , DELETE_CELLS ou DELETE_FAMILY . |
columnFamily |
O grupo de colunas afetado pela mutação de linha. |
column |
O qualificador de coluna afetado pela mutação da linha. Para o tipo de mutação DELETE_FAMILY , o campo de coluna não está definido. Chega na forma de uma matriz de bytes. Quando a codificação de mensagem JSON é configurada, as colunas são retornadas como strings. Quando useBase64ColumnQualifier é especificado, o campo da coluna é codificado em Base64. Caso contrário, um charset especificado por bigtableChangeStreamCharset será usado para decodificar bytes da chave de linha em uma string. |
commitTimestamp |
A hora em que o Bigtable aplica a mutação. A hora é medida em microssegundos desde a época do Unix (1 de janeiro de 1970 no UTC). |
timestamp |
O valor do carimbo de data/hora da célula afetada pela mutação. Para tipos de mutação DELETE_CELLS e DELETE_FAMILY , o carimbo de data/hora não é definido. A hora é medida em microssegundos desde a época do Unix (1 de janeiro de 1970 no UTC). |
timestampFrom |
Descreve um início inclusivo do intervalo do carimbo de data/hora para todas as células excluídas pela mutação DELETE_CELLS . Para outros tipos de mutação, timestampFrom não é definido. A hora é medida em microssegundos desde a época do Unix (1 de janeiro de 1970 no UTC). |
timestampTo |
Descreve um fim exclusivo do intervalo de carimbo de data/hora para todas as células excluídas pela mutação DELETE_CELLS . Para outros tipos de mutação, timestampTo não é definido. |
isGC |
Um valor booleano que indica se a mutação é gerada por um mecanismo de coleta de lixo do Bigtable. |
tieBreaker |
Quando duas mutações são registradas ao mesmo tempo por diferentes clusters do Bigtable, a mutação com o maior valor tiebreaker é aplicada à tabela de origem. Mutações com valores tiebreaker menores são descartadas. |
value |
O novo valor definido pela mutação. A menos que a opção de pipeline stripValues esteja definida, o valor será definido para mutações SET_CELL . Para outros tipos de mutação, o valor não é definido. Chega na forma de uma matriz de bytes. Quando a codificação de mensagem JSON é configurada, os valores são retornados como strings.
Quando useBase64Values é especificado, o valor é codificado em Base64. Caso contrário, um charset especificado por bigtableChangeStreamCharset será usado para decodificar bytes do valor em uma string. |
sourceInstance |
O nome da instância do Bigtable que registrou a mutação. Pode ser quando vários pipelines fazem streaming de alterações de instâncias diferentes para o mesmo tópico do Pub/Sub. |
sourceCluster |
O nome do cluster do Bigtable que registrou a mutação. Pode ser usado quando vários pipelines fazem streaming de alterações de instâncias diferentes para o mesmo tópico do Pub/Sub. |
sourceTable |
O nome da tabela do Bigtable que recebeu a mutação. Pode ser usado no caso de um fluxo de vários pipelines mudar de tabelas diferentes para o mesmo tópico do Pub/Sub. |
Requisitos de pipeline
- A instância de origem especificada do Bigtable.
- A tabela de origem do Bigtable especificada. A tabela precisa ter fluxos de alterações ativados.
- O perfil de aplicativo do Bigtable especificado.
- O tópico especificado do Pub/Sub precisa existir.
Parâmetros do modelo
Parâmetro | Descrição |
---|---|
bigtableReadInstanceId |
O ID da instância do Bigtable de origem. |
bigtableReadTableId |
O ID da tabela de origem do Bigtable. |
bigtableChangeStreamAppProfile |
O ID do perfil do aplicativo Bigtable. O perfil do aplicativo precisa usar roteamento de cluster único e permitir transações de linha única. |
pubSubTopic |
O nome do tópico do Pub/Sub de destino. |
messageFormat |
Opcional: quando o tópico de destino tem um esquema configurado, o formato da mensagem é determinado pelo esquema e pela codificação configurados. O formato das mensagens a serem publicadas no tópico do Pub/Sub. Valores aceitos: AVRO , PROTOCOL_BUFFERS e JSON . O padrão é JSON . Quando o formato JSON é usado, os campos rowKey, column e value da mensagem são strings, cujo conteúdo é determinado pelas opções de pipeline useBase64Rowkeys , useBase64ColumnQualifiers , useBase64Values e bigtableChangeStreamCharset . |
messageEncoding |
Opcional: quando o tópico de destino tiver um esquema configurado, a codificação da mensagem será determinada pelas configurações do tópico. A codificação das mensagens a serem publicadas no tópico do Pub/Sub. Valores aceitos: BINARY , JSON e JSON . O padrão é JSON . |
stripValues |
Opcional: quando definido como true , as mutações SET_CELL são retornadas sem novos valores definidos. O padrão é "false".
Esse parâmetro é útil quando você não precisa de um novo valor, também conhecido como invalidação de cache, ou quando os valores são extremamente grandes e excedem os limites de tamanho de mensagens do Pub/Sub. |
bigtableReadProjectId |
Opcional: o ID do projeto do Bigtable. O padrão é o projeto do job do Dataflow. |
pubSubProjectId |
Opcional: o ID do projeto do Bigtable. O padrão é o projeto do job do Dataflow. |
bigtableChangeStreamMetadataInstanceId |
Opcional: a alteração do Bigtable faz streaming do ID da instância de metadados. |
bigtableChangeStreamMetadataTableTableId |
Opcional: a alteração do Bigtable faz streaming do ID da tabela de metadados. |
bigtableChangeStreamCharset |
Opcional: a alteração do Bigtable faz o stream do nome do conjunto de caracteres ao ler as chaves de linha, os valores e os qualificadores de coluna. Essa opção é usada quando a codificação da mensagem é JSON. |
bigtableChangeStreamStartTimestamp |
Opcional: o carimbo de data/hora de início, inclusive, para usar em fluxos de alterações de leitura. Por exemplo, 2022-05-05T07:59:59Z . O padrão é o carimbo de data/hora do horário de início do pipeline. |
bigtableChangeStreamIgnoreColumnFamilies |
Opcional: uma lista separada por vírgulas com o nome do grupo de colunas é ignorado. |
bigtableChangeStreamIgnoreColumns |
Opcional: uma lista separada por vírgulas do nome da coluna é ignorada. |
bigtableChangeStreamName |
Opcional: um nome exclusivo para o pipeline do cliente. Permite retomar o processamento a partir do ponto em que um pipeline anteriormente em execução foi interrompido. O padrão é o nome gerado automaticamente. Para encontrar o valor, consulte os registros do job do Dataflow. |
bigtableChangeStreamResume |
Opcional: quando definido como true , um novo pipeline retoma o processamento a partir do ponto em que um pipeline em execução anteriormente com o mesmo valor de bigtableChangeStreamName é interrompido.
Se o pipeline com o valor bigtableChangeStreamName fornecido nunca tiver sido executado, um novo pipeline não será iniciado.
Quando definido como false , um novo pipeline é iniciado. Se um pipeline com o mesmo valor bigtableChangeStreamName já tiver sido executado para a origem especificada, um novo pipeline não será iniciado. O padrão é false . |
useBase64Rowkeys |
Opcional: usado com codificação de mensagem JSON. Quando definido como true , o campo rowKey é uma string codificada em Base64.
Caso contrário, o rowKey é produzido usando bigtableChangeStreamCharset para decodificar bytes em uma string. O padrão é false . |
useBase64ColumnQualifiers |
Opcional: usado com codificação de mensagem JSON. Quando definido como true , o campo column é uma string codificada em Base64.
Caso contrário, a coluna será produzida usando bigtableChangeStreamCharset para decodificar bytes em uma string. O padrão é false . |
useBase64Values |
Opcional: usado com codificação de mensagem JSON. Quando definido como true , o campo value é uma string codificada em Base64.
Caso contrário, o valor será produzido usando bigtableChangeStreamCharset para decodificar bytes em uma string. O padrão é false . |
dlqMaxRetries |
Opcional: o máximo de novas mensagens inativas. O valor padrão é 5 . |
dlqRetryMinutes |
Opcional: o número de minutos entre novas tentativas de fila de mensagens inativas (DLQ) O valor padrão é 10 . |
dlqDirectory |
Opcional: o diretório da fila de mensagens inativas. Os registros que não forem processados são armazenados nesse diretório. O padrão é um diretório no local temporário do job do Dataflow. Na maioria dos casos, é possível usar o caminho padrão. |
Executar o modelo
Console
- Acesse a página Criar job usando um modelo do Dataflow. Acesse Criar job usando um modelo
- No campo Nome do job, insira um nome exclusivo.
- Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é
us-central1
.Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.
- No menu suspenso Modelo do Dataflow, selecione the Bigtable change streams to Pub/Sub template.
- Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
- Cliquem em Executar job.
gcloud
No shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \ --parameters \ bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_TABLE_ID,\ bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\ pubSubTopic=PUBSUB_TOPIC
Substitua:
PROJECT_ID
: o ID do projeto do Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaVERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: a região em que você quer implantar o job do Dataflow, por exemplo,us-central1
.BIGTABLE_INSTANCE_ID
: o ID da instância do Bigtable.BIGTABLE_TABLE_ID
: o ID da tabela do Bigtable.BIGTABLE_APPLICATION_PROFILE_ID
: ID do perfil do aplicativo Bigtable.PUBSUB_TOPIC
: o nome do tópico de destino do Pub/Sub
API
Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a
API e os respectivos escopos de autorização, consulte
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub", "parameters": { "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_TABLE_ID", "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID", "pubSubTopic": "PUBSUB_TOPIC" } } }
Substitua:
PROJECT_ID
: o ID do projeto do Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaVERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
LOCATION
: a região em que você quer implantar o job do Dataflow, por exemplo,us-central1
.BIGTABLE_INSTANCE_ID
: o ID da instância do Bigtable.BIGTABLE_TABLE_ID
: o ID da tabela do Bigtable.BIGTABLE_APPLICATION_PROFILE_ID
: ID do perfil do aplicativo Bigtable.PUBSUB_TOPIC
: o nome do tópico de destino do Pub/Sub
A seguir
- Saiba mais sobre os modelos do Dataflow.
- Confira a lista de modelos fornecidos pelo Google.