Une datos de transmisión con Dataflow SQL

En este instructivo, se muestra cómo usar Dataflow SQL para unir una transmisión de datos de Pub/Sub con datos de una tabla de BigQuery.

Objetivos

En este instructivo, realizarás lo siguiente:

  • Escribirás una consulta de SQL de Dataflow que una los datos de transmisión de Pub/Sub con los datos de la tabla de BigQuery.
  • Implementarás un trabajo de Dataflow desde la IU de Dataflow SQL.

Costos

En este instructivo, se usan los siguientes componentes facturables de Google Cloud:

  • Dataflow
  • Cloud Storage
  • Pub/Sub
  • Data Catalog

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud califiquen para obtener una prueba gratuita.

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyectos

  3. Comprueba que la facturación esté habilitada en tu proyecto.

    Descubre cómo puedes habilitar la facturación

  4. Habilita las API de Cloud Dataflow, Compute Engine, Logging, Cloud Storage, JSON de Cloud Storage, BigQuery, Cloud Pub/Sub, Cloud Resource Manager y Data Catalog. .

    Habilita las API

  5. Crea una cuenta de servicio:

    1. En Cloud Console, ve a la página Crear cuenta de servicio.

      Ir a Crear cuenta de servicio
    2. Selecciona un proyecto
    3. Ingresa un nombre en el campo Nombre de cuenta de servicio. Cloud Console completa el campo ID de cuenta de servicio según este nombre.

      Opcional: en el campo Descripción de la cuenta de servicio, ingresa una descripción. Por ejemplo, Service account for quickstart.

    4. Haz clic en Crear y continuar.
    5. Haz clic en el campo Seleccionar una función.

      En Acceso rápido, haz clic en Básico y, luego, en Propietario.

    6. Haga clic en Continuar.
    7. Haz clic en Listo para terminar de crear la cuenta de servicio.

      No cierres la ventana del navegador. La usarás en la próxima tarea.

  6. Para crear una clave de cuenta de servicio, haz lo siguiente:

    1. En Cloud Console, haz clic en la dirección de correo electrónico de la cuenta de servicio que creaste.
    2. Haga clic en Claves.
    3. Haz clic en Agregar clave, luego haz clic en Crear clave nueva.
    4. Haga clic en Crear. Se descargará un archivo de claves JSON en tu computadora.
    5. Haga clic en Cerrar.
  7. Configura la variable de entorno GOOGLE_APPLICATION_CREDENTIALS en la ruta del archivo JSON que contiene la clave de tu cuenta de servicio. Esta variable solo se aplica a la sesión actual de shell. Por lo tanto, si abres una sesión nueva, deberás volver a configurar la variable.

  8. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyectos

  9. Comprueba que la facturación esté habilitada en tu proyecto.

    Descubre cómo puedes habilitar la facturación

  10. Habilita las API de Cloud Dataflow, Compute Engine, Logging, Cloud Storage, JSON de Cloud Storage, BigQuery, Cloud Pub/Sub, Cloud Resource Manager y Data Catalog. .

    Habilita las API

  11. Crea una cuenta de servicio:

    1. En Cloud Console, ve a la página Crear cuenta de servicio.

      Ir a Crear cuenta de servicio
    2. Selecciona un proyecto
    3. Ingresa un nombre en el campo Nombre de cuenta de servicio. Cloud Console completa el campo ID de cuenta de servicio según este nombre.

      Opcional: en el campo Descripción de la cuenta de servicio, ingresa una descripción. Por ejemplo, Service account for quickstart.

    4. Haz clic en Crear y continuar.
    5. Haz clic en el campo Seleccionar una función.

      En Acceso rápido, haz clic en Básico y, luego, en Propietario.

    6. Haga clic en Continuar.
    7. Haz clic en Listo para terminar de crear la cuenta de servicio.

      No cierres la ventana del navegador. La usarás en la próxima tarea.

  12. Para crear una clave de cuenta de servicio, haz lo siguiente:

    1. En Cloud Console, haz clic en la dirección de correo electrónico de la cuenta de servicio que creaste.
    2. Haga clic en Claves.
    3. Haz clic en Agregar clave, luego haz clic en Crear clave nueva.
    4. Haga clic en Crear. Se descargará un archivo de claves JSON en tu computadora.
    5. Haga clic en Cerrar.
  13. Configura la variable de entorno GOOGLE_APPLICATION_CREDENTIALS en la ruta del archivo JSON que contiene la clave de tu cuenta de servicio. Esta variable solo se aplica a la sesión actual de shell. Por lo tanto, si abres una sesión nueva, deberás volver a configurar la variable.

  14. Instala e inicializa el SDK de Cloud. Elige una de las opciones de instalación. Es posible que debas establecer la propiedad project en el proyecto que usas para esta explicación.
  15. Ve a la IU web de Dataflow en Cloud Console. Esto abrirá el último proyecto al que accediste. Para cambiar a un proyecto diferente, haz clic en el nombre del proyecto en la parte superior de la IU web de Dataflow SQL y busca el proyecto que quieres usar.
    Ir a la IU web de Dataflow SQL

