Modelo de fluxos de alterações do Spanner para Cloud Storage

Os fluxos de alterações para o modelo do Cloud Storage são um pipeline de streaming que transmite os registros de alteração de dados do Spanner e os grava em um bucket do Cloud Storage usando o Dataflow Runner v2.

O pipeline agrupa os registros de stream de alterações do Spanner em janelas com base no carimbo de data/hora, e cada janela representa uma duração de tempo cujo tamanho pode ser configurado com esse modelo. Todos os registros com carimbos de data/hora pertencentes à janela têm a garantia de estarem na janela. Não pode haver chegadas atrasadas. Também é possível definir vários fragmentos de saída. O pipeline cria um arquivo de saída do Cloud Storage por janela por fragmento. Em um arquivo de saída, os registros são desordenados. Os arquivos de saída podem ser gravados no formato JSON ou AVRO, dependendo da configuração do usuário.

Observe que é possível minimizar a latência da rede e os custos de transporte dela executando o job do Dataflow na mesma região da sua instância do Spanner ou do bucket do Cloud Storage. Se você usar fontes, coletores, locais de arquivos de preparo ou de arquivos temporários localizados fora da região do job, seus dados poderão ser enviados entre regiões. Saiba mais sobre as regiões do Dataflow.

Saiba mais sobre fluxos de alterações, como criar pipelines de mudança no pipeline do Dataflow e práticas recomendadas.

Requisitos de pipeline

  • A instância do Spanner precisa existir antes da execução do pipeline.
  • O banco de dados do Spanner precisa ser criado antes da execução do pipeline.
  • A instância de metadados do Spanner precisa existir antes da execução do pipeline.
  • O banco de dados de metadados do Spanner precisa existir antes da execução do pipeline.
  • O fluxo de alterações do Spanner precisa ser criado antes da execução do pipeline.
  • O bucket de saída do Cloud Storage precisa existir antes da execução do pipeline.

Parâmetros do modelo

Parâmetros obrigatórios

  • spannerInstanceId: o ID da instância do Spanner de onde os dados dos fluxo de alterações são lidos.
  • spannerDatabase: o banco de dados do Spanner de onde os dados dos fluxo de alterações serão lidos.
  • spannerMetadataInstanceId: o ID da instância do Spanner a ser usado para a tabela de metadados do conector dos fluxo de alterações.
  • spannerMetadataDatabase: o banco de dados do Spanner a ser usado para a tabela de metadados do conector dos fluxo de alterações.
  • spannerChangeStreamName: o nome do fluxo de alterações do Spanner a ser lido.
  • gcsOutputDirectory: o caminho e o prefixo do nome do arquivo para gravar arquivos de saída. Precisa terminar com uma barra. A formatação DateTime é usada para analisar o caminho do diretório em busca de formatadores de data e hora. Por exemplo, gs://your-bucket/your-path.

Parâmetros opcionais

  • spannerProjectId: o ID do projeto do Google Cloud que contém o banco de dados do Spanner para ler os fluxo de alterações. Esse projeto também é onde a tabela de metadados do conector de fluxos de alteração é criada. O padrão para esse parâmetro é o projeto em que o pipeline do Dataflow está sendo executado.
  • spannerDatabaseRole: o papel do banco de dados do Spanner a ser usado ao executar o modelo. Esse parâmetro é necessário somente quando o principal do IAM que executa o modelo é um usuário de controle de acesso minucioso. A função de banco de dados precisa ter o privilégio SELECT no fluxo de alterações e o privilégio EXECUTE na função de leitura do fluxo de alterações. Para mais informações, consulte "Controle de acesso granular para fluxos de alteração" (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName: o nome da tabela de metadados do conector dos fluxo de alterações do Spanner a ser usado. Se não for fornecida, uma tabela de metadados de fluxos de alterações do Spanner será criada automaticamente durante a execução do pipeline. Você precisa fornecer um valor para esse parâmetro ao atualizar um pipeline atual. Caso contrário, não use esse parâmetro.
  • startTimestamp: o DateTime inicial, inclusive, a ser usado na leitura de fluxo de alterações, no formato Ex-2021-10-12T07:20:50.52Z. O padrão é o carimbo de data/hora em que o pipeline é iniciado, ou seja, o horário atual.
  • endTimestamp: o DateTime de término, inclusive, para usar em fluxo de alterações de leitura. Por exemplo, Ex-2021-10-12T07:20:50.52Z. O padrão é um tempo infinito no futuro.
  • spannerHost: o endpoint do Cloud Spanner a ser chamado no modelo. Usado apenas para testes. Por exemplo, https://spanner.googleapis.com. O padrão é: https://spanner.googleapis.com.
  • outputFileFormat: o formato do arquivo de saída do Cloud Storage. Os formatos permitidos são TEXT e AVRO. O padrão é AVRO.
  • windowDuration: a duração da janela é o intervalo em que os dados são gravados no diretório de saída. Configure a duração com base na capacidade de processamento do pipeline. Por exemplo, uma capacidade de processamento mais alta pode exigir tamanhos de janela menores para que os dados se encaixem na memória. O padrão é 5m (cinco minutos), com o mínimo de 1s (um segundo). Os formatos permitidos são: [int]s (para segundos; exemplo: 5 s), [int]m (para minutos; exemplo: 12 min), [int]h (para horas; exemplo: 2h). Por exemplo, 5m.
  • rpcPriority: a prioridade de solicitação das chamadas do Spanner. O valor precisa ser HIGH, MEDIUM ou LOW. O padrão é HIGH.
  • outputFilenamePrefix: o prefixo a ser colocado em cada arquivo em janela. Por exemplo, output-. O padrão é: saída.
  • numShards: o número máximo de fragmentos de saída produzidos durante a gravação. Um número maior de fragmentos significa maior capacidade de gravação no Cloud Storage, mas um custo de agregação de dados potencialmente maior entre os fragmentos ao processar os arquivos de saída do Cloud Storage. O padrão é: 20

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Cloud Spanner change streams to Google Cloud Storage template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Cloud Spanner
  • SPANNER_DATABASE: banco de dados do Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Cloud Spanner
  • SPANNER_METADATA_DATABASE: banco de dados de metadados do Cloud Spanner
  • SPANNER_CHANGE_STREAM: fluxo de alterações do Cloud Spanner
  • GCS_OUTPUT_DIRECTORY: local do arquivo da saída dos fluxos de alterações

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage",
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Cloud Spanner
  • SPANNER_DATABASE: banco de dados do Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Cloud Spanner
  • SPANNER_METADATA_DATABASE: banco de dados de metadados do Cloud Spanner
  • SPANNER_CHANGE_STREAM: fluxo de alterações do Cloud Spanner
  • GCS_OUTPUT_DIRECTORY: local do arquivo da saída dos fluxos de alterações

A seguir