Tester votre pipeline

Le test de votre pipeline constitue une étape particulièrement importante dans le développement d'une solution de traitement de données efficace. Du fait de la nature indirecte du modèle de Cloud Dataflow, dans lequel votre code utilisateur construit un graphe de pipeline à exécuter à distance sur Google Cloud Platform, la tâche de débogage des exécutions en échec peut s'avérer complexe. Il est souvent plus simple et plus rapide d'effectuer des tests unitaires en local sur le code de votre pipeline que de déboguer l'exécution d'un pipeline à distance.

Pour identifier et corriger des bogues dans le code de votre pipeline, la méthode la plus efficace et la plus immédiate consiste souvent à effectuer des tests unitaires en local sur ce code avant d'exécuter le pipeline complet sur le service Cloud Dataflow. L'exécution en local des tests unitaires de votre pipeline vous permet également d'utiliser vos outils de débogage locaux habituels/préférés.

Les SDK Dataflow offrent plusieurs méthodes pour tester le code de votre pipeline à différents niveaux. Ainsi, du niveau le plus bas au plus élevé :

  • Vous pouvez tester individuellement les objets fonction, tels que DoFn, dans les transformations de base de votre pipeline.
  • Vous pouvez tester une transformation composite entière en tant qu'unité.
  • Vous pouvez effectuer un test de bout en bout pour un pipeline complet.

Java

Afin de prendre en charge les tests unitaires, le SDK Dataflow pour Java fournit plusieurs classes de test dans le package com.google.cloud.dataflow.sdk.testing. De plus, les transformations incluses dans ce SDK comportent des tests unitaires et les programmes d'exemple du SDK contiennent également des tests. Ces différents tests peuvent vous servir de référence et de guide.

Tester individuellement des objets DoFn

Le code présent dans les fonctions DoFn de votre pipeline est fréquemment exécuté, et souvent sur plusieurs instances Compute Engine. Vous pouvez donc vous épargner beaucoup de temps et de travail de débogage en effectuant des tests unitaires sur ces objets DoFn avant de les utiliser dans le cadre des tests sur Dataflow.

Java

Le SDK Dataflow pour Java propose un moyen pratique de tester individuellement les objets DoFn. Il s'agit de l'atelier de test DoFnTester, qui est inclus dans le package Transforms du SDK.

L'atelier de test DoFnTester exploite le framework JUnit. Pour utiliser l'atelier de test DoFnTester, procédez comme suit :

  1. Créez un atelier de test DoFnTester. Vous devrez transmettre une instance de l'objet DoFn que vous souhaitez tester à la méthode par défaut statique correspondant à DoFnTester.
  2. Créez une ou plusieurs entrées de test principales du type approprié pour votre objet DoFn. Si votre objet DoFn accepte des entrées latérales et/ou produit des sorties latérales, vous devez également créer les entrées latérales et les balises de sortie latérale.
  3. Appelez DoFnTester.processBatch pour traiter les entrées principales.
  4. Utilisez la méthode Assert.assertThat de JUnit pour vous assurer que les résultats de test renvoyés par processBatch correspondent aux valeurs attendues.

Créer un atelier de test DoFnTester

Pour créer un atelier de test DoFnTester, vous devez commencer par créer une instance de l'objet DoFn à tester. Vous devez ensuite utiliser cette instance lorsque vous créez un atelier de test DoFnTester à l'aide de la méthode par défaut statique .of() :

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;

  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

Créer des entrées de test

Vous devrez créer une ou plusieurs entrées de test, que l'atelier de test DoFnTester enverra à votre objet DoFn. Pour créer des entrées de test, créez simplement une ou plusieurs variables d'entrée du même type que les entrées acceptées par votre objet DoFn. Dans le cas ci-dessus :

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  String testInput = "test1";

Entrées et sorties latérales

Si votre objet DoFn accepte les entrées latérales, vous pouvez créer celles-ci à l'aide de la méthode DoFnTester.setSideInputs.

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  PCollectionView<List<Integer>> sideInput = ...;
  Iterable<Integer> value = ...;
  fnTester.setSideInputInGlobalWindow(sideInput, value);

Si votre objet DoFn génère des sorties latérales, vous devez définir les objets TupleTag dont vous aurez besoin pour accéder à chaque sortie. Un objet DoFn générant des sorties latérales produit un tuple PCollectionTuple pour chacune de ces sorties latérales. Vous devez fournir une liste TupleTagList correspondant à chaque sortie latérale de ce tuple.

