Solucionar problemas de tu canalización

En esta sección, se presenta una recopilación de sugerencias para solucionar problemas y estrategias de depuración que podrían serte útiles si tienes problemas con la compilación o ejecución de tu canalización de Cloud Dataflow. Esta información puede ayudarte a detectar un error en la canalización, determinar el motivo por el cual falla una ejecución de la canalización y sugerir algunas acciones para corregir el problema.

Cloud Dataflow proporciona comentarios en tiempo real en tu trabajo, y existe un conjunto básico de pasos que puedes seguir para verificar los mensajes de error, registros y estados, como un retraso en el progreso de tu trabajo.

En esta sección, también se incluye un catálogo de errores comunes con los que te puedes encontrar cuando ejecutas la canalización de Apache Beam, y se sugieren algunas medidas correctivas y soluciones alternativas para cada una.

Verificación del estado de la canalización

Puedes detectar cualquier error en las ejecuciones de tu canalización si usas la interfaz de supervisión de Cloud Dataflow.

  1. Ve a Google Cloud Platform Console.
  2. Selecciona tu proyecto de Google Cloud Platform (GCP) de la lista de proyectos.
  3. Haz clic en el menú en la esquina superior izquierda.
  4. Ve a la sección Macrodatos y haz clic en Dataflow. Aparece una lista de proyectos en ejecución en el panel de la derecha.
  5. Selecciona el trabajo de canalización que deseas ver. Puedes observar rápidamente el estado del trabajo en el campo Status (Estado): “Running” (En ejecución), “Succeeded” (Finalizado correctamente) o “Failed” (Con errores).
Figura 1: Una lista de los trabajos de Cloud Dataflow en Developers Console con trabajos en los estados en ejecución, finalizado correctamente y con errores.

Flujo de trabajo básico para solución de problemas

Si uno de tus trabajos de canalización falla, puedes seleccionar el trabajo para ver más información detallada sobre los errores y resultados de la ejecución. Cuando seleccionas un trabajo, puedes ver el grafo de ejecución, además de información sobre el trabajo, en la página Resumen a la derecha del grafo. Durante la ejecución, la parte superior de la página contiene un botón para ver los indicadores y registros si el trabajo generó errores o advertencias.

Figura 2: Un resumen del trabajo de Cloud Dataflow con indicación de errores.

Verificación de mensajes de error de un trabajo

Puedes hacer clic en el botón Registros para ver los registros de mensajes generados por tu código de canalización y el servicio de Cloud Dataflow. Filtra los mensajes que aparecen en el panel de registros mediante el menú desplegable Gravedad mínima. Selecciona el filtro Error para que solo se muestren los mensajes de error.

Haz clic en el ícono triangular en cada mensaje de error para expandirlo.

Figura 3: Una lista de los mensajes de error del trabajo de Cloud Dataflow, con un mensaje expandido.

Visualización de los registros de pasos para el trabajo

Cuando seleccionas un paso en tu grafo de canalización, el panel de registros se activa o desactiva de los registros de trabajos que se muestran generados por el servicio de Cloud Dataflow, con el objetivo de mostrar registros de las instancias de Compute Engine que ejecutan tu paso de la canalización.

Figura 4: El botón de Cloud Logging en el resumen de trabajo de Cloud Dataflow.

Stackdriver Logging combina todos los registros recopilados de las instancias de Compute Engine de tu proyecto en una ubicación. Consulta Mensajes de canalización de Logging para obtener más información sobre el uso de las diferentes funciones de registro de Cloud Dataflow.

Control del rechazo automático de la canalización

En algunos casos, el servicio de Cloud Dataflow identifica que tu canalización puede activar problemas de SDK conocidos. A fin de evitar enviar canalizaciones que posiblemente tendrán problemas, Cloud Dataflow rechazará automáticamente la canalización y mostrará el siguiente mensaje:

The service automatically rejected the workflow because it might trigger an identified bug in the
SDK (details below). If you think this identification is in error, and would like to override this
automated rejection, re-submit this workflow with the following override flag: ${override-flag}.
Bug-details: ${bug-details-and-explanation}.
Contact dataflow-feedback@google.com for further help. Please use this identifier in your communication: ${bug-id}."

Si, después de leer las advertencias en los detalles de error vinculados, de todos modos, quieres tratar de ejecutar tu canalización, puedes anular el rechazo automático. Agrega el marcador --experiments=<override-flag> y vuelve a enviar la canalización.

