Usa Cloud Dataflow SQL

En este instructivo, se muestra cómo ejecutar trabajos de Cloud Dataflow con SQL y la IU de SQL de Cloud Dataflow.Para demostrarlo, este instructivo te guía a través de un ejemplo que une una transmisión de datos de Cloud Pub/Sub con datos de una tabla de BigQuery.

Objetivos

En este instructivo, realizarás lo siguiente:

  • Usarás SQL para unir datos de transmisión de Cloud Pub/Sub con datos de la tabla de BigQuery.
  • Implementarás un trabajo de Cloud Dataflow desde la IU de SQL de Cloud Dataflow.

Costos

En este instructivo, se usan componentes facturables de Google Cloud Platform, incluidos los siguientes:

  • Cloud Dataflow
  • Cloud Storage
  • Cloud Pub/Sub

Usa la calculadora de precios para generar una estimación de los costos según el uso previsto. Los usuarios nuevos de GCP pueden optar por a una prueba gratuita.

Antes de comenzar

  1. Accede a tu Cuenta de Google.

    Si todavía no tienes una cuenta, regístrate para obtener una nueva.

  2. Selecciona o crea un proyecto de GCP.

    Ir a la página Administrar recursos

  3. Asegúrate de tener habilitada la facturación para tu proyecto.

    Aprende a habilitar la facturación

  4. Habilita las Cloud Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, and Cloud Resource Manager API necesarias.

    Habilita las API

  5. Configura la autenticación:
    1. En GCP Console, ve a la página Crear clave de la cuenta de servicio.

      Ir a la página Crear clave de la cuenta de servicio
    2. Desde la lista desplegable de la Cuenta de servicio, selecciona Nueva cuenta de servicio.
    3. En el campo Nombre de cuenta de servicio, ingresa un nombre.
    4. En la lista desplegable Función, selecciona Proyecto > Propietario.

      Nota: El campo Función autoriza tu cuenta de servicio para acceder a los recursos. Puedes ver y cambiar este campo luego con GCP Console. Si desarrollas una aplicación de producción, especifica permisos más detallados que Proyecto > Propietario. Para obtener más información, consulta Cómo otorgar funciones a las cuentas de servicio.
    5. Haz clic en Crear. Se descargará un archivo JSON a tu computadora que contiene tus descargas de claves.
  6. Configura la variable de entorno GOOGLE_APPLICATION_CREDENTIALS con la ruta de acceso al archivo JSON que contiene la clave de tu cuenta de servicio. Esta variable solo se aplica a tu sesión actual de shell. Por lo tanto, si abres una sesión nueva, deberás volver a configurar la variable.

  7. Instala y, luego, inicializa el SDK de Cloud. Elige una de las opciones de instalación. Es posible que debas establecer la propiedad project en el proyecto que usas para esta explicación.
  8. Ve a la IU web de BigQuery en GCP Console. Así se abrirá el último proyecto al que accediste. Para cambiar a un proyecto diferente, haz clic en el nombre del proyecto en la parte superior de la IU web de BigQuery y busca el proyecto que quieres usar.
    Ir a la IU web de BigQuery

Cambia a la IU de SQL de Cloud Dataflow

En la IU web de BigQuery, sigue estos pasos para cambiar a la IU de Cloud Dataflow.

  1. Haz clic en el menú desplegable Más y selecciona Configuración de consulta.

  2. En el menú Configuración de consultas que se abre a la derecha, selecciona Motor de Cloud Dataflow.

  3. Si tu proyecto no tiene habilitadas las API de Cloud Dataflow ni Data Catalog, se te solicitará que las habilites. Haz clic en Habilitar las API. Puede que la habilitación de las API de Cloud Dataflow y Data Catalog demore unos minutos.

  4. Cuando se complete la habilitación de las API, haz clic en Guardar.

Crea fuentes de ejemplo

