Fluxos de alterações do Spanner para o modelo do BigQuery

O modelo de fluxos de alterações do Spanner para BigQuery é um pipeline de streaming que transmite os registros de alteração de dados do Spanner e os grava em tabelas do BigQuery usando o Dataflow Runner V2.

Todas as colunas monitoradas do fluxo de alterações são incluídas em cada linha da tabela do BigQuery, independentemente de serem modificadas por uma transação do Spanner. As colunas não monitoradas não são incluídas na linha do BigQuery. Qualquer alteração do Spanner menor que a marca d'água do Dataflow é aplicada com êxito às tabelas do BigQuery ou armazenada na fila de mensagens inativas para nova tentativa. As linhas do BigQuery são inseridas fora de ordem em comparação com a ordem original do carimbo de data/hora de confirmação do Spanner.

Se as tabelas do BigQuery necessárias não existirem, o pipeline as criará. Caso contrário, as tabelas atuais do BigQuery serão usadas. O esquema das tabelas do BigQuery atuais precisa conter as colunas rastreadas correspondentes das tabelas do Spanner e qualquer outra coluna de metadados que não é ignorada explicitamente pela opção ignoreFields. Veja a descrição dos campos de metadados na lista a seguir. Cada nova linha do BigQuery inclui todas as colunas monitoradas pelo fluxo de alterações na linha correspondente na tabela do Spanner no carimbo de data/hora do registro de alterações.

Os seguintes campos de metadados são adicionados às tabelas do BigQuery. Para mais detalhes sobre esses campos, consulte Registros de alteração de dados em "Partições, registros e consultas de fluxos de alterações".

  • _metadata_spanner_mod_type: o tipo de modificação (inserir, atualizar ou excluir) da transação do Spanner. Extraído do registro de alteração de dados do fluxo de alterações.
  • _metadata_spanner_table_name: o nome da tabela do Spanner. Esse campo não é o nome da tabela de metadados do conector.
  • _metadata_spanner_commit_timestamp: o carimbo de data/hora de confirmação do Spanner, que é o momento em que uma alteração é confirmada. Esse valor é extraído do registro de alteração de dados do fluxo de alterações.
  • _metadata_spanner_server_transaction_id: uma string globalmente exclusiva que representa a transação do Spanner em que a alteração foi executada. Use esse valor somente no contexto de processamento de registros de fluxo de alterações. Ele não está correlacionado ao ID da transação na API do Spanner. Esse valor é extraído do registro de alteração de dados do fluxo de alterações.
  • _metadata_spanner_record_sequence: o número de sequência do registro na transação do Spanner. Os números de sequência são exclusivos e aumentam de maneira uniforme, mas não necessariamente contíguos, em uma transação. Esse valor é extraído do registro de alteração de dados do fluxo de alterações.
  • _metadata_spanner_is_last_record_in_transaction_in_partition: indica se o registro é o último registro de uma transação do Spanner na partição atual. Esse valor é extraído do registro de alteração de dados do fluxo de alterações.
  • _metadata_spanner_number_of_records_in_transaction: o número de registros de alteração de dados que fazem parte da transação do Spanner em todas as partições de fluxo de alterações. Esse valor é extraído do registro de alteração de dados do fluxo de alterações.
  • _metadata_spanner_number_of_partitions_in_transaction: o número de partições que retornam registros de alteração de dados para a transação do Spanner. Esse valor é extraído do registro de alteração de dados do fluxo de alterações.
  • _metadata_big_query_commit_timestamp: o carimbo de data/hora de confirmação quando a linha foi inserida no BigQuery. Se useStorageWriteApi for true, essa coluna não será criada automaticamente na tabela de registro de alterações pelo pipeline. Nesse caso, adicione manualmente essa coluna na tabela do registro de alterações, se necessário.

Ao usar esse modelo, lembre-se dos seguintes detalhes:

  • Use esse modelo para propagar novas colunas em tabelas existentes ou criar novas tabelas do Spanner para o BigQuery. Para mais informações, consulte Processar a adição de tabelas ou colunas de rastreamento.
  • Para os tipos de captura de valor OLD_AND_NEW_VALUES e NEW_VALUES, quando o registro de alteração de dados tiver uma alteração UPDATE, o modelo precisará fazer uma leitura desatualizada para o Spanner no carimbo de data/hora de confirmação do registro de alteração de dados. Isso serve para recuperar as colunas inalteradas, mas monitoradas. Configure o banco de dados "version_retention_period" corretamente para a leitura desatualizada. Para o tipo de captura de valor NEW_ROW, o modelo será mais eficiente, porque o registro de alteração de dados captura a nova linha completa, incluindo colunas que não são atualizadas em solicitações UPDATE, e o modelo não precisa fazer uma leitura desatualizada.
  • Para minimizar a latência e os custos de transporte da rede, execute o job do Dataflow na mesma região que a instância do Spanner ou as tabelas do BigQuery. Se você usar fontes, coletores, locais de arquivos de preparo ou de arquivos temporários localizados fora da região do job, seus dados poderão ser enviados entre regiões. Para mais informações, consulte Regiões do Dataflow.
  • Esse modelo é compatível com todos os tipos de dados válidos do Spanner. Se o tipo do BigQuery for mais preciso que o do Spanner, talvez ocorra perda de precisão durante a transformação. Especificamente:
    • Para o tipo JSON do Spanner, a ordem dos membros de um objeto é ordenada lexicograficamente, mas não há essa garantia para o tipo JSON do BigQuery.
    • O Spanner é compatível com o tipo TIMESTAMP de nanossegundos, mas o BigQuery é compatível apenas com o tipo TIMESTAMP de microssegundos.
  • Este modelo não oferece suporte ao uso da API BigQuery Storage Write no modo "exatamente uma vez".

