Implementa una canalización

En este documento, se explica en detalle cómo Dataflow implementa y ejecuta una canalización; también se abarcan temas avanzados, como la optimización y el balanceo de cargas. Si buscas una guía paso a paso a fin de crear y, también, implementar tu primera canalización, usa las guías de inicio rápido de Dataflow para Java, Python o plantillas.

Después de crear y probar una canalización de Apache Beam, puedes usar el servicio administrado de Dataflow para implementarla y ejecutarla. Una vez en el servicio de Dataflow, el código de canalización se convierte en un trabajo de Dataflow.

El servicio de Dataflow administra por completo los servicios de Google Cloud, como Compute Engine y Cloud Storage para ejecutar el trabajo de Dataflow y, así, activar y desactivar los recursos necesarios de forma automática. El servicio de Dataflow proporciona visibilidad al trabajo a través de herramientas como la Interfaz de supervisión de Dataflow y la Interfaz de línea de comandos de Dataflow.

Puedes controlar algunos aspectos sobre cómo el servicio de Dataflow ejecuta el trabajo si configuras los parámetros de ejecución en el código de canalización. Por ejemplo, en los parámetros de ejecución se especifica si los pasos de la canalización se ejecutan en máquinas virtuales de trabajador, en el backend del servicio de Dataflow o de forma local.

Además de administrar los recursos de Google Cloud, el servicio de Dataflow realiza y optimiza de forma automática muchos aspectos del procesamiento paralelo distribuido. Estos incluyen los siguientes procesos:

  • Paralelización y distribución. Dataflow particiona los datos de forma automática y distribuye el código de trabajador a las instancias de Compute Engine para el procesamiento paralelo.
  • Optimización. Dataflow usa el código de canalización para crear un grafo de ejecución que representa las transformaciones y las PCollection de la canalización. Además, optimiza el grafo para lograr el rendimiento y el uso de recursos más eficientes. Dataflow también optimiza de forma automática las operaciones que pueden ser costosas, como la agregación de datos.
  • Características de ajuste automático. El servicio de Dataflow incluye varias características que proporcionan ajustes sobre la marcha de la asignación de recursos y la partición de datos, como el ajuste de escala automático y el rebalanceo dinámico del trabajo. Estas características ayudan al servicio de Dataflow a ejecutar el trabajo de la manera más rápida y eficiente posible.

Ciclo de vida de la canalización: del código de canalización al trabajo de Dataflow

Cuando ejecutas la canalización de Dataflow, Dataflow crea un gráfico de ejecución a partir del código que construye tu objeto Pipeline, incluidas todas las transformaciones y sus funciones de procesamiento asociadas (como DoFn). Esta fase se llama Tiempo de construcción del gráfico y se ejecuta de forma local en la computadora en la que se ejecuta la canalización.

Durante la construcción del gráfico, Apache Beam ejecuta de forma local el código desde el punto de entrada principal del código de la canalización, detiene las llamadas a un paso de origen, receptor o transformación y convierte estas llamadas en nodos del gráfico. Como consecuencia, un fragmento de código en el punto de entrada de una canalización (el método main() de Java o el nivel superior de una secuencia de comandos de Python) se ejecuta de forma local en la máquina que ejecuta la canalización, mientras que el mismo código declarado en un método de un objeto DoFn se ejecuta en los trabajadores de Dataflow.

Además, durante la construcción del gráfico, Apache Beam valida que todos los recursos a los que hace referencia la canalización (como los depósitos de Cloud Storage, las tablas de BigQuery y los temas o suscripciones de Pub/Sub) existan en realidad y sean accesibles. La validación se realiza mediante llamadas estándar a la API a los servicios respectivos, por lo que es fundamental que la cuenta de usuario que se usa para ejecutar una canalización tenga la conectividad adecuada a los servicios necesarios y esté autorizada para llamar a sus API. Antes de enviar la canalización al servicio de Dataflow, Apache Beam también verifica otros errores y se asegura de que el gráfico de la canalización no contenga operaciones ilegales.

El gráfico de ejecución se traduce al formato JSON, y el gráfico de ejecución JSON se transmite al extremo del servicio de Dataflow.

Nota: La creación del grafo también se da cuando ejecutas la canalización de forma local, pero el grafo no se traduce a JSON ni se transmite al servicio. En su lugar, el grafo se ejecuta de forma local en la misma máquina en la que se inició el programa de Dataflow. Consulta la documentación sobre la configuración para la ejecución local a fin de obtener más detalles.

Luego, el servicio de Dataflow valida el grafo de ejecución de JSON. Cuando se valida el grafo, se convierte en un trabajo en el servicio de Dataflow. Podrás ver el trabajo, su grafo de ejecución, el estado y la información de registro mediante la interfaz de supervisión de Dataflow.

Java: SDK 2.x

El servicio de Dataflow envía una respuesta a la máquina en la que se ejecutó el programa de Dataflow. Esta respuesta se encapsula en el objeto DataflowPipelineJob, que contiene el jobId del trabajo de Dataflow. Puedes usar el jobId para supervisar, hacer seguimiento y solucionar problemas del trabajo mediante la interfaz de supervisión de Dataflow y la interfaz de línea de comandos de Dataflow. Consulta la referencia de la API para DataflowPipelineJob a fin de obtener más información.

Python

El servicio de Dataflow envía una respuesta a la máquina en la que se ejecutó el programa de Dataflow. Esta respuesta se encapsula en el objeto DataflowPipelineResult, que contiene el job_id del trabajo de Dataflow. Puedes usar el job_id para supervisar, hacer seguimiento y solucionar problemas del trabajo mediante la interfaz de supervisión de Dataflow y la interfaz de línea de comandos de Dataflow.

Java: SDK 1.x

Grafo de ejecución

Dataflow compila un grafo de pasos que representa la canalización, según las transformaciones y los datos que se usaron para crear el objeto Pipeline. Este es el grafo de ejecución de la canalización.

El ejemplo de WordCount, que se incluye con los SDK de Apache Beam, contiene una serie de transformaciones a fin de leer, extraer, contar, dar formato y escribir las palabras individuales de una colección de texto, junto con un conteo de casos para cada palabra. El siguiente diagrama muestra cómo las transformaciones en la canalización de WordCount se expanden en un grafo de ejecución:

Las transformaciones en el programa de ejemplo de WordCount expandidas en un grafo de ejecución de los pasos que debe ejecutar el servicio de Cloud Dataflow.
Figura 1: Grafo de ejecución de ejemplo de WordCount

El grafo de ejecución a menudo difiere del orden en el que especificas las transformaciones cuando creas la canalización. Esto se debe a que el servicio de Dataflow realiza varias optimizaciones y fusiones en el grafo de ejecución antes de que se ejecute en recursos de nube administrados. El servicio de Dataflow respeta las dependencias de datos cuando ejecuta la canalización. Sin embargo, los pasos sin dependencias de datos entre ellos se pueden ejecutar en cualquier orden.

Puedes ver el grafo de ejecución no optimizado que Dataflow generó para la canalización cuando seleccionas el trabajo en la interfaz de supervisión de Dataflow.

Paralelización y distribución

El servicio de Dataflow paraleliza y distribuye de forma automática la lógica de procesamiento de la canalización a los trabajadores que asignaste para que realicen el trabajo. Dataflow usa las abstracciones en el modelo de programación para representar las funciones de procesamiento paralelo; por ejemplo, las transformaciones ParDo hacen que Dataflow distribuya de forma automática el código de procesamiento (representado por DoFn) a varios trabajadores para que se ejecuten en paralelo.

Estructura tu código de usuario

Puedes pensar en tu código DoFn como pequeñas entidades independientes: podría haber muchas instancias en ejecución en diferentes máquinas, cada una sin conocimiento de las otras. Como tales, las funciones puras (funciones que no dependen de un estado oculto o externo, no tienen efectos secundarios observables y son deterministas) son código ideal para la naturaleza paralela y distribuida de los DoFn.

Sin embargo, el modelo de función pura no es estrictamente rígido. La información de estado o los datos de inicialización externa pueden ser válidos para DoFn y otros objetos de función, siempre y cuando el código no dependa de lo que el servicio de Dataflow no garantice. Cuando organices tus transformaciones ParDo y crees tus DoFn, ten en cuenta los siguientes lineamientos:

  • El servicio de Dataflow garantiza que a cada elemento de la PCollection de entrada lo procese una instancia de DoFn solo una vez.
  • El servicio de Dataflow no garantiza cuántas veces se invocará un DoFn.
  • El servicio de Dataflow no garantiza con exactitud cómo se agrupan los elementos distribuidos; es decir, no garantiza qué elementos (si los hay) se procesan juntos.
  • El servicio de Dataflow no garantiza la cantidad exacta de instancias de DoFn que se crearán en el transcurso de una canalización.
  • El servicio de Dataflow es tolerante a errores y puede que vuelva a probar el código varias veces en el caso de que surjan problemas con los trabajadores. El servicio de Dataflow puede crear copias de seguridad del código y puede tener problemas que generen efectos secundarios manuales (por ejemplo, si tu código crea o se basa en archivos temporales con nombres no únicos).
  • El servicio de Dataflow serializa el procesamiento de elementos por instancia de DoFn. Tu código no necesita contar con seguridad para subprocesos de forma estricta; sin embargo, cualquier estado compartido entre varias instancias de DoFn debe ser seguro para subprocesos.

Consulta la sección sobre los requisitos para las funciones que proporciona el usuario en la documentación del modelo de programación a fin de obtener más información sobre cómo crear tu código de usuario.

Manejo de errores y excepciones

Tu canalización puede generar excepciones durante el procesamiento de datos. Algunos de estos errores son transitorios (p. ej., dificultades temporales para acceder a un servicio externo), mientras que otros son permanentes, como los errores causados por datos de entrada corruptos o no analizables, o indicadores nulos durante el procesamiento.

Dataflow procesa los elementos en paquetes arbitrarios y vuelve a probar el paquete completo cuando se produce un error de cualquier elemento de ese paquete. Cuando se ejecuta en modo por lotes, los paquetes que incluyen un elemento defectuoso se reintentan 4 veces. Si un paquete individual falla 4 veces, la canalización fallará por completo. Cuando se ejecuta en modo de transmisión, un paquete que incluye un elemento defectuoso se reintenta de forma indefinida, lo que puede hacer que la canalización se paralice de manera permanente.

Optimización de fusiones

Una vez que se haya validado el formulario JSON del grafo de ejecución de la canalización, es posible que el servicio de Dataflow modifique el grafo para realizar optimizaciones. Estas optimizaciones pueden incluir la fusión de varios pasos o transformaciones en el grafo de ejecución de tu canalización en pasos únicos. Los pasos de fusión evitan que el servicio de Dataflow tenga que materializar cada PCollection intermedia en la canalización, lo que podría ser costoso en términos de sobrecarga de procesamiento y memoria.

Si bien todas las transformaciones que especificas en la construcción de tu canalización se ejecutan en el servicio, pueden ejecutarse en un orden diferente o como parte de una transformación fusionada más grande para garantizar la ejecución más eficiente de tu canalización. El servicio de Dataflow respeta las dependencias de datos entre los pasos en el grafo de ejecución, pero los demás pasos se pueden ejecutar en cualquier orden.

Ejemplo de fusión

En el siguiente diagrama, se muestra cómo el grafo de ejecución del ejemplo de WordCount incluido en el SDK de Apache Beam para Java podría optimizarse y fusionarse mediante el servicio de Dataflow a fin de obtener una ejecución eficiente:

El grafo de ejecución para el programa de ejemplo de WordCount optimizado y la fusión de pasos del servicio de Cloud Dataflow.
Figura 2: Grafo de ejecución optimizado del ejemplo de WordCount

Evita la fusión

Hay algunos casos de la canalización en los que tal vez quieras evitar que el servicio de Dataflow realice optimizaciones de fusión. Estos son casos en los que el servicio de Dataflow podría suponer de forma incorrecta la mejor manera de fusionar operaciones en la canalización, lo que podría limitar la capacidad del servicio de Dataflow de usar todos los trabajadores disponibles.

Por ejemplo, un caso en el que la fusión puede limitar la capacidad de Dataflow para optimizar el uso de los trabajadores es una ParDo con “fan-out alto”. En esta operación, podrías tener una colección de entrada con pocos elementos, pero la ParDo genera una salida que tiene cientos o miles de veces esa cantidad, seguida de otra ParDo. Si el servicio de Dataflow fusiona estas operaciones ParDo, el paralelismo en este paso se limita a la cantidad máxima de elementos en la colección de entrada, aunque la PCollection intermedia contenga muchos más elementos.

