Prácticas recomendadas para flujos de trabajo altamente paralelos

En esta página se ofrecen directrices sobre las prácticas recomendadas que se deben seguir al crear y ejecutar flujos de trabajo altamente paralelos de HPC de Dataflow, incluido cómo usar código externo en tus canalizaciones, cómo ejecutar la canalización y cómo gestionar el control de errores.

Incluir código externo en una canalización

Una de las principales diferencias de las pipelines altamente paralelas es que usan código de C++ en el DoFn en lugar de uno de los lenguajes estándar del SDK de Apache Beam. En el caso de las canalizaciones de Java, para que sea más fácil usar bibliotecas de C++ en la canalización, se recomienda usar llamadas a procedimientos externos. En esta sección se describe el enfoque general que se utiliza para ejecutar código externo (C++) en las canalizaciones de Java.

Una definición de una canalización de Apache Beam tiene varios componentes clave:

  • Las PCollections son colecciones inmutables de elementos homogéneos.
  • PTransforms se usan para definir las transformaciones de un PCollection que genera otro PCollection.
  • La canalización es la estructura que te permite, mediante código, declarar las interacciones entre PTransforms y PCollections. El flujo de procesamiento se representa como un grafo acíclico dirigido (DAG).

Cuando uses código de un lenguaje que no sea uno de los lenguajes estándar del SDK de Apache Beam, coloca el código en PTransform, que está dentro de DoFn, y usa uno de los lenguajes estándar del SDK para definir el propio flujo de procesamiento. Te recomendamos que uses el SDK de Apache Beam para Python para definir la canalización, ya que este SDK tiene una clase de utilidad que simplifica el uso de otro código. Sin embargo, puedes usar los otros SDKs de Apache Beam.

Puedes usar el código para llevar a cabo experimentos rápidos sin necesidad de crear una compilación completa. En un sistema de producción, normalmente se crean los propios archivos binarios, lo que te permite ajustar el proceso a tus necesidades.

En el siguiente diagrama se muestran los dos usos de los datos de la canalización:

  • Los datos se usan para impulsar el proceso.
  • Los datos se adquieren durante el tratamiento y se combinan con los datos del conductor.

Dos fases de datos de la canalización

En esta página, los datos principales (de la fuente) se denominan datos de referencia y los datos secundarios (de la fase de procesamiento) se denominan datos de unión.

En un caso práctico del sector financiero, los datos de entrada podrían ser unos cientos de miles de transacciones. Cada operación debe procesarse junto con los datos de mercado. En ese caso, los datos de mercado son los datos de unión. En un caso práctico de contenido multimedia, los datos de control pueden ser archivos de imagen que requieran procesamiento, pero que no necesiten otras fuentes de datos y, por lo tanto, no utilicen datos de combinación.

Consideraciones sobre el tamaño de los datos de conducción

Si el tamaño del elemento de datos de conducción está en el rango de los pocos megabytes, trátelo con el paradigma normal de Apache Beam de crear un objeto PCollection a partir de la fuente y enviar el objeto a las transformaciones de Apache Beam para que se procese.

Si el tamaño del elemento de datos principal es de varios megabytes o gigabytes, como suele ocurrir con los archivos multimedia, puedes colocar los datos principales en Cloud Storage. A continuación, en el objeto PCollection inicial, haz referencia al URI de almacenamiento y solo se usará una referencia de URI a esos datos.

Consideraciones sobre el tamaño de los datos al combinar datos

Si los datos de unión son de unos cientos de megabytes o menos, utiliza una entrada secundaria para obtener estos datos en las transformaciones de Apache Beam. La entrada lateral envía el paquete de datos a todos los trabajadores que lo necesiten.

Si los datos de unión están en el intervalo de gigabytes o terabytes, usa Bigtable o Cloud Storage para combinar los datos de unión con los datos de control, en función de la naturaleza de los datos. Bigtable es ideal para situaciones financieras en las que se suele acceder a los datos de mercado como búsquedas de clave-valor desde Bigtable. Para obtener más información sobre cómo diseñar tu esquema de Bigtable, incluidas recomendaciones para trabajar con datos de series temporales, consulta la siguiente documentación de Bigtable:

Ejecutar el código externo

