Preguntas frecuentes

La siguiente sección contiene respuestas a algunas preguntas frecuentes acerca de Dataflow.

Preguntas generales

¿Dónde puedo obtener asistencia adicional?

Puedes visitar Google Cloud Support para obtener un paquete de asistencia para Google Cloud, incluido Dataflow.

Puedes usar StackOverflow para investigar tu pregunta actual o enviar una nueva. Cuando envíes tu pregunta, etiquétala con google-cloud-dataflow. Los miembros del personal de ingeniería de Google supervisan este grupo y te ayudarán a responder tus preguntas.

También puedes enviar preguntas, solicitudes de funciones, informes de errores o defectos y otros comentarios en el foro UserVoice.

¿Es posible compartir datos a través de instancias de canalización?

No existe un mecanismo de comunicación de canalización cruzada específico de Dataflow para compartir datos o procesar el contexto entre canalizaciones. Puedes usar un almacenamiento duradero, como Cloud Storage, o una memoria de almacenamiento en caché, como App Engine, para compartir datos entre las instancias de canalización.

¿Existe un mecanismo de programación integrado para ejecutar canalizaciones en un momento o intervalo determinado?

Puedes automatizar la ejecución de canalizaciones de la siguiente manera:

¿Cómo puedo saber qué versión del SDK de Dataflow está instalada o ejecutándose en mi entorno?

Los detalles de instalación dependen de tu entorno de desarrollo. Si usas Maven, puedes tener varias versiones del SDK de Dataflow "instaladas" en uno o más repositorios locales de Maven.

Java

Para saber qué versión del SDK de Dataflow está ejecutando una canalización determinada, puede ver el resultado de la consola cuando se ejecuta con DataflowPipelineRunner o BlockingDataflowPipelineRunner. La consola contendrá un mensaje como el siguiente, que contiene la información de la versión del SDK de Dataflow:

Python

Para saber qué versión del SDK de Dataflow está ejecutando una canalización determinada, puede consultar el resultado de la consola cuando se ejecuta con DataflowRunner. La consola contendrá un mensaje como el siguiente, que contiene la información de la versión del SDK de Dataflow:

      INFO: Executing pipeline on the Dataflow Service, ...
      Dataflow SDK version: <version>
    

Interactúa con tu trabajo de Cloud Dataflow

¿Es posible acceder a las máquinas de trabajador de mi trabajo (VM de Compute Engine) mientras mi canalización se está ejecutando?

Puedes ver las instancias de VM para una canalización determinada con Google Cloud Console. Desde allí, puedes acceder a cada instancia con SSH. Sin embargo, una vez que el trabajo se completa o falla, el servicio de Dataflow se cierra automáticamente y limpia las instancias de VM.

¿Por qué no veo el tiempo de CPU reservado para mi trabajo de transmisión en la interfaz de supervisión de Cloud Dataflow?

El servicio de Dataflow informa el tiempo de CPU reservado después de que se completan los trabajos. En el caso de los trabajos no delimitados, esto significa que el tiempo de CPU reservado solo se informa después de que los trabajos se cancelan o fallan.

¿Por qué el estado de trabajo y la información de marca de agua no están disponibles para los trabajos de transmisión actualizados recientemente en la interfaz de supervisión de Cloud Dataflow?

La operación de actualización realiza varios cambios que tardan unos minutos en propagarse a la interfaz de Dataflow Monitoring. Prueba actualizar la interfaz de supervisión 5 minutos después de la actualización de tu trabajo.

¿Por qué mis transformaciones compuestas personalizadas aparecen expandidas en la interfaz de Dataflow Monitoring?

En tu código de canalización, es posible que hayas invocado la transformación compuesta de la siguiente manera:

result = transform.apply(input);

Las transformaciones compuestas que se invocan de esta manera omiten la anidación esperada y, por lo tanto, pueden aparecer expandidas en la interfaz de Dataflow Monitoring. Tu canalización también puede generar advertencias o errores sobre nombres de usuario únicos en el tiempo de ejecución de la canalización.

Para evitar estos problemas, asegúrate de invocar tus transformaciones mediante el formato recomendado:

result = input.apply(transform);

¿Por qué ya no puedo ver la información de mi trabajo en curso en la interfaz de supervisión de Cloud Dataflow, aunque esa información haya aparecido anteriormente?

Existe un problema conocido que actualmente puede afectar algunos trabajos de Dataflow que se han estado ejecutando durante un mes o más. Es posible que estos trabajos no se carguen en la interfaz de supervisión de Dataflow o que muestren información desactualizada, incluso si el trabajo era visible anteriormente.

Puede seguir obteniendo el estado de su trabajo en la lista de trabajos cuando usa las interfaces de Dataflow Monitoring o de línea de comandos de Dataflow. Sin embargo, si se presenta este problema, no podrás ver los detalles de tu trabajo.

