GroupByKey et jointure

La transformation principale GroupByKey est une opération de réduction parallèle utilisée pour traiter des collections de paires clé/valeur. Vous utilisez GroupByKey avec une PCollection d'entrée de paires clé/valeur qui représente un élément multimap, où la collection contient plusieurs paires ayant la même clé mais des valeurs différentes. La transformation GroupByKey vous permet de rassembler toutes les valeurs de l'élément multimap partageant la même clé.

GroupByKey est analogue à la phase de lecture aléatoire d'un algorithme de style mappage/brassage/réduction. Vous utilisez GroupByKey pour collecter toutes les valeurs associées à une clé unique.

GroupByKey est un bon moyen d'agréger des données ayant quelque chose en commun. Par exemple, vous souhaiterez peut-être regrouper les commandes client provenant du même code postal (la "clé" correspondant au code postal de chaque commande et la "valeur" au reste de la commande). Vous souhaitez peut-être également regrouper toutes les requêtes utilisateur par ID utilisateur ou par heure de la journée.

Vous pouvez également joindre plusieurs collections de paires clé/valeur, lorsque ces collections partagent une clé commune (même si les types de valeur sont différents). Il existe deux manières d'effectuer une jointure. Une solution consiste à utiliser la transformation CoGroupByKey, qui vous permet de regrouper toutes les valeurs (de tout type) dans plusieurs collections partageant une clé commune. L'autre méthode consiste à joindre plusieurs collections de paires clé/valeur en utilisant un ParDo avec une ou plusieurs entrées secondaires. Dans certains cas, cette seconde méthode peut s'avérer plus efficace que le recours à un CoGroupByKey.

Les jointures sont utiles lorsque vous avez plusieurs ensembles de données, provenant éventuellement de plusieurs sources, qui fournissent des informations sur des éléments connexes. Par exemple, supposons que vous ayez deux fichiers différents contenant des données d'enchères : un fichier contient l'ID de l'enchère, les données d'enchères et les données de tarification ; l'autre fichier contient l'ID de l'enchère et une description de l'article. Vous pouvez joindre ces deux ensembles de données en utilisant l'ID de l'enchère en tant que clé commune et les autres données connexes en tant que valeurs associées. Après la jointure, vous disposez d'un ensemble de données contenant toutes les informations (enchère, tarification et article) associées à chaque ID de l'enchère.

GroupByKey

Examinons les mécanismes de GroupByKey à l'aide d'un exemple simple, dans lequel notre ensemble de données est constitué de mots provenant d'un fichier texte et de la ligne sur laquelle ils apparaissent. Nous voulons regrouper tous les numéros de ligne (valeurs) qui partagent le même mot (clé), afin de voir tous les endroits du texte où un mot donné apparaît.

Notre entrée est une collection PCollection de paires clé/valeur où chaque mot est une clé et la valeur est un numéro de ligne dans le fichier où le mot apparaît. Voici une liste des paires clé/valeur de notre collection d'entrée :

  cat, 1
  dog, 5
  and, 1
  jump, 3
  tree, 2
  cat, 5
  dog, 2
  and, 2
  cat, 9
  and, 6
  ...

La transformation GroupByKey rassemble toutes les valeurs avec la même clé et crée une nouvelle paire composée de la clé unique et d'une collection de toutes les valeurs associées à cette clé dans la PCollection d'entrée. Si nous appliquions GroupByKey à la collection de paires clé/valeur ci-dessus, la collection de sortie ressemblerait à ce qui suit :

  cat, [1,5,9]
  dog, [5,2]
  and, [1,2,6]
  jump, [3]
  tree, [2]
  ...

Ainsi, GroupByKey représente une transformation d'un type multi-map, qui est un mappage de plusieurs clés avec des valeurs individuelles, vers un type uni-map, qui est un mappage de clés uniques avec des collections de valeurs.

Java

