O modelo de fluxos de alterações do Spanner para o Pub/Sub é um pipeline de streaming que transmite os registros de alteração de dados do Spanner e os grava em tópicos do Pub/Sub usando o Dataflow Runner V2.
Você precisa criar o novo tópico do Pub/Sub antes de enviar seus dados. Depois de criar, o Pub/Sub gera e anexa automaticamente uma assinatura ao novo tópico. Se você tentar enviar dados para um tópico do Pub/Sub que não existe, o pipeline do Dataflow vai gerar uma exceção e o pipeline vai travar enquanto tenta fazer uma conexão.
Se o tópico do Pub/Sub necessário já existir, os dados podem ser gerados para ele.
Para mais informações, consulte Sobre fluxos de alteração, Criar conexões de fluxos de alteração com o Dataflow e Práticas recomendadas para fluxos de alteração.
Requisitos de pipeline
- A instância do Spanner precisa existir antes da execução do pipeline.
- O banco de dados do Spanner precisa ser criado antes da execução do pipeline.
- A instância de metadados do Spanner precisa existir antes da execução do pipeline.
- O banco de dados de metadados do Spanner precisa existir antes da execução do pipeline.
- O fluxo de alterações do Spanner precisa ser criado antes da execução do pipeline.
- O tópico do Pub/Sub precisa ser criado antes da execução do pipeline.
Parâmetros do modelo
Parâmetros obrigatórios
- spannerInstanceId: a instância do Spanner em que os fluxos de alterações serão lidos.
- spannerDatabase: o banco de dados do Spanner de onde os fluxos de alterações serão lidos.
- spannerMetadataInstanceId: a instância do Spanner a ser usada para a tabela de metadados do conector dos fluxos de alterações.
- spannerMetadataDatabase: o banco de dados do Spanner a ser usado para a tabela de metadados do conector dos fluxos de alterações.
- spannerChangeStreamName: o nome do fluxo de alterações a ser lido pelo Spanner.
- pubsubTopic: o tópico do Pub/Sub para saída do fluxo de alterações.
Parâmetros opcionais
- spannerProjectId: o projeto do qual os fluxos de alterações serão lidos. Esse projeto também é onde a tabela de metadados do conector de fluxos de alteração é criada. O padrão para esse parâmetro é o projeto em que o pipeline do Dataflow está sendo executado.
- spannerDatabaseRole : o papel do banco de dados do Spanner a ser usado ao executar o modelo. Esse parâmetro é necessário somente quando o principal do IAM que executa o modelo é um usuário de controle de acesso minucioso. A função de banco de dados precisa ter o privilégio
SELECT
no fluxo de alterações e o privilégioEXECUTE
na função de leitura do fluxo de alterações. Para mais informações, consulte "Controle de acesso granular para fluxos de alteração" (https://cloud.google.com/spanner/docs/fgac-change-streams). - spannerMetadataTableName: o nome da tabela de metadados do conector dos fluxos de alterações do Spanner a ser usado. Se não for informado, o Spanner automaticamente criará a tabela de metadados do conector dos fluxos durante a mudança do fluxo do pipeline. Você precisa fornecer esse parâmetro ao atualizar um pipeline atual. Não use esse parâmetro para outros casos.
- startTimestamp: o DateTime inicial (https://tools.ietf.org/html/rfc3339), inclusivo, a ser usado na leitura de fluxo de alterações. Por exemplo, ex- 2021-10-12T07:20:50.52Z. O padrão é o carimbo de data/hora em que o pipeline é iniciado, ou seja, o horário atual.
- endTimestamp: o DateTime (https://tools.ietf.org/html/rfc3339) final, inclusivo, a ser usado na leitura de fluxo de alterações. Por exemplo, ex- 2021-10-12T07:20:50.52Z. O padrão é um tempo infinito no futuro.
- spannerHost: o endpoint do Cloud Spanner para chamar no modelo. Usado apenas para testes. Exemplo: https://spanner.googleapis.com. O padrão é: https://spanner.googleapis.com.
- outputDataFormat: o formato da saída. A saída é encapsulada em muitas PubsubMessages e enviada para um tópico do Pub/Sub. Os formatos permitidos são JSON e AVRO. O padrão é JSON.
- pubsubAPI: A API Pub/Sub usada para implementar o pipeline. As APIs permitidas são
pubsubio
enative_client
. Para um pequeno número de consultas por segundo (QPS),native_client
tem menos latência. Quando o QPS é alto,pubsubio
tem um desempenho melhor e mais estável. O padrão épubsubio
. - pubsubProjectId : projeto do tópico do Pub/Sub. O padrão para esse parâmetro é o projeto em que o pipeline do Dataflow está sendo executado.
- rpcPriority : a prioridade de solicitação das chamadas do Spanner. Os valores permitidos são ALTO, MÉDIO e BAIXO. O valor padrão é ALTO.
- includeSpannerSource : indica se o ID do banco de dados do Spanner e o ID da instância devem ser incluídos para ler o fluxo de alterações nos dados da mensagem de saída. O padrão é: falso.
- outputMessageMetadata : o valor da string para o campo personalizado outputMessageMetadata na mensagem de saída do Pub/Sub. O padrão é vazio, e o campo outputMessageMetadata só é preenchido se esse valor não estiver vazio. Use caracteres de escape para qualquer caractere especial ao inserir o valor aqui(por exemplo, aspas duplas).
Executar o modelo
Console
- Acesse a página Criar job usando um modelo do Dataflow. Acesse Criar job usando um modelo
- No campo Nome do job, insira um nome exclusivo.
- Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é
us-central1
.Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.
- No menu suspenso Modelo do Dataflow, selecione the Cloud Spanner change streams to Pub/Sub template.
- Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
- Cliquem em Executar job.
gcloud
No shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_PubSub \ --region REGION_NAME \ --parameters \ spannerInstanceId=SPANNER_INSTANCE_ID,\ spannerDatabase=SPANNER_DATABASE,\ spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\ spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\ spannerChangeStreamName=SPANNER_CHANGE_STREAM,\ pubsubTopic=PUBSUB_TOPIC
Substitua:
JOB_NAME
: um nome de job de sua escolhaVERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: a região em que você quer implantar o job do Dataflow, por exemplo,us-central1
SPANNER_INSTANCE_ID
: ID da instância do SpannerSPANNER_DATABASE
: banco de dados do SpannerSPANNER_METADATA_INSTANCE_ID
: ID da instância de metadados do SpannerSPANNER_METADATA_DATABASE
: banco de dados de metadados do SpannerSPANNER_CHANGE_STREAM
: fluxo de alterações do SpannerPUBSUB_TOPIC
: o tópico do Pub/Sub para saída dos fluxos de alteração
API
Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a
API e os respectivos escopos de autorização, consulte
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "spannerInstanceId": "SPANNER_INSTANCE_ID", "spannerDatabase": "SPANNER_DATABASE", "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID", "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE", "spannerChangeStreamName": "SPANNER_CHANGE_STREAM", "pubsubTopic": "PUBSUB_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_PubSub", } }
Substitua:
PROJECT_ID
: o ID do projeto do Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaVERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
LOCATION
: a região em que você quer implantar o job do Dataflow, por exemplo,us-central1
SPANNER_INSTANCE_ID
: ID da instância do SpannerSPANNER_DATABASE
: banco de dados do SpannerSPANNER_METADATA_INSTANCE_ID
: ID da instância de metadados do SpannerSPANNER_METADATA_DATABASE
: banco de dados de metadados do SpannerSPANNER_CHANGE_STREAM
: fluxo de alterações do SpannerPUBSUB_TOPIC
: o tópico do Pub/Sub para saída dos fluxos de alteração
A seguir
- Saiba mais sobre os modelos do Dataflow.
- Confira a lista de modelos fornecidos pelo Google.