Supposons que votre objet DoFn produise des sorties latérales de type String et Integer. Vous devez alors créer un objet TupleTag pour chacune de ces sorties et regrouper ces objets dans une liste TupleTagList, puis définir cette liste pour l'atelier de test DoFnTester, comme suit :

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  TupleTag<String> tag1 = ...;
  TupleTag<Integer> tag2 = ...;
  TupleTagList tags = TupleTagList.of(tag1).and(tag2);

  fnTester.setSideOutputTags(tags);

Pour plus d'informations, consultez la documentation de ParDo sur les entrées latérales.

Traiter les entrées de test et vérifier les résultats

Pour traiter les entrées (et exécuter ainsi le test sur votre objet DoFn), appelez la méthode DoFnTester.processBatch. Lors de l'appel de processBatch, vous transmettez une ou plusieurs des valeurs d'entrée de test principales pour votre objet DoFn. Si vous définissez des entrées latérales, celles-ci sont disponibles pour chacun des lots d'entrées principales que vous fournissez.

DoFnTester.processBatch renvoie une liste (List) de sorties, c'est-à-dire d'objets dont le type correspond au type de sortie spécifié pour l'objet DoFn. Pour un objet DoFn<String, Integer>, la méthode processBatch renvoie une liste List<Integer> :

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  String testInput = "test1";
  List<Integer> testOutputs = fnTester.processBatch(testInput);

Pour vérifier les résultats de processBatch, utilisez la méthode Assert.assertThat de JUnit afin de vérifier que la liste (List) des sorties contient les valeurs que vous attendez :

  String testInput = "test1";
  List<Integer> testOutputs = fnTester.processBatch(testInput);

  Assert.assertThat(testOutputs, Matchers.hasItems(...));

  // Process a larger batch in a single step.
  Assert.assertThat(fnTester.processBatch("input1", "input2", "input3"), Matchers.hasItems(...));

Tester des transformations composites

Pour tester une transformation composite que vous avez créée, vous pouvez procéder selon le schéma suivant :

  • Créez un objet TestPipeline.
  • Créez des données d'entrée de test statiques et connues.
  • Utilisez la transformation Create pour créer une collection PCollection contenant vos données d'entrée.
  • Appelez la méthode Apply en appliquant votre transformation composite à la collection PCollection d'entrée, et sauvegardez la collection PCollection de sortie résultante.
  • Utilisez DataflowAssert et ses sous-classes pour vérifier que la collection PCollection de sortie contient les éléments attendus.

Java

Utiliser les classes de test du SDK

L'inclusion des classes TestPipeline et DataflowAssert dans le SDK Cloud Dataflow pour Java répond spécifiquement au besoin de tester les transformations. TestPipeline et DataflowAssert fonctionnent avec des tests configurés pour s'exécuter en local ou auprès du service Cloud Dataflow distant.

TestPipeline

Pour les tests, utilisez TestPipeline à la place de Pipeline lorsque vous créez l'objet pipeline. Contrairement à Pipeline.create, TestPipeline.create gère la définition des options PipelineOptions en interne.

Pour créer un objet TestPipeline, utilisez la commande suivante :

  Pipeline p = TestPipeline.create();

DataflowAssert

DataflowAssert est une assertion sur le contenu d'une collection PCollection. Vous pouvez utiliser DataflowAssert pour vérifier qu'une collection PCollection contient un ensemble spécifique d'éléments attendus.

Voici comment vérifier le contenu d'une collection PCollection donnée à l'aide de DataflowAssert :

  PCollection<String> output = ...;

  // Check whether a PCollection contains some elements in any order.
  DataflowAssert.that(output)
    .containsInAnyOrder(
      "elem1",
      "elem3",
      "elem2");

Tout code utilisant DataflowAssert doit être lié à JUnit et à Hamcrest. Si vous utilisez Maven, vous pouvez créer le lien avec Hamcrest en ajoutant la dépendance suivante au fichier pom.xml de votre projet :

  <dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest-all</artifactId>
    <version>1.3</version>
    <scope>test</scope>
  </dependency>

Pour plus d'informations sur le fonctionnement de ces classes, consultez la documentation du package com.google.cloud.dataflow.sdk.testing.

Utiliser la transformation Create

Vous pouvez utiliser la transformation Create pour créer une collection PCollection à partir d'une classe de collection en mémoire standard, telle que Java List. Pour plus d'informations, consultez Créer une collection PCollection.

