使用 Bigtable Spark 连接器

借助 Bigtable Spark 连接器,您可以在 Bigtable 中读取和写入数据。您可以使用 Spark SQL 和 DataFrame 从 Spark 应用中读取数据。使用 Bigtable Spark 连接器支持以下 Bigtable 操作:

  • 写入数据
  • 读取数据
  • 创建新表

本文档介绍如何将 Spark SQL DataFrames 表转换为 Bigtable 表,然后编译并创建 JAR 文件以提交 Spark 作业。

Spark 和 Scala 支持状态

Bigtable Spark 连接器仅支持 Scala 2.12 版本和以下 Spark 版本:

Bigtable Spark 连接器支持以下 Dataproc 版本:

计算费用

如果您决定使用 Google Cloud 的以下任何可计费组件,则需要为使用的资源付费:

  • Bigtable(使用 Bigtable 模拟器无需付费)
  • Dataproc
  • Cloud Storage

Dataproc 价格适用于在 Compute Engine 集群上使用 Dataproc。Dataproc Serverless 价格适用于在 Dataproc Serverless for Spark 上运行的工作负载和会话。

您可使用价格计算器根据您的预计使用量来估算费用。

准备工作

在使用 Bigtable Spark 连接器之前,请先满足以下前提条件。

所需的角色

如需获取使用 Bigtable Spark 连接器所需的权限,请让管理员向您授予项目的以下 IAM 角色:

  • Bigtable Administrator (roles/bigtable.admin)(可选):可让您读取或写入数据以及创建新表。
  • Bigtable User (roles/bigtable.user):可让您读取或写入数据,但不允许创建新表。

如需详细了解如何授予角色,请参阅管理访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

如果您使用的是 Dataproc 或 Cloud Storage,则可能需要其他权限。如需了解详情,请参阅 Dataproc 权限Cloud Storage 权限。

设置 Spark

除了创建 Bigtable 实例之外,您还需要设置您的 Spark 实例。您可以在本地执行此操作,也可以选择以下选项之一,将 Spark 与 Dataproc 结合使用:

  • Dataproc 集群
  • Dataproc Serverless

如需详细了解如何选择 Dataproc 集群或无服务器方案,请参阅 Dataproc Serverless for Spark 与 Dataproc on Compute Engine 对比 文档。

下载连接器 JAR 文件

您可以在 Bigtable Spark 连接器 GitHub 代码库中找到 Bigtable Spark 连接器的源代码和示例。

根据您的 Spark 设置,您可以按如下方式访问 JAR 文件:

  • 如果您在本地运行 PySpark,则应从 gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar Cloud Storage 位置下载连接器的 JAR 文件。

    SCALA_VERSION 替换为 Scala 版本,设置为 2.12 作为唯一支持的版本,并将 CONNECTOR_VERSION 替换为要使用的连接器版本。

  • 对于 Dataproc 集群或无服务器方案,请使用最新的 JAR 文件作为可添加到 Scala 或 Java Spark 应用中的工件。如需详细了解如何将 JAR 文件用作工件,请参阅管理依赖项

  • 如果要将 PySpark 作业提交到 Dataproc,请使用 gcloud dataproc jobs submit pyspark --jars 标志将 URI 设置为 Cloud Storage 中的 JAR 文件位置,例如 gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar

将 Bigtable 配置添加到您的 Spark 应用

在您的 Spark 应用中,添加可让您与 Bigtable 进行交互的 Spark 选项。

支持的 Spark 选项

使用作为 com.google.cloud.spark.bigtable 软件包的一部分提供的 Spark 选项。

选项名称 需要 默认值 含义
spark.bigtable.project.id 不适用 设置 Bigtable 项目 ID。
spark.bigtable.instance.id 不适用 设置 Bigtable 实例 ID。
catalog 不适用 设置 JSON 格式,用于指定 DataFrame 的类似 SQL 的架构与 Bigtable 表架构之间的转换格式。

如需了解详情,请参阅以 JSON 格式创建表元数据
spark.bigtable.app_profile.id default 设置 Bigtable 应用配置文件 ID。
spark.bigtable.write.timestamp.milliseconds 当前系统时间 设置在将 DataFrame 写入 Bigtable 时要使用的时间戳(以毫秒为单位)。

