Implementa una canalización

Después de construir y probar tu canalización de Apache Beam, puedes usar el servicio administrado de Cloud Dataflow para implementarla y ejecutarla. Una vez que te encuentras en el servicio de Cloud Dataflow, el código de tu canalización se convierte en un trabajo de Cloud Dataflow.

El servicio de Cloud Dataflow administra por completo los servicios de Google Cloud Platform (GCP), como Compute Engine y Cloud Storage, para ejecutar tu trabajo de Cloud Dataflow, con aumentos y reducciones de los recursos necesarios de forma automática. El servicio de Cloud Dataflow brinda visibilidad de tu trabajo mediante herramientas como la interfaz de supervisión de Cloud Dataflow y la interfaz de línea de comandos de Cloud Dataflow.

Puedes controlar algunos aspectos de cómo el servicio de Cloud Dataflow ejecuta tu trabajo si configuras los parámetros de ejecución en el código de tu canalización. Por ejemplo, los parámetros de ejecución especifican si los pasos de tu canalización se ejecutan en máquinas virtuales de trabajador, en el backend del servicio de Cloud Dataflow o de manera local.

Además de administrar los recursos de GCP, el servicio de Cloud Dataflow realiza y optimiza de forma automática muchos aspectos del procesamiento paralelo distribuido. Estos incluyen lo siguiente:

  • Paralelización y distribución. Cloud Dataflow particiona tus datos de forma automática y distribuye tu código de trabajador a las instancias de Compute Engine para su procesamiento paralelo.
  • Optimización. Cloud Dataflow usa tu código de canalización a fin de crear un grafo de ejecución que represente los PCollection y las transformaciones de tu canalización y optimiza el grafo para el rendimiento y el uso de recursos más eficiente. Cloud Dataflow también optimiza de forma automática las operaciones que podrían ser costosas, como las agregaciones de datos.
  • Características de ajuste automático. El servicio de Cloud Dataflow incluye varias características que proporcionan ajustes sobre la marcha de la asignación de recursos y la partición de datos, como el Ajuste de escala automático y el Rebalanceo dinámico de trabajos. Estas características ayudan a que el servicio de Cloud Dataflow ejecute tu trabajo de la manera más rápida y eficiente posible.

Ciclo de vida de la canalización: del código de canalización al trabajo de Cloud Dataflow

Cuando ejecutas tu programa, Cloud Dataflow crea un grafo de ejecución a partir del código que construye tu objeto Pipeline, incluidas todas las transformaciones y sus funciones de procesamiento asociadas (como DoFn). Esta fase se llama Tiempo de construcción del grafo. Durante la construcción del grafo, Cloud Dataflow verifica en busca de varios errores y garantiza que el grafo de tu canalización no contenga ninguna operación ilegal. El grafo de ejecución se traduce al formato JSON y se transmite al extremo del servicio de Cloud Dataflow.

Nota: La construcción del grafo también ocurre cuando ejecutas tu canalización de forma local, pero el grafo no se traduce a JSON ni se transmite al servicio. En su lugar, el grafo se ejecuta de forma local en la misma máquina donde iniciaste tu programa de Cloud Dataflow. Consulta la documentación sobre la configuración para la ejecución local a fin de obtener más detalles.

Luego, el servicio de Cloud Dataflow valida el grafo de ejecución JSON. Cuando el grafo se valida, se convierte en un trabajo en el servicio de Cloud Dataflow. Podrás ver tu trabajo, su grafo de ejecución, su estado y su información de registro con la interfaz de supervisión de Cloud Dataflow.

Java: SDK 2.x

El servicio de Cloud Dataflow envía una respuesta a la máquina donde ejecutaste tu programa de Cloud Dataflow. Esta respuesta está encapsulada en el objeto DataflowPipelineJob, que contiene el jobId de tu trabajo de Cloud Dataflow. Puedes usar jobId para supervisar tu trabajo, realizar un seguimiento a este y solucionar sus problemas mediante la interfaz de supervisión de Cloud Dataflow y la interfaz de línea de comandos de Cloud Dataflow. Consulta la referencia de la API para DataflowPipelineJob a fin de obtener más información.

Python

El servicio de Cloud Dataflow envía una respuesta a la máquina donde ejecutaste tu programa de Cloud Dataflow. Esta respuesta está encapsulada en el objeto DataflowPipelineResult, que contiene el job_id de tu trabajo de Cloud Dataflow. Puedes usar job_id para supervisar tu trabajo, realizar un seguimiento a este y solucionar sus problemas mediante la interfaz de supervisión de Cloud Dataflow y la interfaz de línea de comandos de Cloud Dataflow.

Java: SDK 1.x

El servicio de Cloud Dataflow envía una respuesta a la máquina donde ejecutaste tu programa de Cloud Dataflow. Esta respuesta está encapsulada en el objeto DataflowPipelineJob, que contiene el jobId de tu trabajo de Cloud Dataflow. Puedes usar jobId para supervisar tu trabajo, realizar un seguimiento a este y solucionar sus problemas mediante la interfaz de supervisión de Cloud Dataflow y la interfaz de línea de comandos de Cloud Dataflow. Consulta la referencia de la API para DataflowPipelineJob a fin de obtener más información.

Grafo de ejecución

Cloud Dataflow crea un grafo de pasos que representa tu canalización, en función de las transformaciones y los datos que usaste cuando construiste tu objeto Pipeline. Este es el grafo de ejecución de la canalización.

El ejemplo de WordCount, que se incluye con los SDK de Apache Beam, contiene una serie de transformaciones a fin de leer, extraer, contar, dar formato y escribir las palabras individuales de una colección de texto, junto con un conteo de casos para cada palabra. El siguiente diagrama muestra cómo las transformaciones en la canalización de WordCount se expanden en un grafo de ejecución:

Las transformaciones en el programa de ejemplo de WordCount expandidas en un grafo de ejecución de los pasos que debe ejecutar el servicio de Cloud Dataflow.
Figura 1: Grafo de ejecución de ejemplo de WordCount

El grafo de ejecución a menudo difiere del orden en el que especificas tus transformaciones cuando construyes la canalización. Esto se debe a que el servicio de Cloud Dataflow realiza varias optimizaciones y fusiones en el grafo de ejecución antes de ejecutarse en recursos de nube administrados. El servicio de Cloud Dataflow respeta las dependencias de datos cuando ejecuta tu canalización; sin embargo, los pasos intermedios sin dependencias de datos se pueden ejecutar en cualquier orden.

