Prácticas recomendadas para flujos de trabajo altamente paralelos

En esta página, se proporciona orientación sobre las prácticas recomendadas para compilar y ejecutar flujos de trabajo de HPC muy paralelos de Dataflow, incluido cómo usar el código externo en las canalizaciones, cómo ejecutar la canalización y cómo administrar el manejo de errores.

Incluye el código externo en tu canalización

Un diferenciador clave para las canalizaciones altamente paralelas es que usan el código C ++ dentro de DoFn en lugar de uno de los lenguajes estándar del SDK de Apache Beam. Para las canalizaciones de Java, a fin de facilitar el uso de bibliotecas C++ en la canalización, se recomienda que uses llamadas de procedimiento externo. En esta sección, se describe en enfoque general usado para ejecutar un código externo (C++) en canalizaciones de Java.

Una definición de canalización de Apache Beam tiene varios componentes clave:

  • PCollections son colecciones inmutables de elementos homogéneos.
  • PTransforms se usan para definir las transformaciones a una PCollection que genera otro PCollection.
  • La canalización es la construcción que te permite, a través del código, declarar las interacciones entre PTransforms y PCollections. La canalización se representa como un grafo acíclico dirigido (DAG).

Cuando uses código de un lenguaje que no sea uno de los lenguajes estándar del SDK de Apache Beam, coloca el código en el PTransform, que está dentro de DoFn, y usa uno de los lenguajes estándares del SDK para definir la canalización en sí. Recomendamos usar el SDK de Python de Apache Beam para definir la canalización, ya que el SDK de Python tiene una clase de utilidad que facilita el uso de otro código. Sin embargo, puedes usar los otros SDK de Apache Beam.

También puedes usar el código para desarrollar experimentos rápidos sin requerir una compilación completa. Para un sistema de producción, en general creas tus propios objetos binarios, lo que te da la libertad de ajustar el proceso a tus necesidades.

El diagrama siguiente muestra los dos usos de los datos de canalización:

  • Los datos se usan para impulsar el proceso.
  • Los datos se adquieren durante el procesamiento y se unen a los datos del controlador.

Dos etapas de datos de canalización

En esta página, los datos principales (de la fuente) se denominan datos de control y los datos secundarios (de la fase de procesamiento) se denominan datos de unión.

En un caso práctico de finanzas, los datos de controlador pueden ser algunos centenares de transacciones. Cada una debe procesarse en conjunto con los datos del mercado. En ese caso, los datos del mercado son los datos de unión. En un caso práctico de multimedia, los datos de controlador pueden ser archivos de imagen que requieran procesamiento, pero no necesiten otras fuentes de datos y, por lo tanto, no usen datos de unión.

Consideraciones de tamaño para los datos de control

Si el tamaño del elemento de los datos de control tiene un rango de pocos megabytes, trátalo de acuerdo con el paradigma normal de Apache Beam, que consiste en crear un objeto PCollection desde la fuente y enviarlo a las transformaciones de Apache Beam para su procesamiento.

Si el tamaño del elemento que forma parte de los datos de controlador consta de muchos megabytes o gigabytes, como en el caso de los archivos multimedia, puedes ingresar los datos de controlador en Cloud Storage. Luego, en el objeto PCollection inicial, debes hacer referencia a la URI de almacenamiento y solo una referencia de URI a los datos que se usaron.

Consideraciones de tamaño para los datos de unión

Si los datos de unión tienen unos centenares de megabytes o menos, usa una entrada complementaria para dirigirlos a las transformaciones de Apache Beam. La entrada complementaria envía el paquete de datos a cada trabajador que lo necesite.

Si los datos de unión tienen un tamaño en gigabytes o terabytes, usa Bigtable o Cloud Storage para combinar los datos de unión con los datos de control, en función de la naturaleza de los datos. Bigtable es ideal para contextos de finanzas en los que se suele acceder a los datos del mercado mediante búsquedas de clave-valor desde Bigtable. Para obtener más información sobre cómo diseñar el esquema de Bigtable, incluidas recomendaciones sobre cómo trabajar con datos de series temporales, consulta la siguiente documentación de Bigtable:

Ejecuta el código externo

Puedes ejecutar el código externo en Apache Beam de muchas maneras.

  • Crea un proceso que se llame desde un objeto DoFn dentro de una transformación de Dataflow.

  • Usa JNI con el SDK de Java.

  • Crea un subproceso directamente desde el objeto DoFn. Aunque este enfoque no es el más eficiente, es sólido y fácil de implementar. Debido a los posibles problemas para usar JNI, en esta página se muestra un uso de una llamada de subproceso.

