Modelo de fluxos de alterações do Spanner para Pub/Sub

O modelo de fluxos de alterações do Spanner para o Pub/Sub é um pipeline de streaming que transmite os registros de alteração de dados do Spanner e os grava em tópicos do Pub/Sub usando o Dataflow Runner V2.

Você precisa criar o novo tópico do Pub/Sub antes de enviar seus dados. Depois de criar, o Pub/Sub gera e anexa automaticamente uma assinatura ao novo tópico. Se você tentar enviar dados para um tópico do Pub/Sub que não existe, o pipeline do Dataflow vai gerar uma exceção e o pipeline vai travar enquanto tenta fazer uma conexão.

Se o tópico do Pub/Sub necessário já existir, os dados podem ser gerados para ele.

Para mais informações, consulte Sobre fluxos de alteração, Criar conexões de fluxos de alteração com o Dataflow e Práticas recomendadas para fluxos de alteração.

Requisitos de pipeline

  • A instância do Spanner precisa existir antes da execução do pipeline.
  • O banco de dados do Spanner precisa ser criado antes da execução do pipeline.
  • A instância de metadados do Spanner precisa existir antes da execução do pipeline.
  • O banco de dados de metadados do Spanner precisa existir antes da execução do pipeline.
  • O fluxo de alterações do Spanner precisa ser criado antes da execução do pipeline.
  • O tópico do Pub/Sub precisa ser criado antes da execução do pipeline.

Parâmetros do modelo

Parâmetros obrigatórios

  • spannerInstanceId: a instância do Spanner em que os fluxos de alterações serão lidos.
  • spannerDatabase: o banco de dados do Spanner de onde os fluxos de alterações serão lidos.
  • spannerMetadataInstanceId: a instância do Spanner a ser usada para a tabela de metadados do conector dos fluxos de alterações.
  • spannerMetadataDatabase: o banco de dados do Spanner a ser usado para a tabela de metadados do conector dos fluxos de alterações.
  • spannerChangeStreamName: o nome do fluxo de alterações a ser lido pelo Spanner.
  • pubsubTopic: o tópico do Pub/Sub para saída do fluxo de alterações.

Parâmetros opcionais

  • spannerProjectId: o projeto do qual os fluxos de alterações serão lidos. Esse projeto também é onde a tabela de metadados do conector de fluxos de alteração é criada. O padrão para esse parâmetro é o projeto em que o pipeline do Dataflow está sendo executado.
  • spannerDatabaseRole : o papel do banco de dados do Spanner a ser usado ao executar o modelo. Esse parâmetro é necessário somente quando o principal do IAM que executa o modelo é um usuário de controle de acesso minucioso. A função de banco de dados precisa ter o privilégio SELECT no fluxo de alterações e o privilégio EXECUTE na função de leitura do fluxo de alterações. Para mais informações, consulte "Controle de acesso granular para fluxos de alteração" (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName: o nome da tabela de metadados do conector dos fluxos de alterações do Spanner a ser usado. Se não for informado, o Spanner automaticamente criará a tabela de metadados do conector dos fluxos durante a mudança do fluxo do pipeline. Você precisa fornecer esse parâmetro ao atualizar um pipeline atual. Não use esse parâmetro para outros casos.
  • startTimestamp: o DateTime inicial (https://tools.ietf.org/html/rfc3339), inclusivo, a ser usado na leitura de fluxo de alterações. Por exemplo, ex- 2021-10-12T07:20:50.52Z. O padrão é o carimbo de data/hora em que o pipeline é iniciado, ou seja, o horário atual.
  • endTimestamp: o DateTime (https://tools.ietf.org/html/rfc3339) final, inclusivo, a ser usado na leitura de fluxo de alterações. Por exemplo, ex- 2021-10-12T07:20:50.52Z. O padrão é um tempo infinito no futuro.
  • spannerHost: o endpoint do Cloud Spanner para chamar no modelo. Usado apenas para testes. Exemplo: https://spanner.googleapis.com. O padrão é: https://spanner.googleapis.com.
  • outputDataFormat: o formato da saída. A saída é encapsulada em muitas PubsubMessages e enviada para um tópico do Pub/Sub. Os formatos permitidos são JSON e AVRO. O padrão é JSON.
  • pubsubAPI: A API Pub/Sub usada para implementar o pipeline. As APIs permitidas são pubsubio e native_client. Para um pequeno número de consultas por segundo (QPS), native_client tem menos latência. Quando o QPS é alto, pubsubio tem um desempenho melhor e mais estável. O padrão é pubsubio.
  • pubsubProjectId : projeto do tópico do Pub/Sub. O padrão para esse parâmetro é o projeto em que o pipeline do Dataflow está sendo executado.
  • rpcPriority : a prioridade de solicitação das chamadas do Spanner. Os valores permitidos são ALTO, MÉDIO e BAIXO. O valor padrão é ALTO.
  • includeSpannerSource : indica se o ID do banco de dados do Spanner e o ID da instância devem ser incluídos para ler o fluxo de alterações nos dados da mensagem de saída. O padrão é: falso.
  • outputMessageMetadata : o valor da string para o campo personalizado outputMessageMetadata na mensagem de saída do Pub/Sub. O padrão é vazio, e o campo outputMessageMetadata só é preenchido se esse valor não estiver vazio. Use caracteres de escape para qualquer caractere especial ao inserir o valor aqui(por exemplo, aspas duplas).

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Cloud Spanner change streams to Pub/Sub template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_PubSub \
        --region REGION_NAME \
        --parameters \
    spannerInstanceId=SPANNER_INSTANCE_ID,\
    spannerDatabase=SPANNER_DATABASE,\
    spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
    spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
    spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
    pubsubTopic=PUBSUB_TOPIC
    

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Spanner
  • SPANNER_DATABASE: banco de dados do Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Spanner
  • SPANNER_METADATA_DATABASE: banco de dados de metadados do Spanner
  • SPANNER_CHANGE_STREAM: fluxo de alterações do Spanner
  • PUBSUB_TOPIC: o tópico do Pub/Sub para saída dos fluxos de alteração

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
    "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "spannerInstanceId": "SPANNER_INSTANCE_ID",
            "spannerDatabase": "SPANNER_DATABASE",
            "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
            "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
            "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
            "pubsubTopic": "PUBSUB_TOPIC"
        },
        "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_PubSub",
    }
  }
  

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Spanner
  • SPANNER_DATABASE: banco de dados do Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Spanner
  • SPANNER_METADATA_DATABASE: banco de dados de metadados do Spanner
  • SPANNER_CHANGE_STREAM: fluxo de alterações do Spanner
  • PUBSUB_TOPIC: o tópico do Pub/Sub para saída dos fluxos de alteração

A seguir