Remarque sur la représentation des paires clé/valeur :
Dans le SDK Dataflow pour Java, vous représentez une paire clé/valeur avec un objet de type KV<K, V>. KV est une classe de domaine d'expertise avec des clés de type K et des valeurs de type V.

Un schéma de traitement commun aux collections regroupées par clé (le résultat d'une transformation GroupByKey) consiste à combiner les valeurs associées à chaque clé en une seule valeur fusionnée associée à cette clé. Le SDK Dataflow encapsule l'ensemble de ce schéma (regroupement de clés, puis combinaison des valeurs pour chaque clé) en tant que transformation Combine. Pour plus d'informations, consultez la section Combinaison de collections et de valeurs.

Appliquer une transformation GroupByKey

Vous devez appliquer un GroupByKey à une PCollection d'entrée de paires clé/valeur. GroupByKey renvoie une nouvelle collection PCollection de paires clé/valeur dans la nouvelle collection, les clés sont uniques et chaque élément de valeur associé est en fait un flux de valeurs contenant une ou plusieurs valeurs associées à la clé.

Java

L'exemple de code suivant montre comment appliquer GroupByKey à une collection PCollection d'objets KV<K, V>, où chaque paire d'éléments représente une clé de type K et une valeur unique de type V.

La valeur GroupByKey renvoyée est une nouvelle collection PCollection de type KV<K, Iterable<V>>, où chaque paire d'éléments représente une clé et une collection de valeurs sous la forme d'un élément Iterable Java.

  // A PCollection of key/value pairs: words and line numbers.
  PCollection<KV<String, Integer>> wordsAndLines = ...;

  // Apply a GroupByKey transform to the PCollection "wordsAndLines".
  PCollection<KV<String, Iterable<Integer>>> groupedWords = wordsAndLines.apply(
    GroupByKey.<String, Integer>create());
Java 8 peut inférer les types de paramètres de GroupByKey.create, mais il peut être nécessaire de les spécifier explicitement dans les versions antérieures de Java.

GroupByKey avec fenêtrage

Java

GroupByKey se comporte de manière légèrement différente lorsque la PCollection d'entrée est divisée en plusieurs fenêtres au lieu d'une seule fenêtre globale.

La transformation GroupByKey considère également la fenêtre à laquelle chaque élément appartient lors de l'exécution de la réduction. La ou les fenêtres (déterminées par l'horodatage de chaque paire clé/valeur) agissent essentiellement en tant que clé secondaire. GroupByKey avec fenêtrage regroupe donc à la fois par clé et par fenêtre. Lorsque tous les éléments font partie d'une seule fenêtre globale, GroupByKey prend la forme de la sémantique simple décrite ci-dessus.

Bien que la ou les fenêtres d'un élément agissent en tant que clé secondaire pour le regroupement, elles peuvent être potentiellement plus puissantes. Les éléments peuvent appartenir à plusieurs fenêtres et des fenêtres superposées peuvent être fusionnées. Cela vous permet de créer des regroupements plus complexes.

Appliquons le fenêtrage à notre exemple précédent :

  cat, 1 (window 0)
  dog, 5 (window 0)
  and, 1 (window 0)

  jump, 3 (window 1)
  tree, 2 (window 1)
  cat, 5  (window 1)

  dog, 2 (window 2)
  and, 2 (window 2)
  cat, 9 (window 2)
  and, 6 (window 2)
  ...

GroupByKey rassemble tous les éléments ayant la même clé et la même fenêtre, et génère une collection de sortie comme ci-après :

  cat, [1] (window 0)
  dog, [5] (window 0)
  and, [1] (window 0)

  jump, [3] (window 1)
  tree, [2] (window 1)
  cat, [5]  (window 1)

  dog, [2]   (window 2)
  and, [2,6] (window 2)
  cat, [9]   (window 2)

Notez que la fenêtre affecte maintenant les groupes de sortie. Les paires clé/valeur dans différentes fenêtres ne sont pas regroupées.

Jointures à CoGroupByKey

