Implementa una canalización

Después de construir y probar tu canalización de Apache Beam, puedes usar el servicio administrado de Dataflow para implementarlo y ejecutarlo. Una vez que esté en el servicio de Dataflow, su código de canalización se convertirá en un trabajo de Dataflow.

El servicio de Dataflow administra completamente los servicios de Google Cloud, como Compute Engine y Cloud Storage para ejecutar tu trabajo de Dataflow, arrancando y deshaciendo automáticamente los recursos necesarios. El servicio de Dataflow proporciona visibilidad de su trabajo a través de herramientas como la interfaz de Dataflow Monitoring y la interfaz de línea de comandos de Dataflow.

Puede controlar algunos aspectos de cómo el servicio de Dataflow ejecuta su trabajo al configurar parámetros de ejecución en su código de canalización. Por ejemplo, los parámetros de ejecución especifican si los pasos de su canalización se ejecutan en máquinas virtuales de trabajadores, en el backend del servicio de Dataflow o de manera local.

Además de administrar los recursos de Google Cloud, el servicio de Dataflow realiza y optimiza automáticamente muchos aspectos del procesamiento paralelo distribuido. Estos incluyen:

  • Paralelización y distribución. Dataflow divide automáticamente sus datos y distribuye su código de trabajador en instancias de Compute Engine para el procesamiento paralelo.
  • Optimización. Dataflow usa su código de canalización para crear un gráfico de ejecución que represente las transformaciones y PCollection de su canalización y optimiza el gráfico para lograr el rendimiento y el uso de recursos más eficientes. Dataflow también optimiza automáticamente las operaciones potencialmente costosas, como las agregaciones de datos.
  • Características de ajuste automático. El servicio de Dataflow incluye varias funciones que proporcionan ajustes sobre la marcha de la asignación de recursos y la partición de datos, como el ajuste de escala automático y el reequilibrio dinámico del trabajo. Estas funciones ayudan al servicio de Dataflow a ejecutar su trabajo de la manera más rápida y eficiente posible.

Ciclo de vida de la canalización: del código de canalización al trabajo de Dataflow

Cuando ejecuta su programa Dataflow, Dataflow crea un grafo de ejecución a partir del código que construye su objeto Pipeline, incluidas todas las transformaciones y sus funciones de procesamiento asociadas (como DoFn). Esta fase se llama Tiempo de construcción del grafo. Durante la construcción del gráfico, Dataflow comprueba varios errores y garantiza que el gráfico de tu canalización no contenga ninguna operación ilegal. El gráfico de ejecución se traduce al formato JSON y el gráfico de ejecución de JSON se transmite al extremo del servicio de Dataflow.

Nota: La construcción del grafo también ocurre cuando ejecutas tu canalización de forma local, pero el grafo no se traduce a JSON ni se transmite al servicio. En cambio, el gráfico se ejecuta de forma local en el mismo equipo en el que se inició el programa de Dataflow. Consulta la documentación sobre la configuración para la ejecución local a fin de obtener más detalles.

Luego, el servicio de Dataflow valida el grafo de ejecución de JSON. Cuando el gráfico se valida, se convierte en un trabajo en el servicio de Dataflow. Podrá ver el trabajo, el gráfico de ejecución, el estado y la información de registro mediante la interfaz de Dataflow Monitoring.

Java: SDK 2.x

El servicio de Dataflow envía una respuesta al equipo en el que ejecutó su programa de Dataflow. Esta respuesta está encapsulada en el objeto DataflowPipelineJob, que contiene el trabajo jobId de Dataflow. Puede usar jobId para supervisar, realizar un seguimiento y solucionar problemas de su trabajo mediante la interfaz de Dataflow Monitoring y la interfaz de línea de comandos de Dataflow. Consulta la referencia de la API para DataflowPipelineJob a fin de obtener más información.

Python

El servicio de Dataflow envía una respuesta al equipo en el que ejecutó su programa de Dataflow. Esta respuesta está encapsulada en el objeto DataflowPipelineResult, que contiene el trabajo job_id de Dataflow. Puede usar job_id para supervisar, realizar un seguimiento y solucionar problemas de su trabajo mediante la interfaz de Dataflow Monitoring y la interfaz de línea de comandos de Dataflow.

Java: SDK 1.x

Grafo de ejecución

Dataflow crea un gráfico de los pasos que representan tu canalización, en función de las transformaciones y los datos que usaste cuando construiste el objeto Pipeline. Este es el grafo de ejecución de la canalización.

El ejemplo de WordCount, que se incluye con los SDK de Apache Beam, contiene una serie de transformaciones a fin de leer, extraer, contar, dar formato y escribir las palabras individuales de una colección de texto, junto con un conteo de casos para cada palabra. El siguiente diagrama muestra cómo las transformaciones en la canalización de WordCount se expanden en un grafo de ejecución:

Las transformaciones en el programa de ejemplo de WordCount expandidas en un grafo de ejecución de los pasos que debe ejecutar el servicio de Cloud Dataflow.
Figura 1: Grafo de ejecución de ejemplo de WordCount

El grafo de ejecución a menudo difiere del orden en el que especificas tus transformaciones cuando construyes la canalización. Esto se debe a que el servicio de Dataflow realiza diversas optimizaciones y fusiones en el gráfico de ejecución antes de ejecutarse en recursos de nube administrados. El servicio de Dataflow respeta las dependencias de datos al ejecutar la canalización. Sin embargo, los pasos sin dependencias de datos entre ellos se pueden ejecutar en cualquier orden.

