Une datos de transmisión con Dataflow SQL


En este instructivo, se muestra cómo usar Dataflow SQL para unir una transmisión de datos de Pub/Sub con datos de una tabla de BigQuery.

Objetivos

En este instructivo, realizarás lo siguiente:

  • Escribirás una consulta de SQL de Dataflow que una los datos de transmisión de Pub/Sub con los datos de la tabla de BigQuery.
  • Implementarás un trabajo de Dataflow desde la IU de Dataflow SQL.

Costos

En este documento, usarás los siguientes componentes facturables de Google Cloud:

  • Dataflow
  • Cloud Storage
  • Pub/Sub
  • Data Catalog

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud califiquen para obtener una prueba gratuita.

Antes de comenzar

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager and Data Catalog. APIs.

    Enable the APIs

  5. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the Project > Owner role to the service account.

      To grant the role, find the Select a role list, then select Project > Owner.

    6. Click Continue.
    7. Click Done to finish creating the service account.

      Do not close your browser window. You will use it in the next step.

  6. Create a service account key:

    1. In the Google Cloud console, click the email address for the service account that you created.
    2. Click Keys.
    3. Click Add key, and then click Create new key.
    4. Click Create. A JSON key file is downloaded to your computer.
    5. Click Close.
  7. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  9. Make sure that billing is enabled for your Google Cloud project.

  10. Enable the Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager and Data Catalog. APIs.

    Enable the APIs

  11. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the Project > Owner role to the service account.

      To grant the role, find the Select a role list, then select Project > Owner.

    6. Click Continue.
    7. Click Done to finish creating the service account.

      Do not close your browser window. You will use it in the next step.

  12. Create a service account key:

    1. In the Google Cloud console, click the email address for the service account that you created.
    2. Click Keys.
    3. Click Add key, and then click Create new key.
    4. Click Create. A JSON key file is downloaded to your computer.
    5. Click Close.
  13. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  14. Instala e inicializa la CLI de gcloud Elige una de las opciones de instalación. Es posible que debas establecer la propiedad project en el proyecto que usas para esta explicación.
  15. Ve a la IU web de Dataflow SQL en la consola de Google Cloud. Esto abrirá el último proyecto al que accediste. Para cambiar a un proyecto diferente, haz clic en el nombre del proyecto en la parte superior de la IU web de Dataflow SQL y busca el proyecto que quieres usar.
    Ir a la IU web de Dataflow SQL

Crea fuentes de ejemplo

Si quieres seguir el ejemplo proporcionado en este instructivo, crea las siguientes fuentes y úsalas en los pasos del instructivo.

  • Un tema de Pub/Sub con el nombre transactions: una transmisión de datos de transacción que llega mediante una suscripción al tema de Pub/Sub. Los datos de cada transacción incluyen información sobre el producto comprado, el precio de oferta, y la ciudad y el estado donde se realizó la compra. Después de crear el tema de Pub/Sub, debes crear una secuencia de comandos que publique mensajes en tu tema. Ejecutarás esta secuencia de comandos en una sección posterior de este instructivo.
  • Una tabla de BigQuery llamada us_state_salesregions: una tabla que proporciona una asignación de estados a regiones de ventas. Antes de crear esta tabla, debes crear un conjunto de datos de BigQuery.

Asigna un esquema a tu tema de Pub/Sub

Asignar un esquema te permite ejecutar consultas de SQL en los datos del tema de Pub/Sub. Actualmente, Dataflow SQL espera que los mensajes en los temas de Pub/Sub se serialicen en formato JSON.

Para asignar un esquema al ejemplo de tema de Cloud Pub/Sub transactions:

  1. Crea un archivo de texto y asígnale el nombre transactions_schema.yaml. Copia y pega el texto de esquema siguiente en transactions_schema.yaml.

      - column: event_timestamp
        description: Pub/Sub event timestamp
        mode: REQUIRED
        type: TIMESTAMP
      - column: tr_time_str
        description: Transaction time string
        mode: NULLABLE
        type: STRING
      - column: first_name
        description: First name
        mode: NULLABLE
        type: STRING
      - column: last_name
        description: Last name
        mode: NULLABLE
        type: STRING
      - column: city
        description: City
        mode: NULLABLE
        type: STRING
      - column: state
        description: State
        mode: NULLABLE
        type: STRING
      - column: product
        description: Product
        mode: NULLABLE
        type: STRING
      - column: amount
        description: Amount of transaction
        mode: NULLABLE
        type: FLOAT
    
  2. Asigna el esquema con Google Cloud CLI.

    a. Actualiza la CLI de gcloud con el siguiente comando Asegúrate de que la versión de la CLI de gcloud sea 242.0.0 o superior.

      gcloud components update

    b. Ejecuta el comando siguiente en una ventana de línea de comandos. Reemplaza project-id por el ID del proyecto y path-to-file por la ruta de acceso a tu archivo transactions_schema.yaml.

      gcloud data-catalog entries update \
        --lookup-entry='pubsub.topic.`project-id`.transactions' \
        --schema-from-file=path-to-file/transactions_schema.yaml

    Para obtener más información sobre los parámetros del comando y los formatos de archivo de esquema permitidos, consulta la página de documentación de actualización de entradas de Data Catalog en Gcloud.

    c. Confirma que tu esquema se asignó de forma correcta al tema de Pub/Sub transactions(opcional). Reemplaza project-id con el ID del proyecto.

      gcloud data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'

