Modelo de streams de alterações do Spanner para a base de dados de origem

Pipeline de streaming. Lê dados de Streams de alterações do Spanner e escreve-os numa origem.

Parâmetros de modelos

Parâmetro Descrição
changeStreamName O nome da stream de alterações do Spanner a partir da qual o pipeline lê.
instanceId O nome da instância do Spanner onde a stream de alterações está presente.
databaseId O nome da base de dados do Spanner que a stream de alterações monitoriza.
spannerProjectId O nome do projeto do Spanner.
metadataInstance A instância para armazenar os metadados usados pelo conector para controlar o consumo dos dados da API de fluxo de alterações.
metadataDatabase A base de dados para armazenar os metadados usados pelo conetor para controlar o consumo dos dados da API Change Stream.
sourceShardsFilePath Caminho para um ficheiro do Cloud Storage que contém informações do perfil de associação para fragmentos de origem.
startTimestamp Opcional: a indicação de tempo de início da leitura é alterada. A predefinição é vazio.
endTimestamp Opcional: a indicação de tempo final para a leitura é alterada. Se não for indicada nenhuma data/hora, lê indefinidamente. A predefinição é vazio.
shadowTablePrefix Opcional: o prefixo usado para dar nome às tabelas de sombra. Predefinição: shadow_.
sessionFilePath Opcional: caminho da sessão no Cloud Storage que contém informações de mapeamento do HarbourBridge.
filtrationMode Opcional: modo de filtragem. Especifica como rejeitar determinados registos com base num critério. Os modos suportados são: none (não filtrar nada), forward_migration (filtrar registos escritos através do pipeline de migração para a frente). A predefinição é forward_migration.
shardingCustomJarPath Opcional: localização do ficheiro JAR personalizado no Cloud Storage que contém a lógica de personalização para obter o ID do fragmento. Se definir este parâmetro, defina o parâmetro shardingCustomJarPath. A predefinição é vazio.
shardingCustomClassName Opcional: nome da classe totalmente qualificado com a implementação do ID do fragmento personalizado. Se shardingCustomJarPath for especificado, este parâmetro é obrigatório. A predefinição é vazio.
shardingCustomParameters Opcional: string que contém quaisquer parâmetros personalizados a transmitir à classe de divisão em fragmentos personalizada. A predefinição é vazio.
sourceDbTimezoneOffset Opcional: a diferença de fuso horário em relação ao UTC para a base de dados de origem. Valor de exemplo: +10:00. Predefinição: +00:00.
dlqGcsPubSubSubscription Opcional: a subscrição do Pub/Sub que está a ser usada numa política de notificação do Cloud Storage para o diretório de repetição da DLQ quando executa no modo normal. O nome deve estar no formato projects/<project-id>/subscriptions/<subscription-name>. Quando definido, o deadLetterQueueDirectory e dlqRetryMinutes são ignorados.
skipDirectoryName Opcional: os registos ignorados da replicação inversa são escritos neste diretório. O nome do diretório predefinido é skip.
maxShardConnections Opcional: o número máximo de ligações que um determinado fragmento pode aceitar. Predefinição: 10000.
deadLetterQueueDirectory Opcional: o caminho usado ao armazenar o resultado da fila de erros. O caminho predefinido é um diretório na localização temporária do trabalho do Dataflow.
dlqMaxRetryCount Opcional: o número máximo de vezes que os erros temporários podem ser repetidos através da fila de mensagens rejeitadas. A predefinição é 500.
runMode Opcional: o tipo de modo de execução. Valores suportados: regular, retryDLQ. Predefinição: regular. Especifica que retryDLQ é apenas para registos de filas de mensagens rejeitadas graves.
dlqRetryMinutes Opcional: o número de minutos entre as novas tentativas da fila de mensagens rejeitadas. A predefinição é 10.

Execute o modelo

Consola

  1. Aceda à página do fluxo de dados Criar tarefa a partir de um modelo.
  2. Aceda a Criar tarefa a partir de modelo
  3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
  4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

    Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.

  5. No menu pendente Modelo do fluxo de dados, selecione the Spanner Change Streams to Source Database template.
  6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
  7. Clique em Executar tarefa.

CLI gcloud

Na shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_to_SourceDb \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       changeStreamName=CHANGE_STREAM_NAME,\
       instanceId=INSTANCE_ID,\
       databaseId=DATABASE_ID,\
       spannerProjectId=SPANNER_PROJECT_ID,\
       metadataInstance=METADATA_INSTANCE,\
       metadataDatabase=METADATA_DATABASE,\
       sourceShardsFilePath=SOURCE_SHARDS_FILE_PATH,\

Substitua o seguinte:

  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • CHANGE_STREAM_NAME: o nome da stream de alterações a partir da qual ler
  • INSTANCE_ID: o ID da instância do Cloud Spanner.
  • DATABASE_ID: o ID da base de dados do Cloud Spanner.
  • SPANNER_PROJECT_ID: o ID do projeto do Cloud Spanner.
  • METADATA_INSTANCE: a instância do Cloud Spanner para armazenar metadados quando lê a partir de fluxos de alterações
  • METADATA_DATABASE: a base de dados do Cloud Spanner para armazenar metadados quando lê a partir de streams de alterações
  • SOURCE_SHARDS_FILE_PATH: o caminho para o ficheiro GCS que contém os detalhes do fragmento de origem

API

Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "changeStreamName": "CHANGE_STREAM_NAME",
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "spannerProjectId": "SPANNER_PROJECT_ID",
       "metadataInstance": "METADATA_INSTANCE",
       "metadataDatabase": "METADATA_DATABASE",
       "sourceShardsFilePath": "SOURCE_SHARDS_FILE_PATH",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_to_SourceDb",
     "environment": { "maxWorkers": "10" }
  }
}

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • CHANGE_STREAM_NAME: o nome da stream de alterações a partir da qual ler
  • INSTANCE_ID: o ID da instância do Cloud Spanner.
  • DATABASE_ID: o ID da base de dados do Cloud Spanner.
  • SPANNER_PROJECT_ID: o ID do projeto do Cloud Spanner.
  • METADATA_INSTANCE: a instância do Cloud Spanner para armazenar metadados quando lê a partir de fluxos de alterações
  • METADATA_DATABASE: a base de dados do Cloud Spanner para armazenar metadados quando lê a partir de streams de alterações
  • SOURCE_SHARDS_FILE_PATH: o caminho para o ficheiro GCS que contém os detalhes do fragmento de origem