Para evitar esa fusión, agrega una operación a la canalización que obligue al servicio de Dataflow a materializar la PCollection intermedia. Considera usar una de las siguientes operaciones:

  • Puedes insertar una GroupByKey y desagrupar después de la primera ParDo. El servicio de Dataflow nunca fusiona las operaciones ParDo en una agregación.
  • Puedes pasar la PCollection intermedia como una entrada complementaria a otra ParDo. El servicio de Dataflow siempre materializa las entradas complementarias.
  • Puedes insertar un paso Reshuffle. Reshuffle evita la fusión, controla los datos y realiza la anulación de duplicación de registros. Dataflow admite que se vuelva a reproducir aleatoriamente, aunque está marcada como obsoleta en la documentación de Apache Beam.

Optimización de combinaciones

Las operaciones de agregación son un concepto importante en el procesamiento de datos a gran escala. La agregación reúne datos que están muy separados en lo conceptual, por lo que es muy útil para la correlación. El modelo de programación de Dataflow representa las operaciones de agregación como las transformaciones GroupByKey, CoGroupByKey y Combine.

Las operaciones de agregación de Dataflow combinan datos de todo el conjunto de datos, incluidos los que se podrían distribuir entre varios trabajadores. Durante estas operaciones de agregación, a menudo es más eficiente combinar la mayor cantidad de datos posible de forma local antes de combinarlos entre instancias. Cuando aplicas una GroupByKey o alguna otra transformación de agregación, el servicio de Dataflow realiza una combinación parcial a nivel local de forma automática antes de la operación de agrupación principal.

Cuando se realiza una combinación parcial o multinivel, el servicio de Dataflow toma decisiones diferentes en función de si la canalización funciona con datos de transmisión o por lotes. Para datos limitados, el servicio favorece la eficiencia y realizará toda la combinación local posible. Para datos ilimitados, el servicio favorece una latencia más baja y puede que no realice una combinación parcial (ya que puede aumentar la latencia).

Características del ajuste automático

El servicio de Dataflow contiene varias características de ajuste automático que pueden optimizar de manera dinámica aún más el trabajo de Dataflow durante la ejecución. Estas características incluyen el Ajuste de escala automático y el Rebalanceo dinámico de trabajos.

Ajuste de escala automático

Con el ajuste de escala automático habilitado, el servicio de Dataflow elige de forma automática la cantidad adecuada de instancias de trabajador necesarias para ejecutar el trabajo. El servicio de Dataflow también puede reasignar de forma dinámica más o menos trabajadores durante el tiempo de ejecución para tener en cuenta las características del trabajo. Ciertas partes de la canalización pueden ser más pesadas que otras en términos de procesamiento, y el servicio de Dataflow puede activar trabajadores adicionales de forma automática durante estas fases del trabajo (y desactivarlos cuando ya no sean necesarios).

Java: SDK 2.x

El ajuste de escala automático está habilitado de forma predeterminada en todos los trabajos por lotes de Dataflow y los trabajos de transmisión que usan Streaming Engine. Puedes inhabilitar el ajuste de escala automático si especificas la marca --autoscalingAlgorithm=NONE cuando ejecutas la canalización. De realizarlo, ten en cuenta que el servicio de Dataflow establece la cantidad de trabajadores según la opción --numWorkers, que se establece de forma predeterminada en 3.

Si habilitas el ajuste de escala automático, el servicio de Dataflow no permite que el usuario controle la cantidad exacta de instancias de trabajador asignadas al trabajo. Aún puedes limitar la cantidad de trabajadores si especificas la opción --maxNumWorkers cuando ejecutas la canalización.

La marca --maxNumWorkers es opcional para los trabajos por lotes. El valor predeterminado es 1000. La marca --maxNumWorkers es opcional para los trabajos de transmisión que usan Streaming Engine. El valor predeterminado es 100. La marca --maxNumWorkers es obligatoria para los trabajos de transmisión que no usan Streaming Engine.

Python

El ajuste de escala automático está habilitado de forma predeterminada en todos los trabajos por lotes de Dataflow creados mediante el SDK de Apache Beam para Python versión 0.5.1 o posterior. Puedes inhabilitar el ajuste de escala automático si especificas de forma explícita la marca --autoscaling_algorithm=NONE cuando ejecutas la canalización. De realizarlo, ten en cuenta que el servicio de Dataflow establece la cantidad de trabajadores según la opción --num_workers, que se establece de forma predeterminada en 3.

Java: SDK 1.x

Dataflow escala según el paralelismo de una canalización. El paralelismo de una canalización es una estimación de la cantidad de subprocesos necesarios para procesar los datos de la manera más eficiente en un momento determinado.

El paralelismo se calcula cada pocos minutos, a menos que el ancho de banda del servicio externo sea demasiado bajo. Cuando aumenta el paralelismo, Dataflow escala verticalmente y agrega trabajadores. Cuando el paralelismo disminuye, Dataflow disminuye el escalamiento y quita trabajadores.

En la siguiente tabla, se resume cuándo el ajuste de escala automático aumenta o disminuye la cantidad de trabajadores en canalizaciones por lotes y de transmisión:

Canalizaciones por lotes Canalizaciones de transmisión
Escalamiento vertical

Si el trabajo restante lleva más tiempo que la activación de trabajadores nuevos, y los trabajadores actuales usan en promedio más del 5 % de las CPU, Dataflow puede escalar verticalmente.

Las fuentes con lo que se detalla a continuación pueden limitar la cantidad de trabajadores nuevos: una pequeña cantidad de datos, datos no divisibles (como archivos comprimidos) y datos procesados por módulos de E/S que no dividen datos.

Los receptores configurados para escribir en una cantidad fija de fragmentos, como un destino de Cloud Storage que escribe en archivos existentes, pueden limitar la cantidad de trabajadores nuevos.

Si una canalización de transmisión está pendiente y los trabajadores usan, en promedio, más del 20% de sus CPU, Dataflow puede escalar verticalmente. Los trabajos pendientes se borran en un plazo aproximado de 150 segundos, según la capacidad de procesamiento actual por trabajador.

Disminución del escalamiento

Si el trabajo restante lleva menos tiempo que iniciar trabajadores nuevos y, los trabajadores actuales usan en promedio más del 5 % de las CPU, Dataflow puede disminuir el escalamiento.