Busca fuentes de Pub/Sub

La IU de Dataflow SQL proporciona una forma de encontrar objetos de fuente de datos de Pub/Sub para cualquier proyecto al que tengas acceso, por lo que no es necesario que recuerdes sus nombres completos.

Para el ejemplo de este instructivo, navega hacia el editor de Dataflow SQL y busca el tema transactions de Pub/Sub que creaste:

  1. Navega hasta el lugar de trabajo de SQL.

  2. En el panel Editor de Dataflow SQL, en la barra de búsqueda, busca projectid=project-id transactions. Reemplaza project-id con el ID de tu proyecto.

    Panel de búsqueda de Data Catalog en el lugar de trabajo de Dataflow SQL.

Observa el esquema

  1. En el panel Editor de Dataflow SQL de la IU de Dataflow SQL, haz clic en transacciones o escribe projectid=project-id system=cloud_pubsub para buscar un tema de Pub/Sub, y selecciona el tema.
  2. En Esquema, podrás ver el esquema que asignaste al tema de Pub/Sub.

    Esquema asignado al tema, incluida una lista de nombres de campos y sus descripciones.

Crea una consulta de SQL

La IU de Dataflow SQL te permite crear consultas de SQL para ejecutar tus trabajos de Dataflow.

La consulta de SQL siguiente es una consulta de enriquecimiento de datos. Agrega un campo adicional, sales_region, a la transmisión de eventos de Pub/Sub (transactions), mediante una tabla de BigQuery (us_state_salesregions) que asigne estados a las regiones de ventas.

Copia y pega la consulta de SQL siguiente en el Editor de consultas. Reemplaza project-id por el ID del proyecto.

SELECT tr.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
  ON tr.state = sr.state_code

Cuando ingresas una consulta en la IU de Dataflow SQL, la consulta validador verifica la sintaxis de consulta. Si la consulta es válida, se muestra un ícono de marca de verificación verde. Si la consulta no es válida, se muestra un ícono de signo de exclamación rojo. Si la sintaxis de la consulta no es válida, se proporcionará información sobre lo que se debe corregir si haces clic en el ícono del validador.

En la captura de pantalla siguiente, se muestra la consulta válida en el Editor de consultas. El validador muestra una marca de verificación verde.

Espacio de trabajo de Dataflow SQL con la consulta del instructivo visible en el editor.

Crea un trabajo de Dataflow para ejecutar la consulta de SQL

Para ejecutar tu consulta de SQL, crea un trabajo de Dataflow desde la IU de Dataflow SQL.

  1. En el Editor de consultas, haz clic en Crear trabajo.

  2. En el panel Crear trabajo de Dataflow que se abre, haz lo siguiente:

    • En Destino, selecciona BigQuery.
    • En ID del conjunto de datos, selecciona dataflow_sql_tutorial.
    • En Nombre de la tabla, ingresa sales.
    Crea un formulario de trabajo de Dataflow SQL.
  3. Opcional: Dataflow elige automáticamente la configuración óptima para tu trabajo de Dataflow SQL, pero puedes expandir el menú Parámetros opcionales para especificar manualmente las siguientes opciones de canalización:

    • Cantidad máxima de trabajadores
    • Zona
    • Correo electrónico de la cuenta de servicio
    • Tipo de máquina
    • Experimentos adicionales
    • Configuración de la dirección IP del trabajador
    • Red
    • Subred
  4. Haz clic en Crear. Tu trabajo de Dataflow tarda unos minutos en comenzar a ejecutarse.

Visualiza el trabajo de Dataflow

Dataflow convierte tu consulta de SQL en una canalización de Apache Beam. Haz clic en Ver trabajo para abrir la IU web de Dataflow, en la que podrás ver una representación gráfica de tu canalización.

Canalización de consulta de SQL que se muestra en la IU web de Dataflow.

Para ver un desglose de las transformaciones que se generan en la canalización, haz clic en las casillas. Por ejemplo, si haces clic en el primer cuadro de la representación gráfica que tiene la etiqueta Ejecutar consulta en SQL, aparecerá un gráfico con las operaciones que se realizan en segundo plano.

