Trabaja con canalizaciones de datos de Dataflow

Descripción general

Puedes usar las canalizaciones de datos de Dataflow para las siguientes tareas:

  • Crear programas de trabajos recurrentes
  • Comprender dónde se gastan los recursos en múltiples ejecuciones de trabajos
  • Definir y administrar objetivos de actualización de datos
  • Desglosar en etapas de canalización individuales para corregir y optimizar tus canalizaciones

Para ver la documentación de la API, consulta la referencia de las canalizaciones de datos.

Funciones

  • Crea una canalización por lotes recurrente para ejecutar un trabajo por lotes de forma programada.
  • Crea una canalización por lotes incremental y recurrente para ejecutar un trabajo por lotes en la última versión de datos de entrada.
  • Usa el cuadro de evaluación de resumen de la canalización para ver el uso agregado de capacidad y el consumo de recursos de una canalización.
  • Ver la actualización de datos de una canalización de transmisión. Esta métrica, que evoluciona con el tiempo, se puede vincular a una alerta que te notifica cuando la actualidad es inferior a un objetivo especificado.
  • Usa los grafos de métricas de canalización para comparar trabajos de canalización por lotes y encontrar anomalías.

Limitaciones

  • Disponibilidad regional: Puedes crear canalizaciones de datos en las regiones de Cloud Scheduler disponibles.

  • Cuota:

    • Cantidad predeterminada de canalizaciones por proyecto: 500
    • Cantidad predeterminada de canalizaciones por organización: 2,500

      La cuota a nivel de la organización está inhabilitada de forma predeterminada. Puedes habilitar las cuotas a nivel de la organización y, si lo haces, cada organización puede tener como máximo 2,500 canalizaciones de forma predeterminada.

  • Etiquetas: No puedes usar etiquetas definidas por el usuario para etiquetar canalizaciones de datos de Dataflow. Sin embargo, cuando usas el campo additionalUserLabels, esos valores se pasan a tu trabajo de Dataflow. Para obtener más información sobre cómo se aplican las etiquetas a trabajos de Dataflow individuales, consulta Opciones de canalización.

Tipos de canalizaciones de datos

Dataflow tiene dos tipos de canalizaciones de datos, de transmisión y por lotes. Ambos tipos de canalizaciones ejecutan trabajos que se definen en plantillas de Dataflow.

Canalización de transmisión de datos
Una canalización de transmisión de datos ejecuta un trabajo de transmisión de Dataflow inmediatamente después de crearse.
Canalización de datos por lotes

Una canalización de datos por lotes ejecuta un trabajo por lotes de Dataflow en un programa definido por el usuario. El nombre de archivo de entrada de la canalización por lotes se puede parametrizar para permitir el procesamiento de canalización por lotes incremental.

Canalizaciones por lotes incrementales

Puedes usar marcadores de posición de fecha y hora a fin de especificar un formato de archivo de entrada incremental para una canalización por lotes.

  • Se pueden usar marcadores de posición para año, mes, fecha, hora, minuto y segundo, y deben seguir el formato strftime(). Los marcadores de posición van precedidos por el símbolo de porcentaje (%).
  • El formato de los parámetros no se verifica durante la creación de la canalización.
    • Ejemplo: Si especificas “gs://bucket/Y” como la ruta de entrada parametrizada, se evaluará como “gs://bucket/Y”, ya que “Y” sin un "%" anterior no se asigna al formato strftime().

En cada momento de ejecución de la canalización por lotes programada, la parte del marcador de posición de la ruta de entrada se evalúa según la fecha y hora actual (o desplazadas en el tiempo) Los valores de fecha se evalúan mediante la fecha actual en la zona horaria del trabajo programado. Si la ruta evaluada coincide con la de un archivo de entrada, la canalización por lotes la identifica para su procesamiento en el momento programado.

  • Ejemplo: Una canalización por lotes está programada para repetirse al inicio de cada hora PST. Si parametrizas la ruta de entrada como gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv, el 15 de abril de 2021 a las 6 p.m. PST, la ruta de entrada se evaluará como gs://bucket-name/2021-04-15/prefix-18_00.csv.

Usa parámetros de cambio de hora