Puede ver el gráfico de ejecución no optimizado que Dataflow generó para su canalización cuando selecciona su trabajo en la interfaz de Dataflow Monitoring.

Paralelización y distribución

El servicio de Dataflow paralela y distribuye automáticamente la lógica de procesamiento en su canalización a los trabajadores que asignó para realizar su trabajo. Dataflow usa las abstracciones en el modelo de programación para representar las funciones de procesamiento paralelo. por ejemplo, las transformaciones ParDo hacen que Dataflow distribuya automáticamente su código de procesamiento (representado por DoFn) a varios trabajadores para que se ejecuten en paralelo.

Estructura tu código de usuario

Puedes pensar en tu código DoFn como pequeñas entidades independientes: podría haber muchas instancias en ejecución en diferentes máquinas, cada una sin conocimiento de las otras. Como tales, las funciones puras (funciones que no dependen de un estado oculto o externo, no tienen efectos secundarios observables y son deterministas) son código ideal para la naturaleza paralela y distribuida de los DoFn.

Sin embargo, el modelo de función puro no es estrictamente rígido. La información de estado o los datos de inicialización externa pueden ser válidos para DoFn y otros objetos de función, siempre y cuando el código no dependa de lo que el servicio de Dataflow no garantice. Cuando organices tus transformaciones ParDo y crees tus DoFn, ten esto en cuenta:

  • El servicio de Dataflow garantiza que a cada elemento de tu PCollection de entrada lo procese una instancia de DoFn solo una vez.
  • El servicio de Dataflow no garantiza cuántas veces se invocará un DoFn.
  • El servicio de Dataflow no garantiza exactamente cómo se agrupan los elementos distribuidos; es decir, no garantiza qué elementos (si los hay) se procesan juntos.
  • El servicio de Dataflow no garantiza la cantidad exacta de instancias DoFn que se crearán en el transcurso de una canalización.
  • El servicio de Dataflow es tolerante a errores y puede volver a ejecutar tu código varias veces en el caso de problemas de trabajadores. El servicio de Dataflow puede crear copias de seguridad de tu código y puede tener problemas con efectos secundarios manuales (por ejemplo, si tu código se basa en archivos temporales con nombres no únicos).
  • El servicio de Dataflow serializa el procesamiento de elementos por instancia DoFn. Tu código no necesita ser seguro para subprocesos de forma estricta; sin embargo, cualquier estado compartido entre varias instancias de DoFn debe ser seguro para subprocesos.

Consulta la sección sobre los requisitos para las funciones que proporciona el usuario en la documentación del modelo de programación a fin de obtener más información sobre cómo crear tu código de usuario.

Manejo de errores y excepciones

Tu canalización puede generar excepciones durante el procesamiento de datos. Algunos de estos errores son transitorios (p. ej., dificultades temporales para acceder a un servicio externo), mientras que otros son permanentes, como los errores causados por datos de entrada corruptos o no analizables, o indicadores nulos durante el procesamiento.

Dataflow procesa los elementos en paquetes arbitrarios y vuelve a intentar todo el paquete cuando se produce un error en cualquier elemento de ese paquete. Cuando se ejecuta en modo por lotes, los paquetes que incluyen un elemento defectuoso se reintentan 4 veces. Si un paquete individual falla 4 veces, la canalización fallará completamente. Cuando se ejecuta en modo de transmisión, un paquete que incluye un elemento defectuoso se reintenta de forma indefinida, lo que puede hacer que la canalización se paralice de manera permanente.

Optimización de fusiones

Una vez que se haya validado el formulario JSON del gráfico de ejecución de su canalización, el servicio de Dataflow puede modificar el gráfico para realizar optimizaciones. Estas optimizaciones pueden incluir la fusión de varios pasos o transformaciones en el grafo de ejecución de tu canalización en paso únicos. Los pasos de fusión impiden que el servicio de Dataflow tenga que materializar todos los intermedios PCollection de su canalización, lo que puede ser costoso en términos de memoria y sobrecarga de procesamiento.

Si bien todas las transformaciones que especificas en la construcción de tu canalización se ejecutan en el servicio, pueden ejecutarse en un orden diferente o como parte de una transformación fusionada más grande para garantizar la ejecución más eficiente de tu canalización. El servicio de Dataflow respeta las dependencias de datos entre los pasos del grafo de ejecución, pero los pasos se pueden ejecutar en cualquier orden.

Ejemplo de fusión

El siguiente diagrama muestra cómo el grafo de ejecución del ejemplo de WordCount incluido con el SDK de Apache Beam para Java podría optimizarse y fusionarse mediante el servicio de Dataflow para una ejecución eficiente:

El grafo de ejecución para el programa de ejemplo de WordCount después de la optimización y la fusión de pasos del servicio de Cloud Dataflow.
Figura 2: Grafo de ejecución optimizado del ejemplo de WordCount

Evita la fusión

Hay algunos casos en su canalización en los que es posible que desee evitar que el servicio de Dataflow realice optimizaciones de fusión. Estos son casos en los que el servicio de Dataflow podría definir incorrectamente la forma óptima de fusionar operaciones en la canalización, lo que podría limitar la capacidad del servicio de Dataflow de utilizar todos los trabajadores disponibles.

Por ejemplo, un caso en el que la fusión puede limitar la capacidad de Cloud Dataflow para optimizar el uso de los trabajadores es una ParDo con “fan-out alto” En esta operación, podrías tener una colección de entrada con pocos elementos, pero la ParDo genera una salida que tiene cientos o miles de veces esa cantidad, seguida de otra ParDo. Si el servicio de Dataflow fusiona estas operaciones ParDo, el paralelismo en este paso se limita a la cantidad máxima de elementos en la colección de entrada, aunque el PCollection intermedio contenga muchos más elementos.

