Fluxos de alterações do Bigtable no modelo do BigQuery

O modelo de fluxo de alterações do Bigtable para BigQuery é um pipeline de streaming que transmite registros de alteração de dados do Bigtable e os grava em tabelas do BigQuery usando o Dataflow.

Um fluxo de alterações do Bigtable permite a inscrição em mutações de dados por tabela. Quando você se inscreve em streams de alteração de tabela, as seguintes restrições se aplicam:

  • Somente células modificadas e descritores de operações de exclusão são retornados.
  • Somente o novo valor de uma célula modificada é retornado.

Quando os registros de alteração de dados são gravados no BigQuery, as linhas podem ser inseridas fora de ordem em comparação com a ordem original do carimbo de data/hora de confirmação do Bigtable.

As linhas da tabela do registro de alterações que não podem ser gravadas no BigQuery devido a um erro persistente são colocadas permanentemente em um diretório de fila de mensagens inativas (fila de mensagens não processadas) no Cloud Storage para revisão humana ou processamento extra do usuário.

Se a tabela necessária do BigQuery não existir, o pipeline a criará. Caso contrário, será usada uma tabela atual do BigQuery. O esquema das tabelas atuais do BigQuery precisa conter as colunas da tabela a seguir.

Cada nova linha do BigQuery inclui um registro de alteração de dados retornado pelo stream de alteração da linha correspondente na tabela do Bigtable.

Esquema da tabela de saída do BigQuery

Nome da coluna Tipo Anulável Descrição
row_key STRING ou BYTES Não A chave da linha alterada. Quando a opção writeRowkeyAsBytes do pipeline está definida como true, o tipo da coluna precisa ser BYTES. Caso contrário, use o tipo STRING.
mod_type STRING Não O tipo de mutação de linha. Use um dos seguintes valores: SET_CELL, DELETE_CELLS ou DELETE_FAMILY.
column_family STRING Não O grupo de colunas afetado pela mutação de linha.
column STRING Sim O qualificador de coluna afetado pela mutação da linha. Para o tipo de mutação DELETE_FAMILY, defina como NULL.
commit_timestamp TIMESTAMP Não A hora em que o Bigtable aplica a mutação.
big_query_commit_timestamp TIMESTAMP Sim Opcional: especifica o horário em que o BigQuery grava a linha em uma tabela de saída. O campo não será preenchido se o nome da coluna estiver presente no valor da opção do pipeline bigQueryChangelogTableFieldsToIgnore.
timestamp TIMESTAMP ou INT64 Sim O valor do carimbo de data/hora da célula afetada pela mutação. Quando a opção writeNumericTimestamps do pipeline está definida como true, o tipo da coluna precisa ser INT64. Caso contrário, use o tipo TIMESTAMP. Para os tipos de mutação DELETE_CELLS e DELETE_FAMILY, defina como NULL.
timestamp_from TIMESTAMP ou INT64 Sim Descreve um início inclusivo do intervalo do carimbo de data/hora para todas as células excluídas pela mutação DELETE_CELLS. Para outros tipos de mutação, defina como NULL.
timestamp_to TIMESTAMP ou INT64 Sim Descreve um fim exclusivo do intervalo de carimbo de data/hora para todas as células excluídas pela mutação DELETE_CELLS. Para outros tipos de mutação, defina como NULL.
is_gc BOOL Não Opcional: quando a mutação for acionada por uma política de coleta de lixo, defina como true. Em todos os outros casos, defina como false. O campo não é preenchido quando o nome da coluna está presente no valor da opção do pipeline bigQueryChangelogTableFieldsToIgnore.
source_instance STRING Não Opcional: descreve o nome da instância do Bigtable de origem da mutação. O campo não é preenchido quando o nome da coluna está presente no valor da opção do pipeline bigQueryChangelogTableFieldsToIgnore.
source_cluster STRING Não Opcional: descreve o nome do cluster do Bigtable de origem da mutação. O campo não é preenchido quando o nome da coluna está presente no valor da opção do pipeline bigQueryChangelogTableFieldsToIgnore.
source_table STRING Não Opcional: descreve o nome da tabela do Bigtable a que a mutação se aplica. O valor dessa coluna pode ser útil se várias tabelas do Bigtable fizerem o streaming de alterações para a mesma tabela do BigQuery. O campo não é preenchido quando o nome da coluna está presente no valor da opção do pipeline bigQueryChangelogTableFieldsToIgnore.
tiebreaker INT64 Não Opcional: quando duas mutações são registradas ao mesmo tempo por diferentes clusters do Bigtable, a mutação com o maior valor tiebreaker é aplicada à tabela de origem. Mutações com valores tiebreaker menores são descartadas. O campo não é preenchido quando o nome da coluna está presente no valor da opção do pipeline bigQueryChangelogTableFieldsToIgnore.
value STRING ou BYTES Sim O novo valor definido pela mutação. Quando a opção writeValuesAsBytes do pipeline está definida como true, o tipo da coluna precisa ser BYTES. Caso contrário, use o tipo STRING. O valor é definido para mutações SET_CELL. Para outros tipos de mutação, o valor é definido como NULL.

Requisitos de pipeline

  • A instância de origem especificada do Bigtable.
  • A tabela de origem do Bigtable especificada. A tabela precisa ter fluxos de alterações ativados.
  • O perfil de aplicativo do Bigtable especificado.
  • O conjunto de dados de destino do BigQuery especificado.

Parâmetros do modelo