Determinación de la causa de error de una canalización

Comúnmente, una ejecución con errores de la canalización en Apache Beam puede atribuirse a una de las siguientes causas:

  • Errores de construcción de grafo o canalización. Estos errores ocurren cuando Cloud Dataflow se ejecuta con un problema y compila el grafo de pasos que compone tu canalización, según se describe en la canalización de Apache Beam.
  • Errores en la validación de un trabajo. El servicio de Cloud Dataflow valida cualquier trabajo de canalización que realices. Los errores en el proceso de validación pueden evitar que tu trabajo se cree o ejecute correctamente. Los errores de validación pueden incluir problemas con el depósito de Cloud Storage del proyecto de GCP o con los permisos de tu proyecto.
  • Excepciones en el código del trabajador. Estos errores se producen cuando hay errores en el código proporcionado por el usuario que Cloud Dataflow distribuye a trabajadores paralelos, como instancias DoFn de una transformación ParDo.
  • Canalizaciones de ejecución lenta o falta de resultado. Si tu canalización se ejecuta de forma lenta o se ejecuta por un período prolongado sin informar resultados, puedes verificar tus cuotas para transmitir fuentes de datos y receptores como Cloud Pub/Sub. También hay algunas transformaciones más adecuadas que otras para canalizaciones de transmisión de gran volumen.
  • Errores provocados por fallas transitorias en otros servicios de GCP. Tu canalización puede fallar debido a una interrupción temporal o a causa de otro problema en los servicios de GCP de los que depende Cloud Dataflow, como Compute Engine o Cloud Storage.

Detección de errores de construcción de canalización o grafo

Un error de construcción de grafo puede producirse cuando Cloud Dataflow compila el grafo de ejecución para la canalización desde el código en tu programa de Cloud Dataflow. Durante el tiempo de construcción del grafo, Cloud Dataflow verifica la existencia de operaciones ilegales.

Si Cloud Dataflow detecta un error en la construcción del grafo, recuerda que no se creará ningún trabajo en el servicio de Cloud Dataflow. Por consiguiente, no verás comentarios en la interfaz de supervisión de Cloud Dataflow. En cambio, verás un mensaje de error similar al siguiente en la ventana de la consola o terminal en la que ejecutas tu canalización de Apache Beam.

Java: SDK 2.x

Por ejemplo, si tu canalización intenta realizar una agregación como GroupByKey en una entrada PCollection de un sistema de ventanas, no activada y no delimitada globalmente, verás un mensaje de error similar al siguiente:

...
... Exception in thread "main" java.lang.IllegalStateException:
... GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger.
... Use a Window.into or Window.triggering transform prior to GroupByKey
...

Python

Por ejemplo, si tu canalización usa sugerencias de tipo y el tipo de argumento en una de las transformaciones no es el esperado, verás un mensaje de error similar al siguiente:

... in <module> run()
... in run | beam.Map('count', lambda (word, ones): (word, sum(ones))))
... in __or__ return self.pipeline.apply(ptransform, self)
... in apply transform.type_check_inputs(pvalueish)
... in type_check_inputs self.type_check_inputs_or_outputs(pvalueish, 'input')
... in type_check_inputs_or_outputs pvalue_.element_type))
google.cloud.dataflow.typehints.decorators.TypeCheckError: Input type hint violation at group: expected Tuple[TypeVariable[K], TypeVariable[V]], got <type 'str'>

Java: SDK 1.x

Por ejemplo, si tu canalización intenta realizar una agregación como GroupByKey en una entrada PCollection de un sistema de ventanas, no activada y no delimitada globalmente, verás un mensaje de error similar al siguiente:

...
... Exception in thread "main" java.lang.IllegalStateException:
... GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger.
... Use a Window.into or Window.triggering transform prior to GroupByKey
...

Si te encuentras con este error, verifica el código de tu canalización para asegurarte de que la operación de tu canalización sea legal.

Detección de errores en una validación de trabajo de Cloud Dataflow