Para evitar esa fusión, agregue una operación a su canalización que obligue al servicio de Dataflow a materializar su PCollection intermedio. Considera usar una de las siguientes operaciones:

  • Puedes insertar un GroupByKey y desagrupar después del primer ParDo. El servicio de Dataflow nunca fusiona las operaciones ParDo en una agregación.
  • Puedes pasar tu PCollection intermedia como una entrada complementaria a otra ParDo. El servicio de Dataflow siempre materializa las entradas complementarias.

Optimización de combinaciones

Las operaciones de agregación son un concepto importante en el procesamiento de datos a gran escala. La agregación reúne datos que están muy separados en lo conceptual, por lo que es muy útil para la correlación. El modelo de programación de Dataflow representa las operaciones de agregación como las transformaciones GroupByKey, CoGroupByKey y Combine.

Las operaciones de agregación de Dataflow combinan datos de todo el conjunto de datos, incluidos los datos que pueden estar repartidos entre varios trabajadores. Durante estas operaciones de agregación, a menudo es más eficiente combinar la mayor cantidad de datos posible de forma local antes de combinarlos entre instancias. Cuando aplica una GroupByKey u otra transformación de agregación, el servicio de Dataflow realiza automáticamente la combinación parcial de forma local antes de la operación de agrupación principal.

Cuando se realiza la combinación parcial o multinivel, el servicio de Dataflow toma decisiones diferentes en función de si su canalización funciona con datos de transmisión o por lotes. Para datos limitados, el servicio favorece la eficiencia y realizará toda la combinación local posible. Para datos ilimitados, el servicio favorece una latencia más baja y puede que no realice una combinación parcial (ya que puede aumentar la latencia).

Características de ajuste automático

El servicio de Dataflow contiene varias funciones de ajuste automático que pueden optimizar de manera dinámica su trabajo de Dataflow mientras se ejecuta. Estas características incluyen el Ajuste de escala automático y el Rebalanceo dinámico de trabajos.

Autoscaling

Con el ajuste de escala automático habilitado, el servicio de Dataflow elige automáticamente la cantidad adecuada de instancias de trabajador necesarias para ejecutar su trabajo. El servicio de Dataflow también puede reasignar de forma dinámica más o menos trabajadores durante el tiempo de ejecución para tener en cuenta las características de tu trabajo. Ciertas partes de tu canalización pueden ser más pesadas que otras en términos de procesamiento y el servicio de Dataflow puede generar de forma automática trabajadores adicionales durante estas fases de tu trabajo (y cerrarlos cuando ya no sean necesarios).

Java: SDK 2.x

El ajuste de escala automático se habilita de forma predeterminada en todos los trabajos de transmisión y por lotes de Dataflow mediante Streaming Engine. Puedes inhabilitar el ajuste de escala automático si especificas la marca --autoscalingAlgorithm=NONE cuando ejecutas la canalización. Si es así, tenga en cuenta que el servicio de Dataflow establece la cantidad de trabajadores según la opción --numWorkers, que se establece de forma predeterminada en 3.

Con el ajuste de escala automático habilitado, el servicio de Dataflow no permite que el usuario controle la cantidad exacta de instancias de trabajadores asignadas a su trabajo. Aún puede limitar la cantidad de trabajadores al especificar la opción --maxNumWorkers cuando ejecuta su canalización.

Para los trabajos por lotes, la marca --maxNumWorkers es opcional. El predeterminado es 1000. Para los trabajos de transmisión con Streaming Engine, la marca --maxNumWorkers es opcional. El valor predeterminado es 100. Para los trabajos de transmisión que no usan Streaming Engine, la marca --maxNumWorkers es obligatoria.

Python

El ajuste de escala automático está habilitado de forma predeterminada en todos los trabajos por lotes de Dataflow creados con el SDK de Apache Beam para Python, versión 0.5.1 o superior. Puedes inhabilitar el ajuste de escala automático si especificas la marca --autoscaling_algorithm=NONE cuando ejecutas la canalización. Si es así, tenga en cuenta que el servicio de Dataflow establece la cantidad de trabajadores según la opción --num_workers, que se establece de forma predeterminada en 3.

Java: SDK 1.x

Dataflow se basa en el paralelismo de una canalización. El paralelismo de una canalización es una estimación de la cantidad de subprocesos necesarios para procesar los datos de manera más eficiente en un momento determinado.

El paralelismo se calcula cada pocos minutos, a menos que el ancho de banda de un servicio externo sea demasiado bajo. Cuando el paralelismo aumenta, Dataflow escala y agrega trabajadores. Cuando el paralelismo disminuye, Dataflow reduce la escala y quita trabajadores.

En la siguiente tabla, se resume cuándo el ajuste de escala automático aumenta o disminuye la cantidad de trabajadores en canalizaciones por lotes y transmisión:

Canalizaciones por lotes Canalizaciones de transmisión
Cómo escalar de forma vertical

Si el trabajo restante lleva más tiempo que generar trabajadores nuevos y los trabajadores actuales están utilizando, en promedio, más del 5% de sus CPU, Dataflow puede escalar.

Las fuentes con lo siguiente pueden limitar la cantidad de trabajadores nuevos: una pequeña cantidad de datos, datos no dividibles (como archivos comprimidos) y datos procesados por módulos de I/O que no dividen datos.

Los receptores configurados para escribir en una cantidad fija de fragmentos, como un destino de Cloud Storage que escribe en archivos existentes, pueden limitar la cantidad de trabajadores nuevos.

