使用 Bigtable Spark 连接器

借助 Bigtable Spark 连接器,您可以在 Bigtable 中读取和写入数据。您可以使用 Spark SQL 和 DataFrames 从 Spark 应用读取数据。使用 Bigtable Spark 连接器支持以下 Bigtable 操作:

  • 写入数据
  • 读取数据
  • 创建新表

本文档介绍如何将 Spark SQL DataFrames 表转换为 Bigtable 表,然后编译并创建 JAR 文件以提交 Spark 作业。

Spark 和 Scala 支持状态

Bigtable Spark 连接器仅支持 Scala 2.12 版本和以下 Spark 版本:

Bigtable Spark 连接器支持以下 Dataproc 版本:

计算费用

如果您决定使用 Google Cloud 的以下任何计费组件,则需要为您使用的资源付费:

  • Bigtable(使用 Bigtable 模拟器不会产生费用)
  • Dataproc
  • Cloud Storage

Dataproc 价格适用于在 Compute Engine 集群上使用 Dataproc 的情况。Dataproc 无服务器价格适用于在 Dataproc Serverless for Spark 上运行的工作负载和会话。

您可使用价格计算器根据您的预计使用量来估算费用。

准备工作

在使用 Bigtable Spark 连接器之前,请先满足以下前提条件。

所需的角色

如需获取使用 Bigtable Spark 连接器所需的权限,请让管理员授予您项目的以下 IAM 角色:

  • Bigtable Administrator (roles/bigtable.admin)(可选): 可让您读取或写入数据以及创建新表。
  • Bigtable User (roles/bigtable.user):允许您读取或写入数据,但不允许创建新表。

如需详细了解如何授予角色,请参阅管理访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

如果您使用的是 Dataproc 或 Cloud Storage,则可能需要其他权限。如需了解详情,请参阅 Dataproc 权限Cloud Storage 权限。

设置 Spark

除了创建 Bigtable 实例之外,您还需要设置 Spark 实例。您可以在本地执行此操作,也可以选择以下任一选项,将 Spark 与 Dataproc 搭配使用:

  • Dataproc 集群
  • Dataproc Serverless

如需详细了解如何选择 Dataproc 集群或无服务器选项,请参阅 Dataproc Serverless for Spark 与 Dataproc on Compute Engine 文档。

下载连接器 JAR 文件

您可以在 Bigtable Spark 连接器 GitHub 代码库中找到 Bigtable Spark 连接器源代码和示例。

根据您的 Spark 设置,您可以按如下方式访问 JAR 文件:

  • 如果您在本地运行 PySpark,则应从 gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar Cloud Storage 位置下载连接器的 JAR 文件。

    SCALA_VERSION 替换为 Scala 版本(设置为 2.12 作为唯一受支持的版本),并将 CONNECTOR_VERSION 替换为您要使用的连接器版本。

  • 对于 Dataproc 集群或无服务器选项,请使用最新的 JAR 文件作为可添加到 Scala 或 Java Spark 应用中的工件。如需详细了解如何将 JAR 文件用作工件,请参阅管理依赖项

  • 如果要将 PySpark 作业提交到 Dataproc,请使用 gcloud dataproc jobs submit pyspark --jars 标志将 URI 设置为 Cloud Storage 中 JAR 文件的位置,例如 gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar

将 Bigtable 配置添加到您的 Spark 应用

在 Spark 应用中,添加可让您与 Bigtable 进行交互的 Spark 选项。

支持的 Spark 选项

使用 com.google.cloud.spark.bigtable 软件包中提供的 Spark 选项。

选项名称 需要 默认值 含义
spark.bigtable.project.id 不适用 设置 Bigtable 项目 ID。
spark.bigtable.instance.id 不适用 设置 Bigtable 实例 ID。
catalog 不适用 设置 JSON 格式,用于指定 DataFrame 的类似 SQL 的架构与 Bigtable 表的架构之间的转换格式。

如需了解详情,请参阅创建 JSON 格式的表元数据
spark.bigtable.app_profile.id default 设置 Bigtable 应用配置文件 ID。
spark.bigtable.write.timestamp.milliseconds 当前系统时间 设置将 DataFrame 写入 Bigtable 时使用的时间戳(以毫秒为单位)。