Puedes ver el grafo de ejecución no optimizado que Cloud Dataflow genera para tu canalización cuando seleccionas tu trabajo en la Interfaz de supervisión de Cloud Dataflow.

Paralelización y distribución

El servicio de Cloud Dataflow paraleliza y distribuye de forma automática la lógica de procesamiento de tu canalización a los trabajadores que asignas para realizar tu trabajo. Cloud Dataflow usa las abstracciones en el modelo de programación a fin de representar funciones de procesamiento paralelo; por ejemplo, tus transformaciones ParDo hacen que Cloud Dataflow distribuya de forma automática tu código de procesamiento (que se representa con los DoFn) a múltiples trabajadores para que se ejecuten en paralelo.

Estructura tu código de usuario

Puedes pensar en tu código DoFn como pequeñas entidades independientes: podría haber muchas instancias en ejecución en diferentes máquinas, cada una sin conocimiento de las otras. Como tales, las funciones puras (funciones que no dependen de un estado oculto o externo, no tienen efectos secundarios observables y son deterministas) son código ideal para la naturaleza paralela y distribuida de los DoFn.

Sin embargo, el modelo de función pura no es estrictamente rígido; la información de estado o los datos de inicialización externos pueden ser válidos para DoFn y otros objetos de función, siempre que tu código no dependa de elementos que el servicio de Cloud Dataflow no garantiza. Cuando organices tus transformaciones ParDo y crees tus DoFn, ten esto en cuenta:

  • El servicio de Cloud Dataflow garantiza que a cada elemento de tu PCollection de entrada lo procese una instancia de DoFn solo una vez.
  • El servicio Cloud Dataflow no garantiza cuántas veces se invocará un DoFn.
  • El servicio de Cloud Dataflow no garantiza de forma exacta cómo se agrupan los elementos distribuidos, es decir, no garantiza qué elementos (si los hay) se procesan juntos.
  • El servicio de Cloud Dataflow no garantiza la cantidad exacta de instancias de DoFn que se crearán en el transcurso de una canalización.
  • El servicio de Cloud Dataflow es tolerante a errores y puede volver a ejecutar tu código varias veces en el caso de problemas de trabajadores. El servicio de Cloud Dataflow puede crear copias de seguridad de tu código y puede tener problemas con los efectos secundarios manuales (por ejemplo, si tu código crea archivos temporales con nombres no únicos o se basa en ellos).
  • El servicio Cloud Dataflow serializa el procesamiento de elementos por instancia de DoFn. Tu código no necesita ser seguro para subprocesos de forma estricta; sin embargo, cualquier estado compartido entre varias instancias de DoFn debe ser seguro para subprocesos.

Consulta la sección sobre los requisitos para las funciones que proporciona el usuario en la documentación del modelo de programación a fin de obtener más información sobre cómo crear tu código de usuario.

Manejo de errores y excepciones

Tu canalización puede generar excepciones durante el procesamiento de datos. Algunos de estos errores son transitorios (p. ej., dificultades temporales para acceder a un servicio externo), mientras que otros son permanentes, como los errores causados por datos de entrada corruptos o no analizables, o indicadores nulos durante el procesamiento.

Cloud Dataflow procesa los elementos en paquetes arbitrarios y reintenta el paquete completo cuando se produce un error en cualquier elemento de ese paquete. Cuando se ejecuta en modo por lotes, los paquetes que incluyen un elemento defectuoso se reintentan 4 veces. Si un paquete individual falla 4 veces, la canalización fallará por completo. Cuando se ejecuta en modo de transmisión, un paquete que incluye un elemento defectuoso se reintenta de forma indefinida, lo que puede hacer que la canalización se paralice de manera permanente.

Optimización de fusiones

Una vez que se valida el formulario JSON del grafo de ejecución de tu canalización, el servicio de Cloud Dataflow puede modificar el grafo para realizar optimizaciones. Estas optimizaciones pueden incluir la fusión de varios pasos o transformaciones en el grafo de ejecución de tu canalización en paso únicos. La fusión de pasos evita que el servicio de Cloud Dataflow tenga que materializar cada PCollection intermedia en tu canalización, lo cual sería costoso en términos de sobrecarga de procesamiento y memoria.

Si bien todas las transformaciones que especificas en la construcción de tu canalización se ejecutan en el servicio, pueden ejecutarse en un orden diferente o como parte de una transformación fusionada más grande para garantizar la ejecución más eficiente de tu canalización. El servicio de Cloud Dataflow respeta las dependencias de datos entre los pasos en el grafo de ejecución, pero los demás pasos se pueden ejecutar en cualquier orden.

Ejemplo de fusión

En el siguiente diagrama, se muestra cómo el servicio de Cloud Dataflow puede optimizar y fusionar el grafo de ejecución del ejemplo de WordCount incluido con el SDK de Apache Beam para Java a fin de lograr una ejecución eficiente:

El grafo de ejecución para el programa de ejemplo de WordCount después de la optimización y la fusión de pasos del servicio de Cloud Dataflow.
Figura 2: Grafo de ejecución optimizado del ejemplo de WordCount

Evita la fusión

Hay algunos casos en tu canalización en los que quizás desees evitar que el servicio de Cloud Dataflow realice optimizaciones de fusión. Estos son casos en los que el servicio de Cloud Dataflow podría definir de forma incorrecta la forma óptima de fusionar las operaciones de la canalización, lo que podría limitar la capacidad del servicio de Cloud Dataflow para hacer uso de todos los trabajadores disponibles.

Por ejemplo, un caso en el que la fusión puede limitar la capacidad de Cloud Dataflow para optimizar el uso de los trabajadores es una ParDo con “fan-out alto” En esta operación, podrías tener una colección de entrada con pocos elementos, pero la ParDo genera una salida que tiene cientos o miles de veces esa cantidad, seguida de otra ParDo. Si el servicio de Cloud Dataflow fusiona estas operaciones ParDo, el paralelismo en este paso se limita como máximo a la cantidad de elementos de la colección de entrada, a pesar de que la PCollection intermedia contiene muchos más.