Crea fuentes de ejemplo

Si quieres seguir el ejemplo proporcionado en este instructivo, crea las siguientes fuentes y úsalas en los pasos del instructivo.

  • Un tema de Pub/Sub con el nombre transactions: una transmisión de datos de transacción que llega mediante una suscripción al tema de Pub/Sub. Los datos de cada transacción incluyen información sobre el producto comprado, el precio de oferta, y la ciudad y el estado donde se realizó la compra. Después de crear el tema de Pub/Sub, debes crear una secuencia de comandos que publique mensajes en tu tema. Ejecutarás esta secuencia de comandos en una sección posterior de este instructivo.
  • Una tabla de BigQuery llamada us_state_salesregions: una tabla que proporciona una asignación de estados a regiones de ventas. Antes de crear esta tabla, debes crear un conjunto de datos de BigQuery.

Asigna un esquema a tu tema de Pub/Sub

Asignar un esquema te permite ejecutar consultas de SQL en los datos del tema de Pub/Sub. Actualmente, Dataflow SQL espera que los mensajes en los temas de Pub/Sub se serialicen en formato JSON.

Para asignar un esquema al ejemplo de tema de Cloud Pub/Sub transactions:

  1. Crea un archivo de texto y asígnale el nombre transactions_schema.yaml. Copia y pega el texto de esquema siguiente en transactions_schema.yaml.
  - column: event_timestamp
    description: Pub/Sub event timestamp
    mode: REQUIRED
    type: TIMESTAMP
  - column: tr_time_str
    description: Transaction time string
    mode: NULLABLE
    type: STRING
  - column: first_name
    description: First name
    mode: NULLABLE
    type: STRING
  - column: last_name
    description: Last name
    mode: NULLABLE
    type: STRING
  - column: city
    description: City
    mode: NULLABLE
    type: STRING
  - column: state
    description: State
    mode: NULLABLE
    type: STRING
  - column: product
    description: Product
    mode: NULLABLE
    type: STRING
  - column: amount
    description: Amount of transaction
    mode: NULLABLE
    type: FLOAT
  1. Asigna el esquema con la herramienta de línea de comandos de gcloud.

    a. Actualiza la herramienta de gcloud con el comando siguiente Asegúrate de tener la versión 242.0.0 o posterior de la herramienta de gcloud.

      gcloud components update
    

    b. Ejecuta el comando siguiente en una ventana de línea de comandos. Reemplaza project-id por el ID del proyecto y path-to-file por la ruta de acceso a tu archivo transactions_schema.yaml.

      gcloud data-catalog entries update \
        --lookup-entry='pubsub.topic.`project-id`.transactions' \
        --schema-from-file=path-to-file/transactions_schema.yaml
    

    Para obtener más información sobre los parámetros del comando y los formatos de archivo de esquema permitidos, consulta la página de documentación de actualización de entradas de Data Catalog en Gcloud.

    c. Confirma que tu esquema se asignó de forma correcta al tema de Pub/Sub transactions(opcional). Reemplaza project-id con el ID del proyecto.

      gcloud data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'
    

Busca fuentes de Pub/Sub

La IU de Dataflow SQL proporciona una forma de encontrar objetos de fuente de datos de Pub/Sub para cualquier proyecto al que tengas acceso, por lo que no es necesario que recuerdes sus nombres completos.

