Écrire une tâche MapReduce avec le connecteur BigQuery

Le connecteur Hadoop BigQuery est installé par défaut sur tous les nœuds de cluster Dataproc 1.0-1.2 sous /usr/lib/hadoop/lib/. Il est disponible dans les environnements Spark et PySpark.

Images Dataproc versions 1.5 et ultérieures:le connecteur BigQuery n'est pas installé par défaut dans les versions d'image 1.5 et ultérieures Dataproc. Pour l'utiliser avec ces versions :

  1. Installez le connecteur BigQuery à l'aide de cette action d'initialisation.

  2. Spécifiez le connecteur BigQuery dans le paramètre jars lorsque vous envoyez une tâche:

    --jars=gs://hadoop-lib/bigquery/bigquery-connector-hadoop3-latest.jar
    

  3. Incluez les classes du connecteur BigQuery dans le fichier JAR (avec ses dépendances) de l'application.

Pour éviter les conflits : Si votre application utilise une version de connecteur différente de la version de connecteur déployée sur votre cluster Dataproc, vous avez le choix entre les deux options suivantes :

  1. Créer un cluster à l'aide d'une action d'initialisation, qui installe la version de connecteur utilisée par votre application.

  2. Inclure et transférer les classes et les dépendances de connecteur pour la version que vous utilisez dans le fichier JAR de votre application afin d'éviter tout conflit entre votre version de connecteur et la version déployée sur votre cluster Dataproc (consultez cet exemple de transfert de dépendance dans Maven).

Classe GsonBigQueryInputFormat

GsonBigQueryInputFormat fournit à Hadoop les objets BigQuery au format JsonObject via les opérations de base suivantes :

  • Utilisation d'une requête spécifiée par l'utilisateur pour sélectionner des objets BigQuery
  • Répartition homogène des résultats de la requête entre les nœuds Hadoop
  • Analyse des éléments répartis dans des objets Java à transmettre au mappeur. La classe Hadoop Mapper reçoit une représentation JsonObject de chaque objet BigQuery sélectionné.

La classe BigQueryInputFormat fournit un accès aux enregistrements BigQuery via une extension de la classe Hadoop InputFormat. Pour utiliser la classe BigQueryInputFormat, procédez comme suit :

  1. Des lignes doivent être ajoutées à la tâche Hadoop principale afin de définir les paramètres de la configuration Hadoop.

  2. La classe InputFormat doit être définie sur GsonBigQueryInputFormat.

Les sections ci-dessous vous expliquent comment répondre à ces exigences.

Paramètres d'entrée

QualifiedInputTableId
Table BigQuery à lire, sous la forme : optional-projectId:datasetId.tableId
Exemple: publicdata:samples.shakespeare
projectId
L'ID du projet BigQuery sous lequel toutes les opérations d'entrée sont effectuées.
Exemple : my-first-cloud-project
// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

// Configure input parameters.
BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

// Set InputFormat.
job.setInputFormatClass(GsonBigQueryInputFormat.class);

Remarques :

  • job fait référence à org.apache.hadoop.mapreduce.Job, la tâche Hadoop à exécuter.
  • conf fait référence à org.apache.hadoop.Configuration pour la tâche Hadoop.

Mappeur

La classe GsonBigQueryInputFormat lit les données à partir de BigQuery et transmet les objets BigQuery un par un en tant qu'entrées à la fonction Hadoop Mapper. Les entrées se présentent sous la forme d'une paire composée des éléments suivants:

  • LongWritable, le numéro d'enregistrement
  • JsonObject, l'enregistrement BigQuery au format Json

Mapper accepte les éléments LongWritable et JsonObject pair en tant qu'entrées.

