MapReduce-Job mit dem BigQuery-Connector schreiben

Der Hadoop BigQuery-Connector wird standardmäßig auf allen Dataproc 1.0-1.2-Clusterknoten unter /usr/lib/hadoop/lib/ installiert. Er ist sowohl in Spark- als auch in PySpark-Umgebungen verfügbar.

Dataproc-Image-Versionen 1.3 und höher: Der BigQuery-Connector wird nicht standardmäßig in den Dataproc -Image-Versionen 1.3 und höher installiert. So verwenden Sie den Connector mit diesen Versionen:

  1. Installieren Sie den BigQuery-Connector mithilfe der Initialisierungsaktion.

  2. Geben Sie den BigQuery-Connector im Parameter jars an, wenn Sie einen Job senden:


  3. Fügen Sie die BigQuery-Connector-Klassen zur jar-with-dependencies der Anwendung hinzu.

Konflikte vermeiden: Wenn Ihre Anwendung eine andere Connector-Version als die in Ihrem Dataproc-Cluster bereitgestellte Connector-Version verwendet, müssen Sie entweder:

  1. Erstellen Sie einen neuen Cluster unter Verwendung einer Initialisierungsaktion, mit der die von Ihrer Anwendung verwendete Connector-Version installiert wird.

  2. Die Connector-Klassen und -Abhängigkeiten für die von Ihnen verwendete Version in die JAR-Datei Ihrer Anwendung einfügen und verschieben. Auf diese Weise vermeiden Sie Konflikte zwischen Ihrer Connector-Version und der auf Ihrem Dataproc-Cluster bereitgestellten Connector-Version. Weitere Informationen dazu finden Sie im Beispiel für die Verschiebung von Abhängigkeiten in Maven.


GsonBigQueryInputFormat stellt Hadoop BigQuery-Objekte in einem JsonObject-Format über die folgenden primären Vorgänge bereit:

  • Verwenden einer benutzerdefinierten Abfrage zum Auswählen von BigQuery-Objekten.
  • Gleichmäßiges Aufteilen der Ergebnisse der Abfrage auf Hadoop-Knoten.
  • Parsen der Splitergebnisse in Java-Objekte zur Übergabe an den Mapper. Die Hadoop-Mapper-Klasse erhält eine JsonObject-Darstellung für jedes ausgewählte BigQuery-Objekt.

Die BigQueryInputFormat-Klasse bietet Zugriff auf BigQuery-Datensätze über eine Erweiterung der Hadoop InputFormat-Klasse. So verwenden Sie die BigQueryInputFormat-Klasse:

  1. Zeilen müssen dem Hauptjob von Hadoop hinzugefügt werden, um Parameter in der Hadoop-Konfiguration festzulegen.

  2. Die Klasse InputFormat muss auf GsonBigQueryInputFormat festgelegt sein.

In den folgenden Abschnitten erfahren Sie, wie Sie diese Anforderungen erfüllen.


Die BigQuery-Tabelle, aus der gelesen werden soll, im Format: optional-projectId:datasetId.tableId
Beispiel: publicdata:samples.shakespeare
Die Bezeichnung des BigQuery-Projekts, unter dem alle Eingabevorgänge stattfinden.
Beispiel: my-first-cloud-project
// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

// Configure input parameters.
BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

// Set InputFormat.


  • job bezieht sich auf org.apache.hadoop.mapreduce.Job, den auszuführenden Hadoop-Job.
  • conf bezieht sich auf org.apache.hadoop.Configuration für den Hadoop-Job.