Puedes ejecutar código externo en Apache Beam de muchas formas.

  • Crea un proceso al que se llame desde un objeto DoFn dentro de una transformación de Dataflow.

  • Usa JNI con el SDK de Java.

  • Crea un subproceso directamente desde el objeto DoFn. Aunque este enfoque no es el más eficiente, es sólido y fácil de implementar. Debido a los posibles problemas que pueden surgir al usar JNI, en esta página se muestra cómo usar una llamada de subproceso.

Al diseñar tu flujo de trabajo, ten en cuenta la canalización completa de principio a fin. Las ineficiencias que pueda haber en la forma en que se ejecuta el proceso se compensan con el hecho de que el movimiento de datos desde el origen hasta el receptor se realiza con una sola canalización. Si comparas este enfoque con otros, ten en cuenta los tiempos y los costes de extremo a extremo de la canalización.

Extrae los archivos binarios en los hosts

Cuando usas un lenguaje nativo de Apache Beam, el SDK de Apache Beam mueve automáticamente todo el código necesario a los trabajadores. Sin embargo, cuando haces una llamada a un código externo, debes mover el código manualmente.

Archivos binarios almacenados en segmentos

Para mover el código, sigue estos pasos: En este ejemplo se muestran los pasos para usar el SDK de Apache Beam para Java.

  1. Almacena el código externo compilado, junto con la información de control de versiones, en Cloud Storage.
  2. En el método @Setup, crea un bloque sincronizado para comprobar si el archivo de código está disponible en el recurso local. En lugar de implementar una comprobación física, puedes confirmar la disponibilidad mediante una variable estática cuando finalice el primer subproceso.
  3. Si el archivo no está disponible, usa la biblioteca de cliente de Cloud Storage para extraerlo del segmento de Cloud Storage al trabajador local. Una opción recomendada es usar la clase FileSystems de Apache Beam para esta tarea.
  4. Una vez que se haya movido el archivo, confirma que el bit de ejecución esté definido en el archivo de código.
  5. En un sistema de producción, compruebe el hash de los archivos binarios para asegurarse de que el archivo se ha copiado correctamente.

También puedes usar la función Apache Beam filesToStage, pero se pierden algunas de las ventajas de la capacidad del runner para empaquetar y mover automáticamente tu código Java. Además, como la llamada al subproceso necesita una ubicación de archivo absoluta, debes usar código para determinar la ruta de clase y, por lo tanto, la ubicación del archivo movido por filesToStage. No recomendamos este método.

Ejecutar los archivos binarios externos

Antes de poder ejecutar código externo, debes crear un envoltorio para él. Escribe este envoltorio en el mismo idioma que el código externo (por ejemplo, C++) o como un script de shell. El envoltorio te permite transferir identificadores de archivos e implementar optimizaciones, tal como se describe en la sección Diseñar el procesamiento para ciclos de CPU pequeños de esta página. No es necesario que tu envoltorio sea sofisticado. En el siguiente fragmento se muestra un esquema de un envoltorio en C++.

int main(int argc, char* argv[])
{
    if(argc < 3){
        std::cerr << "Required return file and data to process" << '\n';
        return 1;
    }

    std::string returnFile = argv[1];
    std::string word = argv[2];

    std::ofstream myfile;
    myfile.open (returnFile);
    myfile << word;
    myfile.close();
    return 0;
}

Este código lee dos parámetros de la lista de argumentos. El primer parámetro es la ubicación del archivo de retorno donde se insertan los datos. El segundo parámetro son los datos que el código devuelve al usuario. En las implementaciones del mundo real, este código haría más que mostrar "Hello, world".

Después de escribir el código de envoltorio, ejecuta el código externo haciendo lo siguiente:

  1. Transmite los datos a los archivos binarios del código externo.
  2. Ejecuta los archivos binarios, detecta los errores y registra los errores y los resultados.
  3. Gestiona la información de registro.
  4. Recoge datos del procesamiento completado.

Transmitir los datos a los archivos binarios

Para iniciar el proceso de ejecución de la biblioteca, transmite datos al código de C++. En este paso, puedes aprovechar la integración de Dataflow con otras herramientas de Google Cloud Platform. Una herramienta como Bigtable puede gestionar conjuntos de datos muy grandes y controlar el acceso de baja latencia y alta simultaneidad, lo que permite que miles de núcleos accedan al conjunto de datos al mismo tiempo. Además, Bigtable puede preprocesar datos, lo que permite darles forma, enriquecerlos y filtrarlos. Todo este trabajo se puede hacer en las transformaciones de Apache Beam antes de ejecutar el código externo.