请注意,由于 DataFrame 中的所有行都使用相同的时间戳,因此在 DataFrame 中具有相同行键列的行将保留为 Bigtable 中的单个版本,因为它们具有相同的时间戳。
spark.bigtable.create.new.table false 设置为 true 可在写入 Bigtable 之前创建一个新表。
spark.bigtable.read.timerange.start.millisecondsspark.bigtable.read.timerange.end.milliseconds 不适用 设置时间戳(以自纪元时间以来的毫秒数表示)以分别过滤具有特定开始日期和结束日期的单元。必须同时指定这两个参数或一个也不指定。
spark.bigtable.push.down.row.key.filters true 设置为 true 以允许在服务器端进行简单的行键过滤。复合行键的过滤是在客户端实现的。

如需了解详情,请参阅使用过滤条件读取特定的 DataFrame 行
spark.bigtable.read.rows.attempt.timeout.milliseconds 30m 为与 Java 版 Bigtable 客户端中的一个 DataFrame 分区对应的读取行尝试设置超时时长。
spark.bigtable.read.rows.total.timeout.milliseconds 12 小时 在 Java 版 Bigtable 客户端中,为与一个 DataFrame 分区对应的读取行尝试设置总超时时长。
spark.bigtable.mutate.rows.attempt.timeout.milliseconds 100 万 为 Java 版 Bigtable 客户端中的一个 DataFrame 分区所对应的 mutate 行尝试设置超时时长。
spark.bigtable.mutate.rows.total.timeout.milliseconds 10 分钟 为 Java 版 Bigtable 客户端中的一个 DataFrame 分区所对应的 mutate 行尝试设置总超时时长。
spark.bigtable.batch.mutate.size 100 将此项设为每批次中的更改数量。您可以设置的最大值为 100000
spark.bigtable.enable.batch_mutate.flow_control false 设置为 true 可为批量变更启用流控制

创建 JSON 格式的表元数据

必须使用 JSON 格式的字符串将 Spark SQL DataFrames 表格式转换为 Bigtable 表。此字符串 JSON 格式使数据格式与 Bigtable 兼容。您可以使用 .option("catalog", catalog_json_string) 选项在应用代码中传递 JSON 格式。

以下面的 DataFrame 表和相应的 Bigtable 表为例。

在此示例中,DataFrame 中的 namebirthYear 列在 info 列族下归为一组,并分别重命名为 namebirth_year。同样,address 列存储在具有相同列名称的 location 列族下。DataFrame 中的 id 列会转换为 Bigtable 行键。

这些行键在 Bigtable 中没有专用的列名称,在本示例中,id_rowkey 仅用于向连接器指示这是行键列。您可以对行键列使用任何名称,并确保在以 JSON 格式声明 "rowkey":"column_name" 字段时使用相同的名称。

DataFrame Bigtable 表 = t1
行键 列族
信息 位置
id name birthYear 地址 id_rowkey name birth_year 地址

目录的 JSON 格式如下所示:

    """
    {
      "table": {"name": "t1"},
      "rowkey": "id_rowkey",
      "columns": {
        "id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
        "name": {"cf": "info", "col": "name", "type": "string"},
        "birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
        "address": {"cf": "location", "col": "address", "type": "string"}
      }
    }
    """

JSON 格式中使用的键和值如下所示:

目录键 目录值 JSON 格式
Bigtable 表的名称。 "table":{"name":"t1"}

如果表不存在,请使用 .option("spark.bigtable.create.new.table", "true") 创建一个表。
行键 将用作 Bigtable 行键的列的名称。确保将 DataFrame 列的列名称用作行键,例如 id_rowkey

复合键也可以用作行键。例如 "rowkey":"name:address"。此方法可能会导致行键需要对所有读取请求进行全表扫描。
"rowkey":"id_rowkey",
每个 DataFrame 列映射到相应的 Bigtable 列族 ("cf") 和列名称 ("col")。列名称可以不同于 DataFrame 表中的列名称。支持的数据类型包括 stringlongbinary "columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}"