Cómo programar con el SDK de Apache Beam para Java

¿Puedo transferir datos adicionales (fuera de banda) a una operación ParDo existente?

Sí. Hay varios patrones para seguir que varían según el caso práctico:

  • Puedes serializar información como campos en tu subclase DoFn.
  • Cualquier variable a la que se haga referencia a través de los métodos en una DoFn anónima se serializará de manera automática.
  • Puedes procesar datos dentro de DoFn.startBundle().
  • Puedes pasar datos a través de ParDo.withSideInputs.

Si deseas obtener más información, consulta la documentación de ParDo, en particular, las secciones sobre cómo crear entradas DoFn y complementarias, además de la documentación de referencia de la API de Java para ParDo.

¿Cómo se controlan las excepciones de Java en Cloud Dataflow?

Tu canalización puede generar excepciones durante el procesamiento de datos. Algunos de estos errores son transitorios (p. ej., dificultades temporales para acceder a un servicio externo), mientras que otros son permanentes, como los errores causados por datos de entrada corruptos o no analizables, o indicadores nulos durante el procesamiento.

Dataflow procesa los elementos en conjuntos arbitrarios y volverá a intentarlo cuando se genere un error para cualquier elemento de ese paquete. Cuando se ejecuta en modo por lotes, los paquetes que incluyen un elemento defectuoso se reintentan 4 veces. Si un paquete individual falla 4 veces, la canalización fallará completamente. Cuando se ejecuta en modo de transmisión, un paquete que incluye un elemento defectuoso se reintenta de forma indefinida, lo que puede hacer que la canalización se paralice de manera permanente.

Las excepciones en el código de usuario (por ejemplo, sus instancias DoFn) se informan en la interfaz de supervisión de Dataflow. Si ejecutas tu canalización con BlockingDataflowPipelineRunner, también verás mensajes de error impresos en tu consola o ventana de terminal.

Se recomienda que agregues controladores de excepciones para protegerte contra los errores de tu código. Por ejemplo, si deseas descartar elementos que producen fallas en una validación de entrada personalizada en ParDo, usa un bloque try-catch dentro de tu ParDo para manejar la excepción y descartar el elemento. También puedes usar un Aggregator para realizar un seguimiento de los recuentos de errores.

Programa con el SDK de Cloud Dataflow para Python

¿Cómo debo solucionar NameError?

Si obtiene un NameError cuando ejecuta su canalización mediante el servicio de Dataflow, pero no cuando ejecuta localmente (es decir, mediante DirectRunner), es posible que sus DoFn s usen valores en el espacio de nombres global no están disponibles en el trabajador de Dataflow.

De manera predeterminada, las importaciones globales, las funciones y las variables definidas en la sesión principal no se guardan durante la serialización de un trabajo de Dataflow. Si, por ejemplo, tus DoFn se definen en el archivo principal y las importaciones y funciones de referencia en el espacio de nombres global, puedes establecer la opción de canalización --save_main_session en True. Esto hará que el estado del espacio de nombres global se muestre y cargue en el trabajador de Dataflow.

Ten en cuenta que, si hay objetos en tu espacio de nombres global que no pueden serializarse, te aparecerá un error de serialización. Si el error está relacionado con un módulo que debería estar disponible en la distribución de Python, puedes resolverlo importando el módulo localmente, que es el lugar en que se usa.

Por ejemplo, en lugar de hacer lo siguiente:

    import re
    …
    def myfunc():
      # use re module
    

usa este comando:

    def myfunc():
      import re
      # use re module
    

De manera alternativa, si tus DoFn abarcan varios archivos, debes usar un enfoque diferente para empaquetar el flujo de trabajo y administrar las dependencias.

E/S de canalización

¿La fuente y el receptor de TextIO admiten archivos comprimidos, como GZip?

Sí. Dataflow Java puede leer archivos comprimidos con gzip y bzip2. Consulta la documentación de TextIO para obtener información adicional.

¿Puedo usar una expresión regular para orientar archivos específicos con la fuente de TextIO?

Dataflow admite patrones de comodines generales. Tu expresión global puede aparecer en cualquier parte de la ruta del archivo. Sin embargo, Dataflow no admite comodines recursivos (**).

¿La fuente de entrada de TextIO admite JSON?

Sí. Sin embargo, para que el servicio de Dataflow pueda paralelizar la entrada y salida, los datos de origen deben delimitarse con un salto de línea.

¿Por qué no se activa el rebalanceo de trabajo dinámico con mi fuente personalizada?

El rebalanceo de trabajo dinámico usa el valor de muestra del método getProgress() de tu fuente personalizada para activarse. La implementación predeterminada para getProgress() muestra null. A fin de garantizar que se active el ajuste de escala automático, asegúrate de que la fuente personalizada anule getProgress() para mostrar un valor apropiado.