Una vez que el servicio de Cloud Dataflow reciba el grafo de tu canalización, el servicio intentará validar el trabajo. Esta validación incluye lo siguiente:

  • Asegurarse de que el servicio pueda acceder a tu trabajo relacionado con los depósitos de Cloud Storage para la etapa de pruebas de archivos y el resultado temporal.
  • Verificar los permisos obligatorios en tu proyecto de GCP.
  • Asegurarse de que el servicio pueda acceder a las fuentes de entrada y salida, como los archivos.

Si tu trabajo no pasa el proceso de validación, verás un mensaje de error en la interfaz de supervisión de Cloud Dataflow, así como en la ventana de tu consola o terminal si estás usando una ejecución de bloqueo. El mensaje de error será similar al siguiente:

Java: SDK 2.x

INFO: To access the Cloud Dataflow monitoring console, please navigate to
  https://console.developers.google.com/project/google.com%3Aclouddfe/dataflow/job/2016-03-08_18_59_25-16868399470801620798
Submitted job: 2016-03-08_18_59_25-16868399470801620798
...
... Starting 3 workers...
... Executing operation BigQuery-Read+AnonymousParDo+BigQuery-Write
... Executing BigQuery import job "dataflow_job_16868399470801619475".
... Stopping worker pool...
... Workflow failed. Causes: ...BigQuery-Read+AnonymousParDo+BigQuery-Write failed.
Causes: ... BigQuery getting table "non_existent_table" from dataset "cws_demo" in project "my_project" failed.
Message: Not found: Table x:cws_demo.non_existent_table HTTP Code: 404
... Worker pool stopped.
... com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner run
INFO: Job finished with status FAILED
Exception in thread "main" com.google.cloud.dataflow.sdk.runners.DataflowJobExecutionException:
  Job 2016-03-08_18_59_25-16868399470801620798 failed with status FAILED
    at com.google.cloud.dataflow.sdk.runners.DataflowRunner.run(DataflowRunner.java:155)
    at com.google.cloud.dataflow.sdk.runners.DataflowRunner.run(DataflowRunner.java:56)
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
    at com.google.cloud.dataflow.integration.BigQueryCopyTableExample.main(BigQueryCopyTableExample.java:74)

Python

INFO:root:Created job with id: [2016-03-08_14_12_01-2117248033993412477]
... Checking required Cloud APIs are enabled.
... Job 2016-03-08_14_12_01-2117248033993412477 is in state JOB_STATE_RUNNING.
... Combiner lifting skipped for step group: GroupByKey not followed by a combiner.
... Expanding GroupByKey operations into optimizable parts.
... Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
... Annotating graph with Autotuner information.
... Fusing adjacent ParDo, Read, Write, and Flatten operations
... Fusing consumer split into read
...
... Starting 1 workers...
...
... Executing operation read+split+pair_with_one+group/Reify+group/Write
... Executing failure step failure14
... Workflow failed.
Causes: ... read+split+pair_with_one+group/Reify+group/Write failed.
Causes: ... Unable to view metadata for files: gs://dataflow-samples/shakespeare/missing.txt.
... Cleaning up.
... Tearing down pending resources...
INFO:root:Job 2016-03-08_14_12_01-2117248033993412477 is in state JOB_STATE_FAILED.

Java: SDK 1.x

INFO: To access the Cloud Dataflow monitoring console, please navigate to
  https://console.developers.google.com/project/google.com%3Aclouddfe/dataflow/job/2016-03-08_18_59_25-16868399470801620798
Submitted job: 2016-03-08_18_59_25-16868399470801620798
...
... Starting 3 workers...
... Executing operation BigQuery-Read+AnonymousParDo+BigQuery-Write
... Executing BigQuery import job "dataflow_job_16868399470801619475".
... Stopping worker pool...
... Workflow failed. Causes: ...BigQuery-Read+AnonymousParDo+BigQuery-Write failed.
Causes: ... BigQuery getting table "non_existent_table" from dataset "cws_demo" in project "x" failed.
Message: Not found: Table x:cws_demo.non_existent_table HTTP Code: 404
... Worker pool stopped.
... com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner run
INFO: Job finished with status FAILED
Exception in thread "main" com.google.cloud.dataflow.sdk.runners.DataflowJobExecutionException:
  Job 2016-03-08_18_59_25-16868399470801620798 failed with status FAILED
    at com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner.run(BlockingDataflowPipelineRunner.java:155)
    at com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner.run(BlockingDataflowPipelineRunner.java:56)
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
    at com.google.cloud.dataflow.integration.BigQueryCopyTableExample.main(BigQueryCopyTableExample.java:74)