Puedes evitar esta fusión si agregas una operación a tu canalización que obligue al servicio de Cloud Dataflow a materializar tu PCollection intermedia. Considera usar una de las siguientes operaciones:

  • Puedes insertar un GroupByKey y desagrupar después del primer ParDo. El servicio de Cloud Dataflow nunca fusiona operaciones ParDo en una agregación.
  • Puedes pasar tu PCollection intermedia como una entrada complementaria a otra ParDo. El servicio de Cloud Dataflow siempre materializa las entradas complementarias.

Optimización de combinaciones

Las operaciones de agregación son un concepto importante en el procesamiento de datos a gran escala. La agregación reúne datos que están muy separados en lo conceptual, por lo que es muy útil para la correlación. El modelo de programación de Cloud Dataflow representa las operaciones de agregación como las transformaciones GroupByKey, CoGroupByKey y Combine.

Las operaciones de agregación de Cloud Dataflow combinan datos en todo el conjunto de datos, incluidos los datos que se pueden distribuir entre varios trabajadores. Durante estas operaciones de agregación, a menudo es más eficiente combinar la mayor cantidad de datos posible de forma local antes de combinarlos entre instancias. Cuando aplicas una GroupByKey o alguna otra transformación de agregación, el servicio de Cloud Dataflow realiza de forma automática una combinación parcial local antes de la operación de agrupación principal.

Cuando realizas una combinación parcial o multinivel, el servicio de Cloud Dataflow toma diferentes decisiones según si tu canalización trabaja con datos por lotes o de transmisión. Para datos limitados, el servicio favorece la eficiencia y realizará toda la combinación local posible. Para datos ilimitados, el servicio favorece una latencia más baja y puede que no realice una combinación parcial (ya que puede aumentar la latencia).

Características de ajuste automático

El servicio de Cloud Dataflow contiene varias funciones de ajuste automático que pueden optimizar aún más tu trabajo de Cloud Dataflow de forma dinámica mientras se ejecuta. Estas características incluyen el Ajuste de escala automático y el Rebalanceo dinámico de trabajos.

Ajuste de escala automático

Cuando se habilita el ajuste de escala automático, el servicio de Cloud Dataflow elige de forma automática la cantidad apropiada de instancias de trabajador que se necesitan para ejecutar tu trabajo. El servicio de Cloud Dataflow también puede reasignar de forma dinámica más o menos trabajadores durante el entorno de ejecución para tener en cuenta las características de tu trabajo. Ciertas partes de tu canalización pueden ser más pesadas que otras en términos de procesamiento y el servicio de Cloud Dataflow puede generar de forma automática trabajadores adicionales durante estas fases de tu trabajo (y cerrarlos cuando ya no sean necesarios).

Java: SDK 2.x

El ajuste de escala automático está habilitado de forma predeterminada en todos los trabajos por lotes de Cloud Dataflow. Puedes inhabilitar el ajuste de escala automático si especificas de forma explícita la opción --autoscalingAlgorithm=NONE cuando ejecutas tu canalización; si lo haces, ten en cuenta que el servicio de Cloud Dataflow establece el número de trabajadores según la opción --numWorkers, cuyo valor predeterminado es 3.

Si tu trabajo de Cloud Dataflow usa una versión anterior del SDK, puedes habilitar el ajuste de escala automático si especificas la opción --autoscalingAlgorithm=THROUGHPUT_BASED cuando ejecutas la canalización.

Python

El ajuste de escala automático se habilita de forma predeterminada en todos los trabajos por lotes de Cloud Dataflow creados con la versión 0.5.1 o superior del SDK de Apache Beam para Python. Puedes inhabilitar el ajuste de escala automático si especificas de forma explícita la opción --autoscaling_algorithm=NONE cuando ejecutas tu canalización; si lo haces, ten en cuenta que el servicio de Cloud Dataflow establece la cantidad de trabajadores según la opción --num_workers, cuyo valor predeterminado es 3.

Si tu trabajo de Cloud Dataflow usa una versión anterior del SDK, puedes habilitar el ajuste de escala automático si especificas la opción --autoscaling_algorithm=THROUGHPUT_BASED cuando ejecutas la canalización.

Java: SDK 1.x

El ajuste de escala automático se habilita de forma predeterminada en todos los trabajos por lotes de Cloud Dataflow creados con la versión 1.6.0 o superior del SDK de Cloud Dataflow para Java. Puedes inhabilitar el ajuste de escala automático si especificas de forma explícita la opción --autoscalingAlgorithm=NONE cuando ejecutas tu canalización; si lo haces, ten en cuenta que el servicio de Cloud Dataflow establece la cantidad de trabajadores según la opción --numWorkers, cuyo valor predeterminado es 3.

Si tu trabajo de Cloud Dataflow usa una versión anterior del SDK, puedes habilitar el ajuste de escala automático si especificas la opción --autoscalingAlgorithm=THROUGHPUT_BASED cuando ejecutas la canalización.

Ajuste de escala automático por lotes

Para los datos limitados en el modo de procesamiento por lotes, Cloud Dataflow elige de forma automática el número de trabajadores según la cantidad de trabajo en cada etapa de tu canalización y la capacidad de procesamiento actual en esa etapa.

Si tu canalización usa una fuente de datos personalizada, existen algunos métodos que puedes implementar para brindar más información al algoritmo de ajuste de escala automático del servicio de Cloud Dataflow y mejorar el rendimiento:

Java: SDK 2.x

  • En tu subclase BoundedSource, implementa el método getEstimatedSizeBytes. El servicio de Cloud Dataflow usa getEstimatedSizeBytes cuando calcula el número inicial de trabajadores que se usarán en tu canalización.
  • En tu subclase BoundedReader, implementa el método getFractionConsumed. El servicio de Cloud Dataflow usa getFractionConsumed para realizar un seguimiento del progreso de la lectura y converger en la cantidad correcta de trabajadores que deben usarse durante una lectura.

Python

  • En tu subclase BoundedSource, implementa el método estimate_size. El servicio de Cloud Dataflow usa estimate_size cuando calcula el número inicial de trabajadores que se usarán en tu canalización.
  • En tu subclase RangeTracker, implementa el método fraction_consumed. El servicio de Cloud Dataflow usa fraction_consumed para realizar un seguimiento del progreso de la lectura y converger en la cantidad correcta de trabajadores que deben usarse durante una lectura.