Si un trabajo pendiente de canalización de transmisión es inferior a 20 segundos y los trabajadores usan en promedio menos del 80 % de las CPU, Dataflow puede disminuir el escalamiento. Después de disminuir el escalamiento, la nueva cantidad de trabajadores usa en promedio menos del 75 % de las CPU.

Sin ajuste de escala automático

Si la E/S demora más que el procesamiento de datos o los trabajadores usan en promedio menos del 5 % de las CPU, el paralelismo no se vuelve a calcular.

Si los trabajadores usan en promedio menos del 20 % de la CPU, el paralelismo no se vuelve a calcular.

Ajuste de escala automático por lotes

Para las canalizaciones por lotes, Dataflow elige de forma automática la cantidad de trabajadores según la cantidad de trabajo en cada etapa de la canalización y la capacidad de procesamiento actual en esa etapa. Dataflow determina cuántos datos procesa el conjunto actual de trabajadores y extrapola cuánto tiempo tarda el resto del trabajo en procesarse.

Si en la canalización se usa una fuente de datos personalizada que implementaste, hay algunos métodos que puedes implementar que proporcionan más información al algoritmo de ajuste de escala automático del servicio de Dataflow y que pueden mejorar el rendimiento:

Java: SDK 2.x

  • En la subclase BoundedSource, implementa el método getEstimatedSizeBytes. El servicio de Dataflow usa getEstimatedSizeBytes cuando calcula la cantidad inicial de trabajadores para usar en la canalización.
  • En la subclase BoundedReader, implementa el método getFractionConsumed. El servicio de Dataflow usa getFractionConsumed para realizar un seguimiento del progreso de lectura y converger en la cantidad correcta de trabajadores que se usarán durante una lectura.

Python

  • En la subclase BoundedSource, implementa el método estimate_size. El servicio de Dataflow usa estimate_size cuando calcula la cantidad inicial de trabajadores para usar en la canalización.
  • En la subclase RangeTracker, implementa el método fraction_consumed. El servicio de Dataflow usa fraction_consumed para realizar un seguimiento del progreso de lectura y converger en la cantidad correcta de trabajadores que se usarán durante una lectura.

Java: SDK 1.x

Ajuste de escala automático de transmisión

El ajuste de escala automático de transmisión permite que el servicio de Dataflow cambie de forma adaptable la cantidad de trabajadores que se usan para ejecutar la canalización la transmisión en respuesta a los cambios en la carga y el uso de recursos. El ajuste de escala automático de transmisión es una característica gratuita diseñada para reducir los costos de los recursos que se usan cuando se ejecutan canalizaciones de transmisión.

Sin el ajuste de escala automático, eliges una cantidad fija de trabajadores mediante la especificación de numWorkers o num_workers para ejecutar la canalización. Como la carga de trabajo de entrada varía con el tiempo, este número puede ser demasiado alto o demasiado bajo. Aprovisionar demasiados trabajadores da como resultado un costo adicional innecesario, mientras que aprovisionar muy pocos causa una mayor latencia para los datos procesados. Si se habilita el ajuste de escala automático, los recursos se usan solo cuando son necesarios.

El objetivo del ajuste de escala automático de canalización de transmisión es minimizar la acumulación mientras se maximiza el uso y la capacidad de procesamiento del trabajador, además de reaccionar con rapidez a los aumentos repentinos de carga. Cuando habilitas el ajuste de escala automático, no tienes que elegir entre el aprovisionamiento para la carga máxima y los resultados rápidos. Los trabajadores se agregan a medida que aumentan el uso de CPU y el trabajo pendiente, y se quitan cuando disminuyen. De esta manera, solo pagas por lo que necesitas, y el trabajo se procesa de la manera más eficiente posible.

Java: SDK 2.x

Fuentes no delimitadas personalizadas

Si en la canalización se usa una fuente no delimitada personalizada, la fuente debe informar al servicio de Dataflow sobre el trabajo pendiente. El trabajo pendiente es una estimación de la entrada en bytes que la fuente aún no procesó. Para informar al servicio sobre el trabajo pendiente, implementa cualquiera de los siguientes métodos en tu clase UnboundedReader.

  • getSplitBacklogBytes(): Trabajo pendiente para la división actual de la fuente. El servicio agrega el trabajo pendiente en todas las divisiones.
  • getTotalBacklogBytes(): el trabajo pendiente global en todas las divisiones. En algunos casos, el trabajo pendiente no está disponible para cada división y solo se puede calcular el total de todas las divisiones. Solo la primera división (ID de división “0”) debe proporcionar su trabajo pendiente total.
El repositorio de Apache Beam contiene varios ejemplos de fuentes personalizadas que implementan la clase UnboundedReader.
Habilita el ajuste de escala automático de transmisión

Para los trabajos de transmisión que usan Streaming Engine, el ajuste de escala automático se habilita de forma predeterminada.

A fin de habilitar el ajuste de escala automático para trabajos que no usan Streaming Engine, configura los siguientes parámetros de ejecución cuando inicies la canalización:

--autoscalingAlgorithm=THROUGHPUT_BASED
--maxNumWorkers=N

Para los trabajos de transmisión que no usan Streaming Engine, la cantidad mínima de trabajadores es 1/15 del valor --maxNumWorkers.

Las canalizaciones de transmisión se implementan con un grupo fijo de discos persistentes, cuya cantidad es igual a --maxNumWorkers. Ten esto en cuenta cuando especificas --maxNumWorkers, y asegúrate de que este valor sea una cantidad suficiente de discos para tu canalización.

Uso y precios

El uso de Compute Engine se basa en el número promedio de trabajadores, mientras que el uso de discos persistentes se basa en el número exacto de --maxNumWorkers. Los discos persistentes se redistribuyen de modo que cada trabajador obtenga el mismo número de discos adjuntos.

En el ejemplo anterior, en el que --maxNumWorkers=15, se te cobran entre 1 y 15 instancias de Compute Engine y 15 discos persistentes.

Python

Habilita el ajuste de escala automático de transmisión

Para habilitar el ajuste de escala automático, configura los siguientes parámetros de ejecución cuando inicias tu canalización:

--autoscaling_algorithm=THROUGHPUT_BASED
--max_num_workers=N

