Crear una tarea personalizada con el creador de tareas

El creador de trabajos te permite crear trabajos de Dataflow por lotes y de streaming personalizados. También puedes guardar las tareas del creador de tareas como archivos YAML de Apache Beam para compartirlas y reutilizarlas.

Crear un nuevo flujo de trabajo

Para crear una nueva canalización en el creador de trabajos, sigue estos pasos:

  1. Ve a la página Trabajos de la Google Cloud consola.

    Ir a Tareas

  2. Haz clic en Crear trabajo desde creador.

  3. En Nombre del trabajo, escribe el nombre del trabajo.

  4. Selecciona Lote o Streaming.

  5. Si seleccionas Streaming, elige un modo de ventana. A continuación, introduce una especificación para la ventana, como se indica a continuación:

    • Ventana fija: introduce el tamaño de la ventana en segundos.
    • Ventana de tiempo: introduce el tamaño y el periodo de la ventana en segundos.
    • Ventana de sesión: introduce un intervalo de sesión en segundos.

    Para obtener más información sobre las ventanas, consulta Ventanas y funciones de ventanas.

A continuación, añade fuentes, transformaciones y receptores a la canalización, tal como se describe en las secciones siguientes.

Añadir una fuente a la canalización

Una canalización debe tener al menos una fuente. Al principio, el creador de trabajos se rellena con una fuente vacía. Para configurar la fuente, sigue estos pasos:

  1. En el cuadro Nombre de la fuente, introduce un nombre para la fuente o usa el predeterminado. El nombre aparece en el gráfico de la tarea cuando la ejecutas.

  2. En la lista Tipo de fuente, seleccione el tipo de fuente de datos.

  3. En función del tipo de fuente, proporcione información de configuración adicional. Por ejemplo, si selecciona BigQuery, especifique la tabla de la que se van a leer los datos.

    Si seleccionas Pub/Sub, especifica un esquema de mensaje. Introduzca el nombre y el tipo de datos de cada campo que quiera leer de los mensajes de Pub/Sub. La canalización elimina los campos que no se especifican en el esquema.

  4. Opcional: En algunos tipos de fuentes, puedes hacer clic en Vista previa de los datos de la fuente para ver una vista previa de los datos de la fuente.

Para añadir otra fuente a la canalización, haz clic en Añadir una fuente. Para combinar datos de varias fuentes, añade una transformación SQL o Join a tu canalización.

Añadir una transformación a la canalización

También puedes añadir una o varias transformaciones a la canalización. Puede usar las siguientes transformaciones para manipular, agregar o combinar datos de fuentes y otras transformaciones:

Tipo de transformación Descripción Información de transformación de YAML de Beam
Filtro (Python) Filtrar registros con una expresión de Python.
Transformación de SQL Manipula registros o une varias entradas con una instrucción SQL.
Asignar campos (Python) Añade campos nuevos o reasigna registros completos con expresiones y funciones de Python.
Asignar campos (SQL) Añade o asigna campos de registro con expresiones SQL.
Transformaciones de YAML:
  1. AssertEqual
  2. AssignTimestamps
  3. Combinar
  4. Explode
  5. Filtro
  6. Flatten
  7. Unirme
  8. LogForTesting
  9. MLTransform
  10. MapToFields
  11. PyTransform
  12. WindowInfo

Usa cualquier transformación del SDK de YAML de Beam.

Configuración de la transformación de YAML: proporciona los parámetros de configuración de la transformación de YAML como un mapa de YAML. Los pares clave-valor se usan para rellenar la sección de configuración de la transformación YAML de Beam resultante. Para ver los parámetros de configuración admitidos de cada tipo de transformación, consulta la documentación de las transformaciones YAML de Beam. Parámetros de configuración de ejemplo:

Combinar
group_by:
combine:
Unirme
type:
equalities:
fields:
Registro Registra los registros en los registros de los trabajadores de la tarea.
Agrupar por Combina registros con funciones como count() y sum().
Unirme Unir varias entradas en campos iguales.
Explode Divide los registros acoplando los campos de la matriz.

Para añadir una transformación:

  1. Haz clic en Añadir transformación.

  2. En el cuadro de nombre Transformación, introduce un nombre para la transformación o usa el nombre predeterminado. El nombre aparece en el gráfico de la tarea cuando la ejecutas.

  3. En la lista Tipo de transformación, selecciona el tipo de transformación.

  4. En función del tipo de transformación, proporciona información de configuración adicional. Por ejemplo, si seleccionas Filtro (Python), introduce una expresión de Python que quieras usar como filtro.

  5. Selecciona el paso de entrada de la transformación. El paso de entrada es el origen o la transformación cuya salida proporciona la entrada de esta transformación.

Añadir un receptor al flujo de procesamiento

Una canalización debe tener al menos un receptor. Al principio, el creador de trabajos se rellena con un receptor vacío. Para configurar el receptor, sigue estos pasos:

  1. En el cuadro Nombre del receptor, introduce un nombre para el receptor o usa el predeterminado. El nombre aparece en el gráfico de la tarea cuando la ejecutas.

  2. En la lista Tipo de receptor, selecciona el tipo de receptor.

  3. En función del tipo de receptor, proporcione información de configuración adicional. Por ejemplo, si selecciona el sumidero de BigQuery, elija la tabla de BigQuery en la que quiere escribir.

  4. Selecciona el paso de entrada del receptor. El paso de entrada es la fuente o la transformación cuya salida proporciona la entrada de esta transformación.

  5. Para añadir otro receptor a la canalización, haz clic en Añadir un receptor.

Ejecutar el flujo de procesamiento

Para ejecutar una canalización desde el creador de trabajos, sigue estos pasos:

  1. Opcional: Define las opciones de la tarea de Dataflow. Para desplegar la sección Opciones de flujo de datos, haga clic en la flecha .

  2. Haz clic en Ejecutar trabajo. El creador de tareas te lleva al gráfico de tareas de la tarea enviada. Puedes usar el gráfico de tareas para monitorizar el estado de la tarea.

Validar la canalización antes de lanzarla

En el caso de las canalizaciones con configuraciones complejas, como los filtros de Python y las expresiones SQL, puede ser útil comprobar si hay errores de sintaxis en la configuración de la canalización antes de iniciarla. Para validar la sintaxis de la canalización, sigue estos pasos:

  1. Haz clic en Validar para abrir Cloud Shell e iniciar el servicio de validación.
  2. Haz clic en Iniciar validación.
  3. Si se detecta un error durante la validación, aparecerá un signo de exclamación rojo.
  4. Corrige los errores detectados y verifica las correcciones haciendo clic en Validar. Si no se encuentra ningún error, aparecerá una marca de verificación verde.

Ejecutar con gcloud CLI

También puedes ejecutar las canalizaciones YAML de Beam con la CLI de gcloud. Para ejecutar una canalización de compilación de trabajos con gcloud CLI, haz lo siguiente:

  1. Haga clic en Guardar YAML para abrir la ventana Guardar YAML.

  2. Realiza una de las siguientes acciones:

    • Para guardar el archivo en Cloud Storage, introduce una ruta de Cloud Storage y haz clic en Guardar.
    • Para descargar un archivo local, haz clic en Descargar.
  3. Ejecuta el siguiente comando en tu shell o terminal:

      gcloud dataflow yaml run my-job-builder-job --yaml-pipeline-file=YAML_FILE_PATH
    

    Sustituye YAML_FILE_PATH por la ruta de tu archivo YAML, ya sea localmente o en Cloud Storage.

Siguientes pasos