Java: SDK 1.x

  • En tu subclase BoundedSource, implementa el método getEstimatedSizeBytes. El servicio de Cloud Dataflow usa getEstimatedSizeBytes cuando calcula el número inicial de trabajadores que se usarán en tu canalización.
  • En tu subclase BoundedReader, implementa el método getFractionConsumed. El servicio de Cloud Dataflow usa getFractionConsumed para realizar un seguimiento del progreso de la lectura y converger en la cantidad correcta de trabajadores que deben usarse durante una lectura.

Ajuste de escala automático de transmisión

El ajuste de escala automático de transmisión permite que el servicio de Cloud Dataflow cambie de forma adaptativa la cantidad de trabajadores que se usan para ejecutar tu canalización de transmisión en respuesta a los cambios en la carga y el uso de recursos. El ajuste de escala automático de transmisión es una característica gratuita diseñada para reducir los costos de los recursos que se usan cuando se ejecutan canalizaciones de transmisión.

Sin el ajuste de escala automático, eliges una cantidad fija de trabajadores mediante la especificación de numWorkers o num_workers para ejecutar la canalización. Como la carga de trabajo de entrada varía con el tiempo, este número puede ser demasiado alto o demasiado bajo. Aprovisionar demasiados trabajadores da como resultado un costo adicional innecesario, mientras que aprovisionar muy pocos causa una mayor latencia para los datos procesados. Si se habilita el ajuste de escala automático, los recursos se usan solo cuando son necesarios.

Para tomar decisiones sobre el escalamiento, el ajuste de escala automático se basa en varias señales que evalúan qué tan ocupados están los trabajadores y si pueden manejar bien la transmisión de entrada. Las señales clave incluyen el uso de CPU, la capacidad de procesamiento y el trabajo pendiente. El objetivo es minimizar el trabajo pendiente y maximizar el uso y la capacidad de procesamiento de los trabajadores, además de reaccionar con rapidez a los picos de carga. Cuando habilitas el ajuste de escala automático, no tienes que elegir entre el aprovisionamiento para la carga máxima y los resultados rápidos. Los trabajadores se agregan a medida que aumentan el uso de CPU y el trabajo pendiente, y se quitan cuando disminuyen. De esta manera, solo pagas por lo que necesitas y el trabajo se procesa de la manera más eficiente posible.

Java: SDK 2.x

Fuentes ilimitadas personalizadas

Si tu canalización usa una fuente ilimitada personalizada, la fuente debe informar al servicio de Cloud Dataflow sobre el trabajo pendiente. El trabajo pendiente es una estimación de la entrada en bytes que la fuente aún no procesó. Para informar al servicio sobre el trabajo pendiente, implementa cualquiera de los siguientes métodos en tu clase UnboundedReader.

  • getSplitBacklogBytes(): trabajo pendiente para la división actual de la fuente. El servicio agrega el trabajo pendiente en todas las divisiones.
  • getTotalBacklogBytes(): el trabajo pendiente global en todas las divisiones. En algunos casos, el trabajo pendiente no está disponible para cada división y solo se puede calcular el total de todas las divisiones. Solo la primera división (ID de división “0”) debe proporcionar su trabajo pendiente total.
El repositorio de Apache Beam contiene varios ejemplos de fuentes personalizadas que implementan la clase UnboundedReader.
Habilita el ajuste de escala automático de transmisión

Para habilitar el ajuste de escala automático, configura los siguientes parámetros de ejecución cuando inicias tu canalización:

--autoscalingAlgorithm=THROUGHPUT_BASED
--maxNumWorkers=N

El ajuste de escala automático puede oscilar entre N/15 y N trabajadores durante la ejecución de una canalización, en la que N es el valor de --maxNumWorkers. Por ejemplo, si tu canalización necesita 3 o 4 trabajadores en estado estable, puedes establecer --maxNumWorkers=15 y la canalización escala de manera automática entre 1 y 15 trabajadores.

Las canalizaciones de transmisión se implementan con un grupo fijo de discos persistentes, cuya cantidad es igual a --maxNumWorkers. Ten esto en cuenta cuando especificas --maxNumWorkers y asegúrate de que este valor sea una cantidad suficiente de discos para tu canalización.

Uso y precios

El uso de Compute Engine se basa en el número promedio de trabajadores, mientras que el uso de discos persistentes se basa en el número exacto de --maxNumWorkers. Los discos persistentes se redistribuyen de modo que cada trabajador obtenga el mismo número de discos adjuntos.

En el ejemplo anterior, en el que --maxNumWorkers=15, se te cobran entre 1 y 15 instancias de Compute Engine y 15 discos persistentes.

Python

Habilita el ajuste de escala automático de transmisión

Para habilitar el ajuste de escala automático, configura los siguientes parámetros de ejecución cuando inicias tu canalización:

--autoscaling_algorithm=THROUGHPUT_BASED
--max_num_workers=N

El ajuste de escala automático puede oscilar entre N/15 y N trabajadores durante la ejecución de una canalización, en la que N es el valor de max_num_workers. Por ejemplo, si tu canalización necesita 3 o 4 trabajadores en estado estable, puedes establecer --max_num_workers=15 y la canalización escala de manera automática entre 1 y 15 trabajadores.

Las canalizaciones de transmisión se implementan con un grupo fijo de discos persistentes, cuya cantidad es igual a --max_num_workers. Ten esto en cuenta cuando especificas --max_num_workers y asegúrate de que este valor sea una cantidad suficiente de discos para tu canalización.

Uso y precios

El uso de Compute Engine se basa en el número promedio de trabajadores, mientras que el uso de discos persistentes se basa en el número exacto de --max_num_workers. Los discos persistentes se redistribuyen de modo que cada trabajador obtenga el mismo número de discos adjuntos.

En el ejemplo anterior, en el que --max_num_workers=15, se te cobran entre 1 y 15 instancias de Compute Engine y 15 discos persistentes.

Java: SDK 1.x

Si tu canalización usa una fuente ilimitada personalizada, es esencial que la fuente informe al servicio de Cloud Dataflow sobre el trabajo pendiente. El trabajo pendiente es una estimación de la entrada en bytes que la fuente aún no procesó. Para informar al servicio sobre el trabajo pendiente, implementa uno de los siguientes métodos en tu clase UnboundedReader:

  • getSplitBacklogBytes(): trabajo pendiente para la división actual de la fuente. El servicio agrega el trabajo pendiente en todas las divisiones.
  • getTotalBacklogBytes(): el trabajo pendiente global en todas las divisiones. En algunos casos, el trabajo pendiente no está disponible para cada división y solo se puede calcular el total de todas las divisiones. Solo la primera división (ID de división “0”) debe proporcionar su trabajo pendiente total.