Die Klasse GsonBigQueryInputFormat liest aus BigQuery. Die BigQuery-Objekte werden nacheinander als Eingabe an die Hadoop-Funktion Mapper übergeben. Die Eingaben erfolgen in Form von LongWritable und JsonObject. LongWritable verfolgt die Datensatznummer, JsonObject enthält den BigQuery-Datensatz im JSON-Format. Der Mapper akzeptiert das LongWritable- und JsonObject-Paar als Eingabe. Hier ist ein Snippet aus dem Mapper für einen Beispiel-Wordcount-Job.

  // private static final LongWritable ONE = new LongWritable(1);
  // The configuration key used to specify the BigQuery field name
  // ("column name").
  public static final String WORDCOUNT_WORD_FIELDNAME_KEY =

  // Default value for the configuration entry specified by
  // WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
  // publicdata:samples.shakespeare or 'repository_name'
  // in publicdata:samples.github_timeline.
  public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";

   * The mapper function for WordCount.
  public static class Map
      extends Mapper <LongWritable, JsonObject, Text, LongWritable> {
    private static final LongWritable ONE = new LongWritable(1);
    private Text word = new Text();
    private String wordKey;

    public void setup(Context context)
        throws IOException, InterruptedException {
      // Find the runtime-configured key for the field name we're looking for
      // in the map task.
      Configuration conf = context.getConfiguration();
      wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY,

    public void map(LongWritable key, JsonObject value, Context context)
        throws IOException, InterruptedException {
      JsonElement countElement = value.get(wordKey);
      if (countElement != null) {
        String wordInRecord = countElement.getAsString();
        // Write out the key, value pair (write out a value of 1, which will be
        // added to the total count for this word in the Reducer).
        context.write(word, ONE);


IndirectBigQueryOutputFormat bietet Hadoop die Möglichkeit, JsonObject-Werte direkt in eine BigQuery-Tabelle zu schreiben. Diese Klasse bietet über eine Erweiterung der Hadoop-Klasse OutputFormat Zugriff auf BigQuery-Datensätze. Zur korrekten Verwendung müssen mehrere Parameter in der Hadoop-Konfiguration festgelegt werden und die OutputFormat-Klasse muss auf IndirectBigQueryOutputFormat festgelegt sein. Im Folgenden finden Sie ein Beispiel für die notwendigen Parameter und Codezeilen zur korrekten Verwendung von IndirectBigQueryOutputFormat.


Die Bezeichnung des BigQuery-Projekts, unter dem alle Ausgabevorgänge stattfinden.
Beispiel: "my-first-cloud-project"
Das BigQuery-Dataset, in das die endgültigen Jobergebnisse geschrieben werden sollen, im Format optional-projectId:datasetId.tableId. Die Dataset-ID sollte im Projekt bereits vorhanden sein. Das Dataset outputDatasetId_hadoop_temporary wird in BigQuery für die Aufnahme temporärer Ergebnisse erstellt. Überprüfen Sie, dass es nicht mit vorhandenen Datasets in Konflikt steht.
Ein Schema, das das Schema für die BigQuery-Ausgabetabelle definiert
Der Ausgabepfad zum Speichern temporärer Cloud Storage-Daten (gs://bucket/dir/)
    // Define the schema we will be using for the output BigQuery table.
    List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>();
    outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING"));
    outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER"));
    TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema);

    // Create the job and get its configuration.
    Job job = new Job(parser.getConfiguration(), "wordcount");
    Configuration conf = job.getConfiguration();

    // Set the job-level projectId.
    conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

    // Configure input.
    BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

    // Configure output.

    // (Optional) Configure the KMS key used to encrypt the output table.


Die IndirectBigQueryOutputFormat-Klasse schreibt in BigQuery. Sie erhält einen Schlüssel und einen JsonObject-Wert als Eingabe, schreibt aber nur den JsonObject-Wert in BigQuery (der Schlüssel wird ignoriert). Das JsonObject sollte einen BigQuery-Datensatz im Json-Format enthalten. Der Reducer sollte ein Paar aus einem Schlüssel eines beliebigen Typs (in unserem Beispiel-WordCount-Job wird NullWritable verwendet) und einem JsonObject-Wert ausgeben. Der Reducer für den Beispiel-WordCount-Job wird unten angezeigt.

   * Reducer function for WordCount.
  public static class Reduce
      extends Reducer<Text, LongWritable, JsonObject, NullWritable> {

    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      // Add up the values to get a total number of occurrences of our word.
      long count = 0;
      for (LongWritable val : values) {
        count = count + val.get();

      JsonObject jsonObject = new JsonObject();
      jsonObject.addProperty("Word", key.toString());
      jsonObject.addProperty("Count", count);
      // Key does not matter.
      context.write(jsonObject, NullWritable.get());


Bereinigen Sie nach Abschluss des Jobs die Cloud Storage-Exportpfade.

GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());

Die Wortzahl wird in der BigQuery-Ausgabetabelle in der Google Cloud Console angezeigt.

Vollständiger Code für einen Beispiel-WordCount-Job

Der folgende Code ist ein Beispiel für einen einfachen WordCount-Job, mit dem Wortzahlen aus Objekten in BigQuery aggregiert werden.



import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.util.ArrayList;
import java.util.List;

 * Sample program to run the Hadoop Wordcount example over tables in BigQuery.
public class WordCount {

 // The configuration key used to specify the BigQuery field name
  // ("column name").
  public static final String WORDCOUNT_WORD_FIELDNAME_KEY =

  // Default value for the configuration entry specified by
  // WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
  // publicdata:samples.shakespeare or 'repository_name'
  // in publicdata:samples.github_timeline.
  public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";

  // Guava might not be available, so define a null / empty helper:
  private static boolean isStringNullOrEmpty(String toTest) {
    return toTest == null || "".equals(toTest);

   * The mapper function for WordCount. For input, it consumes a LongWritable
   * and JsonObject as the key and value. These correspond to a row identifier
   * and Json representation of the row's values/columns.
   * For output, it produces Text and a LongWritable as the key and value.
   * These correspond to the word and a count for the number of times it has
   * occurred.

  public static class Map
      extends Mapper <LongWritable, JsonObject, Text, LongWritable> {
    private static final LongWritable ONE = new LongWritable(1);
    private Text word = new Text();
    private String wordKey;

    public void setup(Context context)
        throws IOException, InterruptedException {
      // Find the runtime-configured key for the field name we're looking for in
      // the map task.
      Configuration conf = context.getConfiguration();

    public void map(LongWritable key, JsonObject value, Context context)
        throws IOException, InterruptedException {
      JsonElement countElement = value.get(wordKey);
      if (countElement != null) {
        String wordInRecord = countElement.getAsString();
        // Write out the key, value pair (write out a value of 1, which will be
        // added to the total count for this word in the Reducer).
        context.write(word, ONE);

   * Reducer function for WordCount. For input, it consumes the Text and
   * LongWritable that the mapper produced. For output, it produces a JsonObject
   * and NullWritable. The JsonObject represents the data that will be
   * loaded into BigQuery.
  public static class Reduce
      extends Reducer<Text, LongWritable, JsonObject, NullWritable> {

    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      // Add up the values to get a total number of occurrences of our word.
      long count = 0;
      for (LongWritable val : values) {
        count = count + val.get();

      JsonObject jsonObject = new JsonObject();
      jsonObject.addProperty("Word", key.toString());
      jsonObject.addProperty("Count", count);
      // Key does not matter.
      context.write(jsonObject, NullWritable.get());

   * Configures and runs the main Hadoop job. Takes a String[] of 5 parameters:
   * [ProjectId] [QualifiedInputTableId] [InputTableFieldName]
   * [QualifiedOutputTableId] [GcsOutputPath]
   * ProjectId - Project under which to issue the BigQuery
   * operations. Also serves as the default project for table IDs that don't
   * specify a project for the table.
   * QualifiedInputTableId - Input table ID of the form
   * (Optional ProjectId):[DatasetId].[TableId]
   * InputTableFieldName - Name of the field to count in the
   * input table, e.g., 'word' in publicdata:samples.shakespeare or
   * 'repository_name' in publicdata:samples.github_timeline.
   * QualifiedOutputTableId - Input table ID of the form
   * (Optional ProjectId):[DatasetId].[TableId]
   * GcsOutputPath - The output path to store temporary
   * Cloud Storage data, e.g., gs://bucket/dir/
   * @param args a String[] containing ProjectId, QualifiedInputTableId,
   *     InputTableFieldName, QualifiedOutputTableId, and GcsOutputPath.
   * @throws IOException on IO Error.
   * @throws InterruptedException on Interrupt.
   * @throws ClassNotFoundException if not all classes are present.
  public static void main(String[] args)
      throws IOException, InterruptedException, ClassNotFoundException {

    // GenericOptionsParser is a utility to parse command line arguments
    // generic to the Hadoop framework. This example doesn't cover the specifics,
    // but recognizes several standard command line arguments, enabling
    // applications to easily specify a NameNode, a ResourceManager, additional
    // configuration resources, etc.
    GenericOptionsParser parser = new GenericOptionsParser(args);
    args = parser.getRemainingArgs();

    // Make sure we have the right parameters.
    if (args.length != 5) {
          "Usage: hadoop jar bigquery_wordcount.jar [ProjectId] [QualifiedInputTableId] "
              + "[InputTableFieldName] [QualifiedOutputTableId] [GcsOutputPath]\n"
              + "    ProjectId - Project under which to issue the BigQuery operations. Also serves "
              + "as the default project for table IDs that don't explicitly specify a project for "
              + "the table.\n"
              + "    QualifiedInputTableId - Input table ID of the form "
              + "(Optional ProjectId):[DatasetId].[TableId]\n"
              + "    InputTableFieldName - Name of the field to count in the input table, e.g., "
              + "'word' in publicdata:samples.shakespeare or 'repository_name' in "
              + "publicdata:samples.github_timeline.\n"
              + "    QualifiedOutputTableId - Input table ID of the form "
              + "(Optional ProjectId):[DatasetId].[TableId]\n"
              + "    GcsOutputPath - The output path to store temporary Cloud Storage data, e.g., "
              + "gs://bucket/dir/");

    // Get the individual parameters from the command line.
    String projectId = args[0];
    String inputQualifiedTableId = args[1];
    String inputTableFieldId = args[2];
    String outputQualifiedTableId = args[3];
    String outputGcsPath = args[4];

   // Define the schema we will be using for the output BigQuery table.
    List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>();
    outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING"));
    outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER"));
    TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema);

    // Create the job and get its configuration.
    Job job = new Job(parser.getConfiguration(), "wordcount");
    Configuration conf = job.getConfiguration();

    // Set the job-level projectId.
    conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

    // Configure input.
    BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

    // Configure output.

    // (Optional) Configure the KMS key used to encrypt the output table.

    conf.set(WORDCOUNT_WORD_FIELDNAME_KEY, inputTableFieldId);

    // This helps Hadoop identify the Jar which contains the mapper and reducer
    // by specifying a class in that Jar. This is required if the jar is being
    // passed on the command line to Hadoop.

    // Tell the job what data the mapper will output.

    // Instead of using BigQueryOutputFormat, we use the newer
    // IndirectBigQueryOutputFormat, which works by first buffering all the data
    // into a Cloud Storage temporary file, and then on commitJob, copies all data from
    // Cloud Storage into BigQuery in one operation. Its use is recommended for large jobs
    // since it only requires one BigQuery "load" job per Hadoop/Spark job, as
    // compared to BigQueryOutputFormat, which performs one BigQuery job for each
    // Hadoop/Spark task.


    // After the job completes, clean up the Cloud Storage export paths.
    GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());

    // You can view word counts in the BigQuery output table at


Der BigQuery-Connector erfordert Java 8.

Informationen zu Apache Maven Dependency

    <version>insert "hadoopX-X.X.X" connector version number here</version>

Weitere Informationen finden Sie in den Versionshinweisen zu BigQuery-Connector und in der Javadoc-Referenz.