Modelo de fluxos de alterações do Spanner para Pub/Sub

O modelo de fluxos de alterações do Spanner para o Pub/Sub é um pipeline de streaming que transmite os registros de alteração de dados do Spanner e os grava em tópicos do Pub/Sub usando o Dataflow Runner V2.

Você precisa criar o novo tópico do Pub/Sub antes de enviar seus dados. Depois de criar, o Pub/Sub gera e anexa automaticamente uma assinatura ao novo tópico. Se você tentar enviar dados para um tópico do Pub/Sub que não existe, o pipeline do Dataflow vai gerar uma exceção e o pipeline vai travar enquanto tenta fazer uma conexão.

Se o tópico do Pub/Sub necessário já existir, os dados podem ser gerados para ele.

Para mais informações, consulte Sobre fluxos de alteração, Criar conexões de fluxos de alteração com o Dataflow e Práticas recomendadas para fluxos de alteração.

Requisitos de pipeline

  • A instância do Spanner precisa existir antes da execução do pipeline.
  • O banco de dados do Spanner precisa ser criado antes da execução do pipeline.
  • A instância de metadados do Spanner precisa existir antes da execução do pipeline.
  • O banco de dados de metadados do Spanner precisa existir antes da execução do pipeline.
  • O fluxo de alterações do Spanner precisa ser criado antes da execução do pipeline.
  • O tópico do Pub/Sub precisa ser criado antes da execução do pipeline.

Parâmetros do modelo

Parâmetro Descrição
spannerInstanceId A instância do Spanner em que os fluxos de alterações serão lidos.
spannerDatabase O banco de dados do Spanner de onde os fluxos de alterações serão lidos.
spannerDatabaseRole Opcional: o papel do banco de dados do Spanner a ser usado ao executar o modelo. Esse parâmetro é necessário somente quando o principal do IAM que executa o modelo é um usuário de controle de acesso minucioso. A função de banco de dados precisa ter o privilégio SELECT no fluxo de alterações e o privilégio EXECUTE na função de leitura do fluxo de alterações. Para mais informações, consulte Controle de acesso minucioso para streams de alteração.
spannerMetadataInstanceId A instância do Spanner a ser usada para a tabela de metadados do conector dos fluxos de alterações.
spannerMetadataDatabase O banco de dados do Spanner a ser usado para a tabela de metadados do conector dos fluxos de alterações.
spannerChangeStreamName O nome do fluxo de alterações a ser lido pelo Spanner.
pubsubTopic O tópico do Pub/Sub para saída dos fluxos de alteração.
spannerProjectId (Opcional) Projeto do qual os fluxos de alterações serão lidos. Este é também o projeto em que a tabela de metadados do conector dos fluxos de alterações é criada. O padrão para esse parâmetro é o projeto em que o pipeline do Dataflow está sendo executado.
spannerMetadataTableName Opcional: o nome da tabela de metadados do conector dos fluxos de alterações do Spanner a ser usado. Se não for informado, o Spanner automaticamente criará a tabela de metadados do conector dos fluxos durante a mudança do fluxo do pipeline. Você precisa fornecer esse parâmetro ao atualizar um pipeline atual. Não use esse parâmetro para outros casos.
rpcPriority Opcional: a prioridade de solicitação das chamadas do Spanner. O valor precisa ser um destes: [HIGH,MEDIUM,LOW]. (Padrão: HIGH)
startTimestamp (Opcional) O início DateTime, inclusive, para usar em fluxos de alterações de leitura. Por exemplo, ex-2021-10-12T07:20:50.52Z. O padrão é o carimbo de data/hora em que o pipeline é iniciado, ou seja, o horário atual.
endTimestamp (Opcional) A terminação DateTime, inclusive, para usar em fluxos de alterações de leitura. Por exemplo, ex-2021-10-12T07:20:50.52Z. O padrão é um tempo infinito no futuro.
outputFileFormat (Opcional) O formato da saída. A saída é encapsulada em muitas PubsubMessages e enviada para um tópico do Pub/Sub. Os formatos permitidos são JSON e AVRO. O padrão é JSON.
pubsubAPI (Opcional) API Pub/Sub usada para implementar o pipeline. As APIs permitidas são pubsubio e native_client. Para um pequeno número de consultas por segundo (QPS), native_client tem menos latência. Quando o QPS é alto, o pubsubio tem um desempenho melhor e mais estável. O padrão é pubsubio.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Cloud Spanner change streams to Pub/Sub template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_PubSub \
        --region REGION_NAME \
        --parameters \
    spannerInstanceId=SPANNER_INSTANCE_ID,\
    spannerDatabase=SPANNER_DATABASE,\
    spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
    spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
    spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
    pubsubTopic=PUBSUB_TOPIC
    

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Spanner
  • SPANNER_DATABASE: banco de dados do Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Spanner
  • SPANNER_METADATA_DATABASE: banco de dados de metadados do Spanner
  • SPANNER_CHANGE_STREAM: fluxo de alterações do Spanner
  • PUBSUB_TOPIC: o tópico do Pub/Sub para saída dos fluxos de alteração

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
    "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "spannerInstanceId": "SPANNER_INSTANCE_ID",
            "spannerDatabase": "SPANNER_DATABASE",
            "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
            "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
            "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
            "pubsubTopic": "PUBSUB_TOPIC"
        },
        "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_PubSub",
    }
  }
  

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Spanner
  • SPANNER_DATABASE: banco de dados do Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Spanner
  • SPANNER_METADATA_DATABASE: banco de dados de metadados do Spanner
  • SPANNER_CHANGE_STREAM: fluxo de alterações do Spanner
  • PUBSUB_TOPIC: o tópico do Pub/Sub para saída dos fluxos de alteração

A seguir