Descripción general
Puedes usar las canalizaciones de datos de Dataflow para las siguientes tareas:
- Crear programas de trabajos recurrentes
- Comprender dónde se gastan los recursos en múltiples ejecuciones de trabajos
- Definir y administrar objetivos de actualización de datos
- Desglosar en etapas de canalización individuales para corregir y optimizar tus canalizaciones
Para ver la documentación de la API, consulta la referencia de las canalizaciones de datos.
Funciones
- Crea una canalización por lotes recurrente para ejecutar un trabajo por lotes de forma programada.
- Crea una canalización por lotes incremental y recurrente para ejecutar un trabajo por lotes en la última versión de datos de entrada.
- Usa el cuadro de evaluación de resumen de la canalización para ver el uso agregado de capacidad y el consumo de recursos de una canalización.
- Ver la actualización de datos de una canalización de transmisión. Esta métrica, que evoluciona con el tiempo, se puede vincular a una alerta que te notifica cuando la actualidad es inferior a un objetivo especificado.
- Usa los grafos de métricas de canalización para comparar trabajos de canalización por lotes y encontrar anomalías.
Limitaciones
Disponibilidad regional: Puedes crear canalizaciones de datos en las regiones de Cloud Scheduler disponibles.
Cuota:
- Cantidad predeterminada de canalizaciones por proyecto: 500
Cantidad predeterminada de canalizaciones por organización: 2,500
La cuota a nivel de la organización está inhabilitada de forma predeterminada. Puedes habilitar las cuotas a nivel de la organización y, si lo haces, cada organización puede tener como máximo 2,500 canalizaciones de forma predeterminada.
Etiquetas: No puedes usar etiquetas definidas por el usuario para etiquetar canalizaciones de datos de Dataflow. Sin embargo, cuando usas el campo
additionalUserLabels
, esos valores se pasan a tu trabajo de Dataflow. Para obtener más información sobre cómo se aplican las etiquetas a trabajos de Dataflow individuales, consulta Opciones de canalización.
Tipos de canalizaciones de datos
Dataflow tiene dos tipos de canalizaciones de datos, de transmisión y por lotes. Ambos tipos de canalizaciones ejecutan trabajos que se definen en plantillas de Dataflow.
- Canalización de transmisión de datos
- Una canalización de transmisión de datos ejecuta un trabajo de transmisión de Dataflow inmediatamente después de crearse.
- Canalización de datos por lotes
Una canalización de datos por lotes ejecuta un trabajo por lotes de Dataflow en un programa definido por el usuario. El nombre de archivo de entrada de la canalización por lotes se puede parametrizar para permitir el procesamiento de canalización por lotes incremental.
Canalizaciones por lotes incrementales
Puedes usar marcadores de posición de fecha y hora a fin de especificar un formato de archivo de entrada incremental para una canalización por lotes.
- Se pueden usar marcadores de posición para año, mes, fecha, hora, minuto y segundo, y deben seguir el formato
strftime()
. Los marcadores de posición van precedidos por el símbolo de porcentaje (%). - El formato de los parámetros no se verifica durante la creación de la canalización.
- Ejemplo: Si especificas “gs://bucket/Y” como la ruta de entrada parametrizada, se evaluará como “gs://bucket/Y”, ya que “Y” sin un "%" anterior no se asigna al formato
strftime()
.
- Ejemplo: Si especificas “gs://bucket/Y” como la ruta de entrada parametrizada, se evaluará como “gs://bucket/Y”, ya que “Y” sin un "%" anterior no se asigna al formato
En cada momento de ejecución de la canalización por lotes programada, la parte del marcador de posición de la ruta de entrada se evalúa según la fecha y hora actual (o desplazadas en el tiempo) Los valores de fecha se evalúan mediante la fecha actual en la zona horaria del trabajo programado. Si la ruta evaluada coincide con la de un archivo de entrada, la canalización por lotes la identifica para su procesamiento en el momento programado.
- Ejemplo: Una canalización por lotes está programada para repetirse al inicio de cada hora PST. Si parametrizas la ruta de entrada como
gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv
, el 15 de abril de 2021 a las 6 p.m. PST, la ruta de entrada se evaluará comogs://bucket-name/2021-04-15/prefix-18_00.csv
.
Usa parámetros de cambio de hora
Puedes usar parámetros de cambio de hora con + o - minutos u horas.
Para admitir la coincidencia de una ruta de entrada con una fecha y hora evaluadas que se desplazan antes o después de la fecha y hora actuales de la programación de la canalización, encierra estos parámetros entre llaves.
Usa el formato {[+|-][0-9]+[m|h]}
. La canalización por lotes continúa repitiéndose a la hora programada, pero la ruta del archivo de entrada se evalúa con la compensación de tiempo especificada.
- Ejemplo: Una canalización por lotes está programada para repetirse al inicio de cada hora PST. Si parametrizas la ruta de entrada como
gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv{-2h}
, el 15 de abril de 2021 a las 6 p.m. PST, la ruta de entrada se evaluará comogs://bucket-name/2021-04-15/prefix-16_00.csv
.
Roles de la canalización de datos
Para que las operaciones de canalización de datos de Dataflow se realicen correctamente, debes tener los roles de IAM necesarios, como se indica a continuación:
Necesitas el rol adecuado para realizar operaciones:
Datapipelines.admin
: Puede realizar todas las operaciones de canalización de datos.Datapipelines.viewer
: Puede ver trabajos y canalizaciones de datos.Datapipelines.invoker
: Puede invocar una ejecución de trabajo de canalización de datos (esta función se puede habilitar mediante la API)
La cuenta de servicio que use Cloud Scheduler debe tener el rol
roles/iam.serviceAccountUser
, ya sea una cuenta de servicio especificada por el usuario o la cuenta de servicio de Compute Engine predeterminada. Para obtener más información, consulta Roles de canalización de datos.Debes poder actuar como la cuenta de servicio que usan Cloud Scheduler y Dataflow si se te otorga el rol
roles/iam.serviceAccountUser
en esa cuenta. Si no seleccionas una cuenta de servicio para Cloud Scheduler y Dataflow, se usa la cuenta de servicio predeterminada de Compute Engine.
Crea una canalización de datos
Puedes crear una canalización de datos de las siguientes dos maneras:
La página de configuración de canalizaciones de datos: Cuando accedes por primera vez a la función de canalizaciones de Dataflow en la consola de Google Cloud, se abre una página de configuración. Habilita las API enumeradas para crear canalizaciones de datos.
Importa un trabajo
Puedes importar un trabajo por lotes o de transmisión de Dataflow que se base en una plantilla clásica o flexible y convertirla en una canalización de datos.
En la consola de Google Cloud, ve a la página Trabajos de Dataflow.
Selecciona un trabajo completado y, luego, en la página Detalles del trabajo, selecciona +Importar como una canalización.
En la página Crear una canalización a partir de una plantilla, los parámetros se propagan con las opciones del trabajo importado.
Para un trabajo por lotes, en la sección Programa tu canalización, proporciona una programación de recurrencia. Es opcional proporcionar una dirección de cuenta de correo electrónico para Cloud Scheduler, que se usa a fin de programar ejecuciones por lotes. Si no se especifica, se usa la cuenta de servicio de Compute Engine predeterminada.
Crea una canalización de datos
En la consola de Google Cloud, ve a la página Canalizaciones de datos de Dataflow.
Selecciona +Crear canalización de datos.
En la página Crear canalización a partir de una plantilla, proporciona un nombre de canalización y completa los otros campos de selección y parámetro de la plantilla.
Para un trabajo por lotes, en la sección Programa tu canalización, proporciona una programación de recurrencia. Es opcional proporcionar una dirección de cuenta de correo electrónico para Cloud Scheduler, que se usa a fin de programar ejecuciones por lotes. Si no se especifica un valor, se usa la cuenta de servicio predeterminada de Compute Engine.
Crea una canalización de datos por lotes
Para crear esta canalización de datos por lotes de muestra, debes tener acceso a los siguientes recursos de tu proyecto:
- Un bucket de Cloud Storage para almacenar archivos de entrada y salida
- Un conjunto de datos de BigQuery para crear una tabla.
En esta canalización de ejemplo, se usa la plantilla de canalización por lotes Texto de Cloud Storage a BigQuery. Esta plantilla lee archivos en formato CSV desde Cloud Storage, ejecuta una transformación y, luego, inserta valores en una tabla de BigQuery con tres columnas.
Crea los siguientes archivos en tu unidad local:
Un archivo
bq_three_column_table.json
que contiene el siguiente esquema de la tabla de BigQuery de destino.{ "BigQuery Schema": [ { "name": "col1", "type": "STRING" }, { "name": "col2", "type": "STRING" }, { "name": "col3", "type": "INT64" } ] }
Un archivo
split_csv_3cols.js
de JavaScript, que implementa una transformación simple en los datos de entrada antes de la inserción en BigQuery.function transform(line) { var values = line.split(','); var obj = new Object(); obj.col1 = values[0]; obj.col2 = values[1]; obj.col3 = values[2]; var jsonString = JSON.stringify(obj); return jsonString; }
Un archivo CSV
file01.csv
con varios registros que se insertan en la tabla de BigQuery.b8e5087a,74,27531 7a52c051,4a,25846 672de80f,cd,76981 111b92bf,2e,104653 ff658424,f0,149364 e6c17c75,84,38840 833f5a69,8f,76892 d8c833ff,7d,201386 7d3da7fb,d5,81919 3836d29b,70,181524 ca66e6e5,d7,172076 c8475eb6,03,247282 558294df,f3,155392 737b82a8,c7,235523 82c8f5dc,35,468039 57ab17f9,5e,480350 cbcdaf84,bd,354127 52b55391,eb,423078 825b8863,62,88160 26f16d4f,fd,397783
Usa el comando
gcloud storage cp
para copiar los archivos a las carpetas de un bucket de Cloud Storage en tu proyecto, de la siguiente manera:Copiar
bq_three_column_table.json
ysplit_csv_3cols.js
ags://BUCKET_ID/text_to_bigquery/
gcloud storage cp bq_three_column_table.json gs://BUCKET_ID/text_to_bigquery/
gcloud storage cp split_csv_3cols.js gs://BUCKET_ID/text_to_bigquery/
Copia
file01.csv
engs://BUCKET_ID/inputs/
:gcloud storage cp file01.csv gs://BUCKET_ID/inputs/
En la consola de Google Cloud, ve a la página Buckets de Cloud Storage.
Para crear una carpeta
tmp
en el bucket de Cloud Storage, selecciona el nombre de tu carpeta para abrir la página de detalles del bucket y, luego, haz clic en Crear carpeta.En la consola de Google Cloud, ve a la página Canalizaciones de datos de Dataflow.
Selecciona Crear canalización de datos. Ingresa o selecciona los siguientes elementos en la página Crear canalización a partir de una plantilla:
- En Nombre de la canalización, ingresa
text_to_bq_batch_data_pipeline
. - En Extremo regional, selecciona una región de Compute Engine. Las regiones de origen y de destino deben coincidir. Por lo tanto, tu bucket de Cloud Storage y la tabla de BigQuery deben estar en la misma región.
En Plantilla de Dataflow, en Procesar datos en forma masiva (por lotes), selecciona Archivos de texto en Cloud Storage a BigQuery.
En Programa la canalización, selecciona un programa, como por hora en el minuto 25 en tu zona horaria. Puedes editar el programa después de enviar la canalización. Es opcional proporcionar una dirección de cuenta de correo electrónico para Cloud Scheduler, que se usa a fin de programar ejecuciones por lotes. Si no se especifica, se usa la cuenta de servicio de Compute Engine predeterminada.
En Parámetros obligatorios, ingresa lo siguiente:
- Para la ruta de UDF de JavaScript en Cloud Storage:
gs://BUCKET_ID/text_to_bigquery/split_csv_3cols.js
- En Ruta de acceso JSON:
BUCKET_ID/text_to_bigquery/bq_three_column_table.json
- En nombre de la UDF de JavaScript:
transform
- En Tabla de salida de BigQuery, haz lo siguiente:
PROJECT_ID:DATASET_ID.three_column_table
- En Ruta de entrada de Cloud Storage:
BUCKET_ID/inputs/file01.csv
- En Directorio de BigQuery temporal, haz lo siguiente:
BUCKET_ID/tmp
- En Ubicación temporal:
BUCKET_ID/tmp
- Para la ruta de UDF de JavaScript en Cloud Storage:
Haz clic en Crear canalización.
- En Nombre de la canalización, ingresa
Confirma la información de la canalización y de la plantilla y visualiza el historial actual y anterior desde la página Detalles de la canalización.
Puedes editar el programa de canalización de datos desde el panel de información de canalización en la página Detalles de la canalización.
También puedes ejecutar una canalización por lotes a pedido con el botón Ejecutar en la consola de Dataflow Pipelines.
Crea una canalización de transmisión de datos de muestra
Puedes crear una canalización de transmisión de datos de muestra si sigues las instrucciones de canalización por lotes de muestra, con las siguientes diferencias:
- En Programación de canalización, no especifiques un programa para una canalización de transmisión de datos. El trabajo de transmisión de Dataflow se inicia de inmediato.
- En Plantilla de Dataflow, en Procesar datos continuamente (stream), selecciona Archivos de texto en Cloud Storage a BigQuery.
- En Tipo de máquina del trabajador, la canalización procesa el conjunto inicial de archivos que coinciden con el patrón
gs://BUCKET_ID/inputs/file01.csv
y cualquier archivo adicional que coincida con este patrón que subas a la carpetainputs/
. Si el tamaño de los archivos CSV supera varios GB, para evitar posibles errores de memoria insuficiente, selecciona un tipo de máquina con más memoria que el tipo de máquina predeterminadon1-standard-4
, comon1-highmem-8
.
Soluciona problemas
En esta sección, se muestra cómo resolver problemas con las canalizaciones de datos de Dataflow.
No se puede iniciar el trabajo de canalización de datos
Cuando usas canalizaciones de datos para crear una programación de trabajo recurrente, es posible que tu trabajo de Dataflow no se inicie y aparezca un error de estado 503
en los archivos de registro de Cloud Scheduler.
Este problema se produce cuando Dataflow no puede ejecutar el trabajo de forma temporal.
A fin de solucionar este problema, configura Cloud Scheduler para volver a intentar el trabajo. Debido a que el problema es temporal, cuando el trabajo se reintenta, podría tener éxito. Para obtener más información sobre cómo configurar los valores de reintento en Cloud Scheduler, consulta Crea un trabajo.
Investiga incumplimientos de los objetivos de canalización
En las siguientes secciones, se describe cómo investigar las canalizaciones que no cumplen con los objetivos de rendimiento.
Canalizaciones por lotes recurrentes
Para un análisis inicial del estado de tu canalización, en la página Información de la canalización en la consola de Google Cloud, usa los grafos Estado de cada trabajo y Duración del subproceso por paso. Estos gráficos se encuentran en el panel de estado de la canalización.
Investigación de ejemplo:
Tienes una canalización por lotes recurrente que se ejecuta cada hora 3 minutos después de la hora. Cada trabajo se ejecuta normalmente durante unos 9 minutos. Tienes un objetivo para que todos los trabajos se completen en menos de 10 minutos.
El gráfico de estado del trabajo muestra que un trabajo se ejecutó por más de 10 minutos.
En la tabla de historial de actualizaciones o ejecuciones, busca el trabajo que se ejecutó durante la hora de interés. Haz clic para ir a la página de detalles del trabajo de Dataflow. En esa página, encontrarás la etapa de ejecución más larga y, luego, buscarás en los registros los posibles errores para determinar la causa de la demora.
Canalizaciones de transmisión
Para realizar un análisis inicial del estado de tu canalización, en la página Detalles de la canalización, en la pestaña Información de la canalización, usa el grafo de actualidad de los datos. Este grafo se encuentra en el panel de estado de la canalización.
Investigación de ejemplo:
Tienes una canalización de transmisión que suele producir un resultado con una actualidad de los datos de 20 segundos.
Establece un objetivo de una garantía de 30 segundos de actualidad de datos. Cuando revisas el grafo de actualización de datos, observarás que entre las 9 y las 10 a.m., la actualidad aumentó a casi 40 segundos.
Cambia a la pestaña Métricas de la canalización y, luego, visualiza los grafos de Capacidad de procesamiento, Uso de CPU y Uso de memoria para realizar un análisis más detallado.
Error: El ID de la canalización ya existe en el proyecto
Si intentas crear una canalización nueva con un nombre que ya existe en tu proyecto, recibirás este mensaje de error: Pipeline Id already exist within the
project
. A fin de evitar este problema, elige siempre nombres únicos para tus canalizaciones.