Saiba mais sobre fluxos de alterações, como criar pipelines de mudança no pipeline do Dataflow e práticas recomendadas.

Requisitos de pipeline

  • A instância do Spanner precisa existir antes da execução do pipeline.
  • O banco de dados do Spanner precisa ser criado antes da execução do pipeline.
  • A instância de metadados do Spanner precisa existir antes da execução do pipeline.
  • O banco de dados de metadados do Spanner precisa existir antes da execução do pipeline.
  • O fluxo de alterações do Spanner precisa ser criado antes da execução do pipeline.
  • O conjunto de dados do BigQuery precisa existir antes da execução do pipeline.

Processar a adição de tabelas ou colunas de acompanhamento

Esta seção descreve as práticas recomendadas para lidar com a adição de tabelas e colunas do Spanner de rastreamento enquanto o pipeline está em execução. A versão mais antiga do modelo compatível com esse recurso é 2024-09-19-00_RC00.

  • Antes de adicionar uma nova coluna a um escopo de fluxo de alterações do Spanner, adicione a coluna à tabela de registro de alterações do BigQuery. A coluna adicionada precisa ter um tipo de dados correspondente e ser NULLABLE. Aguarde pelo menos 10 minutos antes de continuar a criar a nova coluna ou tabela no Spanner. Gravar na nova coluna sem esperar pode resultar em um registro não processado com um código do erro inválido no diretório da fila de mensagens inativas.
  • Para adicionar uma nova tabela, primeiro adicione-a ao banco de dados do Spanner. A tabela é criada automaticamente no BigQuery quando o pipeline recebe um registro para a nova tabela.
  • Depois de adicionar as novas colunas ou tabelas ao banco de dados do Spanner, altere o fluxo de mudanças para rastrear as novas colunas ou tabelas que você quiser, caso elas ainda não sejam rastreadas implicitamente.
  • O modelo não exclui tabelas ou colunas do BigQuery. Se uma coluna for removida da tabela do Spanner, os valores nulos serão preenchidos nas colunas de registro de alterações do BigQuery para registros gerados após a remoção das colunas da tabela do Spanner, a menos que você remova manualmente a coluna do BigQuery.
  • O modelo não é compatível com atualizações de tipo de coluna. Embora o Spanner ofereça suporte à mudança de uma coluna STRING para uma BYTES ou uma coluna BYTES para uma STRING, não é possível modificar o tipo de dados de uma coluna existente nem usar o mesmo nome de coluna com tipos de dados diferentes no BigQuery. Se você excluir e recriar uma coluna com o mesmo nome, mas com um tipo diferente, no Spanner, os dados poderão ser gravados na coluna do BigQuery, mas o tipo não será alterado.
  • Este modelo não é compatível com atualizações do modo de coluna. As colunas de metadados replicadas no BigQuery são definidas como o modo REQUIRED. Todas as outras colunas replicadas no BigQuery são definidas como NULLABLE, independentemente de serem definidas como NOT NULL na tabela do Spanner. Não é possível atualizar as colunas NULLABLE para o modo REQUIRED no BigQuery.
  • Não é possível alterar o tipo de captura de valor de um fluxo de mudanças para pipelines em execução.

Parâmetros do modelo

Parâmetros obrigatórios

  • spannerInstanceId: a instância do Spanner em que os fluxos de alterações serão lidos.
  • spannerDatabase: o banco de dados do Spanner de onde os fluxos de alterações serão lidos.
  • spannerMetadataInstanceId: a instância do Spanner a ser usada para a tabela de metadados do conector dos fluxos de alterações.
  • spannerMetadataDatabase: o banco de dados do Spanner a ser usado para a tabela de metadados do conector dos fluxos de alterações.
  • spannerChangeStreamName: o nome do fluxo de alterações a ser lido pelo Spanner.
  • bigQueryDataset: o conjunto de dados do BigQuery usado para a saída de fluxos de alterações.

