Prueba tu canalización

La prueba de tu canalización es un paso importante en el desarrollo de una solución de procesamiento de datos efectiva. La naturaleza indirecta del modelo de Cloud Dataflow, en el que tu código de usuario construye un grafo de canalización para que se ejecute de forma remota en Google Cloud Platform, puede hacer que la depuración con errores ejecute una tarea no trivial. A menudo, es más rápido y más simple realizar pruebas de unidades locales en tu código de canalización que depurar una ejecución remota de la canalización.

Realizar prueba de unidades de tu código de canalización de forma local, antes de realizar ejecuciones completas con el servicio de Cloud Dataflow, por lo general, es la mejor forma y la más directa de identificar y solucionar errores en tu código de canalización. La prueba de unidades de tu canalización de forma local también te permite usar tus herramientas favoritas o familiares de depuración local.

Los SDK de Dataflow proporcionan varias formas de probar unidades de tu código de canalización, desde los niveles más bajos hasta los más altos. Se describen a continuación los niveles, desde el más bajo hasta el más alto:

  • Puedes probar los objetos de funciones individuales, como los DoFn, dentro de las transformaciones centrales de tu canalización.
  • Puedes realizar una prueba completa de una Transformación compuesta como una unidad.
  • Puedes realizar una prueba de extremo a extremo para toda una canalización.

Java

A fin de ser compatible con la prueba de unidades, el SDK de Dataflow para Java proporciona un número de clases de prueba en el paquete com.google.cloud.dataflow.sdk.testing. Además, las transformaciones incluidas en el SDK tienen pruebas de unidades y los programas de ejemplo en el SDK también contienen pruebas. Puedes usar estas pruebas como referencias y guías.

Prueba objetos DoFn individuales

El código en tus funciones DoFn de canalización se ejecuta con frecuencia y muchas veces en varias instancias de Compute Engine. Realizar una prueba de unidades en tus objetos DoFn antes de usarlos en una ejecución de Dataflow puede ahorrar una gran cantidad de tiempo y energía de depuración.

Java

El SDK de Dataflow para Java proporciona una forma conveniente de probar un DoFn individual llamado DoFnTester, que se incluye en el paquete Transforms del SDK.

DoFnTester usa el marco de trabajo JUnit. Si quieres usar DoFnTester, tendrás que seguir los pasos siguientes:

  1. Crea un DoFnTester. Tendrás que pasar una instancia de DoFn que quieres probar al método de fábrica estático para DoFnTester.
  2. Crea una o más entradas de prueba principales del tipo adecuado para tu DoFn. Si tu DoFn toma entradas laterales o produce salidas laterales, también deberías crear las entradas laterales y las etiquetas de salidas laterales.
  3. Llama a DoFnTester.processBatch para procesar las entradas principales.
  4. Usa el método Assert.assertThat de JUnit con el fin de asegurarte de que los resultados de la prueba que se muestran desde processBatch coinciden con los valores esperados.

Crea un DoFnTester

A modo de crear un DoFnTester, primero debes crear una instancia del DoFn que quieres probar. Luego, usa esa instancia cuando crees un DoFnTester con el método de fábrica estático .of():

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;

  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

Crea entradas de prueba

Tienes que crear una o más entradas de prueba para DoFnTester a fin de enviar tu DoFn. Si quieres crear entradas de prueba, solo crea una o más variables de entrada del mismo tipo de entrada que acepta tu DoFn. En el caso anterior, sucede lo siguiente:

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  String testInput = "test1";

Entradas y salidas laterales

Si tu DoFn acepta entradas laterales, puedes crear esas entradas laterales con el método DoFnTester.setSideInputs.

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  PCollectionView<List<Integer>> sideInput = ...;
  Iterable<Integer> value = ...;
  fnTester.setSideInputInGlobalWindow(sideInput, value);

Si tu DoFn produce salidas laterales, tendrás que establecer los objetos TupleTag adecuados que usarás para acceder a cada salida. Un DoFn con salidas laterales produce un PCollectionTuple por cada salida lateral; tendrás que proporcionar un TupleTagList correspondiente a cada salida lateral en esa tupla.

