O modelo Datastream para SQL é um pipeline de streaming que lê dados do Datastream e os replica em qualquer banco de dados MySQL ou PostgreSQL. O modelo lê dados do Cloud Storage usando notificações do Pub/Sub e replica esses dados em tabelas de réplica do PostgreSQL.
O modelo não é compatível com a linguagem de definição de dados (DDL) e espera que todas as tabelas já existam no banco de dados. A replicação usa transformações com estado do Dataflow para filtrar dados desatualizados e garantir consistência nos dados fora de ordem. Por exemplo, se uma versão mais recente de uma linha já tiver passado, uma versão que chega atrasada dessa linha será ignorada. A linguagem de manipulação de dados (DML, na sigla em inglês) executada é a melhor tentativa de replicar perfeitamente os dados de origem nos de destino. As instruções DML executadas seguem as seguintes regras:
- Se houver uma chave primária, as operações de inserção e atualização usarão sintaxe de mesclagem
(isto é,
INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE
). - Se houver chaves primárias, as exclusões serão replicadas como uma DML de exclusão.
- Se não houver uma chave primária, as operações de inserção e atualização serão inseridas na tabela.
- Se não houver chaves primárias, as exclusões serão ignoradas.
Se você estiver usando os utilitários Oracle para Postgres, adicione ROWID
no
PostgreSQL como a chave primária quando não houver nenhuma.
Requisitos de pipeline
- Um stream do Datastream que está pronto ou já está replicando dados.
- As notificações do Pub/Sub do Cloud Storage estão ativadas para os dados do Datastream.
- Um banco de dados do PostgreSQL foi propagado com o esquema necessário.
- O acesso à rede entre workers do Dataflow e o PostgreSQL está configurado.
Parâmetros do modelo
Parâmetros obrigatórios
- inputFilePattern: o local dos arquivos do Datastream que serão replicados no Cloud Storage. Normalmente, esse local de arquivo é o caminho raiz do stream.
- databaseHost: o host SQL para se conectar.
- databaseUser: o usuário do PostgreSQL com todas as permissões necessárias para gravar em todas as tabelas na replicação.
- databasePassword: a senha do usuário SQL.
Parâmetros opcionais
- gcsPubSubSubscription: a assinatura do Pub/Sub com notificações de arquivos do Datastream. Por exemplo,
projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>
. - inputFileFormat: o formato do arquivo de saída produzido pelo Datastream. Por exemplo,
avro
oujson
. O padrão éavro
. - streamName: o nome ou modelo do stream para pesquisar informações de esquema. O valor padrão é
{_metadata_stream}
. - rfcStartDateTime: o DateTime inicial usado para buscar do Cloud Storage (https://tools.ietf.org/html/rfc3339). O padrão é: 1970-01-01T00:00:00.00Z.
- dataStreamRootUrl: URL raiz da API Datastream. O padrão é: https://datastream.googleapis.com/.
- databaseType: o tipo de banco de dados para gravar (por exemplo, Postgres). O padrão é postgres.
- databasePort: a porta do banco de dados do PostgreSQL para a conexão. O valor padrão é
5432
. - databaseName: o nome do banco de dados do PostgreSQL ao qual se conectar. O valor padrão é
postgres
. - schemaMap: um mapa de chaves/valores usados para ditar mudanças no nome do esquema (por exemplo, old_name:new_name,CaseError:case_error). O padrão é vazio.
- customConnectionString: string de conexão opcional que será usada no lugar da string do banco de dados padrão.
Executar o modelo
Console
- Acesse a página Criar job usando um modelo do Dataflow. Acesse Criar job usando um modelo
- No campo Nome do job, insira um nome exclusivo.
- Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é
us-central1
.Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.
- No menu suspenso Modelo do Dataflow, selecione the Cloud Datastream to SQL template.
- Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
- Cliquem em Executar job.
gcloud
No shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --enable-streaming-engine \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \ --parameters \ inputFilePattern=GCS_FILE_PATH,\ gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\ databaseHost=DATABASE_HOST,\ databaseUser=DATABASE_USER,\ databasePassword=DATABASE_PASSWORD
Substitua:
PROJECT_ID
: o ID do projeto do Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaREGION_NAME
: a região em que você quer implantar o job do Dataflow, por exemplo,us-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH
: o caminho do Cloud Storage para os dados do Datastream. Exemplo:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: a assinatura do Pub/Sub para ler os arquivos alterados. Por exemplo,projects/my-project-id/subscriptions/my-subscription-id
.DATABASE_HOST
: o IP do host do SQL.DATABASE_USER
: seu usuário do SQL.DATABASE_PASSWORD
: sua senha do SQL.
API
Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a
API e os respectivos escopos de autorização, consulte
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "GCS_FILE_PATH", "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME", "databaseHost": "DATABASE_HOST", "databaseUser": "DATABASE_USER", "databasePassword": "DATABASE_PASSWORD" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL", } }
Substitua:
PROJECT_ID
: o ID do projeto do Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaLOCATION
: a região em que você quer implantar o job do Dataflow, por exemplo,us-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH
: o caminho do Cloud Storage para os dados do Datastream. Exemplo:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: a assinatura do Pub/Sub para ler os arquivos alterados. Por exemplo,projects/my-project-id/subscriptions/my-subscription-id
.DATABASE_HOST
: o IP do host do SQL.DATABASE_USER
: seu usuário do SQL.DATABASE_PASSWORD
: sua senha do SQL.
A seguir
- Saiba mais sobre os modelos do Dataflow.
- Confira a lista de modelos fornecidos pelo Google.