The Hadoop BigQuery connector is installed by default on all
Dataproc 1.0-1.2 cluster nodes under /usr/lib/hadoop/lib/
.
It is available in both Spark and PySpark environments.
Dataproc image versions 1.5+: The BigQuery connector is not installed by default in Dataproc image versions 1.5 and higher. To use it with these versions:
Install the BigQuery connector using this initialization action.
Specify the BigQuery connector in the
jars
parameter when submitting a job:--jars=gs://hadoop-lib/bigquery/bigquery-connector-hadoop3-latest.jar
Include the BigQuery connector classes in the application's jar-with-dependencies.
To Avoid Conflicts: If your application uses a connector version that is different from the connector version deployed on your Dataproc cluster, you must either:
Create a new cluster with an initialization action that installs the connector version used by your application, or
Include and relocate the connector classes and connector dependencies for the version you are using into your application's jar to avoid conflict between your connector version and the connector version deployed on your Dataproc cluster (see this example of dependencies relocation in Maven).
GsonBigQueryInputFormat class
GsonBigQueryInputFormat
provides Hadoop with the BigQuery
objects in a JsonObject format via the following primary
operations:
- Using a user-specified query to select BigQuery objects
- Splitting the results of the query evenly among Hadoop nodes
- Parsing the splits into Java objects to pass to the Mapper.
The Hadoop Mapper class receives a
JsonObject
representation of each selected BigQuery object.
The BigQueryInputFormat
class provides access to BigQuery records through an extension of the
Hadoop
InputFormat
class. To use the BigQueryInputFormat class:
Lines must be added to the main Hadoop job to set parameters in the Hadoop configuration.
The InputFormat class must be set to
GsonBigQueryInputFormat
.
The sections, below, show you how to meet these requirements.
Input Parameters
- QualifiedInputTableId
- The BigQuery table to read from, in the form:
optional-projectId:datasetId.tableId
Example:publicdata:samples.shakespeare
- projectId
- The BigQuery projectId under which all of the input
operations occur.
Example:my-first-cloud-project
// Set the job-level projectId. conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId); // Configure input parameters. BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId); // Set InputFormat. job.setInputFormatClass(GsonBigQueryInputFormat.class);
Notes:
job
refers to theorg.apache.hadoop.mapreduce.Job
, the Hadoop job to run.conf
refers to theorg.apache.hadoop.Configuration
for the Hadoop job.
Mapper
The GsonBigQueryInputFormat
class reads from BigQuery and passes
BigQuery objects one at a time as input to the Hadoop Mapper
function. The inputs take the form of a pair comprising the following:
LongWritable
, the record numberJsonObject
, the Json-formatted BigQuery record
The Mapper
accepts the LongWritable
and JsonObject pair
as
input.
Here is a snippet from the Mapper
for a
sample WordCount job.
// private static final LongWritable ONE = new LongWritable(1); // The configuration key used to specify the BigQuery field name // ("column name"). public static final String WORDCOUNT_WORD_FIELDNAME_KEY = "mapred.bq.samples.wordcount.word.key"; // Default value for the configuration entry specified by // WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in // publicdata:samples.shakespeare or 'repository_name' // in publicdata:samples.github_timeline. public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word"; /** * The mapper function for WordCount. */ public static class Map extends Mapper <LongWritable, JsonObject, Text, LongWritable> { private static final LongWritable ONE = new LongWritable(1); private Text word = new Text(); private String wordKey; @Override public void setup(Context context) throws IOException, InterruptedException { // Find the runtime-configured key for the field name we're looking for // in the map task. Configuration conf = context.getConfiguration(); wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY, WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT); } @Override public void map(LongWritable key, JsonObject value, Context context) throws IOException, InterruptedException { JsonElement countElement = value.get(wordKey); if (countElement != null) { String wordInRecord = countElement.getAsString(); word.set(wordInRecord); // Write out the key, value pair (write out a value of 1, which will be // added to the total count for this word in the Reducer). context.write(word, ONE); } } }
IndirectBigQueryOutputFormat class
IndirectBigQueryOutputFormat
provides Hadoop with the ability to write
JsonObject
values directly into a BigQuery table. This class provides access
to BigQuery records through an extension of the Hadoop
OutputFormat
class. To use it correctly, several parameters must be set in the Hadoop
configuration, and the OutputFormat class must be set to
IndirectBigQueryOutputFormat
. Below is an example of the
parameters to set and the lines of code needed to correctly use
IndirectBigQueryOutputFormat
.
Output Parameters
- projectId
- The BigQuery projectId under which all of the output
operations occur.
Example: "my-first-cloud-project" - QualifiedOutputTableId
- The BigQuery dataset to write the final job results to, in
the form optional-projectId:datasetId.tableId.
The datasetId should already be present in your project.
outputDatasetId_hadoop_temporary dataset will be created in
BigQuery for temporary results. Make sure this does not conflict
with an existing dataset.
Examples:
test_output_dataset.wordcount_output
my-first-cloud-project:test_output_dataset.wordcount_output
- outputTableFieldSchema
- A schema that defines the schema for the output BigQuery table
- GcsOutputPath
- The output path to store temporary Cloud Storage data (
gs://bucket/dir/
)
// Define the schema we will be using for the output BigQuery table. List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>(); outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING")); outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER")); TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema); // Create the job and get its configuration. Job job = new Job(parser.getConfiguration(), "wordcount"); Configuration conf = job.getConfiguration(); // Set the job-level projectId. conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId); // Configure input. BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId); // Configure output. BigQueryOutputConfiguration.configure( conf, outputQualifiedTableId, outputSchema, outputGcsPath, BigQueryFileFormat.NEWLINE_DELIMITED_JSON, TextOutputFormat.class); // (Optional) Configure the KMS key used to encrypt the output table. BigQueryOutputConfiguration.setKmsKeyName( conf, "projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1"); );
Reducer
The IndirectBigQueryOutputFormat
class writes to BigQuery.
It takes a key and a JsonObject
value as input and writes only the
JsonObject value to BigQuery (the key is ignored). The JsonObject
should
contain a Json-formatted BigQuery record. The Reducer should output a key of
any type (NullWritable
is used in our sample WordCount job)
and JsonObject
value pair. The Reducer for the
sample WordCount job is shown below.
/** * Reducer function for WordCount. */ public static class Reduce extends Reducer<Text, LongWritable, JsonObject, NullWritable> { @Override public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { // Add up the values to get a total number of occurrences of our word. long count = 0; for (LongWritable val : values) { count = count + val.get(); } JsonObject jsonObject = new JsonObject(); jsonObject.addProperty("Word", key.toString()); jsonObject.addProperty("Count", count); // Key does not matter. context.write(jsonObject, NullWritable.get()); } }
Clean up
After the job completes, clean up Cloud Storage export paths.
job.waitForCompletion(true); GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());
You can view word counts in the BigQuery output table in the Google Cloud console.
Complete Code for a sample WordCount job
The code below is an example of a simple WordCount job that aggregates word counts from objects in BigQuery.
package com.google.cloud.hadoop.io.bigquery.samples;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Sample program to run the Hadoop Wordcount example over tables in BigQuery.
*/
public class WordCount {
// The configuration key used to specify the BigQuery field name
// ("column name").
public static final String WORDCOUNT_WORD_FIELDNAME_KEY =
"mapred.bq.samples.wordcount.word.key";
// Default value for the configuration entry specified by
// WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
// publicdata:samples.shakespeare or 'repository_name'
// in publicdata:samples.github_timeline.
public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";
// Guava might not be available, so define a null / empty helper:
private static boolean isStringNullOrEmpty(String toTest) {
return toTest == null || "".equals(toTest);
}
/**
* The mapper function for WordCount. For input, it consumes a LongWritable
* and JsonObject as the key and value. These correspond to a row identifier
* and Json representation of the row's values/columns.
* For output, it produces Text and a LongWritable as the key and value.
* These correspond to the word and a count for the number of times it has
* occurred.
*/
public static class Map
extends Mapper <LongWritable, JsonObject, Text, LongWritable> {
private static final LongWritable ONE = new LongWritable(1);
private Text word = new Text();
private String wordKey;
@Override
public void setup(Context context)
throws IOException, InterruptedException {
// Find the runtime-configured key for the field name we're looking for in
// the map task.
Configuration conf = context.getConfiguration();
wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY, WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT);
}
@Override
public void map(LongWritable key, JsonObject value, Context context)
throws IOException, InterruptedException {
JsonElement countElement = value.get(wordKey);
if (countElement != null) {
String wordInRecord = countElement.getAsString();
word.set(wordInRecord);
// Write out the key, value pair (write out a value of 1, which will be
// added to the total count for this word in the Reducer).
context.write(word, ONE);
}
}
}
/**
* Reducer function for WordCount. For input, it consumes the Text and
* LongWritable that the mapper produced. For output, it produces a JsonObject
* and NullWritable. The JsonObject represents the data that will be
* loaded into BigQuery.
*/
public static class Reduce
extends Reducer<Text, LongWritable, JsonObject, NullWritable> {
@Override
public void reduce(Text key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
// Add up the values to get a total number of occurrences of our word.
long count = 0;
for (LongWritable val : values) {
count = count + val.get();
}
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("Word", key.toString());
jsonObject.addProperty("Count", count);
// Key does not matter.
context.write(jsonObject, NullWritable.get());
}
}
/**
* Configures and runs the main Hadoop job. Takes a String[] of 5 parameters:
* [ProjectId] [QualifiedInputTableId] [InputTableFieldName]
* [QualifiedOutputTableId] [GcsOutputPath]
*
* ProjectId - Project under which to issue the BigQuery
* operations. Also serves as the default project for table IDs that don't
* specify a project for the table.
*
* QualifiedInputTableId - Input table ID of the form
* (Optional ProjectId):[DatasetId].[TableId]
*
* InputTableFieldName - Name of the field to count in the
* input table, e.g., 'word' in publicdata:samples.shakespeare or
* 'repository_name' in publicdata:samples.github_timeline.
*
* QualifiedOutputTableId - Input table ID of the form
* (Optional ProjectId):[DatasetId].[TableId]
*
* GcsOutputPath - The output path to store temporary
* Cloud Storage data, e.g., gs://bucket/dir/
*
* @param args a String[] containing ProjectId, QualifiedInputTableId,
* InputTableFieldName, QualifiedOutputTableId, and GcsOutputPath.
* @throws IOException on IO Error.
* @throws InterruptedException on Interrupt.
* @throws ClassNotFoundException if not all classes are present.
*/
public static void main(String[] args)
throws IOException, InterruptedException, ClassNotFoundException {
// GenericOptionsParser is a utility to parse command line arguments
// generic to the Hadoop framework. This example doesn't cover the specifics,
// but recognizes several standard command line arguments, enabling
// applications to easily specify a NameNode, a ResourceManager, additional
// configuration resources, etc.
GenericOptionsParser parser = new GenericOptionsParser(args);
args = parser.getRemainingArgs();
// Make sure we have the right parameters.
if (args.length != 5) {
System.out.println(
"Usage: hadoop jar bigquery_wordcount.jar [ProjectId] [QualifiedInputTableId] "
+ "[InputTableFieldName] [QualifiedOutputTableId] [GcsOutputPath]\n"
+ " ProjectId - Project under which to issue the BigQuery operations. Also serves "
+ "as the default project for table IDs that don't explicitly specify a project for "
+ "the table.\n"
+ " QualifiedInputTableId - Input table ID of the form "
+ "(Optional ProjectId):[DatasetId].[TableId]\n"
+ " InputTableFieldName - Name of the field to count in the input table, e.g., "
+ "'word' in publicdata:samples.shakespeare or 'repository_name' in "
+ "publicdata:samples.github_timeline.\n"
+ " QualifiedOutputTableId - Input table ID of the form "
+ "(Optional ProjectId):[DatasetId].[TableId]\n"
+ " GcsOutputPath - The output path to store temporary Cloud Storage data, e.g., "
+ "gs://bucket/dir/");
System.exit(1);
}
// Get the individual parameters from the command line.
String projectId = args[0];
String inputQualifiedTableId = args[1];
String inputTableFieldId = args[2];
String outputQualifiedTableId = args[3];
String outputGcsPath = args[4];
// Define the schema we will be using for the output BigQuery table.
List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>();
outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING"));
outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER"));
TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema);
// Create the job and get its configuration.
Job job = new Job(parser.getConfiguration(), "wordcount");
Configuration conf = job.getConfiguration();
// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);
// Configure input.
BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);
// Configure output.
BigQueryOutputConfiguration.configure(
conf,
outputQualifiedTableId,
outputSchema,
outputGcsPath,
BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
TextOutputFormat.class);
// (Optional) Configure the KMS key used to encrypt the output table.
BigQueryOutputConfiguration.setKmsKeyName(
conf,
"projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1");
conf.set(WORDCOUNT_WORD_FIELDNAME_KEY, inputTableFieldId);
// This helps Hadoop identify the Jar which contains the mapper and reducer
// by specifying a class in that Jar. This is required if the jar is being
// passed on the command line to Hadoop.
job.setJarByClass(WordCount.class);
// Tell the job what data the mapper will output.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(GsonBigQueryInputFormat.class);
// Instead of using BigQueryOutputFormat, we use the newer
// IndirectBigQueryOutputFormat, which works by first buffering all the data
// into a Cloud Storage temporary file, and then on commitJob, copies all data from
// Cloud Storage into BigQuery in one operation. Its use is recommended for large jobs
// since it only requires one BigQuery "load" job per Hadoop/Spark job, as
// compared to BigQueryOutputFormat, which performs one BigQuery job for each
// Hadoop/Spark task.
job.setOutputFormatClass(IndirectBigQueryOutputFormat.class);
job.waitForCompletion(true);
// After the job completes, clean up the Cloud Storage export paths.
GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());
// You can view word counts in the BigQuery output table at
// https://console.cloud.google.com/.
}
}
Java version
The BigQuery connector requires Java 8.
Apache Maven Dependency Information
<dependency> <groupId>com.google.cloud.bigdataoss</groupId> <artifactId>bigquery-connector</artifactId> <version>insert "hadoopX-X.X.X" connector version number here</version> </dependency>
For detailed information, see the BigQuery connector release notes and Javadoc reference.