Java Database Connectivity (JDBC) to BigQuery 模板

JDBC to BigQuery 模板是一种批处理流水线,可将数据从关系数据库表中复制到现有的 BigQuery 表中。此流水线使用 JDBC 连接到关系型数据库。可使用此模板将数据从任何具有可用 JDBC 驱动程序的关系型数据库复制到 BigQuery 中。

为了增加一项保护措施,您可以在传入使用 Cloud KMS 密钥加密的 Base64 编码用户名、密码和连接字符串参数的同时,传入该 Cloud KMS 密钥。如需详细了解如何对用户名、密码和连接字符串参数进行加密,请参阅 Cloud KMS API 加密端点

流水线要求

  • 关系数据库的 JDBC 驱动程序必须可用。
  • 在运行此流水线之前,BigQuery 表必须已存在。
  • BigQuery 表必须具有兼容的架构。
  • 必须能够从运行 Dataflow 的子网访问关系型数据库。

模板参数

必需参数

  • driverJars:以英文逗号分隔的驱动程序 JAR 文件列表。例如 gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar
  • driverClassName:JDBC 驱动程序类名称。例如 com.mysql.jdbc.Driver
  • connectionURL:JDBC 连接网址字符串。例如 jdbc:mysql://some-host:3306/sampledb。您可以将此值作为使用 Cloud KMS 密钥加密,然后进行 Base64 编码的字符串传入。 从 Base64 编码的字符串中移除空白字符。 请注意 Oracle 非 RAC 数据库连接字符串 (jdbc:oracle:thin:@some-host:<port>:<sid>) 和 Oracle RAC 数据库连接字符串 (jdbc:oracle:thin:@//some-host[:<port>]/<service_name>) 之间的差异。例如,jdbc:mysql://some-host:3306/sampledb
  • outputTable:BigQuery 输出表位置。例如 <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
  • bigQueryLoadingTemporaryDirectory:BigQuery 加载进程的临时目录。例如 gs://your-bucket/your-files/temp_dir

可选参数

  • connectionProperties:要用于 JDBC 连接的属性字符串。字符串的格式必须为 [propertyName=property;]*。如需了解详情,请参阅 MySQL 文档中的“配置属性”(https://dev.mysql.com/doc/connector-j/8.1/en/connector-j-reference-configuration-properties.html)。例如 unicode=true;characterEncoding=UTF-8
  • username:要用于 JDBC 连接的用户名。您可以将此值作为使用 Cloud KMS 密钥加密,然后进行 Base64 编码的字符串传入。 从 Base64 编码的字符串中移除空白字符。
  • password:要用于 JDBC 连接的密码。您可以将此值作为使用 Cloud KMS 密钥加密,然后进行 Base64 编码的字符串传入。 从 Base64 编码的字符串中移除空白字符。
  • query:要在提取数据的来源上运行的查询。请注意,某些 JDBC SQL 类型和 BigQuery 类型虽然名称相同,但存在一些差异。需要注意的一些重要的 SQL -> BigQuery 类型映射如下:DATETIME --> TIMESTAMP。如果您的架构不匹配,可能需要进行类型转换。例如 select * from sampledb.sample_table
  • KMSEncryptionKey:要用于对用户名、密码和连接字符串进行解密的 Cloud KMS 加密密钥。如果您传入 Cloud KMS 密钥,则还必须对用户名、密码和连接字符串进行加密。例如 projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key
  • useColumnAlias:如果设置为 true,则流水线会使用列别名 (AS) 而不是列名称将行映射到 BigQuery。默认值为 false
  • isTruncate:如果设置为 true,则流水线会在将数据加载到 BigQuery 之前截断。默认为 false,这会导致流水线附加数据。
  • partitionColumn:如果此参数提供 table 的名称(定义为可选参数),则 JdbcIO 使用范围对同一个表(子查询)执行多个查询实例来并行读取表。目前仅支持 Long 分区列。
  • table:使用分区时要读取的表。此参数还接受用英文括号括起的子查询。 例如 (select id, name from Person) as subq
  • numPartitions:分区的数量。使用下限和上限,此值会为生成的 WHERE 子句表达式形成分区步长,这些表达式用于均匀拆分分区列。当输入小于 1 时,数字设置为 1
  • lowerBound:要在分区方案中使用的下限。如果未提供,Apache Beam 会针对受支持的类型自动推断此值。
  • upperBound:要在分区方案中使用的上限。如果未提供,Apache Beam 会针对受支持的类型自动推断此值。
  • fetchSize:一次从数据库中提取的行数。不用于分区读取。默认值为 50000。
  • createDisposition:要使用的 BigQuery CreateDisposition。例如 CREATE_IF_NEEDEDCREATE_NEVER。默认值为:CREATE_NEVER。
  • bigQuerySchemaPath:BigQuery JSON 架构的 Cloud Storage 路径。如果将 createDisposition 设置为 CREATE_IF_NEEDED,则必须指定此参数。例如 gs://your-bucket/your-schema.json
  • disabledAlgorithms:要停用的算法(以英文逗号分隔)。如果此值设置为 none,则不会停用任何算法。请谨慎使用此参数,因为默认停用的算法可能存在漏洞或性能问题。 例如 SSLv3, RC4
  • extraFilesToStage:用于将文件暂存在工作器中的 Cloud Storage 路径或 Secret Manager 密文(以英文逗号分隔)。这些文件保存在每个工作器的 /extra_files 目录中。例如 gs://<BUCKET_NAME>/file.txt,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<VERSION_ID>
  • useStorageWriteApi:如果为 true,则流水线使用 BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api)。默认值为 false。如需了解详情,请参阅使用 Storage Write API (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)。
  • useStorageWriteApiAtLeastOnce:使用 Storage Write API 时,指定写入语义。如需使用“至少一次”语义 (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics),请将此参数设置为 true。如需使用“正好一次”语义,请将参数设置为 false。仅当 useStorageWriteApitrue 时,此参数才适用。默认值为 false

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the JDBC to BigQuery with BigQuery Storage API support template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Jdbc_to_BigQuery_Flex \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       driverJars=DRIVER_JARS,\
       driverClassName=DRIVER_CLASS_NAME,\
       connectionURL=CONNECTION_URL,\
       outputTable=OUTPUT_TABLE,\
       bigQueryLoadingTemporaryDirectory=BIG_QUERY_LOADING_TEMPORARY_DIRECTORY,\

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • DRIVER_JARS:JDBC 驱动程序以英文逗号分隔的 Cloud Storage 路径
  • DRIVER_CLASS_NAME:JDBC 驱动程序类名称
  • CONNECTION_URL:JDBC 连接网址字符串。
  • OUTPUT_TABLE:BigQuery 输出表
  • BIG_QUERY_LOADING_TEMPORARY_DIRECTORY:BigQuery 加载进程的临时目录

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "driverJars": "DRIVER_JARS",
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "CONNECTION_URL",
       "outputTable": "OUTPUT_TABLE",
       "bigQueryLoadingTemporaryDirectory": "BIG_QUERY_LOADING_TEMPORARY_DIRECTORY",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Jdbc_to_BigQuery_Flex",
     "environment": { "maxWorkers": "10" }
  }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • DRIVER_JARS:JDBC 驱动程序以英文逗号分隔的 Cloud Storage 路径
  • DRIVER_CLASS_NAME:JDBC 驱动程序类名称
  • CONNECTION_URL:JDBC 连接网址字符串。
  • OUTPUT_TABLE:BigQuery 输出表
  • BIG_QUERY_LOADING_TEMPORARY_DIRECTORY:BigQuery 加载进程的临时目录

后续步骤