请注意,由于 DataFrame 中的所有行都使用相同的时间戳,因此 DataFrame 中具有相同行键列的行在 Bigtable 中会保留为单个版本,因为它们具有相同的时间戳。
spark.bigtable.create.new.table false 设置为 true 可在写入 Bigtable 之前创建一个新表。
spark.bigtable.read.timerange.start.millisecondsspark.bigtable.read.timerange.end.milliseconds 不适用 设置时间戳(以毫秒为单位,从纪元时间开始),以分别过滤具有特定开始日期和结束日期的单元格。这两个参数都必须指定,或者不指定。
spark.bigtable.push.down.row.key.filters true 设置为 true 以允许在服务器端进行简单的行键过滤。对复合行键进行过滤是在客户端实现的。

如需了解详情,请参阅使用过滤条件读取特定的 DataFrame 行
spark.bigtable.read.rows.attempt.timeout.milliseconds 30m 设置与 Java 版 Bigtable 客户端中一个 DataFrame 分区对应的读取行尝试的超时时长。
spark.bigtable.read.rows.total.timeout.milliseconds 12 小时 设置与 Java 版 Bigtable 客户端中的一个 DataFrame 分区对应的读取行尝试的超时时长。
spark.bigtable.mutate.rows.attempt.timeout.milliseconds 100 万 设置与 Java 版 Bigtable 客户端中的一个 DataFrame 分区对应的 mutate 行尝试的超时时长。
spark.bigtable.mutate.rows.total.timeout.milliseconds 10 分钟 设置与 Java 版 Bigtable 客户端中的一个 DataFrame 分区对应的 mutate 行尝试的超时时长。
spark.bigtable.batch.mutate.size 100 设置为每批中的更改数量。您可以设置的最大值为 100000
spark.bigtable.enable.batch_mutate.flow_control false 设置为 true 可为批量变更启用流控制

创建 JSON 格式的表元数据

必须使用 JSON 格式的字符串将 Spark SQL DataFrames 表格式转换为 Bigtable 表。这种字符串 JSON 格式使数据格式与 Bigtable 兼容。您可以使用 .option("catalog", catalog_json_string) 选项在应用代码中传递 JSON 格式。

例如,请参考以下 DataFrame 表和对应的 Bigtable 表。

在本例中,DataFrame 中的 namebirthYear 列组合到 info 列族下,并分别重命名为 namebirth_year。同样,address 列存储在具有相同列名称的 location 列族下。DataFrame 中的 id 列会转换为 Bigtable 行键。

行键在 Bigtable 中没有专用列名称,在本示例中,id_rowkey 仅用于向连接器表明这是行键列。您可以为行键列使用任何名称,并确保在以 JSON 格式声明 "rowkey":"column_name" 字段时使用相同的名称。

DataFrame Bigtable 表 = t1
行键 列族
信息 位置信息
id name birthYear 地址 id_rowkey name birth_year 地址

目录的 JSON 格式如下所示:

    """
    {
      "table": {"name": "t1"},
      "rowkey": "id_rowkey",
      "columns": {
        "id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
        "name": {"cf": "info", "col": "name", "type": "string"},
        "birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
        "address": {"cf": "location", "col": "address", "type": "string"}
      }
    }
    """

JSON 格式中使用的键和值如下:

目录键 目录值 JSON 格式
Bigtable 表的名称。 "table":{"name":"t1"}

如果该表不存在,请使用 .option("spark.bigtable.create.new.table", "true") 创建一个表。
行键 将用作 Bigtable 行键的列名称。确保将 DataFrame 列的列名称用作行键,例如 id_rowkey

复合键也可以用作行键。例如 "rowkey":"name:address"。使用此方法可能会导致行键需要对所有读取请求进行全表扫描。
"rowkey":"id_rowkey",
将每个 DataFrame 列映射到相应的 Bigtable 列族 ("cf") 和列名称 ("col")。列名称可以不同于 DataFrame 表中的列名称。支持的数据类型包括 stringlongbinary "columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}"