Para el ejemplo de este instructivo, busca el tema de Pub/Sub transactions que creaste:

  1. En el panel izquierdo, busca projectid=project-id transactions. Reemplaza project-id por el ID del proyecto.

    Panel de búsqueda de Data Catalog en el lugar de trabajo de Dataflow SQL.

Observa el esquema

  1. En el panel del explorador de la izquierda de la IU de Dataflow SQL, haz clic en Transacciones o busca un tema de Pub/Sub. Para ello, escribe projectid=project-id system=cloud_pubsub y selecciona el tema.
  2. En Schema (Esquema), podrás ver el esquema que asignaste al tema de Pub/Sub .

    Esquema asignado al tema, incluida una lista de nombres de campos y sus descripciones.

Crea una consulta de SQL

La IU de Dataflow SQL te permite crear consultas de SQL para ejecutar tus trabajos de Dataflow.

La consulta de SQL siguiente es una consulta de enriquecimiento de datos. Agrega un campo adicional, sales_region, a la transmisión de eventos de Pub/Sub (transactions), mediante una tabla de BigQuery (us_state_salesregions) que asigne estados a las regiones de ventas.

Copia y pega la consulta de SQL siguiente en el Editor de consultas. Reemplaza project-id por el ID del proyecto.

SELECT tr.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
  ON tr.state = sr.state_code

Cuando ingresas una consulta en la IU de Dataflow SQL, la consulta validador verifica la sintaxis de consulta. Si la consulta es válida, se muestra un ícono de marca de verificación verde. Si la consulta no es válida, se muestra un ícono de signo de exclamación rojo. Si la sintaxis de la consulta no es válida, se proporcionará información sobre lo que se debe corregir si haces clic en el ícono del validador.

En la captura de pantalla siguiente, se muestra la consulta válida en el Editor de consultas. El validador muestra una marca de verificación verde.

Espacio de trabajo de Dataflow SQL con la consulta del instructivo visible en el editor.

Crea un trabajo de Dataflow para ejecutar la consulta de SQL

Para ejecutar tu consulta de SQL, crea un trabajo de Dataflow desde la IU de Dataflow SQL.

  1. Arriba del Editor de consultas, haz clic en Crear trabajo.

  2. En el panel Crear trabajo de Dataflow que se abre a la derecha, en la opción Destino, selecciona BigQuery. Luego, en ID del conjunto de datos, selecciona dataflow_sql_tutorial y establece el Nombre de la tabla en sales.

    Crea un formulario de trabajo de Dataflow SQL.
  3. Dataflow elige automáticamente la configuración óptima para tu trabajo de Dataflow SQL, pero puedes expandir el menú de Parámetros opcionales para especificar manualmente las siguientes opciones de canalización (opcional):

    • Cantidad máxima de trabajadores
    • Zona
    • Correo electrónico de la cuenta de servicio
    • Tipo de máquina
    • Experimentos adicionales
    • Configuración de la dirección IP del trabajador
    • Red
    • Subred
  4. Haz clic en Create (Crear). Tu trabajo de Dataflow tardará unos minutos en comenzar a ejecutarse.

Visualiza el trabajo de Dataflow

Dataflow convierte tu consulta de SQL en una canalización de Apache Beam. En la IU web de Dataflow que se abrió en una pestaña del navegador nueva, puedes ver una representación gráfica de tu canalización.

Canalización de consulta de SQL que se muestra en la IU web de Dataflow.

Puedes hacer clic en las casillas para ver un desglose de las transformaciones que se generan en la canalización. Por ejemplo, si haces clic en el cuadro superior de la representación gráfica que tiene la etiqueta Run SQL Query (Ejecutar consulta de SQL), aparecerá un gráfico con las operaciones que se realizan en segundo plano.

Los dos cuadros superiores representan las dos entradas que uniste: el tema de Pub/Sub, transactions, y la tabla de BigQuery, us_state_salesregions.

El resultado de escritura de una unión de dos entradas se completa en 25 segundos.

