使用 Bigtable Spark 连接器
借助 Bigtable Spark 连接器,您可以从 Bigtable 中读取数据以及向 Bigtable 中写入数据。您可以使用 Spark SQL 和 DataFrame 从 Spark 应用中读取数据。Bigtable Spark 连接器支持以下 Bigtable 操作:
- 写入数据
- 读取数据
- 创建新表
本文档介绍了如何将 Spark SQL DataFrames 表转换为 Bigtable 表,然后编译并创建 JAR 文件以提交 Spark 作业。
Spark 和 Scala 支持状态
Bigtable Spark 连接器支持以下 Scala 版本:
Bigtable Spark 连接器支持以下 Spark 版本:
Bigtable Spark 连接器支持以下 Dataproc 版本:
计算费用
如果您决定使用 Google Cloud的以下任何收费组件,则需要为所使用的资源付费:
- Bigtable(使用 Bigtable 模拟器不会产生费用)
- Dataproc
- Cloud Storage
Dataproc 价格适用于 Dataproc on Compute Engine 集群的使用。Dataproc Serverless 价格适用于在 Dataproc Serverless for Spark 上运行的工作负载和会话。
您可使用价格计算器根据您的预计使用量来估算费用。
准备工作
在使用 Bigtable Spark 连接器之前,请先完成以下前提条件。
所需的角色
如需获得使用 Bigtable Spark 连接器所需的权限,请让您的管理员为您授予项目的以下 IAM 角色:
- 
            Bigtable Administrator (roles/bigtable.admin)(可选): 可让您读取或写入数据以及创建新表。
