Modelo de streams de alterações do Spanner para o Pub/Sub

O modelo de streams de alterações do Spanner para o Pub/Sub é um pipeline de streaming que transmite registos de alterações de dados do Spanner e escreve-os em tópicos do Pub/Sub usando o Dataflow Runner V2.

Para enviar os dados para um novo tópico do Pub/Sub, primeiro tem de criar o tópico. Após a criação, o Pub/Sub gera e anexa automaticamente uma subscrição ao novo tópico. Se tentar enviar dados para um tópico do Pub/Sub que não existe, o pipeline do Dataflow gera uma exceção e fica bloqueado, pois tenta continuamente estabelecer uma ligação.

Se o tópico do Pub/Sub necessário já existir, pode enviar dados para esse tópico.

Para mais informações, consulte os artigos Acerca das streams de alterações, Crie ligações de streams de alterações com o Dataflow, e Práticas recomendadas para streams de alterações.

Requisitos do pipeline

  • A instância do Spanner tem de existir antes de executar o pipeline.
  • A base de dados do Spanner tem de existir antes de executar o pipeline.
  • A instância de metadados do Spanner tem de existir antes da execução do pipeline.
  • A base de dados de metadados do Spanner tem de existir antes de executar o pipeline.
  • A stream de alterações do Spanner tem de existir antes de executar o pipeline.
  • O tópico Pub/Sub tem de existir antes de executar o pipeline.

Parâmetros de modelos

Parâmetros obrigatórios

  • spannerInstanceId: a instância do Spanner a partir da qual ler streams de alterações.
  • spannerDatabase: a base de dados do Spanner a partir da qual ler as streams de alterações.
  • spannerMetadataInstanceId: a instância do Spanner a usar para a tabela de metadados do conetor de streams de alterações.
  • spannerMetadataDatabase: a base de dados do Spanner a usar para a tabela de metadados do conetor de streams de alterações.
  • spannerChangeStreamName: o nome da stream de alterações do Spanner a partir da qual os dados serão lidos.
  • pubsubTopic: o tópico do Pub/Sub para a saída de streams de alterações.

Parâmetros opcionais

  • spannerProjectId: o projeto a partir do qual ler os fluxos de alterações. É também neste projeto que a tabela de metadados do conetor de streams de alterações é criada. O valor predefinido deste parâmetro é o projeto no qual o pipeline do Dataflow está a ser executado.
  • spannerDatabaseRole: a função da base de dados do Spanner a usar quando executar o modelo. Este parâmetro só é necessário quando o principal do IAM que está a executar o modelo é um utilizador do controlo de acesso detalhado. A função da base de dados tem de ter o privilégio SELECT na stream de alterações e o privilégio EXECUTE na função de leitura da stream de alterações. Para mais informações, consulte o artigo Controlo de acesso detalhado para streams de alterações (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName: o nome da tabela de metadados do conetor de streams de alterações do Spanner a usar. Se não for fornecida, o Spanner cria automaticamente a tabela de metadados do conector de streams durante a alteração do fluxo do pipeline. Tem de fornecer este parâmetro quando atualizar um pipeline existente. Não use este parâmetro para outros casos.
  • startTimestamp: o DateTime de início (https://tools.ietf.org/html/rfc3339), inclusive, a usar para ler streams de alterações. Por exemplo, ex- 2021-10-12T07:20:50.52Z. A predefinição é a data/hora em que o pipeline é iniciado, ou seja, a hora atual.
  • endTimestamp: o DateTime de fim (https://tools.ietf.org/html/rfc3339), inclusive, a usar para ler fluxos de alterações. Por exemplo, ex- 2021-10-12T07:20:50.52Z. A predefinição é um tempo infinito no futuro.
  • spannerHost: o ponto final do Cloud Spanner a chamar no modelo. Usado apenas para testes. Por exemplo, https://spanner.googleapis.com. O valor predefinido é: https://spanner.googleapis.com.
  • outputDataFormat: o formato da saída. O resultado é envolvido em muitas PubsubMessages e enviado para um tópico Pub/Sub. Os formatos permitidos são JSON e AVRO. A predefinição é JSON.
  • pubsubAPI: a API Pub/Sub usada para implementar o pipeline. As APIs permitidas são pubsubio e native_client. Para um pequeno número de consultas por segundo (CPS), o native_client tem uma latência inferior. Para um grande número de CPS, a pubsubio oferece um desempenho melhor e mais estável. A predefinição é pubsubio.
  • pubsubProjectId: projeto do tópico do Pub/Sub. O valor predefinido deste parâmetro é o projeto no qual o pipeline do Dataflow está a ser executado.
  • rpcPriority: a prioridade do pedido para chamadas do Spanner. Os valores permitidos são HIGH, MEDIUM e LOW. A predefinição é: HIGH).
  • includeSpannerSource: indica se deve ou não incluir o ID da base de dados e o ID da instância do Spanner para ler a stream de alterações dos dados da mensagem de saída. A predefinição é: false.
  • outputMessageMetadata: o valor da string para o campo personalizado outputMessageMetadata na mensagem de publicação/subscrição de saída. A predefinição é vazio e o campo outputMessageMetadata só é preenchido se este valor não estiver vazio. Introduza os carateres especiais de forma literal quando introduzir o valor aqui(ou seja, aspas duplas).

Execute o modelo

Consola

  1. Aceda à página do fluxo de dados Criar tarefa a partir de um modelo.
  2. Aceda a Criar tarefa a partir de modelo
  3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
  4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

    Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.

  5. No menu pendente Modelo do fluxo de dados, selecione the Cloud Spanner change streams to Pub/Sub template.
  6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
  7. Clique em Executar tarefa.

gcloud

Na shell ou no terminal, execute o modelo:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_PubSub \
        --region REGION_NAME \
        --parameters \
    spannerInstanceId=SPANNER_INSTANCE_ID,\
    spannerDatabase=SPANNER_DATABASE,\
    spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
    spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
    spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
    pubsubTopic=PUBSUB_TOPIC
    

Substitua o seguinte:

  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Spanner
  • SPANNER_DATABASE: base de dados do Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Spanner
  • SPANNER_METADATA_DATABASE: base de dados de metadados do Spanner
  • SPANNER_CHANGE_STREAM: stream de alterações do Spanner
  • PUBSUB_TOPIC: o tópico do Pub/Sub para a saída de streams de alterações

API

Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
    "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "spannerInstanceId": "SPANNER_INSTANCE_ID",
            "spannerDatabase": "SPANNER_DATABASE",
            "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
            "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
            "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
            "pubsubTopic": "PUBSUB_TOPIC"
        },
        "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_PubSub",
    }
  }
  

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Spanner
  • SPANNER_DATABASE: base de dados do Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Spanner
  • SPANNER_METADATA_DATABASE: base de dados de metadados do Spanner
  • SPANNER_CHANGE_STREAM: stream de alterações do Spanner
  • PUBSUB_TOPIC: o tópico do Pub/Sub para a saída de streams de alterações

O que se segue?