Preguntas frecuentes

En la siguiente sección, se brindan respuestas a algunas preguntas frecuentes sobre Cloud Dataflow.

Preguntas generales

¿Dónde puedo obtener asistencia adicional?

Puedes visitar la página del Servicio de asistencia de Google Cloud Platform (GCP) a fin de obtener un paquete de asistencia para GCP, incluido Cloud Dataflow.

Puedes usar StackOverflow para investigar tu pregunta actual o enviar una nueva. Cuando envíes tu pregunta, etiquétala con google-cloud-dataflow. Los miembros del personal de ingeniería de Google supervisan este grupo y te ayudarán a responder tus preguntas.

También puedes enviar preguntas, solicitudes de funciones, informes de errores o defectos y otros comentarios en el foro UserVoice.

¿Es posible compartir datos a través de instancias de canalización?

No existe un mecanismo de comunicación de canalización cruzada específico de Cloud Dataflow para compartir datos o procesar el contexto entre las canalizaciones. Puedes usar un almacenamiento duradero, como Cloud Storage, o una memoria de almacenamiento en caché, como App Engine, para compartir datos entre las instancias de canalización.

¿Existe un mecanismo de programación integrado para ejecutar canalizaciones en un momento o intervalo determinados?

Puedes automatizar la ejecución de canalizaciones de la siguiente manera:

¿Cómo puedo saber qué versión del SDK de Cloud Dataflow está instalada o se ejecuta en mi entorno?

Los detalles de instalación dependen de tu entorno de desarrollo. Si usas Maven, puedes tener varias versiones del SDK de Cloud Dataflow “instaladas” en un repositorio local de Maven (o en más de uno).

Java

Para saber qué versión del SDK de Cloud Dataflow ejecuta una canalización determinada, puedes ver el resultado de la consola cuando se ejecuta con DataflowPipelineRunner o BlockingDataflowPipelineRunner. En la consola, aparecerá un mensaje como el que se muestra a continuación, que contiene la información de la versión del SDK de Cloud Dataflow:

Python

Para saber qué versión del SDK de Cloud Dataflow ejecuta una canalización determinada, puedes ver el resultado de la consola cuando se ejecuta con DataflowRunner. En la consola, aparecerá un mensaje como el que se muestra a continuación, que contiene la información de la versión del SDK de Cloud Dataflow:

  INFO: Executing pipeline on the Dataflow Service, ...
  Dataflow SDK version: <version>

Interactúa con tu trabajo de Cloud Dataflow

¿Es posible acceder a las máquinas de trabajador de mi trabajo (VM de Compute Engine) mientras mi canalización se está ejecutando?

Puedes ver las instancias de VM de una canalización determinada con Google Cloud Platform Console. Desde allí, puedes acceder a cada instancia con SSH. Sin embargo, una vez que tu trabajo finalice o exista alguna falla, el servicio Cloud Dataflow se cerrará y limpiará las instancias de VM de manera automática.

¿Por qué no veo el tiempo de CPU reservado para mi trabajo de transmisión en la interfaz de supervisión de Cloud Dataflow?

El servicio de Cloud Dataflow informa el tiempo de CPU reservado después de que se completan los trabajos. En el caso de los trabajos no delimitados, esto significa que el tiempo de CPU reservado solo se informa después de que los trabajos se cancelan o fallan.

¿Por qué el estado de trabajo y la información de marca de agua no están disponibles para los trabajos de transmisión actualizados recientemente en la interfaz de supervisión de Cloud Dataflow?

La operación de actualización realiza varios cambios que demoran unos minutos en propagarse a la interfaz de supervisión de Cloud Dataflow. Prueba actualizar la interfaz de supervisión 5 minutos después de la actualización de tu trabajo.

¿Por qué mis transformaciones compuestas personalizadas aparecen expandidas en la interfaz de supervisión de Cloud Dataflow?

En tu código de canalización, es posible que hayas invocado la transformación compuesta de la siguiente manera:

result = transform.apply(input);

