Tester votre pipeline

Le test de votre pipeline constitue une étape particulièrement importante dans le développement d'une solution de traitement de données efficace. Du fait de la nature indirecte du modèle de Cloud Dataflow, dans lequel votre code utilisateur construit un graphe de pipeline à exécuter à distance sur Google Cloud Platform, la tâche de débogage des exécutions en échec peut s'avérer complexe. Il est souvent plus simple et plus rapide d'effectuer des tests unitaires en local sur le code de votre pipeline que de déboguer l'exécution d'un pipeline à distance.

Pour identifier et corriger des bogues dans le code de votre pipeline, la méthode la plus efficace et la plus immédiate consiste souvent à effectuer des tests unitaires en local sur ce code avant d'exécuter le pipeline complet sur le service Cloud Dataflow. L'exécution en local des tests unitaires de votre pipeline vous permet également d'utiliser vos outils de débogage locaux habituels/préférés.

Les SDK Dataflow offrent plusieurs méthodes pour tester le code de votre pipeline à différents niveaux. Ainsi, du niveau le plus bas au plus élevé :

  • Vous pouvez tester individuellement les objets fonction, tels que DoFn, dans les transformations de base de votre pipeline.
  • Vous pouvez tester une transformation composite entière en tant qu'unité.
  • Vous pouvez effectuer un test de bout en bout pour un pipeline complet.

Java

Afin de prendre en charge les tests unitaires, le SDK Dataflow pour Java fournit plusieurs classes de test dans le package com.google.cloud.dataflow.sdk.testing. De plus, les transformations incluses dans ce SDK comportent des tests unitaires et les programmes d'exemple du SDK contiennent également des tests. Ces différents tests peuvent vous servir de référence et de guide.

Tester individuellement des objets DoFn

Le code des fonctions DoFn de votre pipeline s'exécute régulièrement, et souvent sur plusieurs instances Compute Engine. Vous pouvez donc vous épargner beaucoup de temps et de travail de débogage en effectuant des tests unitaires sur ces objets DoFn avant de les utiliser dans une exécution Dataflow.

Java

Le SDK Dataflow pour Java constitue un moyen pratique de tester un DoFn appelé DoFnTester, qui est inclus dans le package Transforms du SDK.

DoFnTester utilise le framework JUnit. Pour utiliser DoFnTester, procédez comme suit :

  1. Créez un objet DoFnTester. Vous devez transmettre une instance de DoFn que vous souhaitez tester à la méthode statique par défaut correspondant à DoFnTester.
  2. Créez une ou plusieurs entrées de test principales du type approprié pour votre DoFn. Si votre DoFn accepte des entrées secondaires et/ou produit des sorties secondaires, vous devez également créer les tags des entrées secondaires et des sorties secondaires.
  3. Appelez DoFnTester.processBatch pour traiter les entrées principales.
  4. Utilisez la méthode JUnit Assert.assertThat pour vous assurer que les résultats de test renvoyés par processBatch correspondent aux valeurs attendues.

Créer un atelier de test DoFnTester

Pour créer un DoFnTester, commencez par créer une instance du DoFn que vous souhaitez tester. Vous utilisez ensuite cette instance lorsque vous créez un DoFnTester à l'aide de la méthode statique par défaut .of() :

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;

  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

Créer des entrées de test

Vous devez créer une ou plusieurs entrées de test que DoFnTester envoie à votre DoFn. Pour créer des entrées de test, il vous suffit de créer une ou plusieurs variables d'entrée du même type que les entrées acceptées par votre DoFn. Dans le cas ci-dessus :

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  String testInput = "test1";

Entrées et sorties latérales

Si votre DoFn accepte les entrées secondaires, créez ces entrées secondaires à l'aide de la méthode DoFnTester.setSideInputs.

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  PCollectionView<List<Integer>> sideInput = ...;
  Iterable<Integer> value = ...;
  fnTester.setSideInputInGlobalWindow(sideInput, value);

Si votre DoFn produit des sorties secondaires, vous devez définir les objets TupleTag appropriés que vous utiliserez pour accéder à chaque sortie. Un DoFn avec des sorties secondaires produit un PCollectionTuple pour chaque sortie secondaire. Vous devez fournir un TupleTagList correspondant à chaque sortie secondaire dans ce tuple.

