Cuando seleccionas un trabajo específico de Dataflow, la interfaz de supervisión proporciona una representación gráfica de tu canalización: el gráfico de trabajo. La página del gráfico de trabajo en la consola también proporciona un resumen del trabajo, un registro de trabajo y la información sobre cada paso de la canalización.
El gráfico de trabajo de una canalización representa cada una de sus transformaciones en la canalización como un cuadro. Cada cuadro contiene el nombre de la transformación y la información sobre el estado del trabajo, que incluye lo siguiente:
- En ejecución: el paso está en ejecución.
- En cola: el paso en un trabajo de FlexRS está en cola.
- Finalizado de manera correcta: el paso finalizó con éxito.
- Detenido: el paso se detuvo porque el trabajo se detuvo.
- Desconocido: no se pudo informar el estado del paso.
- Con errores: no se pudo completar el paso.
De forma predeterminada, la página del gráfico del trabajo muestra la Vista de gráfico. Para ver el gráfico del trabajo como una tabla, en Vista de los pasos del trabajo, selecciona Vista de tabla. La vista de tabla contiene la misma información en un formato diferente. La vista de tabla es útil en las siguientes situaciones:
- Tu trabajo tiene muchas etapas, lo que dificulta navegar por el gráfico del trabajo.
- Quieres ordenar los pasos del trabajo por una propiedad específica. Por ejemplo, puedes ordenar la tabla por tiempo para identificar los pasos lentos.
Gráfico de trabajo básico
Código de canalización:Java// Read the lines of the input text. p.apply("ReadLines", TextIO.read().from(options.getInputFile())) // Count the words. .apply(new CountWords()) // Write the formatted word counts to output. .apply("WriteCounts", TextIO.write().to(options.getOutput())); Python( pipeline # Read the lines of the input text. | 'ReadLines' >> beam.io.ReadFromText(args.input_file) # Count the words. | CountWords() # Write the formatted word counts to output. | 'WriteCounts' >> beam.io.WriteToText(args.output_path)) Go// Create the pipeline. p := beam.NewPipeline() s := p.Root() // Read the lines of the input text. lines := textio.Read(s, *input) // Count the words. counted := beam.ParDo(s, CountWords, lines) // Write the formatted word counts to output. textio.Write(s, *output, formatted) |
Gráfico del trabajo:
|
Transformaciones compuestas
En el gráfico de trabajo, las transformaciones compuestas contienen varias subtransformaciones anidadas y son expandibles. Las transformaciones compuestas expandibles se marcan con una flecha en el grafo. Haz clic en la fecha para expandir la transformación y visualizar las subtransformaciones en su interior.
Código de canalización:Java// The CountWords Composite Transform // inside the WordCount pipeline. public static class CountWords extends PTransform<PCollection<String>, PCollection<String>> { @Override public PCollection<String> apply(PCollection<String> lines) { // Convert lines of text into individual words. PCollection<String> words = lines.apply( ParDo.of(new ExtractWordsFn())); // Count the number of times each word occurs. PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement()); return wordCounts; } } Python# The CountWords Composite Transform inside the WordCount pipeline. @beam.ptransform_fn def CountWords(pcoll): return ( pcoll # Convert lines of text into individual words. | 'ExtractWords' >> beam.ParDo(ExtractWordsFn()) # Count the number of times each word occurs. | beam.combiners.Count.PerElement() # Format each word and count into a printable string. | 'FormatCounts' >> beam.ParDo(FormatCountsFn())) Go// The CountWords Composite Transform inside the WordCount pipeline. func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Count the number of times each word occurs. return stats.Count(s, col) } |
Gráfico del trabajo:
|
En tu código de canalización, es posible que hayas invocado la transformación compuesta de la siguiente manera:
result = transform.apply(input);
Las transformaciones compuestas invocadas de esta manera omiten el anidamiento esperado y, por lo tanto, pueden aparecer expandidas en la interfaz de supervisión de Dataflow. Tu canalización también puede generar advertencias o errores sobre nombres de usuario únicos en el tiempo de ejecución de la canalización.
Para evitar estos problemas, asegúrate de invocar tus transformaciones mediante el formato recomendado:
result = input.apply(transform);
Transforma nombres
Dataflow tiene varias maneras de obtener el nombre de la transformación que se muestra en el gráfico de trabajo de supervisión.
Java
- Dataflow puede usar un nombre que asignas cuando aplicas tu transformación. El primer argumento que proporcionas al método
apply
es el nombre de tu transformación. - Dataflow puede inferir el nombre de la transformación, ya sea a partir del nombre de la clase (si creaste una transformación personalizada) o del nombre de tu objeto de función
DoFn
(si usas una transformación central comoParDo
).
Python
- Dataflow puede usar un nombre que asignas cuando aplicas tu transformación. Para establecer el nombre de transformación, debes especificar el argumento
label
. - Dataflow puede inferir el nombre de la transformación, ya sea a partir del nombre de la clase (si creaste una transformación personalizada) o del nombre de tu objeto de función
DoFn
(si usas una transformación central comoParDo
).
Go
- Dataflow puede usar un nombre que asignas cuando aplicas tu transformación. Para establecer el nombre de transformación, debes especificar el
Scope
. - Dataflow puede inferir el nombre de la transformación, ya sea a partir del nombre de la estructura si usas una
DoFn
estructural o del nombre de la función si usas una función funcionalDoFn
.
Comprende las métricas
En esta sección, se proporcionan detalles sobre las métricas asociadas con el gráfico de trabajo.
Tiempo
Cuando haces clic en un paso, la métrica Tiempo se muestra en el panel Información del paso. Esta métrica indica el tiempo aproximado total empleado en todos los subprocesos en todos los trabajadores para las siguientes acciones:
- Inicializando el paso
- Procesamiento de datos
- Redistribución de datos
- Finalización del paso
En el caso de los pasos compuestos, el tiempo indica la suma de tiempo empleado en los pasos que los integran. Esta estimación puede ayudarte a identificar pasos lentos y a diagnosticar a qué parte de tu canalización le lleva más tiempo del que debería.
Métricas de entradas complementarias
En las Métricas de entradas complementarias, se muestra la forma en que tus patrones y algoritmos de acceso de entradas complementarias afectan el rendimiento de tu canalización. Cuando tu canalización usa una entrada complementaria, Dataflow escribe la colección en una capa persistente, como un disco, y tus transformaciones leen de esa colección persistente. Estas operaciones de lectura y escritura afectan el tiempo de ejecución de tu trabajo.
La interfaz de supervisión de Dataflow muestra las métricas de entradas complementarias cuando seleccionas una transformación que crea o consume una colección de entrada complementaria. Puedes ver las métricas en la sección Métricas de entradas complementarias del panel Información de pasos.
Transformaciones que crean una entrada complementaria
Si la transformación seleccionada crea una colección de entrada complementaria, la sección Métricas de entradas complementarias muestra el nombre de la colección, junto con las métricas siguientes:
- Time spent writing: (Tiempo dedicado a la escritura) el tiempo de ejecución empleado para escribir la colección de entradas complementarias.
- Bytes written: (Bytes escritos) la cantidad total de bytes escritos en la colección de entradas complementarias.
- Tiempo y bytes de la lectura de entradas complementarias: una tabla que contiene métricas adicionales de todas las transformaciones que consumen la colección de entradas complementarias llamada consumidores de entradas complementarias.
La tabla de Tiempo y bytes leídos de las entradas complementarias contiene la información siguiente para cada consumidor de entrada complementaria:
- Side input consumer: (Consumidor de entradas complementarias): nombre de la transformación del consumidor de entradas complementarias
- Tiempo dedicado a la escritura: tiempo que este consumidor emplea en leer la colección de entradas complementarias.
- Bytes read: (Bytes leídos) cantidad de bytes que este consumidor lee de la colección de entradas complementarias.
Si tu canalización tiene una transformación compuesta que crea una entrada complementaria, expande la transformación compuesta hasta que veas la subtransformación específica que crea la entrada complementaria. Luego, selecciona esa subtransformación para ver la sección Métricas de entradas complementarias.
En la figura 4, se muestran las métricas de entradas complementarias de una transformación que crea una colección de entradas complementarias.
Transformaciones que consumen una o más entradas complementarias
Si la transformación seleccionada consume una o más entradas complementarias, la sección de las Métricas de entradas complementarias muestra la tabla Tiempo y bytes leídos de las entradas complementarias. Esta tabla contiene la información siguiente para cada colección de entrada complementaria:
- Colección de entrada complementaria: nombre de la colección de la entrada complementaria.
- Tiempo dedicado a la escritura: tiempo que la transformación emplea en leer la colección de entradas complementarias.
- Bytes leídos: cantidad de bytes que la transformación lee de la colección de entradas complementarias.
Si tu canalización tiene una transformación compuesta que lee una entrada complementaria, expande la transformación compuesta hasta que veas la subtransformación específica lee la entrada complementaria. Luego, selecciona esa subtransformación para ver la sección Métricas de entradas complementarias.
En la figura 5, se muestran las métricas de entradas complementarias de una transformación que lee de una colección de entradas complementarias.
Identifica los problemas de rendimiento de las entradas complementarias
Reiteración es un problema de rendimiento de entrada complementaria común. Si tu entrada complementaria PCollection
es demasiado grande, los trabajadores no pueden almacenar en caché toda la colección en la memoria.
En consecuencia, los trabajadores deben leer de la colección de entradas complementarias persistente varias veces.
En la figura 6, las métricas de entradas complementarias muestran que el total de bytes leídos desde la colección de entradas complementarias es mucho mayor que el tamaño de la colección (el total de bytes escritos).
Si deseas mejorar el rendimiento de esta canalización, vuelve a diseñar tu algoritmo para evitar la iteración o la recuperación de los datos de la entrada complementaria. En el ejemplo siguiente, la canalización crea el producto cartesiano de dos colecciones. El algoritmo itera a través de toda la colección de entrada complementaria para cada elemento de la colección principal. Puedes mejorar el patrón de acceso de la canalización si agrupas en lotes varios elementos de la colección principal. Este cambio reduce la cantidad de veces que los trabajadores deben volver a leer la colección de entrada complementaria.
Se puede generar otro problema de rendimiento común si tu canalización realiza una unión mediante la aplicación de un ParDo
con una o más entradas complementarias grandes. En este caso, los trabajadores dedican un alto porcentaje del tiempo de procesamiento a la operación de unión a leer las colecciones de entradas complementarias.
En la figura 7, se muestra un ejemplo de métricas de entradas complementarias cuando ocurre este problema:
Si quieres mejorar el rendimiento de esta canalización, usa CoGroupByKey, en lugar de las entradas complementarias.