Si quieres seguir el ejemplo proporcionado en este instructivo, crea las fuentes siguientes y úsalas en los pasos del instructivo.

  • Un tema de Cloud Pub/Sub llamado transactions: una transmisión de datos de transacciones que se entrega a través de una suscripción al tema de Cloud Pub/Sub. Los datos de cada transacción incluyen información sobre el producto comprado, el precio de oferta y la ciudad y estado en los que se realizó la compra. Después de crear el tema de Cloud Pub/Sub, debes crear una secuencia de comandos que publique mensajes en tu tema. Ejecutarás esta secuencia de comandos en una sección posterior de este instructivo.
  • Una tabla de BigQuery llamada us_state_salesregions: una tabla que proporciona una asignación de estados a regiones de ventas. Antes de crear esta tabla, debes crear un conjunto de datos de BigQuery.

Asigna un esquema a tu tema de Cloud Pub/Sub

La asignación de un esquema te permite ejecutar consultas de SQL en tus datos de tema de Cloud Pub/Sub. En la actualidad, SQL de Cloud Dataflow espera que los mensajes de los temas de Cloud Pub/Sub se serialicen en formato JSON. En el futuro, se generará compatibilidad con otros formatos, como Avro.

Para asignar un esquema al ejemplo de tema de Cloud Pub/Sub transactions:

  1. Crea un archivo de texto y otórgale el nombre transactions_schema.yaml. Copia y pega el texto de esquema siguiente en transactions_schema.yaml.
  - column: event_timestamp
    description: Pub/Sub event timestamp
    mode: REQUIRED
    type: TIMESTAMP
  - column: attributes
    description: Pub/Sub message attributes
    mode: NULLABLE
    type: MAP<STRING,STRING>
  - column: payload
    description: Pub/Sub message payload
    mode: NULLABLE
    type: STRUCT
    subcolumns:
    - column: tr_time_str
      description: Transaction time string
      mode: NULLABLE
      type: STRING
    - column: first_name
      description: First name
      mode: NULLABLE
      type: STRING
    - column: last_name
      description: Last name
      mode: NULLABLE
      type: STRING
    - column: city
      description: City
      mode: NULLABLE
      type: STRING
    - column: state
      description: State
      mode: NULLABLE
      type: STRING
    - column: product
      description: Product
      mode: NULLABLE
      type: STRING
    - column: amount
      description: Amount of transaction
      mode: NULLABLE
      type: FLOAT
  1. Asigna el esquema con la herramienta de línea de comandos de gcloud.

    a. Actualiza la herramienta de gcloud con el comando siguiente Asegúrate de tener la versión 242.0.0 o posterior de la herramienta de gcloud.

      gcloud components update
    

    b. Ejecuta el comando siguiente en una ventana de línea de comandos. Reemplaza project-id por el ID del proyecto y path-to-file por la ruta a tu archivo transactions_schema.yaml.

      gcloud beta data-catalog entries update \
        --lookup-entry='pubsub.topic.`project-id`.transactions' \
        --schema-from-file=path-to-file/transactions_schema.yaml
    

    Para obtener más información sobre los parámetros del comando y los formatos de archivo de esquema permitidos, consulta la página de documentación de gcloud beta data-catalog entries update.

    c. Confirma que tu esquema se haya asignado de forma correcta al tema transactions de Cloud Pub/Sub. Reemplaza project-id por el ID del proyecto.

      gcloud beta data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'
    

Busca fuentes de Cloud Pub/Sub

La IU de SQL de Cloud Dataflow proporciona una forma de buscar objetos de fuente de datos de Cloud Pub/Sub para cualquier proyecto al que tengas acceso, de modo que no tengas que recordar los nombres completos.

Para el ejemplo en este instructivo, debes agregar el tema transactions de Cloud Pub/Sub que creaste:

  1. En el panel de navegación izquierdo, haz clic en la lista desplegable Agregar datos y selecciona Fuentes de Cloud Dataflow.

  2. En el panel Agregar fuente de Cloud Dataflow que se abre a la derecha, elige temas de Cloud Pub/Sub. En el cuadro de búsqueda, ingresa transactions. Selecciona el tema y haz clic en Agregar.