Habilita el ajuste de escala automático de transmisión

Para habilitar el ajuste de escala automático, configura los siguientes parámetros de ejecución cuando inicias tu canalización:

--autoscalingAlgorithm=THROUGHPUT_BASED
--maxNumWorkers=<N>

El ajuste de escala automático puede oscilar entre N/15 y N trabajadores durante la ejecución de una canalización, en la que N es el valor de --maxNumWorkers. Por ejemplo, si tu canalización necesita 3 o 4 trabajadores en estado estable, puedes configurar --maxNumWorkers=15, y la canalización se escalará de forma automática entre 1 y 15 trabajadores.

Las canalizaciones de transmisión se implementan con un grupo fijo de discos persistentes, cuya cantidad es igual a --maxNumWorkers. Ten esto en cuenta cuando especificas --maxNumWorkers y asegúrate de que este valor sea una cantidad suficiente de discos para tu canalización.

En la actualidad, PubsubIO es la única fuente que admite el ajuste de escala automático en las canalizaciones de transmisión. Todos los receptores que proporcionan los SDK son compatibles. En esta versión Beta, el Ajuste de escala automático funciona mejor cuando se realizan operaciones de lectura desde suscripciones a Cloud Pub/Sub vinculadas a temas publicados con lotes pequeños y cuando se escribe en receptores con baja latencia. En casos extremos (es decir, suscripciones a Cloud Pub/Sub con lotes de publicación grandes o receptores con una latencia muy alta), el ajuste de escala automático pierde especificidad. Esto se mejorará en versiones futuras.

Uso y precios

El uso de Compute Engine se basa en el número promedio de trabajadores, mientras que el uso de discos persistentes se basa en el número exacto de --maxNumWorkers. Los discos persistentes se redistribuyen de modo que cada trabajador obtenga el mismo número de discos adjuntos.

En el ejemplo anterior, en el que --maxNumWorkers=15, se te cobran entre 1 y 15 instancias de Compute Engine y 15 discos persistentes.

Escala de forma manual una canalización de transmisión

Hasta que el ajuste de escala automático esté disponible de forma general en el modo de transmisión, puedes usar una solución alternativa para escalar de forma manual la cantidad de trabajadores que ejecutan tu canalización de transmisión mediante la característica Actualizar de Cloud Dataflow.

Java: SDK 2.x

Para escalar tu canalización de transmisión durante la ejecución, asegúrate de configurar los siguientes parámetros de ejecución cuando inicies tu canalización:

  • Haz que --maxNumWorkers sea igual a la cantidad máxima de trabajadores que deseas tener disponibles para tu canalización.
  • Haz que --numWorkers sea igual a la cantidad inicial de trabajadores que deseas que use tu canalización cuando comience a ejecutarse.

Puedes Actualizar tu canalización una vez que se encuentre en ejecución y especificar un nuevo número de trabajadores con el parámetro --numWorkers. El valor que establezcas para el --numWorkers nuevo debe estar entre N y --maxNumWorkers, en el que N es igual a --maxNumWorkers/15.

La actualización de la canalización reemplaza tu trabajo en ejecución por uno nuevo, con el nuevo número de trabajadores, a la vez que conserva toda la información de estado asociada con el trabajo anterior.

Python

Para escalar tu canalización de transmisión durante la ejecución, asegúrate de configurar los siguientes parámetros de ejecución cuando inicies tu canalización:

  • Haz que --max_num_workers sea igual a la cantidad máxima de trabajadores que deseas tener disponibles para tu canalización.
  • Haz que --num_workers sea igual a la cantidad inicial de trabajadores que deseas que use tu canalización cuando comience a ejecutarse.

Puedes Actualizar tu canalización una vez que se encuentre en ejecución y especificar un nuevo número de trabajadores con el parámetro --num_workers. El valor que establezcas para el --num_workers nuevo debe estar entre N y --max_num_workers, en el que N es igual a --max_num_workers/15.

La actualización de la canalización reemplaza tu trabajo en ejecución por uno nuevo, con el nuevo número de trabajadores, a la vez que conserva toda la información de estado asociada con el trabajo anterior.

Java: SDK 1.x

Si sabes que tendrás que escalar tu canalización de transmisión durante la ejecución, asegúrate de configurar los siguientes parámetros de ejecución cuando inicies tu canalización:

  • Haz que --maxNumWorkers sea igual a la cantidad máxima de trabajadores que deseas tener disponibles para tu canalización.
  • Haz que --numWorkers sea igual a la cantidad inicial de trabajadores que deseas que use tu canalización cuando comience a ejecutarse.

Puedes Actualizar tu canalización una vez que se encuentre en ejecución y especificar un nuevo número de trabajadores con el parámetro --numWorkers. El valor que establezcas para el --numWorkers nuevo debe estar entre N y --maxNumWorkers, en el que N es igual a --maxNumWorkers/15.

La actualización reemplaza tu trabajo en ejecución por uno nuevo, con el nuevo número de trabajadores, a la vez que conserva toda la información de estado asociada con el trabajo anterior.

Rebalanceo dinámico del trabajo

La característica de Rebalanceo dinámico de trabajos del servicio de Cloud Dataflow permite que el servicio vuelva a particionar de forma dinámica los trabajos en función de las condiciones del entorno de ejecución. Las siguientes son algunas de esas condiciones:

  • Desbalances en asignaciones de trabajos
  • Trabajadores que tardan más de lo esperado en terminar
  • Trabajadores que terminan más rápido de lo esperado

El servicio de Cloud Dataflow detecta estas condiciones de forma automática y puede reasignar el trabajo de manera dinámica a los trabajadores sin usar o con poco uso para disminuir el tiempo de procesamiento general de tu trabajo.

Limitaciones