Para los trabajos de transmisión que no usan Streaming Engine, la cantidad mínima de trabajadores es 1/15 del valor --maxNumWorkers.

Las canalizaciones de transmisión se implementan con un grupo fijo de discos persistentes, cuya cantidad es igual a --maxNumWorkers. Ten esto en cuenta cuando especificas --maxNumWorkers, y asegúrate de que este valor sea una cantidad suficiente de discos para tu canalización.

Uso y precios

El uso de Compute Engine se basa en el número promedio de trabajadores, mientras que el uso de discos persistentes se basa en el número exacto de --max_num_workers. Los discos persistentes se redistribuyen de modo que cada trabajador obtenga el mismo número de discos adjuntos.

En el ejemplo anterior, en el que --max_num_workers=15, se te cobran entre 1 y 15 instancias de Compute Engine y 15 discos persistentes.

Java: SDK 1.x

Realiza un ajuste de escala manual de una canalización de transmisión

Hasta que el ajuste de escala automático esté disponible en el modo de transmisión a nivel general, puedes usar una solución alternativa para escalar de forma manual la cantidad de trabajadores que ejecutan la canalización de transmisión mediante la función Actualizar de Dataflow.

Java: SDK 2.x

Para escalar tu canalización de transmisión durante la ejecución, asegúrate de configurar los siguientes parámetros de ejecución cuando inicies tu canalización:

  • Haz que --maxNumWorkers sea igual a la cantidad máxima de trabajadores que deseas tener disponibles para tu canalización.
  • Haz que --numWorkers sea igual a la cantidad inicial de trabajadores que deseas que use tu canalización cuando comience a ejecutarse.

Puedes Actualizar tu canalización una vez que se encuentre en ejecución y especificar una nueva cantidad de trabajadores con el parámetro --numWorkers. El valor que establezcas para el --numWorkers nuevo debe estar entre N y --maxNumWorkers, en el que N es igual a --maxNumWorkers/15.

La actualización de la canalización reemplaza tu trabajo en ejecución por uno nuevo mediante la nueva cantidad de trabajadores, a la vez que conserva toda la información de estado asociada con el trabajo anterior.

Python

Para escalar tu canalización de transmisión durante la ejecución, asegúrate de configurar los siguientes parámetros de ejecución cuando inicies tu canalización:

  • Haz que --max_num_workers sea igual a la cantidad máxima de trabajadores que deseas tener disponibles para tu canalización.
  • Haz que --num_workers sea igual a la cantidad inicial de trabajadores que deseas que use tu canalización cuando comience a ejecutarse.

Puedes Actualizar tu canalización una vez que se encuentre en ejecución y especificar una nueva cantidad de trabajadores con el parámetro --num_workers. El valor que establezcas para el --num_workers nuevo debe estar entre N y --max_num_workers, en el que N es igual a --max_num_workers/15.

La actualización de la canalización reemplaza tu trabajo en ejecución por uno nuevo mediante la nueva cantidad de trabajadores, a la vez que conserva toda la información de estado asociada con el trabajo anterior.

Java: SDK 1.x

Rebalanceo dinámico del trabajo

La característica de rebalanceo dinámico de trabajos del servicio de Dataflow permite que el servicio vuelva a particionar de forma dinámica los trabajos en función de las condiciones del entorno de ejecución. A continuación, se mencionan algunas de esas condiciones:

  • Desbalances en asignaciones de trabajos
  • Trabajadores que tardan más de lo esperado en terminar
  • Trabajadores que terminan más rápido de lo esperado

El servicio de Dataflow detecta estas condiciones de forma automática y puede reasignar trabajos de manera dinámica a los trabajadores sin usar o con poco uso para disminuir el tiempo de procesamiento general del trabajo.

Limitaciones

El rebalanceo dinámico de trabajos solo ocurre cuando el servicio de Dataflow procesa algunos datos de entrada en paralelo: cuando lee datos de una fuente de entrada externa, cuando trabaja con una PCollection materializada intermedia o cuando trabaja con el resultado de una agregación como GroupByKey. Si se fusionan una gran cantidad de pasos en tu trabajo, hay menos PCollection intermedias, y el rebalanceo dinámico de trabajos se limitará a la cantidad de elementos en la fuente de PCollection materializada. Si quieres asegurarte de que el Rebalanceo dinámico de trabajos se pueda aplicar a una PCollection en particular en tu canalización, puedes evitar la fusión de diferentes maneras para garantizar un paralelismo dinámico.

El rebalanceo dinámico de trabajos no puede volver a paralelizar datos de forma más precisa que un solo registro. Si los datos contienen registros individuales que causan grandes demoras en el tiempo de procesamiento, incluso pueden demorar el trabajo, ya que Dataflow no puede subdividir y redistribuir un registro individual “activo” a varios trabajadores.

Java: SDK 2.x

Si estableces un número fijo de fragmentos para el resultado final de tu canalización (por ejemplo, si escribes datos mediante TextIO.Write.withNumShards), Cloud Dataflow limitará la paralelización según el número de fragmentos que hayas elegido.

Python

Si estableciste una cantidad fija de fragmentos para el resultado final de la canalización (por ejemplo, mediante la escritura de datos mediante beam.io.WriteToText(..., num_shards=...)), Dataflow limitará la paralelización según la cantidad de fragmentos que hayas elegido.

Java: SDK 1.x

La limitación de fragmentos fijos se puede considerar temporal y puede estar sujeta a cambios en versiones futuras del servicio de Dataflow.

Trabaja con fuentes de datos personalizadas

Java: SDK 2.x

Si en tu canalización se usa una fuente de datos personalizada que proporcionaste, debes implementar el método splitAtFraction para permitir que tu fuente funcione con la característica de rebalanceo dinámico de trabajos.

Python

Si tu canalización usa una fuente de datos personalizada que proporcionaste, tu RangeTracker debe implementar try_claim, try_split, position_at_fraction y fraction_consumed para permitir que tu fuente funcione con la característica de Rebalanceo dinámico de trabajos.

Consulta la referencia de la API sobre RangeTracker para obtener más información.

Java: SDK 1.x

Uso y administración de recursos

