Trabajar con flujos de procesamiento de datos de Dataflow

Información general

Puedes usar las canalizaciones de datos de Dataflow para las siguientes tareas:

  • Crea programaciones de tareas recurrentes.
  • Descubra dónde se gastan los recursos en varias ejecuciones de tareas.
  • Definir y gestionar objetivos de actualización de datos.
  • Desglosa las fases de la canalización para corregirlas y optimizarlas.

Para consultar la documentación de la API, consulta la referencia de Data Pipelines.

Funciones

  • Crea una canalización de procesamiento por lotes periódica para ejecutar una tarea por lotes según una programación.
  • Crea un flujo de procesamiento por lotes incremental periódico para ejecutar una tarea por lotes con la versión más reciente de los datos de entrada.
  • Usa el gráfico de resultados de resumen de la canalización para ver el uso de la capacidad agregada y el consumo de recursos de una canalización.
  • Consulte la actualización de datos de una canalización de streaming. Esta métrica, que evoluciona con el tiempo, se puede vincular a una alerta que te avise cuando la actualización sea inferior a un objetivo especificado.
  • Usa gráficos de métricas de la canalización para comparar trabajos de canalizaciones por lotes y detectar anomalías.

Limitaciones

  • Disponibilidad regional: puedes crear flujos de datos en las regiones de Cloud Scheduler disponibles.

  • Cuota:

    • Número predeterminado de flujos de procesamiento por proyecto: 500
    • Número predeterminado de flujos de procesamiento por organización: 2500

      La cuota a nivel de organización está inhabilitada de forma predeterminada. Puedes habilitar las cuotas a nivel de organización. Si lo haces, cada organización podrá tener un máximo de 2500 canalizaciones de forma predeterminada.

  • Etiquetas: no puedes usar etiquetas definidas por el usuario para etiquetar flujos de procesamiento de datos de Dataflow. Sin embargo, cuando usa el campo additionalUserLabels, esos valores se transfieren a su trabajo de Dataflow. Para obtener más información sobre cómo se aplican las etiquetas a los trabajos de Dataflow individuales, consulte Opciones de la canalización.

Tipos de flujos de procesamiento de datos

Dataflow tiene dos tipos de canalizaciones de datos: de streaming y por lotes. Ambos tipos de trabajos de ejecución de flujos de procesamiento se definen en plantillas de Dataflow.

Flujo de procesamiento de datos de streaming
Un flujo de procesamiento de datos de streaming ejecuta un trabajo de streaming de Dataflow inmediatamente después de crearse.
Flujo de procesamiento de datos por lotes

Un flujo de procesamiento de datos por lotes ejecuta una tarea de Dataflow por lotes según una programación definida por el usuario. El nombre del archivo de entrada del flujo de procesamiento por lotes se puede parametrizar para permitir el procesamiento incremental del flujo de procesamiento por lotes.

Flujos de procesamiento por lotes incrementales

Puedes usar marcadores de posición de fecha y hora para especificar un formato de archivo de entrada incremental para una canalización por lotes.

  • Se pueden usar marcadores de posición para el año, el mes, la fecha, la hora, los minutos y los segundos, y deben seguir el formato strftime(). Los marcadores de posición van precedidos del símbolo de porcentaje (%).
  • El formato de los parámetros no se verifica durante la creación de la canalización.
    • Ejemplo: Si especificas "gs://bucket/Y" como ruta de entrada parametrizada, se evaluará como "gs://bucket/Y", porque "Y" sin el prefijo "%" no se asigna al formato strftime().

En cada hora de ejecución programada de la canalización por lotes, la parte del marcador de posición de la ruta de entrada se evalúa con la fecha y hora actuales (o desplazadas). Los valores de fecha se evalúan con la fecha actual de la zona horaria del trabajo programado. Si la ruta evaluada coincide con la ruta de un archivo de entrada, el archivo se recoge para que la canalización por lotes lo procese a la hora programada.

  • Ejemplo: Una canalización por lotes se programa para que se repita al principio de cada hora (PST). Si parametrizas la ruta de entrada como gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv, el 15 de abril del 2021 a las 18:00 (PST), la ruta de entrada se evalúa como gs://bucket-name/2021-04-15/prefix-18_00.csv.

Usar parámetros de desfase horario