在此示例中,id_rowkey 是行键,infolocation 是列族。

支持的数据类型

连接器支持在目录中使用 stringlongbinary(字节数组)类型。在添加对其他类型(如 intfloat)的支持之前,您可以先手动将此类数据类型转换为字节数组(Spark SQL 的 BinaryType),然后再使用连接器将其写入 Bigtable。

此外,您还可以使用 Avro 对复杂类型进行序列化,例如 ArrayType。如需了解详情,请参阅使用 Apache Avro 将复杂数据类型序列化

写入 Bigtable

使用 .write() 函数和支持的选项将数据写入 Bigtable。

Java

GitHub 代码库中的以下代码使用 Java 和 Maven 向 Bigtable 写入数据。

  String catalog = "{" +
        "\"table\":{\"name\":\"" + tableName + "\"," +
        "\"tableCoder\":\"PrimitiveType\"}," +
        "\"rowkey\":\"wordCol\"," +
        "\"columns\":{" +
        "\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
        "\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
        "}}".replaceAll("\\s+", "");

…

  private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
        String createNewTable) {
      dataframe
          .write()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .option("spark.bigtable.create.new.table", createNewTable)
          .save();
    }

Python

GitHub 代码库中的以下代码使用 Python 写入 Bigtable。

  catalog = ''.join(("""{
        "table":{"name":" """ + bigtable_table_name + """
        ", "tableCoder":"PrimitiveType"},
        "rowkey":"wordCol",
        "columns":{
          "word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
          "count":{"cf":"example_family", "col":"countCol", "type":"long"}
        }
        }""").split())
  …

  input_data = spark.createDataFrame(data)
  print('Created the DataFrame:')
  input_data.show()

  input_data.write \
        .format('bigtable') \
        .options(catalog=catalog) \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .option('spark.bigtable.create.new.table', create_new_table) \
        .save()
  print('DataFrame was written to Bigtable.')

  …

从 Bigtable 中读取

使用 .read() 函数检查表是否已成功导入 Bigtable。

Java

  …
  private static Dataset<Row> readDataframeFromBigtable(String catalog) {
      Dataset<Row> dataframe = spark
          .read()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .load();
      return dataframe;
    }

Python

  …

  records = spark.read \
        .format('bigtable') \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .options(catalog=catalog) \
        .load()

  print('Reading the DataFrame from Bigtable:')
  records.show()

编译项目

生成用于在 Dataproc 集群、Dataproc 无服务器或本地 Spark 实例中运行作业的 JAR 文件。您可以在本地编译 JAR 文件,然后使用该文件提交作业。在提交作业时,已编译的 JAR 路径将设置为 PATH_TO_COMPILED_JAR 环境变量。

此步骤不适用于 PySpark 应用。

管理依赖项

Bigtable Spark 连接器支持以下依赖项管理工具:

编译 JAR 文件

Maven

  1. spark-bigtable 依赖项添加到您的 pom.xml 文件中。

    <dependencies>
    <dependency>
      <groupId>com.google.cloud.spark.bigtable</groupId>
      <artifactId>spark-bigtable_SCALA_VERSION</artifactId>
      <version>0.1.0</version>
    </dependency>
    </dependencies>
    
  2. Maven Shade 插件添加到 pom.xml 文件中,以创建超级 JAR:

    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
    
  3. 运行 mvn clean install 命令以生成 JAR 文件。

SBT

  1. spark-bigtable 依赖项添加到您的 build.sbt 文件中:

    libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
    
  2. sbt-assembly 插件添加到您的 project/plugins.sbtproject/assembly.sbt 文件中,以创建 Uber JAR 文件。

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
    
  3. 运行 sbt clean assembly 命令以生成 JAR 文件。

Gradle

  1. spark-bigtable 依赖项添加到您的 build.gradle 文件中。

    dependencies {
    implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0'
    }
    
  2. build.gradle 文件中添加 Shadow 插件以创建超级 JAR 文件:

    plugins {
    id 'com.github.johnrengelman.shadow' version '8.1.1'
    id 'java'
    }
    
  3. 如需详细了解配置和 JAR 编译信息,请参阅 Shadow 插件的文档。