Si una canalización de transmisión está atrasada y los trabajadores están utilizando, en promedio, más del 20% de sus CPU, Dataflow puede escalar. Los retrasos se borran en aproximadamente 150 segundos, dado el rendimiento actual por trabajador.

Cómo reducir el escalamiento

Si el trabajo restante toma menos tiempo que la creación de nuevos trabajadores y los trabajadores actuales están utilizando, en promedio, más del 5% de sus CPU, Dataflow puede reducirse.

Si un backlog de canalización de transmisión es inferior a 20 segundos y los trabajadores están utilizando, en promedio, menos del 80% de las CPU, Dataflow puede reducirse. Después de reducir la escala, la nueva cantidad de trabajadores utiliza, en promedio, menos del 75% de sus CPU.

Sin ajuste de escala automático

Si la I/O demora más que el procesamiento de datos o los trabajadores están utilizando, en promedio, menos del 5% de sus CPU, el paralelismo no se vuelve a calcular.

Si los trabajadores están utilizando, en promedio, menos del 20% de su CPU, el paralelismo no se vuelve a calcular.

Ajuste de escala automático por lotes

Para las canalizaciones por lotes, Dataflow elige automáticamente la cantidad de trabajadores según la cantidad de trabajo en cada etapa de su canalización y el rendimiento actual en esa etapa. Dataflow determina la cantidad de datos que procesa el conjunto actual de trabajadores y extrapola la cantidad de tiempo que el resto del trabajo tarda en procesar.

Si su canalización utiliza una fuente de datos personalizada que implementó, existen algunos métodos que puede implementar para proporcionar más información al algoritmo de ajuste de escala automático del servicio Dataflow y mejorar el rendimiento:

Java: SDK 2.x

  • En tu subclase BoundedSource, implementa el método getEstimatedSizeBytes. El servicio de Dataflow usa getEstimatedSizeBytes cuando calcula el número inicial de trabajadores que se usarán para su canalización.
  • En tu subclase BoundedReader, implementa el método getFractionConsumed. El servicio de Dataflow usa getFractionConsumed para realizar un seguimiento del progreso de lectura y converger en la cantidad correcta de trabajadores que se usarán durante una lectura.

Python

  • En tu subclase BoundedSource, implementa el método estimate_size. El servicio de Dataflow usa estimate_size cuando calcula el número inicial de trabajadores que se usarán para su canalización.
  • En tu subclase RangeTracker, implementa el método fraction_consumed. El servicio de Dataflow usa fraction_consumed para realizar un seguimiento del progreso de lectura y converger en la cantidad correcta de trabajadores que se usarán durante una lectura.

Java: SDK 1.x

Ajuste de escala automático de transmisión

El ajuste de escala automático de transmisión permite que el servicio de Dataflow cambie de forma adaptativa la cantidad de trabajadores utilizados para ejecutar la canalización de transmisión en respuesta a cambios en la carga y el uso de recursos. El ajuste de escala automático de transmisión es una característica gratuita diseñada para reducir los costos de los recursos que se usan cuando se ejecutan canalizaciones de transmisión.

Sin el ajuste de escala automático, eliges una cantidad fija de trabajadores mediante la especificación de numWorkers o num_workers para ejecutar la canalización. Como la carga de trabajo de entrada varía con el tiempo, este número puede ser demasiado alto o demasiado bajo. Aprovisionar demasiados trabajadores da como resultado un costo adicional innecesario, mientras que aprovisionar muy pocos causa una mayor latencia para los datos procesados. Si se habilita el ajuste de escala automático, los recursos se usan solo cuando son necesarios.

El objetivo de ajuste de escala automático de canalización de transmisión es minimizar la acumulación mientras maximiza la utilización y el rendimiento del trabajador, y reacciona rápidamente a los picos de carga. Cuando habilitas el ajuste de escala automático, no tienes que elegir entre el aprovisionamiento para la carga máxima y los resultados rápidos. Los trabajadores se agregan a medida que aumentan el uso de CPU y el trabajo pendiente, y se quitan cuando disminuyen. De esta manera, solo pagas por lo que necesitas y el trabajo se procesa de la manera más eficiente posible.

Java: SDK 2.x

Fuentes ilimitadas personalizadas

Si su canalización utiliza una fuente personalizada ilimitada, la fuente debe informar al servicio de Dataflow sobre el trabajo acumulado. El trabajo pendiente es una estimación de la entrada en bytes que la fuente aún no procesó. Para informar al servicio sobre el trabajo pendiente, implementa cualquiera de los siguientes métodos en tu clase UnboundedReader.

  • getSplitBacklogBytes(): trabajo pendiente para la división actual de la fuente. El servicio agrega el trabajo pendiente en todas las divisiones.
  • getTotalBacklogBytes(): el trabajo pendiente global en todas las divisiones. En algunos casos, el trabajo pendiente no está disponible para cada división y solo se puede calcular el total de todas las divisiones. Solo la primera división (ID de división “0”) debe proporcionar su trabajo pendiente total.
El repositorio de Apache Beam contiene varios ejemplos de fuentes personalizadas que implementan la clase UnboundedReader.
Habilita el ajuste de escala automático de transmisión

Para los trabajos de transmisión mediante Streaming Engine, el ajuste de escala automático está habilitado de forma predeterminada.

Para habilitar el ajuste de escala automático de los trabajos que no usan Streaming Engine, establece los siguientes parámetros de ejecución cuando inicies tu canalización:

    --autoscalingAlgorithm=THROUGHPUT_BASED
    --maxNumWorkers=N
    

