Crea consultas continuas

En este documento, se describe cómo ejecutar una consulta continua en BigQuery.

Las consultas continuas de BigQuery son instrucciones de SQL que se ejecutan de forma continua. Las consultas continuas te permiten analizar datos entrantes en BigQuery en tiempo real y, luego, exportar los resultados a Bigtable o Pub/Sub, o escribir los resultados en una tabla de BigQuery.

Elige un tipo de cuenta

Puedes crear y ejecutar un trabajo de consulta continua con una cuenta de usuario o puedes crear un trabajo de consulta continua con una cuenta de usuario y, luego, ejecutarlo con una cuenta de servicio. Debes usar una cuenta de servicio para ejecutar una consulta continua que exporte los resultados a un tema de Pub/Sub.

Cuando usas una cuenta de usuario, una consulta continua se ejecuta durante dos días. Cuando usas una cuenta de servicio, se ejecuta una consulta continua hasta que se cancela de forma explícita. Para obtener más información, consulta Autorización.

Permisos necesarios

En esta sección, se describen los permisos que necesitas para crear y ejecutar una consulta continua. Como alternativa a los roles de Identity and Access Management (IAM) mencionados, puedes obtener los permisos necesarios a través de los roles personalizados.

Permisos cuando se usa una cuenta de usuario

En esta sección, se proporciona información sobre los roles y los permisos necesarios para crear y ejecutar una consulta continua mediante una cuenta de usuario.

Para crear un trabajo en BigQuery, la cuenta de usuario debe tener el permiso bigquery.jobs.create de IAM. Cada uno de los siguientes roles de IAM otorga el permiso bigquery.jobs.create:

Para exportar datos de una tabla de BigQuery, la cuenta de usuario debe tener el permiso bigquery.tables.export de IAM. Cada uno de los siguientes roles de IAM otorga el permiso bigquery.tables.export:

Para actualizar datos en una tabla de BigQuery, la cuenta de usuario debe tener el permiso de IAM bigquery.tables.updateData. Cada uno de los siguientes roles de IAM otorga el permiso bigquery.tables.updateData:

Si la cuenta de usuario debe habilitar las APIs necesarias para tu caso de uso de la consulta continua, la cuenta de usuario debe tener el rol de administrador de Service Usage (roles/serviceusage.serviceUsageAdmin).

Permisos cuando se usa una cuenta de servicio

En esta sección, se proporciona información sobre los roles y permisos que requiere la cuenta de usuario que crea la consulta continua y la cuenta de servicio que ejecuta la consulta continua.

Permisos de la cuenta de usuario

Para crear un trabajo en BigQuery, la cuenta de usuario debe tener el permiso bigquery.jobs.create de IAM. Cada uno de los siguientes roles de IAM otorga el permiso bigquery.jobs.create:

Para enviar un trabajo que se ejecute con una cuenta de servicio, la cuenta de usuario debe tener el rol de usuario de la cuenta de servicio (roles/iam.serviceAccountUser). Si usas la misma cuenta de usuario para crear la cuenta de servicio, la cuenta de usuario debe tener el rol de administrador de cuentas de servicio (roles/iam.serviceAccountAdmin). Para obtener información sobre cómo limitar el acceso de un usuario a una sola cuenta de servicio, en lugar de a todas las cuentas de servicio de un proyecto, consulta Otorga un solo rol.

Si la cuenta de usuario debe habilitar las APIs necesarias para tu caso de uso de la consulta continua, la cuenta de usuario debe tener el rol de administrador de Service Usage (roles/serviceusage.serviceUsageAdmin).

Permisos de las cuentas de servicio

Para exportar datos de una tabla de BigQuery, la cuenta de servicio debe tener el permiso bigquery.tables.export de IAM. Cada uno de los siguientes roles de IAM otorga el permiso bigquery.tables.export:

Para actualizar datos en una tabla de BigQuery, la cuenta de servicio debe tener el permiso de IAM bigquery.tables.updateData. Cada uno de los siguientes roles de IAM otorga el permiso bigquery.tables.updateData:

Antes de comenzar

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the BigQuery API.

    Enable the API

Crea una reserva

Crea una reserva de la edición Enterprise o Enterprise Plus y, luego, crea una asignación de reserva con un tipo de trabajo CONTINUOUS.

Exporta a Pub/Sub

Se requieren APIs, permisos de IAM y recursos de Google Cloud adicionales para exportar datos a Pub/Sub. Para obtener más información, consulta Exporta datos a Pub/Sub.

Incorpora atributos personalizados como metadatos en mensajes de Pub/Sub

Puedes usar los atributos de Pub/Sub para proporcionar información adicional sobre el mensaje, como su prioridad, origen, destino o metadatos adicionales. También puedes usar atributos para filtrar mensajes en la suscripción.

Dentro de un resultado de consulta continua, si una columna se llama _ATTRIBUTES, sus valores se copian en los atributos del mensaje de Pub/Sub. Los campos proporcionados dentro de _ATTRIBUTES se usan como claves de atributos.

La columna _ATTRIBUTES debe ser del tipo JSON, en el formato ARRAY<STRUCT<STRING, STRING>> o STRUCT<STRING>.

Para ver un ejemplo, consulta Exporta datos a un tema de Pub/Sub.

Exporta a Bigtable

Se requieren APIs, permisos de IAM y recursos de Google Cloud adicionales para exportar datos a Bigtable. Para obtener más información, consulta Exporta datos a Bigtable.

Escribe datos en una tabla de BigQuery

Puedes escribir datos en una tabla de BigQuery con una declaración INSERT.

Usa funciones de IA

Se requieren APIs adicionales, permisos de IAM y recursos de Google Cloud para usar una función de IA compatible en una consulta continua. Para obtener más información, consulta uno de los siguientes temas, según tu caso de uso:

Cuando usas una función de IA en una consulta continua, considera si el resultado de la consulta permanecerá dentro de la cuota de la función. Si excedes la cuota, es posible que debas controlar por separado los registros que no se procesan.

Ejecuta una consulta continua con una cuenta de usuario

En esta sección, se describe cómo ejecutar una consulta continua mediante una cuenta de usuario. Una vez que se ejecuta la consulta continua, puedes cerrar la aplicación, la ventana de la terminal o la consola de Google Cloud sin interrumpir la ejecución de la consulta.

Sigue estos pasos para ejecutar una consulta continua:

Console

  1. En la consola de Google Cloud, ve a la página de BigQuery.

    Ir a BigQuery

  2. En el editor de consultas, haz clic en Más.

  3. En la sección Elige el modo de consulta, elige Consulta continua.

  4. Haz clic en Confirmar.

  5. En el editor de consultas, escribe la instrucción de SQL para la consulta continua. La instrucción de SQL solo debe contener operaciones compatibles.

  6. Haz clic en Ejecutar.

bq

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. En Cloud Shell, ejecuta la consulta continua con el comando bq query con la marca --continuous:

    bq query --use_legacy_sql=false --continuous=true
    'QUERY'

    Reemplaza QUERY por la instrucción de SQL para la consulta continua. La instrucción de SQL solo debe contener operaciones compatibles.

API

Ejecuta la consulta continua con una llamada al método jobs.insert. Debes configurar el campo continuous como true en el JobConfigurationQuery del recurso Job que pasas dentro.

curl --request POST \
  'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs
  --header 'Authorization: Bearer $(gcloud auth application-default print-access-token)' \
  --header 'Accept: application/json' \
  --header 'Content-Type: application/json' \
  --data '("configuration":("continuous":true,"useLegacySql":false,"query":"QUERY"))'
  --compressed

Reemplaza lo siguiente:

  • PROJECT_ID: el ID de tu proyecto
  • QUERY: Es la instrucción de SQL para la consulta continua. La instrucción de SQL solo debe contener operaciones compatibles.

Ejecuta una consulta continua con una cuenta de servicio

En esta sección, se describe cómo ejecutar una consulta continua mediante una cuenta de servicio. Una vez que se ejecuta la consulta continua, puedes cerrar la aplicación, la ventana de la terminal o la consola de Google Cloud sin interrumpir la ejecución de la consulta.