Puedes usar parámetros de cambio de hora de minutos u horas con los signos + o -. Para que se pueda asociar una ruta de entrada con una fecha y hora evaluadas que se hayan desplazado antes o después de la fecha y hora actuales de la programación de la canalización, incluye estos parámetros entre llaves. Usa el formato {[+|-][0-9]+[m|h]}. La canalización por lotes sigue repitiéndose a la hora programada, pero la ruta de entrada se evalúa con el desfase horario especificado.

  • Ejemplo: Una canalización por lotes se programa para que se repita al principio de cada hora (PST). Si parametrizas la ruta de entrada como gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv{-2h}, el 15 de abril del 2021 a las 18:00 (PST), la ruta de entrada se evalúa como gs://bucket-name/2021-04-15/prefix-16_00.csv.

Roles de flujo de procesamiento de datos

Para que las operaciones de la canalización de datos de Dataflow se realicen correctamente, necesitas los roles de gestión de identidades y accesos necesarios, que se indican a continuación:

  1. Necesita el rol adecuado para realizar operaciones:

  2. La cuenta de servicio que usa Cloud Scheduler debe tener el rol roles/iam.serviceAccountUser, tanto si la cuenta de servicio la especifica el usuario como si es la cuenta de servicio predeterminada de Compute Engine. Para obtener más información, consulta Roles de la canalización de datos.

  3. Para poder actuar como la cuenta de servicio que usan Cloud Scheduler y Dataflow, debes tener el rol roles/iam.serviceAccountUser en esa cuenta. Si no seleccionas una cuenta de servicio para Cloud Scheduler y Dataflow, se usará la cuenta de servicio predeterminada de Compute Engine.

Crear un flujo de procesamiento de datos

Puedes crear una canalización de datos de Dataflow de dos formas:

  1. Importar una tarea o
  2. Crea un flujo de procesamiento de datos

Página de configuración de las canalizaciones de datos: cuando accedas por primera vez a la función de canalizaciones de Dataflow en la consola de Google Cloud , se abrirá una página de configuración. Habilita las APIs indicadas para crear flujos de procesamiento de datos.

Importar una tarea

Puedes importar una tarea de Dataflow por lotes o de streaming basada en una plantilla clásica o flexible y convertirla en un flujo de datos.

  1. En la Google Cloud consola, ve a la página Trabajos de Dataflow.

    Ir a Tareas

  2. Selecciona un trabajo completado y, en la página Detalles del trabajo, selecciona +Importar como una canalización.

  3. En la página Crear flujo de procesamiento a partir de plantilla, los parámetros se rellenan con las opciones de la tarea importada.

  4. En el caso de una tarea por lotes, en la sección Programa tu flujo de procesamiento, proporciona una programación de recurrencia. Proporcionar una dirección de cuenta de correo para Cloud Scheduler, que se usa para programar ejecuciones por lotes, es opcional. Si no se especifica, se usa la cuenta de servicio predeterminada de Compute Engine.

Crear un flujo de procesamiento de datos

  1. En la Google Cloud consola, ve a la página Dataflow Pipelines de datos.

    Ir a Flujos de datos

  2. Selecciona + Crear flujo de procesamiento de datos.

  3. En la página Crear flujo de procesamiento a partir de plantilla, proporcione un nombre para el flujo de procesamiento y rellene los demás campos de selección de plantilla y de parámetros.

  4. En el caso de una tarea por lotes, en la sección Programa tu flujo de procesamiento, proporciona una programación de recurrencia. Proporcionar una dirección de cuenta de correo para Cloud Scheduler, que se usa para programar ejecuciones por lotes, es opcional. Si no se especifica ningún valor, se usa la cuenta de servicio predeterminada de Compute Engine.

Crear un flujo de procesamiento de datos por lotes

Para crear esta canalización de datos por lotes de ejemplo, debe tener acceso a los siguientes recursos de su proyecto:

Este ejemplo de flujo de procesamiento usa la plantilla de flujo de procesamiento por lotes De archivos de texto de Cloud Storage a BigQuery. Esta plantilla lee archivos en formato CSV de Cloud Storage, ejecuta una transformación y, a continuación, inserta valores en una tabla de BigQuery con tres columnas.

  1. Crea los siguientes archivos en tu unidad local:

    1. Un archivo bq_three_column_table.json que contenga el siguiente esquema de la tabla de BigQuery de destino.

      {
        "BigQuery Schema": [
          {
            "name": "col1",
            "type": "STRING"
          },
          {
            "name": "col2",
            "type": "STRING"
          },
          {
            "name": "col3",
            "type": "INT64"
          }
        ]
      }
      
    2. Un archivo split_csv_3cols.js JavaScript que implementa una transformación sencilla en los datos de entrada antes de insertarlos en BigQuery.

      function transform(line) {
          var values = line.split(',');
          var obj = new Object();
          obj.col1 = values[0];
          obj.col2 = values[1];
          obj.col3 = values[2];
          var jsonString = JSON.stringify(obj);
          return jsonString;
      }
      
    3. Un archivo file01.csv CSV con varios registros que se insertan en la tabla de BigQuery.

      b8e5087a,74,27531
      7a52c051,4a,25846
      672de80f,cd,76981
      111b92bf,2e,104653
      ff658424,f0,149364
      e6c17c75,84,38840
      833f5a69,8f,76892
      d8c833ff,7d,201386
      7d3da7fb,d5,81919
      3836d29b,70,181524
      ca66e6e5,d7,172076
      c8475eb6,03,247282
      558294df,f3,155392
      737b82a8,c7,235523
      82c8f5dc,35,468039
      57ab17f9,5e,480350
      cbcdaf84,bd,354127
      52b55391,eb,423078
      825b8863,62,88160
      26f16d4f,fd,397783
      
  2. Usa el comando gcloud storage cp para copiar los archivos en carpetas de un segmento de Cloud Storage de tu proyecto, como se indica a continuación:

    1. Copiar bq_three_column_table.json y split_csv_3cols.js en gs://BUCKET_ID/text_to_bigquery/

      gcloud storage cp bq_three_column_table.json gs://BUCKET_ID/text_to_bigquery/
      gcloud storage cp split_csv_3cols.js gs://BUCKET_ID/text_to_bigquery/
    2. Copiar file01.csv en gs://BUCKET_ID/inputs/

      gcloud storage cp file01.csv gs://BUCKET_ID/inputs/
  3. En la Google Cloud consola, ve a la página Segmentos de Cloud Storage.

    Ir a Contenedores

  4. Para crear una carpeta tmp en tu segmento de Cloud Storage, selecciona el nombre de la carpeta para abrir la página de detalles del segmento y, a continuación, haz clic en Crear carpeta.

    Botón Crear carpeta en la página Detalles del segmento.

  5. En la Google Cloud consola, ve a la página Dataflow Pipelines de datos.

    Ir a Flujos de datos

  6. Selecciona Crear flujo de procesamiento de datos. En la página Crear flujo de procesamiento a partir de plantilla, introduce o selecciona los siguientes elementos:

    1. En Nombre de la canalización, escribe text_to_bq_batch_data_pipeline.
    2. En Endpoint regional, selecciona una región de Compute Engine. Las regiones de origen y de destino deben coincidir. Por lo tanto, el segmento de Cloud Storage y la tabla de BigQuery deben estar en la misma región.
    3. En Plantilla de Dataflow, en Procesar datos de forma masiva (por lotes), selecciona Archivos de texto en Cloud Storage a BigQuery.

    4. En Programar flujo de procesamiento, seleccione una programación, como Cada hora en el minuto 25 de su zona horaria. Puedes editar la programación después de enviar la canalización. Proporcionar una dirección de cuenta de correo para Cloud Scheduler, que se usa para programar ejecuciones por lotes, es opcional. Si no se especifica, se usa la cuenta de servicio predeterminada de Compute Engine.

    5. En Parámetros obligatorios, introduzca lo siguiente:

      1. En Ruta de la UDF de JavaScript en Cloud Storage:
        gs://BUCKET_ID/text_to_bigquery/split_csv_3cols.js
        
      2. En Ruta de JSON:
        BUCKET_ID/text_to_bigquery/bq_three_column_table.json
        
      3. En Nombre de la función definida por el usuario de JavaScript, escriba transform.
      4. En Tabla de salida de BigQuery:
        PROJECT_ID:DATASET_ID.three_column_table
        
      5. En Ruta de entrada de Cloud Storage:
        BUCKET_ID/inputs/file01.csv
        
      6. En Directorio temporal de BigQuery:
        BUCKET_ID/tmp
        
      7. En Ubicación temporal:
        BUCKET_ID/tmp
        
    6. Haz clic en Crear canalización.

  7. Confirma la información de la canalización y la plantilla, y consulta el historial actual y anterior en la página Detalles de la canalización.

    Página de detalles del flujo de procesamiento.

Puedes editar la programación de la canalización de datos en el panel Información de la canalización de la página Detalles de la canalización.

Botón de edición situado junto a la programación del flujo de procesamiento.