En un sistema de producción, lo más recomendable es usar un protocol buffer para encapsular los datos de entrada. Puedes convertir los datos de entrada en bytes y codificarlos en base64 antes de pasarlos a la biblioteca externa. Hay dos formas de transferir estos datos a la biblioteca externa:

  • Datos de entrada pequeños. Si se trata de datos pequeños que no superan la longitud máxima de un argumento de comando del sistema, pasa el argumento a la posición 2 del proceso que se está creando con java.lang.ProcessBuilder.
  • Datos de entrada de gran tamaño. Si los datos son de mayor tamaño, crea un archivo cuyo nombre incluya un UUID para que contenga los datos que necesita el proceso.

Ejecutar el código de C++, detectar errores y registrar

Capturar y gestionar la información de errores es una parte fundamental de tu canalización. Los recursos que usa el ejecutor de Dataflow son efímeros y, a menudo, es difícil inspeccionar los archivos de registro de los trabajadores. Debes asegurarte de registrar y enviar toda la información útil a los registros del ejecutor de Dataflow, así como de almacenar los datos de registro en uno o varios segmentos de Cloud Storage.

Lo recomendable es redirigir stdout y stderr a archivos, lo que te permite evitar problemas de falta de memoria. Por ejemplo, en el ejecutor de flujo de datos que llama al código de C++, puedes incluir líneas como las siguientes:

Java

  import java.lang.ProcessBuilder.Redirect;
  ...
      processbuilder.redirectError(Redirect.appendTo(errfile));
      processbuilder.redirectOutput(Redirect.appendTo(outFile));

Python

# Requires Apache Beam 2.34 or later.
stopping_times, bad_values = (
    integers
    | beam.Map(collatz.total_stopping_time).with_exception_handling(
        use_subprocess=True))

# Write the bad values to a side channel.
bad_values | 'WriteBadValues' >> beam.io.WriteToText(
    os.path.splitext(output_path)[0] + '-bad.txt')

Gestionar información de registro

En muchos casos prácticos, se procesan millones de elementos. El procesamiento correcto genera registros con poco o ningún valor, por lo que debes tomar una decisión empresarial sobre la conservación de los datos de registro. Por ejemplo, puedes plantearte estas alternativas a conservar todos los datos de registro:

  • Si la información contenida en los registros del procesamiento correcto de elementos no es valiosa, no la conserves.
  • Crea una lógica que muestree los datos de registro, como muestrear solo cada 10.000 entradas de registro. Si el procesamiento es homogéneo, como cuando muchas iteraciones del código generan datos de registro esencialmente idénticos, este enfoque proporciona un equilibrio eficaz entre la conservación de los datos de registro y la optimización del procesamiento.

En el caso de las condiciones de error, la cantidad de datos volcados en los registros puede ser grande. Una estrategia eficaz para gestionar grandes cantidades de datos de registro de errores es leer las primeras líneas de la entrada de registro y enviar solo esas líneas a Cloud Logging. Puedes cargar el resto del archivo de registro en segmentos de Cloud Storage. Este enfoque te permite consultar las primeras líneas de los registros de errores más adelante y, si es necesario, consultar Cloud Storage para ver el archivo completo.

También es útil comprobar el tamaño del archivo de registro. Si el tamaño del archivo es cero, puedes ignorarlo o registrar un mensaje de registro sencillo que indique que el archivo no tenía datos.

Capturar datos de un procesamiento completado

No se recomienda usar stdout para devolver el resultado del cálculo a la función DoFn. Otro código al que llame tu código C++, e incluso tu propio código, también podría enviar mensajes a stdout, lo que contaminaría el flujo stdoutput que, de lo contrario, contendría datos de registro. En su lugar, es mejor modificar el código del envoltorio de C++ para que acepte un parámetro que indique dónde crear el archivo que almacena el valor. Lo ideal es que este archivo se almacene de forma independiente del lenguaje mediante buffers de protocolo, lo que permite que el código de C++ devuelva un objeto al código de Java o Python. El objeto DoFn puede leer el resultado directamente del archivo y transferir la información del resultado a su propia llamada output.

La experiencia ha demostrado la importancia de ejecutar pruebas unitarias que se ocupen del proceso en sí. Es importante implementar una prueba unitaria que ejecute el proceso independientemente de la canalización de Dataflow. La depuración de la biblioteca se puede hacer de forma mucho más eficiente si es independiente y no tiene que ejecutar toda la canalización.

Diseño de procesamiento para ciclos de CPU pequeños