Puedes usar parámetros de cambio de hora con + o - minutos u horas. Para admitir la coincidencia de una ruta de entrada con una fecha y hora evaluadas que se desplazan antes o después de la fecha y hora actuales de la programación de la canalización, encierra estos parámetros entre llaves. Usa el formato {[+|-][0-9]+[m|h]}. La canalización por lotes continúa repitiéndose a la hora programada, pero la ruta del archivo de entrada se evalúa con la compensación de tiempo especificada.

  • Ejemplo: Una canalización por lotes está programada para repetirse al inicio de cada hora PST. Si parametrizas la ruta de entrada como gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv{-2h}, el 15 de abril de 2021 a las 6 p.m. PST, la ruta de entrada se evaluará como gs://bucket-name/2021-04-15/prefix-16_00.csv.

Roles de la canalización de datos

Para que las operaciones de canalización de datos de Dataflow se realicen correctamente, debes tener los roles de IAM necesarios, como se indica a continuación:

  1. Necesitas el rol adecuado para realizar operaciones:

  2. La cuenta de servicio que use Cloud Scheduler debe tener el rol roles/iam.serviceAccountUser, ya sea una cuenta de servicio especificada por el usuario o la cuenta de servicio de Compute Engine predeterminada. Para obtener más información, consulta Roles de canalización de datos.

  3. Debes poder actuar como la cuenta de servicio que usan Cloud Scheduler y Dataflow si se te otorga el rol roles/iam.serviceAccountUser en esa cuenta. Si no seleccionas una cuenta de servicio para Cloud Scheduler y Dataflow, se usa la cuenta de servicio predeterminada de Compute Engine.

Crea una canalización de datos

Puedes crear una canalización de datos de las siguientes dos maneras:

  1. Importa un trabajo
  2. Crea una canalización de datos

La página de configuración de canalizaciones de datos: Cuando accedes por primera vez a la función de canalizaciones de Dataflow en la consola de Google Cloud, se abre una página de configuración. Habilita las API enumeradas para crear canalizaciones de datos.

Importa un trabajo

Puedes importar un trabajo por lotes o de transmisión de Dataflow que se base en una plantilla clásica o flexible y convertirla en una canalización de datos.

  1. En la consola de Google Cloud, ve a la página Trabajos de Dataflow.

    Ir a Trabajos

  2. Selecciona un trabajo completado y, luego, en la página Detalles del trabajo, selecciona +Importar como una canalización.

  3. En la página Crear una canalización a partir de una plantilla, los parámetros se propagan con las opciones del trabajo importado.

  4. Para un trabajo por lotes, en la sección Programa tu canalización, proporciona una programación de recurrencia. Es opcional proporcionar una dirección de cuenta de correo electrónico para Cloud Scheduler, que se usa a fin de programar ejecuciones por lotes. Si no se especifica, se usa la cuenta de servicio de Compute Engine predeterminada.

Crea una canalización de datos

  1. En la consola de Google Cloud, ve a la página Canalizaciones de datos de Dataflow.

    Ir a Canalizaciones de datos

  2. Selecciona +Crear canalización de datos.

  3. En la página Crear canalización a partir de una plantilla, proporciona un nombre de canalización y completa los otros campos de selección y parámetro de la plantilla.

  4. Para un trabajo por lotes, en la sección Programa tu canalización, proporciona una programación de recurrencia. Es opcional proporcionar una dirección de cuenta de correo electrónico para Cloud Scheduler, que se usa a fin de programar ejecuciones por lotes. Si no se especifica un valor, se usa la cuenta de servicio predeterminada de Compute Engine.

Crea una canalización de datos por lotes

Para crear esta canalización de datos por lotes de muestra, debes tener acceso a los siguientes recursos de tu proyecto:

En esta canalización de ejemplo, se usa la plantilla de canalización por lotes Texto de Cloud Storage a BigQuery. Esta plantilla lee archivos en formato CSV desde Cloud Storage, ejecuta una transformación y, luego, inserta valores en una tabla de BigQuery con tres columnas.

  1. Crea los siguientes archivos en tu unidad local:

    1. Un archivo bq_three_column_table.json que contiene el siguiente esquema de la tabla de BigQuery de destino.

      {
        "BigQuery Schema": [
          {
            "name": "col1",
            "type": "STRING"
          },
          {
            "name": "col2",
            "type": "STRING"
          },
          {
            "name": "col3",
            "type": "INT64"
          }
        ]
      }
      
    2. Un archivo split_csv_3cols.js de JavaScript, que implementa una transformación simple en los datos de entrada antes de la inserción en BigQuery.

      function transform(line) {
          var values = line.split(',');
          var obj = new Object();
          obj.col1 = values[0];
          obj.col2 = values[1];
          obj.col3 = values[2];
          var jsonString = JSON.stringify(obj);
          return jsonString;
      }
      
    3. Un archivo CSV file01.csv con varios registros que se insertan en la tabla de BigQuery.

      b8e5087a,74,27531
      7a52c051,4a,25846
      672de80f,cd,76981
      111b92bf,2e,104653
      ff658424,f0,149364
      e6c17c75,84,38840
      833f5a69,8f,76892
      d8c833ff,7d,201386
      7d3da7fb,d5,81919
      3836d29b,70,181524
      ca66e6e5,d7,172076
      c8475eb6,03,247282
      558294df,f3,155392
      737b82a8,c7,235523
      82c8f5dc,35,468039
      57ab17f9,5e,480350
      cbcdaf84,bd,354127
      52b55391,eb,423078
      825b8863,62,88160
      26f16d4f,fd,397783
      
  2. Usa el comando gcloud storage cp para copiar los archivos a las carpetas de un bucket de Cloud Storage en tu proyecto, de la siguiente manera:

    1. Copiar bq_three_column_table.json y split_csv_3cols.js a gs://BUCKET_ID/text_to_bigquery/

      gcloud storage cp bq_three_column_table.json gs://BUCKET_ID/text_to_bigquery/
      gcloud storage cp split_csv_3cols.js gs://BUCKET_ID/text_to_bigquery/
    2. Copia file01.csv en gs://BUCKET_ID/inputs/:

      gcloud storage cp file01.csv gs://BUCKET_ID/inputs/
  3. En la consola de Google Cloud, ve a la página Buckets de Cloud Storage.

    Ir a Buckets

  4. Para crear una carpeta tmp en el bucket de Cloud Storage, selecciona el nombre de tu carpeta para abrir la página de detalles del bucket y, luego, haz clic en Crear carpeta.

    Botón Crear carpeta en la página Detalles del bucket.

  5. En la consola de Google Cloud, ve a la página Canalizaciones de datos de Dataflow.

    Ir a Canalizaciones de datos

  6. Selecciona Crear canalización de datos. Ingresa o selecciona los siguientes elementos en la página Crear canalización a partir de una plantilla:

    1. En Nombre de la canalización, ingresa text_to_bq_batch_data_pipeline.
    2. En Extremo regional, selecciona una región de Compute Engine. Las regiones de origen y de destino deben coincidir. Por lo tanto, tu bucket de Cloud Storage y la tabla de BigQuery deben estar en la misma región.
    3. En Plantilla de Dataflow, en Procesar datos en forma masiva (por lotes), selecciona Archivos de texto en Cloud Storage a BigQuery.

    4. En Programa la canalización, selecciona un programa, como por hora en el minuto 25 en tu zona horaria. Puedes editar el programa después de enviar la canalización. Es opcional proporcionar una dirección de cuenta de correo electrónico para Cloud Scheduler, que se usa a fin de programar ejecuciones por lotes. Si no se especifica, se usa la cuenta de servicio de Compute Engine predeterminada.

    5. En Parámetros obligatorios, ingresa lo siguiente:

      1. Para la ruta de UDF de JavaScript en Cloud Storage:
        gs://BUCKET_ID/text_to_bigquery/split_csv_3cols.js
        
      2. En Ruta de acceso JSON:
        BUCKET_ID/text_to_bigquery/bq_three_column_table.json
        
      3. En nombre de la UDF de JavaScript: transform
      4. En Tabla de salida de BigQuery, haz lo siguiente:
        PROJECT_ID:DATASET_ID.three_column_table
        
      5. En Ruta de entrada de Cloud Storage:
        BUCKET_ID/inputs/file01.csv
        
      6. En Directorio de BigQuery temporal, haz lo siguiente:
        BUCKET_ID/tmp
        
      7. En Ubicación temporal:
        BUCKET_ID/tmp
        
    6. Haz clic en Crear canalización.

  7. Confirma la información de la canalización y de la plantilla y visualiza el historial actual y anterior desde la página Detalles de la canalización.

    Página Detalles de la canalización.

Puedes editar el programa de canalización de datos desde el panel de información de canalización en la página Detalles de la canalización.