Las transformaciones compuestas invocadas de esta manera omiten el anidamiento esperado y, por lo tanto, pueden aparecer expandidas en la interfaz de supervisión de Cloud Dataflow. Tu canalización también puede generar advertencias o errores sobre nombres de usuario únicos en el tiempo de ejecución de la canalización.

Para evitar estos problemas, asegúrate de invocar tus transformaciones mediante el formato recomendado:

result = input.apply(transform);

¿Por qué ya no puedo ver la información de mi trabajo en curso en la interfaz de supervisión de Cloud Dataflow, aunque esa información haya aparecido anteriormente?

En la actualidad, existe un problema común que puede afectar a algunos trabajos de Cloud Dataflow que comenzaron a ejecutarse hace un mes o más. Es posible que esos trabajos no se carguen de manera correcta en la interfaz de supervisión de Google Dataflow o que muestren información desactualizada, incluso si el trabajo era visible antes.

Aún puedes obtener tu estado de trabajo en la lista de trabajos cuando utilices la interfaz de supervisión o la interfaz de línea de comandos de Cloud Dataflow. Sin embargo, si se presenta este problema, no podrás ver los detalles de tu trabajo.

Cómo programar con el SDK de Apache Beam para Java

¿Puedo transferir datos adicionales (fuera de banda) a una operación ParDo existente?

Sí. Hay varios patrones para seguir que varían según el caso práctico:

  • Puedes serializar información como campos en tu subclase DoFn.
  • Cualquier variable a la que se haga referencia a través de los métodos en una DoFn anónima se serializará de manera automática.
  • Puedes procesar datos dentro de DoFn.startBundle().
  • Puedes pasar datos a través de ParDo.withSideInputs.

Si deseas obtener más información, consulta la documentación de ParDo, en particular, las secciones sobre cómo crear entradas DoFn y complementarias, además de la documentación de referencia de la API de Java para ParDo.

¿Cómo se controlan las excepciones de Java en Cloud Dataflow?

Tu canalización puede generar excepciones durante el procesamiento de datos. Algunos de estos errores son transitorios (p. ej., dificultades temporales para acceder a un servicio externo), mientras que otros son permanentes, como los errores causados por datos de entrada corruptos o no analizables, o punteros nulos durante el cálculo.

Cloud Dataflow procesa los elementos en paquetes arbitrarios y reintentará procesar el paquete completo cuando se produzca un error en cualquier elemento de ese paquete. Cuando se ejecutan en modo por lotes, los paquetes que incluyen un artículo defectuoso se reintentan 4 veces. Si un paquete individual falla 4 veces, la canalización fallará completamente. Cuando se ejecuta en modo de transmisión, un paquete que incluye un elemento defectuoso se reintentará indefinidamente, lo que puede hacer que la canalización se detenga de forma permanente.

Las excepciones en el código de usuario (por ejemplo, tus instancias DoFn) se informan en la interfaz de supervisión de Cloud Dataflow. Si ejecutas tu canalización con BlockingDataflowPipelineRunner, también verás mensajes de error impresos en tu consola o ventana de terminal.

Se recomienda que agregues controladores de excepciones para protegerte contra los errores de tu código. Por ejemplo, si deseas descartar elementos que producen fallas en una validación de entrada personalizada en ParDo, usa un bloque try-catch dentro de tu ParDo para manejar la excepción y descartar el elemento. También puedes usar un Aggregator para realizar un seguimiento de los recuentos de errores.

Programa con el SDK de Cloud Dataflow para Python

¿Cómo debo solucionar NameError?

Si obtienes un NameError cuando ejecutas tu canalización con el servicio de Cloud Dataflow, pero no cuando ejecutas de manera local (es decir, mediante DirectRunner), es posible que tus DoFn usen valores en el espacio de nombres global que no están disponibles en el trabajador de Cloud Dataflow.