Observa el esquema

  1. En el panel de navegación a la izquierda de la IU de SQL de Cloud Dataflow, haz clic en Fuentes de Cloud Dataflow.
  2. Haz clic en Temas de Cloud Pub/Sub.
  3. Haz clic en Transacciones.
  4. En Esquema, puedes ver el esquema que asignaste al tema transactions de Cloud Pub/Sub.

Crea una consulta de SQL

La IU de SQL de Cloud Dataflow te permite crear consultas de SQL para ejecutar tus trabajos de Cloud Dataflow.

La consulta de SQL siguiente es una consulta de enriquecimiento de datos. Esta agrega un campo adicional, sales_region, a la transmisión de eventos de Cloud Pub/Sub (transactions), mediante una tabla de BigQuery (us_state_salesregions) que asigna estados a regiones de venta.

Copia y pega la consulta de SQL siguiente en el Editor de consultas. Reemplaza project-id por el ID del proyecto.

SELECT tr.payload.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr
  ON tr.payload.state = sr.state_code

Cuando ingresas una consulta en la IU de SQL de Cloud Dataflow, el validador de consultas verifica la sintaxis de la consulta. Se muestra un ícono de marca de verificación verde si la consulta es válida. Si la consulta no es válida, se muestra un ícono de signo de exclamación rojo. Si la sintaxis de la consulta no es válida, se proporcionará información sobre lo que se debe corregir si haces clic en el ícono del validador.

En la captura de pantalla siguiente, se muestra la consulta válida en el Editor de consultas. El validador muestra una marca de verificación verde.

Ingresa tu consulta en el editor.

Crea un trabajo de Cloud Dataflow para ejecutar tu consulta de SQL

Para ejecutar la consulta de SQL, crea un trabajo de Cloud Dataflow desde la IU de SQL de Cloud Dataflow.

  1. Debajo del Editor de consultas, haz clic en Crear trabajo de Cloud Dataflow.

  2. En el panel Crear trabajo de Cloud Dataflow que se abre a la derecha, cambia el Nombre de la tabla predeterminado a dfsqltable_sales.

  3. Haz clic en Crear. Tu trabajo de Cloud Dataflow tardará unos minutos en comenzar a ejecutarse.

  4. El panel Resultados de la consulta aparece en la IU. Para volver al panel Resultados de la consulta de un trabajo posterior, busca el trabajo en el panel Historial de trabajos y usa el botón Abrir consulta en el editor, como se muestra en Visualiza el trabajo y el resultado de Cloud Dataflow.

  5. En Información del trabajo, haz clic en el enlace ID del trabajo. Se abrirá una pestaña nueva del navegador con la página Detalles del trabajo de Cloud Dataflow en la IU web de Cloud Dataflow.

Visualiza el trabajo y el resultado de Cloud Dataflow

Cloud Dataflow convierte tu consulta de SQL en una canalización de Apache Beam. En la IU web de Cloud Dataflow que se abre en una pestaña nueva del navegador, podrás ver una representación gráfica de tu canalización.

Puedes hacer clic en las casillas para ver un desglose de las transformaciones que se generan en la canalización. Por ejemplo, si haces clic en el cuadro superior de la representación gráfica que tiene la etiqueta Ejecutar consulta de SQL, aparecerá un gráfico que muestra las operaciones que ocurren en segundo plano.

Los dos cuadros superiores representan las dos entradas que uniste: el tema de Cloud Pub/Sub, transactions y la tabla de BigQuery, us_state_salesregions.

Para ver la tabla de resultados que contiene los resultados del trabajo, vuelve a la pestaña del navegador con la IU de SQL de Cloud Dataflow. En el panel de navegación izquierdo, en de tu proyecto, haz clic en el conjunto de datos dataflow_sql_dataset que creaste. Luego, haz clic en la tabla de resultados dfsqltable_sales. En la pestaña Vista previa, se muestran los contenidos de la tabla de resultados.

Visualiza los trabajos anteriores y edita tus consultas

La IU de SQL de Cloud Dataflow almacena trabajos y consultas anteriores en el panel Historial de trabajos. Los trabajos se enumeran según el día en que comenzó el trabajo. La lista de trabajos muestra primero los días que contienen trabajos en ejecución. Luego, la lista muestra los días sin trabajos en ejecución.