El rebalanceo dinámico de trabajos solo ocurre cuando el servicio de Cloud Dataflow procesa algunos datos de entrada en paralelo: cuando lee datos de una fuente de entrada externa, cuando trabaja con una PCollection materializada intermedia o cuando trabaja con el resultado de una agregación como GroupByKey. Si se fusionan una gran cantidad de pasos en tu trabajo, hay menos PCollection intermedias y el Rebalanceo dinámico de trabajos se limitará a la cantidad de elementos en la fuente de PCollection materializada. Si quieres asegurarte de que el Rebalanceo dinámico de trabajos se pueda aplicar a una PCollection en particular en tu canalización, puedes evitar la fusión de diferentes maneras para garantizar un paralelismo dinámico.

El Rebalanceo dinámico de trabajos no puede volver a paralelizar datos de forma más precisa que un solo registro. Si tus datos contienen registros individuales que causan grandes demoras en el tiempo de procesamiento, aún pueden demorar tu trabajo, ya que Cloud Dataflow no puede subdividir y redistribuir un registro individual “activo” a varios trabajadores.

Java: SDK 2.x

Si estableces un número fijo de fragmentos para el resultado final de tu canalización (por ejemplo, si escribes datos con TextIO.Write.withNumShards), Cloud Dataflow limitará la paralelización según el número de fragmentos que eliges.

Python

Si estableces un número fijo de fragmentos para el resultado final de tu canalización (por ejemplo, si escribes datos con beam.io.WriteToText(..., num_shards=...)), Cloud Dataflow limitará la paralelización según el número de fragmentos que eliges.

Java: SDK 1.x

Si estableces un número fijo de fragmentos para el resultado final de tu canalización (por ejemplo, si escribes datos con TextIO.Write.withNumShards), Cloud Dataflow limitará la paralelización según el número de fragmentos que eliges.

La limitación de fragmentos fijos se puede considerar temporal y puede estar sujeta a cambios en versiones futuras del servicio de Cloud Dataflow.

Trabaja con fuentes de datos personalizadas

Java: SDK 2.x

Si tu canalización usa una fuente de datos personalizada que proporcionaste, debes implementar el método splitAtFraction para permitir que tu fuente funcione con la característica de Rebalanceo dinámico de trabajos.

Python

Si tu canalización usa una fuente de datos personalizada que proporcionaste, tu RangeTracker debe implementar try_claim, try_split, position_at_fraction y fraction_consumed para permitir que tu fuente funcione con la característica de Rebalanceo dinámico de trabajos.

Consulta la referencia de la API sobre RangeTracker para obtener más información.

Java: SDK 1.x

Si tu canalización usa una fuente de datos personalizada que proporcionaste, debes implementar el método splitAtFraction para permitir que tu fuente funcione con la característica de Rebalanceo dinámico de trabajos.

Uso y administración de recursos

El servicio de Cloud Dataflow administra por completo los recursos de GCP en función de cada trabajo. Esto incluye activar y desactivar instancias de Compute Engine (también llamadas trabajadores o VM) y acceder a los depósitos de Cloud Storage de tu proyecto para la E/S y la etapa de pruebas de archivos temporales. Sin embargo, si tu canalización interactúa con tecnologías de almacenamiento de datos de GCP como BigQuery y Cloud Pub/Sub, debes administrar los recursos y la cuota para esos servicios.

Cloud Dataflow usa una ubicación que proporciona el usuario en Cloud Storage de forma específica para habilitar por etapas archivos. Esta ubicación está bajo tu control y debes asegurarte de que su vida útil se mantenga siempre que algún trabajo realice operaciones de lectura en ella. Puedes reusar la misma ubicación de etapa de pruebas para múltiples ejecuciones de trabajos, ya que el almacenamiento en caché integrado del SDK puede acelerar el tiempo de inicio de tus trabajos.

Trabajos

Puedes ejecutar hasta 25 trabajos simultáneos de Cloud Dataflow por proyecto de GCP.

El servicio de Cloud Dataflow en la actualidad se limita a procesar solicitudes de trabajos JSON que tengan un tamaño de 20 MB o menos. El tamaño de la solicitud de trabajo está vinculado de manera específica a la representación JSON de tu canalización; una canalización más grande implica una solicitud más grande.

Ejecuta tu canalización con la siguiente opción para estimar el tamaño de su solicitud JSON:

Java: SDK 2.x

--dataflowJobFile=< path to output file >

Python

--dataflow_job_file=< path to output file >

Java: SDK 1.x

--dataflowJobFile=< path to output file >

Este comando escribe una representación JSON de tu trabajo en un archivo. El tamaño del archivo serializado es una estimación apropiada del tamaño de la solicitud; el tamaño real será un poco mayor debido a que se incluye información adicional en la solicitud.

Para obtener más información, consulta la página de solución de problemas para “413 Entidad de solicitud demasiado grande” / “El tamaño de la representación JSON serializada de la canalización excede el límite permitido”.

Además, el tamaño del grafo de tu trabajo no debe superar los 10 MB. Para obtener más información, consulta la página sobre “El grafo del trabajo es demasiado grande. Vuelve a intentarlo con un grafo de trabajo más pequeño, o divide tu trabajo en dos o más trabajos más pequeños”.

Trabajadores

El servicio de Cloud Dataflow en la actualidad permite un máximo de 1,000 instancias de Compute Engine por trabajo. El tipo de máquina predeterminado es n1-standard-1 para un trabajo por lotes y n1-standard-4 para la transmisión; cuando se usan los tipos de máquina predeterminados, el servicio Cloud Dataflow puede asignar hasta 4,000 núcleos por trabajo.

Cloud Dataflow admite trabajadores de la serie n1 así como tipos personalizados de máquinas. Puedes especificar un tipo de máquina para tu canalización si configuras el parámetro de ejecución apropiado cuando se crea la canalización.

Java: SDK 2.x

Para cambiar el tipo de máquina, establece la opción --workerMachineType.

Python

Para cambiar el tipo de máquina, establece la opción --worker_machine_type.

Java: SDK 1.x

Para cambiar el tipo de máquina, establece la opción --workerMachineType.

Cuota de recursos

El servicio de Cloud Dataflow verifica que tu proyecto de GCP tenga la cuota de recursos de Compute Engine necesaria a fin de ejecutar tu trabajo, ya sea para iniciarlo o escalar al número máximo de instancias de trabajadores. Tu trabajo no se iniciará si no hay suficiente cuota de recursos disponible.

