Modelo de fluxo de dados para o Spanner

O modelo Datastream para Spanner é um pipeline de streaming que lê eventos do Datastream de um contentor do Cloud Storage e escreve-os numa base de dados do Spanner. Destina-se à migração de dados de origens do Datastream para o Spanner. Especifique o parâmetro gcsPubSubSubscription para ler dados de notificações do Pub/Sub OU forneça o parâmetro inputFilePattern para ler diretamente dados de ficheiros no Cloud Storage.

Todas as tabelas necessárias para a migração têm de existir na base de dados do Spanner de destino antes da execução do modelo. Por conseguinte, a migração do esquema de uma base de dados de origem para o Spanner de destino tem de ser concluída antes da migração de dados. Os dados podem existir nas tabelas antes da migração. Este modelo não propaga as alterações do esquema da stream de dados para a base de dados do Spanner.

A consistência dos dados só é garantida no final da migração, quando todos os dados tiverem sido escritos no Spanner. Para armazenar informações de ordenação de cada registo escrito no Spanner, este modelo cria uma tabela adicional (denominada tabela de sombra) para cada tabela na base de dados do Spanner. Isto é usado para garantir a consistência no final da migração. As tabelas ocultas não são eliminadas após a migração e podem ser usadas para fins de validação no final da migração.

Todos os erros que ocorrem durante a operação, como incompatibilidades de esquemas, ficheiros JSON com formato incorreto ou erros resultantes da execução de transformações, são registados numa fila de erros. A fila de erros é uma pasta do Cloud Storage que armazena todos os eventos do Datastream que encontraram erros, juntamente com o motivo do erro em formato de texto. Os erros podem ser temporários ou permanentes e são armazenados em pastas do Cloud Storage adequadas na fila de erros. Os erros transitórios são repetidos automaticamente, ao contrário dos erros permanentes. Em caso de erros permanentes, tem a opção de fazer correções aos eventos de alteração e movê-los para o contentor de repetição enquanto o modelo está em execução.

Requisitos do pipeline

  • Uma stream do Google Analytics 4 no estado Em execução ou Não iniciada.
  • Um contentor do Cloud Storage onde os eventos do Datastream são replicados.
  • Uma base de dados do Spanner com tabelas existentes. Estas tabelas podem estar vazias ou conter dados.

Parâmetros de modelos

Parâmetros obrigatórios

  • instanceId: a instância do Spanner onde as alterações são replicadas.
  • databaseId: a base de dados do Spanner onde as alterações são replicadas.

