GroupByKey et jointure

La transformation de cœur GroupByKey est une opération de réduction parallèle utilisée pour traiter les collections de paires clé/valeur. Vous utilisez GroupByKey avec une entrée PCollection de paires clé/valeur qui représente un multimap, où la collection contient plusieurs paires ayant la même clé, mais des valeurs différentes. La transformation GroupByKey vous permet de regrouper toutes les valeurs du fichier multimap partageant la même clé.

GroupByKey est similaire à la phase de lecture aléatoire d'un algorithme de style mappage, brassage ou réduction. Utilisez GroupByKey pour collecter toutes les valeurs associées à une clé unique.

GroupByKey est un bon moyen d'agréger des données qui ont quelque chose en commun. Par exemple, vous souhaiterez peut-être regrouper les commandes client provenant du même code postal (la "clé" correspondant au code postal de chaque commande et la "valeur" au reste de la commande). Vous souhaitez peut-être également regrouper toutes les requêtes utilisateur par ID utilisateur ou par heure de la journée.

Vous pouvez également joindre plusieurs collections de paires clé/valeur, lorsque ces collections partagent une clé commune (même si les types de valeur sont différents). Il existe deux manières d'effectuer une jointure. Une solution consiste à utiliser la transformation CoGroupByKey, qui vous permet de regrouper toutes les valeurs (de tout type) dans plusieurs collections partageant une clé commune. L'autre méthode consiste à joindre plusieurs collections de paires clé/valeur en utilisant un ParDo avec une ou plusieurs entrées secondaires. Dans certains cas, cette seconde méthode peut s'avérer plus efficace que l'utilisation d'un CoGroupByKey.

Les jointures sont utiles lorsque vous avez plusieurs ensembles de données, provenant éventuellement de plusieurs sources, qui fournissent des informations sur des éléments connexes. Par exemple, supposons que vous ayez deux fichiers différents contenant des données d'enchères : un fichier contient l'ID de l'enchère, les données d'enchères et les données de tarification ; l'autre fichier contient l'ID de l'enchère et une description de l'article. Vous pouvez joindre ces deux ensembles de données en utilisant l'ID de l'enchère en tant que clé commune et les autres données connexes en tant que valeurs associées. Après la jointure, vous disposez d'un ensemble de données contenant toutes les informations (enchère, tarification et article) associées à chaque ID de l'enchère.

GroupByKey

Examinons la mécanique de GroupByKey avec un exemple de cas simple, où notre ensemble de données se compose de mots d'un fichier texte et de la ligne sur laquelle ils apparaissent. Nous voulons regrouper tous les numéros de ligne (valeurs) qui partagent le même mot (clé), afin de voir tous les endroits du texte où un mot donné apparaît.

Notre entrée est un PCollection de paires clé-valeur où chaque mot est une clé, et la valeur est un numéro de ligne dans le fichier où le mot apparaît. Voici une liste des paires clé/valeur de notre collection d'entrée :

      cat, 1
      dog, 5
      and, 1
      jump, 3
      tree, 2
      cat, 5
      dog, 2
      and, 2
      cat, 9
      and, 6
      ...
    

La transformation GroupByKey rassemble toutes les valeurs avec la même clé et crée une nouvelle paire constituée de la clé unique et d'un ensemble de valeurs associées à cette clé dans l'entrée PCollection. Si nous devions appliquer GroupByKey à la collection de paires clé-valeur ci-dessus, la collection de sortie ressemblerait à ceci :

      cat, [1,5,9]
      dog, [5,2]
      and, [1,2,6]
      jump, [3]
      tree, [2]
      ...
    

Ainsi, GroupByKey représente une transformation d'une multi-map, qui correspond à une carte de plusieurs clés de valeurs individuelles, à une uni-map, des clés uniques aux collections de valeurs.

Java

