Trabaja con Data pipelines

Descripción general

Puedes usar Dataflow Data Pipelines para crear programas de trabajos recurrentes, comprender dónde se gastan los recursos en múltiples ejecuciones de trabajos, definir y administrar objetivos de actualización de datos, y desglosar las etapas de canalización individuales a fin de corregir y optimizar tus canalizaciones.

Características de Data Pipelines:

  • Crea una canalización por lotes recurrente para ejecutar un trabajo por lotes de forma programada.
  • Crea una canalización por lotes incremental y recurrente para ejecutar un trabajo por lotes en la última versión de datos de entrada.
  • Usa el cuadro de evaluación de resumen de la canalización para ver el uso agregado de capacidad y el consumo de recursos de una canalización.
  • Ver la actualidad de los datos de una canalización de transmisión Esta métrica, que evoluciona con el tiempo, se puede vincular a una alerta que te notifica cuando la actualidad es inferior a un objetivo especificado.
  • Usa los grafos de métricas de canalización para comparar trabajos de canalización por lotes y encontrar anomalías.

Restricciones de uso de la canalización de datos:

  • Disponibilidad regional: Dado que las canalizaciones de datos de Dataflow usan Cloud Scheduler, una aplicación de App Engine, las canalizaciones de datos están disponibles en las regiones de App Engine disponibles.

  • Límites de cuota:

    • Cantidad máxima de canalizaciones por proyecto: 500
    • Cantidad máxima de canalizaciones por organización: 2,500

Documentación de referencia de la API:

Para ver la documentación de la API, consulta la referencia de las canalizaciones de datos.

Tipos de canalizaciones de datos

Existen dos tipos de canalizaciones de datos de Dataflow: transmisión y lote. Ambos tipos de canalizaciones ejecutan trabajos que se definen en plantillas de Dataflow.

Canalización de transmisión de dato
Una canalización de transmisión de datos ejecuta un trabajo de transmisión de Dataflow inmediatamente después de crearse.
Canalización de datos por lotes
Una canalización de datos por lotes ejecuta un trabajo por lotes de Dataflow en un programa definido por el usuario. El nombre de archivo de entrada de la canalización por lotes se puede parametrizar para permitir el procesamiento de canalización por lotes incremental.

Canalizaciones por lotes incrementales

Puedes usar marcadores de posición de fecha y hora a fin de especificar un formato de archivo de entrada incremental para una canalización por lotes.

  • Se pueden usar marcadores de posición para año, mes, fecha, hora, minuto y segundo, y deben seguir el formato strftime(). Los marcadores de posición van precedidos por el símbolo de porcentaje (%).
  • El formato de los parámetros no se verifica durante la creación de la canalización.
    • Ejemplo: Si especificas “gs://bucket/Y” como la ruta de archivo de entrada parametrizada, se evaluará como “gs://bucket/Y”, ya que “Y” no tendrá. Un "%" anterior no se asigna al formato strftime().

En cada tiempo de ejecución programado de la canalización por lotes, la parte del marcador de posición de la ruta del archivo de entrada se evalúa en la fecha y hora actual (o desplazados en el tiempo) (los valores de fecha se evalúan mediante el uso de la fecha actual en la zona horaria del trabajo programado). Si la ruta de acceso al archivo evaluada coincide con la de un archivo de entrada, la canalización por lotes la identificará para su procesamiento en el momento programado.

  • Ejemplo: Una canalización por lotes está programada para repetirse al inicio de cada hora PST. Si parametrizas la ruta del archivo de entrada como gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv, el 15 de abril de 2021 a las 6 p.m. PST, la ruta de acceso del archivo de entrada se evaluará como gs://bucket-name/2021-04-15/prefix-18_00.csv.

Usa parámetros de pausa en directo