Detección de una excepción en un código de trabajador

Mientras se ejecuta tu trabajo, puedes encontrar errores o excepciones en el código del trabajador. Estos errores significan que los DoFn en el código de tu canalización generaron excepciones no controladas, que provocaron trabajos con errores en tu trabajo de Cloud Dataflow.

Las excepciones en el código del usuario (por ejemplo, tus instancias DoFn) se informan en la interfaz de supervisión de Cloud Dataflow. Si ejecutas tu canalización con una ejecución de bloqueo, también verás mensajes de error impresos en la ventana de tu consola o terminal, como el siguiente:

Java: SDK 2.x

INFO: To access the Cloud Dataflow monitoring console, please navigate to https://console.developers.google.com/project/example_project/dataflow/job/2017-05-23_14_02_46-1117850763061203461
Submitted job: 2017-05-23_14_02_46-1117850763061203461
...
... To cancel the job using the 'gcloud' tool, run: gcloud beta dataflow jobs --project=example_project cancel 2017-05-23_14_02_46-1117850763061203461
... Autoscaling is enabled for job 2017-05-23_14_02_46-1117850763061203461.
... The number of workers will be between 1 and 15.
... Autoscaling was automatically enabled for job 2017-05-23_14_02_46-1117850763061203461.
...
... Executing operation BigQueryIO.Write/BatchLoads/Create/Read(CreateSource)+BigQueryIO.Write/BatchLoads/GetTempFilePrefix+BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowHashAsKeyAndWindowAsSortKey/ParDo(UseWindowHashAsKeyAndWindowAsSortKey)+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/WithKeys/AddKeys/Map/ParMultiDo(Anonymous)+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)/GroupByKey/Reify+BigQueryIO.Write/BatchLoads/TempFilePrefixView/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)/GroupByKey/Write+BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowHashAsKeyAndWindowAsSortKey/BatchViewOverrides.GroupByKeyAndSortValuesOnly/Write
... Workers have started successfully.
...
... org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process SEVERE: 2017-05-23T21:06:33.711Z: (c14bab21d699a182): java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.ArithmeticException: / by zero
        at com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:146)
        at com.google.cloud.dataflow.worker.runners.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:104)
        at com.google.cloud.dataflow.worker.util.BatchGroupAlsoByWindowAndCombineFn.closeWindow(BatchGroupAlsoByWindowAndCombineFn.java:191)
...
... Cleaning up.
... Stopping worker pool...
... Worker pool stopped.

Nota: El servicio de Cloud Dataflow vuelve a intentar las tareas con errores hasta 4 veces en el modo por lotes y una cantidad ilimitada de veces en el modo de transmisión. En modo por lotes, tu trabajo fallará; en modo de transmisión, podría detenerse indefinidamente.

Python

INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING.
...
INFO:root:... Expanding GroupByKey operations into optimizable parts.
INFO:root:... Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
INFO:root:... Annotating graph with Autotuner information.
INFO:root:... Fusing adjacent ParDo, Read, Write, and Flatten operations
...
INFO:root:...: Starting 1 workers...
INFO:root:...: Executing operation group/Create
INFO:root:...: Value "group/Session" materialized.
INFO:root:...: Executing operation read+split+pair_with_one+group/Reify+group/Write
INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING.
INFO:root:...: ...: Workers have started successfully.
INFO:root:Job 2016-03-08_14_21_32-8974754969325215880 is in state JOB_STATE_RUNNING.
INFO:root:...: Traceback (most recent call last):
  File ".../dataflow_worker/batchworker.py", line 384, in do_work self.current_executor.execute(work_item.map_task)
  ...
  File ".../apache_beam/examples/wordcount.runfiles/py/apache_beam/examples/wordcount.py", line 73, in <lambda>
ValueError: invalid literal for int() with base 10: 'www'

Nota: El servicio de Cloud Dataflow vuelve a intentar las tareas con errores hasta 4 veces en modo por lotes y una cantidad ilimitada de veces en modo de transmisión. En modo por lotes, tu trabajo fallará; en modo de transmisión, podría detenerse indefinidamente.

Java: SDK 1.x

