Preguntas frecuentes

En la siguiente sección, se brindan respuestas a algunas preguntas frecuentes acerca de Cloud Dataflow.

Preguntas generales

¿Dónde puedo obtener más asistencia?

Puedes visitar el Servicio de asistencia de Google Cloud Platform (GCP) a fin de obtener un paquete de asistencia para GCP, incluido Cloud Dataflow.

Puedes usar StackOverflow para investigar tu pregunta actual o enviar una nueva. Cuando envíes tu pregunta, etiquétala con google-cloud-dataflow. Los miembros del personal de ingeniería de Google supervisan este grupo y te ayudarán a responder tus preguntas.

También puedes enviar preguntas, solicitudes de funciones, informes de errores o defectos y otros comentarios en el foro UserVoice.

¿Es posible compartir datos a través de instancias de canalización?

No existe un mecanismo de comunicación de canalización cruzada específico de Cloud Dataflow para compartir datos o procesar el contexto entre las canalizaciones. Puedes usar un almacenamiento duradero, como Cloud Storage, o una memoria de almacenamiento en caché (p. ej., App Engine) para compartir datos entre las instancias de canalización.

¿Existe un mecanismo de programación integrado para ejecutar canalizaciones en un momento o intervalo determinado?

¿Cómo puedo saber qué versión del SDK de Cloud Dataflow está instalada o está ejecutándose en mi entorno?

Los detalles de instalación dependen de tu entorno de desarrollo. Si utilizas Maven, puedes tener varias versiones del SDK de Cloud Dataflow “instaladas” en un repositorio local de Maven (o en más de uno).

Java

Para saber qué versión del SDK de Cloud Dataflow se está ejecutando en una canalización determinada, observa los resultados de la consola cuando se ejecuta con DataflowPipelineRunner o BlockingDataflowPipelineRunner. La consola tendrá un mensaje como el que se muestra a continuación, que contiene la información de la versión del SDK de Cloud Dataflow:

Python

Para saber qué versión del SDK de Cloud Dataflow se está ejecutando en una canalización determinada, observa los resultados de la consola cuando se ejecuta con DataflowRunner. La consola tendrá un mensaje como el que se muestra a continuación, que contiene la información de la versión del SDK de Cloud Dataflow:

  INFO: Executing pipeline on the Dataflow Service, ...
  Dataflow SDK version: <version>

Cómo interactuar con tu trabajo de Cloud Dataflow

¿Es posible acceder a las máquinas de trabajador de mi trabajo (VM de Compute Engine) mientras mi canalización se está ejecutando?

Puedes ver las instancias de VM de una canalización determinada con Google Cloud Platform Console. Desde allí, puedes acceder a cada instancia con SSH. Sin embargo, una vez que tu trabajo finalice o exista alguna falla, el servicio Cloud Dataflow se cerrará y limpiará las instancias de VM de manera automática.

¿Por qué no veo el tiempo de CPU reservado para mi trabajo de transmisión en la interfaz de supervisión de Cloud Dataflow?

El servicio de Cloud Dataflow informa el tiempo de CPU reservado después de que se completan los trabajos. En el caso de los trabajos no delimitados, esto significa que el tiempo de CPU reservado solo se informa después de que los trabajos se cancelan o fallan.

¿Por qué el estado de trabajo y la información de marca de agua no están disponibles para los trabajos de transmisión actualizados recientemente en la interfaz de supervisión de Cloud Dataflow?

La operación de actualización realiza varios cambios que demoran unos minutos en propagarse a la interfaz de supervisión de Cloud Dataflow. Prueba actualizar la interfaz de supervisión 5 minutos después de la actualización de tu trabajo.

¿Por qué mis transformaciones compuestas personalizadas aparecen expandidas en la interfaz de supervisión de Cloud Dataflow?

En tu código de canalización, es posible que hayas invocado la transformación compuesta de la siguiente manera:

result = transform.apply(input);

Las transformaciones compuestas invocadas de esta manera omiten el anidamiento esperado y, por lo tanto, pueden aparecer expandidas en la interfaz de supervisión de Cloud Dataflow. Tu canalización también puede generar advertencias o errores sobre nombres de usuario únicos en el tiempo de ejecución de la canalización.

Para evitar estos problemas, asegúrate de invocar tus transformaciones mediante el formato recomendado:

result = input.apply(transform);

