O modelo de stream do Bigtable para stream do Pub/Sub é um pipeline de streaming que transmite registros de alteração de dados do Bigtable e os publica em um tópico do Pub/Sub usando o Dataflow.
Um fluxo de alterações do Bigtable permite a inscrição em mutações de dados por tabela. Quando você se inscreve em streams de alteração de tabela, as seguintes restrições se aplicam:
- Somente células modificadas e descritores de operações de exclusão são retornados.
- Somente o novo valor de uma célula modificada é retornado.
Quando os registros de alteração de dados são publicados em um tópico do Pub/Sub, as mensagens podem ser inseridas fora de ordem em comparação com a ordem original do carimbo de data/hora de confirmação do Bigtable.
Os registros de alteração de dados do Bigtable que não podem ser publicados em tópicos do Pub/Sub são colocados temporariamente em um diretório de fila de mensagens inativas (fila de mensagens não processadas) no Cloud Storage. Após o número máximo de tentativas malsucedidas, esses registros são colocados indefinidamente no mesmo diretório de fila de mensagens inativas para revisão humana ou processamento adicional pelo usuário.
O pipeline requer que o tópico de destino do Pub/Sub exista. O tópico de destino pode ser configurado para validar mensagens usando um esquema. Quando um tópico do Pub/Sub especifica um esquema, o pipeline só começa se o esquema for válido. Dependendo do tipo de esquema, use uma das seguintes definições de esquema para o tópico de destino:
Buffers de protocolo
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangeLogEntryProto"; message ChangelogEntryProto{ required bytes rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional bytes column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional bytes value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
Avro
{ "name" : "ChangelogEntryMessage", "type" : "record", "namespace" : "com.google.cloud.teleport.bigtable", "fields" : [ { "name" : "rowKey", "type" : "bytes"}, { "name" : "modType", "type" : { "name": "ModType", "type": "enum", "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]} }, { "name": "isGC", "type": "boolean" }, { "name": "tieBreaker", "type": "int"}, { "name": "columnFamily", "type": "string"}, { "name": "commitTimestamp", "type" : "long"}, { "name" : "sourceInstance", "type" : "string"}, { "name" : "sourceCluster", "type" : "string"}, { "name" : "sourceTable", "type" : "string"}, { "name": "column", "type" : ["null", "bytes"]}, { "name": "timestamp", "type" : ["null", "long"]}, { "name": "timestampFrom", "type" : ["null", "long"]}, { "name": "timestampTo", "type" : ["null", "long"]}, { "name" : "value", "type" : ["null", "bytes"]} ] }
JSON
Use o seguinte esquema Protobuf com a codificação de mensagem JSON
:
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangelogEntryMessageText"; message ChangelogEntryText{ required string rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional string column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional string value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
Cada nova mensagem do Pub/Sub inclui uma entrada de um registro de alteração de dados retornado pelo fluxo de alterações da linha correspondente na tabela do Bigtable. O modelo do Pub/Sub nivela as entradas em cada registro de alteração de dados em alterações individuais no nível da célula.
Descrição da mensagem de saída do Pub/Sub
Nome do campo | Descrição |
---|---|
rowKey |
A chave da linha alterada. Chega na forma de uma matriz de bytes. Quando a codificação de mensagem JSON é configurada, as chaves de linha são retornadas como strings. Quando useBase64Rowkeys é especificado, as chaves de linha são codificadas em Base64. Caso contrário, um charset especificado por bigtableChangeStreamCharset será usado para decodificar bytes da chave de linha em uma string. |
modType |
O tipo de mutação de linha. Use um dos seguintes valores: SET_CELL , DELETE_CELLS ou DELETE_FAMILY . |
columnFamily |
O grupo de colunas afetado pela mutação de linha. |
column |
O qualificador de coluna afetado pela mutação da linha. Para o tipo de mutação DELETE_FAMILY , o campo de coluna não está definido. Chega na forma de uma matriz de bytes. Quando a codificação de mensagem JSON é configurada, as colunas são retornadas como strings. Quando useBase64ColumnQualifier é especificado, o campo da coluna é codificado em Base64. Caso contrário, um charset especificado por bigtableChangeStreamCharset será usado para decodificar bytes da chave de linha em uma string. |
commitTimestamp |
A hora em que o Bigtable aplica a mutação. A hora é medida em microssegundos desde a época do Unix (1 de janeiro de 1970 no UTC). |
timestamp |
O valor do carimbo de data/hora da célula afetada pela mutação. Para tipos de mutação DELETE_CELLS e DELETE_FAMILY , o carimbo de data/hora não é definido. A hora é medida em microssegundos desde a época do Unix (1 de janeiro de 1970 no UTC). |
timestampFrom |
Descreve um início inclusivo do intervalo do carimbo de data/hora para todas as células excluídas pela mutação DELETE_CELLS . Para outros tipos de mutação, timestampFrom não é definido. A hora é medida em microssegundos desde a época do Unix (1 de janeiro de 1970 no UTC). |
timestampTo |
Descreve um fim exclusivo do intervalo de carimbo de data/hora para todas as células excluídas pela mutação DELETE_CELLS . Para outros tipos de mutação, timestampTo não é definido. |
isGC |
Um valor booleano que indica se a mutação é gerada por um mecanismo de coleta de lixo do Bigtable. |
tieBreaker |
Quando duas mutações são registradas ao mesmo tempo por diferentes clusters do Bigtable, a mutação com o maior valor tiebreaker é aplicada à tabela de origem. Mutações com valores tiebreaker menores são descartadas. |
value |
O novo valor definido pela mutação. A menos que a opção de pipeline stripValues esteja definida, o valor será definido para mutações SET_CELL . Para outros tipos de mutação, o valor não é definido. Chega na forma de uma matriz de bytes. Quando a codificação de mensagem JSON é configurada, os valores são retornados como strings.
Quando useBase64Values é especificado, o valor é codificado em Base64. Caso contrário, um charset especificado por bigtableChangeStreamCharset será usado para decodificar bytes do valor em uma string. |
sourceInstance |
O nome da instância do Bigtable que registrou a mutação. Pode ser quando vários pipelines fazem streaming de alterações de instâncias diferentes para o mesmo tópico do Pub/Sub. |
sourceCluster |
O nome do cluster do Bigtable que registrou a mutação. Pode ser usado quando vários pipelines fazem streaming de alterações de instâncias diferentes para o mesmo tópico do Pub/Sub. |
sourceTable |
O nome da tabela do Bigtable que recebeu a mutação. Pode ser usado no caso de um fluxo de vários pipelines mudar de tabelas diferentes para o mesmo tópico do Pub/Sub. |
Requisitos de pipeline
- A instância de origem especificada do Bigtable.
- A tabela de origem do Bigtable especificada. A tabela precisa ter fluxos de alterações ativados.
- O perfil de aplicativo do Bigtable especificado.
- O tópico especificado do Pub/Sub precisa existir.
Parâmetros do modelo
Parâmetros obrigatórios
- pubSubTopic: o nome do tópico do Pub/Sub de destino.
- bigtableChangeStreamAppProfile: o código do perfil do aplicativo do Bigtable. O perfil do aplicativo precisa usar roteamento de cluster único e permitir transações de linha única.
- bigtableReadInstanceId: o ID da instância de origem do Bigtable.
- bigtableReadTableId: o ID da tabela de origem do Bigtable.
Parâmetros opcionais
- messageEncoding: a codificação das mensagens a serem publicadas no tópico do Pub/Sub. Quando o esquema do tópico de destino é configurado, a codificação da mensagem é determinada pelas configurações do tópico. Os seguintes valores são aceitos:
BINARY
eJSON
. O padrão éJSON
. - messageFormat: a codificação das mensagens a serem publicadas no tópico do Pub/Sub. Quando o esquema do tópico de destino é configurado, a codificação da mensagem é determinada pelas configurações do tópico. Os seguintes valores são aceitos:
AVRO
,PROTOCOL_BUFFERS
eJSON
. O valor padrão éJSON
. Quando o formatoJSON
é usado, os campos rowKey, coluna e valor da mensagem são strings, com conteúdo determinado pelas opções de pipelineuseBase64Rowkeys
,useBase64ColumnQualifiers
,useBase64Values
ebigtableChangeStreamCharset
. - stripValues: quando definido como verdadeiro, as mutações SET_CELL são retornadas sem novos valores definidos. O padrão é "false". Esse parâmetro é útil quando você não precisa de um novo valor, também conhecido como invalidação de cache, ou quando os valores são extremamente grandes e excedem os limites de tamanho de mensagens do Pub/Sub.
- dlqDirectory: o diretório da fila de mensagens inativas. Os registros que não forem processados são armazenados nesse diretório. O padrão é um diretório no local temporário do job do Dataflow. Na maioria dos casos, é possível usar o caminho padrão.
- dlqRetryMinutes número de minutos entre novas tentativas de fila de mensagens inativas (DLQ). O padrão é
10
. - dlqMaxRetries: o máximo de tentativas de mensagens inativas. O padrão é
5
. - useBase64Rowkeys: usado com a codificação de mensagens JSON. Quando definido como
true
, o camporowKey
é uma string codificada em Base64. Caso contrário, orowKey
é produzido usandobigtableChangeStreamCharset
para decodificar bytes em uma string. O padrão éfalse
. - pubSubProjectId: o ID do projeto do Bigtable. O padrão é o projeto do job do Dataflow.
- useBase64ColumnQualifiers: usado com a codificação de mensagens JSON. Quando definido como
true
, o campocolumn
é uma string codificada em Base64. Caso contrário, a coluna será produzida usandobigtableChangeStreamCharset
para decodificar bytes em uma string. O padrão éfalse
. - useBase64Values: usado com a codificação de mensagens JSON. Quando definido como
true
, o campo de valor é uma string codificada em Base64. Caso contrário, o valor será produzido usandobigtableChangeStreamCharset
para decodificar bytes em uma string. O padrão éfalse
. - disableDlqRetries se as tentativas do DLQ serão desativadas ou não. O padrão é: falso.
- bigtableChangeStreamMetadataInstanceId: a alteração do Bigtable faz streaming do ID da instância de metadados. O padrão é vazio.
- bigtableChangeStreamMetadataTableTableId: o ID da tabela de metadados do conector de fluxos de alteração do Bigtable. Se não for fornecida, uma tabela de metadados do conector de streams de alterações do Bigtable será criada automaticamente durante a execução do pipeline. O padrão é vazio.
- bigtableChangeStreamCharset: o nome do conjunto de caracteres dos fluxos de alterações do Bigtable. O padrão é UTF-8.
- bigtableChangeStreamStartTimestamp: o carimbo de data/hora inicial (https://tools.ietf.org/html/rfc3339, link em inglês), inclusive, para uso na leitura de fluxos de alterações. Por exemplo,
2022-05-05T07:59:59Z
. O padrão é o carimbo de data/hora do horário de início do pipeline. - bigtableChangeStreamIgnoreColumnFamilies: uma lista separada por vírgulas com o nome do grupo de colunas é ignorado. O padrão é vazio.
- bigtableChangeStreamIgnoreColumns: uma lista separada por vírgulas do nome da coluna é ignorada. O padrão é vazio.
- bigtableChangeStreamName: um nome exclusivo para o pipeline do cliente. Permite retomar o processamento a partir do ponto em que um pipeline anteriormente em execução foi interrompido. O padrão é um nome gerado automaticamente. Consulte os registros do job do Dataflow para ver o valor usado.
- bigtableChangeStreamResume: quando definido como
true
, um novo pipeline retoma o processamento a partir do ponto em que um pipeline anteriormente em execução com o mesmo valor debigtableChangeStreamName
foi interrompido. Se o pipeline com o valorbigtableChangeStreamName
fornecido nunca tiver sido executado, um novo pipeline não será iniciado. Quando definido comofalse
, um novo pipeline é iniciado. Se um pipeline com o mesmo valorbigtableChangeStreamName
já tiver sido executado para a origem especificada, um novo pipeline não será iniciado. O padrão éfalse
. - bigtableReadProjectId: o código do projeto do Bigtable. O padrão é o projeto do job do Dataflow.
Executar o modelo
Console
- Acesse a página Criar job usando um modelo do Dataflow. Acesse Criar job usando um modelo
- No campo Nome do job, insira um nome exclusivo.
- Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é
us-central1
.Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.
- No menu suspenso Modelo do Dataflow, selecione the Bigtable change streams to Pub/Sub template.
- Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
- Cliquem em Executar job.
gcloud
No shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \ --parameters \ bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_TABLE_ID,\ bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\ pubSubTopic=PUBSUB_TOPIC
Substitua:
PROJECT_ID
: o ID do projeto do Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaVERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: a região em que você quer implantar o job do Dataflow, por exemplo,us-central1
BIGTABLE_INSTANCE_ID
: o ID da instância do Bigtable.BIGTABLE_TABLE_ID
: o ID da tabela do Bigtable.BIGTABLE_APPLICATION_PROFILE_ID
: ID do perfil do aplicativo Bigtable.PUBSUB_TOPIC
: o nome do tópico de destino do Pub/Sub
API
Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a
API e os respectivos escopos de autorização, consulte
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub", "parameters": { "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_TABLE_ID", "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID", "pubSubTopic": "PUBSUB_TOPIC" } } }
Substitua:
PROJECT_ID
: o ID do projeto do Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaVERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
LOCATION
: a região em que você quer implantar o job do Dataflow, por exemplo,us-central1
BIGTABLE_INSTANCE_ID
: o ID da instância do Bigtable.BIGTABLE_TABLE_ID
: o ID da tabela do Bigtable.BIGTABLE_APPLICATION_PROFILE_ID
: ID do perfil do aplicativo Bigtable.PUBSUB_TOPIC
: o nome do tópico de destino do Pub/Sub
A seguir
- Saiba mais sobre os modelos do Dataflow.
- Confira a lista de modelos fornecidos pelo Google.