Llamar a un subproceso tiene una sobrecarga. En función de tu carga de trabajo, es posible que tengas que hacer un esfuerzo adicional para reducir la proporción entre el trabajo realizado y la sobrecarga administrativa de iniciar y cerrar el proceso.

En el caso práctico de los medios, el tamaño del elemento de datos de control puede ser de varios megabytes o gigabytes. Por lo tanto, el procesamiento de cada elemento de datos puede tardar varios minutos. En ese caso, el coste de llamar al subproceso es insignificante en comparación con el tiempo de procesamiento general. En esta situación, lo mejor es que un solo elemento inicie su propio proceso.

Sin embargo, en otros casos prácticos, como las finanzas, el procesamiento requiere unidades de tiempo de CPU muy pequeñas (decenas de milisegundos). En ese caso, la sobrecarga de llamar al subproceso es desproporcionadamente grande. Una solución a este problema es usar la transformación GroupByKey de Apache Beam para crear lotes de entre 50 y 100 elementos que se introducirán en el proceso. Por ejemplo, puedes seguir estos pasos:

  • En una función DoFn, crea un par clave-valor. Si procesa transacciones financieras, puede usar el número de transacción como clave. Si no tienes un número único que puedas usar como clave, puedes generar una suma de comprobación a partir de los datos y usar una función de módulo para crear particiones de 50 elementos.
  • Envía la clave a una función GroupByKey.create, que devuelve una colección KV<key,Iterable<data>> que contiene los 50 elementos que puedes enviar al proceso.

Limitar el paralelismo de los trabajadores

Cuando trabajas con un lenguaje que se admite de forma nativa en el runner de Dataflow, no tienes que preocuparte por lo que le ocurre al trabajador. Dataflow tiene muchos procesos que supervisan el control de flujo y los subprocesos en modo por lotes o de streaming.

Sin embargo, si usas un lenguaje externo como C++, ten en cuenta que estás haciendo algo poco habitual al iniciar subprocesos. En el modo por lotes, el ejecutor de Dataflow usa una proporción pequeña de subprocesos de trabajo en relación con las CPUs en comparación con el modo de streaming. Te recomendamos que, sobre todo en el modo de streaming, crees un semáforo en tu clase para controlar de forma más directa el paralelismo de un trabajador concreto.

Por ejemplo, en el procesamiento de contenido multimedia, puede que no quieras que un solo trabajador procese cientos de elementos de transcodificación en paralelo. En casos como estos, puedes crear una clase de utilidad que proporcione permisos a la función DoFn para el trabajo que se esté llevando a cabo. Esta clase te permite controlar directamente los hilos de trabajo de tu canalización.

Usar receptores de datos de alta capacidad en Google Cloud Platform

Una vez que se han procesado los datos, se envían a un receptor de datos. El receptor debe poder gestionar el volumen de resultados que genera tu solución de procesamiento de cuadrículas.

En el siguiente diagrama se muestran algunos de los receptores disponibles en Google Cloud Platform cuando Dataflow ejecuta una carga de trabajo de malla.

Receptores disponibles en Google Cloud Platform

Bigtable, BigQuery y Pub/Sub pueden gestionar flujos de datos muy grandes. Por ejemplo, cada nodo de Bigtable puede gestionar 10.000 inserciones por segundo de hasta 1 KB con una escalabilidad horizontal sencilla. Por lo tanto, un clúster de Bigtable de 100 nodos puede absorber 1.000.000 de mensajes por segundo generados por la cuadrícula de Dataflow.

Gestionar errores de segmentación

Cuando usas código C++ en una canalización, debes decidir cómo gestionar los errores de segmentación, ya que tienen ramificaciones no locales si no se gestionan correctamente. El runner de Dataflow crea procesos según sea necesario en Java, Python o Go y, a continuación, asigna trabajo a los procesos en forma de paquetes.

Si la llamada al código C++ se realiza mediante herramientas estrechamente acopladas, como JNI o Cython, y el proceso de C++ falla, el proceso de llamada y la máquina virtual Java (JVM) también fallan. En este caso, no se pueden detectar los puntos de datos incorrectos. Para que se puedan detectar los puntos de datos incorrectos, usa un acoplamiento más flexible, que se ramifique a partir de los datos incorrectos y permita que la canalización continúe. Sin embargo, con código C++ maduro que se haya probado exhaustivamente con todas las variaciones de datos, puedes usar mecanismos como Cython.

Siguientes pasos