Las importaciones globales, las funciones y las variables definidas en la sesión principal no se guardan durante la serialización de un trabajo de Cloud Dataflow de forma predeterminada. Si, por ejemplo, tus DoFn se definen en el archivo principal y las importaciones y funciones de referencia en el espacio de nombres global, puedes establecer la opción de canalización --save_main_session en True. Esto hará que el estado del espacio de nombres global se serialice y se cargue en el trabajador de Cloud Dataflow.

Ten en cuenta que, si hay objetos en tu espacio de nombres global que no pueden serializarse, te aparecerá un error de serialización. Si el error está relacionado con un módulo que debería estar disponible en la distribución de Python, puedes resolverlo importando el módulo localmente, que es el lugar en que se usa.

Por ejemplo, en lugar de hacer lo siguiente:

import re
…
def myfunc():
  # use re module

usa este comando:

def myfunc():
  import re
  # use re module

De manera alternativa, si tus DoFn abarcan varios archivos, debes usar un enfoque diferente para empaquetar el flujo de trabajo y administrar las dependencias.

E/S de canalización

¿La fuente y el receptor de TextIO admiten archivos comprimidos, como GZip?

Sí. Java en Cloud Dataflow puede leer archivos comprimidos con gzip y bzip2. Consulta la documentación de TextIO para obtener información adicional.

¿Puedo usar una expresión regular para orientar archivos específicos con la fuente de TextIO?

Cloud Dataflow admite patrones de comodín generales, por lo que la expresión glob puede aparecer en cualquier parte de la ruta de acceso del archivo. Sin embargo, Cloud Dataflow no admite comodines recursivos (**).

¿La fuente de entrada de TextIO admite JSON?

Sí. Sin embargo, para que el servicio de Cloud Dataflow pueda procesar paralelamente la entrada y la salida, los datos de origen deben estar delimitados con un salto de línea.

¿Por qué no se activa el rebalanceo de trabajo dinámico con mi fuente personalizada?

El rebalanceo de trabajo dinámico usa el valor de muestra del método getProgress() de tu fuente personalizada para activarse. La implementación predeterminada para getProgress() muestra null. A fin de garantizar que se active el ajuste de escala automático, asegúrate de que la fuente personalizada anule getProgress() para mostrar un valor apropiado.

¿Cómo accedo a los conjuntos de datos de BigQuery, a los temas de Pub/Sub o a las suscripciones que pertenecen a un proyecto diferente de Google Cloud Platform (es decir, no al proyecto con el que uso Cloud Dataflow)?

Consulta la guía de seguridad y permisos de Cloud Dataflow para obtener información sobre cómo acceder a los datos de Cloud Pub/Sub o BigQuery en un proyecto de GCP diferente al que usa Cloud Dataflow.

Transmisión

¿Cómo ejecuto mi canalización en modo de transmisión?

Puedes configurar la marca --streaming en la línea de comandos cuando ejecutas tu canalización. También puedes configurar el modo de transmisión de manera programática cuando construyes tu canalización.

¿Qué fuentes y receptores de datos son compatibles con el modo de transmisión?

Puedes leer datos de transmisión en Cloud Pub/Sub y escribir datos de transmisión en Cloud Pub/Sub o BigQuery.

¿Cuáles son las limitaciones actuales del modo de transmisión?

El modo de transmisión de Cloud Dataflow tiene las siguientes limitaciones:

  • Las fuentes por lotes aún no son compatibles con el modo de transmisión.
  • Las funciones de ajuste de escala automático del servicio de Cloud Dataflow son compatibles con la versión Beta.

Parece que la canalización de transmisión que lee en Pub/Sub se está ralentizando. ¿Qué puedo hacer?

Es posible que la cuota de Cloud Pub/Sub de tu proyecto sea insuficiente. Puedes averiguar si tu proyecto tiene una cuota insuficiente mediante la verificación de errores del cliente 429 (Rate limit exceeded):

  1. Ve a Google Cloud Platform Console.
  2. En el menú de la izquierda, selecciona API y servicios.
  3. En el Cuadro de búsqueda, busca Cloud Pub/Sub.
  4. Haz clic en la pestaña Uso.
  5. Consulta Códigos de respuesta y busca códigos de error de cliente (4xx).

