使用 Bigtable Spark 连接器
借助 Bigtable Spark 连接器,您可以往返 Bigtable 读写数据。您可以使用 Spark SQL 和 DataFrame 从 Spark 应用中读取数据。使用 Bigtable Spark 连接器支持以下 Bigtable 操作:
- 写入数据
- 读取数据
- 创建新表
本文档介绍如何转换 Spark SQL DataFrames 表 然后编译并创建一个 JAR 文件 提交 Spark 作业。
Spark 和 Scala 支持状态
Bigtable Spark 连接器仅支持 Scala 2.12 版本和以下 Spark 版本:
Bigtable Spark 连接器支持以下 Dataproc 版本:
计算费用
如果您决定使用 Google Cloud 的以下任何可计费组件,则需要为使用的资源付费:
- Bigtable(使用 Bigtable 模拟器无需付费)
- Dataproc
- Cloud Storage
Dataproc 价格适用于 在 Compute Engine 集群上使用 Dataproc。Dataproc 无服务器 适用于工作负载和会话运行的价格 如何使用 Dataproc Serverless for Spark。
您可使用价格计算器根据您的预计使用量来估算费用。
准备工作
在使用 Bigtable Spark 连接器之前,请先满足以下前提条件。
所需的角色
如需获取使用 Bigtable Spark 连接器所需的权限, 请让管理员授予您 项目的以下 IAM 角色:
-
Bigtable Administrator (
roles/bigtable.admin
)(可选): 可让您读取或写入数据以及创建新表。 -
Bigtable User (
roles/bigtable.user
): 可让您读取或写入数据,但不能创建新表。
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
如果您使用的是 Dataproc 或 Cloud Storage,则可能需要其他权限。如需了解详情,请参阅 Dataproc 权限和 Cloud Storage 权限。
设置 Spark
除了创建 Bigtable 实例之外,您还需要设置您的 Spark 实例。您可以在本地执行此操作,也可以选择以下选项之一,将 Spark 与 Dataproc 结合使用:
- Dataproc 集群
- Dataproc Serverless
如需详细了解如何在 Dataproc 集群和无服务器选项之间进行选择,请参阅 Dataproc Serverless for Spark 与 Dataproc on Compute Engine 文档。
下载连接器 JAR 文件
您可以在 Bigtable Spark 连接器 GitHub 代码库中找到 Bigtable Spark 连接器的源代码和示例。
根据您的 Spark 设置,您可以按如下方式访问 JAR 文件:
如果您在本地运行 PySpark,则应从
gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
Cloud Storage 位置下载连接器的 JAR 文件。将
SCALA_VERSION
替换为 Scala 版本,设置为2.12
作为唯一支持的版本,并将CONNECTOR_VERSION
替换为要使用的连接器版本。对于 Dataproc 集群或无服务器方案,请使用最新的 JAR 文件作为可添加到 Scala 或 Java Spark 应用中的工件。如需详细了解如何将 JAR 文件用作工件,请参阅管理依赖项。
如果要将 PySpark 作业提交到 Dataproc,请使用
gcloud dataproc jobs submit pyspark --jars
标志将 URI 设置为 Cloud Storage 中的 JAR 文件位置,例如gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
。
确定计算类型
对于只读作业,您可以使用 Data Boost(预览版)无服务器计算 避免影响应用服务的集群。您的 Spark 应用必须使用 1.1.0 或更高版本的 Spark 连接器才能使用 流量提升。
若要使用 Data Boost,您必须创建 Data Boost 应用配置文件,然后
提供 spark.bigtable.app_profile.id
Spark 的应用配置文件 ID
选项
添加到 Spark 应用。如果您已为 Spark 读取作业创建了应用配置文件,并且希望在不更改应用代码的情况下继续使用该配置文件,则可以将应用配置文件转换为 Data Boost 应用配置文件。有关详情,请参阅转换应用
个人资料。
如需了解详情,请参阅 Bigtable Data Boost 概览。
对于涉及读取和写入的作业,您可以通过在请求中指定标准应用配置文件,使用实例的集群节点进行计算。
指定或创建要使用的应用配置文件
如果您未指定应用配置文件 ID,该连接器将使用默认应用 个人资料。
我们建议您为自己的每个应用分别使用不同的应用配置文件 包括您的 Spark 应用。如需详细了解应用配置文件类型和设置,请参阅应用配置文件概览。如需查看相关说明,请参阅创建和配置应用配置文件。
将 Bigtable 配置添加到您的 Spark 应用
在您的 Spark 应用中,添加可让您与 Bigtable 进行交互的 Spark 选项。
支持的 Spark 选项
使用作为 com.google.cloud.spark.bigtable
软件包的一部分提供的 Spark 选项。
选项名称 | 必填 | 默认值 | 含义 |
---|---|---|---|
spark.bigtable.project.id |
是 | 不适用 | 设置 Bigtable 项目 ID。 |
spark.bigtable.instance.id |
是 | 不适用 | 设置 Bigtable 实例 ID。 |
catalog |
是 | 不适用 | 设置 JSON 格式,用于指定 DataFrame 的类似 SQL 的架构与 Bigtable 表架构之间的转换格式。 如需了解详情,请参阅以 JSON 格式创建表元数据。 |
spark.bigtable.app_profile.id |
否 | default |
设置 Bigtable 应用配置文件 ID。 |
spark.bigtable.write.timestamp.milliseconds |
否 | 当前系统时间 | 设置在将 DataFrame 写入 Bigtable 时要使用的时间戳(以毫秒为单位)。 请注意,由于 DataFrame 中的所有行都使用相同的时间戳,因此在 DataFrame 中具有相同行键列的行将保留为 Bigtable 中的单个版本,因为它们具有相同的时间戳。 |
spark.bigtable.create.new.table |
否 | false |
设置为 true 可在写入 Bigtable 之前创建一个新表。 |
spark.bigtable.read.timerange.start.milliseconds 或 spark.bigtable.read.timerange.end.milliseconds |
否 | 不适用 | 设置时间戳(以自纪元时间以来的毫秒数表示)以分别过滤具有特定开始日期和结束日期的单元。 |
spark.bigtable.push.down.row.key.filters |
否 | true |
设置为 true 以允许在服务器端进行简单的行键过滤。复合行键的过滤是在客户端实现的。如需了解详情,请参阅使用过滤器读取特定的 DataFrame 行。 |
spark.bigtable.read.rows.attempt.timeout.milliseconds |
否 | 30m | 为与 Java 版 Bigtable 客户端中的一个 DataFrame 分区对应的读取行尝试设置超时时长。 |
spark.bigtable.read.rows.total.timeout.milliseconds |
否 | 12 小时 | 在 Java 版 Bigtable 客户端中,为与一个 DataFrame 分区对应的读取行尝试设置总超时时长。 |
spark.bigtable.mutate.rows.attempt.timeout.milliseconds |
否 | 1 分钟 | 在 Java 版 Bigtable 客户端中,为与一个 DataFrame 分区对应的行更改尝试设置超时时长。 |
spark.bigtable.mutate.rows.total.timeout.milliseconds |
否 | 10 分钟 | 在 Java 版 Bigtable 客户端中,为与一个 DataFrame 分区对应的行更改尝试设置总超时时长。 |
spark.bigtable.batch.mutate.size |
否 | 100 |
将此项设为每批次中的更改数量。您可以设置的最大值为 100000 。 |
spark.bigtable.enable.batch_mutate.flow_control |
否 | false |
设置为 true 可为批量变更启用流控制。 |
创建 JSON 格式的表元数据
必须使用 JSON 格式的字符串将 Spark SQL DataFrame 表格式转换为 Bigtable 表格式。此字符串 JSON 格式使数据格式与 Bigtable 兼容。您可以使用 .option("catalog", catalog_json_string)
选项在应用代码中传递 JSON 格式。
以下面的 DataFrame 表和相应的 Bigtable 表为例。
在此示例中,DataFrame 中的 name
和 birthYear
列在 info
列族下归为一组,并分别重命名为 name
和 birth_year
。同样,address
列存储在具有相同列名称的 location
列族下。DataFrame 中的 id
列会转换为 Bigtable 行键。
这些行键在 Bigtable 中没有专用的列名称,在本示例中,id_rowkey
仅用于向连接器指示这是行键列。您可以对行键列使用任何名称,并确保在以 JSON 格式声明 "rowkey":"column_name"
字段时使用相同的名称。
DataFrame | Bigtable 表 = t1 | |||||||
列 | 行键 | 列族 | ||||||
信息 | 位置 | |||||||
列 | 列 | |||||||
id | name | birthYear | 地址 | id_rowkey | name | birth_year | 地址 |
目录的 JSON 格式如下:
"""
{
"table": {"name": "t1"},
"rowkey": "id_rowkey",
"columns": {
"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
"name": {"cf": "info", "col": "name", "type": "string"},
"birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
"address": {"cf": "location", "col": "address", "type": "string"}
}
}
"""
JSON 格式中使用的键和值如下所示:
目录键 | 目录值 | JSON 格式 |
---|---|---|
表 | Bigtable 表的名称。 | "table":{"name":"t1"} 如果表不存在,请使用 .option("spark.bigtable.create.new.table", "true") 创建一个表。 |
行键 | 将用作 Bigtable 行键的列的名称。确保将 DataFrame 列的列名称用作行键,例如 id_rowkey 。复合键也可以用作行键。例如 "rowkey":"name:address" 。此方法可能会导致行键需要对所有读取请求进行全表扫描。 |
"rowkey":"id_rowkey" 、 |
列 | 将每个 DataFrame 列映射到相应的 Bigtable 列族 ("cf" ) 和列名称 ("col" )。列名称可以与 DataFrame 表中的列名称不同。支持的数据类型包括 string 、long 和 binary 。 |
"columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}" 在此示例中, id_rowkey 是行键,info 和 location 是列族。 |
支持的数据类型
连接器支持使用 string
、long
和 binary
(字节数组)类型
目录在添加对其他类型(例如 int
和 float
)的支持之前,
您可以手动将此类数据类型转换为字节数组(Spark SQL 的
BinaryType
),然后才能使用连接器将它们写入到
Bigtable。
此外,您还可以使用 Avro 对复杂的
例如 ArrayType
。如需了解详情,请参阅序列化复杂数据
类型。
写入 Bigtable
使用 .write()
函数和支持的选项将数据写入 Bigtable。
Java
GitHub 代码库中的以下代码使用 Java 和 Maven 向 Bigtable 写入数据。
String catalog = "{" +
"\"table\":{\"name\":\"" + tableName + "\"," +
"\"tableCoder\":\"PrimitiveType\"}," +
"\"rowkey\":\"wordCol\"," +
"\"columns\":{" +
"\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
"\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
"}}".replaceAll("\\s+", "");
…
private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
String createNewTable) {
dataframe
.write()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.option("spark.bigtable.create.new.table", createNewTable)
.save();
}
Python
GitHub 代码库中的以下代码使用 Python 向 Bigtable 写入数据。
catalog = ''.join(("""{
"table":{"name":" """ + bigtable_table_name + """
", "tableCoder":"PrimitiveType"},
"rowkey":"wordCol",
"columns":{
"word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
"count":{"cf":"example_family", "col":"countCol", "type":"long"}
}
}""").split())
…
input_data = spark.createDataFrame(data)
print('Created the DataFrame:')
input_data.show()
input_data.write \
.format('bigtable') \
.options(catalog=catalog) \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.option('spark.bigtable.create.new.table', create_new_table) \
.save()
print('DataFrame was written to Bigtable.')
…
从 Bigtable 中读取
使用 .read()
函数检查表是否已成功导入 Bigtable。
Java
…
private static Dataset<Row> readDataframeFromBigtable(String catalog) {
Dataset<Row> dataframe = spark
.read()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.load();
return dataframe;
}
Python
…
records = spark.read \
.format('bigtable') \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.options(catalog=catalog) \
.load()
print('Reading the DataFrame from Bigtable:')
records.show()
编译项目
生成用于在 Dataproc 集群、Dataproc Serverless 或本地 Spark 实例中运行作业的 JAR 文件。您可以在本地编译 JAR 文件,然后使用该文件提交作业。提交作业时,已编译的 JAR 的路径设置为 PATH_TO_COMPILED_JAR
环境变量。
此步骤不适用于 PySpark 应用。
管理依赖项
Bigtable Spark 连接器支持以下依赖项管理工具:
编译 JAR 文件
Maven
将
spark-bigtable
依赖项添加到您的 pom.xml 文件中。<dependencies> <dependency> <groupId>com.google.cloud.spark.bigtable</groupId> <artifactId>spark-bigtable_SCALA_VERSION</artifactId> <version>0.1.0</version> </dependency> </dependencies>
将 Maven Shade 插件添加到您的
pom.xml
文件中,以创建超级 JAR:<plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins>
运行
mvn clean install
命令以生成 JAR 文件。
sbt
将
spark-bigtable
依赖项添加到您的build.sbt
文件中:libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
将
sbt-assembly
插件添加到project/plugins.sbt
或project/assembly.sbt
文件以创建 Uber JAR 文件。addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
运行
sbt clean assembly
命令以生成 JAR 文件。
Gradle
将
spark-bigtable
依赖项添加到build.gradle
文件中。dependencies { implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0' }
在
build.gradle
文件中添加 Shadow 插件以创建超级 JAR 文件:plugins { id 'com.github.johnrengelman.shadow' version '8.1.1' id 'java' }
如需了解更多配置和 JAR 编译信息,请参阅 Shadow 插件文档。
提交作业
使用 Dataproc、Dataproc Serverless 或本地 Spark 实例提交 Spark 作业以启动您的应用。
设置运行时环境
设置以下环境变量。
#Google Cloud
export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE
#Dataproc Serverless
export BIGTABLE_SPARK_SUBNET=SUBNET
export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME
#Scala/Java
export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR
#PySpark
export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR
替换以下内容:
- PROJECT_ID:Bigtable 项目的永久性标识符。
- INSTANCE_ID:Bigtable 实例的永久性标识符。
- TABLE_NAME:表的永久标识符。
- DATAPROC_CLUSTER:Dataproc 集群的永久性标识符。
- DATAPROC_REGION:包含您的 Dataproc 实例中的一个集群的 Dataproc 区域,例如
northamerica-northeast2
。 - DATAPROC_ZONE:Dataproc 集群在其中运行的区域。
- SUBNET:子网的完整资源路径。
- GCS_BUCKET_NAME:用于上传 Spark 工作负载依赖项的 Cloud Storage 存储桶。
- PATH_TO_COMPILED_JAR:已编译 JAR 文件的绝对或相对路径,例如 Maven 的
/path/to/project/root/target/<compiled_JAR_name>
。 - GCS_PATH_TO_CONNECTOR_JAR:
spark-bigtable_SCALA_VERSION_CONNECTOR_VERSION.jar
文件所在的gs://spark-lib/bigtable
Cloud Storage 存储桶。 - PATH_TO_PYTHON_FILE:对于 PySpark 应用,这是 Python 文件的路径,该文件用于向 Bigtable 写入数据以及从 Bigtable 读取数据。
- LOCAL_PATH_TO_CONNECTOR_JAR:对于 PySpark 应用,这是指已下载的 Bigtable Spark 连接器 JAR 文件的路径。
提交 Spark 作业
对于 Dataproc 实例或本地 Spark 设置,请运行 Spark 作业以将数据上传到 Bigtable。
Dataproc 集群
使用已编译的 JAR 文件,创建一个 Dataproc 集群作业,该作业可以在 Bigtable 中读取和写入数据。
创建 Dataproc 集群。以下示例展示了一个示例命令,该命令用于创建具有 Debian 10、两个工作器节点和默认配置的 Dataproc v2.0 集群。
gcloud dataproc clusters create \ $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \ --zone $BIGTABLE_SPARK_DATAPROC_ZONE \ --master-machine-type n2-standard-4 --master-boot-disk-size 500 \ --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \ --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
提交作业。
Scala/Java
以下示例展示了
spark.bigtable.example.WordCount
类,其中包含用于在 DataFrame 中创建测试表、将该表写入 Bigtable,然后计算表中单词数的逻辑。gcloud dataproc jobs submit spark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --class=spark.bigtable.example.WordCount \ --jar=$PATH_TO_COMPILED_JAR \ -- \ $BIGTABLE_SPARK_PROJECT_ID \ $BIGTABLE_SPARK_INSTANCE_ID \ $BIGTABLE_SPARK_TABLE_NAME \
PySpark
gcloud dataproc jobs submit pyspark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --jars=$GCS_PATH_TO_CONNECTOR_JAR \ --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \ $PATH_TO_PYTHON_FILE \ -- \ --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \ --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \ --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
Dataproc Serverless
使用编译后的 JAR 文件创建一个 Dataproc 作业,以便通过 Dataproc Serverless 实例在 Bigtable 中读取和写入数据。
Scala/Java
gcloud dataproc batches submit spark \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
-- \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
--jars=$GCS_PATH_TO_CONNECTOR_JAR \
--properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
-- \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
本地 Spark
使用下载的 JAR 文件创建一个 Spark 作业,用于通过本地 Spark 实例在 Bigtable 中读取和写入数据。您还可以使用 Bigtable 模拟器提交 Spark 作业。
使用 Bigtable 模拟器
如果您决定使用 Bigtable 模拟器,请按照以下步骤操作:
运行以下命令启动模拟器:
gcloud beta emulators bigtable start
默认情况下,模拟器会选择
localhost:8086
。设置
BIGTABLE_EMULATOR_HOST
环境变量:export BIGTABLE_EMULATOR_HOST=localhost:8086
如需详细了解如何使用 Bigtable 模拟器,请参阅使用模拟器进行测试。
提交 Spark 作业
无论您是否使用本地 Bigtable 模拟器,都可以使用 spark-submit
命令提交 Spark 作业。
Scala/Java
spark-submit $PATH_TO_COMPILED_JAR \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
spark-submit \
--jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
--packages=org.slf4j:slf4j-reload4j:1.7.36 \
$PATH_TO_PYTHON_FILE \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
验证表数据
运行以下命令
cbt
CLI
命令来验证数据是否已写入 Bigtable。通过
cbt
CLI
是 Google Cloud CLI 的一个组件。有关详情,请参阅
cbt
CLI
概览。
cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
read $BIGTABLE_SPARK_TABLE_NAME
其他解决方案
将 Bigtable Spark 连接器用于特定解决方案,例如序列化复杂的 Spark SQL 类型、读取特定行和生成客户端指标。
使用过滤条件读取特定 DataFrame 行
使用 DataFrame 从 Bigtable 读取数据时,您可以指定过滤条件,以便仅读取特定行。系统会在服务器端对行键列应用 ==
、<=
和 startsWith
等简单过滤条件,以避免执行全表扫描。在客户端应用复合行键过滤条件或复杂过滤条件(例如行键列的 LIKE
过滤条件)。
如果您要读取大型表,我们建议您使用简单的行键过滤条件,以免执行全表扫描。以下示例语句显示了如何使用简单的过滤条件进行读取。确保在 Spark 过滤器中使用转换为行键的 DataFrame 列的名称:
dataframe.filter("id == 'some_id'").show()
应用过滤条件时,请使用 DataFrame 列名称,而不是 Bigtable 表列名称。
使用 Apache Avro 序列化复杂数据类型
Bigtable Spark 连接器支持使用 Apache Avro 对复杂的 Spark SQL 类型(例如 ArrayType
、MapType
或 StructType
)进行序列化。Apache Avro 为记录数据提供数据序列化,通常用于处理和存储复杂的数据结构。
使用 "avro":"avroSchema"
等语法指定 Bigtable 中的某个列应使用 Avro 进行编码。然后,您可以在从 Bigtable 读取数据或向其写入数据时使用 .option("avroSchema", avroSchemaString)
,以字符串格式指定与该列对应的 Avro 架构。您可以使用不同的选项名称,例如,针对不同的列使用 "anotherAvroSchema"
,并为多列传递 Avro 架构。
def catalogWithAvroColumn = s"""{
|"table":{"name":"ExampleAvroTable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
|}
|}""".stripMargin
使用客户端指标
由于 Bigtable Spark 连接器基于适用于 Java 的 Bigtable 客户端,因此默认情况下,连接器内会启用客户端指标。您可以参阅客户端指标文档,详细了解如何访问和解读这些指标。
将 Java 版 Bigtable 客户端与低级 RDD 函数搭配使用
由于 Bigtable Spark 连接器基于适用于 Java 的 Bigtable 客户端,因此您可以直接在 Spark 应用中使用该客户端,并在低级 RDD 函数(例如 mapPartitions
和 foreachPartition
)中执行分布式读取或写入请求。
如需使用适用于 Java 类的 Bigtable 客户端,请将 com.google.cloud.spark.bigtable.repackaged
前缀附加到软件包名称。例如,不要将类名称用作 com.google.cloud.bigtable.data.v2.BigtableDataClient
,而应使用 com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient
。
如需详细了解 Java 版 Bigtable 客户端,请参阅适用于 Java 的 Bigtable 客户端。
后续步骤
- 了解如何在 Dataproc 中调整 Spark 作业。
- 将 Java 版 Bigtable 客户端中的类与 Bigtable Spark 连接器搭配使用。