提交作业

使用 Dataproc、Dataproc Serverless 或本地 Spark 实例提交 Spark 作业,以启动您的应用。

设置运行时环境

设置以下环境变量。

      #Google Cloud
      export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
      export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
      export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
      export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
      export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
      export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE

      #Dataproc Serverless
      export BIGTABLE_SPARK_SUBNET=SUBNET
      export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME

      #Scala/Java
      export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR

      #PySpark
      export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
      export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
      export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR

请替换以下内容:

  • PROJECT_ID:Bigtable 项目的永久标识符。
  • INSTANCE_ID:Bigtable 实例的永久性标识符。
  • TABLE_NAME:表的永久标识符。
  • DATAPROC_CLUSTER:Dataproc 集群的永久性标识符。
  • DATAPROC_REGION:您的 Dataproc 实例中的某个集群所在的 Dataproc 区域,例如 northamerica-northeast2
  • DATAPROC_ZONE:运行 Dataproc 集群的地区。
  • SUBNET子网的完整资源路径。
  • GCS_BUCKET_NAME:用于上传 Spark 工作负载依赖项的 Cloud Storage 存储桶。
  • PATH_TO_COMPILED_JAR:已编译的 JAR 的完整路径或相对路径,例如 /path/to/project/root/target/<compiled_JAR_name>(适用于 Maven)。
  • GCS_PATH_TO_CONNECTOR_JARspark-bigtable_SCALA_VERSION_CONNECTOR_VERSION.jar 文件所在的 gs://spark-lib/bigtable Cloud Storage 存储桶。
  • PATH_TO_PYTHON_FILE:对于 PySpark 应用,这是 Python 文件的路径,用于向 Bigtable 写入数据和从中读取数据。
  • LOCAL_PATH_TO_CONNECTOR_JAR:对于 PySpark 应用,表示所下载 Bigtable Spark 连接器 JAR 文件的路径。

提交 Spark 作业

对于 Dataproc 实例或本地 Spark 设置,请运行 Spark 作业将数据上传到 Bigtable。

Dataproc 集群

使用已编译的 JAR 文件,并创建 Dataproc 集群作业,以便从 Bigtable 读取数据以及向其中写入数据。

  1. 创建 Dataproc 集群。以下示例展示了一个示例命令,该命令使用 Debian 10、两个工作器节点和默认配置创建 Dataproc v2.0 集群。

    gcloud dataproc clusters create \
      $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \
      --zone $BIGTABLE_SPARK_DATAPROC_ZONE \
      --master-machine-type n2-standard-4 --master-boot-disk-size 500 \
      --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \
      --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
    
  2. 提交作业。

    Scala/Java

    以下示例展示了 spark.bigtable.example.WordCount 类,其中包含的逻辑用于在 DataFrame 中创建测试表、将表写入 Bigtable,然后统计表中的单词数。

        gcloud dataproc jobs submit spark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --class=spark.bigtable.example.WordCount \
        --jar=$PATH_TO_COMPILED_JAR \
        -- \
        $BIGTABLE_SPARK_PROJECT_ID \
        $BIGTABLE_SPARK_INSTANCE_ID \
        $BIGTABLE_SPARK_TABLE_NAME \
    

    PySpark

        gcloud dataproc jobs submit pyspark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --jars=$GCS_PATH_TO_CONNECTOR_JAR \
        --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
        $PATH_TO_PYTHON_FILE \
        -- \
        --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
        --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
        --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
    

Dataproc Serverless

使用已编译的 JAR 文件,并创建一个 Dataproc 作业,以便通过 Dataproc 无服务器实例从 Bigtable 读取数据以及向 Bigtable 写入数据。

Scala/Java

  gcloud dataproc batches submit spark \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
  --  \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
  --jars=$GCS_PATH_TO_CONNECTOR_JAR \
  --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
  -- \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

本地 Spark

使用下载的 JAR 文件并创建一个 Spark 作业,该作业使用本地 Spark 实例在 Bigtable 中读取和写入数据。您还可以使用 Bigtable 模拟器提交 Spark 作业。