Parâmetros opcionais

  • inputFilePattern: a localização do ficheiro do Cloud Storage que contém os ficheiros do fluxo de dados a replicar. Normalmente, este é o caminho raiz de uma stream. O suporte para esta funcionalidade foi desativado. Use esta funcionalidade apenas para tentar novamente entradas que chegam à DLQ grave.
  • inputFileFormat: o formato do ficheiro de saída produzido pelo Datastream. Por exemplo, avro,json. A predefinição é avro.
  • sessionFilePath: caminho do ficheiro de sessão no Cloud Storage que contém informações de mapeamento do HarbourBridge.
  • projectId: o ID do projeto do Spanner.
  • spannerHost: o ponto final do Cloud Spanner a chamar no modelo. Por exemplo, https://batch-spanner.googleapis.com. O valor predefinido é: https://batch-spanner.googleapis.com.
  • gcsPubSubSubscription: a subscrição do Pub/Sub que está a ser usada numa política de notificação do Cloud Storage. Para o nome, use o formato projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • streamName: o nome ou o modelo da stream para obter informações do esquema e o tipo de origem.
  • shadowTablePrefix: o prefixo usado para dar nome às tabelas de sombra. Predefinição: shadow_.
  • shouldCreateShadowTables: esta flag indica se têm de ser criadas tabelas de sombra na base de dados do Cloud Spanner. A predefinição é: true.
  • rfcStartDateTime: a data/hora de início usada para obter dados do Cloud Storage (https://tools.ietf.org/html/rfc3339). A predefinição é: 1970-01-01T00:00:00.00Z.
  • fileReadConcurrency: o número de ficheiros DataStream a ler em simultâneo. A predefinição é: 30.
  • deadLetterQueueDirectory: o caminho do ficheiro usado ao armazenar o resultado da fila de erros. O caminho do ficheiro predefinido é um diretório na localização temporária da tarefa do Dataflow.
  • dlqRetryMinutes: o número de minutos entre as novas tentativas da fila de mensagens rejeitadas. A predefinição é 10.
  • dlqMaxRetryCount: o número máximo de vezes que os erros temporários podem ser repetidos através da DLQ. A predefinição é 500.
  • dataStreamRootUrl: URL raiz da API Datastream. O valor predefinido é: https://datastream.googleapis.com/.
  • datastreamSourceType: este é o tipo de base de dados de origem à qual o Datastream se liga. Exemplo: mysql/oracle. Tem de ser definido quando estiver a testar sem uma stream de dados em execução real.
  • roundJsonDecimals: se esta flag estiver definida, arredonda os valores decimais nas colunas JSON para um número que possa ser armazenado sem perda de precisão. A predefinição é: false.
  • runMode: este é o tipo de modo de execução, seja regular ou com retryDLQ. A predefinição é: normal.
  • transformationContextFilePath: caminho do ficheiro de contexto de transformação no armazenamento na nuvem usado para preencher dados usados em transformações realizadas durante as migrações. Por exemplo: o ID do fragmento para o nome da base de dados para identificar a base de dados a partir da qual uma linha foi migrada.
  • directoryWatchDurationInMinutes: a duração durante a qual o pipeline deve continuar a sondar um diretório no GCS. Os ficheiros datastreamoutput estão organizados numa estrutura de diretórios que representa a data/hora do evento agrupado por minutos. Este parâmetro deve ser aproximadamente igual ao atraso máximo que pode ocorrer entre o evento que ocorre na base de dados de origem e o mesmo evento que está a ser escrito no GCS pelo Datastream. Percentil 99,9 = 10 minutos. A predefinição é: 10.
  • spannerPriority: a prioridade do pedido para chamadas do Cloud Spanner. O valor tem de ser um dos seguintes: [HIGH,MEDIUM,LOW]. O valor predefinido é HIGH.
  • dlqGcsPubSubSubscription: a subscrição do Pub/Sub que está a ser usada numa política de notificação do Cloud Storage para o diretório de repetição de DLQ quando executada no modo normal. Para o nome, use o formato projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>. Quando definido, o deadLetterQueueDirectory e o dlqRetryMinutes são ignorados.
  • transformationJarPath: localização do ficheiro JAR personalizado no Cloud Storage para o ficheiro que contém a lógica de transformação personalizada para o processamento de registos na migração direta. A predefinição é vazio.
  • transformationClassName: nome da classe totalmente qualificado com a lógica de transformação personalizada. É um campo obrigatório se transformationJarPath for especificado. A predefinição é vazio.
  • transformationCustomParameters: string que contém quaisquer parâmetros personalizados a transmitir à classe de transformação personalizada. A predefinição é vazio.
  • filteredEventsDirectory: este é o caminho do ficheiro para armazenar os eventos filtrados através da transformação personalizada. A predefinição é um diretório na localização temporária da tarefa do Dataflow. O valor predefinido é suficiente na maioria das condições.
  • shardingContextFilePath: o caminho do ficheiro de contexto de divisão em partições no armazenamento na nuvem é usado para preencher o ID da partição na base de dados do Spanner para cada partição de origem.Tem o formato Map<stream_name, Map<db_name, shard_id>>.
  • tableOverrides: estas são as substituições do nome da tabela da origem para o Spanner. São escritas no seguinte formato: [{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]Este exemplo mostra o mapeamento da tabela Singers para Vocalists e da tabela Albums para Records. Por exemplo, [{Singers, Vocalists}, {Albums, Records}]. A predefinição é vazio.
  • columnOverrides: estas são as substituições do nome da coluna da origem para o Spanner. São escritas no seguinte formato: [{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]Tenha em atenção que o SourceTableName deve permanecer o mesmo no par de origem e Spanner. Para substituir os nomes das tabelas, use tableOverrides.O exemplo mostra o mapeamento de SingerName para TalentName e AlbumName para RecordName nas tabelas Singers e Albums, respetivamente. Por exemplo, [{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]. A predefinição é vazio.
  • schemaOverridesFilePath: um ficheiro que especifica a tabela e as substituições do nome da coluna da origem para o Spanner. A predefinição é vazio.
  • shadowTableSpannerDatabaseId: base de dados separada opcional para tabelas de sombra. Se não for especificado, as tabelas de sombra são criadas na base de dados principal. Se especificado, certifique-se de que shadowTableSpannerInstanceId também é especificado. A predefinição é vazio.
  • shadowTableSpannerInstanceId: instância separada opcional para tabelas de sombra. Se não for especificado, as tabelas de sombra são criadas na instância principal. Se especificado, certifique-se de que shadowTableSpannerDatabaseId também é especificado. A predefinição é vazio.
  • failureInjectionParameter: parâmetro de injeção de falhas. Usado apenas para testes. A predefinição é vazio.

Execute o modelo

Consola

  1. Aceda à página do fluxo de dados Criar tarefa a partir de um modelo.
  2. Aceda a Criar tarefa a partir de modelo
  3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
  4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

    Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.

  5. No menu pendente Modelo do fluxo de dados, selecione the Cloud Datastream to Spanner template.
  6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
  7. Clique em Executar tarefa.

gcloud

Na shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • GCS_FILE_PATH: o caminho do Cloud Storage usado para armazenar eventos de fluxo de dados. Por exemplo: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: a sua instância do Spanner.
  • CLOUDSPANNER_DATABASE: a sua base de dados do Spanner.
  • DLQ: o caminho do Cloud Storage para o diretório da fila de erros.

API

Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • GCS_FILE_PATH: o caminho do Cloud Storage usado para armazenar eventos de fluxo de dados. Por exemplo: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: a sua instância do Spanner.
  • CLOUDSPANNER_DATABASE: a sua base de dados do Spanner.
  • DLQ: o caminho do Cloud Storage para o diretório da fila de erros.

O que se segue?