Pub/Sub to Java Database Connectivity (JDBC) 模板是一种流处理流水线,可从预先存在的 Pub/Sub 订阅注入数据作为 JSON 字符串,并将生成的记录写入 JDBC。
流水线要求
- Pub/Sub 订阅必须已存在才能运行此流水线。
- 在运行流水线之前,JDBC 源必须已存在。
- Pub/Sub 输出死信主题必须已存在才能运行此流水线。
模板参数
参数 | 说明 |
---|---|
driverClassName |
JDBC 驱动程序类名称。例如 com.mysql.jdbc.Driver 。 |
connectionUrl |
JDBC 连接网址字符串。例如 jdbc:mysql://some-host:3306/sampledb 。
您可以将此值作为使用 Cloud KMS 密钥加密,然后进行 Base64 编码的字符串传入。
从 Base64 编码的字符串中移除空白字符。 |
driverJars |
以英文逗号分隔的 JDBC 驱动程序 Cloud Storage 路径。例如 gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar 。 |
username |
可选:用于 JDBC 连接的用户名。您可以将由 Cloud KMS 密钥加密的此值作为 Base64 编码的字符串传入。 |
password |
可选:用于 JDBC 连接的密码。您可以将由 Cloud KMS 密钥加密的此值作为 Base64 编码的字符串传入。 |
connectionProperties |
可选:用于 JDBC 连接的属性字符串。字符串的格式必须为 [propertyName=property;]* 。例如 unicode=true;characterEncoding=UTF-8 。 |
statement |
针对数据库运行的语句。该语句必须以任意顺序指定表的列名。只会从 JSON 中读取指定列名称的值并将其添加到语句中。例如 INSERT INTO tableName (column1, column2) VALUES (?,?) 。 |
inputSubscription |
要读取的 Pub/Sub 输入订阅,格式为 projects/<project>/subscriptions/<subscription> 。 |
outputDeadletterTopic |
用于转发无法递送的消息的 Pub/Sub 主题,例如 projects/<project-id>/topics/<topic-name> 。 |
KMSEncryptionKey |
可选:用于对用户名、密码和连接字符串进行解密的 Cloud KMS 加密密钥。如果传入了 Cloud KMS 密钥,则用户名、密码和连接字符串都必须以加密方式进行传递。 |
extraFilesToStage |
用于将文件暂存在工作器中的 Cloud Storage 路径或 Secret Manager 密文,以逗号分隔。这些文件将保存在每个工作器的 /extra_files 目录下。例如 gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id> 。 |
运行模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Pub/Sub to JDBC template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Jdbc \ --region REGION_NAME \ --parameters \ driverClassName=DRIVER_CLASS_NAME,\ connectionURL=JDBC_CONNECTION_URL,\ driverJars=DRIVER_PATHS,\ username=CONNECTION_USERNAME,\ password=CONNECTION_PASSWORD,\ connectionProperties=CONNECTION_PROPERTIES,\ statement=SQL_STATEMENT,\ inputSubscription=INPUT_SUBSCRIPTION,\ outputDeadletterTopic=OUTPUT_DEADLETTER_TOPIC,\ KMSEncryptionKey=KMS_ENCRYPTION_KEY
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域,例如us-central1
DRIVER_CLASS_NAME
:驱动程序类名称JDBC_CONNECTION_URL
:JDBC 连接网址DRIVER_PATHS
:JDBC 驱动程序以英文逗号分隔的 Cloud Storage 路径CONNECTION_USERNAME
:JDBC 连接用户名CONNECTION_PASSWORD
:JDBC 连接密码CONNECTION_PROPERTIES
:JDBC 连接属性(如有需要)SQL_STATEMENT
:要对数据库执行的 SQL 语句INPUT_SUBSCRIPTION
:要读取的 Pub/Sub 输入订阅。OUTPUT_DEADLETTER_TOPIC
:用于转发无法递送的消息的 Pub/SubKMS_ENCRYPTION_KEY
:Cloud KMS 加密密钥
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "jobName": "JOB_NAME", "parameters": { "driverClassName": "DRIVER_CLASS_NAME", "connectionURL": "JDBC_CONNECTION_URL", "driverJars": "DRIVER_PATHS", "username": "CONNECTION_USERNAME", "password": "CONNECTION_PASSWORD", "connectionProperties": "CONNECTION_PROPERTIES", "statement": "SQL_STATEMENT", "inputSubscription": "INPUT_SUBSCRIPTION", "outputDeadletterTopic": "OUTPUT_DEADLETTER_TOPIC", "KMSEncryptionKey":"KMS_ENCRYPTION_KEY" }, "environment": { "zone": "us-central1-f" }, }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域,例如us-central1
DRIVER_CLASS_NAME
:驱动程序类名称JDBC_CONNECTION_URL
:JDBC 连接网址DRIVER_PATHS
:JDBC 驱动程序以英文逗号分隔的 Cloud Storage 路径CONNECTION_USERNAME
:JDBC 连接用户名CONNECTION_PASSWORD
:JDBC 连接密码CONNECTION_PROPERTIES
:JDBC 连接属性(如有需要)SQL_STATEMENT
:要对数据库执行的 SQL 语句INPUT_SUBSCRIPTION
:要读取的 Pub/Sub 输入订阅。OUTPUT_DEADLETTER_TOPIC
:用于转发无法递送的消息的 Pub/SubKMS_ENCRYPTION_KEY
:Cloud KMS 加密密钥
后续步骤
- 了解 Dataflow 模板。
- 参阅 Google 提供的模板列表。