La transformation CoGroupByKey effectue une jointure relationnelle de deux ensembles de données ou plus. CoGroupByKey regroupe les valeurs de plusieurs PCollection de paires clé/valeur, où chaque PCollection de la sortie possède le même type de clé.

Java

Pour la sécurité de type, Dataflow exige que vous transmettiez chaque PCollection en tant que partie de KeyedPCollectionTuple. Vous devez déclarer un TupleTag pour chaque PCollection d'entrée que vous souhaitez transmettre à CoGroupByKey.

CoGroupByKey regroupe la sortie jointe dans un objet CoGbkResult.

Utilisons un exemple simple pour illustrer les mécanismes d'un élément CoGroupByKey. Nous avons deux collections d'entrée à regrouper dans un élément KeyedPCollectionTuple. Le premier est un PCollection<K, V1>, nous allons donc assigner un TupleTag<V1> appelé tag1. Les paires clé/valeur dans cette PCollection sont les suivantes :

  key1 &map; v1
  key2 &map; v2
  key2 &map; v3

Le deuxième exemple illustre une PCollection<K, V2>, nous attribuerons donc un TupleTag<V2> appelé tag2. Cette collection inclut :

  key1 &map; x1
  key1 &map; x2
  key3 &map; x3

La collection CoGbkResult résultante contient toutes les données associées à chaque clé unique de n'importe quelle collection d'entrée. Le type de données renvoyé est PCollection<KV<K, CoGbkResult>>, avec le contenu suivant :

  key1 -> {
    tag1 &map; [v1]
    tag2 &map; [x1, x2]
  }
  key2 -> {
    tag1 &map; [v2, v3]
    tag2 &map; []
  }
  key3 -> {
    tag1 &map; []
    tag2 &map; [x3]
  }

Après avoir appliqué CoGroupByKey, vous pouvez rechercher les données de chaque collection à l'aide du TupleTag approprié.

Appliquer CoGroupByKey

Java

CoGroupByKey accepte un tuple de PCollection associé (PCollection<KV<K, V>>) en tant qu'entrée. En sortie, CoGroupByKey renvoie un type spécial appelé CoGbkResults, qui regroupe les valeurs de toutes les PCollection d'entrée par leurs clés communes. Vous indexez les CoGbkResults à l'aide du mécanisme TupleTag pour plusieurs collections. Vous pouvez accéder à une collection spécifique de l'objet CoGbkResults à l'aide du TupleTag que vous avez fourni avec la collection initiale.

Voici un exemple qui joint deux ensembles de données différents (provenant peut-être de sources différentes) lus dans des PCollection distinctes :

  // Each data set is represented by key-value pairs in separate PCollections.
  // Both data sets share a common key type ("K").
  PCollection<KV<K, V1>> pc1 = ...;
  PCollection<KV<K, V2>> pc2 = ...;

  // Create tuple tags for the value types in each collection.
  final TupleTag<V1> tag1 = new TupleTag<V1>();
  final TupleTag<V2> tag2 = new TupleTag<V2>();

  // Merge collection values into a CoGbkResult collection.
  PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
    KeyedPCollectionTuple.of(tag1, pc1)
                         .and(tag2, pc2)
                         .apply(CoGroupByKey.<K>create());

Dans la PCollection<KV<K, CoGbkResult>> résultante; chaque clé (toutes de type K) aura un CoGbkResult différent. En d'autres termes, CoGbkResult est une correspondance de TupleTag<T> à Iterable<T>.

