Modelo de streams de alterações do Spanner para o BigQuery

O modelo de streams de alterações do Spanner para o BigQuery é um pipeline de streaming que faz stream de registos de alterações de dados do Spanner e os grava em tabelas do BigQuery através do Dataflow Runner V2.

Todas as colunas observadas do fluxo de alterações são incluídas em cada linha da tabela do BigQuery, independentemente de serem modificadas por uma transação do Spanner. As colunas não monitorizadas não são incluídas na linha do BigQuery. Todas as alterações do Spanner inferiores à marca cronológica do Dataflow são aplicadas com êxito às tabelas do BigQuery ou são armazenadas na fila de mensagens rejeitadas para nova tentativa. As linhas do BigQuery são inseridas fora de ordem em comparação com a ordenação da data/hora de confirmação do Spanner original.

Se as tabelas do BigQuery necessárias não existirem, o pipeline cria-as. Caso contrário, são usadas as tabelas do BigQuery existentes. O esquema das tabelas do BigQuery existentes tem de conter as colunas acompanhadas correspondentes das tabelas do Spanner e quaisquer colunas de metadados adicionais que não sejam ignoradas explicitamente pela opção ignoreFields. Consulte a descrição dos campos de metadados na lista seguinte. Cada nova linha do BigQuery inclui todas as colunas monitorizadas pelo fluxo de alterações a partir da respetiva linha na tabela do Spanner na data/hora do registo de alterações.

Os seguintes campos de metadados são adicionados às tabelas do BigQuery. Para mais detalhes sobre estes campos, consulte Registos de alterações de dados em "Partições, registos e consultas de streams de alterações".

  • _metadata_spanner_mod_type: o tipo de modificação (inserção, atualização ou eliminação) da transação do Spanner. Extraído do registo de alterações de dados do fluxo de alterações.
  • _metadata_spanner_table_name: o nome da tabela do Spanner. Este campo não é o nome da tabela de metadados do conetor.
  • _metadata_spanner_commit_timestamp: a data/hora de confirmação do Spanner, que é a hora em que uma alteração é confirmada. Este valor é extraído do registo de alteração de dados do fluxo de alterações.
  • _metadata_spanner_server_transaction_id: uma string globalmente exclusiva que representa a transação do Spanner na qual a alteração foi confirmada. Use este valor apenas no contexto do processamento de registos de fluxo de alterações. Não está correlacionado com o ID da transação na API do Spanner. Este valor é extraído do registo de alteração de dados do fluxo de alterações.
  • _metadata_spanner_record_sequence: O número de sequência do registo na transação do Spanner. Os números de sequência são garantidamente únicos e aumentam monotonicamente, mas não são necessariamente contíguos, numa transação. Este valor é extraído do registo de alteração de dados do fluxo de alterações.
  • _metadata_spanner_is_last_record_in_transaction_in_partition: indica se o registo é o último registo de uma transação do Spanner na partição atual. Este valor é extraído do registo de alteração de dados do fluxo de alterações.
  • _metadata_spanner_number_of_records_in_transaction: O número de registos de alterações de dados que fazem parte da transação do Spanner em todas as partições de streams de alterações. Este valor é extraído do registo de alteração de dados do fluxo de alterações.
  • _metadata_spanner_number_of_partitions_in_transaction: O número de partições que devolvem registos de alterações de dados para a transação do Spanner. Este valor é extraído do registo de alteração de dados do fluxo de alterações.
  • _metadata_big_query_commit_timestamp: a data/hora de confirmação em que a linha é inserida no BigQuery. Se useStorageWriteApi for true, esta coluna não é criada automaticamente na tabela do histórico de alterações pelo pipeline. Nesse caso, tem de adicionar manualmente esta coluna na tabela do histórico de alterações e definir CURRENT_TIMESTAMP como o respetivo valor predefinido, se necessário.

Quando usar este modelo, tenha em atenção os seguintes detalhes:

  • Pode usar este modelo para propagar novas colunas em tabelas existentes ou novas tabelas do Spanner para o BigQuery. Para mais informações, consulte o artigo Faça a gestão da adição de tabelas ou colunas de acompanhamento.
  • Para os tipos de captura de valores OLD_AND_NEW_VALUES e NEW_VALUES, quando o registo de alteração de dados contém uma alteração UPDATE, o modelo tem de fazer uma leitura desatualizada para o Spanner na indicação de tempo de confirmação do registo de alteração de dados para obter as colunas não alteradas, mas monitorizadas. Certifique-se de que configura corretamente o "version_retention_period" da base de dados para a leitura desatualizada. Para o tipo de captura de valor NEW_ROW, o modelo é mais eficiente, porque o registo de alterações de dados captura a linha completa, incluindo colunas que não são atualizadas em pedidos UPDATE, e o modelo não precisa de fazer uma leitura desatualizada.
  • Para minimizar a latência da rede e os custos de transporte de rede, execute a tarefa do Dataflow a partir da mesma região que a sua instância do Spanner ou tabelas do BigQuery. Se usar origens, destinos, localizações de ficheiros de preparação ou localizações de ficheiros temporários que se encontram fora da região da sua tarefa, os seus dados podem ser enviados entre regiões. Para mais informações, consulte o artigo Regiões do Dataflow.
  • Este modelo suporta todos os tipos de dados do Spanner válidos. Se o tipo do BigQuery for mais preciso do que o tipo do Spanner, pode ocorrer uma perda de precisão durante a transformação. Especificamente:
    • Para o tipo JSON do Spanner, a ordem dos membros de um objeto é ordenada lexicograficamente, mas não existe essa garantia para o tipo JSON do BigQuery.
    • O Spanner suporta o tipo TIMESTAMP de nanossegundos, mas o BigQuery só suporta o tipo TIMESTAMP de microssegundos.

