Modelo Datastream para MySQL ou PostgreSQL (stream)

O modelo Datastream para SQL é um pipeline de streaming que lê dados do Datastream e os replica em qualquer base de dados MySQL ou PostgreSQL. O modelo lê dados do Cloud Storage usando notificações do Pub/Sub e replica estes dados em tabelas de réplicas SQL. Especifique o parâmetro gcsPubSubSubscription para ler dados de notificações do Pub/Sub OU forneça o parâmetro inputFilePattern para ler diretamente dados de ficheiros no Cloud Storage.

O modelo não suporta a linguagem de definição de dados (LDD) e espera que todas as tabelas já existam na base de dados. A replicação usa transformações com estado do Dataflow para filtrar dados desatualizados e garantir a consistência nos dados fora de ordem. Por exemplo, se uma versão mais recente de uma linha já tiver sido processada, uma versão dessa linha que chegue mais tarde é ignorada. A linguagem de manipulação de dados (DML) que é executada é uma tentativa de replicar perfeitamente os dados de origem para os dados de destino. As instruções DML executadas seguem as seguintes regras:

  • Se existir uma chave principal, as operações de inserção e atualização usam a sintaxe de upsert (ou seja, INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE).
  • Se existirem chaves primárias, as eliminações são replicadas como um DML de eliminação.
  • Se não existir uma chave principal, as operações de inserção e atualização são inseridas na tabela.
  • Se não existirem chaves primárias, as eliminações são ignoradas.

Se estiver a usar as utilidades do Oracle para o Postgres, adicione ROWID na SQL como a chave principal quando não existir nenhuma.

Requisitos do pipeline

  • Uma stream do Datastream que está pronta ou já está a replicar dados.
  • As notificações do Pub/Sub do Cloud Storage estão ativadas para os dados do Datastream.
  • Uma base de dados PostgreSQL foi preenchida com o esquema necessário.
  • O acesso à rede entre os trabalhadores do Dataflow e o PostgreSQL está configurado.

Parâmetros de modelos

Parâmetros obrigatórios

  • inputFilePattern: a localização dos ficheiros do fluxo de dados no Cloud Storage a replicar. Normalmente, esta localização do ficheiro é o caminho raiz da stream.
  • databaseHost: o anfitrião SQL ao qual se ligar.
  • databaseUser: o utilizador SQL com todas as autorizações necessárias para escrever em todas as tabelas na replicação.
  • databasePassword: a palavra-passe do utilizador SQL.

Parâmetros opcionais

  • gcsPubSubSubscription: a subscrição do Pub/Sub com notificações de ficheiros do Datastream. Por exemplo, projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>.
  • inputFileFormat: o formato do ficheiro de saída produzido pelo Datastream. Por exemplo, avro ou json. A predefinição é avro.
  • streamName: o nome ou o modelo da stream para obter informações do esquema. O valor predefinido é {_metadata_stream}.
  • rfcStartDateTime: a data/hora de início usada para obter dados do Cloud Storage (https://tools.ietf.org/html/rfc3339). A predefinição é: 1970-01-01T00:00:00.00Z.
  • dataStreamRootUrl: URL raiz da API Datastream. O valor predefinido é: https://datastream.googleapis.com/.
  • databaseType: o tipo de base de dados para o qual escrever (por exemplo, Postgres). A predefinição é: postgres.
  • databasePort: a porta da base de dados SQL à qual estabelecer ligação. O valor predefinido é 5432.
  • databaseName: o nome da base de dados SQL à qual estabelecer ligação. O valor predefinido é postgres.
  • defaultCasing: uma opção para ativar/desativar o comportamento de capitalização da tabela. Por exemplo,(ou seja, LOWERCASE = mytable -> mytable, UPPERCASE = mytable -> MYTABLECAMEL = my_table -> myTable, SNAKE = myTable -> my_table. A predefinição é: LOWERCASE.
  • columnCasing: um botão para alternar entre maiúsculas e minúsculas no nome da coluna de destino. LOWERCASE (predefinição): My_Column -> my_column. MAIÚSCULAS: minha_coluna -> MINHA_COLUNA. CAMEL: my_column -> myColumn. SNAKE: myColumn -> my_column.
  • schemaMap: um mapa de chaves/valores usado para determinar as alterações de nome do esquema (ou seja, old_name:new_name,CaseError:case_error). A predefinição é vazio.
  • customConnectionString: string de ligação opcional que vai ser usada em vez da string de base de dados predefinida.
  • numThreads: determina o paralelismo das chaves do passo de formatação para DML. Especificamente, o valor é transmitido para Reshuffle.withNumBuckets. A predefinição é: 100.
  • databaseLoginTimeout: o limite de tempo em segundos para tentativas de início de sessão na base de dados. Isto ajuda a evitar falhas de ligação quando vários trabalhadores tentam estabelecer ligação em simultâneo.
  • datastreamSourceType: substitui a deteção do tipo de origem para dados de CDC do Datastream. Quando especificado, este valor é usado em vez de derivar o tipo de origem do campo read_method. Os valores válidos incluem "mysql", "postgresql", "oracle", etc. Este parâmetro é útil quando o campo read_method contém "cdc" e não é possível determinar automaticamente o tipo de origem real.
  • orderByIncludesIsDeleted: a ordenação por configurações de dados deve incluir a priorização de dados que não foram eliminados. A predefinição é: false.

Execute o modelo

Consola

  1. Aceda à página do fluxo de dados Criar tarefa a partir de um modelo.
  2. Aceda a Criar tarefa a partir de modelo
  3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
  4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

    Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte as localizações do Dataflow.

  5. No menu pendente Modelo do fluxo de dados, selecione the Cloud Datastream to SQL template.
  6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
  7. Clique em Executar tarefa.

gcloud

Na shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: o caminho do Cloud Storage para os dados do Datastream. Por exemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: a subscrição do Pub/Sub para ler ficheiros alterados. Por exemplo: projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: o IP do anfitrião SQL.
  • DATABASE_USER: o seu utilizador SQL.
  • DATABASE_PASSWORD: a sua palavra-passe de SQL.

API

Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL",
   }
}
  

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: o caminho do Cloud Storage para os dados do Datastream. Por exemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: a subscrição do Pub/Sub para ler ficheiros alterados. Por exemplo: projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: o IP do anfitrião SQL.
  • DATABASE_USER: o seu utilizador SQL.
  • DATABASE_PASSWORD: a sua palavra-passe de SQL.

O que se segue?