Preguntas frecuentes

En la siguiente sección, se brindan respuestas a algunas preguntas frecuentes sobre Dataflow.

Preguntas generales

¿Dónde puedo obtener más asistencia?

Puedes visitar la Asistencia de Google Cloud para obtener un paquete de asistencia de Google Cloud, incluido Dataflow.

Puedes usar StackOverflow para investigar tu pregunta actual o enviar una nueva. Cuando envíes tu pregunta, etiquétala con google-cloud-dataflow. Los miembros del personal de ingeniería de Google supervisan este grupo y te ayudarán a responder tus preguntas.

También puedes enviar preguntas, solicitudes de funciones, informes de errores o defectos y otros comentarios en el foro UserVoice.

¿Es posible compartir datos a través de instancias de canalización?

No existe un mecanismo de comunicación de canalización cruzada específico de Dataflow para compartir datos o procesar el contexto entre las canalizaciones. Puedes usar un almacenamiento duradero, como Cloud Storage, o un almacenamiento en caché en memoria, como App Engine, para compartir datos entre las instancias de canalización.

¿Existe un mecanismo de programación integrado para ejecutar canalizaciones en un momento o intervalo determinados?

Puedes automatizar la ejecución de canalizaciones de la siguiente manera:

¿Cómo puedo saber qué versión del SDK de Dataflow está instalada o en ejecución en mi entorno?

Los detalles de instalación dependen de tu entorno de desarrollo. Si usas Maven, puedes tener varias versiones del SDK de Dataflow “instaladas” en uno o más repositorios locales de Maven.

Java

Para saber qué versión del SDK de Dataflow se ejecuta en una canalización determinada, puedes ver la salida de la consola cuando se ejecuta con DataflowPipelineRunner o BlockingDataflowPipelineRunner. En la consola, aparecerá un mensaje como el que se muestra a continuación, que contiene la información de la versión del SDK de Dataflow:

Python

Para saber qué versión del SDK de Dataflow se ejecuta en una canalización determinada, puedes ver la salida de la consola cuando se ejecuta con DataflowRunner. En la consola, aparecerá un mensaje como el que se muestra a continuación, que contiene la información de la versión del SDK de Dataflow:

  INFO: Executing pipeline on the Dataflow Service, ...
  Dataflow SDK version: <version>

Interactúa con tu trabajo de Cloud Dataflow

¿Es posible acceder a las máquinas de trabajador de mi trabajo (VM de Compute Engine) mientras mi canalización se está ejecutando?

Puedes ver las instancias de VM de una canalización determinada mediante Google Cloud Console. Desde allí, puedes acceder a cada instancia con SSH. Sin embargo, una vez que el trabajo se completa o falla, el servicio de Dataflow cerrará y limpiará las instancias de VM de manera automática.

¿Por qué no veo el tiempo de CPU reservado para mi trabajo de transmisión en la interfaz de supervisión de Cloud Dataflow?

El servicio de Dataflow informa el tiempo de CPU reservado después de que se completan los trabajos. En el caso de los trabajos no delimitados, esto significa que el tiempo de CPU reservado solo se informa después de que los trabajos se cancelan o fallan.

¿Por qué el estado de trabajo y la información de marca de agua no están disponibles para los trabajos de transmisión actualizados recientemente en la interfaz de supervisión de Cloud Dataflow?

La operación de actualización realiza varios cambios que toman unos minutos en propagarse a la interfaz de supervisión de Dataflow. Prueba actualizar la interfaz de supervisión 5 minutos después de la actualización de tu trabajo.

¿Por qué mis transformaciones compuestas personalizadas aparecen expandidas en la interfaz de supervisión de Dataflow?

En tu código de canalización, es posible que hayas invocado la transformación compuesta de la siguiente manera:

result = transform.apply(input);

Las transformaciones compuestas invocadas de esta manera omiten el anidamiento esperado y, por lo tanto, pueden aparecer expandidas en la interfaz de supervisión de Dataflow. Tu canalización también puede generar advertencias o errores sobre nombres de usuario únicos en el tiempo de ejecución de la canalización.

Para evitar estos problemas, asegúrate de invocar tus transformaciones mediante el formato recomendado:

result = input.apply(transform);

¿Por qué ya no puedo ver la información de mi trabajo en curso en la interfaz de supervisión de Cloud Dataflow, aunque esa información haya aparecido antes?