INFO: To access the Cloud Dataflow monitoring console, please navigate to https://console.developers.google.com/project/google.com%3Aclouddfe/dataflow/job/2016-03-08_19_09_07-6448127003704955959
Submitted job: 2016-03-08_19_09_07-6448127003704955959
...
... Expanding GroupByKey operations into optimizable parts.
... Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
... Annotating graph with Autotuner information.
... Fusing adjacent ParDo, Read, Write, and Flatten operations
...
... Starting 1 workers...
... Executing operation TextIO.Write/DataflowPipelineRunner.BatchTextIOWrite/DataflowPipelineRunner.ReshardForWrite/GroupByKey/Create
... Executing operation AnonymousParDo+TextIO.Write/DataflowPipelineRunner.BatchTextIOWrite/DataflowPipelineRunner.ReshardForWrite/Window.Into()+TextIO.Write/DataflowPipelineRunner.BatchTextIOWrite/DataflowPipelineRunner.ReshardForWrite/RandomKey+TextIO.Write/DataflowPipelineRunner.BatchTextIOWrite/DataflowPipelineRunner.ReshardForWrite/GroupByKey/Reify+TextIO.Write/DataflowPipelineRunner.BatchTextIOWrite/DataflowPipelineRunner.ReshardForWrite/GroupByKey/Write
... Workers have started successfully.
... java.lang.ArithmeticException: / by zero
    at com.google.cloud.dataflow.integration.WorkerCrashRecovery.crash(WorkerCrashRecovery.java:166)
    at com.google.cloud.dataflow.integration.WorkerCrashRecovery.access$200(WorkerCrashRecovery.java:51)
    at com.google.cloud.dataflow.integration.WorkerCrashRecovery$1.processElement(WorkerCrashRecovery.java:199)
... Executing operation TextIO.Write/DataflowPipelineRunner.BatchTextIOWrite/DataflowPipelineRunner.ReshardForWrite/GroupByKey/Close
... Executing operation TextIO.Write/DataflowPipelineRunner.BatchTextIOWrite/DataflowPipelineRunner.ReshardForWrite/GroupByKey/Read+TextIO.Write/DataflowPipelineRunner.BatchTextIOWrite/DataflowPipelineRunner.ReshardForWrite/GroupByKey/GroupByWindow+TextIO.Write/DataflowPipelineRunner.BatchTextIOWrite/DataflowPipelineRunner.ReshardForWrite/Ungroup+TextIO.Write/DataflowPipelineRunner.BatchTextIOWrite/DataflowPipelineRunner.BatchTextIONativeWrite
... Stopping worker pool...
... Worker pool stopped.
... Cleaning up.

Nota: El servicio de Cloud Dataflow vuelve a intentar las tareas con errores hasta 4 veces en el modo por lotes y una cantidad ilimitada de veces en el modo de transmisión. En modo por lotes, tu trabajo fallará; en modo de transmisión, podría detenerse indefinidamente.

Piensa en la posibilidad de agregar controladores de excepciones a fin de protegerte contra los errores de tu código. Por ejemplo, si deseas eliminar elementos que producen fallas en una validación de entrada personalizada realizada en ParDo, controla la excepción en tu DoFn y elimina el elemento. También puedes hacer seguimiento de los elementos con errores de diferentes formas:

Java: SDK 2.x

  • Para hacer seguimiento de las propiedades de una canalización en ejecución, puedes usar la clase Metrics, como se muestra en el siguiente ejemplo:
    final Counter counter = Metrics.counter("stats", "even-items");
    PCollection<Integer> input = pipeline.apply(...);
    ...
    input.apply(ParDo.of(new DoFn<Integer, Integer>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        if (c.element() % 2 == 0) {
          counter.inc();
        }
    });
    
  • Puedes registrar los elementos con errores y verificar el resultado mediante Cloud Logging.
  • Puedes hacer que ParDo escriba los elementos con errores en un resultado adicional para analizarlo posteriormente.

