Gráfico de trabajo de Dataflow

Cuando seleccionas un trabajo específico de Dataflow, la interfaz de supervisión proporciona una representación gráfica de tu canalización: el gráfico de trabajo. La página del gráfico de trabajo en la consola también proporciona un resumen del trabajo, un registro de trabajo y la información sobre cada paso de la canalización.

El gráfico de trabajo de una canalización representa cada una de sus transformaciones en la canalización como un cuadro. Cada cuadro contiene el nombre de la transformación y la información sobre el estado del trabajo, que incluye lo siguiente:

  • En ejecución: el paso está en ejecución.
  • En cola: el paso en un trabajo de FlexRS está en cola.
  • Finalizado de manera correcta: el paso finalizó con éxito.
  • Detenido: el paso se detuvo porque el trabajo se detuvo.
  • Desconocido: no se pudo informar el estado del paso.
  • Con errores: no se pudo completar el paso.

De forma predeterminada, la página del gráfico del trabajo muestra la Vista de gráfico. Para ver el gráfico del trabajo como una tabla, en Vista de los pasos del trabajo, selecciona Vista de tabla. La vista de tabla contiene la misma información en un formato diferente. La vista de tabla es útil en las siguientes situaciones:

  • Tu trabajo tiene muchas etapas, lo que dificulta navegar por el gráfico del trabajo.
  • Quieres ordenar los pasos del trabajo por una propiedad específica. Por ejemplo, puedes ordenar la tabla por tiempo para identificar los pasos lentos.

Gráfico de trabajo básico

Código de canalización:

Java

  // Read the lines of the input text.
  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
     // Count the words.
     .apply(new CountWords())
     // Write the formatted word counts to output.
     .apply("WriteCounts", TextIO.write().to(options.getOutput()));

Python

(
    pipeline
    # Read the lines of the input text.
    | 'ReadLines' >> beam.io.ReadFromText(args.input_file)
    # Count the words.
    | CountWords()
    # Write the formatted word counts to output.
    | 'WriteCounts' >> beam.io.WriteToText(args.output_path))

Go

  // Create the pipeline.
  p := beam.NewPipeline()
    s := p.Root()
  // Read the lines of the input text.
  lines := textio.Read(s, *input)
  // Count the words.
  counted := beam.ParDo(s, CountWords, lines)
  // Write the formatted word counts to output.
  textio.Write(s, *output, formatted)
Gráfico del trabajo:

El gráfico de trabajo de una canalización de WordCount como se muestra en la interfaz de supervisión de Dataflow.

Figura 1: Código de canalización para una canalización WordCount que se muestra con el gráfico de ejecución resultante en la interfaz de supervisión Dataflow.

Transformaciones compuestas

En el gráfico de trabajo, las transformaciones compuestas contienen varias subtransformaciones anidadas y son expandibles. Las transformaciones compuestas expandibles se marcan con una flecha en el grafo. Haz clic en la fecha para expandir la transformación y visualizar las subtransformaciones en su interior.

Código de canalización:

Java

  // The CountWords Composite Transform
  // inside the WordCount pipeline.

  public static class CountWords
    extends PTransform<PCollection<String>, PCollection<String>> {

    @Override
    public PCollection<String> apply(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }

Python

# The CountWords Composite Transform inside the WordCount pipeline.
@beam.ptransform_fn
def CountWords(pcoll):
  return (
      pcoll
      # Convert lines of text into individual words.
      | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
      # Count the number of times each word occurs.
      | beam.combiners.Count.PerElement()
      # Format each word and count into a printable string.
      | 'FormatCounts' >> beam.ParDo(FormatCountsFn()))

Go

  // The CountWords Composite Transform inside the WordCount pipeline.
  func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
    s = s.Scope("CountWords")

    // Convert lines of text into individual words.
    col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)

    // Count the number of times each word occurs.
    return stats.Count(s, col)
  }
Gráfico del trabajo:

El gráfico de trabajo de una canalización de WordCount con la transformación CountWords expandida a fin de mostrar las transformaciones que la componen.

Figura 2: Código de canalización de los pasos secundarios de la transformación CountWords mostrado con el gráfico de trabajo expandido de toda la canalización.

En tu código de canalización, es posible que hayas invocado la transformación compuesta de la siguiente manera:

result = transform.apply(input);