La característica Ajuste de escala automático de Cloud Dataflow tiene límites que impone la cuota de Compute Engine disponible para tu proyecto. Si tu trabajo tiene una cuota suficiente cuando comienza, pero otro trabajo usa el resto de la cuota disponible de tu proyecto, el primer trabajo se ejecutará, pero no podrá escalar por completo.

Sin embargo, el servicio de Cloud Dataflow no administra los aumentos de cuota para trabajos que excedan las cuotas de recursos en tu proyecto. Tienes la responsabilidad de realizar las solicitudes necesarias a fin de obtener cuotas de recursos adicionales, para lo cual puedes usar Google Cloud Platform Console.

Recursos de discos persistentes

El servicio de Cloud Dataflow en la actualidad está limitado a 15 discos persistentes por instancia de trabajador cuando se ejecuta un trabajo de transmisión. Cada disco persistente pertenece a una máquina virtual de Compute Engine individual. Tu trabajo no puede tener más trabajadores que discos persistentes; la asignación de recursos mínima es una proporción de 1:1 entre trabajadores y discos.

El tamaño predeterminado de cada disco persistente es de 250 GB en modo por lotes y 400 GB en modo de transmisión.

Ubicaciones

De forma predeterminada, el servicio de Cloud Dataflow implementa recursos de Compute Engine en la zona us-central1-f de la región us-central1. Puedes anular esta configuración si especificas el parámetro --region. Si necesitas usar una zona específica para tus recursos, usa el parámetro --zone cuando creas tu canalización. Sin embargo, recomendamos que solo especifiques la región y dejes la zona sin especificar. Esto permite que el servicio de Cloud Dataflow seleccione de forma automática la mejor zona dentro de la región, en función de la capacidad de zona disponible en el momento de la solicitud de creación de trabajo. Para obtener más información, consulta la documentación de extremos regionales.

Streaming Engine

En la actualidad, el ejecutor de canalizaciones de Cloud Dataflow ejecuta todos los pasos de tu canalización de transmisión en las máquinas virtuales de trabajador y consume la CPU, la memoria y el almacenamiento en el disco persistente del trabajador. Streaming Engine de Cloud Dataflow traslada la ejecución de la canalización fuera de las VM de trabajador hacia el backend del servicio de Cloud Dataflow.

Beneficios de Streaming Engine

El modelo de Streaming Engine tiene los siguientes beneficios:

  • Una reducción en los recursos consumidos de CPU, memoria y almacenamiento en el disco persistente en las VM de trabajador. Streaming Engine funciona mejor con tipos de máquinas de trabajador más pequeñas (n1-standard-2 en lugar de n1-standard-4) y no requiere un disco persistente además de un disco de arranque pequeño, lo que genera un menor consumo de recursos y cuotas.
  • Ajuste de escala automático más sensible en la respuesta a las variaciones en el volumen de datos entrantes. Streaming Engine ofrece un escalamiento más uniforme y detallado de los trabajadores.
  • Compatibilidad mejorada, ya que no es necesario volver a implementar las canalizaciones para aplicar las actualizaciones del servicio.

La mayor parte de la reducción en los recursos de los trabajadores proviene de derivar el trabajo al servicio de Cloud Dataflow. Por esa razón, hay un costo asociado con el uso de Streaming Engine. Sin embargo, se espera que la factura total de las canalizaciones de Cloud Dataflow que usan Streaming Engine sea aproximadamente la misma en comparación con el costo total de las canalizaciones de Cloud Dataflow que no usan esta opción.

Usa Streaming Engine

En la actualidad, Streaming Engine está disponible para la transmisión de canalizaciones en las siguientes regiones. Estará disponible en otras regiones más adelante.

  • us-central1 (Iowa)
  • us-west1 (Oregón)
  • europe-west1 (Bélgica)
  • europe-west4 (Países Bajos)
  • asia-east1 (Taiwán)
  • asia-northeast1 (Tokio)

Java: SDK 2.x

Para usar Streaming Engine en tus canales de transmisión, especifica el siguiente parámetro:

  • --enableStreamingEngine si usas el SDK de Apache Beam para la versión 2.11.0 o posterior de Java.
  • --experiments=enable_streaming_engine si usas el SDK de Apache Beam para la versión 2.10.0 de Java.

Si usas Cloud Dataflow Streaming Engine para tu canalización, no especifiques el parámetro --zone. En su lugar, especifica el parámetro --region y establece el valor en una de las regiones en las que Streaming Engine esté disponible en la actualidad. Cloud Dataflow selecciona de manera automática la zona en la región que especificaste. Si especificas el parámetro --zone y lo estableces en una zona fuera de las regiones disponibles, Cloud Dataflow informa un error.

Streaming Engine funciona mejor con tipos de máquina de trabajador más pequeñas, por lo que le recomendamos que configures --workerMachineType=n1-standard-2. También puedes establecer --diskSizeGb=30 porque Streaming Engine solo necesita espacio para la imagen de arranque del trabajador y los registros locales. Estos valores son los predeterminados.

Python

Para usar Streaming Engine en tus canales de transmisión, especifica el siguiente parámetro:

--enable_streaming_engine

Si usas Cloud Dataflow Streaming Engine para tu canalización, no especifiques el parámetro --zone. En su lugar, especifica el parámetro --region y establece el valor en una de las regiones en las que Streaming Engine esté disponible en la actualidad. Cloud Dataflow selecciona de manera automática la zona en la región que especificaste. Si especificas el parámetro --zone y lo estableces en una zona fuera de las regiones disponibles, Cloud Dataflow informa un error.

Streaming Engine funciona mejor con tipos de máquina de trabajador más pequeñas, por lo que recomendamos que configures --machine_type=n1-standard-2. También puedes establecer --disk_size_gb=30 porque Streaming Engine solo necesita espacio para la imagen de arranque del trabajador y los registros locales. Estos valores son los predeterminados.

Java: SDK 1.x

Streaming Engine no es compatible con el SDK de Cloud Dataflow para la versión 1.x de Java. Si deseas usar esta función, debes usar el SDK de Apache Beam para Java 2.8.0 o superior.

Cloud Dataflow Shuffle