Python

  • Puedes usar la clase Metrics para seguir las propiedades de una canalización en ejecución, como se muestra en el siguiente ejemplo:
    class FilterTextFn(beam.DoFn):
          """A DoFn that filters for a specific key based on a regex."""
    
          def __init__(self, pattern):
            self.pattern = pattern
            # A custom metric can track values in your pipeline as it runs. Create
            # custom metrics to count unmatched words, and know the distribution of
            # word lengths in the input PCollection.
            self.word_len_dist = Metrics.distribution(self.__class__,
                                                      'word_len_dist')
            self.unmatched_words = Metrics.counter(self.__class__,
                                                   'unmatched_words')
    
          def process(self, element):
            word = element
            self.word_len_dist.update(len(word))
            if re.match(self.pattern, word):
              yield element
            else:
              self.unmatched_words.inc()
    
        filtered_words = (
            words | 'FilterText' >> beam.ParDo(FilterTextFn('s.*')))
    
  • Puedes registrar los elementos con errores y verificar el resultado mediante Cloud Logging.
  • Puedes hacer que ParDo escriba los elementos con errores en un resultado adicional para analizarlo posteriormente.

Java: SDK 1.x

  • Puedes registrar los elementos con errores y verificar el resultado mediante Cloud Logging.
  • Puedes hacer que ParDo escriba los elementos con errores en un resultado complementario para analizarlo posteriormente.

Solución de problemas de canalizaciones de ejecución lenta o falta de resultado

Java: SDK 2.x

Si tienes una canalización de transmisión de gran volumen, que se ejecuta de forma lenta o se detiene, hay algunos elementos que puedes verificar:

Cuota de Pub/Sub

Si tu canalización lee una entrada desde Cloud Pub/Sub, tu proyecto de GCP puede tener una cuota de Cloud Pub/Sub insuficiente. Si tu trabajo genera una gran cantidad de errores 429 (Rate Limit Exceeded), es una indicación de una cuota insuficiente. Prueba los siguientes pasos para comprobar estos errores:

  1. Ve a Google Cloud Platform Console.
  2. En el panel de navegación de la izquierda, haz clic en API y servicios.
  3. En el Cuadro de búsqueda, busca Cloud Pub/Sub.
  4. Haz clic en la pestaña Uso.
  5. Revisa los Códigos de respuesta y busca códigos de error del cliente (4xx).

Uso de .withFanout en tus transformaciones de combinación

Si tu canalización procesa entradas PCollection de gran volumen y no delimitadas, recomendamos hacer lo siguiente:

  • Usa Combine.Globally.withFanout en lugar de Combine.Globally.
  • Usa Combine.PerKey.withHotKeyFanout en lugar de Count.PerKey.

Python

Si tienes una canalización de transmisión de gran volumen, que se ejecuta de forma lenta o se detiene, hay algunos elementos que puedes verificar:

Cuota de Pub/Sub

Si tu canalización lee una entrada desde Cloud Pub/Sub, tu proyecto de GCP puede tener una cuota de Cloud Pub/Sub insuficiente. Si tu trabajo genera una gran cantidad de errores 429 (Rate Limit Exceeded), es una indicación de una cuota insuficiente. Prueba los siguientes pasos para comprobar estos errores:

  1. Ve a Google Cloud Platform Console.
  2. En el panel de navegación de la izquierda, haz clic en API y servicios.
  3. En el Cuadro de búsqueda, busca Cloud Pub/Sub.
  4. Haz clic en la pestaña Uso.
  5. Revisa los Códigos de respuesta y busca códigos de error del cliente (4xx).

Java: SDK 1.x

Si tienes una canalización de transmisión de gran volumen, que se ejecuta de forma lenta o se detiene, hay algunos elementos que puedes verificar:

Cuota de Pub/Sub

Si tu canalización lee una entrada desde Cloud Pub/Sub, tu proyecto de GCP puede tener una cuota de Cloud Pub/Sub insuficiente. Si tu trabajo genera una gran cantidad de errores 429 (Rate Limit Exceeded), es una indicación de una cuota insuficiente. Prueba los siguientes pasos para comprobar estos errores:

  1. Ve a Google Cloud Platform Console.
  2. En el panel de navegación de la izquierda, haz clic en API y servicios.
  3. En el Cuadro de búsqueda, busca Cloud Pub/Sub.
  4. Haz clic en la pestaña Uso.
  5. Revisa los Códigos de respuesta y busca códigos de error del cliente (4xx).

Uso de .withFanout en tus transformaciones de combinación

Si tu canalización procesa entradas PCollection de gran volumen y no delimitadas, recomendamos hacer lo siguiente:

  • Usa Combine.Globally.withFanout en lugar de Combine.Globally.
  • Usa Combine.PerKey.withHotKeyFanout en lugar de Count.PerKey.

Errores comunes y acciones