Parâmetros opcionais

  • spannerProjectId: o projeto do qual os fluxos de alterações serão lidos. Esse valor também é o projeto em que a tabela de metadados do conector dos fluxos de alterações é criada. O valor padrão desse parâmetro é o projeto em que o pipeline do Dataflow está em execução.
  • spannerDatabaseRole : o papel do banco de dados do Spanner a ser usado ao executar o modelo. Esse parâmetro é necessário somente quando o principal do IAM que executa o modelo é um usuário de controle de acesso minucioso. A função de banco de dados precisa ter o privilégio SELECT no fluxo de alterações e o privilégio EXECUTE na função de leitura do fluxo de alterações. Para mais informações, consulte "Controle de acesso granular para fluxos de alteração" (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName: o nome da tabela de metadados do conector dos fluxos de alterações do Spanner a ser usado. Se não for informado, uma tabela de metadados do conector de streams de alteração do Spanner será criada automaticamente durante o fluxo do pipeline. Você precisa fornecer esse parâmetro ao atualizar um pipeline atual. Caso contrário, não forneça esse parâmetro.
  • rpcPriority : a prioridade de solicitação das chamadas do Spanner. O valor precisa ser um destes valores: HIGH, MEDIUM ou LOW. O valor padrão é HIGH.
  • spannerHost: opcional: o endpoint do Cloud Spanner para chamar no modelo. Usado apenas para testes. Por exemplo: https://batch-spanner.googleapis.com.
  • startTimestamp : o DateTime inicial (https://datatracker.ietf.org/doc/html/rfc3339), inclusive, a ser usado em fluxos de alterações de leitura. Ex-2021-10-12T07:20:50.52Z. O padrão é o carimbo de data/hora em que o pipeline é iniciado, ou seja, o horário atual.
  • endTimestamp : o DateTime final (https://datatracker.ietf.org/doc/html/rfc3339), inclusive, para uso em fluxos de alterações de leitura.Ex-2021- 10-12T07:20:50.52Z. O padrão é um tempo infinito no futuro.
  • bigQueryProjectId: o projeto do BigQuery. O valor padrão é o projeto do job do Dataflow.
  • bigQueryChangelogTableNameTemplate : o modelo do nome da tabela do BigQuery que contém o registro de alterações. Vai para o padrão: {_metadata_spanner_table_name}_changelog.
  • deadLetterQueueDirectory : o caminho para armazenar registros não processados. O caminho padrão é um diretório no local temporário do job do Dataflow. O valor padrão geralmente é suficiente.
  • dlqRetryMinutes: número de minutos entre novas tentativas de fila de mensagens inativas (DLQ). O valor padrão é 10.
  • ignoreFields : uma lista de campos separados por vírgulas (diferencia maiúsculas de minúsculas) a ser ignorados. Esses campos podem ser campos de tabelas monitoradas ou campos de metadados adicionados pelo pipeline. Campos ignorados não são inseridos no BigQuery. Quando você ignora o campo _metadata_spanner_table_name, o parâmetro bigQueryChangelogTableNameTemplate também é ignorado. O padrão é vazio.
  • disableDlqRetries: se as tentativas do DLQ serão desativadas ou não. O padrão é: falso.
  • useStorageWriteApi : se verdadeiro, o pipeline usa a API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). O valor padrão é false. Para mais informações, consulte Como usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce : ao usar a API Storage Write, especifica a semântica de gravação. Para usar pelo menos uma semântica (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), defina esse parâmetro como true. Para usar semântica exatamente uma vez, defina o parâmetro como false. Esse parâmetro se aplica apenas quando useStorageWriteApi é true. O valor padrão é false.
  • numStorageWriteApiStreams : ao usar a API Storage Write, especifica o número de fluxos de gravação. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, será necessário definir esse parâmetro. Padrão: 0.
  • storageWriteApiTriggeringFrequencySec: ao usar a API Storage Write, especifica a frequência de acionamento, em segundos. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, você precisará definir esse parâmetro.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Cloud Spanner change streams to BigQuery template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Spanner
  • SPANNER_DATABASE: banco de dados do Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Spanner
  • SPANNER_METADATA_DATABASE: banco de dados de metadados do Spanner
  • SPANNER_CHANGE_STREAM: fluxo de alterações do Spanner
  • BIGQUERY_DATASET: o conjunto de dados do BigQuery usado para a saída de fluxos de alterações

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Spanner
  • SPANNER_DATABASE: banco de dados do Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Spanner
  • SPANNER_METADATA_DATABASE: banco de dados de metadados do Spanner
  • SPANNER_CHANGE_STREAM: fluxo de alterações do Spanner
  • BIGQUERY_DATASET: o conjunto de dados do BigQuery usado para a saída de fluxos de alterações

A seguir