在此示例中,id_rowkey 是行键,infolocation 是列族。

支持的数据类型

连接器支持在目录中使用 stringlongbinary(字节数组)类型。在添加对其他类型(例如 intfloat)的支持之前,您可以先手动将此类数据类型转换为字节数组(Spark SQL 的 BinaryType),然后再使用连接器将其写入 Bigtable。

此外,您还可以使用 Avro 对复杂类型进行序列化,例如 ArrayType。如需了解详情,请参阅使用 Apache Avro 序列化复杂数据类型

写入 Bigtable

使用 .write() 函数和支持的选项将数据写入 Bigtable。

Java

GitHub 代码库中的以下代码使用 Java 和 Maven 向 Bigtable 写入数据。

  String catalog = "{" +
        "\"table\":{\"name\":\"" + tableName + "\"," +
        "\"tableCoder\":\"PrimitiveType\"}," +
        "\"rowkey\":\"wordCol\"," +
        "\"columns\":{" +
        "\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
        "\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
        "}}".replaceAll("\\s+", "");

…

  private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
        String createNewTable) {
      dataframe
          .write()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .option("spark.bigtable.create.new.table", createNewTable)
          .save();
    }

Python

GitHub 代码库中的以下代码使用 Python 向 Bigtable 写入数据。

  catalog = ''.join(("""{
        "table":{"name":" """ + bigtable_table_name + """
        ", "tableCoder":"PrimitiveType"},
        "rowkey":"wordCol",
        "columns":{
          "word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
          "count":{"cf":"example_family", "col":"countCol", "type":"long"}
        }
        }""").split())
  …

  input_data = spark.createDataFrame(data)
  print('Created the DataFrame:')
  input_data.show()

  input_data.write \
        .format('bigtable') \
        .options(catalog=catalog) \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .option('spark.bigtable.create.new.table', create_new_table) \
        .save()
  print('DataFrame was written to Bigtable.')

  …

从 Bigtable 中读取

使用 .read() 函数检查表是否已成功导入 Bigtable。

Java

  …
  private static Dataset<Row> readDataframeFromBigtable(String catalog) {
      Dataset<Row> dataframe = spark
          .read()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .load();
      return dataframe;
    }

Python

  …

  records = spark.read \
        .format('bigtable') \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .options(catalog=catalog) \
        .load()

  print('Reading the DataFrame from Bigtable:')
  records.show()

编译项目

生成用于在 Dataproc 集群、Dataproc Serverless 或本地 Spark 实例中运行作业的 JAR 文件。您可以在本地编译 JAR 文件,然后使用该文件提交作业。当您提交作业时,已编译 JAR 的路径会设置为 PATH_TO_COMPILED_JAR 环境变量。

此步骤不适用于 PySpark 应用。

管理依赖项

Bigtable Spark 连接器支持以下依赖项管理工具:

编译 JAR 文件

Maven

  1. spark-bigtable 依赖项添加到您的 pom.xml 文件中。

    <dependencies>
    <dependency>
      <groupId>com.google.cloud.spark.bigtable</groupId>
      <artifactId>spark-bigtable_SCALA_VERSION</artifactId>
      <version>0.1.0</version>
    </dependency>
    </dependencies>
    
  2. Maven Shade 插件添加到您的 pom.xml 文件中,以创建超级 JAR:

    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
    
  3. 运行 mvn clean install 命令以生成 JAR 文件。

SBT

  1. spark-bigtable 依赖项添加到您的 build.sbt 文件中:

    libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
    
  2. sbt-assembly 插件添加到 project/plugins.sbtproject/assembly.sbt 文件以创建 Uber JAR 文件。

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
    
  3. 运行 sbt clean assembly 命令以生成 JAR 文件。

Gradle

  1. spark-bigtable 依赖项添加到 build.gradle 文件中。

    dependencies {
    implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0'
    }
    
  2. build.gradle 文件中添加 Shadow 插件以创建超级 JAR 文件:

    plugins {
    id 'com.github.johnrengelman.shadow' version '8.1.1'
    id 'java'
    }
    
  3. 如需详细了解配置和 JAR 编译信息,请参阅 Shadow 插件的文档。