En esta sección, se describen algunos errores comunes que puedes encontrar cuando ejecutas tu trabajo de Cloud Dataflow, y se sugieren algunas medidas para corregir o tratar esos errores.

Excepciones con tiempo de espera agotado para RPC, excepciones de DEADLINE_EXCEEDED o errores de Server Unresponsive

Si te encuentras con tiempos de espera agotados de RPC, excepciones de DEADLINE_EXCEEDED o errores de Server Unresponsive mientras se ejecuta tu trabajo, comúnmente estos indican que existe uno de estos dos problemas:

  • Es posible que a la red de VPC que se usa para tu trabajo le falte una regla de firewall. La regla de firewall necesita habilitarse en todo el tráfico de TCP en las VM en la red de VPC que especificaste en tus opciones de canalización.

    Consulta Cómo especificar tu red y subred para obtener más detalles.

  • Tu trabajo se delimita aleatoriamente. Considera una de las siguientes medidas, o una combinación de ellas:

    Java: SDK 2.x

    • Agrega más trabajadores. Prueba configurar --numWorkers con un valor mayor cuando ejecutes la canalización.
    • Aumenta el tamaño del disco conectado para los trabajadores. Prueba configurar --diskSizeGb con un valor mayor cuando ejecutes la canalización.
    • Usa un disco persistente respaldado por SSD. Prueba configurar --workerDiskType="compute.googleapis.com/projects/<project>/zones/<zone>/diskTypes/pd-ssd" cuando ejecutes la canalización.

    Python

    • Agrega más trabajadores. Prueba configurar --num_workers con un valor mayor cuando ejecutes la canalización.
    • Aumenta el tamaño del disco conectado para los trabajadores. Prueba configurar --disk_size_gb con un valor mayor cuando ejecutes la canalización.
    • Usa un disco persistente respaldado por SSD. Prueba configurar --worker_disk_type="compute.googleapis.com/projects/<project>/zones/<zone>/diskTypes/pd-ssd" cuando ejecutes tu canalización.

    Java: SDK 1.x

    • Agrega más trabajadores. Prueba configurar --numWorkers con un valor mayor cuando ejecutes la canalización.
    • Aumenta el tamaño del disco conectado para los trabajadores. Prueba configurar --diskSizeGb con un valor mayor cuando ejecutes la canalización.
    • Usa un disco persistente respaldado por SSD. Prueba configurar --workerDiskType="compute.googleapis.com/projects/<project>/zones/<zone>/diskTypes/pd-ssd" cuando ejecutes la canalización.

Errores de espacio de disco como RESOURCE_EXHAUSTED: IO error: No space left on disk

Estos errores comúnmente indican que asignaste espacio insuficiente en el disco local para procesar tu trabajo. Si ejecutas tu trabajo con una configuración predeterminada, se ejecutará en 3 trabajadores, cada uno con 250 GB de espacio local en el disco y sin ajuste de escala automático. Considera modificar la configuración predeterminada a fin de aumentar la cantidad de trabajadores disponibles para tu trabajo, aumentar el espacio predeterminado en el disco por trabajador o habilitar el ajuste de escala automático.

413 Request Entity Too Large / “El tamaño de la representación serializada de JSON de la canalización supera el límite permitido”

Si encuentras un error sobre la carga útil de JSON cuando envías tu trabajo, significa que la representación JSON en tu canalización supera el tamaño máximo solicitado de 10 MB. Estos errores pueden aparecer como uno de los siguientes mensajes en la ventana de tu terminal o consola:

  • 413 Request Entity Too Large
  • “El tamaño de la representación serializada JSON de la canalización supera el límite permitido”
  • “No se pudo crear un trabajo de flujo de trabajo: se recibió una carga útil de JSON no válido”
  • “No se pudo crear un trabajo de flujo de trabajo: la carga útil solicitada supera el límite permitido”

El tamaño de tu trabajo se relaciona específicamente con la representación JSON de la canalización. Una canalización mayor implica una solicitud mayor. Cloud Dataflow actualmente dispone de una limitación de 10 MB para las solicitudes.

Para estimar el tamaño de la solicitud JSON de tu canalización, ejecuta tu canalización con la siguiente opción:

Java: SDK 2.x

--dataflowJobFile=< path to output file >

Python

--dataflow_job_file=< path to output file >