Por el momento, existe un problema conocido que puede afectar a algunos trabajos de Dataflow que se ejecutaron durante un mes o más. Es posible que estos trabajos no se carguen en la interfaz de supervisión de Dataflow o que muestren información desactualizada, incluso si el trabajo era visible antes.

Aún puedes obtener el estado del trabajo en la lista de trabajos cuando usas las interfaces de línea de comandos de Dataflow o de la supervisión de Dataflow. Sin embargo, si se presenta este problema, no podrás ver los detalles de tu trabajo.

Cómo programar con el SDK de Apache Beam para Java

¿Puedo transferir datos adicionales (fuera de banda) a una operación ParDo existente?

Sí. Hay varios patrones para seguir que varían según el caso práctico:

  • Puedes serializar información como campos en tu subclase DoFn.
  • Cualquier variable a la que se haga referencia a través de los métodos en una DoFn anónima se serializará de manera automática.
  • Puedes procesar datos dentro de DoFn.startBundle().
  • Puedes pasar datos a través de ParDo.withSideInputs.

Si deseas obtener más información, consulta la documentación de ParDo, en particular, las secciones sobre cómo crear entradas DoFn y complementarias, además de la documentación de referencia de la API de Java para ParDo.

¿Cómo se controlan las excepciones de Java en Cloud Dataflow?

Tu canalización puede generar excepciones durante el procesamiento de datos. Algunos de estos errores son transitorios (p. ej., dificultades temporales para acceder a un servicio externo), mientras que otros son permanentes, como los errores causados por datos de entrada corruptos o no analizables, o punteros nulos durante el cálculo.

Dataflow procesa los elementos en paquetes arbitrarios y vuelve a probar el paquete completo cuando se produce un error para cualquier elemento de ese paquete. Cuando se ejecutan en modo por lotes, los paquetes que incluyen un artículo defectuoso se reintentan 4 veces. Si un paquete individual falla 4 veces, la canalización fallará completamente. Cuando se ejecuta en modo de transmisión, un paquete que incluye un elemento defectuoso se reintentará indefinidamente, lo que puede hacer que la canalización se detenga de forma permanente.

Las excepciones en el código del usuario (por ejemplo, las instancias de DoFn) se informan en la interfaz de supervisión de Dataflow. Si ejecutas tu canalización con BlockingDataflowPipelineRunner, también verás mensajes de error impresos en tu consola o ventana de la terminal.

Se recomienda que agregues controladores de excepciones para protegerte contra los errores del código. Por ejemplo, si deseas descartar elementos que producen fallas en una validación de entrada personalizada en ParDo, usa un bloque try-catch dentro de tu ParDo para manejar la excepción y descartar el elemento. También puedes usar un Aggregator para realizar un seguimiento de los recuentos de errores.

Programa con el SDK de Cloud Dataflow para Python

¿Cómo debo solucionar NameError?

Si obtienes un NameError cuando ejecutas tu canalización mediante el servicio de Dataflow, pero no cuando lo haces de forma local (es decir, mediante DirectRunner), es posible que tus DoFn usen valores en el espacio de nombres global que no estén disponibles en el trabajador de Dataflow.

Las importaciones globales, las funciones y las variables definidas en la sesión principal no se guardan durante la serialización de un trabajo de Dataflow de forma predeterminada. Si, por ejemplo, tus DoFn se definen en el archivo principal y las importaciones y funciones de referencia en el espacio de nombres global, puedes establecer la opción de canalización --save_main_session en True. Esto hará que el estado del espacio de nombres global se serialice y se cargue en el trabajador de Dataflow.

Ten en cuenta que, si hay objetos en tu espacio de nombres global que no pueden serializarse, te aparecerá un error de serialización. Si el error está relacionado con un módulo que debería estar disponible en la distribución de Python, puedes resolverlo importando el módulo localmente, que es el lugar en que se usa.

Por ejemplo, en lugar de hacer lo siguiente:

import re
…
def myfunc():
  # use re module

utiliza esto:

def myfunc():
  import re
  # use re module

De manera alternativa, si tus DoFn abarcan varios archivos, debes usar un enfoque diferente para empaquetar el flujo de trabajo y administrar las dependencias.

E/S de canalización

La fuente y el receptor de TextIO ¿admiten archivos comprimidos, como GZip?

Sí. Java de Dataflow puede leer archivos comprimidos con gzipbzip2. Consulta la documentación de TextIO para obtener información adicional.

¿Puedo usar una expresión regular para orientar archivos específicos con la fuente de TextIO?

