Pub/Sub to Java Database Connectivity (JDBC) template

The Pub/Sub to Java Database Connectivity (JDBC) template is a streaming pipeline that ingests JSON strings from an existing Pub/Sub subscription and writes the resulting records to a database through JDBC.

Pipeline requirements

  • The Pub/Sub subscription must exist prior to running the pipeline.
  • The JDBC database that the pipeline writes to must exist prior to running the pipeline.
  • The Pub/Sub output dead-letter topic must exist prior to running the pipeline.

Template parameters

Parameter Description
driverClassName The JDBC driver class name. For example, com.mysql.jdbc.Driver.
connectionUrl The JDBC connection URL string. For example, jdbc:mysql://some-host:3306/sampledb. You can pass in this value as a string that's encrypted with a Cloud KMS key and then Base64-encoded. Remove whitespace characters from the Base64-encoded string.
driverJars Comma-separated Cloud Storage paths for JDBC drivers. For example, gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar.
username Optional: The username to be used for the JDBC connection. You can pass in this value encrypted by a Cloud KMS key as a Base64-encoded string.
password Optional: The password to be used for the JDBC connection. You can pass in this value encrypted by a Cloud KMS key as a Base64-encoded string.
connectionProperties Optional: Properties string to use for the JDBC connection. The string must use the format [propertyName=property;]*. For example, unicode=true;characterEncoding=UTF-8.
statement The statement to run against the database. The statement must specify the column names of the table, in any order. Only the values of the specified column names are read from the JSON and added to the statement. For example, INSERT INTO tableName (column1, column2) VALUES (?,?).
inputSubscription The Pub/Sub input subscription to read from, in the format of projects/<project>/subscriptions/<subscription>.
outputDeadletterTopic The Pub/Sub topic to forward undeliverable messages to. For example, projects/<project-id>/topics/<topic-name>.
KMSEncryptionKey Optional: The Cloud KMS encryption key used to decrypt the username, password, and connection string. If a Cloud KMS key is passed in, the username, password, and connection string must all be passed in encrypted.
extraFilesToStage Comma-separated Cloud Storage paths or Secret Manager secrets for files to stage in the worker. These files are saved under the /extra_files directory in each worker. For example, gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id>.
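
To make the connectionProperties and statement formats more concrete, the following Python sketch parses a properties string and picks the statement's columns out of a JSON message. It is an illustration only, not the template's implementation (the template does this work inside its own pipeline code). The function names, the regular expression used to find the column list, and the choice to bind None for a missing key are all assumptions made for this example.

```python
import json
import re


def parse_connection_properties(props: str) -> dict:
    """Split a 'name=value;name=value' string into a dict."""
    result = {}
    for pair in props.split(";"):
        if not pair.strip():
            continue  # tolerate a trailing ';'
        name, _, value = pair.partition("=")
        result[name.strip()] = value.strip()
    return result


def statement_columns(statement: str) -> list:
    """Return the column names listed before VALUES in an INSERT statement."""
    match = re.search(r"\(([^)]*)\)\s*VALUES", statement, flags=re.IGNORECASE)
    if match is None:
        raise ValueError("statement does not list column names")
    return [column.strip() for column in match.group(1).split(",")]


def bind_values(statement: str, message: str) -> list:
    """Select only the statement's columns from a JSON message, in statement order.

    Keys that are not named in the statement are ignored. A missing key is
    bound as None, which is an assumption of this sketch.
    """
    record = json.loads(message)
    return [record.get(column) for column in statement_columns(statement)]
```

With the example statement from the table, a message such as {"column2": 2, "extra": "x", "column1": "a"} yields the values ["a", 2]: the extra key is dropped and the order follows the statement, not the message.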

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Pub/Sub to JDBC template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Jdbc \
    --region REGION_NAME \
    --parameters \
driverClassName=DRIVER_CLASS_NAME,\
connectionUrl=JDBC_CONNECTION_URL,\
driverJars=DRIVER_PATHS,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
connectionProperties=CONNECTION_PROPERTIES,\
statement=SQL_STATEMENT,\
inputSubscription=INPUT_SUBSCRIPTION,\
outputDeadletterTopic=OUTPUT_DEADLETTER_TOPIC,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use
  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • DRIVER_CLASS_NAME: the driver class name
  • JDBC_CONNECTION_URL: the JDBC connection URL
  • DRIVER_PATHS: the comma-separated Cloud Storage path(s) of the JDBC driver(s)
  • CONNECTION_USERNAME: the JDBC connection username
  • CONNECTION_PASSWORD: the JDBC connection password
  • CONNECTION_PROPERTIES: the JDBC connection properties, if necessary
  • SQL_STATEMENT: the SQL statement to be executed against the database
  • INPUT_SUBSCRIPTION: the Pub/Sub input subscription to read from
  • OUTPUT_DEADLETTER_TOPIC: the Pub/Sub topic to forward undeliverable messages to
  • KMS_ENCRYPTION_KEY: the Cloud KMS Encryption Key
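
Values such as DRIVER_PATHS and SQL_STATEMENT often contain commas, which collide with the comma that separates entries in --parameters. One way around this is gcloud's alternate delimiter syntax (described in gcloud topic escaping), where a leading ^DELIM^ selects a different separator. The Python sketch below assembles the argument list that way. The helper name build_run_command and the choice of ~ as the delimiter are illustrative assumptions, not part of the template.

```python
import shlex


def build_run_command(job_name, region, version, parameters, delimiter="~"):
    """Return the argv list for `gcloud dataflow flex-template run`.

    `parameters` maps template parameter names to string values. The values
    are joined with `delimiter` instead of a comma so that commas inside a
    value (for example in the SQL statement) are passed through unchanged.
    """
    for key, value in parameters.items():
        if delimiter in key or delimiter in str(value):
            raise ValueError(f"delimiter {delimiter!r} occurs in parameter {key!r}")
    joined = delimiter.join(f"{key}={value}" for key, value in parameters.items())
    return [
        "gcloud", "dataflow", "flex-template", "run", job_name,
        "--template-file-gcs-location",
        f"gs://dataflow-templates-{region}/{version}/flex/PubSub_to_Jdbc",
        "--region", region,
        "--parameters", f"^{delimiter}^{joined}",
    ]


def as_shell_line(argv):
    """Quote the argv list so it can be pasted into a shell."""
    return shlex.join(argv)
```

The returned list can be passed directly to subprocess.run(argv, check=True), which avoids shell quoting problems with the ? and parentheses in the SQL statement. as_shell_line is only needed when the command has to be printed or logged.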

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_Jdbc",
      "parameters": {
          "driverClassName": "DRIVER_CLASS_NAME",
          "connectionUrl": "JDBC_CONNECTION_URL",
          "driverJars": "DRIVER_PATHS",
          "username": "CONNECTION_USERNAME",
          "password": "CONNECTION_PASSWORD",
          "connectionProperties": "CONNECTION_PROPERTIES",
          "statement": "SQL_STATEMENT",
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "outputDeadletterTopic": "OUTPUT_DEADLETTER_TOPIC",
          "KMSEncryptionKey": "KMS_ENCRYPTION_KEY"
      },
      "environment": { "zone": "us-central1-f" }
   }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use
  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • DRIVER_CLASS_NAME: the driver class name
  • JDBC_CONNECTION_URL: the JDBC connection URL
  • DRIVER_PATHS: the comma-separated Cloud Storage path(s) of the JDBC driver(s)
  • CONNECTION_USERNAME: the JDBC connection username
  • CONNECTION_PASSWORD: the JDBC connection password
  • CONNECTION_PROPERTIES: the JDBC connection properties, if necessary
  • SQL_STATEMENT: the SQL statement to be executed against the database
  • INPUT_SUBSCRIPTION: the Pub/Sub input subscription to read from
  • OUTPUT_DEADLETTER_TOPIC: the Pub/Sub topic to forward undeliverable messages to
  • KMS_ENCRYPTION_KEY: the Cloud KMS Encryption Key
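
As a rough illustration of the same call from code, the Python sketch below builds the request with the standard library only. It follows the request shape of the flexTemplates.launch method, in which the job settings sit under a launchParameter object and the template location is given as containerSpecGcsPath. The function names are hypothetical, and the access token is assumed to be obtained separately, for example with gcloud auth print-access-token.

```python
import json
import urllib.request


def build_launch_request(project_id, location, version, job_name, parameters):
    """Return (url, body) for a flexTemplates:launch call to this template."""
    url = (
        f"https://dataflow.googleapis.com/v1b3/projects/{project_id}"
        f"/locations/{location}/flexTemplates:launch"
    )
    body = {
        "launchParameter": {
            "jobName": job_name,
            # Same template path as in the gcloud example.
            "containerSpecGcsPath": (
                f"gs://dataflow-templates-{location}/{version}/flex/PubSub_to_Jdbc"
            ),
            "parameters": dict(parameters),
        }
    }
    return url, body


def launch(url, body, access_token):
    """Send the POST request and return the decoded JSON response."""
    request = urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

Keeping the request construction separate from the network call makes it possible to inspect or log the body before anything is sent. Because the body is serialized with json.dumps, commas and quotes inside the SQL statement need no extra escaping.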

What's next