Cuando diseñes tu flujo de trabajo, considera la canalización completa de extremo a extremo. La falta de eficiencia en la forma en que se ejecuta el proceso se ve compensada por el hecho de que el movimiento de datos desde la fuente hasta el receptor se realiza con una sola canalización. Si comparas este enfoque con otros, ten en cuenta los tiempos de extremo a extremo de la canalización, así como los costos de extremo a extremo.

Mueve los objetos binarios a los hosts

Cuando usas un lenguaje nativo de Apache Beam, el SDK de Apache Beam mueve todo el código requerido a los trabajadores de forma automática. Sin embargo, cuando realizas una llamada a un código externo, tienes que mover el código de forma manual.

Archivos binarios almacenados en depósitos

Para mover el código, sigue estos pasos: En el ejemplo, se muestran los pasos para el SDK de Java de Apache Beam.

  1. Almacena el código externo compilado, junto con la información sobre el control de versiones, en Cloud Storage.
  2. En el método @Setup, crea un bloque sincronizado para verificar si el archivo de código está disponible en el recurso local. En lugar de implementar una verificación física, puedes confirmar la disponibilidad usando una variable estática cuando finalice el primer subproceso.
  3. Si el archivo no está disponible, usa la biblioteca cliente para extraer el archivo del bucket de Cloud Storage y trasladarlo al trabajador local. Un enfoque recomendado para esta tarea consiste en usar la clase FileSystems de Apache Beam.
  4. Después de que se ha movido el archivo, confirma que el bit de ejecución está configurado en el archivo del código.
  5. En un sistema de producción, verifica el hash de los objetos binarios para asegurarte de que el archivo se haya copiado correctamente.

Usar la función filesToStage de Apache Beam también es una opción, pero quita algunas de las ventajas de la capacidad del ejecutor de empaquetar y mover el código de Java de forma automática. Además, debido a que la llamada al subproceso necesita una ubicación de archivo absoluta, debes usar código para determinar la ruta de la clase y, por lo tanto, la ubicación del archivo que movió filesToStage. No recomendamos este enfoque.

Ejecuta los objetos binarios externos

Antes de ejecutar un código externo, tienes que crear un wrapper. Escribe el wrapper en el mismo lenguaje que el código externo (por ejemplo, C++) o como una secuencia de comandos de shell. El wrapper te permite pasar los controladores del archivo e implementar optimizaciones, como se describe en la sección Diseña el procesamiento para ciclos de CPU pequeños en esta página. No es necesario que el wrapper sea sofisticado. El siguiente fragmento muestra el esquema de un wrapper en C++.

int main(int argc, char* argv[])
{
    if(argc < 3){
        std::cerr << "Required return file and data to process" << '\n';
        return 1;
    }

    std::string returnFile = argv[1];
    std::string word = argv[2];

    std::ofstream myfile;
    myfile.open (returnFile);
    myfile << word;
    myfile.close();
    return 0;
}

Este código lee dos parámetros de la lista de argumentos. El primer parámetro es la ubicación del archivo que se muestra como resultado, al que se envían los datos. El segundo parámetro corresponde a los datos que el código repetirá al usuario. En implementaciones de verdad, este código solo reproduciría "Hello, world"!

Después de escribir el código del wrapper, ejecuta el código del wrapper de la siguiente manera:

  1. Transmite los datos a los objetos binarios del código externo.
  2. Ejecuta los objetos binarios, detecta errores y registra estos errores y los resultados.
  3. Maneja la información del registro.
  4. Captura los datos del procesamiento finalizado.

Transmite los datos a los objetos binarios

Para comenzar el proceso de ejecución de la biblioteca, transmite los datos al código de C++. En este paso, puedes aprovechar la integración de Dataflow en otras herramientas de Google Cloud. Una herramienta como Bigtable puede lidiar con conjuntos de datos de gran tamaño y controlar accesos de latencia baja y de simultaneidad alta, lo que permite a miles de núcleos acceder al conjunto de datos al mismo tiempo. Además, Bigtable puede realizar un procesamiento previo de los datos, lo que permite darles forma, enriquecerlos y filtrarlos. Puedes hacer todo este trabajo en las transformaciones de Apache Beam antes de ejecutar el código externo.