Voici un extrait de code Mapper pour un exemple de tâche GeoJSON.

  // private static final LongWritable ONE = new LongWritable(1);
  // The configuration key used to specify the BigQuery field name
  // ("column name").
  public static final String WORDCOUNT_WORD_FIELDNAME_KEY =
      "mapred.bq.samples.wordcount.word.key";

  // Default value for the configuration entry specified by
  // WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
  // publicdata:samples.shakespeare or 'repository_name'
  // in publicdata:samples.github_timeline.
  public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";

  /**
   * The mapper function for WordCount.
   */
  public static class Map
      extends Mapper <LongWritable, JsonObject, Text, LongWritable> {
    private static final LongWritable ONE = new LongWritable(1);
    private Text word = new Text();
    private String wordKey;

    @Override
    public void setup(Context context)
        throws IOException, InterruptedException {
      // Find the runtime-configured key for the field name we're looking for
      // in the map task.
      Configuration conf = context.getConfiguration();
      wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY,
          WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT);
    }

    @Override
    public void map(LongWritable key, JsonObject value, Context context)
        throws IOException, InterruptedException {
      JsonElement countElement = value.get(wordKey);
      if (countElement != null) {
        String wordInRecord = countElement.getAsString();
        word.set(wordInRecord);
        // Write out the key, value pair (write out a value of 1, which will be
        // added to the total count for this word in the Reducer).
        context.write(word, ONE);
      }
    }
  }

Classe IndirectBigQueryOutputFormat

La classe IndirectBigQueryOutputFormat fournit à Hadoop la capacité d'écrire des valeurs JsonObject directement dans une table BigQuery. Elle fournit un accès aux enregistrements BigQuery via une extension de la classe Hadoop OutputFormat. Pour l'utiliser correctement, vous devez définir plusieurs paramètres dans la configuration Hadoop. De plus, la classe OutputFormat doit être définie sur IndirectBigQueryOutputFormat. Vous trouverez ci-dessous un exemple des paramètres à définir, ainsi que les lignes de code requises pour utiliser correctement IndirectBigQueryOutputFormat.

Paramètres de sortie

projectId
L'ID du projet BigQuery sous lequel toutes les opérations de sortie sont effectuées.
Exemple : "my-first-cloud-project"
QualifiedOutputTableId
L'ensemble de données BigQuery dans lequel les résultats finaux de la tâche seront écrits, sous la forme optional-projectId:datasetId.tableId. Le "datasetId" devrait déjà être présent dans votre projet. Un ensemble de données temporaire "outputDatasetId_hadoop_temporary" sera créé dans BigQuery pour les résultats intermédiaires. Vérifiez que cet élément n'entre pas en conflit avec un ensemble de données existant.
Exemples :
test_output_dataset.wordcount_output
my-first-cloud-project:test_output_dataset.wordcount_output
outputTableFieldSchema
Schéma définissant le schéma de la table BigQuery de sortie.
GcsOutputPath
Chemin de sortie pour stocker les données Cloud Storage temporaires (gs://bucket/dir/)
    // Define the schema we will be using for the output BigQuery table.
    List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>();
    outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING"));
    outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER"));
    TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema);

    // Create the job and get its configuration.
    Job job = new Job(parser.getConfiguration(), "wordcount");
    Configuration conf = job.getConfiguration();

    // Set the job-level projectId.
    conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

    // Configure input.
    BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

    // Configure output.
    BigQueryOutputConfiguration.configure(
        conf,
        outputQualifiedTableId,
        outputSchema,
        outputGcsPath,
        BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
        TextOutputFormat.class);

    // (Optional) Configure the KMS key used to encrypt the output table.
    BigQueryOutputConfiguration.setKmsKeyName(
        conf,
        "projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1");
);

Réducteur

La classe IndirectBigQueryOutputFormat écrit dans BigQuery. Elle accepte une clé et une valeur JsonObject en entrée et n'écrit que la valeur JsonObject dans BigQuery (la clé est ignorée). L'objet JsonObject doit contenir un enregistrement BigQuery au format Json. Le réducteur doit générer une paire composée d'une clé (peu importe le type, nous avons utilisé une clé NullWritable dans notre exemple de tâche WordCount) et d'une valeur JsonObject. Le réducteur utilisé dans l'exemple de tâche WordCount est présenté ci-dessous.

  /**
   * Reducer function for WordCount.
   */
  public static class Reduce
      extends Reducer<Text, LongWritable, JsonObject, NullWritable> {

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      // Add up the values to get a total number of occurrences of our word.
      long count = 0;
      for (LongWritable val : values) {
        count = count + val.get();
      }

      JsonObject jsonObject = new JsonObject();
      jsonObject.addProperty("Word", key.toString());
      jsonObject.addProperty("Count", count);
      // Key does not matter.
      context.write(jsonObject, NullWritable.get());
    }
  }