- 
            Bigtable User (roles/bigtable.user): 允许您读取或写入数据,但不允许您创建新表。
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
如果您使用的是 Dataproc 或 Cloud Storage,可能需要额外权限。如需了解详情,请参阅 Dataproc 权限和 Cloud Storage 权限。
设置 Spark
除了创建 Bigtable 实例之外,您还需要设置 Spark 实例。您可以在本地执行此操作,也可以选择以下任一选项,将 Spark 与 Dataproc 搭配使用:
- Dataproc 集群
- Dataproc Serverless
如需详细了解如何选择 Dataproc 集群或无服务器选项,请参阅 Dataproc Serverless for Spark 与 Dataproc on Compute Engine 文档。
下载连接器 JAR 文件
您可以在 Bigtable Spark 连接器 GitHub 代码库中找到 Bigtable Spark 连接器源代码及示例。
根据您的 Spark 设置,您可以按如下方式访问 JAR 文件:
- 如果您在本地运行 PySpark,则应从 - gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jarCloud Storage 位置下载连接器的 JAR 文件。- 将 - SCALA_VERSION替换为- 2.12或- 2.13(这是唯一受支持的 Scala 版本),并将- CONNECTOR_VERSION替换为您要使用的连接器版本。
- 对于 Dataproc 集群或无服务器选项,请使用最新的 JAR 文件作为可在 Scala 或 Java Spark 应用中添加的制品。如需详细了解如何将 JAR 文件用作制品,请参阅管理依赖项。 
- 如果您要将 PySpark 作业提交到 Dataproc,请使用 - gcloud dataproc jobs submit pyspark --jars标志将 URI 设置为 Cloud Storage 中 JAR 文件的位置,例如- gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar。
确定计算类型
对于只读作业,您可以使用 Data Boost 无服务器计算,这样可以避免影响应用处理集群。您的 Spark 应用必须使用 1.1.0 版或更高版本的 Spark 连接器才能使用 Data Boost。
如需使用 Data Boost,您必须创建 Data Boost 应用配置文件,然后在将 Bigtable 配置添加到 Spark 应用时,为 spark.bigtable.app_profile.id Spark 选项提供应用配置文件 ID。如果您已为 Spark 读取作业创建应用配置文件,并且希望继续使用该配置文件而不更改应用代码,则可以将该应用配置文件转换为 Data Boost 应用配置文件。如需了解详情,请参阅转换应用配置文件。
如需了解详情,请参阅 Bigtable Data Boost 概览。
对于涉及读取和写入的作业,您可以在请求中指定标准应用配置文件,以使用实例的集群节点进行计算。
确定或创建要使用的应用配置文件
如果您未指定应用配置文件 ID,连接器将使用默认应用配置文件。
我们建议您为运行的每个应用(包括 Spark 应用)使用各自不同的应用配置文件。如需详细了解应用配置文件类型和设置,请参阅应用配置文件概览。如需查看相关说明,请参阅创建和配置应用配置文件。
向 Spark 应用添加 Bigtable 配置
在 Spark 应用中,添加允许您与 Bigtable 交互的 Spark 选项。
支持的 Spark 选项
使用作为 com.google.cloud.spark.bigtable 软件包一部分提供的 Spark 选项。
| 选项名称 | 必需 | 默认值 | 含义 | 
|---|---|---|---|
| spark.bigtable.project.id | 是 | 不适用 | 设置 Bigtable 项目 ID。 | 
| spark.bigtable.instance.id | 是 | 不适用 | 设置 Bigtable 实例 ID。 | 
| catalog | 是 | 不适用 | 设置 JSON 格式,用于指定 DataFrame 的类 SQL 架构与 Bigtable 表的架构之间的转换格式。 如需了解详情,请参阅以 JSON 格式创建表元数据。 | 
| spark.bigtable.app_profile.id | 否 | default | 设置 Bigtable 应用配置文件 ID。 | 
| spark.bigtable.write.timestamp.milliseconds | 否 | 当前系统时间 | 设置将 DataFrame 写入 Bigtable 时要使用的以毫秒为单位的时间戳。 请注意,由于 DataFrame 中的所有行都使用相同的时间戳,因此 DataFrame 中具有相同行键列的行在 Bigtable 中会以单个版本保留,因为它们共享相同的时间戳。 | 
| spark.bigtable.create.new.table | 否 | false | 设置为 true可在写入 Bigtable 之前创建新表。 | 
| spark.bigtable.read.timerange.start.milliseconds或spark.bigtable.read.timerange.end.milliseconds | 否 | 不适用 | 设置时间戳(以毫秒为单位,从纪元时间开始计算),以过滤具有特定开始日期和结束日期的单元格。 | 
| spark.bigtable.push.down.row.key.filters | 否 | true | 设置为 true可在服务器端进行简单的行键过滤。对复合行键的过滤是在客户端实现的。如需了解详情,请参阅使用过滤条件读取特定的 DataFrame 行。 | 
| spark.bigtable.read.rows.attempt.timeout.milliseconds | 否 | 30 分钟 | 在 Java 版 Bigtable 客户端中,为读取行尝试设置与一个 DataFrame 分区对应的超时时长。 | 
| spark.bigtable.read.rows.total.timeout.milliseconds | 否 | 12 小时 | 为 Java 版 Bigtable 客户端中与一个 DataFrame 分区对应的读取行尝试设置总超时时长。 | 
| spark.bigtable.mutate.rows.attempt.timeout.milliseconds | 否 | 1 分钟 | 为 Java 版 Bigtable 客户端中与一个 DataFrame 分区对应的 mutate rows 尝试设置超时时长。 | 
| spark.bigtable.mutate.rows.total.timeout.milliseconds | 否 | 10 分钟 | 为 Java 版 Bigtable 客户端中与一个 DataFrame 分区对应的 mutateRows 尝试设置总超时时长。 | 
| spark.bigtable.batch.mutate.size | 否 | 100 | 设置为每个批次中的突变数量。您可以设置的最大值为 100000。 | 
| spark.bigtable.enable.batch_mutate.flow_control | 否 | false | 设置为 true可为批量变更启用流量控制。 | 
创建 JSON 格式的表格元数据
必须使用 JSON 格式的字符串将 Spark SQL DataFrame 表格式转换为 Bigtable 表。这种字符串 JSON 格式可使数据格式与 Bigtable 兼容。您可以使用 .option("catalog", catalog_json_string) 选项在应用代码中传递 JSON 格式。
例如,假设有以下 DataFrame 表和相应的 Bigtable 表。
在此示例中,DataFrame 中的 name 和 birthYear 列被归入 info 列族,并分别重命名为 name 和 birth_year。同样,address 列存储在 location 列族下,列名称相同。DataFrame 中的 id 列会转换为 Bigtable 行键。
行键在 Bigtable 中没有专用列名称,在本例中,id_rowkey 仅用于向连接器指示这是行键列。您可以为行键列使用任意名称,但请务必在以 JSON 格式声明 "rowkey":"column_name" 字段时使用相同的名称。
| DataFrame | Bigtable 表 = t1 | |||||||
| 列 | 行键 | 列族 | ||||||
| 信息 | location | |||||||
| 列 | 列 | |||||||
| id | name | birthYear | 地址 | id_rowkey | name | birth_year | 地址 | |
目录的 JSON 格式如下:
    """
    {
      "table": {"name": "t1"},
      "rowkey": "id_rowkey",
      "columns": {
        "id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
        "name": {"cf": "info", "col": "name", "type": "string"},
        "birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
        "address": {"cf": "location", "col": "address", "type": "string"}
      }
    }
    """