Puedes usar parámetros de cambio de tiempo + o - minutos u horas, entre llaves con el formato “{[+|-][0-9]+[m|h]}” para admitir la coincidencia de una ruta del archivo de entrada con una fecha y hora evaluada que se desplazará antes o después de la fecha y hora actual de la programación de la canalización. La canalización por lotes continuará repitiéndose a la hora programada, pero la ruta del archivo de entrada se evaluará con la compensación de tiempo especificada.

  • Ejemplo: Una canalización por lotes está programada para repetirse al inicio de cada hora PST. Si parametrizas la ruta del archivo de entrada como gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv{-2h}, el 15 de abril de 2021 a las 6 p.m. PST, la ruta de acceso del archivo de entrada se evaluará como gs://bucket-name/2021-04-15/prefix-16_00.csv.

Funciones de la canalización de datos

Para que las operaciones de canalización de datos se realicen correctamente, el usuario debe tener las funciones de IAM necesarias, como se indica a continuación:

  1. Un usuario debe tener la función adecuada para realizar operaciones:

  2. Un usuario debe poder actuar como la cuenta de servicio que usan Cloud Scheduler y Dataflow si se le otorga la función roles/iam.serviceAccountUser en esa cuenta. Si el usuario no selecciona una cuenta de servicio para Cloud Scheduler y Dataflow, se usa la cuenta de servicio predeterminada de Compute Engine.

Crea una canalización de datos

Puedes crear una canalización de datos de las siguientes dos maneras:

  1. Importa un trabajo
  2. Crea una canalización de datos

Página de configuración de canalizaciones de datos: Cuando accedes por primera vez a la función de canalizaciones de Dataflow en Cloud Console, se abre una página de configuración.

  1. Habilita las API enumeradas
  2. Selecciona una región para la aplicación de App Engine que Cloud Scheduler usará a fin de programar las canalizaciones.

Importa un trabajo

Puedes importar un trabajo por lotes o de transmisión de Dataflow que se base en una plantilla clásica o flexible y convertirla en una canalización de datos.

  1. Ve a la página Trabajos de Dataflow en Cloud Console, selecciona un trabajo completado y, luego, en la página Detalles del trabajo selecciona “+IMPORTAR COMO CANALIZACIÓN”.

  2. En la página Crear canalización a partir de una plantilla, se selecciona la opción de canalización “canalización de datos”. Otros parámetros se propagarán con las opciones del trabajo importado.

    1. Para un trabajo por lotes, proporciona una programación de recurrencia en la sección “Programa tu canalización” en los parámetros de plantilla. Es opcional proporcionar una dirección de cuenta de correo electrónico para Cloud Scheduler, que se usa a fin de programar ejecuciones por lotes. Si no se especifica, se usa la cuenta de servicio de Compute Engine predeterminada. Nota: El usuario debe tener la función roles/iam.serviceAccountUser en la cuenta de servicio que usa Cloud Scheduler, ya sea una especificada por el usuario o la predeterminada de Compute Engine (consulta Funciones de canalización de datos).

Crea una canalización de datos

  1. Ve a la página Dataflow Pipelines en Cloud Console y, luego, selecciona “+CREAR CANALIZACIÓN DE DATOS”.

  2. En la página Crear canalización a partir de una plantilla en Administración de trabajos, selecciona “Data pipelines”, proporciona un nombre de canalización y completa los otros campos de selección y parámetro de la plantilla.

    1. Para un trabajo por lotes, proporciona una programación de recurrencia en la sección “Programa tu canalización” en los parámetros de plantilla. Es opcional proporcionar una dirección de cuenta de correo electrónico para Cloud Scheduler, que se usa a fin de programar ejecuciones por lotes. Si no se especifica, se usa la cuenta de servicio de Compute Engine predeterminada. Nota: El usuario debe tener la función roles/iam.serviceAccountUser en la cuenta de servicio que usa Cloud Scheduler, ya sea una especificada por el usuario o la predeterminada de Compute Engine (consulta Funciones de canalización de datos).

Crea una canalización de datos por lotes

Para crear esta canalización de datos por lotes de muestra, debes tener acceso a los siguientes recursos de tu proyecto:

En esta canalización de ejemplo, se usa la plantilla de canalización por lotes Texto de Cloud Storage en BigQuery, que lee archivos en formato CSV desde Cloud Storage, ejecuta una transformación y, luego, inserta valores en your-project-id:your-dataset-name.three_column_table.

  1. Crea los siguientes archivos en tu unidad local:
    1. Un archivo bq_three_column_table.json que contiene el siguiente esquema de la tabla de BigQuery de destino.
{
  "BigQuery Schema": [
    {
      "name": "col1",
      "type": "STRING"
    },
    {
      "name": "col2",
      "type": "STRING"
    },
    {
      "name": "col3",
      "type": "INT64"
    }
  ]
}
  1. Un archivo split_csv_3cols.js de JavaScript, que implementa una transformación simple en los datos de entrada antes de la inserción en BigQuery.
function transform(line) {
    var values = line.split(',');
    var obj = new Object();
    obj.col1 = values[0];
    obj.col2 = values[1];
    obj.col3 = values[2];
    var jsonString = JSON.stringify(obj);
    return jsonString;
}
  1. Un archivo CSV file01.csv con varios registros que se insertarán en la tabla de BigQuery.
    b8e5087a,74,27531
    7a52c051,4a,25846
    672de80f,cd,76981
    111b92bf,2e,104653
    ff658424,f0,149364
    e6c17c75,84,38840
    833f5a69,8f,76892
    d8c833ff,7d,201386
    7d3da7fb,d5,81919
    3836d29b,70,181524
    ca66e6e5,d7,172076
    c8475eb6,03,247282
    558294df,f3,155392
    737b82a8,c7,235523
    82c8f5dc,35,468039
    57ab17f9,5e,480350
    cbcdaf84,bd,354127
    52b55391,eb,423078
    825b8863,62,88160
    26f16d4f,fd,397783
      
  2. Usa gsutil para copiar los archivos a las carpetas de un bucket de Cloud Storage en tu proyecto, de la siguiente manera:
    1. Copiar bq_three_column_table.json y split_csv_3cols.js a gs://your-bucket/text_to_bigquery/
      gsutil cp bq_three_column_table.json gs://your-bucket/text_to_bigquery/
        gsutil cp split_csv_3cols.js gs://your-bucket/text_to_bigquery/
      
    2. Copia file01.csv en gs://your-bucket/inputs/
      gsutil cp file01.csv gs://your-bucket/inputs/
      
  3. Crea una carpeta “tmp” en your-bucket desde el navegador de Cloud Storage. Selecciona el nombre de tu carpeta para abrir la página de detalles del bucket y haz clic en CREAR CARPETA para crear una carpeta “tmp” en tu bucket.
  4. Ve a la página Dataflow Pipelines y, luego, selecciona “CREAR CANALIZACIÓN DE DATOS”. Ingresa o selecciona los siguientes elementos en la página Crear canalización a partir de una plantilla:

    1. Administra trabajos:
      1. Selecciona “Data pipelines”.
      2. Nombre de la canalización: Ingresa "text_to_bq_batch_data_pipeline".
      3. Haz clic en CONTINUAR.
    2. Selecciona plantillas:
      1. Extremo regional: Selecciona una región de Compute Engine.
      2. Lista de plantillas: en “Procesar datos en forma masiva (por lotes)”, selecciona “Text File on Cloud Storage to BigQuery”. Descripción: Canalización por lotes Lee archivos de texto almacenados en Cloud Storage, los transforma mediante una función de JavaScript definida por el usuario (UDF) y envía el resultado a BigQuery”. Nota: No selecciones la canalización de transmisión con el mismo nombre en “Procesar datos de forma continua (transmisión)”.
      3. Haz clic en CONTINUAR.
    3. Parámetros de la plantilla:
      1. Programa la canalización: selecciona un programa, como por hora en el minuto 25, en tu zona horaria. Puedes editar la programación después de enviar la canalización, como se explica a continuación.
    4. Parámetros obligatorios:
      1. Ruta de acceso de UDF de JavaScript en Cloud Storage:
        gs://your-bucket/text_to_bigquery/split_csv_3cols.js
        
      2. Ruta de JSON:
        gs://your-bucket/text_to_bigquery/bq_three_column_table.json
        
      3. Nombre de la UDF de JavaScript: “transform”
      4. Tabla de salida de BigQuery (nombre de tabla completamente calificado):
        your_project_id:your_dataset.three_column_table
        
      5. Ruta de acceso de entrada de Cloud Storage:
        gs://your_bucket/inputs/file*.csv
        
      6. _Directorio temporal de BigQuery:
        gs://your_bucket/tmp
        
      7. Ubicación temporal:
        gs://your_bucket/tmp
        
    5. Haz clic en ENVIAR.
  5. Confirma la información de la canalización y de la plantilla y visualiza el historial actual y anterior desde la página Detalles de la canalización.