Sigue estos pasos para usar una cuenta de servicio a fin de ejecutar una consulta continua:

Console

  1. Crea una cuenta de servicio.
  2. Otorga permisos obligatorios a la cuenta de servicio:
  3. En la consola de Google Cloud, ve a la página de BigQuery.

    Ir a BigQuery

  4. En el editor de consultas, haz clic en Más.

  5. En la sección Elige el modo de consulta, elige Consulta continua.

  6. Haz clic en Confirmar.

  7. En el editor de consultas, haz clic en Más > Configuración de consulta.

  8. En la sección Consulta continua, usa la casilla Cuenta de servicio para seleccionar la cuenta de servicio que creaste.

  9. Haz clic en Guardar.

  10. En el editor de consultas, escribe la instrucción de SQL para la consulta continua. La instrucción de SQL solo debe contener operaciones compatibles.

  11. Haz clic en Ejecutar.

bq

  1. Crea una cuenta de servicio.
  2. Otorga permisos obligatorios a la cuenta de servicio:
  3. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  4. En la línea de comandos, ejecuta la consulta continua con el comando bq query con las siguientes marcas:

    • Establece la marca --continuous en true para que la consulta sea continua.
    • Usa la marca --connection_property para especificar una cuenta de servicio para usar.
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'QUERY'

    Reemplaza lo siguiente:

    • PROJECT_ID: el ID de tu proyecto
    • SERVICE_ACCOUNT_EMAIL: Es el correo electrónico de la cuenta de servicio. Puedes obtener el correo electrónico de la cuenta de servicio en la página Cuentas de servicio de la consola de Google Cloud.
    • QUERY: Es la instrucción de SQL para la consulta continua. La instrucción de SQL solo debe contener operaciones compatibles.