En el caso de un sistema de producción, el procedimiento recomendado consiste en usar un búfer de protocolo para encapsular los datos de entrada. Puedes convertir los datos de entrada en bytes y codificarlos en Base64 antes de pasarlos a la biblioteca externa. Las dos formas de pasar estos datos a la biblioteca externa son las siguientes:

  • Datos de entrada pequeños. Si tienes datos de tamaño pequeño que no exceden la longitud máxima para el argumento de un comando, pasa el argumento en la posición 2 del proceso que se compila mediante java.lang.ProcessBuilder.
  • Datos de entrada grandes. Si tienes datos de mayor tamaño, crea un archivo cuyo nombre incluya un UUID para almacenar los datos que requiere el proceso.

Ejecuta el código de C++, detecta errores y crea registros

La captura y el manejo de la información de errores es una parte fundamental de tu canalización. Los recursos que usa el ejecutor de Dataflow son efímeros y, a menudo, es difícil inspeccionar los archivos de registro del trabajador. Debes asegurarte de capturar y enviar toda la información útil al registro del ejecutor de Dataflow y de almacenar los datos de registro en uno o más depósitos de Cloud Storage.

Se recomienda redireccionar stdout y stderr a archivos, lo que te permite evitar todas las consideraciones de memoria insuficiente. Por ejemplo, en el ejecutor de Dataflow que llama al código de C++, puedes incluir líneas como estas:

Java

  import java.lang.ProcessBuilder.Redirect;
  ...
      processbuilder.redirectError(Redirect.appendTo(errfile));
      processbuilder.redirectOutput(Redirect.appendTo(outFile));

Python

# Requires Apache Beam 2.34 or later.
stopping_times, bad_values = (
    integers
    | beam.Map(collatz.total_stopping_time).with_exception_handling(
        use_subprocess=True))

# Write the bad values to a side channel.
bad_values | 'WriteBadValues' >> beam.io.WriteToText(
    os.path.splitext(output_path)[0] + '-bad.txt')

Maneja la información de registro

Muchos casos prácticos incluyen el procesamiento de millones de elementos. Un procesamiento exitoso genera registros de muy poco o ningún valor, por lo que debes tomar una decisión de negocios acerca de la retención de los datos de registro. Por ejemplo, considera estas alternativas en lugar de retener todos los datos de registro:

  • Si la información contenida en los registros de procesamiento exitoso de elementos carece de valor, no la conserves.
  • Crea una lógica que genere muestras de los datos de registro, por ejemplo, tomar una de cada 10,000 entradas de registro. Si el proceso es homogéneo, como muchas iteraciones del código generan datos de registro básicamente idénticos, este enfoque proporciona un equilibrio entre la retención de datos de registro y la optimización del procesamiento.

En el caso de las condiciones de error, el volumen de datos que se envía a los registros podría ser considerable. Una estrategia efectiva para manejar grandes cantidades de datos de registro de errores consiste en leer las primeras líneas de la entrada de registro y enviar solo esas líneas a Cloud Logging. Puedes cargar el resto del archivo de registro en los buckets de Cloud Storage. Este método te permite mirar las primeras líneas de los registros de error más tarde y luego, si es necesario, consultar Cloud Storage para todo el archivo.

También es útil verificar el tamaño del archivo de registro. Si el tamaño del archivo es cero, puedes ignorarlo sin problemas o registrar un mensaje de registro simple que indique que el archivo no tenía datos.

Captura los datos del procesamiento finalizado

No se recomienda usar stdout para pasar el resultado de un procesamiento a la función DoFn. Puede que haya otros códigos a los que llame C++, o incluso tus propios códigos, que envíen mensajes a stdout, lo que contamina la transmisión de stdoutput, que de otra forma contendría datos de registro. En cambio, es mejor realizar un cambio en el código del wrapper de C++ que le permita al código aceptar un parámetro que indique en qué ubicación se debe crear el archivo que almacena el valor. Idealmente, este archivo debería almacenarse de forma independiente del lenguaje mediante búferes de protocolo, lo que permite al código de C++ pasar un objeto de vuelta al código de Java o Python. El objeto DoFn puede leer el resultado directamente desde el archivo y pasar la información del resultado a su propia llamada de output.

La experiencia demostró la importancia de ejecutar pruebas de unidades en el proceso en sí. Es importante implementar una prueba de unidades que ejecute el proceso de forma independiente de la canalización de Dataflow. La biblioteca puede depurarse de forma mucho más eficiente si es independiente y no necesita ejecutar toda la canalización.

Procesamiento de diseños para ciclos de CPU pequeños