Dataflow admite patrones de comodines generales, por lo que la expresión glob puede aparecer en cualquier parte de la ruta de acceso del archivo. Sin embargo, Dataflow no admite comodines recurrentes (**).

¿La fuente de entrada de TextIO admite JSON?

Sí. Sin embargo, para que el servicio de Dataflow pueda paralelizar la entrada y salida, los datos de origen deben delimitarse con un salto de línea.

¿Por qué no se activa el rebalanceo de trabajo dinámico con mi fuente personalizada?

El rebalanceo de trabajo dinámico usa el valor de muestra del método getProgress() de tu fuente personalizada para activarse. La implementación predeterminada para getProgress() muestra null. A fin de garantizar que se active el ajuste de escala automático, asegúrate de que la fuente personalizada anule getProgress() para mostrar un valor apropiado.

¿Cómo accedo a los conjuntos de datos de BigQuery, a los temas de Pub/Sub o a las suscripciones que pertenecen a un proyecto diferente de Google Cloud Platform (es decir, no al proyecto con el que uso Cloud Dataflow)?

Consulta la guía Seguridad y permisos de Dataflow si deseas obtener información para acceder a los datos de BigQuery o Pub/Sub en un proyecto de Google Cloud distinto del que usas con Dataflow.

¿Por qué veo errores “rateLimitExceeded” cuando uso el conector de BigQuery y qué debo hacer para solucionarlos?

BigQuery tiene límites de cuota a corto plazo que se aplican cuando se envían demasiadas solicitudes a la API durante un período breve. Es posible que la canalización de Dataflow exceda de forma temporal esa cuota. Cuando esto sucede, las solicitudes a la API de tu canalización de Dataflow a BigQuery pueden fallar, lo que puede generar errores rateLimitExceeded en los registros de trabajador. Ten en cuenta que Dataflow reintenta estas fallas, por lo que puedes ignorar estos errores de forma segura. Si crees que la canalización se ve afectada de manera significativa debido a errores rateLimitExceeded, comunícate con la Asistencia de Google Cloud.

Uso el conector de BigQuery para realizar operaciones de escritura en BigQuery mediante inserciones de transmisión, y mi capacidad de procesamiento de escritura es menor que la esperada. ¿Qué puedo hacer para solucionar este problema?

Una capacidad de procesamiento lenta puede deberse a que la canalización excede la cuota de inserción de transmisión de BigQuery disponible. Si este es el caso, deberías ver mensajes de error relacionados con las cuotas de BigQuery en los registros del trabajador de Dataflow (busca errores quotaExceeded). Si ves esos errores, considera establecer la opción de receptor de BigQuery ignoreInsertIds() cuando uses el SDK de Apache Beam para Java o usar la opción ignore_insert_ids cuando uses el SDK de Apache Beam para Python a fin de ser apto de forma automática y recibir una capacidad de procesamiento de inserción de transmisión de BigQuery de un GB/s por proyecto. Para obtener más información sobre las advertencias relacionadas con la anulación de duplicación automática de mensajes, consulta la documentación de BigQuery. Si quieres aumentar la cuota de inserción de transmisión de BigQuery por encima de un GB/s, envía una solicitud a través de Cloud Console.

Si no ves errores relacionados con la cuota en los registros de trabajadores, el problema podría ser que los parámetros predeterminados relacionados con la agrupación en paquetes o en lotes no proporcionen un paralelismo adecuado para que la canalización realice un escalamiento. Existen varias opciones de configuración relacionadas con el conector de BigQuery de Dataflow que puedes considerar ajustar para lograr un rendimiento esperado cuando escribas en BigQuery mediante inserciones de transmisión. Por ejemplo, en el SDK de Apache Beam para Java, ajusta numStreamingKeys a fin de que coincida con la cantidad máxima de trabajadores y considera aumentar insertBundleParallelism para configurar el conector de BigQuery con el objetivo de escribir en BigQuery mediante más subprocesos paralelos. Si deseas conocer las opciones de configuración disponibles en el SDK de Apache Beam para Java, consulta BigQueryPipelineOptions. Obtén información sobre las opciones de configuración disponibles en el SDK de Apache Beam para Python en la página sobre transformaciones WriteToBigQuery.

Transmisión

¿Cómo ejecuto mi canalización en modo de transmisión?

Puedes configurar la marca --streaming en la línea de comandos cuando ejecutas tu canalización. También puedes configurar el modo de transmisión de manera programática cuando construyes tu canalización.