También puedes ejecutar un flujo de procesamiento por lotes bajo demanda con el botón Ejecutar de la consola de flujos de procesamiento de Dataflow.

Crear un flujo de procesamiento de datos de streaming de ejemplo

Para crear un flujo de procesamiento de datos de streaming de ejemplo, sigue las instrucciones del flujo de procesamiento por lotes de ejemplo, con las siguientes diferencias:

  • En Programación del flujo de procesamiento, no especifiques ninguna programación para un flujo de procesamiento de datos de streaming. El trabajo de streaming de Dataflow se inicia inmediatamente.
  • En Plantilla de Dataflow, en Procesar datos continuamente (streaming), selecciona Archivos de texto en Cloud Storage a BigQuery.
  • En el caso de Tipo de máquina de trabajador, la canalización procesa el conjunto inicial de archivos que coinciden con el patrón gs://BUCKET_ID/inputs/file01.csv y los archivos adicionales que coincidan con este patrón y que subas a la carpeta inputs/. Si el tamaño de los archivos CSV supera varios GB, para evitar posibles errores de falta de memoria, selecciona un tipo de máquina con más memoria que el tipo de máquina n1-standard-4 predeterminado, como n1-highmem-8.

Solución de problemas

En esta sección se explica cómo resolver problemas con las canalizaciones de datos de Dataflow.

No se puede iniciar un trabajo de flujo de procesamiento de datos

Cuando usas las canalizaciones de datos para crear una programación de tareas periódicas, es posible que tu tarea de Dataflow no se inicie y que aparezca un error de estado 503 en los archivos de registro de Cloud Scheduler.

Este problema se produce cuando Dataflow no puede ejecutar el trabajo temporalmente.

Para solucionar este problema, configura Cloud Scheduler para que vuelva a intentar ejecutar la tarea. Como el problema es temporal, es posible que el trabajo se complete cuando se vuelva a intentar. Para obtener más información sobre cómo definir los valores de reintento en Cloud Scheduler, consulta Crear un trabajo.

Investigar infracciones de objetivos de la cartera

En las siguientes secciones se describe cómo investigar las canalizaciones que no cumplen los objetivos de rendimiento.

Flujos de procesamiento por lotes periódicos

Para hacer un análisis inicial del estado de tu flujo de procesamiento, ve a la página Información del flujo de procesamiento de la Google Cloud consola y consulta los gráficos Estado de los trabajos individuales y Tiempo de los hilos por paso. Estos gráficos se encuentran en el panel de estado de la canalización.

Investigación de ejemplo:

  1. Tienes una canalización por lotes periódica que se ejecuta cada hora a los 3 minutos de cada hora. Cada tarea suele ejecutarse durante unos 9 minutos. Tienes el objetivo de completar todos los trabajos en menos de 10 minutos.

  2. El gráfico de estado de la tarea muestra que una tarea se ha ejecutado durante más de 10 minutos.

  3. En la tabla de historial Actualización/Ejecución, busca el trabajo que se haya ejecutado durante la hora que te interese. Haz clic para ir a la página de detalles del trabajo de Dataflow. En esa página, busca la fase que se esté ejecutando durante más tiempo y, a continuación, consulta los registros para ver si hay errores que puedan ser la causa del retraso.

Flujos de procesamiento en streaming

Para hacer un análisis inicial del estado de su flujo de procesamiento, vaya a la página Detalles del flujo de procesamiento y, en la pestaña Información del flujo de procesamiento, consulte el gráfico de actualización de datos. Este gráfico se encuentra en el panel de estado de la canalización.

Investigación de ejemplo:

  1. Tienes una canalización de streaming que normalmente genera un resultado con una actualidad de los datos de 20 segundos.

  2. Te has fijado el objetivo de tener una garantía de actualización de datos de 30 segundos. Al revisar el gráfico de actualización de datos, observas que, entre las 9:00 y las 10:00, la actualización de datos ha aumentado hasta casi 40 segundos.

    Gráfico de actualización de datos que muestra un aumento en el número de minutos de actualización de datos.

  3. Cambia a la pestaña Métricas de la canalización y consulta los gráficos de uso de CPU y de memoria para obtener más información.

Error: El ID de la canalización ya existe en el proyecto

Si intentas crear una nueva canalización con un nombre que ya existe en tu proyecto, recibirás este mensaje de error: Pipeline Id already exist within the project. Para evitar este problema, elige siempre nombres únicos para tus canalizaciones.