PostgreSQL to BigQuery template

The PostgreSQL to BigQuery template is a batch pipeline that copies data from a PostgreSQL table into an existing BigQuery table. This pipeline uses JDBC to connect to PostgreSQL. For an extra layer of protection, you can also pass in a Cloud KMS key along with Base64-encoded username, password, and connection string parameters encrypted with the Cloud KMS key. For more information about encrypting your username, password, and connection string parameters, see the Cloud KMS API encryption endpoint.
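
As a minimal sketch of the encoding step described above, the following Python helper Base64-encodes ciphertext bytes and removes any whitespace from the result. It assumes the value has already been encrypted with your Cloud KMS key. The function name to_template_parameter and the sample bytes are hypothetical and only stand in for real Cloud KMS output.

```python
import base64


def to_template_parameter(ciphertext: bytes) -> str:
    """Base64-encode ciphertext and strip any whitespace from the result.

    `ciphertext` is assumed to be the raw bytes produced by encrypting a
    username, password, or connection string with a Cloud KMS key.
    """
    encoded = base64.b64encode(ciphertext).decode("ascii")
    # b64encode emits no line breaks, but other tools may wrap their
    # output, so whitespace is removed defensively.
    return "".join(encoded.split())


# Placeholder bytes standing in for real Cloud KMS ciphertext.
sample_ciphertext = b"\x00\x01not-real-ciphertext\xff"
encoded_parameter = to_template_parameter(sample_ciphertext)
print(encoded_parameter)
```

The returned string is what you would pass as the username, password, or connectionURL value together with KMSEncryptionKey.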

Pipeline requirements

  • The BigQuery table must exist before pipeline execution.
  • The BigQuery table must have a compatible schema.
  • The relational database must be accessible from the subnet where Dataflow runs.

Template parameters

Required parameters

  • driverJars : The comma-separated list of driver JAR files. (Example: gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar).
  • driverClassName : The JDBC driver class name. (Example: org.postgresql.Driver).
  • connectionURL : The JDBC connection URL string. For example, jdbc:postgresql://some-host:5432/sampledb. You can pass in this value as a string that's encrypted with a Cloud KMS key and then Base64-encoded. Remove whitespace characters from the Base64-encoded string. Note the difference between an Oracle non-RAC database connection string (jdbc:oracle:thin:@some-host:<port>:<sid>) and an Oracle RAC database connection string (jdbc:oracle:thin:@//some-host[:<port>]/<service_name>).
  • outputTable : The BigQuery output table location. (Example: <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>).
  • bigQueryLoadingTemporaryDirectory : The temporary directory for the BigQuery loading process. (Example: gs://your-bucket/your-files/temp_dir).

Optional parameters

  • connectionProperties : The properties string to use for the JDBC connection. The format of the string must be [propertyName=property;]*. For more information, see Configuration Properties (https://dev.mysql.com/doc/connector-j/en/connector-j-reference-configuration-properties.html) in the MySQL documentation. (Example: unicode=true;characterEncoding=UTF-8).
  • username : The username to use for the JDBC connection. Can be passed in as a string that's encrypted with a Cloud KMS key, or can be a Secret Manager secret in the form projects/{project}/secrets/{secret}/versions/{secret_version}.
  • password : The password to use for the JDBC connection. Can be passed in as a string that's encrypted with a Cloud KMS key, or can be a Secret Manager secret in the form projects/{project}/secrets/{secret}/versions/{secret_version}.
  • query : The query to run on the source to extract the data. Note that some JDBC SQL and BigQuery types, although sharing the same name, have some differences. An important SQL -> BigQuery type mapping to keep in mind is DATETIME -> TIMESTAMP. Type casting might be required if your schemas don't match. This parameter can be set to a gs:// path pointing to a file in Cloud Storage to load the query from. The file encoding should be UTF-8. (Example: select * from sampledb.sample_table).
  • KMSEncryptionKey : The Cloud KMS encryption key to use to decrypt the username, password, and connection string. If you pass in a Cloud KMS key, you must also encrypt the username, password, and connection string. (Example: projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key).
  • useColumnAlias : If set to true, the pipeline uses the column alias (AS) instead of the column name to map the rows to BigQuery. Defaults to false.
  • isTruncate : If set to true, the pipeline truncates before loading data into BigQuery. Defaults to false, which causes the pipeline to append data.
  • partitionColumn : The name of the column to partition on. If this parameter is provided together with the optional table parameter, JdbcIO reads the table in parallel by executing multiple instances of the query on the same table (subquery) using ranges. Currently, only Long partition columns are supported.
  • table : The table to read from when using partitions. This parameter also accepts a subquery in parentheses. (Example: (select id, name from Person) as subq).
  • numPartitions : The number of partitions. With the lower and upper bound, this value forms partition strides for generated WHERE clause expressions that are used to split the partition column evenly. When the input is less than 1, the number is set to 1.
  • lowerBound : The lower bound to use in the partition scheme. If not provided, this value is automatically inferred by Apache Beam for the supported types.
  • upperBound : The upper bound to use in the partition scheme. If not provided, this value is automatically inferred by Apache Beam for the supported types.
  • fetchSize : The number of rows to fetch from the database at a time. Not used for partitioned reads. Defaults to: 50000.
  • createDisposition : The BigQuery CreateDisposition to use. For example, CREATE_IF_NEEDED or CREATE_NEVER. Defaults to: CREATE_NEVER.
  • bigQuerySchemaPath : The Cloud Storage path for the BigQuery JSON schema. If createDisposition is set to CREATE_IF_NEEDED, this parameter must be specified. (Example: gs://your-bucket/your-schema.json).
  • disabledAlgorithms : Comma-separated algorithms to disable. If this value is set to none, no algorithm is disabled. Use this parameter with caution, because the algorithms disabled by default might have vulnerabilities or performance issues. (Example: SSLv3, RC4).
  • extraFilesToStage : Comma-separated Cloud Storage paths or Secret Manager secrets for files to stage in the worker. These files are saved in the /extra_files directory in each worker. (Example: gs://
  • useStorageWriteApi : If true, the pipeline uses the BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). The default value is false. For more information, see Using the Storage Write API (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce : When using the Storage Write API, specifies the write semantics. To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to true. To use exactly-once semantics, set the parameter to false. This parameter applies only when useStorageWriteApi is true. The default value is false.
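
To illustrate how numPartitions, lowerBound, and upperBound could combine into range predicates on partitionColumn, here is a simplified Python sketch. It is not Apache Beam's actual implementation. The function partition_predicates, the choice of inclusive and exclusive boundaries, and the cap on the partition count are assumptions made for illustration only.

```python
def partition_predicates(column, lower, upper, num_partitions):
    """Return WHERE-clause fragments that split [lower, upper] into ranges."""
    if upper < lower:
        raise ValueError("upper bound must not be smaller than lower bound")
    span = upper - lower + 1
    # Mirror the documented behavior: values below 1 are treated as 1.
    # Capping at the span avoids empty ranges (assumption of this sketch).
    num_partitions = min(max(1, num_partitions), span)
    stride = span // num_partitions

    predicates = []
    start = lower
    for index in range(num_partitions):
        if index == num_partitions - 1:
            # The last range absorbs any remainder and includes the upper bound.
            predicates.append(f"{column} >= {start} AND {column} <= {upper}")
        else:
            end = start + stride
            predicates.append(f"{column} >= {start} AND {column} < {end}")
            start = end
    return predicates


for predicate in partition_predicates("id", 1, 100, 4):
    print(predicate)
```

Each fragment corresponds to one parallel read of the table or subquery named in the table parameter. An evenly distributed partition column gives the most balanced ranges.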

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the PostgreSQL to BigQuery template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PostgreSQL_to_BigQuery \
    --parameters \
connectionURL=JDBC_CONNECTION_URL,\
query=SOURCE_SQL_QUERY,\
outputTable=PROJECT_ID:DATASET.TABLE_NAME,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS,\
connectionProperties=CONNECTION_PROPERTIES,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use
  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • JDBC_CONNECTION_URL: the JDBC connection URL
  • SOURCE_SQL_QUERY: the SQL query to run on the source database
  • DATASET: your BigQuery dataset
  • TABLE_NAME: your BigQuery table name
  • PATH_TO_TEMP_DIR_ON_GCS: your Cloud Storage path to the temp directory
  • CONNECTION_PROPERTIES: the JDBC connection properties, if needed
  • CONNECTION_USERNAME: the JDBC connection username
  • CONNECTION_PASSWORD: the JDBC connection password
  • KMS_ENCRYPTION_KEY: the Cloud KMS encryption key
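
If you launch the template from a script, the gcloud command above can be assembled programmatically. The following Python sketch builds the argument list without running it. The function build_gcloud_command and all sample values are hypothetical, and VERSION is left as a placeholder. Because the --parameters value is comma-separated, the sketch rejects values that contain commas rather than guessing at an escaping scheme. See gcloud topic escaping for ways to pass such values.

```python
import shlex


def build_gcloud_command(job_name, project_id, region, version, parameters):
    """Build the argument list for `gcloud dataflow flex-template run`."""
    for key, value in parameters.items():
        if "," in value:
            # A comma would be read as a separator between parameters.
            raise ValueError(f"value for {key!r} contains a comma and needs escaping")
    joined = ",".join(f"{key}={value}" for key, value in parameters.items())
    template = (
        f"gs://dataflow-templates-{region}/{version}/flex/PostgreSQL_to_BigQuery"
    )
    return [
        "gcloud", "dataflow", "flex-template", "run", job_name,
        f"--project={project_id}",
        f"--region={region}",
        f"--template-file-gcs-location={template}",
        "--parameters", joined,
    ]


# Hypothetical sample values for illustration only.
command = build_gcloud_command(
    job_name="pg-to-bq-example",
    project_id="my-project",
    region="us-central1",
    version="VERSION",
    parameters={
        "connectionURL": "jdbc:postgresql://some-host:5432/sampledb",
        "query": "select * from sampledb.sample_table",
        "outputTable": "my-project:my_dataset.my_table",
        "bigQueryLoadingTemporaryDirectory": "gs://your-bucket/your-files/temp_dir",
    },
)
print(shlex.join(command))
```

To run the job, you could pass the list to subprocess.run(command, check=True). Passing a list instead of a single shell string avoids quoting problems with values such as the SQL query.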

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launchParameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PostgreSQL_to_BigQuery",
    "parameters": {
      "connectionURL": "JDBC_CONNECTION_URL",
      "query": "SOURCE_SQL_QUERY",
      "outputTable": "PROJECT_ID:DATASET.TABLE_NAME",
      "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
      "connectionProperties": "CONNECTION_PROPERTIES",
      "username": "CONNECTION_USERNAME",
      "password": "CONNECTION_PASSWORD",
      "KMSEncryptionKey": "KMS_ENCRYPTION_KEY"
    },
    "environment": { "zone": "us-central1-f" }
  }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use
  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • JDBC_CONNECTION_URL: the JDBC connection URL
  • SOURCE_SQL_QUERY: the SQL query to run on the source database
  • DATASET: your BigQuery dataset
  • TABLE_NAME: your BigQuery table name
  • PATH_TO_TEMP_DIR_ON_GCS: your Cloud Storage path to the temp directory
  • CONNECTION_PROPERTIES: the JDBC connection properties, if needed
  • CONNECTION_USERNAME: the JDBC connection username
  • CONNECTION_PASSWORD: the JDBC connection password
  • KMS_ENCRYPTION_KEY: the Cloud KMS encryption key
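
The request above can also be assembled in code, which guarantees that the body is valid JSON. The following Python sketch builds the URL and the request body but does not send the request or handle authorization. The function build_launch_request and the sample values are hypothetical, VERSION is left as a placeholder, and the optional environment block is omitted for brevity.

```python
import json


def build_launch_request(project_id, location, job_name, version, parameters):
    """Return the launch URL and JSON body for the flexTemplates:launch call."""
    url = (
        "https://dataflow.googleapis.com/v1b3/"
        f"projects/{project_id}/locations/{location}/flexTemplates:launch"
    )
    body = {
        "launchParameter": {
            "jobName": job_name,
            "containerSpecGcsPath": (
                f"gs://dataflow-templates-{location}/{version}"
                "/flex/PostgreSQL_to_BigQuery"
            ),
            # Template parameters are passed through as string values.
            "parameters": dict(parameters),
        }
    }
    return url, json.dumps(body, indent=2)


# Hypothetical sample values for illustration only.
launch_url, launch_body = build_launch_request(
    project_id="my-project",
    location="us-central1",
    job_name="pg-to-bq-example",
    version="VERSION",
    parameters={
        "connectionURL": "jdbc:postgresql://some-host:5432/sampledb",
        "query": "select * from sampledb.sample_table",
        "outputTable": "my-project:my_dataset.my_table",
        "bigQueryLoadingTemporaryDirectory": "gs://your-bucket/your-files/temp_dir",
    },
)
print(launch_url)
print(launch_body)
```

Send launch_body as the body of an authorized HTTP POST request to launch_url, with the Content-Type header set to application/json.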

What's next