Para los trabajos de transmisión que no usan Streaming Engine, la cantidad mínima de trabajadores es 1/15 del valor --maxNumWorkers.

Las canalizaciones de transmisión se implementan con un grupo fijo de discos persistentes, cuya cantidad es igual a --maxNumWorkers. Ten esto en cuenta cuando especificas --maxNumWorkers y asegúrate de que este valor sea una cantidad suficiente de discos para tu canalización.

Uso y precios

El uso de Compute Engine se basa en el número promedio de trabajadores, mientras que el uso de discos persistentes se basa en el número exacto de --maxNumWorkers. Los discos persistentes se redistribuyen de modo que cada trabajador obtenga el mismo número de discos adjuntos.

En el ejemplo anterior, en el que --maxNumWorkers=15, se te cobran entre 1 y 15 instancias de Compute Engine y 15 discos persistentes.

Python

Habilita el ajuste de escala automático de transmisión

Para habilitar el ajuste de escala automático, configura los siguientes parámetros de ejecución cuando inicias tu canalización:

    --autoscaling_algorithm=THROUGHPUT_BASED
    --max_num_workers=N
    

Para los trabajos de transmisión que no usan Streaming Engine, la cantidad mínima de trabajadores es 1/15 del valor --maxNumWorkers.

Las canalizaciones de transmisión se implementan con un grupo fijo de discos persistentes, cuya cantidad es igual a --maxNumWorkers. Ten esto en cuenta cuando especificas --maxNumWorkers y asegúrate de que este valor sea una cantidad suficiente de discos para tu canalización.

Uso y precios

El uso de Compute Engine se basa en el número promedio de trabajadores, mientras que el uso de discos persistentes se basa en el número exacto de --max_num_workers. Los discos persistentes se redistribuyen de modo que cada trabajador obtenga el mismo número de discos adjuntos.

En el ejemplo anterior, en el que --max_num_workers=15, se te cobran entre 1 y 15 instancias de Compute Engine y 15 discos persistentes.

Java: SDK 1.x

Escala de forma manual una canalización de transmisión

Hasta que el ajuste de escala automático esté disponible en modo de transmisión, hay una solución que puede usar para escalar manualmente la cantidad de trabajadores que ejecutan su canalización de transmisión mediante la función Update de Dataflow.

Java: SDK 2.x

Para escalar tu canalización de transmisión durante la ejecución, asegúrate de configurar los siguientes parámetros de ejecución cuando inicies tu canalización:

  • Haz que --maxNumWorkers sea igual a la cantidad máxima de trabajadores que deseas tener disponibles para tu canalización.
  • Haz que --numWorkers sea igual a la cantidad inicial de trabajadores que deseas que use tu canalización cuando comience a ejecutarse.

Puedes Actualizar tu canalización una vez que se encuentre en ejecución y especificar un nuevo número de trabajadores con el parámetro --numWorkers. El valor que establezcas para el --numWorkers nuevo debe estar entre N y --maxNumWorkers, en el que N es igual a --maxNumWorkers/15.

La actualización de la canalización reemplaza tu trabajo en ejecución por uno nuevo, con el nuevo número de trabajadores, a la vez que conserva toda la información de estado asociada con el trabajo anterior.

Python

Para escalar tu canalización de transmisión durante la ejecución, asegúrate de configurar los siguientes parámetros de ejecución cuando inicies tu canalización:

  • Haz que --max_num_workers sea igual a la cantidad máxima de trabajadores que deseas tener disponibles para tu canalización.
  • Haz que --num_workers sea igual a la cantidad inicial de trabajadores que deseas que use tu canalización cuando comience a ejecutarse.

Puedes Actualizar tu canalización una vez que se encuentre en ejecución y especificar un nuevo número de trabajadores con el parámetro --num_workers. El valor que establezcas para el --num_workers nuevo debe estar entre N y --max_num_workers, en el que N es igual a --max_num_workers/15.

La actualización de la canalización reemplaza tu trabajo en ejecución por uno nuevo, con el nuevo número de trabajadores, a la vez que conserva toda la información de estado asociada con el trabajo anterior.

Java: SDK 1.x

Rebalanceo dinámico del trabajo

La característica de Rebalanceo dinámico de trabajos del servicio de Dataflow permite que el servicio vuelva a particionar de forma dinámica los trabajos en función de las condiciones del entorno de ejecución. Las siguientes son algunas de esas condiciones:

  • Desbalances en asignaciones de trabajos
  • Trabajadores que tardan más de lo esperado en terminar
  • Trabajadores que terminan más rápido de lo esperado

El servicio de Dataflow detecta estas condiciones de forma automática y puede reasignar el trabajo de manera dinámica a los trabajadores sin usar o con poco uso para disminuir el tiempo de procesamiento general de tu trabajo.

Limitaciones

El rebalanceo dinámico de trabajos solo ocurre cuando el servicio de Dataflow procesa algunos datos de entrada en paralelo: cuando lee datos de una fuente de entrada externa, cuando trabaja con una PCollection materializada intermedia o cuando trabaja con el resultado de una agregación como GroupByKey. Si se fusionan una gran cantidad de pasos en tu trabajo, hay menos PCollection intermedias y el Rebalanceo dinámico de trabajos se limitará a la cantidad de elementos en la fuente de PCollection materializada. Si quieres asegurarte de que el Rebalanceo dinámico de trabajos se pueda aplicar a una PCollection en particular en tu canalización, puedes evitar la fusión de diferentes maneras para garantizar un paralelismo dinámico.