El servicio de Dataflow administra por completo los recursos en Google Cloud por trabajo. Esto incluye activar y desactivar instancias de Compute Engine (también llamadas trabajadoresVM) y acceder a los depósitos de Cloud Storage de tu proyecto para la E/S y la etapa de pruebas de archivos temporales. Sin embargo, si la canalización interactúa con tecnologías de almacenamiento de datos de Google Cloud, como BigQueryPub/Sub, debes administrar los recursos y la cuota de esos servicios tú mismo.

Dataflow usa una ubicación que proporciona el usuario en Cloud Storage de forma específica para archivos de etapa de pruebas. Esta ubicación está bajo tu control y debes asegurarte de que su vida útil se mantenga siempre que algún trabajo realice operaciones de lectura en ella. Puedes volver a usar la misma ubicación de etapa de pruebas para múltiples ejecuciones de trabajos, ya que el almacenamiento en caché integrado del SDK puede acelerar el tiempo de inicio de tus trabajos.

Trabajos

Puedes ejecutar hasta 100 trabajos de Dataflow simultáneos por proyecto de Google Cloud.

En la actualidad, el servicio de Dataflow se limita a procesar solicitudes de trabajos JSON que tengan un tamaño de 20 MB o menor. El tamaño de la solicitud de trabajo está vinculado en especial a la representación JSON de tu canalización; una canalización más grande implica una solicitud más grande.

Ejecuta tu canalización con la siguiente opción para estimar el tamaño de su solicitud JSON:

Java: SDK 2.x

--dataflowJobFile=< path to output file >

Python

--dataflow_job_file=< path to output file >

Java: SDK 1.x

Mediante este comando, se escribe una representación JSON de tu trabajo en un archivo. El tamaño del archivo serializado es una estimación apropiada del tamaño de la solicitud; el tamaño real será un poco mayor debido a que se incluye información adicional en la solicitud.

Para obtener más información, consulta la página de solución de problemas para “413 Entidad de solicitud demasiado grande” / “El tamaño de la representación JSON serializada de la canalización excede el límite permitido”.

Además, el tamaño del grafo de tu trabajo no debe superar los 10 MB. Para obtener más información, consulta la página sobre “El grafo del trabajo es demasiado grande. Vuelve a intentarlo con un grafo de trabajo más pequeño, o divide tu trabajo en dos o más trabajos más pequeños”.

Trabajadores

En la actualidad, el servicio de Dataflow permite un máximo de 1,000 instancias de Compute Engine por trabajo. El tipo de máquina predeterminado es n1-standard-1 para trabajos por lotes y n1-standard-4 para trabajos de transmisión. Por lo tanto, cuando se usan los tipos de máquina predeterminados, el servicio de Dataflow puede asignar hasta 4,000 núcleos por trabajo. Si necesitas más núcleos para tu trabajo, puedes seleccionar un tipo de máquina más grande.

Puedes usar cualquiera de las familias de tipos de máquinas de Compute Engine disponibles, así como los tipos de máquinas personalizados. Para obtener mejores resultados, usa tipos de máquina n1. Los tipos de máquinas de núcleo compartido, como los trabajadores de la serie f1 y g1, no son compatibles con el Acuerdo de Nivel de Servicio de Dataflow.

Dataflow factura por la cantidad de CPU virtuales y GB de memoria en los trabajadores. La facturación es independiente de la familia de tipos de máquinas. Puedes especificar un tipo de máquina para tu canalización si configuras el parámetro de ejecución apropiado cuando se crea la canalización.

Java: SDK 2.x

Para cambiar el tipo de máquina, establece la opción --workerMachineType.

Python

Para cambiar el tipo de máquina, establece la opción --worker_machine_type.

Java: SDK 1.x

Cuota de recursos

El servicio de Dataflow verifica que el proyecto de Google Cloud tenga la cuota de recursos de Compute Engine necesaria para ejecutar el trabajo, a fin de iniciar el trabajo y escalar al número máximo de instancias de trabajador. Tu trabajo no se iniciará si no hay suficiente cuota de recursos disponible.

La característica Ajuste de escala automático de Dataflow está limitada por la cuota disponible de Compute Engine del proyecto. Si tu trabajo tiene una cuota suficiente cuando se inicia, pero otro trabajo usa el resto de la cuota disponible de tu proyecto, el primer trabajo se ejecutará pero no podrá escalar por completo.

Sin embargo, el servicio de Dataflow no administra los aumentos de cuota de los trabajos que excedan las cuotas de recursos del proyecto. Tú tienes la responsabilidad de realizar las solicitudes necesarias para la cuota de recursos adicionales, en la que puedes usar Google Cloud Console.

Recursos de discos persistentes

En la actualidad, el servicio de Dataflow está limitado a 15 discos persistentes por instancia de trabajador cuando se ejecuta un trabajo de transmisión. Cada disco persistente pertenece a una máquina virtual de Compute Engine individual. Tu trabajo no puede tener más trabajadores que discos persistentes; la asignación de recursos mínima es una proporción de 1:1 entre trabajadores y discos.

El tamaño predeterminado de cada disco persistente para los trabajos que se ejecutan en VM de trabajador es de 250 GB en modo por lotes y 400 GB en modo de transmisión. Los trabajos que usan Streaming Engine o Dataflow Shuffle se ejecutan en el backend del servicio de Dataflow y usan discos más pequeños.

Ubicaciones

De forma predeterminada, el servicio de Dataflow implementa los recursos de Compute Engine en la zona us-central1-f de la región us-central1. Puedes anular esta configuración si especificas el parámetro --region. Si necesitas usar una zona específica para tus recursos, usa el parámetro --zone cuando creas tu canalización. Sin embargo, recomendamos que solo especifiques la región y dejes la zona sin especificar. Esto permite que el servicio de Dataflow seleccione de forma automática la mejor zona dentro de la región en función de la capacidad de zona disponible en el momento de la solicitud de creación de trabajo. Para obtener más información, consulta la documentación de extremos regionales.

Streaming Engine

En la actualidad, el ejecutor de canalizaciones de Dataflow ejecuta los pasos de la canalización de transmisión por completo en máquinas virtuales de trabajador, lo que consume CPU, memoria y almacenamiento en disco persistente. Streaming Engine de Dataflow retira la ejecución de la canalización de las VM de trabajador y la traslada hacia el backend del servicio de Dataflow.

Beneficios de Streaming Engine