También puedes ejecutar una canalización por lotes a pedido con el botón Ejecutar en la consola de Dataflow Pipelines.

Crea una canalización de transmisión de datos de muestra

Puedes crear una canalización de transmisión de datos de muestra si sigues las instrucciones de canalización por lotes de muestra, con las siguientes diferencias:

  • Programa de la canalización. No especificas un programa para una canalización de transmisión de datos. El trabajo de transmisión de Dataflow se inicia de inmediato.

  • Selección de plantillas: En “Procesar datos de forma continua (transmisión)”, selecciona “Archivos de texto en Cloud Storage a BigQuery”. Descripción: Una canalización de transmisión que puede leer archivos de texto almacenados en Cloud Storage, realizar una transformación a través de una función de JavaScript definida por el usuario y transmitir los resultados a BigQuery. En esta canalización, se requiere una función de JavaScript y una representación JSON del TableSchema de BigQuery.

  • Tipo de máquina de trabajador: la canalización procesará el conjunto inicial de archivos que coinciden con el patrón gs://<your_bucket>/inputs/file*.csv y cualquier archivo adicional que coincida con este patrón que subas a la carpeta inputs/. Si el tamaño de los archivos CSV supera varios GB, para evitar posibles errores de memoria insuficiente, selecciona un tipo de máquina con más memoria que el tipo de máquina predeterminado n1-standard-4, como n1-highmem-8.

Investiga incumplimientos de los objetivos de canalización

Canalizaciones por lotes recurrentes

En la página Detalles de la canalización en Cloud Console, usa los grafos de “Estado de cada trabajo” y “Duración del subproceso en cada paso” del panel de estado de la canalización a fin de realizar un análisis inicial del estado de tu canalización.

Investigación de ejemplo:

  1. Tienes una canalización por lotes recurrente que se ejecuta cada hora 3 minutos después de la hora, cada trabajo se ejecuta normalmente durante unos 9 minutos y tienes un objetivo para que todos los trabajos se completen en menos de 10 minutos.

  2. El grafo de “estado del trabajo” muestra que un trabajo se ejecutó por más de 10 minutos.

  3. En la tabla de historial de actualizaciones o ejecuciones, busca el trabajo que se ejecutó durante la hora de interés y, luego, haz clic en la página de detalles del trabajo de Dataflow. En esa página, encontrarás la etapa de ejecución más larga y, luego, buscarás en los registros los posibles errores para determinar la causa de la demora.

Canalizaciones de transmisión

En la pestaña de información de canalización en la Detalles de la canalización en Cloud Console, usa el grafo de actualidad de datos del panel de estado de la canalización para un análisis inicial del estado de tu canalización.

Investigación de ejemplo:

  1. Tienes una canalización de transmisión que suele producir un resultado con una actualidad de los datos de 20 segundos.

  2. Establece un objetivo de una garantía de 30 segundos de actualidad de datos. Cuando revisas el grafo de actualización de datos, observarás que entre las 9 y las 10 a.m., la actualidad aumentó a casi 40 segundos.

  3. Cambia a la pestaña MÉTRICAS DE LA CANALIZACIÓN y, luego, visualiza los grafos de Capacidad de procesamiento, Uso de CPU y Uso de memoria para realizar un análisis más detallado.