Java Database Connectivity (JDBC) to BigQuery 模板

JDBC to BigQuery 模板是一种批处理流水线,可将数据从关系数据库表中复制到现有的 BigQuery 表中。此流水线使用 JDBC 连接到关系型数据库。可使用此模板将数据从任何具有可用 JDBC 驱动程序的关系型数据库复制到 BigQuery 中。

为了增加一项保护措施,您可以在传入使用 Cloud KMS 密钥加密的 Base64 编码用户名、密码和连接字符串参数的同时,传入该 Cloud KMS 密钥。如需详细了解如何对用户名、密码和连接字符串参数进行加密,请参阅 Cloud KMS API 加密端点

流水线要求

  • 关系数据库的 JDBC 驱动程序必须可用。
  • 在运行此流水线之前,BigQuery 表必须已存在。
  • BigQuery 表必须具有兼容的架构。
  • 必须能够从 Dataflow 运行的子网访问关系数据库。

模板参数

参数 说明
driverJars 以逗号分隔的驱动程序 JAR 文件列表。例如:gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar
driverClassName JDBC 驱动程序类名称。例如:com.mysql.jdbc.Driver
connectionURL JDBC 连接网址字符串。例如 jdbc:mysql://some-host:3306/sampledb。您可以将此值作为使用 Cloud KMS 密钥加密,然后进行 Base64 编码的字符串传入。 从 Base64 编码的字符串中移除空白字符。请注意 Oracle 非 RAC 数据库连接字符串 (jdbc:oracle:thin:@some-host:<port>:<sid>) 和 Oracle RAC 数据库连接字符串 (jdbc:oracle:thin:@//some-host[:<port>]/<service_name>) 之间的差异。例如:jdbc:mysql://some-host:3306/sampledb
outputTable 要将输出写入的 BigQuery 表位置。名称必须采用 <project>:<dataset>.<table_name> 格式。表的架构必须与输入对象匹配。例如:<my-project>:<my-dataset>.<my-table>
bigQueryLoadingTemporaryDirectory BigQuery 加载进程的临时目录。 例如:gs://your-bucket/your-files/temp_dir
connectionProperties 可选:用于 JDBC 连接的属性字符串。使用字符串格式 [propertyName=property;]*。例如:unicode=true;characterEncoding=UTF-8
username 可选:要用于 JDBC 连接的用户名。您可以将由 Cloud KMS 密钥加密的值作为 Base64 编码的字符串传入。
password 可选:要用于 JDBC 连接的密码。您可以将由 Cloud KMS 密钥加密的值作为 Base64 编码的字符串传入。
query 可选:要在提取数据的源上运行的查询。例如:select * from sampledb.sample_table
KMSEncryptionKey 可选:要用于对用户名、密码和连接字符串进行解密的 Cloud KMS 加密密钥。如果您传入 Cloud KMS 密钥,则用户名、密码和连接字符串都必须以加密方式进行传递。例如:projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key
useColumnAlias 可选:如果已启用(设置为 true),则流水线会使用列别名(“AS”)而不是列名称将行映射到 BigQuery。默认值为 false
isTruncate 可选:如果已启用(设置为 true),则流水线会在将数据加载到 BigQuery 之前截断。默认为 false,这会导致流水线附加数据。
partitionColumn 可选:如果提供了此参数(以及 table),则 JdbcIO 使用范围对同一个表(子查询)执行多个查询实例来并行读取表。目前仅支持 Long 分区列。
table 可选:使用分区时要读取的表。此参数还接受用英文括号括起的子查询。 例如:(select id, name from Person 作为 subq)。
numPartitions 可选:分区数量。使用下限和上限,此值会为生成的 WHERE 子句表达式形成分区步长,这些表达式用于均匀拆分分区列。当输入小于 1 时,数字设置为 1
lowerBound 可选:要在分区方案中使用的下限。如果未提供,则 Apache Beam 会自动为受支持的类型推断此值。
upperBound 可选:要在分区方案中使用的上限。如果未提供,则 Apache Beam 会自动为受支持的类型推断此值。
disabledAlgorithms 可选:要停用的以英文逗号分隔的算法。如果此值设置为 none,则不会停用任何算法。 请谨慎使用,因为已知默认停用的算法存在漏洞或性能问题。例如:SSLv3, RC4
extraFilesToStage 可选:用于将文件暂存在工作器中的 Cloud Storage 路径或 Secret Manager 密文,以逗号分隔。 这些文件保存在每个工作器的 /extra_files 目录中。例如:gs://your-bucket/file.txt,projects/project-id/secrets/secret-id/versions/version-id
useStorageWriteApi 可选:如果为 true,则流水线使用 BigQuery Storage Write API。默认值为 false。如需了解详情,请参阅使用 Storage Write API
useStorageWriteApiAtLeastOnce 可选:使用 Storage Write API 时,请指定写入语义。如需使用“至少一次”语义,请将此参数设置为 true。如需使用“正好一次”语义,请将参数设置为 false。仅当 useStorageWriteApitrue 时,此参数才适用。默认值为 false

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the JDBC to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Jdbc_to_BigQuery_Flex \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       driverJars=DRIVER_JARS,\
       driverClassName=DRIVER_CLASS_NAME,\
       connectionURL=CONNECTION_URL,\
       outputTable=OUTPUT_TABLE,\
       bigQueryLoadingTemporaryDirectory=BIG_QUERY_LOADING_TEMPORARY_DIRECTORY,\

请替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • DRIVER_JARS:JDBC 驱动程序以英文逗号分隔的 Cloud Storage 路径
  • DRIVER_CLASS_NAME:JDBC 驱动程序类名称
  • CONNECTION_URL:JDBC 连接网址字符串。
  • OUTPUT_TABLE:BigQuery 输出表
  • BIG_QUERY_LOADING_TEMPORARY_DIRECTORY:BigQuery 加载进程的临时目录

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "driverJars": "DRIVER_JARS",
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "CONNECTION_URL",
       "outputTable": "OUTPUT_TABLE",
       "bigQueryLoadingTemporaryDirectory": "BIG_QUERY_LOADING_TEMPORARY_DIRECTORY",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Jdbc_to_BigQuery_Flex",
     "environment": { "maxWorkers": "10" }
  }
}

请替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • DRIVER_JARS:JDBC 驱动程序以英文逗号分隔的 Cloud Storage 路径
  • DRIVER_CLASS_NAME:JDBC 驱动程序类名称
  • CONNECTION_URL:JDBC 连接网址字符串。
  • OUTPUT_TABLE:BigQuery 输出表
  • BIG_QUERY_LOADING_TEMPORARY_DIRECTORY:BigQuery 加载进程的临时目录

后续步骤