¿Por qué ya no puedo ver la información de mi trabajo en curso en la interfaz de supervisión de Cloud Dataflow, aunque esa información haya aparecido anteriormente?

En la actualidad, existe un problema conocido que puede afectar a algunos trabajos de Cloud Dataflow que comenzaron a ejecutarse hace un mes o más. Es posible que esos trabajos no se carguen correctamente en la interfaz de supervisión de Google Dataflow, o que muestren información desactualizada, incluso si el trabajo era visible antes.

Aún puedes obtener tu estado de trabajo en la lista de trabajos cuando utilices la interfaz de supervisión o la interfaz de línea de comandos de Cloud Dataflow. Sin embargo, si se presenta este problema, no podrás ver los detalles de tu trabajo.

Cómo programar con el SDK de Apache Beam para Java

¿Puedo transferir datos adicionales (fuera de banda) a una operación ParDo existente?

Sí. Hay varios patrones para seguir que varían según el caso práctico:

  • Puedes serializar la información en forma de campos dentro de tu subclase DoFn.
  • Cualquier variable a la cual se haga referencia a través de los métodos en una DoFn anónima se serializará automáticamente.
  • Puedes calcular datos dentro de DoFn.startBundle().
  • Puedes transferir datos a través de ParDo.withSideInputs.

Si deseas obtener más información, consulta la documentación de ParDo, específicamente, las secciones acerca de cómo crear entradas DoFn y complementarias, además de la documentación de referencia de la API de Java para ParDo.

¿Cómo se controlan las excepciones de Java en Cloud Dataflow?

Tu canalización puede generar excepciones durante el procesamiento de datos. Algunos de estos errores son transitorios (p. ej., dificultades temporales para acceder a un servicio externo), mientras que otros son permanentes, como los errores causados por datos de entrada corruptos o no analizables, o punteros nulos durante el cálculo.

Cloud Dataflow procesa los elementos en paquetes arbitrarios y reintentará procesar el paquete completo cuando se produzca un error en cualquier elemento de ese paquete. Cuando se ejecutan en modo por lotes, los paquetes que incluyen un artículo defectuoso se reintentan 4 veces. Si un paquete individual falla 4 veces, la canalización fallará completamente. Cuando se ejecuta en modo de transmisión, un paquete que incluye un elemento defectuoso se reintentará indefinidamente, lo que puede hacer que la canalización se detenga de forma permanente.

Las excepciones en el código de usuario (por ejemplo, las instancias DoFn) se informan en la interfaz de supervisión de Cloud Dataflow. Si ejecutas tu canalización con BlockingDataflowPipelineRunner, también verás mensajes de error impresos en la ventana de consola o terminal.

Piensa en la posibilidad de agregar controladores de excepciones a fin de protegerte contra los errores de tu código. Por ejemplo, si deseas eliminar elementos que producen fallas en una validación de entrada personalizada realizada en un ParDo, usa un bloque try/catch dentro de tu ParDo para controlar la excepción y eliminar el elemento. También es posible que desees utilizar un Aggregator para realizar un seguimiento de los recuentos de errores.

Cómo programar con el SDK de Cloud Dataflow para Python

¿Cómo controlo los NameError?

Si te aparece un NameError cuando ejecutas tu canalización mediante el servicio de Cloud Dataflow, pero no cuando la ejecutas a nivel local (es decir, mediante la herramienta DirectRunner), es posible que tus DoFn estén utilizando valores en el espacio de nombres global que no están disponibles en el trabajador de Cloud Dataflow.

Las importaciones globales, las funciones y las variables definidas en la sesión principal no se guardan durante la serialización de un trabajo de Cloud Dataflow de forma predeterminada. Si, por ejemplo, tus DoFn están definidos en el archivo principal y hacen referencia a importaciones y funciones del espacio de nombres global, puedes configurar la opción de canalización --save_main_session en True. Esto hará que el estado del espacio de nombres global se serialice y se cargue en el trabajador de Cloud Dataflow.

Ten en cuenta que, si hay objetos en tu espacio de nombres global que no pueden serializarse, te aparecerá un error de serialización. Si el error está relacionado con un módulo que debería estar disponible en la distribución de Python, puedes resolverlo importando el módulo localmente, que es el lugar en que se usa.

Por ejemplo, en lugar de hacer lo siguiente:

import re
…
def myfunc():
  # use re module

utiliza esto:

def myfunc():
  import re
  # use re module