El modelo de Streaming Engine tiene los siguientes beneficios:

  • Una reducción en los recursos consumidos de CPU, memoria y almacenamiento en el disco persistente en las VM de trabajador. Streaming Engine funciona mejor con tipos de máquinas de trabajador más pequeñas (n1-standard-2 en lugar de n1-standard-4) y no requiere un disco persistente además de un disco de arranque pequeño, lo que genera un menor consumo de recursos y cuotas.
  • Ajuste de escala automático más sensible en la respuesta a las variaciones en el volumen de datos entrantes. Streaming Engine ofrece un escalamiento más uniforme y detallado de los trabajadores.
  • Compatibilidad mejorada, ya que no es necesario volver a implementar las canalizaciones para aplicar las actualizaciones del servicio.

La mayor parte de la reducción en los recursos de los trabajadores proviene de derivar el trabajo al servicio de Dataflow. Por esa razón, hay un costo asociado con el uso de Streaming Engine. Sin embargo, se espera que la factura total de las canalizaciones de Dataflow que usan Streaming Engine no cambie demasiado en comparación con el costo total de las canalizaciones de Dataflow que no usan esta opción.

Usa Streaming Engine

En la actualidad, Streaming Engine está disponible para la transmisión de canalizaciones en las siguientes regiones. Estará disponible en otras regiones más adelante.

  • us-west1 (Oregón)
  • us-central1 (Iowa)
  • us-east1 (Carolina del Sur)
  • us-east4 (Virginia del Norte)
  • europe-west2 (Londres)
  • europe-west1 (Bélgica)
  • europe-west4 (Países Bajos)
  • europe-west3 (Fráncfort)
  • asia-east1 (Taiwán)
  • asia-northeast1 (Tokio)

Java: SDK 2.x

Para usar Streaming Engine en tus canales de transmisión, especifica el siguiente parámetro:

  • --enableStreamingEngine si usas el SDK de Apache Beam para la versión 2.11.0 o posterior de Java.
  • --experiments=enable_streaming_engine si usas el SDK de Apache Beam para la versión 2.10.0 de Java.

Si usas Dataflow Streaming Engine para la canalización, no especifiques el parámetro --zone. En su lugar, especifica el parámetro --region y establece el valor en una de las regiones en las que Streaming Engine esté disponible en la actualidad. Dataflow selecciona de forma automática la zona en la región especificada. Si especificas el parámetro --zone y lo configuras en una zona fuera de las regiones disponibles, Dataflow informará un error.

Streaming Engine funciona mejor con tipos de máquina de trabajador más pequeñas, por lo que recomendamos que configures --workerMachineType=n1-standard-2. También puedes establecer --diskSizeGb=30 porque Streaming Engine solo necesita espacio para la imagen de arranque del trabajador y los registros locales. Estos valores son los predeterminados.

Python

Para usar Streaming Engine en tus canales de transmisión, especifica el siguiente parámetro:

--enable_streaming_engine

Si usas Dataflow Streaming Engine para la canalización, no especifiques el parámetro --zone. En su lugar, especifica el parámetro --region y establece el valor en una de las regiones en las que Streaming Engine esté disponible en la actualidad. Dataflow selecciona de forma automática la zona en la región especificada. Si especificas el parámetro --zone y lo configuras en una zona fuera de las regiones disponibles, Dataflow informará un error.

Streaming Engine funciona mejor con tipos de máquina de trabajador más pequeñas, por lo que recomendamos que configures --machine_type=n1-standard-2. También puedes establecer --disk_size_gb=30 porque Streaming Engine solo necesita espacio para la imagen de arranque del trabajador y los registros locales. Estos valores son los predeterminados.

Java: SDK 1.x

Dataflow Shuffle

Dataflow Shuffle es la operación base detrás de las transformaciones de Dataflow, como GroupByKey, CoGroupByKey y Combine. La operación de Dataflow Shuffle particiona y agrupa los datos por clave de forma escalable, eficiente y tolerante a errores. En la actualidad, Dataflow usa una implementación de redistribución que se ejecuta en su totalidad en máquinas virtuales de trabajadores y consume CPU del trabajador, memoria y almacenamiento en disco persistente. La característica Dataflow Shuffle basada en servicios, disponible solo para canalizaciones por lotes, retira la operación de redistribución de las VM de trabajador y las traslada hacia el backend del servicio de Dataflow.

Beneficios de Dataflow Shuffle

Dataflow Shuffle está basado en servicios y tiene los siguientes beneficios:

  • Tiempo de ejecución más rápido de las canalizaciones por lotes para la mayoría de los tipos de trabajo de canalización
  • Una reducción en los recursos consumidos de CPU, memoria y almacenamiento en el disco persistente en las VM de trabajador.
  • Mejor ajuste de escala automático, porque las VM ya no contienen datos de distribución y, por lo tanto, se puede reducir antes su escala
  • Mejor tolerancia a errores: una VM en mal estado que contenga datos de Dataflow Shuffle no hará que falle todo el trabajo, como sucedería si no se usara esta característica.

La mayor parte de la reducción de los recursos de los trabajadores proviene de derivar el trabajo de redistribución en el servicio de Dataflow. Por ese motivo, existe una carga asociada con el uso de Dataflow Shuffle. Sin embargo, se espera que la factura total de las canalizaciones de Dataflow que usan la implementación de Dataflow basada en servicios sea menor o igual que el costo de las canalizaciones de Dataflow que no usan esta opción.

Para la mayoría de los tipos de trabajo de canalización, se espera que Dataflow Shuffle se ejecute más rápido que la implementación de redistribución que se ejecuta en las VM de trabajador. Sin embargo, la duración puede variar de una ejecución a otra. Si ejecutas una canalización que tiene plazos importantes, recomendamos que asignes suficiente tiempo de búfer antes del plazo. Además, considera solicitar una cuota mayor para Shuffle.

Consideraciones sobre discos

Cuando se usa la característica Dataflow Shuffle basada en servicios, no es necesario conectar grandes discos persistentes a las VM de trabajador. Dataflow conecta de forma automática un pequeño disco de arranque de 25 GB. Sin embargo, debido al pequeño tamaño de este disco, hay importantes consideraciones que debes tener en cuenta cuando usas Dataflow Shuffle:

  • Una VM de trabajador usa parte de los 25 GB de espacio en disco para el sistema operativo, los archivos binarios, los registros y los contenedores. Los trabajos que usan una cantidad significativa de disco y exceden la capacidad restante de este pueden fallar cuando usas Dataflow Shuffle.
  • Los trabajos que usan una gran cantidad de E/S de disco pueden ser lentos debido al rendimiento del disco pequeño. Para obtener más información sobre las diferencias de rendimiento entre los tamaños de disco, consulta la página sobre el rendimiento del disco persistente de Compute Engine.

