O modelo de streams de alterações do Spanner para o BigQuery é um pipeline de streaming que faz stream de registos de alterações de dados do Spanner e os grava em tabelas do BigQuery através do Dataflow Runner V2.
Todas as colunas observadas do fluxo de alterações são incluídas em cada linha da tabela do BigQuery, independentemente de serem modificadas por uma transação do Spanner. As colunas não monitorizadas não são incluídas na linha do BigQuery. Todas as alterações do Spanner inferiores à marca cronológica do Dataflow são aplicadas com êxito às tabelas do BigQuery ou são armazenadas na fila de mensagens rejeitadas para nova tentativa. As linhas do BigQuery são inseridas fora de ordem em comparação com a ordenação da data/hora de confirmação do Spanner original.
Se as tabelas do BigQuery necessárias não existirem, o pipeline cria-as. Caso contrário,
são usadas as tabelas do BigQuery existentes. O esquema das tabelas do BigQuery existentes tem de conter as colunas acompanhadas correspondentes das tabelas do Spanner e quaisquer colunas de metadados adicionais que não sejam ignoradas explicitamente pela opção ignoreFields
.
Consulte a descrição dos campos de metadados na lista seguinte.
Cada nova linha do BigQuery inclui todas as colunas
monitorizadas pelo fluxo de alterações a partir da respetiva linha na tabela do Spanner na
data/hora do registo de alterações.
Os seguintes campos de metadados são adicionados às tabelas do BigQuery. Para mais detalhes sobre estes campos, consulte Registos de alterações de dados em "Partições, registos e consultas de streams de alterações".
_metadata_spanner_mod_type
: o tipo de modificação (inserção, atualização ou eliminação) da transação do Spanner. Extraído do registo de alterações de dados do fluxo de alterações._metadata_spanner_table_name
: o nome da tabela do Spanner. Este campo não é o nome da tabela de metadados do conetor._metadata_spanner_commit_timestamp
: a data/hora de confirmação do Spanner, que é a hora em que uma alteração é confirmada. Este valor é extraído do registo de alteração de dados do fluxo de alterações._metadata_spanner_server_transaction_id
: uma string globalmente exclusiva que representa a transação do Spanner na qual a alteração foi confirmada. Use este valor apenas no contexto do processamento de registos de fluxo de alterações. Não está correlacionado com o ID da transação na API do Spanner. Este valor é extraído do registo de alteração de dados do fluxo de alterações._metadata_spanner_record_sequence
: O número de sequência do registo na transação do Spanner. Os números de sequência são garantidamente únicos e aumentam monotonicamente, mas não são necessariamente contíguos, numa transação. Este valor é extraído do registo de alteração de dados do fluxo de alterações._metadata_spanner_is_last_record_in_transaction_in_partition
: indica se o registo é o último registo de uma transação do Spanner na partição atual. Este valor é extraído do registo de alteração de dados do fluxo de alterações._metadata_spanner_number_of_records_in_transaction
: O número de registos de alterações de dados que fazem parte da transação do Spanner em todas as partições de streams de alterações. Este valor é extraído do registo de alteração de dados do fluxo de alterações._metadata_spanner_number_of_partitions_in_transaction
: O número de partições que devolvem registos de alterações de dados para a transação do Spanner. Este valor é extraído do registo de alteração de dados do fluxo de alterações._metadata_big_query_commit_timestamp
: a data/hora de confirmação em que a linha é inserida no BigQuery. SeuseStorageWriteApi
fortrue
, esta coluna não é criada automaticamente na tabela do histórico de alterações pelo pipeline. Nesse caso, tem de adicionar manualmente esta coluna na tabela do histórico de alterações e definirCURRENT_TIMESTAMP
como o respetivo valor predefinido, se necessário.
Quando usar este modelo, tenha em atenção os seguintes detalhes:
- Pode usar este modelo para propagar novas colunas em tabelas existentes ou novas tabelas do Spanner para o BigQuery. Para mais informações, consulte o artigo Faça a gestão da adição de tabelas ou colunas de acompanhamento.
- Para os tipos de captura de valores
OLD_AND_NEW_VALUES
eNEW_VALUES
, quando o registo de alteração de dados contém uma alteração UPDATE, o modelo tem de fazer uma leitura desatualizada para o Spanner na indicação de tempo de confirmação do registo de alteração de dados para obter as colunas não alteradas, mas monitorizadas. Certifique-se de que configura corretamente o "version_retention_period" da base de dados para a leitura desatualizada. Para o tipo de captura de valorNEW_ROW
, o modelo é mais eficiente, porque o registo de alterações de dados captura a linha completa, incluindo colunas que não são atualizadas em pedidos UPDATE, e o modelo não precisa de fazer uma leitura desatualizada. - Para minimizar a latência da rede e os custos de transporte de rede, execute a tarefa do Dataflow a partir da mesma região que a sua instância do Spanner ou tabelas do BigQuery. Se usar origens, destinos, localizações de ficheiros de preparação ou localizações de ficheiros temporários que se encontram fora da região da sua tarefa, os seus dados podem ser enviados entre regiões. Para mais informações, consulte o artigo Regiões do Dataflow.
- Este modelo suporta todos os tipos de dados do Spanner válidos. Se o tipo do BigQuery for mais preciso do que o tipo do Spanner, pode ocorrer uma perda de precisão durante a transformação. Especificamente:
- Para o tipo JSON do Spanner, a ordem dos membros de um objeto é ordenada lexicograficamente, mas não existe essa garantia para o tipo JSON do BigQuery.
- O Spanner suporta o tipo TIMESTAMP de nanossegundos, mas o BigQuery só suporta o tipo TIMESTAMP de microssegundos.
Saiba mais sobre as streams de alterações, como criar pipelines do Dataflow de streams de alterações e práticas recomendadas.
Requisitos do pipeline
- A instância do Spanner tem de existir antes de executar o pipeline.
- A base de dados do Spanner tem de existir antes de executar o pipeline.
- A instância de metadados do Spanner tem de existir antes da execução do pipeline.
- A base de dados de metadados do Spanner tem de existir antes de executar o pipeline.
- A stream de alterações do Spanner tem de existir antes de executar o pipeline.
- O conjunto de dados do BigQuery tem de existir antes da execução do pipeline.
Processar a adição de tabelas ou colunas de acompanhamento
Esta secção descreve as práticas recomendadas para processar a adição de tabelas e colunas do Spanner de acompanhamento enquanto o pipeline está em execução. A versão mais antiga do modelo suportada
para esta funcionalidade é a 2024-09-19-00_RC00
.
- Antes de adicionar uma nova coluna a um âmbito de stream de alterações do Spanner,
adicione primeiro a coluna à tabela de registo de alterações do BigQuery. A coluna adicionada tem de ter um tipo de dados correspondente e ser
NULLABLE
. Aguarde, pelo menos, 10 minutos antes de continuar a criar a nova coluna ou tabela no Spanner. A gravação na nova coluna sem esperar pode resultar num registo não processado com um código de erro inválido no diretório da fila de mensagens não entregues. - Para adicionar uma nova tabela, adicione primeiro a tabela na base de dados do Spanner. A tabela é criada automaticamente no BigQuery quando o pipeline recebe um registo para a nova tabela.
- Depois de adicionar as novas colunas ou tabelas na base de dados do Spanner, certifique-se de que altera o fluxo de alterações para acompanhar as novas colunas ou tabelas que quer, se ainda não estiverem a ser acompanhadas implicitamente.
- O modelo não elimina tabelas nem colunas do BigQuery. Se uma coluna for removida da tabela do Spanner, os valores nulos são preenchidos nas colunas do registo de alterações do BigQuery para os registos gerados após a remoção das colunas da tabela do Spanner, a menos que remova manualmente a coluna do BigQuery.
- O modelo não suporta atualizações do tipo de coluna. Embora o Spanner suporte a alteração de uma coluna
STRING
para uma colunaBYTES
ou de uma colunaBYTES
para uma colunaSTRING
, não pode modificar o tipo de dados de uma coluna existente nem usar o mesmo nome de coluna com tipos de dados diferentes no BigQuery. Se eliminar e recriar uma coluna com o mesmo nome, mas um tipo diferente no Spanner, os dados podem ser escritos na coluna do BigQuery existente, mas o tipo permanece inalterado. - Este modelo não suporta atualizações do modo de colunas. As colunas de metadados
replicadas no BigQuery são definidas para o modo
REQUIRED
. Todas as outras colunas replicadas no BigQuery são definidas comoNULLABLE
, independentemente de estarem definidas comoNOT NULL
na tabela do Spanner. Não pode atualizar as colunasNULLABLE
para o modoREQUIRED
no BigQuery. - A alteração do tipo de captura de valor de um fluxo de alterações não é suportada para pipelines em execução.
Parâmetros de modelos
Parâmetros obrigatórios
- spannerInstanceId: a instância do Spanner a partir da qual ler streams de alterações.
- spannerDatabase: a base de dados do Spanner a partir da qual ler as streams de alterações.
- spannerMetadataInstanceId: a instância do Spanner a usar para a tabela de metadados do conetor de streams de alterações.
- spannerMetadataDatabase: a base de dados do Spanner a usar para a tabela de metadados do conetor de streams de alterações.
- spannerChangeStreamName: o nome da stream de alterações do Spanner a partir da qual os dados serão lidos.
- bigQueryDataset: o conjunto de dados do BigQuery para a saída de streams de alterações.
Parâmetros opcionais
- spannerProjectId: o projeto a partir do qual ler os fluxos de alterações. Este valor também é o projeto onde a tabela de metadados do conetor de streams de alterações é criada. O valor predefinido para este parâmetro é o projeto onde o pipeline do Dataflow está a ser executado.
- spannerDatabaseRole: a função da base de dados do Spanner a usar quando executar o modelo. Este parâmetro só é necessário quando o principal do IAM que está a executar o modelo é um utilizador do controlo de acesso detalhado. A função da base de dados tem de ter o privilégio
SELECT
na stream de alterações e o privilégioEXECUTE
na função de leitura da stream de alterações. Para mais informações, consulte o artigo Controlo de acesso detalhado para streams de alterações (https://cloud.google.com/spanner/docs/fgac-change-streams). - spannerMetadataTableName: o nome da tabela de metadados do conetor de streams de alterações do Spanner a usar. Se não for fornecida, é criada automaticamente uma tabela de metadados do conector de streams de alterações do Spanner durante o fluxo do pipeline. Tem de fornecer este parâmetro quando atualizar um pipeline existente. Caso contrário, não indique este parâmetro.
- rpcPriority: a prioridade do pedido para chamadas do Spanner. O valor tem de ser um dos seguintes:
HIGH
,MEDIUM
ouLOW
. O valor predefinido éHIGH
. - spannerHost: o ponto final do Cloud Spanner a chamar no modelo. Usado apenas para testes. Por exemplo,
https://batch-spanner.googleapis.com
. - startTimestamp: a data/hora de início (https://datatracker.ietf.org/doc/html/rfc3339), inclusive, a usar para ler streams de alterações. Ex-2021-10-12T07:20:50.52Z. A predefinição é a data/hora em que o pipeline é iniciado, ou seja, a hora atual.
- endTimestamp: a data/hora de fim (https://datatracker.ietf.org/doc/html/rfc3339), inclusive, a usar para ler streams de alterações.Exemplo: 2021-10-12T07:20:50.52Z. A predefinição é um tempo infinito no futuro.
- bigQueryProjectId: o projeto do BigQuery. O valor predefinido é o projeto para a tarefa do Dataflow.
- bigQueryChangelogTableNameTemplate: o modelo para o nome da tabela do BigQuery que contém o histórico de alterações. A predefinição é: {_metadata_spanner_table_name}_changelog.
- deadLetterQueueDirectory: o caminho para armazenar quaisquer registos não processados. O caminho predefinido é um diretório na localização temporária da tarefa do Dataflow. Normalmente, o valor predefinido é suficiente.
- dlqRetryMinutes: o número de minutos entre as novas tentativas da fila de mensagens rejeitadas. O valor predefinido é
10
. - ignoreFields: uma lista de campos (sensível a maiúsculas e minúsculas) separada por vírgulas a ignorar. Estes campos podem ser campos de tabelas observadas ou campos de metadados adicionados pelo pipeline. Os campos ignorados não são inseridos no BigQuery. Quando ignora o campo _metadata_spanner_table_name, o parâmetro bigQueryChangelogTableNameTemplate também é ignorado. A predefinição é vazio.
- disableDlqRetries: indica se as repetições para a DLQ devem ou não ser desativadas. A predefinição é: false.
- useStorageWriteApi: se for verdadeiro, o pipeline usa a API Storage Write do BigQuery (https://cloud.google.com/bigquery/docs/write-api). O valor predefinido é
false
. Para mais informações, consulte a secção Usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - useStorageWriteApiAtLeastOnce: quando usa a API Storage Write, especifica a semântica de escrita. Para usar a semântica pelo menos uma vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), defina este parâmetro como
true
. Para usar a semântica exatamente uma vez, defina o parâmetro comofalse
. Este parâmetro só se aplica quandouseStorageWriteApi
étrue
. O valor predefinido éfalse
. - numStorageWriteApiStreams: quando usa a API Storage Write, especifica o número de streams de escrita. Se
useStorageWriteApi
fortrue
euseStorageWriteApiAtLeastOnce
forfalse
, tem de definir este parâmetro. A predefinição é: 0. - storageWriteApiTriggeringFrequencySec: quando usa a API Storage Write, especifica a frequência de acionamento, em segundos. Se
useStorageWriteApi
fortrue
euseStorageWriteApiAtLeastOnce
forfalse
, tem de definir este parâmetro.
Execute o modelo
Consola
- Aceda à página do fluxo de dados Criar tarefa a partir de um modelo. Aceda a Criar tarefa a partir de modelo
- No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
- Opcional: para Ponto final regional, selecione um valor no menu pendente. A região
predefinida é
us-central1
.Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.
- No menu pendente Modelo do fluxo de dados, selecione the Cloud Spanner change streams to BigQuery template.
- Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
- Clique em Executar tarefa.
gcloud
Na shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \ --region REGION_NAME \ --parameters \ spannerInstanceId=SPANNER_INSTANCE_ID,\ spannerDatabase=SPANNER_DATABASE,\ spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\ spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\ spannerChangeStreamName=SPANNER_CHANGE_STREAM,\ bigQueryDataset=BIGQUERY_DATASET
Substitua o seguinte:
JOB_NAME
: um nome de tarefa exclusivo à sua escolhaVERSION
: a versão do modelo que quer usarPode usar os seguintes valores:
latest
para usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1
SPANNER_INSTANCE_ID
: ID da instância do SpannerSPANNER_DATABASE
: base de dados do SpannerSPANNER_METADATA_INSTANCE_ID
: ID da instância de metadados do SpannerSPANNER_METADATA_DATABASE
: base de dados de metadados do SpannerSPANNER_CHANGE_STREAM
: stream de alterações do SpannerBIGQUERY_DATASET
: o conjunto de dados do BigQuery para a saída de streams de alterações
API
Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "spannerInstanceId": "SPANNER_INSTANCE_ID", "spannerDatabase": "SPANNER_DATABASE", "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID", "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE", "spannerChangeStreamName": "SPANNER_CHANGE_STREAM", "bigQueryDataset": "BIGQUERY_DATASET" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery", } }
Substitua o seguinte:
PROJECT_ID
: o ID do projeto onde quer executar a tarefa do Dataflow Google CloudJOB_NAME
: um nome de tarefa exclusivo à sua escolhaVERSION
: a versão do modelo que quer usarPode usar os seguintes valores:
latest
para usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
LOCATION
: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1
SPANNER_INSTANCE_ID
: ID da instância do SpannerSPANNER_DATABASE
: base de dados do SpannerSPANNER_METADATA_INSTANCE_ID
: ID da instância de metadados do SpannerSPANNER_METADATA_DATABASE
: base de dados de metadados do SpannerSPANNER_CHANGE_STREAM
: stream de alterações do SpannerBIGQUERY_DATASET
: o conjunto de dados do BigQuery para a saída de streams de alterações
O que se segue?
- Saiba mais sobre os modelos do Dataflow.
- Consulte a lista de modelos fornecidos pela Google.