O modelo PostgreSQL para BigQuery é um pipeline em lote que copia dados de uma tabela do PostgreSQL para uma tabela atual do BigQuery. Este pipeline usa JDBC para se conectar ao PostgreSQL. Para ter uma camada extra de proteção, é possível transmitir uma chave do Cloud KMS com um nome de usuário, senha e parâmetros da string de conexão criptografados em Base64 com a chave do Cloud KMS. Para mais informações sobre como criptografar o nome de usuário, a senha e os parâmetros da string de conexão, consulte o endpoint de criptografia da API Cloud KMS.
Requisitos de pipeline
- A tabela do BigQuery precisa existir antes da execução do pipeline.
- A tabela do BigQuery precisa ter um esquema compatível.
- O banco de dados relacional precisa estar acessível na sub-rede em que o Dataflow é executado.
Parâmetros do modelo
Parâmetros obrigatórios
- driverJars: a lista separada por vírgulas de arquivos JAR do driver. (Examplo: gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar).
- driverClassName: o nome da classe do driver do JDBC. (Exemplo: com.mysql.jdbc.Driver).
- connectionURL: a string do URL de conexão do JDBC. Por exemplo,
jdbc:mysql://some-host:3306/sampledb
. É possível transmitir esse valor como uma string criptografada com uma chave do Cloud KMS e, em seguida, codificada em Base64. Remova os caracteres de espaço em branco da string codificada em Base64. Observe a diferença entre uma string de conexão do banco de dados Oracle não RAC (jdbc:oracle:thin:@some-host:<port>:<sid>
) e uma string de conexão do banco de dados Oracle RAC (jdbc:oracle:thin:@//some-host[:<port>]/<service_name>
). (Exemplo: jdbc:mysql://some-host:3306/sampledb). - outputTable: o local da tabela de saída do BigQuery. (Exemplo: <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>).
- bigQueryLoadingTemporaryDirectory: o diretório temporário do processo de carregamento do BigQuery (exemplo: gs://your-bucket/your-files/temp_dir).
Parâmetros opcionais
- connectionProperties: a string de propriedades a ser usada para a conexão JDBC. O formato da string precisa ser
[propertyName=property;]*
.Para mais informações, consulte as propriedades de configuração (https://dev.mysql.com/doc/connector-j/8.1/en/connector-j-reference-configuration-properties.html) na documentação do MySQL. (Exemplo: unicode=true;characterEncoding=UTF-8). - username: o nome de usuário a ser usado para a conexão JDBC. É possível transmitir esse valor como uma string criptografada com uma chave do Cloud KMS e, em seguida, codificada em Base64. Remova os caracteres de espaço em branco da string codificada em Base64.
- password: a senha a ser usada para a conexão JDBC. É possível transmitir esse valor como uma string criptografada com uma chave do Cloud KMS e, em seguida, codificada em Base64. Remova os caracteres de espaço em branco da string codificada em Base64.
- query: a consulta a ser executada na origem para extrair os dados. Alguns tipos JDBC e BigQuery, embora compartilhem o mesmo nome, têm algumas diferenças. Alguns mapeamentos de tipo importantes de SQL -> BigQuery a serem lembrados são: DATETIME --> TIMESTAMP
Talvez seja necessário transmitir o tipo se os esquemas não corresponderem. (Exemplo: select * from sampledb.sample_table).
- KMSEncryptionKey: a chave de criptografia do Cloud KMS a ser usada para descriptografar o nome de usuário, a senha e a string de conexão. Se você transmitir uma chave do Cloud KMS, também precisará criptografar o nome de usuário, a senha e a string de conexão. (Exemplo: projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key).
- useColumnAlias: se definido como
true
, o pipeline usará o alias de coluna (AS
) em vez do nome da coluna para mapear as linhas para o BigQuery. O padrão éfalse
. - isTruncate: se definido como
true
, o pipeline truncará antes de carregar dados no BigQuery. O padrão éfalse
, o que faz com que o pipeline adicione dados ao final. - partitionColumn: se esse parâmetro for fornecido com o nome do
table
definido como um parâmetro opcional, o JdbcIO lerá a tabela em paralelo executando várias instâncias da consulta na mesma tabela (subconsulta) usando intervalos. No momento, ele só é compatível com colunas de partiçãoLong
. - table: a tabela a ser lida ao usar partições. Esse parâmetro também aceita uma subconsulta entre parênteses. (Example: (select id, name from Person) as subq).
- numPartitions : o número de partições. Com os limites inferior e superior, esse valor forma saltos de partição para expressões de cláusula
WHERE
geradas que são usadas para dividir a coluna de partição de maneira uniforme. Quando a entrada for menor que1
, o número será definido como1
. - lowerBound: o limite inferior a ser usado no esquema de partição. Se não for fornecido, esse valor será inferido automaticamente pelo Apache Beam para os tipos compatíveis.
- upperBound: o limite superior a ser usado no esquema de partição. Se não for fornecido, esse valor será inferido automaticamente pelo Apache Beam para os tipos compatíveis.
- fetchSize: o número de linhas a serem buscadas no banco de dados de cada vez. Não é usado para leituras particionadas. O padrão é 50000.
- createDisposition: o CreateDisposition do BigQuery a ser usado. Por exemplo,
CREATE_IF_NEEDED
ouCREATE_NEVER
. O padrão é: CREATE_NEVER. - bigQuerySchemaPath : o caminho do Cloud Storage para o esquema JSON do BigQuery. Se
createDisposition
estiver definido como CREATE_IF_NEEDED, esse parâmetro precisará ser especificado. (Exemplo: gs://your-bucket/your-schema.json). - disabledAlgorithms: algoritmos separados por vírgula a serem desativados. Se esse valor for definido como nenhum, nenhum algoritmo será desativado. Use esse parâmetro com cuidado, porque os algoritmos desativados por padrão podem ter vulnerabilidades ou problemas de desempenho. (Exemplo: SSLv3, RC4).
- extraFilesToStage : caminhos do Cloud Storage separados por vírgulas ou secrets do Secret Manager para que os arquivos sejam organizados no worker. Esses arquivos são salvos no diretório /extra_files em cada worker. (Exemplo: gs://
- defaultLogLevel: define o nível de registro nos workers. As opções aceitas são OFF, ERROR, WARN, INFO, DEBUG, TRACE. O padrão é INFO.
- useStorageWriteApi: se
true
, o pipeline usará a API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). O valor padrão éfalse
. Para mais informações, consulte Como usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - useStorageWriteApiAtLeastOnce: ao usar a API Storage Write, especifica a semântica de gravação. Para usar a semântica pelo menos uma vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), defina este parâmetro como
true
. Para usar semântica exatamente uma vez, defina o parâmetro comofalse
. Esse parâmetro se aplica apenas quandouseStorageWriteApi
étrue
. O valor padrão éfalse
.
Executar o modelo
Console
- Acesse a página Criar job usando um modelo do Dataflow. Acesse Criar job usando um modelo
- No campo Nome do job, insira um nome exclusivo.
- Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é
us-central1
.Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.
- No menu suspenso Modelo do Dataflow, selecione the PostgreSQL to BigQuery template.
- Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
- Cliquem em Executar job.
gcloud
No shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PostgreSQL_to_BigQuery \ --parameters \ connectionURL=JDBC_CONNECTION_URL,\ query=SOURCE_SQL_QUERY,\ outputTable=PROJECT_ID:DATASET.TABLE_NAME, bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\ connectionProperties=CONNECTION_PROPERTIES,\ username=CONNECTION_USERNAME,\ password=CONNECTION_PASSWORD,\ KMSEncryptionKey=KMS_ENCRYPTION_KEY
Substitua:
JOB_NAME
: um nome de job de sua escolhaVERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: a região onde você quer implantar o job do Dataflow, por exemplo,us-central1
JDBC_CONNECTION_URL
: o URL de conexão de JDBCSOURCE_SQL_QUERY
: a consulta SQL a ser executada no banco de dados de origem.DATASET
: o conjunto de dados do BigQueryTABLE_NAME
: o nome da tabela do BigQueryPATH_TO_TEMP_DIR_ON_GCS
: o caminho do Cloud Storage para o diretório temporárioCONNECTION_PROPERTIES
: as propriedades de conexão do JDBC, se necessárioCONNECTION_USERNAME
: o nome de usuário da conexão JDBC.CONNECTION_PASSWORD
: a senha de conexão JDBCKMS_ENCRYPTION_KEY
: a chave de criptografia do Cloud KMS
API
Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a
API e os respectivos escopos de autorização, consulte
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launchParameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PostgreSQL_to_BigQuery" "parameters": { "connectionURL": "JDBC_CONNECTION_URL", "query": "SOURCE_SQL_QUERY", "outputTable": "PROJECT_ID:DATASET.TABLE_NAME", "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS", "connectionProperties": "CONNECTION_PROPERTIES", "username": "CONNECTION_USERNAME", "password": "CONNECTION_PASSWORD", "KMSEncryptionKey":"KMS_ENCRYPTION_KEY" }, "environment": { "zone": "us-central1-f" } } }
Substitua:
PROJECT_ID
: o ID do projeto do Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaVERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
LOCATION
: a região onde você quer implantar o job do Dataflow, por exemplo,us-central1
JDBC_CONNECTION_URL
: o URL de conexão de JDBCSOURCE_SQL_QUERY
: a consulta SQL a ser executada no banco de dados de origem.DATASET
: o conjunto de dados do BigQueryTABLE_NAME
: o nome da tabela do BigQueryPATH_TO_TEMP_DIR_ON_GCS
: o caminho do Cloud Storage para o diretório temporárioCONNECTION_PROPERTIES
: as propriedades de conexão do JDBC, se necessárioCONNECTION_USERNAME
: o nome de usuário da conexão JDBC.CONNECTION_PASSWORD
: a senha de conexão JDBCKMS_ENCRYPTION_KEY
: a chave de criptografia do Cloud KMS
A seguir
- Saiba mais sobre os modelos do Dataflow.
- Confira a lista de modelos fornecidos pelo Google.