Voici un autre exemple d'utilisation de CoGroupByKey, suivi d'un ParDo qui consomme le CoGbkResult obtenu ; ParDo pourrait, par exemple, formater les données pour un traitement ultérieur :

  // Each BigQuery table of key-value pairs is read into separate PCollections.
  // Each shares a common key ("K").
  PCollection<KV<K, V1>> pt1 = ...;
  PCollection<KV<K, V2>> pt2 = ...;

  // Create tuple tags for the value types in each collection.
  final TupleTag<V1> t1 = new TupleTag<V1>();
  final TupleTag<V2> t2 = new TupleTag<V2>();

  //Merge collection values into a CoGbkResult collection
  PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
    KeyedPCollectionTuple.of(t1, pt1)
                         .and(t2, pt2)
                         .apply(CoGroupByKey.<K>create());

  // Access results and do something.
  PCollection<T> finalResultCollection =
    coGbkResultCollection.apply(ParDo.of(
      new DoFn<KV<K, CoGbkResult>, T>() {
        @Override
        public void processElement(ProcessContext c) {
          KV<K, CoGbkResult> e = c.element();
          // Get all collection 1 values
          Iterable<V1> pt1Vals = e.getValue().getAll(t1);
          // Now get collection 2 values
          V2 pt2Val = e.getValue().getOnly(t2);
          ... Do Something ....
          c.output(...some T...);
        }
      }));

Utiliser CoGroupByKey avec des PCollections illimitées

Java

Lorsque vous utilisez CoGroupByKey pour regrouper des PCollection pour lesquelles une stratégie de fenêtrage est appliquée, toutes les PCollection que vous souhaitez grouper doivent utiliser la même stratégie de fenêtrage et la même taille de fenêtre. Par exemple, toutes les collections que vous fusionnez doivent utiliser (de manière hypothétique) des fenêtres fixes identiques de 5 minutes ou des fenêtres coulissantes de 4 minutes commençant toutes les 30 secondes.

Si votre pipeline tente d'utiliser CoGroupByKey pour fusionner des collections PCollection avec des fenêtres incompatibles, Dataflow générera une erreur IllegalStateException lors de la construction de votre pipeline.

Jointures avec les entrées ParDo et Side

Au lieu d'utiliser CoGroupByKey, vous pouvez appliquer un ParDo avec une ou plusieurs entrées secondaires pour effectuer une jointure. Une entrée secondaire est une entrée supplémentaire, autre que l'entrée principale, que vous pouvez fournir à votre PCollection. Votre DoFn peut accéder à l'entrée secondaire chaque fois qu'il traite un élément dans la PCollection d'entrée.

Effectuer une jointure de cette manière peut s'avérer plus efficace que d'utiliser CoGroupByKey dans certains cas, par exemple lorsque :

  • Les collections PCollection que vous rejoignez ont une taille disproportionnée et les plus petites PCollection pourraient tenir dans la mémoire.
  • Vous avez une grande table que vous souhaitez joindre plusieurs fois à différents endroits du pipeline. Au lieu de faire plusieurs fois un CoGroupByKey, vous pouvez créer une entrée secondaire et la transmettre à plusieurs ParDo. Par exemple, vous souhaitez joindre deux tables pour en générer une troisième. Ensuite, vous souhaitez rejoindre la première table avec la troisième et ainsi de suite.

Pour effectuer une jointure de cette manière, appliquez un ParDo à l'une des collections PCollection et transmettez la ou les autres collections PCollection en tant qu'entrées secondaires. L'exemple de code suivant montre comment effectuer une telle jointure.

Java

// Each BigQuery table of key-value pairs is read into separate PCollections.
PCollection<KV<K1, V1>> mainInput = ...
PCollection<KV<K2, V2>> sideInput = ...

// Create a view from your side input data
final PCollectionView<Map<K2, V2>> view = sideInput.apply(View.<K2, V2>asMap());

// Access side input and perform join logic
PCollection<T> joinedData =
mainInput.apply("SideInputJoin", ParDo.withSideInputs(view).of(new DoFn<KV<K1, V1>, T>() {
  @Override
  public void processElement(ProcessContext c) {
    K2 sideInputKey = ... transform c.element().getKey() into K2 ...
    V2 sideInputValue = c.sideInput(view).get(sideInputKey);
    V1 mainInputValue = c.element().getValue();
    ... Do Something ...
    c.output(...some T...);
  }
}));
Cette page vous a-t-elle été utile ? Évaluez-la :

Envoyer des commentaires concernant…

Besoin d'aide ? Consultez notre page d'assistance.