Las transformaciones compuestas invocadas de esta manera omiten el anidamiento esperado y, por lo tanto, pueden aparecer expandidas en la interfaz de supervisión de Dataflow. Tu canalización también puede generar advertencias o errores sobre nombres de usuario únicos en el tiempo de ejecución de la canalización.

Para evitar estos problemas, asegúrate de invocar tus transformaciones mediante el formato recomendado:

result = input.apply(transform);

Transforma nombres

Dataflow tiene varias maneras de obtener el nombre de la transformación que se muestra en el gráfico de trabajo de supervisión.

Java

  • Dataflow puede usar un nombre que asignas cuando aplicas tu transformación. El primer argumento que proporcionas al método apply es el nombre de tu transformación.
  • Dataflow puede inferir el nombre de la transformación, ya sea a partir del nombre de la clase (si creaste una transformación personalizada) o del nombre de tu objeto de función DoFn (si usas una transformación central como ParDo).

Python

  • Dataflow puede usar un nombre que asignas cuando aplicas tu transformación. Para establecer el nombre de transformación, debes especificar el argumento label.
  • Dataflow puede inferir el nombre de la transformación, ya sea a partir del nombre de la clase (si creaste una transformación personalizada) o del nombre de tu objeto de función DoFn (si usas una transformación central como ParDo).

Go

  • Dataflow puede usar un nombre que asignas cuando aplicas tu transformación. Para establecer el nombre de transformación, debes especificar el Scope.
  • Dataflow puede inferir el nombre de la transformación, ya sea a partir del nombre de la estructura si usas una DoFn estructural o del nombre de la función si usas una función funcional DoFn.

Comprende las métricas

En esta sección, se proporcionan detalles sobre las métricas asociadas con el gráfico de trabajo.

Tiempo

Cuando haces clic en un paso, la métrica Tiempo se muestra en el panel Información del paso. Esta métrica indica el tiempo aproximado total empleado en todos los subprocesos en todos los trabajadores para las siguientes acciones:

  • Inicializando el paso
  • Procesamiento de datos
  • Redistribución de datos
  • Finalización del paso

En el caso de los pasos compuestos, el tiempo indica la suma de tiempo empleado en los pasos que los integran. Esta estimación puede ayudarte a identificar pasos lentos y a diagnosticar a qué parte de tu canalización le lleva más tiempo del que debería.

Puedes ver el tiempo que le toma a un paso ejecutarse en tu canalización.
Figura 3: La métrica Tiempo puede ayudarte a garantizar que tu canalización se ejecute de manera eficaz.

Métricas de entradas complementarias

En las Métricas de entradas complementarias, se muestra la forma en que tus patrones y algoritmos de acceso de entradas complementarias afectan el rendimiento de tu canalización. Cuando tu canalización usa una entrada complementaria, Dataflow escribe la colección en una capa persistente, como un disco, y tus transformaciones leen de esa colección persistente. Estas operaciones de lectura y escritura afectan el tiempo de ejecución de tu trabajo.

La interfaz de supervisión de Dataflow muestra las métricas de entradas complementarias cuando seleccionas una transformación que crea o consume una colección de entrada complementaria. Puedes ver las métricas en la sección Métricas de entradas complementarias del panel Información de pasos.

Transformaciones que crean una entrada complementaria

Si la transformación seleccionada crea una colección de entrada complementaria, la sección Métricas de entradas complementarias muestra el nombre de la colección, junto con las métricas siguientes:

  • Time spent writing: (Tiempo dedicado a la escritura) el tiempo de ejecución empleado para escribir la colección de entradas complementarias.
  • Bytes written: (Bytes escritos) la cantidad total de bytes escritos en la colección de entradas complementarias.
  • Tiempo y bytes de la lectura de entradas complementarias: una tabla que contiene métricas adicionales de todas las transformaciones que consumen la colección de entradas complementarias llamada consumidores de entradas complementarias.

La tabla de Tiempo y bytes leídos de las entradas complementarias contiene la información siguiente para cada consumidor de entrada complementaria:

  • Side input consumer: (Consumidor de entradas complementarias): nombre de la transformación del consumidor de entradas complementarias
  • Tiempo dedicado a la escritura: tiempo que este consumidor emplea en leer la colección de entradas complementarias.
  • Bytes read: (Bytes leídos) cantidad de bytes que este consumidor lee de la colección de entradas complementarias.