Saiba mais sobre as streams de alterações, como criar pipelines do Dataflow de streams de alterações e práticas recomendadas.

Requisitos do pipeline

  • A instância do Spanner tem de existir antes de executar o pipeline.
  • A base de dados do Spanner tem de existir antes de executar o pipeline.
  • A instância de metadados do Spanner tem de existir antes da execução do pipeline.
  • A base de dados de metadados do Spanner tem de existir antes de executar o pipeline.
  • A stream de alterações do Spanner tem de existir antes de executar o pipeline.
  • O conjunto de dados do BigQuery tem de existir antes da execução do pipeline.

Processar a adição de tabelas ou colunas de acompanhamento

Esta secção descreve as práticas recomendadas para processar a adição de tabelas e colunas do Spanner de acompanhamento enquanto o pipeline está em execução. A versão mais antiga do modelo suportada para esta funcionalidade é a 2024-09-19-00_RC00.

  • Antes de adicionar uma nova coluna a um âmbito de stream de alterações do Spanner, adicione primeiro a coluna à tabela de registo de alterações do BigQuery. A coluna adicionada tem de ter um tipo de dados correspondente e ser NULLABLE. Aguarde, pelo menos, 10 minutos antes de continuar a criar a nova coluna ou tabela no Spanner. A gravação na nova coluna sem esperar pode resultar num registo não processado com um código de erro inválido no diretório da fila de mensagens não entregues.
  • Para adicionar uma nova tabela, adicione primeiro a tabela na base de dados do Spanner. A tabela é criada automaticamente no BigQuery quando o pipeline recebe um registo para a nova tabela.
  • Depois de adicionar as novas colunas ou tabelas na base de dados do Spanner, certifique-se de que altera o fluxo de alterações para acompanhar as novas colunas ou tabelas que quer, se ainda não estiverem a ser acompanhadas implicitamente.
  • O modelo não elimina tabelas nem colunas do BigQuery. Se uma coluna for removida da tabela do Spanner, os valores nulos são preenchidos nas colunas do registo de alterações do BigQuery para os registos gerados após a remoção das colunas da tabela do Spanner, a menos que remova manualmente a coluna do BigQuery.
  • O modelo não suporta atualizações do tipo de coluna. Embora o Spanner suporte a alteração de uma coluna STRING para uma coluna BYTES ou de uma coluna BYTES para uma coluna STRING, não pode modificar o tipo de dados de uma coluna existente nem usar o mesmo nome de coluna com tipos de dados diferentes no BigQuery. Se eliminar e recriar uma coluna com o mesmo nome, mas um tipo diferente no Spanner, os dados podem ser escritos na coluna do BigQuery existente, mas o tipo permanece inalterado.
  • Este modelo não suporta atualizações do modo de colunas. As colunas de metadados replicadas no BigQuery são definidas para o modo REQUIRED. Todas as outras colunas replicadas no BigQuery são definidas como NULLABLE, independentemente de estarem definidas como NOT NULL na tabela do Spanner. Não pode atualizar as colunas NULLABLE para o modo REQUIRED no BigQuery.
  • A alteração do tipo de captura de valor de um fluxo de alterações não é suportada para pipelines em execução.

Parâmetros de modelos

Parâmetros obrigatórios

  • spannerInstanceId: a instância do Spanner a partir da qual ler streams de alterações.
  • spannerDatabase: a base de dados do Spanner a partir da qual ler as streams de alterações.
  • spannerMetadataInstanceId: a instância do Spanner a usar para a tabela de metadados do conetor de streams de alterações.
  • spannerMetadataDatabase: a base de dados do Spanner a usar para a tabela de metadados do conetor de streams de alterações.
  • spannerChangeStreamName: o nome da stream de alterações do Spanner a partir da qual os dados serão lidos.
  • bigQueryDataset: o conjunto de dados do BigQuery para a saída de streams de alterações.