¿Por qué mi trabajo de transmisión no se ajusta de forma ascendente correctamente cuando actualizo la canalización con un grupo de trabajadores más grande?

Java

Para los trabajos de transmisión que no usan Streaming Engine, no puedes escalar por encima de la cantidad original de trabajadores y los recursos de disco persistente asignados al comienzo de tu trabajo original. Cuando actualizas un trabajo de Cloud Dataflow y especificas una cantidad mayor de trabajadores en el trabajo nuevo, solo puedes especificar una cantidad de trabajadores igual al --maxNumWorkers que especificaste para tu trabajo original.

Python

Para los trabajos de transmisión que no usan Streaming Engine, no puedes escalar por encima de la cantidad original de trabajadores y los recursos de disco persistente asignados al comienzo de tu trabajo original. Cuando actualizas un trabajo de Cloud Dataflow y especificas una cantidad mayor de trabajadores en el trabajo nuevo, solo puedes especificar una cantidad de trabajadores igual al --max_num_workers que especificaste para tu trabajo original.

Ajuste de escala automático de transmisión

¿Qué debo hacer si quiero una cantidad fija de trabajadores?

Para habilitar el ajuste de escala automático de transmisión, debes aceptarlo; no tiene una configuración predeterminada. La semántica de las opciones actuales no está cambiando; por lo tanto, para seguir con una cantidad fija de trabajadores, no necesitas hacer nada.

Me preocupa el hecho de que el ajuste de escala automático aumente el importe de mi factura. ¿Cómo puedo limitarlo?

Java

Cuando especificas --maxNumWorkers, limitas el rango de escalamiento usado para procesar el trabajo.

Python

Cuando especificas --max_num_workers, limitas el rango de escalamiento usado para procesar el trabajo.

¿Cuál es el rango de escalamiento para las canalizaciones de ajuste de escala automático de transmisión?

Java

Para trabajos de transmisión de ajuste de escala automático que no usen Streaming Engine, el servicio de Cloud Dataflow asigna entre 1 y 15 discos persistentes a cada trabajador. Esto significa que la cantidad mínima de trabajadores usados para una canalización de ajuste de escala automático es N/15, en la que N es el valor de --maxNumWorkers.

Para trabajos de transmisión de ajuste de escala automático que usan Streaming Engine, la cantidad mínima de trabajadores es 1.

Cloud Dataflow balancea la cantidad de discos persistentes entre los trabajadores. Por ejemplo, si tu canalización necesita 3 o 4 trabajadores en estado estable, puedes establecer --maxNumWorkers=15. La canalización escala de manera automática entre 1 y 15 trabajadores, con 1, 2, 3, 4, 5, 8 o 15 trabajadores, que se corresponden con 15, 8, 5, 4, 3, 2 o 1 disco persistente por trabajador, respectivamente.

--maxNumWorkers puede ser 1,000 como máximo.

Python

Para trabajos de transmisión de ajuste de escala automático que no usen Streaming Engine, el servicio de Cloud Dataflow asigna entre 1 y 15 discos persistentes a cada trabajador. Esto significa que la cantidad mínima de trabajadores usados para una canalización de ajuste de escala automático es N/15, en la que N es el valor de --max_num_workers.

Para trabajos de transmisión de ajuste de escala automático que usan Streaming Engine, la cantidad mínima de trabajadores es 1.

Cloud Dataflow balancea la cantidad de discos persistentes entre los trabajadores. Por ejemplo, si tu canalización necesita 3 o 4 trabajadores en estado estable, puedes establecer --max_num_workers=15. La canalización escala de manera automática entre 1 y 15 trabajadores, con 1, 2, 3, 4, 5, 8 o 15 trabajadores, que se corresponden con 15, 8, 5, 4, 3, 2 o 1 disco persistente por trabajador, respectivamente.

--max_num_workers puede ser 1,000 como máximo.

¿Cuál es la cantidad máxima de trabajadores que puede usar el ajuste de escala automático?