Supongamos que tu DoFn produce salidas laterales de tipo String y tipo Integer. Crea objetos TupleTag por cada una y agrúpalas en un TupleTagList. Luego, establécelo para el DoFnTester de la forma siguiente:

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  TupleTag<String> tag1 = ...;
  TupleTag<Integer> tag2 = ...;
  TupleTagList tags = TupleTagList.of(tag1).and(tag2);

  fnTester.setSideOutputTags(tags);

Consulta la documentación de ParDo sobre entradas laterales para obtener más información.

Procesa las entradas de prueba y revisa los resultados

A fin de procesar las entradas (y así ejecutar la prueba en tu DoFn), llama al método DoFnTester.processBatch. Cuando llamas a processBatch, debes pasar uno o más valores de entrada de prueba principal para tu DoFn. Si configuras entradas laterales, estas estarán disponibles para cada lote de entradas principales que proporciones.

DoFnTester.processBatch muestra una List de salidas, es decir, objetos del mismo tipo que el tipo de salida especificada de DoFn. Para un DoFn<String, Integer>, processBatch muestra un List<Integer>:

  static class MyDoFn extends DoFn<String, Integer> { ... }
  MyDoFn myDoFn = ...;
  DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn);

  String testInput = "test1";
  List<Integer> testOutputs = fnTester.processBatch(testInput);

Si quieres verificar los resultados de processBatch, usa el método Assert.assertThat de JUnit para probar si la List de salidas contiene los valores que esperabas:

  String testInput = "test1";
  List<Integer> testOutputs = fnTester.processBatch(testInput);

  Assert.assertThat(testOutputs, Matchers.hasItems(...));

  // Process a larger batch in a single step.
  Assert.assertThat(fnTester.processBatch("input1", "input2", "input3"), Matchers.hasItems(...));

Prueba transformaciones compuestas

A fin de probar una transformación compuesta que creaste, puedes usar el patrón siguiente:

  • Crea una TestPipeline.
  • Crea algún dato entrada de prueba conocida y estática.
  • Usa la transformación Create para crear una PCollection de tus datos de entrada.
  • Apply tu transformación compuesta a la entrada PCollection y guarda el resultado PCollection.
  • Usa DataflowAssert y sus subclases para verificar que el resultado PCollection contenga los elementos que esperas.

Java

Usa el SDK para clases de prueba

TestPipeline y DataflowAssert son clases incluidas en el SDK de Cloud Dataflow para Java a fin de probar de forma específica las transformaciones de prueba. TestPipeline y DataflowAssert trabajan con pruebas configuradas para ejecutarse de forma local o en el servicio de Cloud Dataflow remoto.

TestPipeline

Para las pruebas, usa TestPipeline en lugar de Pipeline cuando creas el objeto de canalización. A diferencia de Pipeline.create, TestPipeline.create controla las opciones de configuración de PipelineOptions de forma interna.

Debes crear una TestPipeline de la manera siguiente:

  Pipeline p = TestPipeline.create();

DataflowAssert

DataflowAssert es una afirmación sobre el contenido de una PCollection. Puedes usar DataflowAssert para verificar que una PCollection contenga un conjunto específico de elementos esperados.

Para una PCollection dada, puedes usar DataflowAssert con el fin de verificar los contenidos de la manera siguiente:

  PCollection<String> output = ...;

  // Check whether a PCollection contains some elements in any order.
  DataflowAssert.that(output)
    .containsInAnyOrder(
      "elem1",
      "elem3",
      "elem2");

Cualquier código que use DataflowAssert debe vincularse con JUnit y Hamcrest. Si usas Maven, puedes vincular con Hamcrest si agregas la dependencia a continuación al archivo pom.xml de tu proyecto:

  <dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest-all</artifactId>
    <version>1.3</version>
    <scope>test</scope>
  </dependency>

Para obtener más información sobre cómo funcionan estas clases, consulta la documentación del paquete com.google.cloud.dataflow.sdk.testing.

Usa la transformación Create

Puedes usar la transformación Create a fin de crear una PCollection a partir de una clase de colección en memoria estándar, como Java List. Consulta Crea una PCollection para obtener más información.

Una prueba de ejemplo de una transformación compuesta

Java