Exemple de test d'une transformation composite

Java

Le code suivant présente un test complet pour une transformation composite. Ce test applique la transformation Count à une collection PCollection d'entrée contenant des éléments String. Il utilise la transformation Create pour créer la collection PCollection d'entrée à partir d'une liste Java List<String>.

  @RunWith(JUnit4.class)
  public class CountTest {

  // Our static input data, which will make up the initial PCollection.
  static final String[] WORDS_ARRAY = new String[] {
    "hi", "there", "hi", "hi", "sue", "bob",
    "hi", "sue", "", "", "ZOW", "bob", ""};

  static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

    @Test
    public void testCount() {
      // Create a test pipeline.
      Pipeline p = TestPipeline.create();

      // Create an input PCollection.
      PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());

      // Apply the Count transform under test.
      PCollection<KV<String, Long>> output =
        input.apply(Count.<String>perElement());

      // Assert on the results.
      DataflowAssert.that(output)
        .containsInAnyOrder(
            KV.of("hi", 4L),
            KV.of("there", 1L),
            KV.of("sue", 2L),
            KV.of("bob", 2L),
            KV.of("", 3L),
            KV.of("ZOW", 1L));

      // Run the pipeline.
      p.run();
  }

Test d'un pipeline de bout en bout

Les classes de test des SDK Dataflow (telles que les classes TestPipeline et DataflowAssert du SDK Dataflow pour Java) vous permettent de tester un pipeline complet de bout en bout. En règle générale, vous devez procéder comme suit pour tester un pipeline entier :

  • Pour chaque source de données d'entrée de votre pipeline, créez des données d'entrée de test statiques connues.
  • Créez des données de sortie de test statiques qui correspondent à ce que vous attendez dans la ou les collections PCollection de sortie finales de votre pipeline.
  • Au lieu de créer un pipeline standard (Pipeline.create), créez un pipeline de test (TestPipeline).
  • À la place des transformations Read de votre pipeline, utilisez la transformation Create pour créer une ou plusieurs collections PCollection à partir de vos données d'entrée statiques.
  • Appliquez les transformations de votre pipeline.
  • Au lieu des transformations Write de votre pipeline, utilisez DataflowAssert pour vérifier que le contenu des collections PCollection finales produites par ce pipeline correspond aux valeurs attendues dans vos données de sortie statiques.

Tester le pipeline WordCount

Java

L'exemple de code suivant présente une méthode possible pour tester l'exemple de pipeline WordCount. Alors que WordCount obtient généralement ses données d'entrée en lisant les lignes d'un fichier texte, le test crée une liste Java List<String> contenant des lignes de texte et utilise une transformation Create pour créer une collection PCollection initiale.

La transformation finale de WordCount (incluse dans la transformation composite CountWords) produit une collection PCollection<String> de comptes de mots formatés adaptés à l'impression. Plutôt que d'écrire cette collection PCollection dans un fichier texte de sortie, notre pipeline de test utilise DataflowAssert pour vérifier que les éléments de la collection PCollection correspondent à ceux d'un tableau String statique contenant les données de sortie attendues.

  @RunWith(JUnit4.class)
  public class WordCountTest {

    // Our static input data, which will comprise the initial PCollection.
    static final String[] WORDS_ARRAY = new String[] {
      "hi there", "hi", "hi sue bob",
      "hi sue", "", "bob hi"};

    static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

    // Our static output data, which is the expected data that the final PCollection must match.
    static final String[] COUNTS_ARRAY = new String[] {
        "hi: 5", "there: 1", "sue: 2", "bob: 2"};

    // Example test that tests the pipeline's transforms.
    @Test
    @Category(com.google.cloud.dataflow.sdk.testing.RunnableOnService.class)
    public void testCountWords() throws Exception {
      Pipeline p = TestPipeline.create();

      // Create a PCollection from the WORDS static input data.
      PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());

      // Run ALL the pipeline's transforms (in this case, the CountWords composite transform).
      PCollection<String> output = input.apply(new CountWords());

      // Assert that the output PCollection matches the COUNTS_ARRAY known static output data.
      DataflowAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);

      // Run the pipeline.
      p.run();
    }
  }
Cette page vous a-t-elle été utile ? Évaluez-la :

Envoyer des commentaires concernant…

Besoin d'aide ? Consultez notre page d'assistance.