Remarque concernant la représentation des paires clé-valeur :
Dans le SDK Java Dataflow, vous représentez une paire clé-valeur avec un objet de type KV<K, V>. KV est une classe de domaine d'expertise avec des clés de type K et des valeurs de type V.

Un modèle de traitement courant pour les collections regroupées par clé (résultat d'une transformation GroupByKey) consiste à combiner les valeurs associées à chaque clé dans une seule valeur fusionnée associée à cette clé. Les SDK Dataflow encapsulent l'ensemble du schéma (regroupement de clés, puis en combinant les valeurs pour chaque clé) en tant que transformation Combine. Pour plus d'informations, consultez la section Combinaison de collections et de valeurs.

Appliquer une transformation GroupByKey

Vous devez appliquer un GroupByKey à une entrée PCollection de paires clé-valeur. GroupByKey renvoie un nouveau PCollection paires clé-valeur ; dans la nouvelle collection, les clés sont uniques et chaque élément de valeur associé est en fait un flux de valeurs qui contient une ou plusieurs valeurs associées à la clé.

Java

L'exemple de code suivant montre comment appliquer GroupByKey à un objet PCollection sur KV<K, V>, où chaque paire d'éléments représente une clé de type K et une seule valeur de type V.

La valeur renvoyée par GroupByKey est un nouveau PCollection de type KV<K, Iterable<V>>, où chaque paire d'éléments représente une clé et un ensemble de valeurs sous la forme Java Iterable.

      // A PCollection of key/value pairs: words and line numbers.
      PCollection<KV<String, Integer>> wordsAndLines = ...;

      // Apply a GroupByKey transform to the PCollection "wordsAndLines".
      PCollection<KV<String, Iterable<Integer>>> groupedWords = wordsAndLines.apply(
        GroupByKey.<String, Integer>create());
    
Java 8 peut déduire les types de paramètres de GroupByKey.create, mais il peut être nécessaire de les spécifier explicitement dans les versions antérieures de Java.

GroupByKey avec fenêtrage

Java

GroupByKey se comporte un peu différemment lorsque l'entrée PCollection est divisée en plusieurs fenêtres - au lieu d'une seule fenêtre globale.

La transformation GroupByKey prend également en compte la fenêtre à laquelle appartient chaque élément lors de la réduction. La ou les fenêtres (déterminées par l'horodatage de chaque paire clé-valeur) agissent essentiellement en tant que clé secondaire. GroupByKey avec fenêtrage effectue donc des groupes à la fois par la clé et par la fenêtre. Lorsque tous les éléments font partie d'une seule fenêtre globale, GroupByKey dégénère en sémantique simple décrite ci-dessus.

Bien que la ou les fenêtres d'un élément agissent en tant que clé secondaire pour le regroupement, elles peuvent être potentiellement plus puissantes. Les éléments peuvent appartenir à plusieurs fenêtres et des fenêtres superposées peuvent être fusionnées. Cela vous permet de créer des regroupements plus complexes.

Appliquons le fenêtrage à notre exemple précédent :

      cat, 1 (window 0)
      dog, 5 (window 0)
      and, 1 (window 0)

      jump, 3 (window 1)
      tree, 2 (window 1)
      cat, 5  (window 1)

      dog, 2 (window 2)
      and, 2 (window 2)
      cat, 9 (window 2)
      and, 6 (window 2)
      ...
    

GroupByKey rassemble tous les éléments avec la même clé et la même fenêtre, générant ainsi une collection de sortie comme cell-ci :

      cat, [1] (window 0)
      dog, [5] (window 0)
      and, [1] (window 0)

      jump, [3] (window 1)
      tree, [2] (window 1)
      cat, [5]  (window 1)

      dog, [2]   (window 2)
      and, [2,6] (window 2)
      cat, [9]   (window 2)
    

Notez que la fenêtre affecte maintenant les groupes de sortie. Les paires clé-valeur dans différentes fenêtres ne sont pas regroupées.

Jointures à CoGroupByKey

La transformation CoGroupByKey effectue une jointure relationnelle de deux ou plusieurs ensembles de données. CoGroupByKey regroupe les valeurs de plusieurs PCollection de paires clé-valeur, où chaque PCollection de l'entrée possède le même type de clé.

Java

Pour la sécurité des types, Dataflow vous oblige à transmettre chaque PCollection dans le cadre d'un KeyedPCollectionTuple. Vous devez déclarer un TupleTag pour chaque entrée PCollection que vous souhaitez transmettre à CoGroupByKey.

CoGroupByKey regroupe la sortie jointe dans un objet CoGbkResult.

Prenons un exemple simple pour illustrer la mécanique d'un CoGroupByKey. Nous avons deux collections d'entrées à regrouper dans un KeyedPCollectionTuple. Le premier est un PCollection<K, V1>, nous allons donc attribuer un TupleTag<V1> appelé tag1. Les paires clé-valeur de ce PCollection sont les suivantes :

      key1 ↦ v1
      key2 ↦ v2
      key2 ↦ v3
    

La seconde est un PCollection<K, V2>, nous allons donc attribuer un TupleTag<V2> appelé tag2. Cette collection inclut :

      key1 ↦ x1
      key1 ↦ x2
      key3 ↦ x3
    

La collection CoGbkResult qui en résulte contient toutes les données associées à chaque clé unique de n'importe quelle collection en entrée. Le type de données renvoyé est PCollection<KV<K, CoGbkResult>>, avec le contenu suivant :

      key1 -> {
        tag1 ↦ [v1]
        tag2 ↦ [x1, x2]
      }
      key2 -> {
        tag1 ↦ [v2, v3]
        tag2 ↦ []
      }
      key3 -> {
        tag1 ↦ []
        tag2 ↦ [x3]
      }
    

Après avoir appliqué CoGroupByKey, vous pouvez rechercher les données de chaque collection à l'aide de TupleTag.

Appliquer CoGroupByKey

Java

CoGroupByKey accepte un tuple de PCollection associés (PCollection<KV<K, V>>) en entrée. En sortie, CoGroupByKey renvoie un type spécial appelé CoGbkResults, qui regroupe les valeurs de toutes les entrées PCollection par leurs clés communes. Indexez le CoGbkResults à l'aide du mécanisme TupleTag pour plusieurs collections. Vous pouvez accéder à une collection spécifique dans l'objet CoGbkResults en utilisant le TupleTag que vous avez fourni avec la collection initiale.

Voici un exemple qui regroupe deux ensembles de données différents (provenant peut-être de différentes sources) qui ont été lus dans des PCollection distincts :

      // Each data set is represented by key-value pairs in separate PCollections.
      // Both data sets share a common key type ("K").
      PCollection<KV<K, V1>> pc1 = ...;
      PCollection<KV<K, V2>> pc2 = ...;

      // Create tuple tags for the value types in each collection.
      final TupleTag<V1> tag1 = new TupleTag<V1>();
      final TupleTag<V2> tag2 = new TupleTag<V2>();

      // Merge collection values into a CoGbkResult collection.
      PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
        KeyedPCollectionTuple.of(tag1, pc1)
                             .and(tag2, pc2)
                             .apply(CoGroupByKey.<K>create());
    

Dans le PCollection<KV<K, CoGbkResult>> résultant, chaque clé (toutes de type K) aura un CoGbkResult différent. En d'autres termes, CoGbkResult est une carte de TupleTag<T> à Iterable<T>.

Voici un autre exemple d'utilisation de CoGroupByKey, suivi d'un ParDo qui consomme le CoGbkResult résultant ; le ParDo peut, par exemple, mettre en forme les données pour un traitement ultérieur :

      // Each BigQuery table of key-value pairs is read into separate PCollections.
      // Each shares a common key ("K").
      PCollection<KV<K, V1>> pt1 = ...;
      PCollection<KV<K, V2>> pt2 = ...;

      // Create tuple tags for the value types in each collection.
      final TupleTag<V1> t1 = new TupleTag<V1>();
      final TupleTag<V2> t2 = new TupleTag<V2>();

      //Merge collection values into a CoGbkResult collection
      PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
        KeyedPCollectionTuple.of(t1, pt1)
                             .and(t2, pt2)
                             .apply(CoGroupByKey.<K>create());

      // Access results and do something.
      PCollection<T> finalResultCollection =
        coGbkResultCollection.apply(ParDo.of(
          new DoFn<KV<K, CoGbkResult>, T>() {
            @Override
            public void processElement(ProcessContext c) {
              KV<K, CoGbkResult> e = c.element();
              // Get all collection 1 values
              Iterable<V1> pt1Vals = e.getValue().getAll(t1);
              // Now get collection 2 values
              V2 pt2Val = e.getValue().getOnly(t2);
              ... Do Something ....
              c.output(...some T...);
            }
          }));
    

Utiliser CoGroupByKey avec des PCollections illimitées

Java

Lorsque vous utilisez CoGroupByKey pour regrouper des PCollection auxquels une stratégie de fenêtrage a été appliquée, tous les PCollection que vous souhaitez regrouper doivent utiliser la même stratégie de fenêtrage et la même taille de fenêtre. Par exemple, toutes les collections que vous fusionnez doivent utiliser (de manière hypothétique) des fenêtres fixes identiques de 5 minutes ou des fenêtres coulissantes de 4 minutes commençant toutes les 30 secondes.

Si votre pipeline tente d'utiliser CoGroupByKey pour fusionner PCollection avec des fenêtres incompatibles, Dataflow génère une erreur IllegalStateException lorsque votre pipeline est construit.

Jointures avec les entrées ParDo et Side

Au lieu d'utiliser CoGroupByKey, vous pouvez appliquer un ParDo avec une ou plusieurs entrées secondaires pour effectuer une jointure. Une entrée latérale est une entrée supplémentaire, autre que l'entrée principale, que vous pouvez fournir à votre PCollection. Votre DoFn peut accéder à l'entrée latérale chaque fois qu'il traite un élément dans le PCollection d'entrée.

Effectuer une jointure de cette manière peut s'avérer plus efficace que d'utiliser CoGroupByKey dans certains cas, tels que :

  • Les PCollection que vous rejoignez sont de taille disproportionnée et les plus petits PCollection peuvent entrer dans la mémoire.
  • Vous avez une grande table que vous souhaitez joindre plusieurs fois à différents endroits du pipeline. Au lieu d'effectuer plusieurs fois CoGroupByKey, vous pouvez créer une entrée latérale et la transmettre à plusieurs ParDo. Par exemple, vous souhaitez joindre deux tables pour en générer une troisième. Ensuite, vous souhaitez joindre la première table avec la troisième et ainsi de suite.

Pour effectuer une jointure de cette façon, appliquez un ParDo à l'un des PCollection et transmettez les autres PCollection en tant qu'entrées secondaires. L'exemple de code suivant montre comment effectuer une telle jointure.

Java

    // Each BigQuery table of key-value pairs is read into separate PCollections.
    PCollection<KV<K1, V1>> mainInput = ...
    PCollection<KV<K2, V2>> sideInput = ...

    // Create a view from your side input data
    final PCollectionView<Map<K2, V2>> view = sideInput.apply(View.<K2, V2>asMap());

    // Access side input and perform join logic
    PCollection<T> joinedData =
    mainInput.apply("SideInputJoin", ParDo.withSideInputs(view).of(new DoFn<KV<K1, V1>, T>() {
      @Override
      public void processElement(ProcessContext c) {
        K2 sideInputKey = ... transform c.element().getKey() into K2 ...
        V2 sideInputValue = c.sideInput(view).get(sideInputKey);
        V1 mainInputValue = c.element().getValue();
        ... Do Something ...
        c.output(...some T...);
      }
    }));