Effectuer un nettoyage

Une fois la tâche terminée, nettoyez les chemins d’exportation Cloud Storage.

job.waitForCompletion(true);
GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());

Vous pouvez afficher le nombre de mots dans la table BigQuery de sortie de la console Google Cloud.

Code complet pour un exemple de tâche WordCount

Le code ci-dessous est un exemple d'une tâche WordCount simple qui agrège le nombre de mots contenus dans divers objets BigQuery.

package com.google.cloud.hadoop.io.bigquery.samples;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Sample program to run the Hadoop Wordcount example over tables in BigQuery.
 */
public class WordCount {

 // The configuration key used to specify the BigQuery field name
  // ("column name").
  public static final String WORDCOUNT_WORD_FIELDNAME_KEY =
      "mapred.bq.samples.wordcount.word.key";

  // Default value for the configuration entry specified by
  // WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
  // publicdata:samples.shakespeare or 'repository_name'
  // in publicdata:samples.github_timeline.
  public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";

  // Guava might not be available, so define a null / empty helper:
  private static boolean isStringNullOrEmpty(String toTest) {
    return toTest == null || "".equals(toTest);
  }

  /**
   * The mapper function for WordCount. For input, it consumes a LongWritable
   * and JsonObject as the key and value. These correspond to a row identifier
   * and Json representation of the row's values/columns.
   * For output, it produces Text and a LongWritable as the key and value.
   * These correspond to the word and a count for the number of times it has
   * occurred.
   */

  public static class Map
      extends Mapper <LongWritable, JsonObject, Text, LongWritable> {
    private static final LongWritable ONE = new LongWritable(1);
    private Text word = new Text();
    private String wordKey;

    @Override
    public void setup(Context context)
        throws IOException, InterruptedException {
      // Find the runtime-configured key for the field name we're looking for in
      // the map task.
      Configuration conf = context.getConfiguration();
      wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY, WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT);
    }

