Use Spark with Cloud Bigtable
Contributed by Google employees.
This tutorial shows you how to use Apache Spark for distributed and parallelized data processing with Cloud Bigtable.
Prerequisites
This tutorial assumes that you have basic familiarity with Apache Spark and Scala.
Install and create the resources that are used in this tutorial:
- Create or select a Google Cloud project.
- Install the Cloud SDK.
- Install the sbt build tool.
- Install Apache Spark. Download Spark built for Scala 2.11. This tutorial uses Spark 2.4.7 and Scala 2.11.12.
Get the example code and assemble the example
Clone the repository that contains the example code:
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
Go to the directory for the example:
cd java-docs-samples/bigtable/spark
Assemble the sample applications as a single uber JAR file (a JAR file with all of its dependencies and configurations):
sbt clean assembly
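For reference, the uber JAR is produced by the sbt-assembly plugin. The sketch below shows what such a build definition can look like; the plugin line is standard, but the dependency artifact names and version numbers here are illustrative assumptions, and the repository's own build.sbt is authoritative.

// project/plugins.sbt -- enables the sbt-assembly plugin
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.15.0")

// build.sbt -- illustrative sketch only; see the repository for the real definition
name := "bigtable-spark-samples"
version := "0.1"
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // Spark is "provided" because spark-submit supplies it at runtime
  "org.apache.spark" %% "spark-core" % "2.4.7" % "provided",
  // Bigtable's HBase-compatible client (artifact name and version are assumptions)
  "com.google.cloud.bigtable" % "bigtable-hbase-2.x-hadoop" % "1.17.1"
)

// Resolve duplicate files when merging dependency JARs into the uber JAR
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}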
Set up Bigtable
Set the following environment variables, so you can copy and paste the commands in this tutorial:
SPARK_HOME=/PATH/TO/spark-2.4.7-bin-hadoop2.7
BIGTABLE_SPARK_PROJECT_ID=your-project-id
BIGTABLE_SPARK_INSTANCE_ID=your-instance-id
BIGTABLE_SPARK_WORDCOUNT_TABLE=wordcount
BIGTABLE_SPARK_WORDCOUNT_FILE=src/test/resources/Romeo-and-Juliet-prologue.txt
BIGTABLE_SPARK_ASSEMBLY_JAR=target/scala-2.11/bigtable-spark-samples-assembly-0.1.jar
Choose a zone to use for your Bigtable instance, and set the zone as an environment variable:
BIGTABLE_SPARK_INSTANCE_ZONE=your-zone
Create the instance:
cbt -project=$BIGTABLE_SPARK_PROJECT_ID createinstance \
  $BIGTABLE_SPARK_INSTANCE_ID "Spark wordcount instance" \
  $BIGTABLE_SPARK_INSTANCE_ID $BIGTABLE_SPARK_INSTANCE_ZONE 1 SSD
Note: You can use an existing instance rather than creating a new instance in this step.
Create a table called wordcount:

cbt \
  -project=$BIGTABLE_SPARK_PROJECT_ID \
  -instance=$BIGTABLE_SPARK_INSTANCE_ID \
  createtable $BIGTABLE_SPARK_WORDCOUNT_TABLE \
  "families=cf"
To ensure that the table was created, run the following command and look for wordcount in the output:

cbt \
  -project=$BIGTABLE_SPARK_PROJECT_ID \
  -instance=$BIGTABLE_SPARK_INSTANCE_ID \
  ls
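If you prefer to create and verify the table from Scala instead of cbt, the following is a minimal sketch using the HBase Admin API over the Bigtable HBase client. It is an illustration only, not part of the sample repository, and it assumes the HBase 2.x client API (TableDescriptorBuilder and ColumnFamilyDescriptorBuilder).

import com.google.cloud.bigtable.hbase.BigtableConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, TableDescriptorBuilder}

object CreateWordcountTable {
  def main(args: Array[String]): Unit = {
    val Array(projectId, instanceId, table) = args
    // BigtableConfiguration.connect returns a standard HBase Connection backed by Bigtable.
    val connection = BigtableConfiguration.connect(projectId, instanceId)
    try {
      val admin = connection.getAdmin
      // Create the table with a single column family named "cf".
      val descriptor = TableDescriptorBuilder
        .newBuilder(TableName.valueOf(table))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf"))
        .build()
      admin.createTable(descriptor)
      // List the tables to confirm that the new table exists.
      admin.listTableNames().foreach(name => println(name.getNameAsString))
    } finally {
      connection.close()
    }
  }
}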
Run the Wordcount job
This section contains the Spark Wordcount job that you'll be running. After the job has counted the words, it writes one row per word, with the word as the row key and a column named Count (in the cf column family) containing the number of occurrences. You can view the full code on GitHub.
import com.google.cloud.bigtable.hbase.BigtableConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext

var hConf = BigtableConfiguration.configure(projectId, instanceId)
hConf.set(TableOutputFormat.OUTPUT_TABLE, table)

import org.apache.hadoop.mapreduce.Job
val job = Job.getInstance(hConf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
hConf = job.getConfiguration

import org.apache.spark.SparkConf
val config = new SparkConf()
// Workaround for a bug in TableOutputFormat
// See https://stackoverflow.com/a/51959451/1305344
config.set("spark.hadoop.validateOutputSpecs", "false")

val sc = SparkContext.getOrCreate(config)
val wordCounts = sc
  .textFile(file)
  .flatMap(_.split("\\W+"))
  .filter(!_.isEmpty)
  .map { word => (word, 1) }
  .reduceByKey(_ + _)
  .map { case (word, count) =>
    val ColumnFamilyBytes = Bytes.toBytes("cf")
    val ColumnNameBytes = Bytes.toBytes("Count")
    val put = new Put(Bytes.toBytes(word))
      .addColumn(ColumnFamilyBytes, ColumnNameBytes, Bytes.toBytes(count))
    // TableOutputFormat ignores the key; the value must be a Put or a Delete instance.
    (null, put)
  }
wordCounts.saveAsNewAPIHadoopDataset(hConf)
Run the Spark job locally:
$SPARK_HOME/bin/spark-submit \
  --packages org.apache.hbase.connectors.spark:hbase-spark:1.0.0 \
  --class example.Wordcount \
  $BIGTABLE_SPARK_ASSEMBLY_JAR \
  $BIGTABLE_SPARK_PROJECT_ID $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_WORDCOUNT_TABLE $BIGTABLE_SPARK_WORDCOUNT_FILE
Count the number of rows in the BIGTABLE_SPARK_WORDCOUNT_TABLE table:

cbt \
  -project=$BIGTABLE_SPARK_PROJECT_ID \
  -instance=$BIGTABLE_SPARK_INSTANCE_ID \
  count $BIGTABLE_SPARK_WORDCOUNT_TABLE
There should be 88 rows.
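You can also spot-check an individual row from Scala. The following is a minimal sketch, assuming the same HBase-compatible client the job uses and a word such as "Verona" that appears in the prologue; it is not part of the sample repository.

import com.google.cloud.bigtable.hbase.BigtableConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes

object ReadWordCount {
  def main(args: Array[String]): Unit = {
    // Expected arguments: project ID, instance ID, table name, and a word to look up.
    val Array(projectId, instanceId, table, word) = args
    val connection = BigtableConfiguration.connect(projectId, instanceId)
    try {
      val t = connection.getTable(TableName.valueOf(table))
      val result = t.get(new Get(Bytes.toBytes(word)))
      // The job stored the count as a 4-byte integer in cf:Count.
      val count = Bytes.toInt(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("Count")))
      println(s"$word appears $count time(s)")
    } finally {
      connection.close()
    }
  }
}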
Running on Dataproc
If you'd like to run the Wordcount example on Dataproc, Google Cloud's fully managed and highly scalable service for running Apache Spark, see part 2 of this tutorial: Run a Cloud Bigtable Spark job on Dataproc.
Part 2 continues with the same table and instance, so don't delete these resources if you intend to continue on to part 2.
Cleaning up
If you're not continuing to part 2 of this tutorial, Run a Cloud Bigtable Spark job on Dataproc, then we recommend that you clean up the resources that you created in this tutorial.
If you created a new instance to try this out, delete the instance:
cbt -project=$BIGTABLE_SPARK_PROJECT_ID deleteinstance $BIGTABLE_SPARK_INSTANCE_ID
If you created a table on an existing instance, delete the table:
cbt \
  -project=$BIGTABLE_SPARK_PROJECT_ID \
  -instance=$BIGTABLE_SPARK_INSTANCE_ID \
  deletetable $BIGTABLE_SPARK_WORDCOUNT_TABLE
What's next
- Learn more about Cloud Bigtable.
- Learn more about Dataproc.