Botón Editar junto al programa de canalización.

También puedes ejecutar una canalización por lotes a pedido con el botón Ejecutar en la consola de Dataflow Pipelines.

Crea una canalización de transmisión de datos de muestra

Puedes crear una canalización de transmisión de datos de muestra si sigues las instrucciones de canalización por lotes de muestra, con las siguientes diferencias:

  • En Programación de canalización, no especifiques un programa para una canalización de transmisión de datos. El trabajo de transmisión de Dataflow se inicia de inmediato.
  • En Plantilla de Dataflow, en Procesar datos continuamente (stream), selecciona Archivos de texto en Cloud Storage a BigQuery.
  • En Tipo de máquina del trabajador, la canalización procesa el conjunto inicial de archivos que coinciden con el patrón gs://BUCKET_ID/inputs/file01.csv y cualquier archivo adicional que coincida con este patrón que subas a la carpeta inputs/. Si el tamaño de los archivos CSV supera varios GB, para evitar posibles errores de memoria insuficiente, selecciona un tipo de máquina con más memoria que el tipo de máquina predeterminado n1-standard-4, como n1-highmem-8.

Soluciona problemas

En esta sección, se muestra cómo resolver problemas con las canalizaciones de datos de Dataflow.

No se puede iniciar el trabajo de canalización de datos

Cuando usas canalizaciones de datos para crear una programación de trabajo recurrente, es posible que tu trabajo de Dataflow no se inicie y aparezca un error de estado 503 en los archivos de registro de Cloud Scheduler.

Este problema se produce cuando Dataflow no puede ejecutar el trabajo de forma temporal.

A fin de solucionar este problema, configura Cloud Scheduler para volver a intentar el trabajo. Debido a que el problema es temporal, cuando el trabajo se reintenta, podría tener éxito. Para obtener más información sobre cómo configurar los valores de reintento en Cloud Scheduler, consulta Crea un trabajo.

Investiga incumplimientos de los objetivos de canalización

En las siguientes secciones, se describe cómo investigar las canalizaciones que no cumplen con los objetivos de rendimiento.

Canalizaciones por lotes recurrentes

Para un análisis inicial del estado de tu canalización, en la página Información de la canalización en la consola de Google Cloud, usa los grafos Estado de cada trabajo y Duración del subproceso por paso. Estos gráficos se encuentran en el panel de estado de la canalización.

Investigación de ejemplo:

  1. Tienes una canalización por lotes recurrente que se ejecuta cada hora 3 minutos después de la hora. Cada trabajo se ejecuta normalmente durante unos 9 minutos. Tienes un objetivo para que todos los trabajos se completen en menos de 10 minutos.

  2. El gráfico de estado del trabajo muestra que un trabajo se ejecutó por más de 10 minutos.

  3. En la tabla de historial de actualizaciones o ejecuciones, busca el trabajo que se ejecutó durante la hora de interés. Haz clic para ir a la página de detalles del trabajo de Dataflow. En esa página, encontrarás la etapa de ejecución más larga y, luego, buscarás en los registros los posibles errores para determinar la causa de la demora.

Canalizaciones de transmisión

Para realizar un análisis inicial del estado de tu canalización, en la página Detalles de la canalización, en la pestaña Información de la canalización, usa el grafo de actualidad de los datos. Este grafo se encuentra en el panel de estado de la canalización.

Investigación de ejemplo:

  1. Tienes una canalización de transmisión que suele producir un resultado con una actualidad de los datos de 20 segundos.

  2. Establece un objetivo de una garantía de 30 segundos de actualidad de datos. Cuando revisas el grafo de actualización de datos, observarás que entre las 9 y las 10 a.m., la actualidad aumentó a casi 40 segundos.

    Gráfico de actualidad de los datos que muestra un aumento en la cantidad de minutos de actualidad de los datos.

  3. Cambia a la pestaña Métricas de la canalización y, luego, visualiza los grafos de Capacidad de procesamiento, Uso de CPU y Uso de memoria para realizar un análisis más detallado.

Error: El ID de la canalización ya existe en el proyecto

Si intentas crear una canalización nueva con un nombre que ya existe en tu proyecto, recibirás este mensaje de error: Pipeline Id already exist within the project. A fin de evitar este problema, elige siempre nombres únicos para tus canalizaciones.