¿Qué fuentes y receptores de datos son compatibles con el modo de transmisión?

Puedes leer datos de transmisión de Pub/Sub y escribir datos de transmisión en Pub/Sub o BigQuery.

¿Cuáles son las limitaciones actuales del modo de transmisión?

El modo de transmisión de Dataflow tiene las siguientes limitaciones:

  • Las fuentes de lote aún no son compatibles en el modo de transmisión.
  • Las características de ajuste de escala automático del servicio de Dataflow son compatibles con la versión beta.

Parece que la canalización de transmisión que lee en Pub/Sub se está ralentizando. ¿Qué puedo hacer?

Es posible que la cuota de Pub/Sub de tu proyecto sea insuficiente. Puedes averiguar si tu proyecto tiene una cuota insuficiente mediante la verificación de errores del cliente 429 (Rate limit exceeded):

  1. Ve a Google Cloud Console.
  2. En el menú de la izquierda, selecciona API y servicios.
  3. En el Cuadro de búsqueda, busca Cloud Pub/Sub.
  4. Haz clic en la pestaña Uso.
  5. Marca Códigos de respuesta y busca códigos de error de cliente (4xx).

¿Por qué mi trabajo de transmisión no se ajusta de forma ascendente correctamente cuando actualizo la canalización con un grupo de trabajadores más grande?

Java

Para los trabajos de transmisión que no usan Streaming Engine, no puedes escalar por encima de la cantidad original de trabajadores y los recursos de disco persistente asignados al comienzo de tu trabajo original. Cuando actualizas un trabajo de Dataflow y especificas una mayor cantidad de trabajadores en el trabajo nuevo, solo puedes especificar una cantidad de trabajadores equivalente a la --maxNumWorkers que indicaste en el trabajo original.

Python

Para los trabajos de transmisión que no usan Streaming Engine, no puedes escalar por encima de la cantidad original de trabajadores y los recursos de disco persistente asignados al comienzo de tu trabajo original. Cuando actualizas un trabajo de Dataflow y especificas una mayor cantidad de trabajadores en el trabajo nuevo, solo puedes especificar una cantidad de trabajadores equivalente a la --max_num_workers que indicaste en el trabajo original.

Ajuste de escala automático de transmisión

¿Qué debo hacer si quiero una cantidad fija de trabajadores?

Para habilitar el ajuste de escala automático de transmisión, debes aceptarlo; no tiene una configuración predeterminada. La semántica de las opciones actuales no está cambiando; por lo tanto, para seguir con una cantidad fija de trabajadores, no necesitas hacer nada.

Me preocupa el hecho de que el ajuste de escala automático aumente el importe de mi factura. ¿Cómo puedo limitarlo?

Java

Cuando especificas --maxNumWorkers, limitas el rango de escalamiento usado para procesar el trabajo.

Python

Cuando especificas --max_num_workers, limitas el rango de escalamiento usado para procesar el trabajo.

¿Cuál es el rango de escalamiento para las canalizaciones de ajuste de escala automático de transmisión?

Java

Para trabajos de ajuste de escala automático de transmisión que no usen Streaming Engine, el servicio de Dataflow asigna entre 1 y 15 discos persistentes a cada trabajador. Esto significa que la cantidad mínima de trabajadores usados para una canalización de ajuste de escala automático es N/15, en la que N es el valor de --maxNumWorkers.

Para trabajos de ajuste de escala automático de transmisión que usan Streaming Engine, la cantidad mínima de trabajadores es 1.

Dataflow balancea la cantidad de discos persistentes entre los trabajadores. Por ejemplo, si tu canalización necesita 3 o 4 trabajadores en estado estable, puedes establecer --maxNumWorkers=15. La canalización escala de manera automática entre 1 y 15 trabajadores, con 1, 2, 3, 4, 5, 8 o 15 trabajadores, que se corresponden con 15, 8, 5, 4, 3, 2 o 1 disco persistente por trabajador, respectivamente.

--maxNumWorkers puede ser 1,000 como máximo.

Python

Para trabajos de ajuste de escala automático de transmisión que no usen Streaming Engine, el servicio de Dataflow asigna entre 1 y 15 discos persistentes a cada trabajador. Esto significa que la cantidad mínima de trabajadores usados para una canalización de ajuste de escala automático es N/15, en la que N es el valor de --max_num_workers.