De forma alternativa, si tu DoFn abarca varios archivos, debes utilizar un enfoque diferente para empaquetar el flujo de trabajo y administrar las dependencias.

E/S de canalización

La fuente y el receptor de TextIO ¿admiten archivos comprimidos, como GZip?

Sí. Java de Cloud Dataflow puede leer archivos comprimidos con gzip y bzip2. Consulta la documentación de TextIO para obtener información adicional.

¿Puedo usar una expresión regular para orientar archivos específicos con la fuente de TextIO?

Cloud Dataflow admite patrones de comodín generales; la expresión glob puede aparecer en cualquier parte de la ruta de acceso del archivo. Sin embargo, Cloud Dataflow no admite comodines recursivos (**).

La fuente de entrada de TextIO ¿admite JSON?

Sí. Sin embargo, para que el servicio de Cloud Dataflow pueda procesar paralelamente la entrada y la salida, los datos de origen deben estar delimitados con un salto de línea.

¿Por qué no se está activando el rebalanceo de trabajo dinámico con mi fuente personalizada?

El rebalanceo de trabajo dinámico usa el valor de retorno del método getProgress() de tu fuente personalizada para activarse. La implementación predeterminada para getProgress() muestra null. A fin de garantizar que se active el ajuste de escala automático, asegúrate de que la fuente personalizada anule getProgress() para mostrar un valor apropiado.

¿Cómo accedo a los conjuntos de datos de BigQuery, a los temas de Pub/Sub o a las suscripciones que pertenecen a un proyecto diferente de Google Cloud Platform (es decir, no al proyecto con el que estoy usando Cloud Dataflow)?

Consulta la guía de seguridad y permisos de Cloud Dataflow para obtener información sobre cómo acceder a los datos de Cloud Pub/Sub o BigQuery en un proyecto de GCP diferente al que utiliza Cloud Dataflow.

Transmisión

¿Cómo ejecuto mi canalización en modo de transmisión?

Puedes configurar el marcador --streaming en la línea de comandos cuando ejecutas la canalización. También puedes configurar el modo de transmisión de manera programática cuando construyes tu canalización.

¿Qué fuentes y receptores de datos son compatibles con el modo de transmisión?

Puedes leer datos de transmisión en Cloud Pub/Sub y escribir datos de transmisión en Cloud Pub/Sub o BigQuery.

¿Cuáles son las limitaciones actuales del modo de transmisión?

El modo de transmisión de Cloud Dataflow tiene las siguientes limitaciones:

  • Las fuentes de lote aún no son compatibles en el modo de transmisión.
  • Las funciones de ajuste de escala automático del servicio de Cloud Dataflow son compatibles en Beta.

Parece que la canalización de transmisión que leo en Pub/Sub se está desacelerando. ¿Qué puedo hacer?

Es posible que la cuota de Cloud Pub/Sub de tu proyecto sea insuficiente. Para averiguar si tu proyecto tiene una cuota insuficiente, verifica si hay errores del cliente relacionados con 429 (Rate limit exceeded):

  1. Ve a Google Cloud Platform Console.
  2. En el menú de la izquierda, selecciona API y servicios.
  3. En el Cuadro de búsqueda, busca Cloud Pub/Sub.
  4. Haz clic en la pestaña Uso.
  5. Revisa los Códigos de respuesta y busca códigos de error del cliente (4xx).

¿Por qué mi trabajo de transmisión no se ajusta de forma ascendente correctamente cuando actualizo la canalización con un grupo de trabajadores más grande?

Cuando actualizas un trabajo de Cloud Dataflow y especificas una mayor cantidad de trabajadores en el nuevo trabajo, solo puedes establecer una cantidad de trabajadores equivalente al --maxNumWorkers que indicaste respecto a tu trabajo original. No puedes escalar más allá de la cantidad de trabajadores original (y, por lo tanto, de los recursos de Persistent Disk) asignados al inicio del trabajo original.

Este es un problema conocido con respecto al servicio de Cloud Dataflow y se está investigando activamente.

Ajuste de escala automático de transmisión

Nota: El SDK de Cloud Dataflow para Python no admite el ajuste de escala automático de transmisión en la actualidad.

¿Qué debo hacer si quiero una cantidad de trabajadores fija?

Para habilitar el ajuste de escala automático de transmisión, debes aceptarlo; no tiene una configuración predeterminada. La semántica de las opciones actuales no está cambiando; por lo tanto, para seguir con una cantidad fija de trabajadores, no necesitas hacer nada.