API

  1. Crea una cuenta de servicio.
  2. Otorga permisos obligatorios a la cuenta de servicio:
  3. Ejecuta la consulta continua con una llamada al método jobs.insert. Configura los siguientes campos en el recurso JobConfigurationQuery del recurso Job que pasas:

    • Establece el campo continuous en true para que la consulta sea continua.
    • Usa el campo connection_property para especificar la cuenta de servicio que se usará.
    curl --request POST \
      'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs
      --header 'Authorization: Bearer $(gcloud auth print-access-token) \
      --header 'Accept: application/json' \
      --header 'Content-Type: application/json' \
      --data '("configuration":("query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \
      --compressed

    Reemplaza lo siguiente:

    • PROJECT_ID: el ID de tu proyecto
    • QUERY: Es la instrucción de SQL para la consulta continua. La instrucción de SQL solo debe contener operaciones compatibles.
    • SERVICE_ACCOUNT_EMAIL: Es el correo electrónico de la cuenta de servicio. Puedes obtener el correo electrónico de la cuenta de servicio en la página Cuentas de servicio de la consola de Google Cloud.

Ejemplos

En los siguientes ejemplos de SQL, se muestran casos de uso comunes para las consultas continuas.

Exporta datos a un tema de Pub/Sub

En el siguiente ejemplo, se muestra una consulta continua que filtra datos de una tabla de BigQuery que recibe información de transmisión de viajes en taxi y los publica en un tema de Pub/Sub en tiempo real con atributos del mensaje:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message,
    TO_JSON(
      STRUCT(
        CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES
  FROM `myproject.real_time_taxi_streaming.taxi_rides`
  WHERE ride_status = 'enroute'
);

Exporta datos a una tabla de Bigtable

En el siguiente ejemplo, se muestra una consulta continua que filtra los datos de una tabla de BigQuery que recibe información de transmisión de viajes en taxi y, luego, exporta los datos a la tabla de Bigtable en tiempo real:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = TRUE,
    overwrite = TRUE,
    uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides')
AS (
  SELECT
    CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey,
    STRUCT(
      timestamp,
      latitude,
      longitude,
      meter_reading,
      ride_status,
      passenger_count) AS features
  FROM `myproject.real_time_taxi_streaming.taxirides`
  WHERE ride_status = 'enroute'
);

Escribe datos en una tabla de BigQuery

En el siguiente ejemplo, se muestra una consulta continua que filtra y transforma datos de una tabla de BigQuery que recibe información de viajes en taxi de transmisión y, luego, escribe los datos en otra tabla de BigQuery en tiempo real. Esto hace que los datos estén disponibles para un análisis posterior.

INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
    SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM `myproject.real_time_taxi_streaming.taxirides`
WHERE
  ride_status = 'dropoff';

Procesa datos mediante un modelo de Vertex AI

En el siguiente ejemplo, se muestra una consulta continua que usa un modelo de Vertex AI para generar un anuncio de pasajeros de taxi según su latitud y longitud actuales y, luego, exporta los resultados a un tema de Pub/Sub en tiempo real:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude,
        prompt,
        ml_generate_text_llm_result)) AS message
  FROM
    ML.GENERATE_TEXT(
      MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`,
      (
        SELECT
          timestamp,
          ride_id,
          latitude,
          longitude,
          CONCAT(
            'Generate an ad based on the current latitude of ',
            latitude,
            ' and longitude of ',
            longitude) AS prompt
        FROM `myproject.real_time_taxi_streaming.taxirides`
        WHERE ride_status = 'enroute'
      ),
      STRUCT(
        50 AS max_output_tokens,
        1.0 AS temperature,
        40 AS top_k,
        1.0 AS top_p,
        TRUE AS flatten_json_output))
      AS ml_output
);

Inicia una consulta continua desde un momento en particular

Cuando inicias una consulta continua, esta procesa todas las filas de la tabla desde la que seleccionas y, luego, procesa las filas nuevas a medida que llegan. Si quieres omitir el procesamiento de algunos o todos los datos existentes, puedes usar la función del historial de cambios APPENDS para comenzar a procesar desde un momento determinado. .

En el siguiente ejemplo, se muestra cómo iniciar una consulta continua desde un momento determinado con la función APPENDS:

EXPORT DATA
  OPTIONS (format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, '2024-06-12 01:23:03.652423 UTC', NULL)
  WHERE
    ride_status = 'enroute');

Modifica el SQL de una consulta continua

No puedes actualizar el SQL que se usa en una consulta continua mientras se ejecuta el trabajo de consulta continua. Debes cancelar el trabajo de consulta continua, modificar el SQL y, luego, iniciar un nuevo trabajo de consulta continua desde el punto en el que detuviste el trabajo de consulta continua original.

Sigue estos pasos para modificar el SQL usado en una consulta continua:

  1. Visualiza los detalles del trabajo del trabajo de consulta continua que deseas actualizar y anota el ID del trabajo.
  2. Si es posible, detén la recopilación de datos ascendentes. Si no puedes hacerlo, es posible que obtengas una duplicación de datos cuando se reinicie la consulta continua.
  3. Cancela la consulta continua que deseas modificar.
  4. Obtén el valor end_time para el trabajo de consulta continua original con la vista JOBS INFORMATION_SCHEMA:

    SELECT end_time
    FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE
      EXTRACT(DATE FROM creation_time) = current_date()
    AND error_result.reason = 'stopped'
    AND job_id = 'JOB_ID';

    Reemplaza lo siguiente:

    • PROJECT_ID: el ID de tu proyecto
    • REGION: la región que usa tu proyecto.
    • JOB_ID: el ID del trabajo de consulta continua que identificaste en el paso 1.
  5. Modifica la instrucción de SQL de consulta continua para iniciar la consulta continua desde un momento determinado, con el valor end_time que recuperaste en el paso 5 como el valor inicial punto.

  6. Modifica la instrucción de SQL de consulta continua para reflejar los cambios necesarios.

  7. Ejecuta la consulta continua modificada.

Cancela una consulta continua

Puedes cancelar un trabajo de consulta continua como cualquier otro trabajo. La consulta puede tardar hasta un minuto en dejar de ejecutarse después de que se cancela el trabajo.

¿Qué sigue?