¿Cómo accedo a los conjuntos de datos de BigQuery, a los temas de Pub/Sub o a las suscripciones que pertenecen a un proyecto diferente de Google Cloud Platform (es decir, no al proyecto con el que uso Cloud Dataflow)?

Consulta la guía Seguridad y permisos de Dataflow para obtener información sobre cómo acceder a datos de BigQuery o Pub/Sub en un proyecto de Google Cloud diferente del que usas.

Transmisión

¿Cómo ejecuto mi canalización en modo de transmisión?

Puedes configurar la marca --streaming en la línea de comandos cuando ejecutas tu canalización. También puedes configurar el modo de transmisión de manera programática cuando construyes tu canalización.

¿Qué fuentes y receptores de datos son compatibles con el modo de transmisión?

Puedes leer datos de transmisión de Pub/Sub y escribir datos de transmisión en Pub/Sub o BigQuery.

¿Cuáles son las limitaciones actuales del modo de transmisión?

El modo de transmisión de Dataflow tiene las siguientes limitaciones:

  • Las fuentes de lote aún no son compatibles en el modo de transmisión.
  • Las funciones de ajuste de escala automático del servicio de Dataflow se admiten en versión Beta.

Parece que la canalización de transmisión que lee en Pub/Sub se está ralentizando. ¿Qué puedo hacer?

Es posible que tu proyecto no tenga suficiente cuota de Pub/Sub. Puedes averiguar si tu proyecto tiene una cuota insuficiente mediante la verificación de errores del cliente 429 (Rate limit exceeded):

  1. Ve a Google Cloud Console.
  2. En el menú de la izquierda, selecciona API y servicios.
  3. En el Cuadro de búsqueda, busca Cloud Pub/Sub.
  4. Haz clic en la pestaña Uso.
  5. Consulta Códigos de respuesta y busca códigos de error de cliente (4xx).

¿Por qué mi trabajo de transmisión no se ajusta de forma ascendente correctamente cuando actualizo la canalización con un grupo de trabajadores más grande?

Java

Para los trabajos de transmisión que no usan Streaming Engine, no puedes escalar por encima de la cantidad original de trabajadores y los recursos de disco persistente asignados al comienzo de tu trabajo original. Cuando actualiza un trabajo de Dataflow y especifica un número mayor de trabajadores en el nuevo, solo puede especificar una cantidad de trabajadores igual a --maxNumWorkers que especificó para su trabajo original.

Python

Para los trabajos de transmisión que no usan Streaming Engine, no puedes escalar por encima de la cantidad original de trabajadores y los recursos de disco persistente asignados al comienzo de tu trabajo original. Cuando actualiza un trabajo de Dataflow y especifica un número mayor de trabajadores en el nuevo, solo puede especificar una cantidad de trabajadores igual a --max_num_workers que especificó para su trabajo original.

Ajuste de escala automático de transmisión

¿Qué debo hacer si quiero una cantidad fija de trabajadores?

Para habilitar el ajuste de escala automático de transmisión, debes aceptarlo; no tiene una configuración predeterminada. La semántica de las opciones actuales no está cambiando; por lo tanto, para seguir con una cantidad fija de trabajadores, no necesitas hacer nada.

Me preocupa el hecho de que el ajuste de escala automático aumente el importe de mi factura. ¿Cómo puedo limitarlo?

Java

Cuando especificas --maxNumWorkers, limitas el rango de escalamiento usado para procesar el trabajo.

Python

Cuando especificas --max_num_workers, limitas el rango de escalamiento usado para procesar el trabajo.

¿Cuál es el rango de escalamiento para las canalizaciones de ajuste de escala automático de transmisión?

Java

Para transmitir trabajos de ajuste de escala automático que no usen Streaming Engine, el servicio de Dataflow asigna entre 1 y 15 Discos persistentes a cada trabajador. Esto significa que la cantidad mínima de trabajadores usados para una canalización de ajuste de escala automático es N/15, en la que N es el valor de --maxNumWorkers.

Para trabajos de transmisión de ajuste de escala automático que usan Streaming Engine, la cantidad mínima de trabajadores es 1.

Dataflow equilibra la cantidad de discos persistentes entre los trabajadores. Por ejemplo, si tu canalización necesita 3 o 4 trabajadores en estado estable, puedes establecer --maxNumWorkers=15. La canalización escala de manera automática entre 1 y 15 trabajadores, con 1, 2, 3, 4, 5, 8 o 15 trabajadores, que se corresponden con 15, 8, 5, 4, 3, 2 o 1 disco persistente por trabajador, respectivamente.

--maxNumWorkers puede ser 1,000 como máximo.

Python