提交作业

使用 Dataproc、Dataproc Serverless 或本地 Spark 实例提交 Spark 作业以启动您的应用。

设置运行时环境

设置以下环境变量。

      #Google Cloud
      export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
      export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
      export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
      export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
      export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
      export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE

      #Dataproc Serverless
      export BIGTABLE_SPARK_SUBNET=SUBNET
      export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME

      #Scala/Java
      export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR

      #PySpark
      export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
      export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
      export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR

请替换以下内容:

  • PROJECT_ID:Bigtable 项目的永久性标识符。
  • INSTANCE_ID:Bigtable 实例的永久性标识符。
  • TABLE_NAME:表的永久标识符。
  • DATAPROC_CLUSTER:Dataproc 集群的永久性标识符。
  • DATAPROC_REGION:包含您的 Dataproc 实例中的一个集群的 Dataproc 区域,例如 northamerica-northeast2
  • DATAPROC_ZONE:Dataproc 集群在其中运行的区域。
  • SUBNET子网的完整资源路径。
  • GCS_BUCKET_NAME:用于上传 Spark 工作负载依赖项的 Cloud Storage 存储桶。
  • PATH_TO_COMPILED_JAR:已编译 JAR 的完整或相对路径,例如适用于 Maven 的 /path/to/project/root/target/<compiled_JAR_name>
  • GCS_PATH_TO_CONNECTOR_JARspark-bigtable_SCALA_VERSION_CONNECTOR_VERSION.jar 文件所在的 gs://spark-lib/bigtable Cloud Storage 存储桶。
  • PATH_TO_PYTHON_FILE:对于 PySpark 应用,这是 Python 文件的路径,该文件用于向 Bigtable 写入数据以及从 Bigtable 读取数据。
  • LOCAL_PATH_TO_CONNECTOR_JAR:对于 PySpark 应用,这是指已下载的 Bigtable Spark 连接器 JAR 文件的路径。

提交 Spark 作业

对于 Dataproc 实例或本地 Spark 设置,请运行 Spark 作业以将数据上传到 Bigtable。

Dataproc 集群

使用已编译的 JAR 文件,创建一个 Dataproc 集群作业,该作业从 Bigtable 读取数据以及向其中写入数据。

  1. 创建 Dataproc 集群。以下示例展示了一个示例命令,该命令用于创建具有 Debian 10、两个工作器节点和默认配置的 Dataproc v2.0 集群。

    gcloud dataproc clusters create \
      $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \
      --zone $BIGTABLE_SPARK_DATAPROC_ZONE \
      --master-machine-type n2-standard-4 --master-boot-disk-size 500 \
      --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \
      --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
    
  2. 提交作业。

    Scala/Java

    以下示例展示了 spark.bigtable.example.WordCount 类,其中包含用于在 DataFrame 中创建测试表、将表写入 Bigtable,然后计算表中单词数的逻辑。

        gcloud dataproc jobs submit spark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --class=spark.bigtable.example.WordCount \
        --jar=$PATH_TO_COMPILED_JAR \
        -- \
        $BIGTABLE_SPARK_PROJECT_ID \
        $BIGTABLE_SPARK_INSTANCE_ID \
        $BIGTABLE_SPARK_TABLE_NAME \
    

    PySpark

        gcloud dataproc jobs submit pyspark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --jars=$GCS_PATH_TO_CONNECTOR_JAR \
        --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
        $PATH_TO_PYTHON_FILE \
        -- \
        --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
        --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
        --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
    

Dataproc Serverless

使用编译后的 JAR 文件创建一个 Dataproc 作业,以便通过 Dataproc Serverless 实例在 Bigtable 中读取和写入数据。

Scala/Java

  gcloud dataproc batches submit spark \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
  --  \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
  --jars=$GCS_PATH_TO_CONNECTOR_JAR \
  --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
  -- \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

本地 Spark

使用下载的 JAR 文件创建一个 Spark 作业,用于通过本地 Spark 实例在 Bigtable 中读取和写入数据。您还可以使用 Bigtable 模拟器提交 Spark 作业。