Me preocupa el hecho de que el ajuste de escala automático aumente el importe de mi factura. ¿Cómo puedo limitarlo?

Si determinas el valor --maxNumWorkers, limitarás el rango de escalamiento que se utilizará para procesar tu trabajo.

¿Cuál es el rango de escalamiento para las canalizaciones de ajuste de escala automático de transmisión?

La cantidad de trabajadores utilizados en relación con los rangos de ajuste de escala automático varía entre N/15 y N trabajadores, en los cuales N representa el valor de --maxNumWorkers. Por ejemplo, si tu canalización necesita 3 o 4 trabajadores en estado estable, puedes configurar --maxNumWorkers=15, y la canalización se escalará automáticamente entre 1 y 15 trabajadores.

--maxNumWorkers puede llegar hasta 1,000 como máximo.

¿Cuál es la cantidad máxima de trabajadores que puede usar el ajuste de escala automático?

Cloud Dataflow opera dentro de los límites de la cuota de recuento de instancias de Compute Engine perteneciente a tu proyecto o maxNumWorkers, lo que sea menor.

¿Puedo desactivar el ajuste de escala automático en mi canalización de transmisión?

Sí. Configura --autoscalingAlgorithm=NONE. Actualiza la canalización con especificaciones de clúster fijas, como se describe en la documentación de escalamiento manual, en las cuales numWorkers se encuentra dentro del rango de escalamiento.

¿Puedo cambiar el rango de escalamiento en mi canalización de transmisión?

Sí, pero no puedes hacerlo con la Actualización*. Tendrás que detener la canalización mediante Cancelar o Desviar, y volver a implementarla con el nuevo maxNumWorkers deseado.

Cómo configurar tu proyecto de Google Cloud Platform para utilizar Cloud Dataflow

¿Cómo puedo determinar si el proyecto que estoy usando con Cloud Dataflow posee un depósito de Cloud Storage que quiero leer o escribir?

Para determinar si tu proyecto de GCP posee un depósito de Cloud Storage en particular, puedes usar el siguiente comando de consola:

gsutil acl get gs://<your-bucket>

El comando genera como resultado una string JSON similar a la siguiente:

[
  {
    "entity": "project-owners-123456789",
    "projectTeam": {
      "projectNumber": "123456789",
      "team": "owners"
    },
    "role": "OWNER"
  },
  ....
]

Las entradas relevantes son aquellas que pertenecen a la “función”. El projectNumber asociado indica qué proyecto posee ese depósito. Si el número de proyecto no coincide con el número de tu proyecto, tendrás que elegir una de las siguientes opciones:

  • Crear un depósito nuevo que pertenezca a tu proyecto.
  • Proporcionar acceso al depósito para las cuentas adecuadas.

¿Cómo creo un nuevo depósito que pertenezca a mi proyecto de Cloud Dataflow?

A fin de crear un nuevo depósito en el proyecto de GCP en que estás utilizando Cloud Dataflow, puedes usar el siguiente comando de consola:

gsutil mb -p <Project to own the bucket> <bucket-name>

¿Cómo puedo lograr que un depósito perteneciente a un proyecto diferente se pueda leer o escribir para el proyecto de Google Cloud Platform que estoy usando con Cloud Dataflow?

Consulta la guía de seguridad y permisos de Cloud Dataflow para obtener información sobre cómo tu canalización de Cloud Dataflow puede acceder a los recursos de GCP que pertenecen a un proyecto de GCP diferente.

Cuando intento ejecutar mi trabajo de Cloud Dataflow, veo un error que dice: “Es necesario habilitar algunas API de Cloud en tu proyecto para que Cloud Dataflow ejecute este trabajo”. ¿Qué tengo que hacer?

Para ejecutar un trabajo de Cloud Dataflow, debes habilitar las siguientes API de GCP en tu proyecto:

  • API de Compute Engine (Compute Engine)
  • API de Cloud Logging
  • Cloud Storage
  • API de Cloud Storage JSON
  • API de BigQuery
  • Cloud Pub/Sub
  • API de Cloud Datastore

Consulta la sección acerca de cómo comenzar a habilitar las API de GCP para obtener instrucciones detalladas.

¿Te ha resultado útil esta página? Enviar comentarios:

Enviar comentarios sobre...

Si necesitas ayuda, visita nuestra página de asistencia.