Pub/Sub to Java Database Connectivity (JDBC) 模板

Pub/Sub to Java Database Connectivity (JDBC) 模板是一种流处理流水线,可从预先存在的 Pub/Sub 订阅注入数据作为 JSON 字符串,并将生成的记录写入 JDBC。

流水线要求

  • Pub/Sub 订阅必须已存在才能运行此流水线。
  • 在运行流水线之前,JDBC 源必须已存在。
  • Pub/Sub 输出死信主题必须已存在才能运行此流水线。

模板参数

参数 说明
driverClassName JDBC 驱动程序类名称。例如 com.mysql.jdbc.Driver
connectionUrl JDBC 连接网址字符串。例如 jdbc:mysql://some-host:3306/sampledb。 您可以将此值作为使用 Cloud KMS 密钥加密,然后进行 Base64 编码的字符串传入。 从 Base64 编码的字符串中移除空白字符。
driverJars 以英文逗号分隔的 JDBC 驱动程序 Cloud Storage 路径。例如 gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar
username 可选:用于 JDBC 连接的用户名。您可以将由 Cloud KMS 密钥加密的此值作为 Base64 编码的字符串传入。
password 可选:用于 JDBC 连接的密码。您可以将由 Cloud KMS 密钥加密的此值作为 Base64 编码的字符串传入。
connectionProperties 可选:用于 JDBC 连接的属性字符串。字符串的格式必须为 [propertyName=property;]*。例如 unicode=true;characterEncoding=UTF-8
statement 针对数据库运行的语句。该语句必须以任意顺序指定表的列名。只会从 JSON 中读取指定列名称的值并将其添加到语句中。例如 INSERT INTO tableName (column1, column2) VALUES (?,?)
inputSubscription 要读取的 Pub/Sub 输入订阅,格式为 projects/<project>/subscriptions/<subscription>
outputDeadletterTopic 用于转发无法递送的消息的 Pub/Sub 主题,例如 projects/<project-id>/topics/<topic-name>
KMSEncryptionKey 可选:用于对用户名、密码和连接字符串进行解密的 Cloud KMS 加密密钥。如果传入了 Cloud KMS 密钥,则用户名、密码和连接字符串都必须以加密方式进行传递。
extraFilesToStage 用于将文件暂存在工作器中的 Cloud Storage 路径或 Secret Manager 密文,以逗号分隔。这些文件将保存在每个工作器的 /extra_files 目录下。例如 gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id>

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to JDBC template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Jdbc \
    --region REGION_NAME \
    --parameters \
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
driverJars=DRIVER_PATHS,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
connectionProperties=CONNECTION_PROPERTIES,\
statement=SQL_STATEMENT,\
inputSubscription=INPUT_SUBSCRIPTION,\
outputDeadletterTopic=OUTPUT_DEADLETTER_TOPIC,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • DRIVER_CLASS_NAME:驱动程序类名称
  • JDBC_CONNECTION_URL:JDBC 连接网址
  • DRIVER_PATHS:JDBC 驱动程序以英文逗号分隔的 Cloud Storage 路径
  • CONNECTION_USERNAME:JDBC 连接用户名
  • CONNECTION_PASSWORD:JDBC 连接密码
  • CONNECTION_PROPERTIES:JDBC 连接属性(如有需要)
  • SQL_STATEMENT:要对数据库执行的 SQL 语句
  • INPUT_SUBSCRIPTION:要读取的 Pub/Sub 输入订阅。
  • OUTPUT_DEADLETTER_TOPIC:用于转发无法递送的消息的 Pub/Sub
  • KMS_ENCRYPTION_KEY:Cloud KMS 加密密钥

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "jobName": "JOB_NAME",
   "parameters": {
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "JDBC_CONNECTION_URL",
       "driverJars": "DRIVER_PATHS",
       "username": "CONNECTION_USERNAME",
       "password": "CONNECTION_PASSWORD",
       "connectionProperties": "CONNECTION_PROPERTIES",
       "statement": "SQL_STATEMENT",
       "inputSubscription": "INPUT_SUBSCRIPTION",
       "outputDeadletterTopic": "OUTPUT_DEADLETTER_TOPIC",
       "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
   },
   "environment": { "zone": "us-central1-f" },
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • DRIVER_CLASS_NAME:驱动程序类名称
  • JDBC_CONNECTION_URL:JDBC 连接网址
  • DRIVER_PATHS:JDBC 驱动程序以英文逗号分隔的 Cloud Storage 路径
  • CONNECTION_USERNAME:JDBC 连接用户名
  • CONNECTION_PASSWORD:JDBC 连接密码
  • CONNECTION_PROPERTIES:JDBC 连接属性(如有需要)
  • SQL_STATEMENT:要对数据库执行的 SQL 语句
  • INPUT_SUBSCRIPTION:要读取的 Pub/Sub 输入订阅。
  • OUTPUT_DEADLETTER_TOPIC:用于转发无法递送的消息的 Pub/Sub
  • KMS_ENCRYPTION_KEY:Cloud KMS 加密密钥

后续步骤