Si tu canalización tiene una transformación compuesta que crea una entrada complementaria, expande la transformación compuesta hasta que veas la subtransformación específica que crea la entrada complementaria. Luego, selecciona esa subtransformación para ver la sección Métricas de entradas complementarias.

En la figura 4, se muestran las métricas de entradas complementarias de una transformación que crea una colección de entradas complementarias.

Puedes seleccionar que la transformación y sus métricas de entradas complementarias sean visibles en el panel lateral de información del paso.
Figura 4: El gráfico de trabajo tiene una transformación compuesta expandida (MakeMapView). Se selecciona la subtransformación que crea la entrada complementaria (CreateDataflowView) y las métricas de entradas complementarias son visibles en el panel lateral Información del paso.

Transformaciones que consumen una o más entradas complementarias

Si la transformación seleccionada consume una o más entradas complementarias, la sección de las Métricas de entradas complementarias muestra la tabla Tiempo y bytes leídos de las entradas complementarias. Esta tabla contiene la información siguiente para cada colección de entrada complementaria:

  • Colección de entrada complementaria: nombre de la colección de la entrada complementaria.
  • Tiempo dedicado a la escritura: tiempo que la transformación emplea en leer la colección de entradas complementarias.
  • Bytes leídos: cantidad de bytes que la transformación lee de la colección de entradas complementarias.

Si tu canalización tiene una transformación compuesta que lee una entrada complementaria, expande la transformación compuesta hasta que veas la subtransformación específica lee la entrada complementaria. Luego, selecciona esa subtransformación para ver la sección Métricas de entradas complementarias.

En la figura 5, se muestran las métricas de entradas complementarias de una transformación que lee de una colección de entradas complementarias.

Puedes seleccionar que la transformación y sus métricas de entradas complementarias sean visibles en el panel lateral de información del paso.
Figura 5: La transformación JoinBothCollections lee de una colección de entradas complementarias. JoinBothCollections está seleccionado en el gráfico de trabajo, y se observan las métricas de entradas complementarias en el panel lateral Información del paso.

Identifica los problemas de rendimiento de las entradas complementarias

Reiteración es un problema de rendimiento de entrada complementaria común. Si tu entrada complementaria PCollection es demasiado grande, los trabajadores no pueden almacenar en caché toda la colección en la memoria. En consecuencia, los trabajadores deben leer de la colección de entradas complementarias persistente varias veces.

En la figura 6, las métricas de entradas complementarias muestran que el total de bytes leídos desde la colección de entradas complementarias es mucho mayor que el tamaño de la colección (el total de bytes escritos).

Puedes seleccionar que la transformación y sus métricas de entradas complementarias sean visibles en el panel lateral de información del paso.
Figura 6: Ejemplo de reiteración. La colección de entradas complementarias es de 563 MB y la suma de bytes leídos cuando se consumen las transformaciones es de casi 12 GB.

Si deseas mejorar el rendimiento de esta canalización, vuelve a diseñar tu algoritmo para evitar la iteración o la recuperación de los datos de la entrada complementaria. En el ejemplo siguiente, la canalización crea el producto cartesiano de dos colecciones. El algoritmo itera a través de toda la colección de entrada complementaria para cada elemento de la colección principal. Puedes mejorar el patrón de acceso de la canalización si agrupas en lotes varios elementos de la colección principal. Este cambio reduce la cantidad de veces que los trabajadores deben volver a leer la colección de entrada complementaria.

Se puede generar otro problema de rendimiento común si tu canalización realiza una unión mediante la aplicación de un ParDo con una o más entradas complementarias grandes. En este caso, los trabajadores dedican un alto porcentaje del tiempo de procesamiento a la operación de unión a leer las colecciones de entradas complementarias.

En la figura 7, se muestra un ejemplo de métricas de entradas complementarias cuando ocurre este problema:

Puedes seleccionar que la transformación y sus métricas de entradas complementarias sean visibles en el panel lateral de información del paso.
Figura 7: La transformación JoinBothCollections tiene un tiempo de ejecución total de 18 min y 31 s. Los trabajadores dedican la mayor parte del tiempo de ejecución (10 min y 3 s) a leer de la colección de entradas complementarias de 10 GB.

Si quieres mejorar el rendimiento de esta canalización, usa CoGroupByKey, en lugar de las entradas complementarias.