Modelo do Datastream para MySQL ou PostgreSQL (Stream)

O modelo Datastream para SQL é um pipeline de streaming que lê dados do Datastream e os replica em qualquer banco de dados MySQL ou PostgreSQL. O modelo lê dados do Cloud Storage usando notificações do Pub/Sub e replica esses dados em tabelas de réplica do PostgreSQL.

O modelo não é compatível com a linguagem de definição de dados (DDL) e espera que todas as tabelas já existam no banco de dados. A replicação usa transformações com estado do Dataflow para filtrar dados desatualizados e garantir consistência nos dados fora de ordem. Por exemplo, se uma versão mais recente de uma linha já tiver passado, uma versão que chega atrasada dessa linha será ignorada. A linguagem de manipulação de dados (DML, na sigla em inglês) executada é a melhor tentativa de replicar perfeitamente os dados de origem nos de destino. As instruções DML executadas seguem as seguintes regras:

  • Se houver uma chave primária, as operações de inserção e atualização usarão sintaxe de mesclagem (isto é, INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE).
  • Se houver chaves primárias, as exclusões serão replicadas como uma DML de exclusão.
  • Se não houver uma chave primária, as operações de inserção e atualização serão inseridas na tabela.
  • Se não houver chaves primárias, as exclusões serão ignoradas.

Se você estiver usando os utilitários Oracle para Postgres, adicione ROWID no PostgreSQL como a chave primária quando não houver nenhuma.

Requisitos de pipeline

  • Um stream do Datastream que está pronto ou já está replicando dados.
  • As notificações do Pub/Sub do Cloud Storage estão ativadas para os dados do Datastream.
  • Um banco de dados do PostgreSQL foi propagado com o esquema necessário.
  • O acesso à rede entre workers do Dataflow e o PostgreSQL está configurado.

Parâmetros do modelo

Parâmetros obrigatórios

  • inputFilePattern: o local dos arquivos do Datastream que serão replicados no Cloud Storage. Normalmente, esse local de arquivo é o caminho raiz do stream.
  • databaseHost: o host SQL para se conectar.
  • databaseUser: o usuário do PostgreSQL com todas as permissões necessárias para gravar em todas as tabelas na replicação.
  • databasePassword: a senha do usuário SQL.

Parâmetros opcionais

  • gcsPubSubSubscription: a assinatura do Pub/Sub com notificações de arquivos do Datastream. Por exemplo, projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>.
  • inputFileFormat: o formato do arquivo de saída produzido pelo Datastream. Por exemplo, avro ou json. O padrão é avro.
  • streamName: o nome ou modelo do stream para pesquisar informações de esquema. O valor padrão é {_metadata_stream}.
  • rfcStartDateTime: o DateTime inicial usado para buscar do Cloud Storage (https://tools.ietf.org/html/rfc3339). O padrão é: 1970-01-01T00:00:00.00Z.
  • dataStreamRootUrl: URL raiz da API Datastream. O padrão é: https://datastream.googleapis.com/.
  • databaseType: o tipo de banco de dados para gravar (por exemplo, Postgres). O padrão é postgres.
  • databasePort: a porta do banco de dados do PostgreSQL para a conexão. O valor padrão é 5432.
  • databaseName: o nome do banco de dados do PostgreSQL ao qual se conectar. O valor padrão é postgres.
  • schemaMap: um mapa de chaves/valores usados para ditar mudanças no nome do esquema (por exemplo, old_name:new_name,CaseError:case_error). O padrão é vazio.
  • customConnectionString: string de conexão opcional que será usada no lugar da string do banco de dados padrão.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Cloud Datastream to SQL template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: o caminho do Cloud Storage para os dados do Datastream. Exemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: a assinatura do Pub/Sub para ler os arquivos alterados. Por exemplo, projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: o IP do host do SQL.
  • DATABASE_USER: seu usuário do SQL.
  • DATABASE_PASSWORD: sua senha do SQL.

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL",
   }
}
  

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: o caminho do Cloud Storage para os dados do Datastream. Exemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: a assinatura do Pub/Sub para ler os arquivos alterados. Por exemplo, projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: o IP do host do SQL.
  • DATABASE_USER: seu usuário do SQL.
  • DATABASE_PASSWORD: sua senha do SQL.

A seguir