Modelo de fluxos de alterações do Spanner para o banco de dados de origem

Pipeline de streaming. Lê dados dos fluxos de alterações do Spanner e os grava em uma origem.

Parâmetros do modelo

Parâmetro Descrição
changeStreamName O nome do fluxo de alterações do Spanner que o pipeline lê.
instanceId O nome da instância do Spanner em que o fluxo de alterações está presente.
databaseId O nome do banco de dados do Spanner que o fluxo de alterações monitora.
spannerProjectId O nome do projeto do Spanner.
metadataInstance A instância para armazenar os metadados usados pelo conector para controlar o consumo dos dados da API do fluxo de alterações.
metadataDatabase O banco de dados para armazenar os metadados usados pelo conector para controlar o consumo dos dados da API do fluxo de alterações.
sourceShardsFilePath Caminho para um arquivo do Cloud Storage que contém informações do perfil de conexão para os fragmentos de origem.
startTimestamp Opcional: o carimbo de data/hora inicial para ler as alterações. O padrão é vazio.
endTimestamp Opcional: o carimbo de data/hora de término para ler as alterações. Se nenhum carimbo de data/hora for fornecido, a leitura será indefinida. O padrão é vazio.
shadowTablePrefix Opcional: o prefixo usado para nomear tabelas de sombra. Padrão: shadow_.
sessionFilePath Opcional: caminho da sessão no Cloud Storage que contém informações de mapeamento do HarbourBridge.
filtrationMode Opcional: modo de filtragem. Especifica como excluir determinados registros com base em um critério. Os modos compatíveis são: none (não filtra nada), forward_migration (filtra registros gravados usando o pipeline de migração para frente). O padrão é forward_migration.
shardingCustomJarPath Opcional: local do arquivo JAR personalizado no Cloud Storage que contém a lógica de personalização para buscar o ID do fragmento. Se você definir esse parâmetro, defina o parâmetro shardingCustomJarPath. O padrão é vazio.
shardingCustomClassName Opcional: nome de classe totalmente qualificado com a implementação do ID de fragmento personalizado. Se shardingCustomJarPath for especificado, esse parâmetro será obrigatório. O padrão é vazio.
shardingCustomParameters Opcional: string contendo os parâmetros personalizados que serão passados para a classe de divisão personalizada. O padrão é vazio.
sourceDbTimezoneOffset Opcional: o deslocamento de fuso horário em relação ao UTC para o banco de dados de origem. Exemplo de valor: +10:00. O padrão é +00:00.
dlqGcsPubSubSubscription Opcional: a assinatura do Pub/Sub que está sendo usada em uma política de notificação do Cloud Storage para o diretório de repetição de DLQ quando executado no modo normal. O nome precisa estar no formato projects/<id-do-projeto>/subscriptions/<nome-da-assinatura>. Quando definido, o deadLetterQueueDirectory e dlqRetryMinutes são ignorados.
skipDirectoryName Opcional: os registros ignorados na replicação reversa são gravados neste diretório. O nome do diretório padrão é skip.
maxShardConnections Opcional: o número máximo de conexões que um determinado fragmento pode aceitar. O padrão é 10000.
deadLetterQueueDirectory Opcional: o caminho usado para armazenar a saída da fila de erros. O caminho padrão é um diretório no local temporário do job do Dataflow.
dlqMaxRetryCount Opcional: o número máximo de vezes que os erros temporários podem ser repetidos pela fila de mensagens inativas. O padrão é 500.
runMode Opcional: o tipo de modo de execução. Valores aceitos: regular, retryDLQ. Padrão: regular. Especificar retryDLQ é apenas para refazer registros de fila de mensagens inativas graves.
dlqRetryMinutes Opcional: o número de minutos entre novas tentativas de fila de mensagens inativas (DLQ) O valor padrão é 10.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Spanner Change Streams to Source Database template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

CLI da gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_to_SourceDb \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       changeStreamName=CHANGE_STREAM_NAME,\
       instanceId=INSTANCE_ID,\
       databaseId=DATABASE_ID,\
       spannerProjectId=SPANNER_PROJECT_ID,\
       metadataInstance=METADATA_INSTANCE,\
       metadataDatabase=METADATA_DATABASE,\
       sourceShardsFilePath=SOURCE_SHARDS_FILE_PATH,\

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • CHANGE_STREAM_NAME: o nome do fluxo de alterações a ser lido
  • INSTANCE_ID: o ID da instância do Cloud Spanner.
  • DATABASE_ID: o ID do banco de dados do Cloud Spanner.
  • SPANNER_PROJECT_ID: o ID do projeto do Cloud Spanner.
  • METADATA_INSTANCE: a instância do Cloud Spanner para armazenar metadados ao ler de fluxos de alterações
  • METADATA_DATABASE: o banco de dados do Cloud Spanner para armazenar metadados ao ler de fluxos de alterações
  • SOURCE_SHARDS_FILE_PATH: o caminho para o arquivo do GCS que contém os detalhes do fragmento de origem

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "changeStreamName": "CHANGE_STREAM_NAME",
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "spannerProjectId": "SPANNER_PROJECT_ID",
       "metadataInstance": "METADATA_INSTANCE",
       "metadataDatabase": "METADATA_DATABASE",
       "sourceShardsFilePath": "SOURCE_SHARDS_FILE_PATH",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_to_SourceDb",
     "environment": { "maxWorkers": "10" }
  }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • CHANGE_STREAM_NAME: o nome do fluxo de alterações a ser lido
  • INSTANCE_ID: o ID da instância do Cloud Spanner.
  • DATABASE_ID: o ID do banco de dados do Cloud Spanner.
  • SPANNER_PROJECT_ID: o ID do projeto do Cloud Spanner.
  • METADATA_INSTANCE: a instância do Cloud Spanner para armazenar metadados ao ler de fluxos de alterações
  • METADATA_DATABASE: o banco de dados do Cloud Spanner para armazenar metadados ao ler de fluxos de alterações
  • SOURCE_SHARDS_FILE_PATH: o caminho para o arquivo do GCS que contém os detalhes do fragmento de origem