Use the Bigtable Spark connector

The Bigtable Spark connector lets you read and write data to and from Bigtable. You can read data from Bigtable in your Spark application by using Spark SQL and DataFrames. The Bigtable Spark connector supports the following Bigtable operations:

  • Write data
  • Read data
  • Create a new table

This document shows you how to convert a Spark SQL DataFrames table to a Bigtable table, and then compile and create a JAR file to submit a Spark job.

Spark and Scala support status

The Bigtable Spark connector supports only Scala 2.12 and the following Spark versions:

The Bigtable Spark connector supports the following Dataproc versions:

Calculate costs

If you decide to use any of the following billable components of Google Cloud, you are billed for the resources that you use:

  • Bigtable (You are not charged for using the Bigtable emulator)
  • Dataproc
  • Cloud Storage

Dataproc pricing applies to the use of Dataproc on Compute Engine clusters. Dataproc Serverless pricing applies to workloads and sessions run on Dataproc Serverless for Spark.

To generate a cost estimate based on your projected usage, use the pricing calculator.

Before you begin

Complete the following prerequisites before using the Bigtable Spark connector.

Required roles

To get the permissions that you need to use the Bigtable Spark connector, ask your administrator to grant you the following IAM roles on your project:

  • Bigtable Administrator (roles/bigtable.admin) (Optional): lets you read or write data and create a new table.
  • Bigtable User (roles/bigtable.user): lets you read or write data, but doesn't let you create a new table.

For more information about granting roles, see Manage access.

You might also be able to get the required permissions through custom roles or other predefined roles.

If you are using Dataproc or Cloud Storage, additional permissions might be required. For more information, see Dataproc permissions and Cloud Storage permissions.

Set up Spark

Apart from creating a Bigtable instance, you also need to set up your Spark instance. You can do so locally or select either of these options to use Spark with Dataproc:

  • Dataproc cluster
  • Dataproc Serverless

For more information about choosing between a Dataproc cluster or serverless option, see the Dataproc Serverless for Spark compared to Dataproc on Compute Engine documentation.

Download the connector JAR file

You can find the Bigtable Spark connector source code with examples in the Bigtable Spark connector GitHub repository.

Based on your Spark setup, you can access the JAR file as follows:

  • If you are running PySpark locally, download the connector's JAR file from the gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar Cloud Storage location.

    Replace SCALA_VERSION with the Scala version (2.12 is the only supported version) and CONNECTOR_VERSION with the connector version that you want to use.

  • For the Dataproc cluster or serverless option, use the latest JAR file as an artifact that can be added to your Scala or Java Spark applications. For more information about using the JAR file as an artifact, see Manage dependencies.

  • If you are submitting your PySpark job to Dataproc, use the gcloud dataproc jobs submit pyspark --jars flag to set the URI to the JAR file location in Cloud Storage—for example gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar.

Add Bigtable configuration to your Spark application

In your Spark application, add the Spark options that let you interact with Bigtable.

Supported Spark options

Use the Spark options that are available as part of the com.google.cloud.spark.bigtable package.

Each option below is listed with whether it is required, its default value, and its meaning.

  • spark.bigtable.project.id (required; no default): Set the Bigtable project ID.
  • spark.bigtable.instance.id (required; no default): Set the Bigtable instance ID.
  • catalog (required; no default): Set the JSON-format string that specifies the conversion between the DataFrame's SQL-like schema and the Bigtable table's schema. See Create table metadata in JSON format for more information.
  • spark.bigtable.app_profile.id (optional; default: default): Set the Bigtable app profile ID.
  • spark.bigtable.write.timestamp.milliseconds (optional; default: current system time): Set the timestamp in milliseconds to use when writing a DataFrame to Bigtable. Because all rows in the DataFrame use the same timestamp, rows with the same row key column value in the DataFrame persist as a single version in Bigtable.
  • spark.bigtable.create.new.table (optional; default: false): Set to true to create a new table before writing to Bigtable.
  • spark.bigtable.read.timerange.start.milliseconds and spark.bigtable.read.timerange.end.milliseconds (optional; no default): Set timestamps (in milliseconds since epoch time) to filter cells with a specific start and end date, respectively. Specify both of these parameters or neither of them.
  • spark.bigtable.push.down.row.key.filters (optional; default: true): Set to true to allow simple row key filtering on the server side. Filtering on compound row keys is implemented on the client side. See Read a specific DataFrame row using a filter for more information.
  • spark.bigtable.read.rows.attempt.timeout.milliseconds (optional; default: 30m): Set the timeout duration for a read rows attempt corresponding to one DataFrame partition in the Bigtable client for Java.
  • spark.bigtable.read.rows.total.timeout.milliseconds (optional; default: 12h): Set the total timeout duration for a read rows attempt corresponding to one DataFrame partition in the Bigtable client for Java.
  • spark.bigtable.mutate.rows.attempt.timeout.milliseconds (optional; default: 1m): Set the timeout duration for a mutate rows attempt corresponding to one DataFrame partition in the Bigtable client for Java.
  • spark.bigtable.mutate.rows.total.timeout.milliseconds (optional; default: 10m): Set the total timeout duration for a mutate rows attempt corresponding to one DataFrame partition in the Bigtable client for Java.
  • spark.bigtable.batch.mutate.size (optional; default: 100): Set the number of mutations in each batch. The maximum value is 100000.
  • spark.bigtable.enable.batch_mutate.flow_control (optional; default: false): Set to true to enable flow control for batch mutations.
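
For example, the following sketch shows several of these options combined in a single read. The catalog, projectId, and instanceId values are assumed to be defined elsewhere, and the time-range values are placeholders.

  val dataframe = spark.read
      .format("bigtable")
      .option("catalog", catalog)
      .option("spark.bigtable.project.id", projectId)
      .option("spark.bigtable.instance.id", instanceId)
      // Both time-range options must be set together (milliseconds since epoch time).
      .option("spark.bigtable.read.timerange.start.milliseconds", "1704067200000")
      .option("spark.bigtable.read.timerange.end.milliseconds", "1706745600000")
      .load()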

Create table metadata in JSON format

To convert a Spark SQL DataFrames table into a Bigtable table, you describe the mapping in a JSON-format string. This JSON string makes the data format compatible with Bigtable. Pass the JSON string in your application code using the .option("catalog", catalog_json_string) option.

As an example, consider the following DataFrame table and the corresponding Bigtable table.

In this example, the name and birthYear columns in the DataFrame are grouped together under the info column family and renamed to name and birth_year, respectively. Similarly, the address column is stored under the location column family with the same column name. The id column from the DataFrame is converted to the Bigtable row key.

Row keys don't have a dedicated column name in Bigtable; in this example, id_rowkey is used only to indicate to the connector which column is the row key. You can use any name for the row key column, but make sure that you use the same name when you declare the "rowkey":"column_name" field in the JSON format.

DataFrame columns: id, name, birthYear, address

Bigtable table t1:

  • Row key: id_rowkey
  • Column family info: columns name and birth_year
  • Column family location: column address

The JSON format for the catalog is as follows:

    """
    {
      "table": {"name": "t1"},
      "rowkey": "id_rowkey",
      "columns": {
        "id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
        "name": {"cf": "info", "col": "name", "type": "string"},
        "birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
        "address": {"cf": "location", "col": "address", "type": "string"}
      }
    }
    """

The keys and values used in the JSON format are the following:

  • table: The name of the Bigtable table. JSON format: "table":{"name":"t1"}. If the table doesn't exist, use .option("spark.bigtable.create.new.table", "true") to create it.
  • rowkey: The name of the column that is used as the Bigtable row key. Make sure that the name of the DataFrame column is used as the row key, for example, id_rowkey. Compound keys are also accepted as row keys, for example, "rowkey":"name:address". This approach might result in row keys that require a full table scan for all read requests. JSON format: "rowkey":"id_rowkey".
  • columns: The mapping of each DataFrame column to the corresponding Bigtable column family ("cf") and column name ("col"). The Bigtable column name can be different from the DataFrame column name. Supported data types include string, long, and binary. JSON format: "columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf": "info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type": "string"}}

In this example, id_rowkey is the row key, and info and location are the column families.
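
As a minimal sketch, the catalog string shown earlier in this section can be passed through the catalog option when writing. The dataframe, projectId, and instanceId values are assumed to be defined elsewhere, and spark.bigtable.create.new.table creates table t1 if it doesn't already exist.

  // Assumes `catalog` holds the JSON string shown earlier in this section.
  dataframe.write
      .format("bigtable")
      .option("catalog", catalog)
      .option("spark.bigtable.project.id", projectId)
      .option("spark.bigtable.instance.id", instanceId)
      .option("spark.bigtable.create.new.table", "true")
      .save()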

Supported data types

The connector supports using string, long, and binary (byte array) types in the catalog. Until support for other types such as int and float is added, you can manually convert such data types to byte arrays (Spark SQL's BinaryType) before using the connector to write them to Bigtable.
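
For example, the following sketch converts a hypothetical double column named score to a byte array with a user-defined function before the DataFrame is written. The column name and the big-endian encoding are assumptions for illustration, not connector requirements.

  import java.nio.ByteBuffer
  import org.apache.spark.sql.functions.{col, udf}

  // Pack each double into an 8-byte big-endian array (Spark SQL's BinaryType).
  // Assumes the column contains no null values.
  val doubleToBytes = udf((v: Double) => ByteBuffer.allocate(8).putDouble(v).array())
  val converted = dataframe.withColumn("score", doubleToBytes(col("score")))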

In addition, you can use Avro to serialize complex types, such as ArrayType. For more information, see Serialize complex data types using Apache Avro.

Write to Bigtable

Use the .write() function and the supported options to write your data to Bigtable.

Java

The following code from the GitHub repository uses Java and Maven to write to Bigtable.

  String catalog = ("{" +
        "\"table\":{\"name\":\"" + tableName + "\"," +
        "\"tableCoder\":\"PrimitiveType\"}," +
        "\"rowkey\":\"wordCol\"," +
        "\"columns\":{" +
        "\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
        "\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
        "}}").replaceAll("\\s+", "");

…

  private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
        String createNewTable) {
      dataframe
          .write()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .option("spark.bigtable.create.new.table", createNewTable)
          .save();
    }

Python

The following code from the GitHub repository uses Python to write to Bigtable.

  catalog = ''.join(("""{
        "table":{"name":" """ + bigtable_table_name + """
        ", "tableCoder":"PrimitiveType"},
        "rowkey":"wordCol",
        "columns":{
          "word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
          "count":{"cf":"example_family", "col":"countCol", "type":"long"}
        }
        }""").split())
  …

  input_data = spark.createDataFrame(data)
  print('Created the DataFrame:')
  input_data.show()

  input_data.write \
        .format('bigtable') \
        .options(catalog=catalog) \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .option('spark.bigtable.create.new.table', create_new_table) \
        .save()
  print('DataFrame was written to Bigtable.')

  …

Read from Bigtable

Use the .read() function to verify that the table data was successfully written to Bigtable.

Java

  …
  private static Dataset<Row> readDataframeFromBigtable(String catalog) {
      Dataset<Row> dataframe = spark
          .read()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .load();
      return dataframe;
    }

Python

  …

  records = spark.read \
        .format('bigtable') \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .options(catalog=catalog) \
        .load()

  print('Reading the DataFrame from Bigtable:')
  records.show()

Compile your project

Generate the JAR file that is used to run a job in a Dataproc cluster, Dataproc Serverless, or a local Spark instance. You can compile the JAR file locally and then use it to submit a job. The path to the compiled JAR is set as the PATH_TO_COMPILED_JAR environment variable when you submit a job.

This step doesn't apply to PySpark applications.

Manage dependencies

The Bigtable Spark connector supports the following dependency management tools:

  • Maven
  • sbt
  • Gradle

Compile the JAR file

Maven

  1. Add the spark-bigtable dependency to your pom.xml file.

    <dependencies>
    <dependency>
      <groupId>com.google.cloud.spark.bigtable</groupId>
      <artifactId>spark-bigtable_SCALA_VERSION</artifactId>
      <version>0.1.0</version>
    </dependency>
    </dependencies>
    
  2. Add the Maven Shade plugin to your pom.xml file to create an uber JAR:

    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
    
  3. Run the mvn clean install command to generate a JAR file.

sbt

  1. Add the spark-bigtable dependency to your build.sbt file:

    libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0"
    
  2. Add the sbt-assembly plugin to your project/plugins.sbt or project/assembly.sbt file to create an uber JAR file:

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
    
  3. Run the sbt clean assembly command to generate the JAR file.

Gradle

  1. Add the spark-bigtable dependency to your build.gradle file.

    dependencies {
    implementation group: 'com.google.cloud.spark.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0'
    }
    
  2. Add the Shadow plugin in your build.gradle file to create an uber JAR file:

    plugins {
    id 'com.github.johnrengelman.shadow' version '8.1.1'
    id 'java'
    }
    
  3. See the Shadow plugin's documentation for more configuration and JAR compilation information.

Submit a job

Submit a Spark job using either Dataproc, Dataproc Serverless, or a local Spark instance to launch your application.

Set runtime environment

Set the following environment variables.

      #Google Cloud
      export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
      export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
      export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
      export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
      export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
      export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE

      #Dataproc Serverless
      export BIGTABLE_SPARK_SUBNET=SUBNET
      export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME

      #Scala/Java
      export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR

      #PySpark
      export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
      export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
      export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR

Replace the following:

  • PROJECT_ID: The permanent identifier for the Bigtable project.
  • INSTANCE_ID: The permanent identifier for the Bigtable instance.
  • TABLE_NAME: The permanent identifier for the table.
  • DATAPROC_CLUSTER: The permanent identifier for the Dataproc cluster.
  • DATAPROC_REGION: The Dataproc region that contains your cluster, for example, northamerica-northeast2.
  • DATAPROC_ZONE: The zone where the Dataproc cluster runs.
  • SUBNET: The full resource path of the subnet.
  • GCS_BUCKET_NAME: The Cloud Storage bucket to upload Spark workload dependencies.
  • PATH_TO_COMPILED_JAR: The full or relative path to the compiled JAR—for example, /path/to/project/root/target/<compiled_JAR_name> for Maven.
  • GCS_PATH_TO_CONNECTOR_JAR: The gs://spark-lib/bigtable Cloud Storage bucket, where the spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar file is located.
  • PATH_TO_PYTHON_FILE: For PySpark applications, the path to the Python file that will be used to write data to and read data from Bigtable.
  • LOCAL_PATH_TO_CONNECTOR_JAR: For PySpark applications, the path to the downloaded Bigtable Spark connector JAR file.

Submit a Spark job

For Dataproc instances or your local Spark setup, run a Spark job to upload data to Bigtable.

Dataproc cluster

Use the compiled JAR file and create a Dataproc cluster job that reads and writes data from and to Bigtable.

  1. Create a Dataproc cluster. The following example shows a sample command to create a Dataproc v2.0 cluster with Debian 10, two worker nodes, and default configurations.

    gcloud dataproc clusters create \
      $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \
      --zone $BIGTABLE_SPARK_DATAPROC_ZONE \
      --master-machine-type n2-standard-4 --master-boot-disk-size 500 \
      --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \
      --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
    
  2. Submit a job.

    Scala/Java

    The following example shows the spark.bigtable.example.WordCount class, which includes the logic to create a test table as a DataFrame, write the table to Bigtable, and then count the number of words in the table.

        gcloud dataproc jobs submit spark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --class=spark.bigtable.example.WordCount \
        --jar=$PATH_TO_COMPILED_JAR \
        -- \
        $BIGTABLE_SPARK_PROJECT_ID \
        $BIGTABLE_SPARK_INSTANCE_ID \
        $BIGTABLE_SPARK_TABLE_NAME

    PySpark

        gcloud dataproc jobs submit pyspark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --jars=$GCS_PATH_TO_CONNECTOR_JAR \
        --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
        $PATH_TO_PYTHON_FILE \
        -- \
        --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
        --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
        --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

Dataproc Serverless

Use the compiled JAR file and create a Dataproc job that reads and writes data from and to Bigtable with a Dataproc Serverless instance.

Scala/Java

  gcloud dataproc batches submit spark \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
  --  \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
  --jars=$GCS_PATH_TO_CONNECTOR_JAR \
  --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
  -- \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

Local Spark

Use the downloaded JAR file and create a Spark job that reads and writes data from and to Bigtable with a local Spark instance. You can also use the Bigtable emulator to submit the Spark job.

Use the Bigtable emulator

If you decide to use the Bigtable emulator, then follow these steps:

  1. Run the following command to start the emulator:

    gcloud beta emulators bigtable start
    

    By default, the emulator uses localhost:8086.

  2. Set the BIGTABLE_EMULATOR_HOST environment variable:

    export BIGTABLE_EMULATOR_HOST=localhost:8086
    
  3. Submit the Spark job.

For more information about using the Bigtable emulator, see Test using the emulator.

Submit a Spark job

Use the spark-submit command to submit a Spark job, whether or not you are using the Bigtable emulator.

Scala/Java

  spark-submit $PATH_TO_COMPILED_JAR \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  spark-submit \
  --jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
  --packages=org.slf4j:slf4j-reload4j:1.7.36 \
  $PATH_TO_PYTHON_FILE \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

Verify the table data

Run the following cbt CLI command to verify that the data was written to Bigtable. The cbt CLI is a component of the Google Cloud CLI. For more information, see the cbt CLI overview.

    cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
    read $BIGTABLE_SPARK_TABLE_NAME

Additional solutions

Use the Bigtable Spark connector for specific solutions, such as serializing complex Spark SQL types, reading specific rows, and generating client-side metrics.

Read a specific DataFrame row using a filter

When you use DataFrames to read from Bigtable, you can specify a filter to read only specific rows. Simple filters such as ==, <=, and startsWith on the row key column are applied on the server side to avoid a full-table scan. Filters on compound row keys or complex filters, such as the LIKE filter on the row key column, are applied on the client side.

If you are reading large tables, we recommend using simple row key filters to avoid performing a full-table scan. The following sample statement shows how to read using a simple filter. Make sure that in your Spark filter, you use the name of the DataFrame column that is converted to the row key:

    dataframe.filter("id == 'some_id'").show()
  

When applying a filter, use the DataFrame column name instead of the Bigtable table column name.
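
For example, assuming the catalog from earlier in this document where the id column maps to the row key, the first of the following filters is pushed down to the server, while the LIKE filter is evaluated on the client side.

  import org.apache.spark.sql.functions.col

  // Simple range comparison on the row key column: applied on the server side.
  dataframe.filter(col("id") >= "id_100" && col("id") < "id_200").show()

  // LIKE filter on the row key column: applied on the client side and can scan the full table.
  dataframe.filter("id LIKE '%100'").show()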

Serialize complex data types using Apache Avro

The Bigtable Spark connector provides support for using Apache Avro to serialize complex Spark SQL types, such as ArrayType, MapType, or StructType. Apache Avro provides data serialization for record data that is commonly used for processing and storing complex data structures.

Use syntax such as "avro":"avroSchema" in the catalog to specify that a column in Bigtable should be encoded using Avro. You can then use .option("avroSchema", avroSchemaString) when reading from or writing to Bigtable to specify the Avro schema that corresponds to that column, in string format. You can use different option names, for example "anotherAvroSchema", for different columns to pass Avro schemas for multiple columns.

def catalogWithAvroColumn = s"""{
                    |"table":{"name":"ExampleAvroTable"},
                    |"rowkey":"key",
                    |"columns":{
                    |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                    |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
                    |}
                    |}""".stripMargin

Use client-side metrics

Because the Bigtable Spark connector is based on the Bigtable client for Java, client-side metrics are enabled inside the connector by default. For more information about accessing and interpreting these metrics, see the client-side metrics documentation.

Use the Bigtable client for Java with low-level RDD functions

Because the Bigtable Spark connector is based on the Bigtable client for Java, you can use the client directly in your Spark applications and perform distributed read or write requests within low-level RDD functions such as mapPartitions and foreachPartition.

To use the Bigtable client for Java classes, append the com.google.cloud.spark.bigtable.repackaged prefix to the package names. For example, instead of the class name com.google.cloud.bigtable.data.v2.BigtableDataClient, use com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient.
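
For example, the following sketch opens one repackaged BigtableDataClient per partition inside foreachPartition and issues point reads. The TABLE_NAME value and the assumption that the first DataFrame column holds the row key are for illustration only.

  import com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient

  dataframe.rdd.foreachPartition { rows =>
    // Create one client per partition and close it when the partition is done.
    val client = BigtableDataClient.create(projectId, instanceId)
    try {
      rows.foreach { row =>
        // Assumes the first DataFrame column holds the row key as a string.
        val bigtableRow = client.readRow("TABLE_NAME", row.getString(0))
        // ... process bigtableRow ...
      }
    } finally {
      client.close()
    }
  }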

For more information, see the Bigtable client for Java documentation.

What's next