使用 Bigtable 模拟器

如果您决定使用 Bigtable 模拟器,请按照以下步骤操作:

  1. 运行以下命令启动模拟器:

    gcloud beta emulators bigtable start
    

    默认情况下,模拟器会选择 localhost:8086

  2. 设置 BIGTABLE_EMULATOR_HOST 环境变量:

    export BIGTABLE_EMULATOR_HOST=localhost:8086
    
  3. 提交 Spark 作业。

如需详细了解如何使用 Bigtable 模拟器,请参阅使用模拟器进行测试

提交 Spark 作业

无论您是否使用本地 Bigtable 模拟器,都可以使用 spark-submit 命令提交 Spark 作业。

Scala/Java

  spark-submit $PATH_TO_COMPILED_JAR \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  spark-submit \
  --jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
  --packages=org.slf4j:slf4j-reload4j:1.7.36 \
  $PATH_TO_PYTHON_FILE \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

验证表数据

运行以下 cbt CLI 命令以验证数据是否已写入 Bigtable。cbt CLI 是 Google Cloud CLI 的一个组件。如需了解详情,请参阅 cbt CLI 概览

    cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
    read $BIGTABLE_SPARK_TABLE_NAME

其他解决方案

将 Bigtable Spark 连接器用于特定解决方案,例如序列化复杂的 Spark SQL 类型、读取特定行和生成客户端指标。

使用过滤条件读取特定的 DataFrame 行

使用 DataFrame 从 Bigtable 读取数据时,您可以指定过滤条件以仅读取特定行。系统会在服务器端对行键列应用 ==<=startsWith 等简单过滤条件,以避免执行全表扫描。在客户端应用复合行键过滤条件或复杂过滤条件(例如行键列的 LIKE 过滤条件)。

如果您要读取大型表,我们建议您使用简单的行键过滤条件,以避免执行全表扫描。以下示例语句显示了如何使用简单的过滤条件进行读取。确保在 Spark 过滤器中使用转换为行键的 DataFrame 列的名称:

    dataframe.filter("id == 'some_id'").show()
  

应用过滤条件时,请使用 DataFrame 列名称,而不是 Bigtable 表列名称。

使用 Apache Avro 序列化复杂数据类型

Bigtable Spark 连接器支持使用 Apache Avro 对复杂的 Spark SQL 类型(例如 ArrayTypeMapTypeStructType)进行序列化。Apache Avro 为记录数据提供数据序列化,通常用于处理和存储复杂的数据结构。

使用 "avro":"avroSchema" 等语法指定 Bigtable 中的列应使用 Avro 进行编码。然后,您可以在从 Bigtable 读取数据或向其写入数据时使用 .option("avroSchema", avroSchemaString),以字符串格式指定与该列对应的 Avro 架构。您可以使用不同的选项名称,例如,针对不同的列使用 "anotherAvroSchema",并为多列传递 Avro 架构。

def catalogWithAvroColumn = s"""{
                    |"table":{"name":"ExampleAvroTable"},
                    |"rowkey":"key",
                    |"columns":{
                    |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                    |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
                    |}
                    |}""".stripMargin

使用客户端指标

由于 Bigtable Spark 连接器基于适用于 Java 的 Bigtable 客户端,因此默认情况下,连接器内会启用客户端指标。您可以参阅客户端指标文档,详细了解如何访问和解读这些指标。

将 Java 版 Bigtable 客户端与低级 RDD 函数搭配使用

由于 Bigtable Spark 连接器基于适用于 Java 的 Bigtable 客户端,因此您可以直接在 Spark 应用中使用该客户端,并在低级 RDD 函数(例如 mapPartitionsforeachPartition)中执行分布式读取或写入请求。

如需使用适用于 Java 类的 Bigtable 客户端,请将 com.google.cloud.spark.bigtable.repackaged 前缀附加到软件包名称。例如,不要将类名称用作 com.google.cloud.bigtable.data.v2.BigtableDataClient,而应使用 com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient

如需详细了解 Java 版 Bigtable 客户端,请参阅适用于 Java 的 Bigtable 客户端

后续步骤