Supposons que votre DoFn génère des sorties secondaires de type String et Integer. Vous créez des objets TupleTag pour chacune d'entre elles, les associez à un TupleTagList, puis les définissez comme suit pour le DoFnTester :

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  TupleTag<String> tag1 = ...;
  TupleTag<Integer> tag2 = ...;
  TupleTagList tags = TupleTagList.of(tag1).and(tag2);

  fnTester.setSideOutputTags(tags);

Pour en savoir plus, consultez la documentation de ParDo sur les entrées secondaires.

Traiter les entrées de test et vérifier les résultats

Pour traiter les entrées (et exécuter le test sur votre DoFn), appelez la méthode DoFnTester.processBatch. Lorsque vous appelez processBatch, vous transmettez une ou plusieurs valeurs d'entrée de test principales pour votre DoFn. Si vous définissez des entrées latérales, celles-ci sont disponibles pour chacun des lots d'entrées principales que vous fournissez.

DoFnTester.processBatch renvoie une liste (List) de sorties, c'est-à-dire des objets du même type que le type de sortie spécifié par DoFn. Pour un DoFn<String, Integer>, processBatch renvoie List<Integer> :

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  String testInput = "test1";
  List<Integer> testOutputs = fnTester.processBatch(testInput);

Pour vérifier les résultats de processBatch, vous devez utiliser la méthode JUnit Assert.assertThat pour vérifier si la sortie List contient les valeurs attendues :

  String testInput = "test1";
  List<Integer> testOutputs = fnTester.processBatch(testInput);

  Assert.assertThat(testOutputs, Matchers.hasItems(...));

  // Process a larger batch in a single step.
  Assert.assertThat(fnTester.processBatch("input1", "input2", "input3"), Matchers.hasItems(...));

Tester des transformations composites

Pour tester une transformation composite que vous avez créée, vous pouvez procéder selon le schéma suivant :

  • Créez un objet TestPipeline.
  • Créez des données d'entrée de test statiques et connues.
  • Utilisez la transformation Create pour créer une PCollection de vos données d'entrée.
  • Appelez la méthode Apply pour appliquer votre transformation composite à la PCollection d'entrée et enregistrez la sortie PCollection résultante.
  • Utilisez DataflowAssert et ses sous-classes pour vérifier que la sortie PCollection contient les éléments attendus.

Java

Utiliser les classes de test du SDK

L'inclusion des classes TestPipeline et DataflowAssert dans le SDK Cloud Dataflow pour Java répond spécifiquement au besoin de tester les transformations. TestPipeline et DataflowAssert fonctionnent avec des tests configurés pour s'exécuter localement ou auprès du service Cloud Dataflow distant.

TestPipeline

Pour les tests, utilisez TestPipeline à la place de Pipeline lorsque vous créez l'objet de pipeline. Contrairement à Pipeline.create, TestPipeline.create gère le paramètre PipelineOptions en interne.

Créez un TestPipeline comme suit :

  Pipeline p = TestPipeline.create();

DataflowAssert

DataflowAssert est une assertion sur le contenu d'une PCollection. Vous pouvez utiliser DataflowAssert pour vérifier qu'une PCollection contient un ensemble spécifique d'éléments attendus.

Pour une PCollection donnée, utilisez DataflowAssert pour vérifier le contenu comme suit :

  PCollection<String> output = ...;

  // Check whether a PCollection contains some elements in any order.
  DataflowAssert.that(output)
    .containsInAnyOrder(
      "elem1",
      "elem3",
      "elem2");

Tout code utilisant DataflowAssert doit être lié à JUnit et Hamcrest. Si vous utilisez Maven, associez des liens dans Hamcrest en ajoutant la dépendance suivante au fichier pom.xml de votre projet :

  <dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest-all</artifactId>
    <version>1.3</version>
    <scope>test</scope>
  </dependency>

Pour plus d'informations sur le fonctionnement de ces classes, consultez la documentation du package com.google.cloud.dataflow.sdk.testing.

Utiliser la transformation Create

