Modelo de streams de alterações do Spanner para o Cloud Storage

O modelo de streams de alterações do Spanner para o Cloud Storage é um pipeline de streaming que transmite registos de alterações de dados do Spanner e os escreve num contentor do Cloud Storage através do Dataflow Runner v2.

O pipeline agrupa os registos de streams de alterações do Spanner em janelas com base na respetiva data/hora, com cada janela a representar uma duração cuja duração pode configurar com este modelo. É garantido que todos os registos com indicações de tempo pertencentes à janela estão na janela; não pode haver chegadas tardias. Também pode definir um número de fragmentos de saída. O pipeline cria um ficheiro de saída do Cloud Storage por janela por fragmento. Num ficheiro de saída, os registos não estão ordenados. Os ficheiros de saída podem ser escritos no formato JSON ou AVRO, consoante a configuração do utilizador.

Tenha em atenção que pode minimizar a latência da rede e os custos de transporte de rede executando a tarefa do Dataflow a partir da mesma região que a sua instância do Spanner ou o contentor do Cloud Storage. Se usar origens, destinos, localizações de ficheiros de preparação ou localizações de ficheiros temporários que se encontram fora da região da sua tarefa, os seus dados podem ser enviados entre regiões. Veja mais informações sobre as regiões do Dataflow.

Saiba mais sobre as streams de alterações, como criar pipelines do Dataflow de streams de alterações e práticas recomendadas.

Requisitos do pipeline

  • A instância do Spanner tem de existir antes de executar o pipeline.
  • A base de dados do Spanner tem de existir antes de executar o pipeline.
  • A instância de metadados do Spanner tem de existir antes da execução do pipeline.
  • A base de dados de metadados do Spanner tem de existir antes de executar o pipeline.
  • A stream de alterações do Spanner tem de existir antes de executar o pipeline.
  • O contentor de saída do Cloud Storage tem de existir antes de executar o pipeline.

Parâmetros de modelos

Parâmetros obrigatórios

  • spannerInstanceId: o ID da instância do Spanner a partir da qual os dados das streams de alterações são lidos.
  • spannerDatabase: a base de dados do Spanner a partir da qual ler os dados das streams de alterações.
  • spannerMetadataInstanceId: o ID da instância do Spanner a usar para a tabela de metadados do conetor de streams de alterações.
  • spannerMetadataDatabase: a base de dados do Spanner a usar para a tabela de metadados do conetor de streams de alterações.
  • spannerChangeStreamName: o nome da stream de alterações do Spanner a partir da qual os dados serão lidos.
  • gcsOutputDirectory: o caminho e o prefixo do nome do ficheiro para escrever ficheiros de saída. Tem de terminar com uma barra. A formatação DateTime é usada para analisar o caminho do diretório para formatadores de data e hora. Por exemplo, gs://your-bucket/your-path.

Parâmetros opcionais

  • spannerProjectId: o ID do projeto do Google Cloud que contém a base de dados do Spanner a partir da qual ler fluxos de alterações. É também neste projeto que a tabela de metadados do conetor de streams de alterações é criada. O valor predefinido deste parâmetro é o projeto no qual o pipeline do Dataflow está a ser executado.
  • spannerDatabaseRole: a função da base de dados do Spanner a usar quando executar o modelo. Este parâmetro só é necessário quando o principal do IAM que está a executar o modelo é um utilizador do controlo de acesso detalhado. A função da base de dados tem de ter o privilégio SELECT na stream de alterações e o privilégio EXECUTE na função de leitura da stream de alterações. Para mais informações, consulte o artigo Controlo de acesso detalhado para streams de alterações (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName: o nome da tabela de metadados do conetor de streams de alterações do Spanner a usar. Se não for fornecida, é criada automaticamente uma tabela de metadados de streams de alterações do Spanner durante a execução do pipeline. Tem de fornecer um valor para este parâmetro quando atualizar um pipeline existente. Caso contrário, não use este parâmetro.
  • startTimestamp: o DateTime de início, inclusive, a usar para ler streams de alterações, no formato Ex-2021-10-12T07:20:50.52Z. A predefinição é a data/hora em que o pipeline é iniciado, ou seja, a hora atual.
  • endTimestamp: a data/hora de fim, inclusive, a usar para ler fluxos de alterações. Por exemplo, Ex-2021-10-12T07:20:50.52Z. A predefinição é um tempo infinito no futuro.
  • spannerHost: o ponto final do Cloud Spanner a chamar no modelo. Usado apenas para testes. Por exemplo, https://spanner.googleapis.com. O valor predefinido é: https://spanner.googleapis.com.
  • outputFileFormat: o formato do ficheiro do Cloud Storage de saída. Os formatos permitidos são TEXT e AVRO. A predefinição é AVRO.
  • windowDuration: a duração da janela é o intervalo no qual os dados são escritos no diretório de saída. Configure a duração com base no débito do pipeline. Por exemplo, um débito mais elevado pode exigir tamanhos de janelas mais pequenos para que os dados caibam na memória. A predefinição é 5 m (cinco minutos), com um mínimo de 1 s (um segundo). Os formatos permitidos são: [int]s (para segundos, exemplo: 5s), [int]m (para minutos, exemplo: 12m), [int]h (para horas, exemplo: 2h). Por exemplo, 5m.
  • rpcPriority: a prioridade do pedido para chamadas do Spanner. O valor tem de ser HIGH, MEDIUM ou LOW. A predefinição é HIGH.
  • outputFilenamePrefix: o prefixo a colocar em cada ficheiro dividido em janelas. Por exemplo, output-. A predefinição é: output.
  • numShards: o número máximo de fragmentos de saída produzidos durante a escrita. Um número mais elevado de fragmentos significa um débito mais elevado para a escrita no Cloud Storage, mas um custo de agregação de dados potencialmente mais elevado entre fragmentos ao processar ficheiros do Cloud Storage de saída. A predefinição é: 20.

Execute o modelo

Consola

  1. Aceda à página do fluxo de dados Criar tarefa a partir de um modelo.
  2. Aceda a Criar tarefa a partir de modelo
  3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
  4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

    Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.

  5. No menu pendente Modelo do fluxo de dados, selecione the Cloud Spanner change streams to Google Cloud Storage template.
  6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
  7. Clique em Executar tarefa.

gcloud

Na shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY

Substitua o seguinte:

  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Cloud Spanner
  • SPANNER_DATABASE: base de dados do Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Cloud Spanner
  • SPANNER_METADATA_DATABASE: base de dados de metadados do Cloud Spanner
  • SPANNER_CHANGE_STREAM: stream de alterações do Cloud Spanner
  • GCS_OUTPUT_DIRECTORY: localização do ficheiro para a saída de streams de alterações

API

Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage",
   }
}

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • SPANNER_INSTANCE_ID: ID da instância do Cloud Spanner
  • SPANNER_DATABASE: base de dados do Cloud Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID da instância de metadados do Cloud Spanner
  • SPANNER_METADATA_DATABASE: base de dados de metadados do Cloud Spanner
  • SPANNER_CHANGE_STREAM: stream de alterações do Cloud Spanner
  • GCS_OUTPUT_DIRECTORY: localização do ficheiro para a saída de streams de alterações

O que se segue?