Cloud Dataflow Shuffle es la operación base detrás de las transformaciones de Cloud Dataflow, como GroupByKey, CoGroupByKey y Combine. La operación de Cloud Dataflow Shuffle particiona y agrupa los datos por clave de forma escalable, eficiente y tolerante a errores. En la actualidad, Cloud Dataflow usa una implementación de distribución que se ejecuta por completo en las máquinas virtuales de trabajador y consume la CPU, la memoria y el almacenamiento en los discos persistentes del trabajador. La función basada en servicios de Cloud Dataflow Shuffle, disponible solo para canalizaciones por lotes, traslada la operación de redistribución fuera de las VM de trabajador hacia el backend del servicio de Cloud Dataflow.

Beneficios de Cloud Dataflow Shuffle

Cloud Dataflow Shuffle basada en servicios tiene los siguientes beneficios:

  • Tiempo de ejecución más rápido de las canalizaciones por lotes para la mayoría de los tipos de trabajo de canalización
  • Una reducción en los recursos consumidos de CPU, memoria y almacenamiento en el disco persistente en las VM de trabajador.
  • Ajuste de escala automático optimizado porque las VM ya no contienen datos de distribución y, por lo tanto, se puede reducir antes su escala
  • Mejor tolerancia a errores: una VM en mal estado que contenga datos de Cloud Dataflow Shuffle no causará que todo el trabajo falle, como sucedería si no se usara esta característica

La mayor parte de la reducción en los recursos de los trabajadores proviene de derivar el trabajo de distribución al servicio de Cloud Dataflow. Por esa razón, hay un cargo asociado con el uso de Cloud Dataflow Shuffle. Sin embargo, la factura total de las canalizaciones de Cloud Dataflow que usan la implementación de Cloud Dataflow basada en servicios debería ser menor o igual al costo total de las canalizaciones de Cloud Dataflow que no usan esta opción.

Para la mayoría de los tipos de trabajo de canalización, Cloud Dataflow Shuffle debería ejecutarse más rápido que una implementación de distribución que se ejecuta en VM de trabajador. Sin embargo, la duración puede variar de una ejecución a otra. Si ejecutas una canalización que tiene plazos importantes, recomendamos que asignes suficiente tiempo de búfer antes del plazo. Además, considera solicitar una cuota mayor para Shuffle.

Consideraciones sobre discos

Cuando usas la característica de Cloud Dataflow Shuffle basada en servicios, no necesitas adjuntar grandes discos persistentes a tus VM de trabajador. Cloud Dataflow adjunta de forma automática un pequeño disco de arranque de 25 GB. Sin embargo, debido a este tamaño de disco pequeño, hay consideraciones importantes que debes tener en cuenta cuando usas Cloud Dataflow Shuffle.

  • Una VM de trabajador usa parte de los 25 GB de espacio en disco para el sistema operativo, los archivos binarios, los registros y los contenedores. Los trabajos que usan una cantidad significativa de espacio en disco y exceden la capacidad de disco restante pueden fallar cuando usas Cloud Dataflow Shuffle.
  • Los trabajos que usan una gran cantidad de E/S de disco pueden ser lentos debido al rendimiento del disco pequeño. Para obtener más información sobre las diferencias de rendimiento entre los tamaños de disco, consulta la página sobre el rendimiento del disco persistente de Compute Engine.

Si alguna de estas consideraciones se aplica a tu trabajo, puedes usar las opciones de canalización para especificar un tamaño de disco más grande.

Usa Cloud Dataflow Shuffle

En la actualidad, Cloud Dataflow Shuffle basado en servicios está disponible en las siguientes regiones:

  • us-central1 (Iowa)
  • us-west1 (Oregón)
  • europe-west1 (Bélgica)
  • europe-west4 (Países Bajos)
  • asia-east1 (Taiwán)
  • asia-northeast1 (Tokio)

Cloud Dataflow Shuffle estará disponible en más regiones en el futuro.

Java: SDK 2.x

Para usar Cloud Dataflow Shuffle basado en servicios en tus canalizaciones por lotes, especifica el siguiente parámetro:
--experiments=shuffle_mode=service

Si usas Cloud Dataflow Shuffle para tu canalización, no especifiques el parámetro --zone. En su lugar, especifica el parámetro --region y establece el valor en una de las regiones en las que Shuffle está disponible en la actualidad. Cloud Dataflow selecciona de manera automática la zona en la región que especificaste. Si especificas el parámetro --zone y lo estableces en una zona fuera de las regiones disponibles, Cloud Dataflow informa un error.

Python

Para usar Cloud Dataflow Shuffle basado en servicios en tus canalizaciones por lotes, especifica el siguiente parámetro:
--experiments=shuffle_mode=service

Si usas Cloud Dataflow Shuffle para tu canalización, no especifiques el parámetro --zone. En su lugar, especifica el parámetro --region y establece el valor en una de las regiones en las que Shuffle está disponible en la actualidad. Cloud Dataflow selecciona de manera automática la zona en la región que especificaste. Si especificas el parámetro --zone y lo estableces en una zona fuera de las regiones disponibles, Cloud Dataflow informa un error.

Java: SDK 1.x

Para usar Cloud Dataflow Shuffle basado en servicios en tus canalizaciones por lotes, especifica el siguiente parámetro:
--experiments=shuffle_mode=service

Si usas Cloud Dataflow Shuffle para tu canalización, no especifiques el parámetro --zone. En su lugar, especifica el parámetro --region y establece el valor en una de las regiones en las que Shuffle está disponible en la actualidad. Cloud Dataflow selecciona de manera automática la zona en la región que especificaste. Si especificas el parámetro --zone y lo estableces en una zona fuera de las regiones disponibles, Cloud Dataflow informa un error.

Programación de recursos flexibles de Cloud Dataflow

Cloud Dataflow FlexRS reduce los costos del procesamiento por lotes mediante técnicas de programación avanzadas, el servicio de Cloud Dataflow Shuffle y un combinación de instancias de máquinas virtuales (VM) interrumpibles y VM normales. Mediante la ejecución de VM interrumpibles y normales en paralelo, Cloud Dataflow mejora la experiencia del usuario si Compute Engine detiene instancias de VM interrumpibles durante un evento del sistema. FlexRS ayuda a garantizar que la canalización progrese y no pierda el trabajo anterior cuando Compute Engine interrumpe tus VM interrumpibles. Para obtener más información sobre FlexRS, consulta esta página sobre cómo usar la programación flexible de recursos en Cloud Dataflow.
¿Te sirvió esta página? Envíanos tu opinión:

Enviar comentarios sobre…

¿Necesitas ayuda? Visita nuestra página de asistencia.