El Rebalanceo dinámico de trabajos no puede volver a paralelizar datos de forma más precisa que un solo registro. Si tus datos contienen registros individuales que causan grandes demoras en el tiempo de procesamiento, aún pueden demorar tu trabajo, ya que Dataflow no puede subdividir y redistribuir un registro individual “activo” a varios trabajadores.

Java: SDK 2.x

Si estableces un número fijo de fragmentos para el resultado final de tu canalización (por ejemplo, si escribes datos con TextIO.Write.withNumShards), Cloud Dataflow limitará la paralelización según el número de fragmentos que eliges.

Python

Si estableciste una cantidad fija de shards para el resultado final de tu canalización (por ejemplo, escribiendo datos con beam.io.WriteToText(..., num_shards=...)), Dataflow limitará la paralelización en función de la cantidad de shards que hayas elegido.

Java: SDK 1.x

La limitación de fragmentos fijos se puede considerar temporal y puede estar sujeta a cambios en versiones futuras del servicio de Dataflow.

Trabaja con fuentes de datos personalizadas

Java: SDK 2.x

Si tu canalización usa una splitAtFractionfuente de datos personalizada que proporcionaste, debes implementar el método para permitir que tu fuente funcione con la característica de Rebalanceo dinámico de trabajos.

Python

Si tu canalización usa una fuente de datos personalizada que proporcionaste, tu RangeTracker debe implementar try_claim, try_split, position_at_fraction y fraction_consumed para permitir que tu fuente funcione con la característica de Rebalanceo dinámico de trabajos.

Consulta la referencia de la API sobre RangeTracker para obtener más información.

Java: SDK 1.x

Uso y administración de recursos

El servicio de Dataflow administra completamente los recursos en Google Cloud por trabajo. Esto incluye activar y desactivar instancias de Compute Engine (también llamadas trabajadores o VM) y acceder a los depósitos de Cloud Storage de tu proyecto para la E/S y la etapa de pruebas de archivos temporales. Sin embargo, si tu canalización interactúa con tecnologías de almacenamiento de datos de Google Cloud, como BigQuery y Pub/Sub, debes administrar los recursos y la cuota de esos servicios.

Dataflow usa una ubicación proporcionada por el usuario en Cloud Storage específicamente para la etapa de archivos. Esta ubicación está bajo tu control y debes asegurarte de que su vida útil se mantenga siempre que algún trabajo realice operaciones de lectura en ella. Puedes reusar la misma ubicación de etapa de pruebas para múltiples ejecuciones de trabajos, ya que el almacenamiento en caché integrado del SDK puede acelerar el tiempo de inicio de tus trabajos.

Trabajos

Puedes ejecutar hasta 100 trabajos de Dataflow simultáneos por proyecto de Google Cloud.

El servicio de Dataflow en la actualidad se limita a procesar solicitudes de trabajos JSON que tengan un tamaño de 20 MB o menos. El tamaño de la solicitud de trabajo está vinculado de manera específica a la representación JSON de tu canalización; una canalización más grande implica una solicitud más grande.

Ejecuta tu canalización con la siguiente opción para estimar el tamaño de su solicitud JSON:

Java: SDK 2.x

    --dataflowJobFile=< path to output file >
    

Python

    --dataflow_job_file=< path to output file >
    

Java: SDK 1.x

Este comando escribe una representación JSON de tu trabajo en un archivo. El tamaño del archivo serializado es una estimación apropiada del tamaño de la solicitud; el tamaño real será un poco mayor debido a que se incluye información adicional en la solicitud.

Para obtener más información, consulta la página de solución de problemas para “413 Entidad de solicitud demasiado grande” / “El tamaño de la representación JSON serializada de la canalización excede el límite permitido”.

Además, el tamaño del grafo de tu trabajo no debe superar los 10 MB. Para obtener más información, consulta la página sobre “El grafo del trabajo es demasiado grande. Vuelve a intentarlo con un grafo de trabajo más pequeño, o divide tu trabajo en dos o más trabajos más pequeños”.

Trabajadores

Actualmente, el servicio de Dataflow permite un máximo de 1,000 instancias de Compute Engine por trabajo. El tipo de máquina predeterminado es n1-standard-1 para un trabajo por lotes y n1-standard-4 para la transmisión; cuando se usan los tipos de máquina predeterminados, el servicio de Dataflow puede asignar hasta 4000 núcleos por trabajo.

Dataflow es compatible con los trabajadores de la serie n1 y con los tipos de máquina personalizados. Puedes especificar un tipo de máquina para tu canalización si configuras el parámetro de ejecución apropiado cuando se crea la canalización.

Java: SDK 2.x

Para cambiar el tipo de máquina, establece la opción --workerMachineType.

Python

Para cambiar el tipo de máquina, establece la opción --worker_machine_type.

Java: SDK 1.x

Cuota de recursos

El servicio de Dataflow comprueba que tu proyecto de Google Cloud tenga la cuota de recursos de Compute Engine necesaria para ejecutar tu trabajo, tanto para iniciar el trabajo como para escalar hasta la cantidad máxima de instancias de trabajador. Tu trabajo no se iniciará si no hay suficiente cuota de recursos disponible.

La función Ajuste de escala automático de Dataflow está limitada por la cuota disponible de Compute Engine de tu proyecto. Si tu trabajo tiene una cuota suficiente cuando comienza, pero otro trabajo usa el resto de la cuota disponible de tu proyecto, el primer trabajo se ejecutará, pero no podrá escalar por completo.

