O modelo de streams de alterações do Spanner para o Pub/Sub é um pipeline de streaming que transmite registos de alterações de dados do Spanner e escreve-os em tópicos do Pub/Sub usando o Dataflow Runner V2.
Para enviar os dados para um novo tópico do Pub/Sub, primeiro tem de criar o tópico. Após a criação, o Pub/Sub gera e anexa automaticamente uma subscrição ao novo tópico. Se tentar enviar dados para um tópico do Pub/Sub que não existe, o pipeline do Dataflow gera uma exceção e fica bloqueado, pois tenta continuamente estabelecer uma ligação.
Se o tópico do Pub/Sub necessário já existir, pode enviar dados para esse tópico.
Para mais informações, consulte os artigos Acerca das streams de alterações, Crie ligações de streams de alterações com o Dataflow, e Práticas recomendadas para streams de alterações.
Requisitos do pipeline
- A instância do Spanner tem de existir antes de executar o pipeline.
- A base de dados do Spanner tem de existir antes de executar o pipeline.
- A instância de metadados do Spanner tem de existir antes da execução do pipeline.
- A base de dados de metadados do Spanner tem de existir antes de executar o pipeline.
- A stream de alterações do Spanner tem de existir antes de executar o pipeline.
- O tópico Pub/Sub tem de existir antes de executar o pipeline.
Parâmetros de modelos
Parâmetros obrigatórios
- spannerInstanceId: a instância do Spanner a partir da qual ler streams de alterações.
- spannerDatabase: a base de dados do Spanner a partir da qual ler as streams de alterações.
- spannerMetadataInstanceId: a instância do Spanner a usar para a tabela de metadados do conetor de streams de alterações.
- spannerMetadataDatabase: a base de dados do Spanner a usar para a tabela de metadados do conetor de streams de alterações.
- spannerChangeStreamName: o nome da stream de alterações do Spanner a partir da qual os dados serão lidos.
- pubsubTopic: o tópico do Pub/Sub para a saída de streams de alterações.
Parâmetros opcionais
- spannerProjectId: o projeto a partir do qual ler os fluxos de alterações. É também neste projeto que a tabela de metadados do conetor de streams de alterações é criada. O valor predefinido deste parâmetro é o projeto no qual o pipeline do Dataflow está a ser executado.
- spannerDatabaseRole: a função da base de dados do Spanner a usar quando executar o modelo. Este parâmetro só é necessário quando o principal do IAM que está a executar o modelo é um utilizador do controlo de acesso detalhado. A função da base de dados tem de ter o privilégio
SELECT
na stream de alterações e o privilégioEXECUTE
na função de leitura da stream de alterações. Para mais informações, consulte o artigo Controlo de acesso detalhado para streams de alterações (https://cloud.google.com/spanner/docs/fgac-change-streams). - spannerMetadataTableName: o nome da tabela de metadados do conetor de streams de alterações do Spanner a usar. Se não for fornecida, o Spanner cria automaticamente a tabela de metadados do conector de streams durante a alteração do fluxo do pipeline. Tem de fornecer este parâmetro quando atualizar um pipeline existente. Não use este parâmetro para outros casos.
- startTimestamp: o DateTime de início (https://tools.ietf.org/html/rfc3339), inclusive, a usar para ler streams de alterações. Por exemplo, ex- 2021-10-12T07:20:50.52Z. A predefinição é a data/hora em que o pipeline é iniciado, ou seja, a hora atual.
- endTimestamp: o DateTime de fim (https://tools.ietf.org/html/rfc3339), inclusive, a usar para ler fluxos de alterações. Por exemplo, ex- 2021-10-12T07:20:50.52Z. A predefinição é um tempo infinito no futuro.
- spannerHost: o ponto final do Cloud Spanner a chamar no modelo. Usado apenas para testes. Por exemplo,
https://spanner.googleapis.com
. O valor predefinido é: https://spanner.googleapis.com. - outputDataFormat: o formato da saída. O resultado é envolvido em muitas PubsubMessages e enviado para um tópico Pub/Sub. Os formatos permitidos são JSON e AVRO. A predefinição é JSON.
- pubsubAPI: a API Pub/Sub usada para implementar o pipeline. As APIs permitidas são
pubsubio
enative_client
. Para um pequeno número de consultas por segundo (CPS), onative_client
tem uma latência inferior. Para um grande número de CPS, apubsubio
oferece um desempenho melhor e mais estável. A predefinição épubsubio
. - pubsubProjectId: projeto do tópico do Pub/Sub. O valor predefinido deste parâmetro é o projeto no qual o pipeline do Dataflow está a ser executado.
- rpcPriority: a prioridade do pedido para chamadas do Spanner. Os valores permitidos são HIGH, MEDIUM e LOW. A predefinição é: HIGH).
- includeSpannerSource: indica se deve ou não incluir o ID da base de dados e o ID da instância do Spanner para ler a stream de alterações dos dados da mensagem de saída. A predefinição é: false.
- outputMessageMetadata: o valor da string para o campo personalizado outputMessageMetadata na mensagem de publicação/subscrição de saída. A predefinição é vazio e o campo outputMessageMetadata só é preenchido se este valor não estiver vazio. Introduza os carateres especiais de forma literal quando introduzir o valor aqui(ou seja, aspas duplas).
Execute o modelo
Consola
- Aceda à página do fluxo de dados Criar tarefa a partir de um modelo. Aceda a Criar tarefa a partir de modelo
- No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
- Opcional: para Ponto final regional, selecione um valor no menu pendente. A região
predefinida é
us-central1
.Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.
- No menu pendente Modelo do fluxo de dados, selecione the Cloud Spanner change streams to Pub/Sub template.
- Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
- Clique em Executar tarefa.
gcloud
Na shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_PubSub \ --region REGION_NAME \ --parameters \ spannerInstanceId=SPANNER_INSTANCE_ID,\ spannerDatabase=SPANNER_DATABASE,\ spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\ spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\ spannerChangeStreamName=SPANNER_CHANGE_STREAM,\ pubsubTopic=PUBSUB_TOPIC
Substitua o seguinte:
JOB_NAME
: um nome de tarefa exclusivo à sua escolhaVERSION
: a versão do modelo que quer usarPode usar os seguintes valores:
latest
para usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1
SPANNER_INSTANCE_ID
: ID da instância do SpannerSPANNER_DATABASE
: base de dados do SpannerSPANNER_METADATA_INSTANCE_ID
: ID da instância de metadados do SpannerSPANNER_METADATA_DATABASE
: base de dados de metadados do SpannerSPANNER_CHANGE_STREAM
: stream de alterações do SpannerPUBSUB_TOPIC
: o tópico do Pub/Sub para a saída de streams de alterações
API
Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "spannerInstanceId": "SPANNER_INSTANCE_ID", "spannerDatabase": "SPANNER_DATABASE", "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID", "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE", "spannerChangeStreamName": "SPANNER_CHANGE_STREAM", "pubsubTopic": "PUBSUB_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_PubSub", } }
Substitua o seguinte:
PROJECT_ID
: o ID do projeto onde quer executar a tarefa do Dataflow Google CloudJOB_NAME
: um nome de tarefa exclusivo à sua escolhaVERSION
: a versão do modelo que quer usarPode usar os seguintes valores:
latest
para usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
LOCATION
: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1
SPANNER_INSTANCE_ID
: ID da instância do SpannerSPANNER_DATABASE
: base de dados do SpannerSPANNER_METADATA_INSTANCE_ID
: ID da instância de metadados do SpannerSPANNER_METADATA_DATABASE
: base de dados de metadados do SpannerSPANNER_CHANGE_STREAM
: stream de alterações do SpannerPUBSUB_TOPIC
: o tópico do Pub/Sub para a saída de streams de alterações
O que se segue?
- Saiba mais sobre os modelos do Dataflow.
- Consulte a lista de modelos fornecidos pela Google.