Modelo do Datastream para o Spanner

O modelo do Datastream para Spanner é um pipeline de streaming que lê eventos do Datastream de um bucket do Cloud Storage e os grava em um banco de dados do Spanner. Ele é destinado à migração de dados de fontes do Datastream para o Spanner.

Todas as tabelas necessárias para migração precisam existir no banco de dados de destino do Spanner antes da execução do modelo. Portanto, a migração de esquema de um banco de dados de origem para o Spanner de destino precisa ser concluída antes da migração de dados. Os dados podem existir nas tabelas antes da migração. Esse modelo não propaga alterações de esquema do Datastream no banco de dados do Spanner.

A consistência de dados é garantida apenas no final da migração, quando todos os dados tiverem sido gravados no Spanner. Para armazenar informações de pedidos de cada registro gravado no Spanner, esse modelo cria uma tabela adicional (chamada de tabela de sombra) para cada tabela no banco de dados do Spanner. Isso é usado para garantir consistência no final da migração. As tabelas de sombra não são excluídas após a migração e podem ser usadas para fins de validação no final da migração.

Todos os erros que ocorrem durante a operação, como incompatibilidades de esquema, arquivos JSON malformados ou erros resultantes da execução de transformações, são registrados em uma fila de erros. A fila de erros é uma pasta do Cloud Storage que armazena todos os eventos do Datastream que encontraram erros, além do motivo do erro. em formato de texto. Os erros podem ser temporários ou permanentes e são armazenados em pastas apropriadas do Cloud Storage na fila de erros. Os erros temporários são repetidos automaticamente, ao contrário dos permanentes. No caso de erros permanentes, você tem a opção de fazer correções nos eventos de mudança e movê-los para o bucket recuperável enquanto o modelo estiver em execução.

Requisitos de pipeline

  • Um fluxo de Datastream no estado Em execução ou Não iniciado.
  • Um bucket do Cloud Storage em que os eventos do Datastream são replicados.
  • Um banco de dados do Spanner com tabelas atuais. Essas tabelas podem estar vazias ou conter dados.

Parâmetros do modelo

Parâmetros obrigatórios

  • inputFilePattern: o local do arquivo do Cloud Storage que contém os arquivos do Datastream a serem replicados. Normalmente, esse é o caminho raiz de um stream.
  • instanceId: a instância do Spanner em que as alterações são replicadas.
  • databaseId: o banco de dados do Spanner em que as alterações são replicadas.

