Modelo de fluxos de alterações do Spanner para Cloud Storage

Os fluxos de alterações para o modelo do Cloud Storage são um pipeline de streaming que transmite os registros de alteração de dados do Spanner e os grava em um bucket do Cloud Storage usando o Dataflow Runner v2.

O pipeline agrupa os registros de stream de alterações do Spanner em janelas com base no carimbo de data/hora, e cada janela representa uma duração de tempo cujo tamanho pode ser configurado com esse modelo. Todos os registros com carimbos de data/hora pertencentes à janela têm a garantia de estarem na janela. Não pode haver chegadas atrasadas. Também é possível definir vários fragmentos de saída. O pipeline cria um arquivo de saída do Cloud Storage por janela por fragmento. Em um arquivo de saída, os registros são desordenados. Os arquivos de saída podem ser gravados no formato JSON ou AVRO, dependendo da configuração do usuário.

Observe que é possível minimizar a latência da rede e os custos de transporte dela executando o job do Dataflow na mesma região da sua instância do Spanner ou do bucket do Cloud Storage. Se você usar fontes, coletores, locais de arquivos de preparo ou de arquivos temporários localizados fora da região do job, seus dados poderão ser enviados entre regiões. Saiba mais sobre as regiões do Dataflow.

Saiba mais sobre fluxos de alterações, como criar pipelines de mudança no pipeline do Dataflow e práticas recomendadas.

Requisitos de pipeline

  • A instância do Spanner precisa existir antes da execução do pipeline.
  • O banco de dados do Spanner precisa ser criado antes da execução do pipeline.
  • A instância de metadados do Spanner precisa existir antes da execução do pipeline.
  • O banco de dados de metadados do Spanner precisa existir antes da execução do pipeline.
  • O fluxo de alterações do Spanner precisa ser criado antes da execução do pipeline.
  • O bucket de saída do Cloud Storage precisa existir antes da execução do pipeline.

Parâmetros do modelo

Parâmetro Descrição
spannerInstanceId O ID da instância do Spanner de onde os dados dos fluxos de alterações são lidos.
spannerDatabase O banco de dados do Spanner de onde os dados dos fluxos de alterações são lidos.
spannerDatabaseRole Opcional: o papel do banco de dados do Spanner a ser usado ao executar o modelo. Esse parâmetro é necessário somente quando o principal do IAM que executa o modelo é um usuário de controle de acesso minucioso. A função de banco de dados precisa ter o privilégio SELECT no fluxo de alterações e o privilégio EXECUTE na função de leitura do fluxo de alterações. Para mais informações, consulte Controle de acesso minucioso para streams de alteração.
spannerMetadataInstanceId O ID da instância do Spanner a ser usado para a tabela de metadados do conector dos fluxos de alterações.
spannerMetadataDatabase O banco de dados do Spanner a ser usado para a tabela de metadados do conector dos fluxos de alterações.
spannerChangeStreamName O nome do fluxo de alterações a ser lido pelo Spanner.
gcsOutputDirectory O local do arquivo dos fluxos de alterações na saída do Cloud Storage no formato: 'gs://${BUCKET}/${ROOT_PATH}/'.
outputFilenamePrefix (Opcional) O prefixo do nome dos arquivos a serem gravados. O prefixo do arquivo padrão é definido como "saída".
spannerProjectId (Opcional) Projeto do qual os fluxos de alterações serão lidos. Este é também o projeto em que a tabela de metadados do conector dos fluxos de alterações é criada. O padrão para esse parâmetro é o projeto em que o pipeline do Dataflow está sendo executado.
startTimestamp (Opcional) O DateTime inicial, inclusive, para ler os fluxos de alterações. Ex-2021-10-12T07:20:50.52Z. O padrão é o carimbo de data/hora quando o pipeline é iniciado, ou seja, o horário atual.
endTimestamp (Opcional) A terminação DateTime, inclusive, para usar em fluxos de alterações de leitura. Ex-2021-10-12T07:20:50.52Z. O padrão é um tempo infinito no futuro.
outputFileFormat (Opcional) O formato do arquivo de saída do Cloud Storage. Os formatos permitidos são TEXT e AVRO. O padrão é AVRO.
windowDuration (Opcional) A duração da janela é o intervalo em que os dados são gravados no diretório de saída. Configure a duração com base na capacidade de processamento do pipeline. Por exemplo, uma capacidade de processamento mais alta pode exigir tamanhos de janela menores para que os dados se encaixem na memória. O padrão é 5 min, com um mínimo de 1 s. Os formatos permitidos são: [int]s para segundos (5 s, por exemplo); [int]m para minutos (12 min, por exemplo); [int]h para horas (2h, por exemplo).
rpcPriority Opcional: a prioridade de solicitação das chamadas do Spanner. O valor precisa ser um destes: [HIGH,MEDIUM,LOW]. (Padrão: HIGH)
numShards (Opcional) O número máximo de fragmentos de saída produzidos durante a gravação. O número padrão é 20. Um número maior de fragmentos significa maior capacidade de gravação no Cloud Storage, mas um custo de agregação de dados potencialmente maior entre os fragmentos ao processar os arquivos de saída do Cloud Storage
spannerMetadataTableName Opcional: o nome da tabela de metadados do conector dos fluxos de alterações do Spanner a ser usado. Se não for fornecido, uma tabela de metadados dos fluxos de alterações do Spanner será criada automaticamente durante o fluxo de pipeline. Esse parâmetro precisa ser fornecido ao atualizar um pipeline existente e não pode ser indicado de outra forma.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Cloud Spanner change streams to Google Cloud Storage template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Cloud Spanner
  • SPANNER_DATABASE: banco de dados do Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Cloud Spanner
  • SPANNER_METADATA_DATABASE: banco de dados de metadados do Cloud Spanner
  • SPANNER_CHANGE_STREAM: fluxo de alterações do Cloud Spanner
  • GCS_OUTPUT_DIRECTORY: local do arquivo da saída dos fluxos de alterações

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage",
   }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Cloud Spanner
  • SPANNER_DATABASE: banco de dados do Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Cloud Spanner
  • SPANNER_METADATA_DATABASE: banco de dados de metadados do Cloud Spanner
  • SPANNER_CHANGE_STREAM: fluxo de alterações do Cloud Spanner
  • GCS_OUTPUT_DIRECTORY: local do arquivo da saída dos fluxos de alterações

A seguir