Parâmetros opcionais

  • spannerProjectId: o projeto a partir do qual ler os fluxos de alterações. Este valor também é o projeto onde a tabela de metadados do conetor de streams de alterações é criada. O valor predefinido para este parâmetro é o projeto onde o pipeline do Dataflow está a ser executado.
  • spannerDatabaseRole: a função da base de dados do Spanner a usar quando executar o modelo. Este parâmetro só é necessário quando o principal do IAM que está a executar o modelo é um utilizador do controlo de acesso detalhado. A função da base de dados tem de ter o privilégio SELECT na stream de alterações e o privilégio EXECUTE na função de leitura da stream de alterações. Para mais informações, consulte o artigo Controlo de acesso detalhado para streams de alterações (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName: o nome da tabela de metadados do conetor de streams de alterações do Spanner a usar. Se não for fornecida, é criada automaticamente uma tabela de metadados do conector de streams de alterações do Spanner durante o fluxo do pipeline. Tem de fornecer este parâmetro quando atualizar um pipeline existente. Caso contrário, não indique este parâmetro.
  • rpcPriority: a prioridade do pedido para chamadas do Spanner. O valor tem de ser um dos seguintes: HIGH, MEDIUM ou LOW. O valor predefinido é HIGH.
  • spannerHost: o ponto final do Cloud Spanner a chamar no modelo. Usado apenas para testes. Por exemplo, https://batch-spanner.googleapis.com.
  • startTimestamp: a data/hora de início (https://datatracker.ietf.org/doc/html/rfc3339), inclusive, a usar para ler streams de alterações. Ex-2021-10-12T07:20:50.52Z. A predefinição é a data/hora em que o pipeline é iniciado, ou seja, a hora atual.
  • endTimestamp: a data/hora de fim (https://datatracker.ietf.org/doc/html/rfc3339), inclusive, a usar para ler streams de alterações.Exemplo: 2021-10-12T07:20:50.52Z. A predefinição é um tempo infinito no futuro.
  • bigQueryProjectId: o projeto do BigQuery. O valor predefinido é o projeto para a tarefa do Dataflow.
  • bigQueryChangelogTableNameTemplate: o modelo para o nome da tabela do BigQuery que contém o histórico de alterações. A predefinição é: {_metadata_spanner_table_name}_changelog.
  • deadLetterQueueDirectory: o caminho para armazenar quaisquer registos não processados. O caminho predefinido é um diretório na localização temporária da tarefa do Dataflow. Normalmente, o valor predefinido é suficiente.
  • dlqRetryMinutes: o número de minutos entre as novas tentativas da fila de mensagens rejeitadas. O valor predefinido é 10.
  • ignoreFields: uma lista de campos (sensível a maiúsculas e minúsculas) separada por vírgulas a ignorar. Estes campos podem ser campos de tabelas observadas ou campos de metadados adicionados pelo pipeline. Os campos ignorados não são inseridos no BigQuery. Quando ignora o campo _metadata_spanner_table_name, o parâmetro bigQueryChangelogTableNameTemplate também é ignorado. A predefinição é vazio.
  • disableDlqRetries: indica se as repetições para a DLQ devem ou não ser desativadas. A predefinição é: false.
  • useStorageWriteApi: se for verdadeiro, o pipeline usa a API Storage Write do BigQuery (https://cloud.google.com/bigquery/docs/write-api). O valor predefinido é false. Para mais informações, consulte a secção Usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce: quando usa a API Storage Write, especifica a semântica de escrita. Para usar a semântica pelo menos uma vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), defina este parâmetro como true. Para usar a semântica exatamente uma vez, defina o parâmetro como false. Este parâmetro só se aplica quando useStorageWriteApi é true. O valor predefinido é false.
  • numStorageWriteApiStreams: quando usa a API Storage Write, especifica o número de streams de escrita. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, tem de definir este parâmetro. A predefinição é: 0.
  • storageWriteApiTriggeringFrequencySec: quando usa a API Storage Write, especifica a frequência de acionamento, em segundos. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, tem de definir este parâmetro.

Execute o modelo

Consola

  1. Aceda à página do fluxo de dados Criar tarefa a partir de um modelo.
  2. Aceda a Criar tarefa a partir de modelo
  3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
  4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

    Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.

  5. No menu pendente Modelo do fluxo de dados, selecione the Cloud Spanner change streams to BigQuery template.
  6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
  7. Clique em Executar tarefa.

gcloud

Na shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

Substitua o seguinte:

  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Spanner
  • SPANNER_DATABASE: base de dados do Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Spanner
  • SPANNER_METADATA_DATABASE: base de dados de metadados do Spanner
  • SPANNER_CHANGE_STREAM: stream de alterações do Spanner
  • BIGQUERY_DATASET: o conjunto de dados do BigQuery para a saída de streams de alterações

API

Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Spanner
  • SPANNER_DATABASE: base de dados do Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Spanner
  • SPANNER_METADATA_DATABASE: base de dados de metadados do Spanner
  • SPANNER_CHANGE_STREAM: stream de alterações do Spanner
  • BIGQUERY_DATASET: o conjunto de dados do BigQuery para a saída de streams de alterações

O que se segue?