    @Override
    public void map(LongWritable key, JsonObject value, Context context)
        throws IOException, InterruptedException {
      JsonElement countElement = value.get(wordKey);
      if (countElement != null) {
        String wordInRecord = countElement.getAsString();
        word.set(wordInRecord);
        // Write out the key, value pair (write out a value of 1, which will be
        // added to the total count for this word in the Reducer).
        context.write(word, ONE);
      }
    }
  }

  /**
   * Reducer function for WordCount. For input, it consumes the Text and
   * LongWritable that the mapper produced. For output, it produces a JsonObject
   * and NullWritable. The JsonObject represents the data that will be
   * loaded into BigQuery.
   */
  public static class Reduce
      extends Reducer<Text, LongWritable, JsonObject, NullWritable> {

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      // Add up the values to get a total number of occurrences of our word.
      long count = 0;
      for (LongWritable val : values) {
        count = count + val.get();
      }

      JsonObject jsonObject = new JsonObject();
      jsonObject.addProperty("Word", key.toString());
      jsonObject.addProperty("Count", count);
      // Key does not matter.
      context.write(jsonObject, NullWritable.get());
    }
  }

  /**
   * Configures and runs the main Hadoop job. Takes a String[] of 5 parameters:
   * [ProjectId] [QualifiedInputTableId] [InputTableFieldName]
   * [QualifiedOutputTableId] [GcsOutputPath]
   *
   * ProjectId - Project under which to issue the BigQuery
   * operations. Also serves as the default project for table IDs that don't
   * specify a project for the table.
   *
   * QualifiedInputTableId - Input table ID of the form
   * (Optional ProjectId):[DatasetId].[TableId]
   *
   * InputTableFieldName - Name of the field to count in the
   * input table, e.g., 'word' in publicdata:samples.shakespeare or
   * 'repository_name' in publicdata:samples.github_timeline.
   *
   * QualifiedOutputTableId - Input table ID of the form
   * (Optional ProjectId):[DatasetId].[TableId]
   *
   * GcsOutputPath - The output path to store temporary
   * Cloud Storage data, e.g., gs://bucket/dir/
   *
   * @param args a String[] containing ProjectId, QualifiedInputTableId,
   *     InputTableFieldName, QualifiedOutputTableId, and GcsOutputPath.
   * @throws IOException on IO Error.
   * @throws InterruptedException on Interrupt.
   * @throws ClassNotFoundException if not all classes are present.
   */
  public static void main(String[] args)
      throws IOException, InterruptedException, ClassNotFoundException {

    // GenericOptionsParser is a utility to parse command line arguments
    // generic to the Hadoop framework. This example doesn't cover the specifics,
    // but recognizes several standard command line arguments, enabling
    // applications to easily specify a NameNode, a ResourceManager, additional
    // configuration resources, etc.
    GenericOptionsParser parser = new GenericOptionsParser(args);
    args = parser.getRemainingArgs();

    // Make sure we have the right parameters.
    if (args.length != 5) {
      System.out.println(
          "Usage: hadoop jar bigquery_wordcount.jar [ProjectId] [QualifiedInputTableId] "
              + "[InputTableFieldName] [QualifiedOutputTableId] [GcsOutputPath]\n"
              + "    ProjectId - Project under which to issue the BigQuery operations. Also serves "
              + "as the default project for table IDs that don't explicitly specify a project for "
              + "the table.\n"
              + "    QualifiedInputTableId - Input table ID of the form "
              + "(Optional ProjectId):[DatasetId].[TableId]\n"
              + "    InputTableFieldName - Name of the field to count in the input table, e.g., "
              + "'word' in publicdata:samples.shakespeare or 'repository_name' in "
              + "publicdata:samples.github_timeline.\n"
              + "    QualifiedOutputTableId - Input table ID of the form "
              + "(Optional ProjectId):[DatasetId].[TableId]\n"
              + "    GcsOutputPath - The output path to store temporary Cloud Storage data, e.g., "
              + "gs://bucket/dir/");
      System.exit(1);
    }

    // Get the individual parameters from the command line.
    String projectId = args[0];
    String inputQualifiedTableId = args[1];
    String inputTableFieldId = args[2];
    String outputQualifiedTableId = args[3];
    String outputGcsPath = args[4];

   // Define the schema we will be using for the output BigQuery table.
    List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>();
    outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING"));
    outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER"));
    TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema);

    // Create the job and get its configuration.
    Job job = new Job(parser.getConfiguration(), "wordcount");
    Configuration conf = job.getConfiguration();

    // Set the job-level projectId.
    conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

    // Configure input.
    BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

    // Configure output.
    BigQueryOutputConfiguration.configure(
        conf,
        outputQualifiedTableId,
        outputSchema,
        outputGcsPath,
        BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
        TextOutputFormat.class);

    // (Optional) Configure the KMS key used to encrypt the output table.
    BigQueryOutputConfiguration.setKmsKeyName(
        conf,
        "projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1");

    conf.set(WORDCOUNT_WORD_FIELDNAME_KEY, inputTableFieldId);

    // This helps Hadoop identify the Jar which contains the mapper and reducer
    // by specifying a class in that Jar. This is required if the jar is being
    // passed on the command line to Hadoop.
    job.setJarByClass(WordCount.class);

    // Tell the job what data the mapper will output.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setInputFormatClass(GsonBigQueryInputFormat.class);

    // Instead of using BigQueryOutputFormat, we use the newer
    // IndirectBigQueryOutputFormat, which works by first buffering all the data
    // into a Cloud Storage temporary file, and then on commitJob, copies all data from
    // Cloud Storage into BigQuery in one operation. Its use is recommended for large jobs
    // since it only requires one BigQuery "load" job per Hadoop/Spark job, as
    // compared to BigQueryOutputFormat, which performs one BigQuery job for each
    // Hadoop/Spark task.
    job.setOutputFormatClass(IndirectBigQueryOutputFormat.class);

    job.waitForCompletion(true);

    // After the job completes, clean up the Cloud Storage export paths.
    GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());

    // You can view word counts in the BigQuery output table at
    // https://console.cloud.google.com/.
  }
}

Version Java compatible

Java 8 est indispensable au fonctionnement du connecteur BigQuery.

Informations sur la dépendance Apache Maven

<dependency>
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>bigquery-connector</artifactId>
    <version>insert "hadoopX-X.X.X" connector version number here</version>
</dependency>

Pour en savoir plus, consultez les notes de version relatives au connecteur BigQuery et la documentation de référence JavaDoc.