Para trabajos de ajuste de escala automático de transmisión que usan Streaming Engine, la cantidad mínima de trabajadores es 1.

Dataflow balancea la cantidad de discos persistentes entre los trabajadores. Por ejemplo, si tu canalización necesita 3 o 4 trabajadores en estado estable, puedes establecer --max_num_workers=15. La canalización escala de manera automática entre 1 y 15 trabajadores, con 1, 2, 3, 4, 5, 8 o 15 trabajadores, que se corresponden con 15, 8, 5, 4, 3, 2 o 1 disco persistente por trabajador, respectivamente.

--max_num_workers puede ser 1,000 como máximo.

¿Cuál es la cantidad máxima de trabajadores que puede usar el ajuste de escala automático?

Java

Dataflow funciona dentro de los límites de la cuota de recuento de instancias de Compute Engine de tu proyecto o maxNumWorkers, lo que sea menor.

Python

Dataflow funciona dentro de los límites de la cuota de recuento de instancias de Compute Engine de tu proyecto o max_num_workers, lo que sea menor.

¿Puedo desactivar el ajuste de escala automático en mi canalización de transmisión?

Java

Sí. Establece --autoscalingAlgorithm=NONE. Actualiza la canalización con especificaciones fijas del clúster, como se describe en la documentación del escalamiento manual, en el que numWorkers se encuentra dentro del rango de escalamiento.

Python

Sí. Establece --autoscaling_algorithm=NONE. Actualiza la canalización con especificaciones fijas del clúster, como se describe en la documentación del escalamiento manual, en el que num_workers se encuentra dentro del rango de escalamiento.

¿Puedo cambiar el rango de escalamiento en mi canalización de transmisión?

Java

Sí, pero no puedes hacerlo con la Actualización*. Debes detener la canalización con Cancelar o Desviar y volver a implementarla con el maxNumWorkers nuevo deseado.

Python

Sí, pero no puedes hacerlo con la Actualización*. Debes detener la canalización con Cancelar o Desviar y volver a implementarla con el max_num_workers nuevo deseado.

Configura tu proyecto de Google Cloud Platform para usar Cloud Dataflow

¿Cómo puedo determinar si el proyecto que estoy usando con Cloud Dataflow posee un bucket de Cloud Storage que quiero leer o escribir?

Para determinar si tu proyecto de Google Cloud es propietario de un bucket de Cloud Storage en particular, puedes usar el siguiente comando de la consola:

gsutil acl get gs://<your-bucket>

El comando genera como salida una string JSON similar a la siguiente:

[
  {
    "entity": "project-owners-123456789",
    "projectTeam": {
      "projectNumber": "123456789",
      "team": "owners"
    },
    "role": "OWNER"
  },
  ....
]

Las entradas relevantes son aquellas en las que la “función” es propietario. El projectNumber asociado te indica qué proyecto es el propietario de ese bucket. Si el número de proyecto no coincide con el número de tu proyecto, tendrás que elegir una de las siguientes opciones:

  • Crear un bucket nuevo que pertenezca a tu proyecto.
  • Proporcionar acceso al bucket para las cuentas adecuadas.

¿Cómo creo un bucket nuevo que pertenezca a mi proyecto de Cloud Dataflow?

Para crear un bucket nuevo en el proyecto de Google Cloud en el que usas Dataflow, puedes usar el siguiente comando de la consola:

gsutil mb -p <Project to own the bucket> <bucket-name>

¿Cómo puedo lograr que un bucket perteneciente a un proyecto diferente se pueda leer o escribir en el proyecto de Google Cloud Platform que uso con Cloud Dataflow?

Consulta la guía de Seguridad y permisos de Dataflow a fin de obtener información para que tu canalización de Dataflow pueda acceder a los recursos de Google Cloud que pertenecen a un proyecto de Google Cloud distinto.

Cuando intento ejecutar mi trabajo de Cloud Dataflow, veo un error que dice: “Es necesario habilitar algunas API de Cloud en tu proyecto para que Cloud Dataflow ejecute este trabajo”. ¿Qué tengo que hacer?

Para ejecutar un trabajo de Dataflow, debes habilitar las siguientes API de Google Cloud en tu proyecto:

  • API de Compute Engine (Compute Engine)
  • API de Cloud Logging
  • Cloud Storage
  • API de Cloud Storage JSON
  • API de BigQuery
  • Pub/Sub
  • API de Datastore

Consulta la sección Comienza a habilitar las API de Google Cloud para obtener instrucciones detalladas.