Sin embargo, el servicio de Dataflow no administra los aumentos de cuota de los trabajos que exceden las cuotas de recursos de su proyecto. Usted es responsable de realizar las solicitudes necesarias para la cuota de recursos adicional, para lo cual puede usar Google Cloud Console.

Recursos de discos persistentes

Actualmente, el servicio de Dataflow está limitado a 15 discos persistentes por instancia de trabajador cuando se ejecuta un trabajo de transmisión. Cada disco persistente pertenece a una máquina virtual de Compute Engine individual. Tu trabajo no puede tener más trabajadores que discos persistentes; la asignación de recursos mínima es una proporción de 1:1 entre trabajadores y discos.

Para los trabajos que se ejecutan en VM de trabajadores, el tamaño predeterminado de cada disco persistente es 250 GB en modo por lotes y 400 GB en modo de transmisión. Los trabajos que usan Streaming Engine o Dataflow Shuffle se ejecutan en el backend del servicio de Dataflow y usan discos más pequeños.

Locations

De forma predeterminada, el servicio de Dataflow implementa recursos de Compute Engine en la zona us-central1-f de la región us-central1. Puedes anular esta configuración si especificas el parámetro --region. Si necesitas usar una zona específica para tus recursos, usa el parámetro --zone cuando creas tu canalización. Sin embargo, recomendamos que solo especifiques la región y dejes la zona sin especificar. Esto permite que el servicio de Dataflow seleccione automáticamente la mejor zona dentro de la región según la capacidad de zona disponible en el momento de la solicitud de creación de trabajo. Para obtener más información, consulta la documentación de extremos regionales.

Streaming Engine

En la actualidad, el ejecutor de canalizaciones de Cloud Dataflow ejecuta todos los pasos de tu canalización de transmisión en las máquinas virtuales de trabajador y consume la CPU, la memoria y el almacenamiento en el disco persistente del trabajador. Streaming Engine de Dataflow traslada la ejecución de la canalización fuera de las VM de los trabajadores y hacia el backend del servicio de Dataflow.

Beneficios de Streaming Engine

El modelo de Streaming Engine tiene los siguientes beneficios:

  • Una reducción en los recursos consumidos de CPU, memoria y almacenamiento en el disco persistente en las VM de trabajador Streaming Engine funciona mejor con tipos de máquinas de trabajador más pequeñas (n1-standard-2 en lugar de n1-standard-4) y no requiere un disco persistente además de un disco de arranque pequeño, lo que genera un menor consumo de recursos y cuotas.
  • Ajuste de escala automático más sensible en la respuesta a las variaciones en el volumen de datos entrantes. Streaming Engine ofrece un escalamiento más uniforme y detallado de los trabajadores.
  • Compatibilidad mejorada, ya que no es necesario volver a implementar las canalizaciones para aplicar las actualizaciones del servicio

La mayor parte de la reducción en los recursos de los trabajadores proviene de derivar el trabajo al servicio de Dataflow. Por esa razón, hay un costo asociado con el uso de Streaming Engine. Sin embargo, se espera que la factura total de las canalizaciones de Dataflow que usan Streaming Engine sea aproximadamente la misma en comparación con el costo total de las canalizaciones de Dataflow que no usan esta opción.

Usa Streaming Engine

En la actualidad, Streaming Engine está disponible para la transmisión de canalizaciones en las siguientes regiones. Estará disponible en otras regiones más adelante.

  • us-central1 (Iowa)
  • us-east1 (Carolina del Sur)
  • us-west1 (Oregón)
  • europe-west1 (Bélgica)
  • europe-west4 (Países Bajos)
  • asia-east1 (Taiwán)
  • asia-northeast1 (Tokio)

Java: SDK 2.x

Para usar Streaming Engine en tus canales de transmisión, especifica el siguiente parámetro:

  • --enableStreamingEngine si usas el SDK de Apache Beam para la versión 2.11.0 o posterior de Java.
  • --experiments=enable_streaming_engine si usas el SDK de Apache Beam para la versión 2.10.0 de Java.

Si usas Dataflow Streaming Engine para tu canalización, no especifiques el parámetro --zone. En su lugar, especifica el parámetro --region y establece el valor en una de las regiones en las que Streaming Engine esté disponible en la actualidad. Dataflow selecciona automáticamente la zona en la región que especificó. Si especifica el parámetro --zone y lo configura en una zona fuera de las regiones disponibles, Dataflow informa un error.

Streaming Engine funciona mejor con tipos de máquina de trabajador más pequeñas, por lo que recomendamos que configures --workerMachineType=n1-standard-2. También puedes establecer --diskSizeGb=30 porque Streaming Engine solo necesita espacio para la imagen de arranque del trabajador y los registros locales. Estos valores son los predeterminados.

Python

Para usar Streaming Engine en tus canales de transmisión, especifica el siguiente parámetro:

--enable_streaming_engine

Si usas Dataflow Streaming Engine para tu canalización, no especifiques el parámetro --zone. En su lugar, especifica el parámetro --region y establece el valor en una de las regiones en las que Streaming Engine esté disponible en la actualidad. Dataflow selecciona automáticamente la zona en la región que especificó. Si especifica el parámetro --zone y lo configura en una zona fuera de las regiones disponibles, Dataflow informa un error.

Streaming Engine funciona mejor con tipos de máquina de trabajador más pequeñas, por lo que recomendamos que configures --machine_type=n1-standard-2. También puedes establecer --disk_size_gb=30 porque Streaming Engine solo necesita espacio para la imagen de arranque del trabajador y los registros locales. Estos valores son los predeterminados.

Java: SDK 1.x

Dataflow Shuffle