Appliquez la transformation Create pour créer une PCollection sortie d'une classe de collection standard en mémoire, telle que Java List. Pour plus d'informations, consultez la section Créer une PCollection.

Exemple de test d'une transformation composite

Java

Le code suivant présente un test complet pour une transformation composite. Le test applique la transformation Count à une entrée PCollection d'éléments String. Le test utilise la transformation Create pour créer l'entrée PCollection à partir d'un List<String> Java.

  @RunWith(JUnit4.class)
  public class CountTest {

  // Our static input data, which will make up the initial PCollection.
  static final String[] WORDS_ARRAY = new String[] {
    "hi", "there", "hi", "hi", "sue", "bob",
    "hi", "sue", "", "", "ZOW", "bob", ""};

  static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

    @Test
    public void testCount() {
      // Create a test pipeline.
      Pipeline p = TestPipeline.create();

      // Create an input PCollection.
      PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());

      // Apply the Count transform under test.
      PCollection<KV<String, Long>> output =
        input.apply(Count.<String>perElement());

      // Assert on the results.
      DataflowAssert.that(output)
        .containsInAnyOrder(
            KV.of("hi", 4L),
            KV.of("there", 1L),
            KV.of("sue", 2L),
            KV.of("bob", 2L),
            KV.of("", 3L),
            KV.of("ZOW", 1L));

      // Run the pipeline.
      p.run();
  }

Test d'un pipeline de bout en bout

Utilisez les classes de test dans les SDK Dataflow (tels que TestPipeline et DataflowAssert dans le SDK Dataflow pour Java) pour tester un pipeline entier de bout en bout. En règle générale, vous devez procéder comme suit pour tester un pipeline entier :

  • Pour chaque source de données d'entrée de votre pipeline, créez des données d'entrée de test statiques connues.
  • Créez des données de sortie de test statiques correspondant à ce que vous attendiez dans la ou les PCollection de sortie finales de votre pipeline.
  • Au lieu de créer un pipeline standard (Pipeline.create), créez un pipeline de test (TestPipeline).
  • À la place des transformations Read de votre pipeline, appliquez la transformation Create pour créer une ou plusieurs PCollection à partir de vos données d'entrée statiques.
  • Appliquez les transformations de votre pipeline.
  • À la place des transformations Write de votre pipeline, utilisez DataflowAssert pour vérifier que le contenu des PCollection finales de votre pipeline correspond aux valeurs attendues dans vos données de sortie statiques.

Tester le pipeline WordCount

Java

L'exemple de code suivant présente une méthode possible pour tester l'exemple de pipeline WordCount. WordCount lit généralement les lignes d'un fichier texte comme données d'entrée. Le test crée un List<String> Java contenant des lignes de texte et utilise une transformation Create pour créer une PCollection initiale.

La transformation finale de WordCount (issue de la transformation composite CountWords) génère un PCollection<String> de comptes de mots formatés adaptés à l'impression. Plutôt que d'écrire la PCollection dans un fichier texte de sortie, notre pipeline de test utilise DataflowAssert pour vérifier que les éléments de la PCollection correspondent à ceux d'un tableau String statique contenant les données de sortie attendues.

  @RunWith(JUnit4.class)
  public class WordCountTest {

    // Our static input data, which will comprise the initial PCollection.
    static final String[] WORDS_ARRAY = new String[] {
      "hi there", "hi", "hi sue bob",
      "hi sue", "", "bob hi"};

    static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

    // Our static output data, which is the expected data that the final PCollection must match.
    static final String[] COUNTS_ARRAY = new String[] {
        "hi: 5", "there: 1", "sue: 2", "bob: 2"};

    // Example test that tests the pipeline's transforms.
    @Test
    @Category(com.google.cloud.dataflow.sdk.testing.RunnableOnService.class)
    public void testCountWords() throws Exception {
      Pipeline p = TestPipeline.create();

      // Create a PCollection from the WORDS static input data.
      PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());

      // Run ALL the pipeline's transforms (in this case, the CountWords composite transform).
      PCollection<String> output = input.apply(new CountWords());

      // Assert that the output PCollection matches the COUNTS_ARRAY known static output data.
      DataflowAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);

      // Run the pipeline.
      p.run();
    }
  }