使用 Bigtable 模拟器

如果您决定使用 Bigtable 模拟器,请按照以下步骤操作:

  1. 运行以下命令启动模拟器:

    gcloud beta emulators bigtable start
    

    默认情况下,模拟器会选择 localhost:8086

  2. 设置 BIGTABLE_EMULATOR_HOST 环境变量:

    export BIGTABLE_EMULATOR_HOST=localhost:8086
    
  3. 提交 Spark 作业

如需详细了解如何使用 Bigtable 模拟器,请参阅使用模拟器进行测试

提交 Spark 作业

无论是否使用本地 Bigtable 模拟器,都请使用 spark-submit 命令提交 Spark 作业。

Scala/Java

  spark-submit $PATH_TO_COMPILED_JAR \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  spark-submit \
  --jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
  --packages=org.slf4j:slf4j-reload4j:1.7.36 \
  $PATH_TO_PYTHON_FILE \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

验证表数据

运行以下 cbt CLI 命令以验证数据是否已写入 Bigtable。cbt CLI 是 Google Cloud CLI 的一个组件。如需了解详情,请参阅 cbt CLI 概览

    cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
    read $BIGTABLE_SPARK_TABLE_NAME

其他解决方案

将 Bigtable Spark 连接器用于特定解决方案,例如序列化复杂的 Spark SQL 类型、读取特定行以及生成客户端指标。

使用过滤条件读取特定的 DataFrame 行

使用 DataFrames 从 Bigtable 读取数据时,您可以指定过滤条件,以仅读取特定行。系统会在服务器端应用简单的过滤条件(例如针对行键列的 ==<=startsWith),以避免执行全表扫描。系统会在客户端应用复合行键的过滤条件或复杂过滤条件(例如行键列的 LIKE 过滤条件)。

如果您要读取大型表,我们建议您使用简单的行键过滤器,以避免执行全表扫描。以下示例语句展示了如何使用简单过滤条件进行读取。确保在 Spark 过滤条件中,使用转换为行键的 DataFrame 列的名称:

    dataframe.filter("id == 'some_id'").show()
  

应用过滤条件时,请使用 DataFrame 列名称,而不是 Bigtable 表列名称。

使用 Apache Avro 将复杂数据类型序列化

Bigtable Spark 连接器支持使用 Apache Avro 对复杂的 Spark SQL 类型(例如 ArrayTypeMapTypeStructType)进行序列化。Apache Avro 为记录数据提供数据序列化功能,通常用于处理和存储复杂的数据结构。

使用 "avro":"avroSchema" 等语法指定 Bigtable 中的一列应使用 Avro 进行编码。然后,在对 Bigtable 执行读取或写入操作时,您可以使用 .option("avroSchema", avroSchemaString),以字符串格式指定与该列对应的 Avro 架构。您可以使用不同的选项名称,例如,为不同的列使用 "anotherAvroSchema",并为多个列传递 Avro 架构。

def catalogWithAvroColumn = s"""{
                    |"table":{"name":"ExampleAvroTable"},
                    |"rowkey":"key",
                    |"columns":{
                    |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                    |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
                    |}
                    |}""".stripMargin

使用客户端指标

由于 Bigtable Spark 连接器基于 Java 版 Bigtable 客户端,因此默认情况下会在连接器内启用客户端指标。您可以参阅客户端指标文档,详细了解如何访问和解读这些指标。

将 Java 版 Bigtable 客户端与低层级 RDD 函数搭配使用

由于 Bigtable Spark 连接器基于适用于 Java 的 Bigtable 客户端,因此您可以直接在 Spark 应用中使用该客户端,并在低级别 RDD 函数(例如 mapPartitionsforeachPartition)中执行分布式读取或写入请求。

如需使用适用于 Java 类的 Bigtable 客户端,请将 com.google.cloud.spark.bigtable.repackaged 前缀附加到软件包名称中。例如,不要使用 com.google.cloud.bigtable.data.v2.BigtableDataClient 作为类名称,而应使用 com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient

如需详细了解适用于 Java 的 Bigtable 客户端,请参阅适用于 Java 的 Bigtable 客户端

后续步骤