Si alguna de estas consideraciones se aplica a tu trabajo, puedes usar las opciones de canalización para especificar un tamaño de disco más grande.

Usa Dataflow Shuffle

En la actualidad, Dataflow Shuffle basado en servicios está disponible en las siguientes regiones:

  • us-west1 (Oregón)
  • us-central1 (Iowa)
  • us-east1 (Carolina del Sur)
  • us-east4 (Virginia del Norte)
  • europe-west2 (Londres)
  • europe-west1 (Bélgica)
  • europe-west4 (Países Bajos)
  • europe-west3 (Fráncfort)
  • asia-east1 (Taiwán)
  • asia-northeast1 (Tokio)

En el futuro, Dataflow Shuffle estará disponible en otras regiones.

Java: SDK 2.x

Para poder usar Dataflow Shuffle basado en servicios en las canalizaciones por lotes, especifica el siguiente parámetro:
--experiments=shuffle_mode=service

Si usas Dataflow Shuffle para la canalización, no especifiques el parámetro --zone. En su lugar, especifica el parámetro --region y establece el valor en una de las regiones en las que Shuffle está disponible en la actualidad. Dataflow selecciona de forma automática la zona en la región especificada. Si se especifica el parámetro --zone y se configura en una zona fuera de las regiones disponibles, Dataflow informará un error.

Python

Para poder usar Dataflow Shuffle basado en servicios en las canalizaciones por lotes, especifica el siguiente parámetro:
--experiments=shuffle_mode=service

Si usas Dataflow Shuffle para la canalización, no especifiques el parámetro --zone. En su lugar, especifica el parámetro --region y establece el valor en una de las regiones en las que Shuffle está disponible en la actualidad. Dataflow selecciona de forma automática la zona en la región especificada. Si se especifica el parámetro --zone y se configura en una zona fuera de las regiones disponibles, Dataflow informará un error.

Java: SDK 1.x

Programación flexible de recursos de Dataflow

Dataflow FlexRS reduce los costos de procesamiento por lotes mediante el uso de técnicas de programación avanzadas, el servicio Dataflow Shuffle y una combinación de instancias de máquinas virtuales interrumpibles (VM) y de VM normales. Mediante la ejecución de VM interrumpibles y VM normales en paralelo, Dataflow mejora la experiencia del usuario en caso de que Compute Engine detenga las instancias de VM interrumpibles durante un evento del sistema. FlexRS ayuda a garantizar que la canalización progrese y no pierdas trabajo anterior cuando Compute Engine interrumpe las VM interrumpibles. Para obtener más información sobre FlexRS, consulta Usa la programación flexible de recursos en Dataflow.

Dataflow Runner v2

El ejecutor de Dataflow de producción actual usa trabajadores específicos del lenguaje cuando ejecuta canalizaciones de Apache Beam. El ejecutor de Dataflow cambiará a una arquitectura más basada en servicios para mejorar la escalabilidad, la generalidad, la extensibilidad y la eficiencia. Estos cambios incluyen una arquitectura de trabajador más eficiente y portátil empaquetada junto con el servicio Shuffle y Streaming Engine.

El nuevo ejecutor de Dataflow, Dataflow Runner v2, está disponible para pruebas con canalizaciones de transmisión de Python. Te recomendamos probar Dataflow Runner v2 con tu carga de trabajo actual antes de que esté habilitada de forma predeterminada en todas las canalizaciones nuevas. No tienes que hacer ningún cambio en tu código de canalización para aprovechar esta nueva arquitectura.

Beneficios de usar Dataflow Runner v2

A partir de las canalizaciones de transmisión de Python, las funciones nuevas estarán disponibles solo en Dataflow Runner v2. Además, la eficiencia mejorada de la arquitectura de Dataflow Runner v2 podría mejorar el rendimiento de tus trabajos de Dataflow.

Durante la prueba inicial de Dataflow Runner v2, es posible que notes una reducción en tu factura. El modelo de facturación para Dataflow Runner v2 aún no es definitivo, por lo que tu factura podría volver a los niveles actuales cercanos, ya que el nuevo ejecutor está habilitado en todas las canalizaciones.

Usa Dataflow Runner v2

Dataflow Runner v2 está disponible en regiones que tienen extremos regionales de Dataflow.

Java: SDK 2.x

Dataflow Runner v2 no está disponible para Java en este momento.

Python

Dataflow Runner v2 requiere Streaming Engine. Para habilitarlos, especifica el siguiente parámetro:
--experiments=use_runner_v2

Depura trabajos de Dataflow Runner v2

Para depurar trabajos con Dataflow Runner v2, debes seguir los pasos de depuración estándar. Sin embargo, ten en cuenta lo siguiente cuando uses Dataflow Runner v2:

  • Los trabajos de Dataflow Runner v2 ejecutan dos tipos de procesos en la VM de trabajador: el proceso del SDK y el proceso de aprovechamiento del ejecutor. Puede haber uno o más procesos del SDK, según la canalización y el tipo de VM, pero solo hay un proceso de aprovechamiento del ejecutor por VM.
  • Los procesos del SDK ejecutan el código de usuario y otras funciones específicas del lenguaje, mientras que el proceso de aprovechamiento del ejecutor administra todo lo demás.
  • El proceso de aprovechamiento del ejecutor espera a que todos los procesos del SDK se conecten a él antes de comenzar a solicitar trabajo de Dataflow.
  • Es posible que los trabajos se retrasen si la VM de trabajador instala y descarga dependencias durante el inicio del proceso del SDK. Si hay problemas en un proceso de SDK, como el inicio o la instalación de bibliotecas, el trabajador informa que está en mal estado.
  • Los registros de VM de trabajador, disponibles a través del visor de registros o la interfaz de supervisión de Dataflow, incluyen los registros del proceso del aprovechamiento del ejecutor y de los procesos del SDK.
  • Para diagnosticar problemas en tu código de usuario, examina los registros de trabajador de los procesos del SDK. Si encuentras errores en los registros del aprovechamiento del ejecutor, comunícate con el equipo de asistencia para informar un error.