Parâmetros obrigatórios

  • bigQueryDataset: o nome do conjunto de dados da tabela de destino do BigQuery.
  • bigtableChangeStreamAppProfile: o código do perfil do aplicativo do Bigtable. O perfil do aplicativo precisa usar roteamento de cluster único e permitir transações de linha única.
  • bigtableReadInstanceId: o ID da instância de origem do Bigtable.
  • bigtableReadTableId: o ID da tabela de origem do Bigtable.

Parâmetros opcionais

  • writeRowkeyAsBytes: se você deve gravar chaves de linha como BYTES do BigQuery. Quando definido como true, as chaves de linha são gravadas na coluna BYTES. Caso contrário, as chaves de linha serão gravadas na coluna STRING. O padrão é false.
  • writeValuesAsBytes: quando definidos como verdadeiros, os valores são gravados na coluna BYTES. Caso contrário, são gravados na coluna STRING. O padrão é "false".
  • writeNumericTimestamps: se o carimbo de data/hora do Bigtable será gravado como INT64 do BigQuery. Quando definido como INT64, os valores são gravados na coluna. Caso contrário, os valores são gravados na coluna TIMESTAMP. Colunas afetadas: timestamp, timestamp_from e timestamp_to. O padrão é false. Quando definido como true, a hora é medida em microssegundos desde a época do Unix (1 de janeiro de 1970 no UTC).
  • bigQueryProjectId: o ID do projeto do conjunto de dados do BigQuery. O padrão é o projeto do job do Dataflow.
  • bigQueryChangelogTableName: nome da tabela de destino do BigQuery. Se não for especificado, o valor bigtableReadTableId + "_changelog" será usado. O padrão é vazio.
  • bigQueryChangelogTablePartitionGranularity : especifica uma granularidade para particionar a tabela do registro de alterações. Quando definida, a tabela é particionada. Use um dos seguintes valores compatíveis: HOUR, DAY, MONTH ou YEAR. Por padrão, a tabela não é particionada.
  • bigQueryChangelogTablePartitionExpirationMs: define o prazo de validade da partição da tabela de registros de alterações, em milissegundos. Quando definido como verdadeiro, as partições mais antigas do que o número especificado de milissegundos são excluídas. Por padrão, nenhuma expiração é definida.
  • bigQueryChangelogTableFieldsToIgnore: uma lista separada por vírgulas das colunas do registro de alterações que, quando especificadas, não são criadas e preenchidas. Use um dos seguintes valores compatíveis: is_gc, source_instance, source_cluster, source_table, tiebreaker ou big_query_commit_timestamp. Por padrão, todas as colunas são preenchidas.
  • dlqDirectory: o diretório a ser usado para a fila de mensagens inativas. Os registros que não forem processados são armazenados nesse diretório. O padrão é um diretório no local temporário do job do Dataflow. Na maioria dos casos, é possível usar o caminho padrão.
  • bigtableChangeStreamMetadataInstanceId: a alteração do Bigtable faz streaming do ID da instância de metadados. O padrão é vazio.
  • bigtableChangeStreamMetadataTableTableId: o ID da tabela de metadados do conector de fluxos de alteração do Bigtable. Se não for fornecida, uma tabela de metadados do conector de streams de alterações do Bigtable será criada automaticamente durante a execução do pipeline. O padrão é vazio.
  • bigtableChangeStreamCharset: o nome do conjunto de caracteres dos fluxos de alterações do Bigtable. O padrão é UTF-8.
  • bigtableChangeStreamStartTimestamp: o carimbo de data/hora inicial (https://tools.ietf.org/html/rfc3339, link em inglês), inclusive, para uso na leitura de fluxos de alterações. Por exemplo, 2022-05-05T07:59:59Z. O padrão é o carimbo de data/hora do horário de início do pipeline.
  • bigtableChangeStreamIgnoreColumnFamilies: uma lista separada por vírgulas com o nome do grupo de colunas é ignorado. O padrão é vazio.
  • bigtableChangeStreamIgnoreColumns: uma lista separada por vírgulas do nome da coluna é ignorada. O padrão é vazio.
  • bigtableChangeStreamName: um nome exclusivo para o pipeline do cliente. Permite retomar o processamento a partir do ponto em que um pipeline anteriormente em execução foi interrompido. O padrão é um nome gerado automaticamente. Consulte os registros do job do Dataflow para ver o valor usado.
  • bigtableChangeStreamResume: quando definido como true, um novo pipeline retoma o processamento a partir do ponto em que um pipeline anteriormente em execução com o mesmo valor de bigtableChangeStreamName foi interrompido. Se o pipeline com o valor bigtableChangeStreamName fornecido nunca tiver sido executado, um novo pipeline não será iniciado. Quando definido como false, um novo pipeline é iniciado. Se um pipeline com o mesmo valor bigtableChangeStreamName já tiver sido executado para a origem especificada, um novo pipeline não será iniciado. O padrão é false.
  • bigtableReadProjectId: o código do projeto do Bigtable. O padrão é o projeto do job do Dataflow.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Bigtable change streams to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
bigQueryDataset=BIGQUERY_DESTINATION_DATASET

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • BIGTABLE_INSTANCE_ID: o ID da instância do Bigtable.
  • BIGTABLE_TABLE_ID: o ID da tabela do Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID: ID do perfil do aplicativo Bigtable.
  • BIGQUERY_DESTINATION_DATASET: o nome do conjunto de dados de destino do BigQuery

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "bigQueryDataset": "BIGQUERY_DESTINATION_DATASET"
    }
  }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • BIGTABLE_INSTANCE_ID: o ID da instância do Bigtable.
  • BIGTABLE_TABLE_ID: o ID da tabela do Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID: ID do perfil do aplicativo Bigtable.
  • BIGQUERY_DESTINATION_DATASET: o nome do conjunto de dados de destino do BigQuery

A seguir