En el código a continuación, se muestra una prueba completa de una transformación compuesta. La prueba aplica la transformación Count a una entrada PCollection de elementos String. La prueba usa la transformación Create para crear la entrada PCollection desde una List<String> de Java.

  @RunWith(JUnit4.class)
  public class CountTest {

  // Our static input data, which will make up the initial PCollection.
  static final String[] WORDS_ARRAY = new String[] {
    "hi", "there", "hi", "hi", "sue", "bob",
    "hi", "sue", "", "", "ZOW", "bob", ""};

  static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

    @Test
    public void testCount() {
      // Create a test pipeline.
      Pipeline p = TestPipeline.create();

      // Create an input PCollection.
      PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());

      // Apply the Count transform under test.
      PCollection<KV<String, Long>> output =
        input.apply(Count.<String>perElement());

      // Assert on the results.
      DataflowAssert.that(output)
        .containsInAnyOrder(
            KV.of("hi", 4L),
            KV.of("there", 1L),
            KV.of("sue", 2L),
            KV.of("bob", 2L),
            KV.of("", 3L),
            KV.of("ZOW", 1L));

      // Run the pipeline.
      p.run();
  }

Prueba una canalización de extremo a extremo

Puedes usar las clases de prueba en los SDK de Dataflow (como TestPipeline y DataflowAssert en el SDK de Dataflow para Java) a modo de probar una canalización entera de extremo a extremo. Por lo general, para probar una canalización completa, tienes que seguir estos pasos:

  • Por cada fuente de entrada de datos a tu canalización, crea algún dato de entrada de prueba conocida y estática.
  • Crea algún dato de salida de prueba estática que coincida con lo que esperas en las PCollection de salida final de tu canalización.
  • Crea una TestPipeline en lugar de la Pipeline.create estándar.
  • En lugar de las transformaciones Read de tu canalización, usa la transformación Create para crear una o más PCollections desde tus datos de entrada estática.
  • Aplica las transformaciones de tu canalización.
  • En lugar de las transformaciones Write de tu canalización, usa DataflowAssert para verificar que los contenidos de las PCollections finales que produce tu canalización coincidan con los valores esperados en tu dato de salida estática.

Prueba la canalización de WordCount

Java

En el código de ejemplo siguiente, se muestra cómo se podría probar la canalización de ejemplo de WordCount. WordCount, por lo general, lee las líneas desde una archivo de texto en busca de datos de entrada; en cambio, la prueba crea una List<String> de Java que contiene algunas líneas de texto y usa una transformación Create para crear una PCollection inicial.

La transformación final de WordCount (desde la transformación compuesta CountWords) produce una PCollection<String> de recuentos de palabras con formato adecuado para imprimir. En vez de escribir esa PCollection en un archivo de texto de salida, nuestra canalización de prueba usa DataflowAssert para verificar que los elementos de la PCollection coincidan con los de un arreglo de String estático que contiene nuestros datos de salida esperados.

  @RunWith(JUnit4.class)
  public class WordCountTest {

    // Our static input data, which will comprise the initial PCollection.
    static final String[] WORDS_ARRAY = new String[] {
      "hi there", "hi", "hi sue bob",
      "hi sue", "", "bob hi"};

    static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

    // Our static output data, which is the expected data that the final PCollection must match.
    static final String[] COUNTS_ARRAY = new String[] {
        "hi: 5", "there: 1", "sue: 2", "bob: 2"};

    // Example test that tests the pipeline's transforms.
    @Test
    @Category(com.google.cloud.dataflow.sdk.testing.RunnableOnService.class)
    public void testCountWords() throws Exception {
      Pipeline p = TestPipeline.create();

      // Create a PCollection from the WORDS static input data.
      PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());

      // Run ALL the pipeline's transforms (in this case, the CountWords composite transform).
      PCollection<String> output = input.apply(new CountWords());

      // Assert that the output PCollection matches the COUNTS_ARRAY known static output data.
      DataflowAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);

      // Run the pipeline.
      p.run();
    }
  }
¿Te ha resultado útil esta página? Enviar comentarios:

Enviar comentarios sobre...

Si necesitas ayuda, visita nuestra página de asistencia.