Los dos primeros cuadros representan las dos entradas que uniste: el tema de Pub/Sub, transactions, y la tabla de BigQuery, us_state_salesregions.

El resultado de escritura de una unión de dos entradas se completa en 25 segundos.

Para ver la tabla de resultados con los resultados del trabajo, ve a la IU de BigQuery. En el panel Explorador, en tu proyecto, haz clic en el conjunto de datos dataflow_sql_tutorial que creaste. Luego, haz clic en la tabla de resultados, sales. En la pestaña Vista previa, se muestran los contenidos de la tabla de resultados.

La tabla de vista previa de ventas contiene columnas para tr_time_str, first_name, last_name, city, state, product, amount y sales_region.

Visualiza los trabajos anteriores y edita tus consultas

La IU de Dataflow almacena trabajos y consultas anteriores en la página Trabajos.

Puedes usar la lista del historial de trabajos para ver las consultas de SQL anteriores. Por ejemplo, en caso de que quieras modificar tu consulta para agregar ventas por región de ventas cada 15 segundos. Usa la página Trabajos para acceder al trabajo en ejecución que iniciaste antes en el instructivo, copia la consulta de SQL y ejecuta otro trabajo con una consulta modificada.

  1. En la página Trabajos de Dataflow, haz clic en el trabajo que deseas editar.

  2. En la página Detalles del trabajo, en el panel Información del trabajo, en Opciones de canalización, busca la consulta en SQL. Busca la fila para queryString.

    La opción de canalización de trabajo llamada queryString.
  3. Copia y pega la siguiente consulta en SQL en el Editor de Dataflow SQL en el lugar de trabajo de SQL para agregar períodos de saltos de tamaño constante. Reemplaza project-id con el ID de tu proyecto.

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
       ON tr.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
  4. Haz clic en Crear trabajo para crear uno nuevo con la consulta modificada.

Limpia

Para evitar que se generen cargos en tu cuenta de facturación de Cloud por los recursos que se usaron en este instructivo, sigue estos pasos:

  1. Detén tu secuencia de comandos de publicación transactions_injector.py si aún se está ejecutando.

  2. Detén tus trabajos de Dataflow en ejecución. Ve a la IU web de Dataflow en la consola de Google Cloud.

    Ir a la IU web de Dataflow

    Para cada trabajo que creaste a partir de esta explicación, sigue estos pasos:

    1. Haz clic en el nombre del trabajo.

    2. En la página Detalles del trabajo, haz clic en Detener. Aparecerá el cuadro de diálogo Detener trabajo con las opciones para detenerlo:

    3. Selecciona Cancelar.

    4. Haz clic en Detener trabajo. El servicio detiene la transferencia y procesamiento de todos los datos lo antes posible. Debido a que Cancelar detiene el procesamiento de manera inmediata, es posible que pierdas los datos “en tránsito”. Detener un trabajo puede tardar unos minutos.

  3. Borra tu conjunto de datos de BigQuery. Ve a la IU web de BigQuery en la consola de Google Cloud.

    Ir a la IU web de BigQuery

    1. En el panel Explorador, en la sección Recursos, haz clic en el conjunto de datos dataflow_sql_tutorial que creaste.

    2. En el panel de detalles, haz clic en Borrar. Se abrirá un cuadro de diálogo de confirmación.

    3. En el cuadro de diálogo Borrar conjunto de datos, ingresa delete para confirmar el comando de eliminación y, luego, haz clic en Borrar.

  4. Borra tu tema de Pub/Sub. Ve a la página Temas de Pub/Sub en la consola de Google Cloud.

    Ir a la página de temas de Cloud Pub/Sub

    1. Selecciona el tema transactions.

    2. Haz clic en Borrar para borrar el tema de forma definitiva. Se abrirá un cuadro de diálogo de confirmación.

    3. En el cuadro de diálogo Borrar tema, ingresa delete para confirmar el comando de eliminación y, luego, haz clic en Borrar.

    4. Ve a la página de suscripciones de Pub/Sub.

    5. Selecciona las suscripciones a transactions restantes. Si tus trabajos ya no están en ejecución, es posible que no haya ninguna suscripción.

    6. Haz clic en Borrar para borrar las suscripciones de manera permanente. En el diálogo de confirmación, haz clic en Borrar.

  5. Borra el bucket de etapa de pruebas de Dataflow en Cloud Storage. Ve a la página Buckets de Cloud Storage en la consola de Google Cloud.

    Ir a Buckets

    1. Selecciona el bucket de etapa de pruebas de Dataflow.

    2. Haz clic en Borrar para borrar el depósito de manera permanente. Se abrirá un cuadro de diálogo de confirmación.

    3. En el cuadro de diálogo Borrar bucket, ingresa DELETE para confirmar el comando de eliminación y, luego, haz clic en Borrar.

¿Qué sigue?