O modelo de fluxo de alterações do Bigtable para BigQuery é um pipeline de streaming que transmite registros de alteração de dados do Bigtable e os grava em tabelas do BigQuery usando o Dataflow.
Um fluxo de alterações do Bigtable permite a inscrição em mutações de dados por tabela. Quando você se inscreve em streams de alteração de tabela, as seguintes restrições se aplicam:
- Somente células modificadas e descritores de operações de exclusão são retornados.
- Somente o novo valor de uma célula modificada é retornado.
Quando os registros de alteração de dados são gravados no BigQuery, as linhas podem ser inseridas fora de ordem em comparação com a ordem original do carimbo de data/hora de confirmação do Bigtable.
As linhas da tabela do registro de alterações que não podem ser gravadas no BigQuery devido a um erro persistente são colocadas permanentemente em um diretório de fila de mensagens inativas (fila de mensagens não processadas) no Cloud Storage para revisão humana ou processamento extra do usuário.
Se a tabela necessária do BigQuery não existir, o pipeline a criará. Caso contrário, será usada uma tabela atual do BigQuery. O esquema das tabelas atuais do BigQuery precisa conter as colunas da tabela a seguir.
Cada nova linha do BigQuery inclui um registro de alteração de dados retornado pelo stream de alteração da linha correspondente na tabela do Bigtable.
Esquema da tabela de saída do BigQuery
Nome da coluna | Tipo | Anulável | Descrição |
---|---|---|---|
row_key |
STRING ou BYTES |
Não | A chave da linha alterada. Quando a opção writeRowkeyAsBytes do pipeline está definida como true , o tipo da coluna precisa ser BYTES . Caso contrário, use o tipo STRING . |
mod_type |
STRING |
Não | O tipo de mutação de linha. Use um dos seguintes valores: SET_CELL , DELETE_CELLS ou DELETE_FAMILY . |
column_family |
STRING |
Não | O grupo de colunas afetado pela mutação de linha. |
column |
STRING |
Sim | O qualificador de coluna afetado pela mutação da linha. Para o tipo de mutação DELETE_FAMILY , defina como NULL . |
commit_timestamp |
TIMESTAMP |
Não | A hora em que o Bigtable aplica a mutação. |
big_query_commit_timestamp |
TIMESTAMP |
Sim | Opcional: especifica o horário em que o BigQuery grava a linha em uma tabela de saída. O campo não será preenchido se o nome da coluna estiver presente no valor da opção do pipeline bigQueryChangelogTableFieldsToIgnore . |
timestamp |
TIMESTAMP ou INT64 |
Sim | O valor do carimbo de data/hora da célula afetada pela mutação. Quando a opção writeNumericTimestamps do pipeline está definida como true , o tipo da coluna precisa ser INT64 . Caso contrário, use o tipo TIMESTAMP .
Para os tipos de mutação DELETE_CELLS e DELETE_FAMILY , defina como NULL . |
timestamp_from |
TIMESTAMP ou INT64 |
Sim | Descreve um início inclusivo do intervalo do carimbo de data/hora para todas as células excluídas pela mutação DELETE_CELLS . Para outros tipos de mutação, defina como NULL . |
timestamp_to |
TIMESTAMP ou INT64 |
Sim | Descreve um fim exclusivo do intervalo de carimbo de data/hora para todas as células excluídas pela mutação DELETE_CELLS . Para outros tipos de mutação, defina como NULL . |
is_gc |
BOOL |
Não | Opcional: quando a mutação for acionada por uma política de coleta de lixo, defina como true .
Em todos os outros casos, defina como false . O campo não é preenchido quando o nome da coluna está presente no valor da opção do pipeline bigQueryChangelogTableFieldsToIgnore . |
source_instance |
STRING |
Não | Opcional: descreve o nome da instância do Bigtable de origem da mutação. O campo não é preenchido quando o nome da coluna está presente no valor da opção do pipeline bigQueryChangelogTableFieldsToIgnore . |
source_cluster |
STRING |
Não | Opcional: descreve o nome do cluster do Bigtable de origem da mutação. O campo não é preenchido quando o nome da coluna está presente no valor da opção do pipeline bigQueryChangelogTableFieldsToIgnore . |
source_table |
STRING |
Não | Opcional: descreve o nome da tabela do Bigtable a que a mutação se aplica. O valor dessa coluna pode ser útil se várias tabelas do Bigtable fizerem o streaming de alterações para a mesma tabela do BigQuery. O campo não é preenchido quando o nome da coluna está presente no valor da opção do pipeline bigQueryChangelogTableFieldsToIgnore . |
tiebreaker |
INT64 |
Não | Opcional: quando duas mutações são registradas ao mesmo tempo por diferentes clusters do Bigtable, a mutação com o maior valor tiebreaker é aplicada à tabela de origem. Mutações com valores tiebreaker menores são descartadas. O campo não é preenchido quando o nome da coluna está presente no valor da opção do pipeline bigQueryChangelogTableFieldsToIgnore . |
value |
STRING ou BYTES |
Sim | O novo valor definido pela mutação. Quando a opção writeValuesAsBytes do pipeline está definida como true , o tipo da coluna precisa ser BYTES . Caso contrário, use o tipo STRING . O valor é definido para mutações SET_CELL . Para outros tipos de mutação, o valor é definido como NULL . |
Requisitos de pipeline
- A instância de origem especificada do Bigtable.
- A tabela de origem do Bigtable especificada. A tabela precisa ter fluxos de alterações ativados.
- O perfil de aplicativo do Bigtable especificado.
- O conjunto de dados de destino do BigQuery especificado.
Parâmetros do modelo
Parâmetros obrigatórios
- bigQueryDataset: o nome do conjunto de dados da tabela de destino do BigQuery.
- bigtableChangeStreamAppProfile: o código do perfil do aplicativo do Bigtable. O perfil do aplicativo precisa usar roteamento de cluster único e permitir transações de linha única.
- bigtableReadInstanceId: o ID da instância de origem do Bigtable.
- bigtableReadTableId: o ID da tabela de origem do Bigtable.
Parâmetros opcionais
- writeRowkeyAsBytes: se você deve gravar chaves de linha como
BYTES
do BigQuery. Quando definido comotrue
, as chaves de linha são gravadas na colunaBYTES
. Caso contrário, as chaves de linha serão gravadas na colunaSTRING
. O padrão éfalse
. - writeValuesAsBytes: quando definidos como verdadeiros, os valores são gravados na coluna BYTES. Caso contrário, são gravados na coluna STRING. O padrão é "false".
- writeNumericTimestamps: se o carimbo de data/hora do Bigtable será gravado como
INT64
do BigQuery. Quando definido comoINT64
, os valores são gravados na coluna. Caso contrário, os valores são gravados na colunaTIMESTAMP
. Colunas afetadas:timestamp
,timestamp_from
etimestamp_to
. O padrão éfalse
. Quando definido comotrue
, a hora é medida em microssegundos desde a época do Unix (1 de janeiro de 1970 no UTC). - bigQueryProjectId: o ID do projeto do conjunto de dados do BigQuery. O padrão é o projeto do job do Dataflow.
- bigQueryChangelogTableName: nome da tabela de destino do BigQuery. Se não for especificado, o valor
bigtableReadTableId + "_changelog"
será usado. O padrão é vazio. - bigQueryChangelogTablePartitionGranularity : especifica uma granularidade para particionar a tabela do registro de alterações. Quando definida, a tabela é particionada. Use um dos seguintes valores compatíveis:
HOUR
,DAY
,MONTH
ouYEAR
. Por padrão, a tabela não é particionada. - bigQueryChangelogTablePartitionExpirationMs: define o prazo de validade da partição da tabela de registros de alterações, em milissegundos. Quando definido como verdadeiro, as partições mais antigas do que o número especificado de milissegundos são excluídas. Por padrão, nenhuma expiração é definida.
- bigQueryChangelogTableFieldsToIgnore: uma lista separada por vírgulas das colunas do registro de alterações que, quando especificadas, não são criadas e preenchidas. Use um dos seguintes valores compatíveis:
is_gc
,source_instance
,source_cluster
,source_table
,tiebreaker
oubig_query_commit_timestamp
. Por padrão, todas as colunas são preenchidas. - dlqDirectory: o diretório a ser usado para a fila de mensagens inativas. Os registros que não forem processados são armazenados nesse diretório. O padrão é um diretório no local temporário do job do Dataflow. Na maioria dos casos, é possível usar o caminho padrão.
- bigtableChangeStreamMetadataInstanceId: a alteração do Bigtable faz streaming do ID da instância de metadados. O padrão é vazio.
- bigtableChangeStreamMetadataTableTableId: o ID da tabela de metadados do conector de fluxos de alteração do Bigtable. Se não for fornecida, uma tabela de metadados do conector de streams de alterações do Bigtable será criada automaticamente durante a execução do pipeline. O padrão é vazio.
- bigtableChangeStreamCharset: o nome do conjunto de caracteres dos fluxos de alterações do Bigtable. O padrão é UTF-8.
- bigtableChangeStreamStartTimestamp: o carimbo de data/hora inicial (https://tools.ietf.org/html/rfc3339, link em inglês), inclusive, para uso na leitura de fluxos de alterações. Por exemplo,
2022-05-05T07:59:59Z
. O padrão é o carimbo de data/hora do horário de início do pipeline. - bigtableChangeStreamIgnoreColumnFamilies: uma lista separada por vírgulas com o nome do grupo de colunas é ignorado. O padrão é vazio.
- bigtableChangeStreamIgnoreColumns: uma lista separada por vírgulas do nome da coluna é ignorada. O padrão é vazio.
- bigtableChangeStreamName: um nome exclusivo para o pipeline do cliente. Permite retomar o processamento a partir do ponto em que um pipeline anteriormente em execução foi interrompido. O padrão é um nome gerado automaticamente. Consulte os registros do job do Dataflow para ver o valor usado.
- bigtableChangeStreamResume: quando definido como
true
, um novo pipeline retoma o processamento a partir do ponto em que um pipeline anteriormente em execução com o mesmo valor debigtableChangeStreamName
foi interrompido. Se o pipeline com o valorbigtableChangeStreamName
fornecido nunca tiver sido executado, um novo pipeline não será iniciado. Quando definido comofalse
, um novo pipeline é iniciado. Se um pipeline com o mesmo valorbigtableChangeStreamName
já tiver sido executado para a origem especificada, um novo pipeline não será iniciado. O padrão éfalse
. - bigtableReadProjectId: o código do projeto do Bigtable. O padrão é o projeto do job do Dataflow.
Executar o modelo
Console
- Acesse a página Criar job usando um modelo do Dataflow. Acesse Criar job usando um modelo
- No campo Nome do job, insira um nome exclusivo.
- Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é
us-central1
.Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.
- No menu suspenso Modelo do Dataflow, selecione the Bigtable change streams to BigQuery template.
- Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
- Cliquem em Executar job.
gcloud
No shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \ --parameters \ bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_TABLE_ID,\ bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\ bigQueryDataset=BIGQUERY_DESTINATION_DATASET
Substitua:
PROJECT_ID
: o ID do projeto do Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaVERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: a região em que você quer implantar o job do Dataflow, por exemplo,us-central1
BIGTABLE_INSTANCE_ID
: o ID da instância do Bigtable.BIGTABLE_TABLE_ID
: o ID da tabela do Bigtable.BIGTABLE_APPLICATION_PROFILE_ID
: ID do perfil do aplicativo Bigtable.BIGQUERY_DESTINATION_DATASET
: o nome do conjunto de dados de destino do BigQuery
API
Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a
API e os respectivos escopos de autorização, consulte
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery", "parameters": { "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_TABLE_ID", "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID", "bigQueryDataset": "BIGQUERY_DESTINATION_DATASET" } } }
Substitua:
PROJECT_ID
: o ID do projeto do Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaVERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
LOCATION
: a região em que você quer implantar o job do Dataflow, por exemplo,us-central1
BIGTABLE_INSTANCE_ID
: o ID da instância do Bigtable.BIGTABLE_TABLE_ID
: o ID da tabela do Bigtable.BIGTABLE_APPLICATION_PROFILE_ID
: ID do perfil do aplicativo Bigtable.BIGQUERY_DESTINATION_DATASET
: o nome do conjunto de dados de destino do BigQuery
A seguir
- Saiba mais sobre os modelos do Dataflow.
- Confira a lista de modelos fornecidos pelo Google.