Llamar a un subproceso presenta una sobrecarga. Según tu carga de trabajo, quizás tengas que hacer trabajo extra para reducir la proporción entre el trabajo que se hace y la sobrecarga administrativa de iniciar y cerrar el proceso.

En el caso práctico de elementos multimedia, el tamaño del elemento que forma parte de los datos de controlador puede estar en megabytes altos o en gigabytes. Como resultado, el procesamiento de cada elemento de datos puede tardar varios minutos. En este caso, el costo de llamar al subproceso es insignificante en comparación con el tiempo de procesamiento total. El mejor enfoque en esta situación es que un elemento único inicie su propio proceso.

Sin embargo, en otros casos prácticos, como las finanzas, el procesamiento requiere unidades de tiempo de CPU muy pequeñas (decenas de milisegundos). En este caso, la sobrecarga de llamar al subproceso es desproporcionadamente grande. Una solución a este problema consiste en usar la transformación GroupByKey de Apache Beam para crear lotes de entre 50 y 100 elementos que se envíen al proceso. Por ejemplo, puedes seguir estos pasos:

  • En una función DoFn, crea un par clave-valor. Si tienes que procesar transacciones financieras, puedes usar el número de transacción como la clave. Si no tienes un número único que puedas usar como clave, puedes generar una suma de verificación a partir de los datos y usar una función de módulo para crear particiones de 50 elementos.
  • Envía la clave a una función GroupByKey.create, que muestra una colección KV<key,Iterable<data>> que contiene los 50 elementos que luego puedes enviar al proceso.

Limita el paralelismo de trabajadores

Cuando trabajas con lenguajes que son compatibles de forma nativa con el ejecutor de Dataflow, no necesitas pensar en lo que pasa con el trabajador. Dataflow tiene muchos procesos que supervisan el control de flujo y los subprocesos en los modos por lotes o de transmisión.

Sin embargo, si usas un lenguaje externo, como C++, deberías tener en cuenta que estás haciendo algo fuera de lo común cuando inicias subprocesos. En el modo por lotes, el ejecutor de Dataflow usa una pequeña proporción entre subprocesos en funcionamiento y CPU en comparación con el modo de transmisión. Se recomienda, especialmente en el modo de transmisión, usar un semáforo dentro de tu clase para controlar de forma más directa el paralelismo de un trabajador individual.

Por ejemplo, en el caso del procesamiento multimedia, quizás no quieras que un solo trabajador se encargue del procesamiento de cientos de elementos de transcodificación en forma paralela. En casos como ese, puedes crear una clase de utilidad que proporcione permisos a la función DoFn para el trabajo que se está realizando. Si usas esta clase, puedes controlar de forma directa los subprocesos trabajadores dentro de tu canalización.

Usa receptores de datos de alta capacidad en Google Cloud

Después de que se hayan procesado los datos, se envían al receptor de datos. El receptor tiene que ser capaz de manejar el volumen de resultados que crea tu solución de procesamiento en red.

En el siguiente diagrama, se muestran algunos de los receptores disponibles en Google Cloud cuando Dataflow ejecuta una carga de trabajo en red.

Receptores disponibles en Google Cloud

Bigtable, BigQuery y Cloud Pub/Sub pueden manejar flujos de datos muy grandes. Por ejemplo, cada nodo de Bigtable puede manejar 10,000 inserciones por segundo en tamaños de hasta 1,000 con una escalabilidad horizontal sencilla. Como resultado, un clúster de Bigtable de 100 nodos puede absorber 1,000,000 de mensajes por segundo generados por la cuadrícula de Dataflow.

Administra segfaults

Cuando usas código de C++ dentro de una canalización, debes decidir cómo administrar segfaults, ya que tienen ramificaciones no locales si no se tratan de forma correcta. El ejecutor de Dataflow crea procesos según sea necesario en Java, Python o Go y, luego, asigna trabajo a los procesos en forma de paquetes.

Si la llamada al código de C++ se realiza con herramientas estrechamente vinculadas, como JNI o Cython, y segfaults de procesos de C++, el proceso de llamada y la máquina virtual de Java JVM) también fallan. En este caso, los datos incorrectos no son detectables. Para que los datos incorrectos se puedan capturar, usa un acoplamiento más flexible, que bifurca los datos incorrectos y permite que la canalización continúe. Sin embargo, con el código de C++ consolidado que se probó por completo en todas las variaciones de datos, puedes usar mecanismos como Cython.

¿Qué sigue?