Dataflow Shuffle es la operación base detrás de las transformaciones de Dataflow, como GroupByKey, CoGroupByKey y Combine. La operación de Dataflow Shuffle particiona y agrupa los datos por clave de forma escalable, eficiente y tolerante a errores. Actualmente, Dataflow usa una implementación aleatoria que se ejecuta completamente en máquinas virtuales de trabajadores y consume CPU del trabajador, memoria y almacenamiento en disco persistente. La función de Dataflow Shuffle basada en el servicio, disponible solo para canalizaciones por lotes, mueve la operación aleatoria de las VM del trabajador al backend del servicio de Dataflow.

Beneficios de Dataflow Shuffle

Dataflow Shuffle basado en servicios tiene los siguientes beneficios:

  • Tiempo de ejecución más rápido de las canalizaciones por lotes para la mayoría de los tipos de trabajo de canalización
  • Una reducción en los recursos consumidos de CPU, memoria y almacenamiento en el disco persistente en las VM de trabajador.
  • Ajuste de escala automático optimizado porque las VM ya no contienen datos de distribución y, por lo tanto, se puede reducir antes su escala
  • Mejor tolerancia a errores: una VM en mal estado que contenga datos de Dataflow Shuffle no causará que todo el trabajo falle, como sucedería si no se usara esta característica

La mayor parte de la reducción en los recursos de los trabajadores proviene de derivar el trabajo de distribución al servicio de Dataflow. Por ese motivo, existe una carga asociada con el uso de Dataflow Shuffle. Sin embargo, se espera que la factura total de canalizaciones de Dataflow que utilizan la implementación de Dataflow basada en servicios sea menor o igual que el costo de las canalizaciones de Dataflow que no utilizan esta opción.

Para la mayoría de los tipos de trabajos de canalización, se espera que Dataflow Shuffle se ejecute más rápido que la implementación aleatoria que se ejecuta en las VM de los trabajadores. Sin embargo, la duración puede variar de una ejecución a otra. Si ejecutas una canalización que tiene plazos importantes, recomendamos que asignes suficiente tiempo de búfer antes del plazo. Además, considera solicitar una cuota mayor para Shuffle.

Consideraciones de disco

Cuando se usa la función Dataflow Shuffle basada en el servicio, no es necesario adjuntar los discos persistentes de gran tamaño a las VM de los trabajadores. Dataflow adjunta automáticamente un pequeño disco de arranque de 25 GB. Sin embargo, debido a este pequeño tamaño de disco, hay algunas consideraciones importantes que debe tener en cuenta cuando use Dataflow Shuffle:

  • Una VM de trabajador usa parte de los 25 GB de espacio en disco para el sistema operativo, los archivos binarios, los registros y los contenedores. Los trabajos que usan una cantidad significativa de disco y exceden la capacidad restante del disco pueden fallar cuando usa Dataflow Shuffle.
  • Los trabajos que usan una gran cantidad de E/S de disco pueden ser lentos debido al rendimiento del disco pequeño. Para obtener más información sobre las diferencias de rendimiento entre los tamaños de disco, consulta la página sobre el rendimiento del disco persistente de Compute Engine.

Si alguna de estas consideraciones se aplica a tu trabajo, puedes usar las opciones de canalización para especificar un tamaño de disco más grande.

Cómo usar Dataflow Shuffle

Actualmente, Dataflow Shuffle basado en el servicio está disponible en las siguientes regiones:

  • us-central1 (Iowa)
  • us-east1 (Carolina del Sur)
  • us-west1 (Oregón)
  • europe-west1 (Bélgica)
  • europe-west4 (Países Bajos)
  • asia-east1 (Taiwán)
  • asia-northeast1 (Tokio)

Dataflow Shuffle estará disponible en más regiones en el futuro.

Java: SDK 2.x

Para usar Dataflow Shuffle basado en el servicio en tus canalizaciones de lote, especifica el siguiente parámetro:
--experiments=shuffle_mode=service

Si usas Dataflow Shuffle para tu canalización, no especifiques el parámetro --zone. En su lugar, especifica el parámetro --region y establece el valor en una de las regiones en las que Shuffle está disponible en la actualidad. Dataflow selecciona automáticamente la zona en la región que especificó. Si especifica el parámetro --zone y lo configura en una zona fuera de las regiones disponibles, Dataflow informa un error.

Python

Para usar Dataflow Shuffle basado en el servicio en tus canalizaciones de lote, especifica el siguiente parámetro:
--experiments=shuffle_mode=service

Si usas Dataflow Shuffle para tu canalización, no especifiques el parámetro --zone. En su lugar, especifica el parámetro --region y establece el valor en una de las regiones en las que Shuffle está disponible en la actualidad. Dataflow selecciona automáticamente la zona en la región que especificó. Si especifica el parámetro --zone y lo configura en una zona fuera de las regiones disponibles, Dataflow informa un error.

Java: SDK 1.x

Programación de recursos flexibles de Dataflow

Dataflow FlexRS reduce los costos de procesamiento por lotes mediante el uso de técnicas de programación avanzadas, el servicio Dataflow Shuffle y una combinación de instancias de máquinas virtuales interrumpibles (VM) y VM normales. Mediante la ejecución de VM interrumpibles y normales en paralelo, Dataflow mejora la experiencia del usuario si Compute Engine detiene instancias de VM interrumpibles durante un evento del sistema. FlexRS ayuda a garantizar que la canalización progrese y no pierda el trabajo anterior cuando Compute Engine interrumpe tus VM interrumpibles. Para obtener más información acerca de FlexRS, consulte Cómo usar la programación flexible de recursos en Dataflow.