JSON 格式中使用的键和值如下所示:
| 目录键 | 目录值 | JSON 格式 | 
|---|---|---|
| 表 | Bigtable 表的名称。 | "table":{"name":"t1"}如果表不存在,请使用 .option("spark.bigtable.create.new.table", "true")创建表。 | 
| rowkey | 将用作 Bigtable 行键的列的名称。确保 DataFrame 列的列名用作行键,例如 id_rowkey。复合键也可作为行键。例如 "rowkey":"name:address"。这种方法可能会导致行键需要对所有读取请求进行全表扫描。 | "rowkey":"id_rowkey"、 | 
| 列 | 将每个 DataFrame 列映射到相应的 Bigtable 列族 ( "cf") 和列名称 ("col")。列名称可以与 DataFrame 表中的列名称不同。支持的数据类型包括string、long和binary。 | "columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}"在此示例中, id_rowkey是行键,info和location是列族。 | 
支持的数据类型
该连接器支持在目录中使用 string、long 和 binary(字节数组)类型。在添加对 int 和 float 等其他类型的支持之前,您可以先手动将此类数据类型转换为字节数组 (Spark SQL 的 BinaryType),然后再使用连接器将其写入 Bigtable。
此外,您还可以使用 Avro 序列化复杂类型,例如 ArrayType。如需了解详情,请参阅使用 Apache Avro 序列化复杂数据类型。
写入 Bigtable
使用 .write() 函数和支持的选项将数据写入 Bigtable。
Java
GitHub 代码库中的以下代码使用 Java 和 Maven 写入 Bigtable。
  String catalog = "{" +
        "\"table\":{\"name\":\"" + tableName + "\"," +
        "\"tableCoder\":\"PrimitiveType\"}," +
        "\"rowkey\":\"wordCol\"," +
        "\"columns\":{" +
        "\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
        "\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
        "}}".replaceAll("\\s+", "");
…
  private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
        String createNewTable) {
      dataframe
          .write()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .option("spark.bigtable.create.new.table", createNewTable)
          .save();
    }
Python
GitHub 代码库中的以下代码使用 Python 写入 Bigtable。
  catalog = ''.join(("""{
        "table":{"name":" """ + bigtable_table_name + """
        ", "tableCoder":"PrimitiveType"},
        "rowkey":"wordCol",
        "columns":{
          "word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
          "count":{"cf":"example_family", "col":"countCol", "type":"long"}
        }
        }""").split())
  …
  input_data = spark.createDataFrame(data)
  print('Created the DataFrame:')
  input_data.show()
  input_data.write \
        .format('bigtable') \
        .options(catalog=catalog) \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .option('spark.bigtable.create.new.table', create_new_table) \
        .save()
  print('DataFrame was written to Bigtable.')
  …
从 Bigtable 中读取
使用 .read() 函数检查表是否已成功导入到 Bigtable 中。
Java
  …
  private static Dataset<Row> readDataframeFromBigtable(String catalog) {
      Dataset<Row> dataframe = spark
          .read()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .load();
      return dataframe;
    }
Python
  …
  records = spark.read \
        .format('bigtable') \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .options(catalog=catalog) \
        .load()
  print('Reading the DataFrame from Bigtable:')
  records.show()