Java: SDK 1.x

--dataflowJobFile=< path to output file >

Este comando escribe una representación JSON de tu trabajo en un archivo. El tamaño del archivo serializado es una estimación apropiada del tamaño de la solicitud. El tamaño real será un poco mayor debido a que se incluye información adicional en la solicitud.

Algunas condiciones en tu canalización pueden provocar que la representación JSON supere el límite. Entre las condiciones comunes, se incluyen las siguientes:

  • Una transformación Create que incluye una gran cantidad de datos en memoria
  • Una instancia DoFn de gran tamaño que se serializa para la transmisión a trabajadores remotos
  • Una transformación DoFn como una instancia de clase interna anónima que (posiblemente de forma involuntaria) incorpora gran cantidad de datos para que sean serializados

Para evitar estas condiciones, considera reestructurar tu canalización.

“La cantidad total de objetos BoundedSource generados por la operación splitIntoBundles() es mayor que el límite permitido” o “El tamaño total de los objetos BoundedSource generados por la operación splitIntoBundles() es mayor que el límite permitido”.

Java: SDK 2.x

Puedes encontrarte con este error si lees a partir de una gran cantidad de archivos mediante TextIO, AvroIO o alguna otra fuente basada en archivos. El límite específico depende de los detalles de tu fuente (p. ej., un esquema incorporado en AvroIO.Read permitirá una menor cantidad de archivos), pero corresponde a alrededor de decenas de miles de archivos en una canalización.

También puedes encontrarte con este error si creaste una fuente de datos personalizada para tu canalización y el método splitIntoBundles de tu fuente muestra una lista de objetos BoundedSource que necesita más de 20 MB cuando se serializa.

El límite permitido para el tamaño total de los objetos BoundedSource generados por la operación splitIntoBundles() de tu fuente personalizada es de 20 MB. Puedes trabajar con esta limitación si modificas tu subclase BoundedSource personalizada, de manera que el tamaño total de los objetos BoundedSource generados sea menor que el límite de 20 MB. Por ejemplo, en un principio, tu fuente puede generar menos divisiones y usar un Reequilibrio dinámico del trabajo para seguir dividiendo entradas según se requiera.

Java: SDK 1.x

Puedes encontrarte con este error si lees a partir de una gran cantidad de archivos mediante TextIO, AvroIO o alguna otra fuente basada en archivos. El límite específico depende de los detalles de tu fuente (p. ej., un esquema incorporado en AvroIO.Read permitirá una menor cantidad de archivos), pero corresponde a alrededor de decenas de miles de archivos en una canalización.

También puedes encontrarte con este error si creaste una fuente de datos personalizada para tu canalización y el método splitIntoBundles de tu fuente muestra una lista de objetos BoundedSource que necesita más de 20 MB cuando se serializa.

El límite permitido para el tamaño total de los objetos BoundedSource generados por la operación splitIntoBundles() de tu fuente personalizada es de 20 MB. Puedes trabajar con esta limitación si modificas tu subclase BoundedSource personalizada, de manera que el tamaño total de los objetos BoundedSource generados sea menor que el límite de 20 MB. Por ejemplo, en un principio, tu fuente puede generar menos divisiones y usar un Reequilibrio dinámico del trabajo para seguir dividiendo entradas según se requiera.

Trabajos que solían ejecutarse ahora fallan debido a que “No se puede acceder al paquete en etapa de pruebas”

Verifica que el depósito de Cloud Storage utilizado para la etapa de pruebas no tenga una configuración de TTL que haga que los paquetes en etapa de pruebas se borren.

Errores de codificación, IOExceptions o un comportamiento inesperado en el código de usuario.

Los SDK de Apache Beam y los trabajadores de Dataflow dependen de componentes de terceros. Estos componentes importan dependencias adicionales. La incompatibilidad de las versiones puede provocar un comportamiento inesperado en el servicio. Si usas alguno de estos paquetes en tu código, ten en cuenta que algunas bibliotecas no se pueden desviar. Es posible que necesites fijar las versiones anteriores que estarán dentro del alcance durante la ejecución. El documento SDK y dependencias de trabajadores contiene una lista de dependencias y sus versiones obligatorias.

¿Te ha resultado útil esta página? Enviar comentarios:

Enviar comentarios sobre...

Si necesitas ayuda, visita nuestra página de asistencia.