Java

Cloud Dataflow funciona dentro de los límites de la cuota de recuento de instancias de Compute Engine de tu proyecto o maxNumWorkers, lo que sea menor.

Python

Cloud Dataflow funciona dentro de los límites de la cuota de recuento de instancias de Compute Engine de tu proyecto o max_num_workers, lo que sea menor.

¿Puedo desactivar el ajuste de escala automático en mi canalización de transmisión?

Java

Sí. Establece --autoscalingAlgorithm=NONE. Actualiza la canalización con especificaciones fijas del clúster, como se describe en la documentación del escalamiento manual, en el que numWorkers se encuentra dentro del rango de escalamiento.

Python

Sí. Establece --autoscaling_algorithm=NONE. Actualiza la canalización con especificaciones fijas del clúster, como se describe en la documentación del escalamiento manual, en el que num_workers se encuentra dentro del rango de escalamiento.

¿Puedo cambiar el rango de escalamiento en mi canalización de transmisión?

Java

Sí, pero no puedes hacerlo con la Actualización*. Debes detener la canalización con Cancelar o Desviar y volver a implementarla con el maxNumWorkers nuevo deseado.

Python

Sí, pero no puedes hacerlo con la Actualización*. Debes detener la canalización con Cancelar o Desviar y volver a implementarla con el max_num_workers nuevo deseado.

Configura tu proyecto de Google Cloud Platform para usar Cloud Dataflow

¿Cómo puedo determinar si el proyecto que estoy usando con Cloud Dataflow posee un depósito de Cloud Storage que quiero leer o escribir?

Para determinar si tu proyecto de GCP posee un depósito de Cloud Storage en particular, puedes usar el siguiente comando de consola:

gsutil acl get gs://<your-bucket>

El comando genera como resultado una string JSON similar a la siguiente:

[
  {
    "entity": "project-owners-123456789",
    "projectTeam": {
      "projectNumber": "123456789",
      "team": "owners"
    },
    "role": "OWNER"
  },
  ....
]

Las entradas relevantes son aquellas en las que la “función” es propietario. El projectNumber asociado te indica qué proyecto es el propietario de ese depósito. Si el número de proyecto no coincide con el número de tu proyecto, tendrás que elegir una de las siguientes opciones:

  • Crear un depósito nuevo que pertenezca a tu proyecto
  • Proporcionar acceso al depósito para las cuentas adecuadas

¿Cómo creo un depósito nuevo que pertenezca a mi proyecto de Cloud Dataflow?

A fin de crear un nuevo depósito en el proyecto de GCP en que estás utilizando Cloud Dataflow, puedes usar el siguiente comando de consola:

gsutil mb -p <Project to own the bucket> <bucket-name>

¿Cómo puedo lograr que un depósito perteneciente a un proyecto diferente se pueda leer o escribir en el proyecto de Google Cloud Platform que uso con Cloud Dataflow?

Consulta la guía de seguridad y permisos de Cloud Dataflow para obtener información sobre cómo tu canalización de Cloud Dataflow puede acceder a los recursos de GCP que pertenecen a un proyecto de GCP diferente.

Cuando intento ejecutar mi trabajo de Cloud Dataflow, veo un error que dice: “Es necesario habilitar algunas API de Cloud en tu proyecto para que Cloud Dataflow ejecute este trabajo”. ¿Qué tengo que hacer?

Para ejecutar un trabajo de Cloud Dataflow, debes habilitar las siguientes API de GCP en tu proyecto:

  • API de Compute Engine (Compute Engine)
  • API de Cloud Logging
  • Cloud Storage
  • API de Cloud Storage JSON
  • API de BigQuery
  • Cloud Pub/Sub
  • API de Cloud Datastore

Consulta la sección sobre cómo comenzar a habilitar las API de GCP para obtener instrucciones detalladas.

¿Te sirvió esta página? Envíanos tu opinión:

Enviar comentarios sobre…

¿Necesitas ayuda? Visita nuestra página de asistencia.