Para ver la tabla de resultados con los resultados del trabajo, ve a la IU de BigQuery. En el panel de navegación izquierdo, debajo de tu proyecto, haz clic en el conjunto de datos dataflow_sql_tutorial que creaste. Luego, haz clic en la tabla de resultados sales. En la pestaña Vista previa, se muestran los contenidos de la tabla de resultados.

La tabla de vista previa de ventas contiene columnas para tr_time_str, first_name, last_name, city, state, product, amount y sales_region.

Visualiza los trabajos anteriores y edita tus consultas

La IU de Dataflow almacena trabajos y consultas anteriores en la página Trabajos.

Puedes usar la lista del historial de trabajos para ver las consultas de SQL anteriores. Por ejemplo, en caso de que quieras modificar tu consulta para agregar ventas por región de ventas cada 15 segundos. Usa la página Trabajos para acceder al trabajo en ejecución que iniciaste antes en el instructivo, copia la consulta de SQL y ejecuta otro trabajo con una consulta modificada.

  1. En la página Trabajos de Dataflow, haz clic en el trabajo que deseas editar.

  2. En la página Detalles del trabajo, busca la consulta de SQL en el panel derecho en Opciones de canalización. Busca la fila para queryString.

    La opción de canalización de trabajo llamada queryString.
  3. Copia y pega tu consulta de SQL en el Editor de consultas para agregar ventanas de saltos de tamaño constante. Si copias la siguiente consulta, reemplaza project-id por el ID de tu proyecto.

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
       ON tr.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    
  4. Haz clic en Crear trabajo para crear uno nuevo con la consulta modificada.

Limpia

Para evitar que se generen cargos en tu cuenta de facturación de Cloud por los recursos que se usaron en este instructivo, sigue estos pasos:

  1. Detén tu secuencia de comandos de publicación transactions_injector.py si aún se está ejecutando.

  2. Detén tus trabajos de Dataflow en ejecución. Ve a la IU web de Dataflow en Cloud Console.

    Ir a la IU web de Dataflow

    Para cada trabajo que creaste a partir de esta explicación, sigue estos pasos:

    1. Haz clic en el nombre del trabajo.

    2. En el panel Resumen del trabajo, haz clic en Detener trabajo. Aparecerá el cuadro de diálogo Detener trabajo con las opciones para detenerlo:

    3. Haz clic en Cancelar.

    4. Haz clic en Detener trabajo. El servicio detiene la transferencia y procesamiento de todos los datos lo antes posible. Debido a que Cancelar detiene el procesamiento de manera inmediata, es posible que pierdas los datos “en tránsito”. Detener un trabajo puede tardar unos minutos.

  3. Borra tu conjunto de datos de BigQuery. Ve a la IU web de BigQuery en Cloud Console.

    Ir a la IU web de BigQuery

    1. En el panel Explorador, en la sección Recursos, haz clic en el conjunto de datos dataflow_sql_tutorial que creaste.

    2. En el lado derecho del panel de detalles, haz clic en Borrar conjunto de datos. Con esta acción, se borra el conjunto de datos, la tabla y todos los datos.

    3. En el cuadro de diálogo Borrar conjunto de datos, ingresa el nombre del conjunto de datos (dataflow_sql_tutorial) y, luego, haz clic en Borrar para confirmar el comando de borrado.

  4. Borra tu tema de Pub/Sub. Ve a la página de temas de Pub/Sub en Cloud Console.

    Ir a la página de Cloud Pub/Sub

    1. Marca la casilla de verificación junto al tema transactions.

    2. Haz clic en Borrar para borrar el tema de forma definitiva.

    3. Ve a la página de suscripciones de Pub/Sub.

    4. Marca la casilla de verificación junto a las suscripciones a transactions restantes. Si tus trabajos ya no están en ejecución, es posible que no haya ninguna suscripción.

    5. Haz clic en Borrar para borrar las suscripciones de manera permanente.

  5. Borra el bucket de etapa de pruebas de Dataflow en Cloud Storage. En Cloud Console, ve al navegador de Cloud Storage.

    Ir al navegador de Cloud Storage

    1. Marca la casilla de verificación junto al bucket de etapa de pruebas de Dataflow.

    2. Haz clic en Borrar para borrar el bucket de manera permanente.

¿Qué sigue?