Para transmitir trabajos de ajuste de escala automático que no usen Streaming Engine, el servicio de Dataflow asigna entre 1 y 15 Discos persistentes a cada trabajador. Esto significa que la cantidad mínima de trabajadores usados para una canalización de ajuste de escala automático es N/15, en la que N es el valor de --max_num_workers.

Para trabajos de transmisión de ajuste de escala automático que usan Streaming Engine, la cantidad mínima de trabajadores es 1.

Dataflow equilibra la cantidad de discos persistentes entre los trabajadores. Por ejemplo, si tu canalización necesita 3 o 4 trabajadores en estado estable, puedes establecer --max_num_workers=15. La canalización escala de manera automática entre 1 y 15 trabajadores, con 1, 2, 3, 4, 5, 8 o 15 trabajadores, que se corresponden con 15, 8, 5, 4, 3, 2 o 1 disco persistente por trabajador, respectivamente.

--max_num_workers puede ser 1,000 como máximo.

¿Cuál es la cantidad máxima de trabajadores que puede usar el ajuste de escala automático?

Java

Dataflow opera dentro de los límites de la cuota de recuento de instancias de Compute Engine de tu proyecto o maxNumWorkers, lo que sea menor.

Python

Dataflow opera dentro de los límites de la cuota de recuento de instancias de Compute Engine de tu proyecto o max_num_workers, lo que sea menor.

¿Puedo desactivar el ajuste de escala automático en mi canalización de transmisión?

Java

Sí. Establece --autoscalingAlgorithm=NONE. Actualiza la canalización con especificaciones fijas del clúster, como se describe en la documentación del escalamiento manual, en el que numWorkers se encuentra dentro del rango de escalamiento.

Python

Sí. Establece --autoscaling_algorithm=NONE. Actualiza la canalización con especificaciones fijas del clúster, como se describe en la documentación del escalamiento manual, en el que num_workers se encuentra dentro del rango de escalamiento.

¿Puedo cambiar el rango de escalamiento en mi canalización de transmisión?

Java

Sí, pero no puedes hacerlo con la Actualización*. Debes detener la canalización con Cancelar o Desviar y volver a implementarla con el maxNumWorkers nuevo deseado.

Python

Sí, pero no puedes hacerlo con la Actualización*. Debes detener la canalización con Cancelar o Desviar y volver a implementarla con el max_num_workers nuevo deseado.

Configura tu proyecto de Google Cloud Platform para usar Cloud Dataflow

¿Cómo puedo determinar si el proyecto que estoy usando con Cloud Dataflow posee un depósito de Cloud Storage que quiero leer o escribir?

Para determinar si tu proyecto de Google Cloud es propietario de un depósito de Cloud Storage en particular, puedes usar el siguiente comando de consola:

gsutil acl get gs://<your-bucket>

El comando genera como resultado una string JSON similar a la siguiente:

    [
      {
        "entity": "project-owners-123456789",
        "projectTeam": {
          "projectNumber": "123456789",
          "team": "owners"
        },
        "role": "OWNER"
      },
      ....
    ]
    

Las entradas relevantes son aquellas que pertenecen a la “función”. El projectNumber asociado te indica qué proyecto es el propietario de ese depósito. Si el número de proyecto no coincide con el número de tu proyecto, tendrás que elegir una de las siguientes opciones:

  • Crear un depósito nuevo que pertenezca a tu proyecto
  • Proporcionar acceso al depósito para las cuentas adecuadas.

¿Cómo creo un depósito nuevo que pertenezca a mi proyecto de Cloud Dataflow?

Para crear un depósito nuevo en el proyecto de Google Cloud en el que usas Dataflow, puedes usar el siguiente comando de la consola:

gsutil mb -p <Project to own the bucket> <bucket-name>

¿Cómo puedo lograr que un depósito perteneciente a un proyecto diferente se pueda leer o escribir en el proyecto de Google Cloud Platform que uso con Cloud Dataflow?

Consulte la guía Seguridad y permisos de Dataflow para obtener información sobre cómo su canalización de Dataflow puede acceder a los recursos de Google Cloud que pertenecen a otro proyecto de Google Cloud.

Cuando intento ejecutar mi trabajo de Cloud Dataflow, veo un error que dice: “Es necesario habilitar algunas API de Cloud en tu proyecto para que Cloud Dataflow ejecute este trabajo”. ¿Qué tengo que hacer?

Para ejecutar un trabajo de Dataflow, debes habilitar las siguientes API de Google Cloud en tu proyecto:

  • API de Compute Engine (Compute Engine)
  • API de Cloud Logging
  • Cloud Storage
  • API de Cloud Storage JSON
  • API de BigQuery
  • Pub/Sub
  • API de Datastore

Consulta la sección Cómo comenzar a habilitar las API de Google Cloud para obtener instrucciones detalladas.