Parâmetros opcionais

  • inputFileFormat: o formato do arquivo de saída produzido pelo Datastream. Por exemplo, avro,json. Padrão, avro.
  • sessionFilePath: o caminho do arquivo de sessão no Cloud Storage que contém informações de mapeamento do HarbourBridge.
  • projectId: o ID do projeto do Spanner.
  • spannerHost: opcional: o endpoint do Cloud Spanner para chamar no modelo. Por exemplo: https://batch-spanner.googleapis.com. O padrão é: https://batch-spanner.googleapis.com.
  • streamName: o nome ou modelo do stream para pesquisar informações de esquema e tipo de origem.
  • gcsPubSubSubscription: a assinatura do Pub/Sub que está sendo usada em uma política de notificação do Cloud Storage. O nome precisa estar no formato de projects/.
  • shadowTablePrefix: o prefixo usado para nomear tabelas de sombra. Padrão: shadow_.
  • shouldCreateShadowTables: essa flag indica se as tabelas de sombra precisam ser criadas no banco de dados do Cloud Spanner. O padrão é "true".
  • rfcStartDateTime: o DateTime inicial usado para buscar do Cloud Storage (https://tools.ietf.org/html/rfc3339). O padrão é: 1970-01-01T00:00:00.00Z.
  • fileReadConcurrency: o número de arquivos do DataStream simultâneos a serem lidos. O padrão é: 30.
  • deadLetterQueueDirectory: o caminho do arquivo usado para armazenar a saída da fila de erros. O caminho do arquivo padrão é um diretório no local temporário do job do Dataflow.
  • dlqRetryMinutes número de minutos entre novas tentativas de fila de mensagens inativas (DLQ) O valor padrão é 10.
  • dlqMaxRetryCount: o número máximo de vezes que os erros temporários podem ser repetidos pela DLQ. O padrão é 500.
  • dataStreamRootUrl: URL raiz da API Datastream. O padrão é: https://datastream.googleapis.com/.
  • datastreamSourceType: é o tipo de banco de dados de origem ao qual o Datastream se conecta. Exemplo: mysql/Oracle. Precisa ser definido ao testar sem um Datastream em execução.
  • roundJsonDecimals: se essa flag for definida, os valores decimais nas colunas JSON serão arredondados para um número que pode ser armazenado sem perda de precisão. O padrão é: falso.
  • runMode: é o tipo de modo de execução, regular ou com retryDLQ. O padrão é: normal.
  • transformationContextFilePath: caminho do arquivo de contexto de transformação no armazenamento em nuvem usado para preencher os dados usados nas transformações realizadas durante as migrações. Por exemplo, o ID do fragmento para o nome do banco de dados para identificar o banco de dados de onde uma linha foi migrado.
  • directoryWatchDurationInMinutes: a duração em que o pipeline precisa continuar pesquisando um diretório no GCS. Os arquivos de saída do Datastream são organizados em uma estrutura de diretórios que descreve o carimbo de data/hora do evento agrupado por minutos. Esse parâmetro precisa ser aproximadamente igual ao atraso máximo que pode ocorrer entre o evento que ocorre no banco de dados de origem e o mesmo evento gravado no GCS pelo Datastream. Percentil 99,9 = 10 minutos. O padrão é 10.
  • spannerPriority: opcional: a prioridade da solicitação para chamadas do Cloud Spanner. O valor precisa ser um destes: [HIGH,MEDIUM,LOW]. O valor padrão é ALTO.
  • dlqGcsPubSubSubscription: a assinatura do Pub/Sub que está sendo usada em uma política de notificação do Cloud Storage para o diretório de repetição de DLQ quando executado no modo normal. O nome precisa estar no formato de projects/.
  • transformationJarPath: local do jar personalizado no Cloud Storage que contém a lógica de transformação personalizada para processar registros na migração para frente. O padrão é vazio.
  • transformationClassName: o nome de classe totalmente qualificado com lógica de transformação personalizada. É um campo obrigatório no caso de transformationJarPath é especificado. O padrão é vazio.
  • transformationCustomParameters: string contendo os parâmetros personalizados que serão passados para a classe de transformação personalizada. O padrão é vazio.
  • filteredEventsDirectory: é o caminho do arquivo para armazenar os eventos filtrados por transformação personalizada. O padrão é um diretório no local temporário do job do Dataflow. O valor padrão é suficiente na maioria das condições.
  • shardingContextFilePath: caminho do arquivo de contexto de fragmentação no armazenamento em nuvem usado para preencher o ID do fragmento durante as migrações. Ele tem o formato Map<stream_name, Map<db_name, shard_id>>.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Cloud Datastream to Spanner template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • GCS_FILE_PATH: o caminho do Cloud Storage usado para armazenar eventos do Datastream. Por exemplo: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: a instância do Spanner.
  • CLOUDSPANNER_DATABASE: o banco de dados do Spanner.
  • DLQ: o caminho do Cloud Storage para o diretório da fila de erros.

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: a região onde você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • GCS_FILE_PATH: o caminho do Cloud Storage usado para armazenar eventos do Datastream. Por exemplo: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: a instância do Spanner.
  • CLOUDSPANNER_DATABASE: o banco de dados do Spanner.
  • DLQ: o caminho do Cloud Storage para o diretório da fila de erros.

A seguir