编译项目
生成用于在 Dataproc 集群、Dataproc Serverless 或本地 Spark 实例中运行作业的 JAR 文件。您可以在本地编译 JAR 文件,然后使用该文件提交作业。提交作业时,已编译 JAR 的路径会设置为 PATH_TO_COMPILED_JAR 环境变量。
此步骤不适用于 PySpark 应用。
管理依赖项
Bigtable Spark 连接器支持以下依赖项管理工具:
编译 JAR 文件
Maven
- 将 - spark-bigtable依赖项添加到您的 pom.xml 文件中。- <dependencies> <dependency> <groupId>com.google.cloud.spark.bigtable</groupId> <artifactId>spark-bigtable_SCALA_VERSION</artifactId> <version>0.1.0</version> </dependency> </dependencies>
- 将 Maven Shade 插件添加到 - pom.xml文件中,以创建超级 JAR:- <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins>
- 运行 - mvn clean install命令以生成 JAR 文件。
sbt
- 将 - spark-bigtable依赖项添加到- build.sbt文件中:- libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}" 
- 将 - sbt-assembly插件添加到- project/plugins.sbt或- project/assembly.sbt文件,以创建超级 JAR 文件。- addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1") 
- 运行 - sbt clean assembly命令以生成 JAR 文件。
Gradle
- 将 - spark-bigtable依赖项添加到- build.gradle文件中。- dependencies { implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0' } 
- 在 - build.gradle文件中添加 Shadow 插件,以创建超级 JAR 文件:- plugins { id 'com.github.johnrengelman.shadow' version '8.1.1' id 'java' } 
- 如需了解更多配置和 JAR 编译信息,请参阅 Shadow 插件的文档。 
提交作业
使用 Dataproc、Dataproc Serverless 或本地 Spark 实例提交 Spark 作业,以启动应用。
设置运行时环境
设置以下环境变量。
      #Google Cloud
      export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
      export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
      export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
      export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
      export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
      export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE
      #Dataproc Serverless
      export BIGTABLE_SPARK_SUBNET=SUBNET
      export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME
      #Scala/Java
      export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR
      #PySpark
      export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
      export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
      export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR
替换以下内容:
- PROJECT_ID:Bigtable 项目的永久性标识符。
- INSTANCE_ID:Bigtable 实例的永久性标识符。
- TABLE_NAME:表的永久标识符。
- DATAPROC_CLUSTER:Dataproc 集群的永久性标识符。
- DATAPROC_REGION:包含 Dataproc 实例中某个集群的 Dataproc 区域,例如 northamerica-northeast2。
- DATAPROC_ZONE:Dataproc 集群运行的地区。
- SUBNET:子网的完整资源路径。
- GCS_BUCKET_NAME:用于上传 Spark 工作负载依赖项的 Cloud Storage 存储桶。
- PATH_TO_COMPILED_JAR:已编译 JAR 的完整路径或相对路径,例如 Maven 的 /path/to/project/root/target/<compiled_JAR_name>。
- GCS_PATH_TO_CONNECTOR_JAR:spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar文件所在的gs://spark-lib/bigtableCloud Storage 存储桶。
- PATH_TO_PYTHON_FILE:对于 PySpark 应用,将用于向 Bigtable 写入数据和从 Bigtable 读取数据的 Python 文件的路径。
- LOCAL_PATH_TO_CONNECTOR_JAR:对于 PySpark 应用,下载的 Bigtable Spark 连接器 JAR 文件的路径。
提交 Spark 作业
对于 Dataproc 实例或本地 Spark 设置,请运行 Spark 作业以将数据上传到 Bigtable。
Dataproc 集群
使用已编译的 JAR 文件,并创建一个 Dataproc 集群作业,用于从 Bigtable 读取数据以及向 Bigtable 写入数据。
- 创建 Dataproc 集群。以下示例展示了一个示例命令,该命令用于创建具有 Debian 10、两个工作器节点和默认配置的 Dataproc v2.0 集群。 - gcloud dataproc clusters create \ $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \ --zone $BIGTABLE_SPARK_DATAPROC_ZONE \ --master-machine-type n2-standard-4 --master-boot-disk-size 500 \ --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \ --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
- 提交作业。 - Scala/Java- 以下示例展示了 - spark.bigtable.example.WordCount类,其中包含在 DataFrame 中创建测试表、将表写入 Bigtable,然后统计表中字数的逻辑。- gcloud dataproc jobs submit spark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --class=spark.bigtable.example.WordCount \ --jar=$PATH_TO_COMPILED_JAR \ -- \ $BIGTABLE_SPARK_PROJECT_ID \ $BIGTABLE_SPARK_INSTANCE_ID \ $BIGTABLE_SPARK_TABLE_NAME \- PySpark- gcloud dataproc jobs submit pyspark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --jars=$GCS_PATH_TO_CONNECTOR_JAR \ --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \ $PATH_TO_PYTHON_FILE \ -- \ --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \ --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \ --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
Dataproc Serverless
使用已编译的 JAR 文件创建一个 Dataproc 作业,该作业可使用 Dataproc Serverless 实例从 Bigtable 读取数据以及向其中写入数据。
Scala/Java
  gcloud dataproc batches submit spark \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
  --  \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME
PySpark
  gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
  --jars=$GCS_PATH_TO_CONNECTOR_JAR \
  --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
  -- \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
本地 Spark
使用下载的 JAR 文件创建一个 Spark 作业,该作业可使用本地 Spark 实例从 Bigtable 读取数据以及向 Bigtable 写入数据。您还可以使用 Bigtable 模拟器提交 Spark 作业。
使用 Bigtable 模拟器
如果您决定使用 Bigtable 模拟器,请按以下步骤操作:
- 运行以下命令启动模拟器: - gcloud beta emulators bigtable start- 默认情况下,该模拟器会选择 - localhost:8086。
- 设置 - BIGTABLE_EMULATOR_HOST环境变量:- export BIGTABLE_EMULATOR_HOST=localhost:8086
如需详细了解如何使用 Bigtable 模拟器,请参阅使用模拟器进行测试。
提交 Spark 作业
无论您是否使用本地 Bigtable 模拟器,都可以使用 spark-submit 命令提交 Spark 作业。
Scala/Java
  spark-submit $PATH_TO_COMPILED_JAR \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME
PySpark
  spark-submit \
  --jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
  --packages=org.slf4j:slf4j-reload4j:1.7.36 \
  $PATH_TO_PYTHON_FILE \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
验证表数据
运行以下 cbt CLI 命令,验证数据是否已写入 Bigtable。cbt CLI 是 Google Cloud CLI 的一个组件。如需了解详情,请参阅 
cbt CLI 概览。
    cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
    read $BIGTABLE_SPARK_TABLE_NAME
其他解决方案
使用 Bigtable Spark 连接器可实现特定解决方案,例如序列化复杂的 Spark SQL 类型、读取特定行和生成客户端指标。
使用过滤条件读取特定的 DataFrame 行
使用 DataFrame 从 Bigtable 读取数据时,您可以指定过滤条件,以便仅读取特定行。系统会在服务器端应用行键列上的简单过滤条件(例如 ==、<= 和 startsWith),以避免进行全表扫描。对复合行键的过滤条件或复杂过滤条件(例如对行键列的 LIKE 过滤条件)在客户端应用。
如果您要读取大型表,建议您使用简单的行键过滤条件,以免执行全表扫描。以下示例语句展示了如何使用简单过滤条件进行读取。请确保在 Spark 过滤器中,您使用的是转换为行键的 DataFrame 列的名称:
dataframe.filter("id == 'some_id'").show()
应用过滤条件时,请使用 DataFrame 列名称,而不是 Bigtable 表列名称。
使用 Apache Avro 序列化复杂数据类型
Bigtable Spark 连接器支持使用 Apache Avro 序列化复杂的 Spark SQL 类型,例如 ArrayType、MapType 或 StructType。Apache Avro 为记录数据提供数据序列化,通常用于处理和存储复杂的数据结构。
使用 "avro":"avroSchema" 等语法指定应使用 Avro 对 Bigtable 中的列进行编码。然后,您可以在从 Bigtable 读取数据或向其写入数据时使用 .option("avroSchema", avroSchemaString),以字符串格式指定与相应列对应的 Avro 架构。您可以为不同的列使用不同的选项名称(例如 "anotherAvroSchema"),并传递多个列的 Avro 架构。
def catalogWithAvroColumn = s"""{
                    |"table":{"name":"ExampleAvroTable"},
                    |"rowkey":"key",
                    |"columns":{
                    |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                    |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
                    |}
                    |}""".stripMargin
使用客户端指标
由于 Bigtable Spark 连接器基于 Java 版 Bigtable 客户端,因此默认情况下会在连接器内启用客户端指标。您可以参阅客户端指标文档,详细了解如何访问和解读这些指标。
将 Bigtable Java 客户端与低级 RDD 函数搭配使用
由于 Bigtable Spark 连接器基于 Bigtable Java 客户端,因此您可以在 Spark 应用中直接使用该客户端,并在 mapPartitions 和 foreachPartition 等低级 RDD 函数中执行分布式读取或写入请求。
如需使用 Java 类的 Bigtable 客户端,请在软件包名称中添加 com.google.cloud.spark.bigtable.repackaged 前缀。例如,使用 com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient 而不是使用 com.google.cloud.bigtable.data.v2.BigtableDataClient 作为类名称。
如需详细了解 Java 版 Bigtable 客户端,请参阅 Java 版 Bigtable 客户端。
后续步骤
- 了解如何在 Dataproc 中调整 Spark 作业。
- 将 Java 版 Bigtable 客户端中的类与 Bigtable Spark 连接器搭配使用。