Puedes usar la lista del historial de trabajos para editar consultas de SQL anteriores y ejecutar trabajos nuevos de Cloud Dataflow. Por ejemplo, en caso de que quieras modificar tu consulta para agregar ventas por región de ventas cada 15 segundos. Usa el panel Historial de trabajos para acceder al trabajo en ejecución que iniciaste antes en el instructivo. Con este, cambia la consulta de SQL y ejecuta otro trabajo con la consulta modificada.

  1. En el panel de navegación de izquierdo, haz clic en Historial de trabajos.

  2. En Historial de trabajos, haz clic en Cloud Dataflow. Aparecerán todos los trabajos anteriores de tu proyecto.

  3. Haz clic en el trabajo que quieres editar. Haz clic en Abrir en editor de consultas.

  4. Edita tu consulta de SQL en el Editor de consultas para agregar ventanas de saltos de tamaño constante. Reemplaza project-id por el ID del proyecto si copias la consulta siguiente.

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.payload.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr
       ON tr.payload.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    
  5. Debajo del Editor de consultas, haz clic en Crear trabajo de Cloud Dataflow para crear uno nuevo con la consulta modificada.

Realiza una limpieza

Sigue estos pasos para evitar que se apliquen cargos a tu cuenta de Google Cloud Platform por los recursos que usaste en este instructivo:

  1. Detén tu secuencia de comandos de publicación transactions_injector.py si aún se está ejecutando.

  2. Detén tus trabajos en ejecución de Cloud Dataflow. Ve a la IU web de Cloud Dataflow en GCP Console.

    Ir a la IU web de Cloud Dataflow

    Para cada trabajo que creaste a partir de este instructivo, sigue estos pasos:

    1. Haz clic en el nombre del trabajo.

    2. En el panel Resumen del trabajo, haz clic en Detener trabajo. Aparecerá el cuadro de diálogo Detener trabajo con las opciones para detenerlo:

    3. Haz clic en Cancelar.

    4. Haz clic en Detener trabajo. El servicio detiene la transferencia y procesamiento de todos los datos lo antes posible. Debido a que Cancelar detiene el procesamiento de manera inmediata, es posible que pierdas los datos “en tránsito”. Detener un trabajo puede tardar unos minutos.

  3. Borra tu conjunto de datos de BigQuery. Ve a la IU web de BigQuery en GCP Console.

    Ir a la IU web de BigQuery

    1. En el panel de navegación, en la sección Recursos, haz clic en el conjunto de datos dataflow_sql_dataset que creaste.

    2. En el lado derecho del panel de detalles, haz clic en Borrar conjunto de datos. Con esta acción, se borra el conjunto de datos, la tabla y todos los datos.

    3. En el cuadro de diálogo Borrar conjunto de datos, ingresa el nombre del conjunto de datos (dataflow_sql_dataset) y, luego, haz clic en Borrar para confirmar el comando de borrado.

  4. Borra tu tema de Cloud Pub/Sub. Dirígete a la página de temas de Cloud Pub/Sub en GCP Console.

    Ir a la página de Cloud Pub/Sub

    1. Marca la casilla de verificación junto al tema transactions.

    2. Haz clic en Borrar para borrar el tema de forma definitiva.

    3. Dirígete a la página de suscripciones de Cloud Pub/Sub.

    4. Marca la casilla de verificación junto a las suscripciones restantes a transactions. Si tus trabajos ya no están en ejecución, es posible que no haya ninguna suscripción.

    5. Haz clic en Borrar para borrar las suscripciones de manera permanente.

  5. Borra el depósito de etapa de pruebas de Cloud Dataflow en Cloud Storage. Dirígete al navegador de Cloud Storage en GCP Console.

    Ir al navegador de Cloud Storage

    1. Marca la casilla de verificación junto al depósito de pruebas de Cloud Dataflow.

    2. Haz clic en Borrar para borrar el depósito de manera permanente.

Qué sigue

¿Te sirvió esta página? Envíanos tu opinión:

Enviar comentarios sobre…

¿Necesitas ayuda? Visita nuestra página de asistencia.