Crea consultas continuas
En este documento, se describe cómo ejecutar una consulta continua en BigQuery.
Las consultas continuas de BigQuery son instrucciones de SQL que se ejecutan de forma continua. Las consultas continuas te permiten analizar datos entrantes en BigQuery en tiempo real y, luego, exportar los resultados a Bigtable o Pub/Sub, o escribir los resultados en una tabla de BigQuery.
Elige un tipo de cuenta
Puedes crear y ejecutar un trabajo de consulta continua con una cuenta de usuario o puedes crear un trabajo de consulta continua con una cuenta de usuario y, luego, ejecutarlo con una cuenta de servicio. Debes usar una cuenta de servicio para ejecutar una consulta continua que exporte los resultados a un tema de Pub/Sub.
Cuando usas una cuenta de usuario, una consulta continua se ejecuta durante dos días. Cuando usas una cuenta de servicio, se ejecuta una consulta continua hasta que se cancela de forma explícita. Para obtener más información, consulta Autorización.
Permisos necesarios
En esta sección, se describen los permisos que necesitas para crear y ejecutar una consulta continua. Como alternativa a los roles de Identity and Access Management (IAM) mencionados, puedes obtener los permisos necesarios a través de los roles personalizados.
Permisos cuando se usa una cuenta de usuario
En esta sección, se proporciona información sobre los roles y los permisos necesarios para crear y ejecutar una consulta continua mediante una cuenta de usuario.
Para crear un trabajo en BigQuery, la cuenta de usuario debe tener el permiso
bigquery.jobs.create
de IAM. Cada uno de los siguientes roles de IAM otorga el permiso bigquery.jobs.create
:
- Usuario de BigQuery (
roles/bigquery.user
) - Usuario de trabajo de BigQuery (
roles/bigquery.jobUser
) - Administrador de BigQuery (
roles/bigquery.admin
)
Para exportar datos de una tabla de BigQuery, la cuenta de usuario debe tener el permiso bigquery.tables.export
de IAM. Cada uno de los
siguientes roles de IAM otorga el permiso
bigquery.tables.export
:
- Visualizador de datos de BigQuery (
roles/bigquery.dataViewer
) - Editor de datos de BigQuery (
roles/bigquery.dataEditor
) - Propietario de datos de BigQuery (
roles/bigquery.dataOwner
) - Administrador de BigQuery (
roles/bigquery.admin
)
Para actualizar datos en una tabla de BigQuery, la cuenta de usuario debe tener el permiso de IAM bigquery.tables.updateData
. Cada uno de los
siguientes roles de IAM otorga el permiso
bigquery.tables.updateData
:
- Editor de datos de BigQuery (
roles/bigquery.dataEditor
) - Propietario de datos de BigQuery (
roles/bigquery.dataOwner
) - Administrador de BigQuery (
roles/bigquery.admin
)
Si la cuenta de usuario debe habilitar las APIs necesarias para tu
caso de uso de la consulta continua, la cuenta de usuario debe tener el rol de
administrador de Service Usage (roles/serviceusage.serviceUsageAdmin
).
Permisos cuando se usa una cuenta de servicio
En esta sección, se proporciona información sobre los roles y permisos que requiere la cuenta de usuario que crea la consulta continua y la cuenta de servicio que ejecuta la consulta continua.
Permisos de la cuenta de usuario
Para crear un trabajo en BigQuery, la cuenta de usuario debe tener el permiso
bigquery.jobs.create
de IAM. Cada uno de los siguientes
roles de IAM otorga el permiso bigquery.jobs.create
:
- Usuario de BigQuery (
roles/bigquery.user
) - Usuario de trabajo de BigQuery (
roles/bigquery.jobUser
) - Administrador de BigQuery (
roles/bigquery.admin
)
Para enviar un trabajo que se ejecute con una cuenta de servicio, la cuenta de usuario debe tener el
rol de usuario de la cuenta de servicio (roles/iam.serviceAccountUser
). Si usas la misma cuenta de usuario para crear la cuenta de servicio,
la cuenta de usuario debe tener el rol de
administrador de cuentas de servicio (roles/iam.serviceAccountAdmin
). Para obtener información sobre cómo limitar el acceso de un usuario a una sola cuenta de servicio,
en lugar de a todas las cuentas de servicio de un proyecto,
consulta Otorga un solo rol.
Si la cuenta de usuario debe habilitar las APIs necesarias para tu
caso de uso de la consulta continua, la cuenta de usuario debe tener el rol de
administrador de Service Usage (roles/serviceusage.serviceUsageAdmin
).
Permisos de las cuentas de servicio
Para exportar datos de una tabla de BigQuery, la cuenta de servicio debe
tener el permiso bigquery.tables.export
de IAM. Cada uno de los
siguientes roles de IAM otorga el permiso
bigquery.tables.export
:
- Visualizador de datos de BigQuery (
roles/bigquery.dataViewer
) - Editor de datos de BigQuery (
roles/bigquery.dataEditor
) - Propietario de datos de BigQuery (
roles/bigquery.dataOwner
) - Administrador de BigQuery (
roles/bigquery.admin
)
bigquery.tables.updateData
. Cada uno de los
siguientes roles de IAM otorga el permiso
bigquery.tables.updateData
:
- Editor de datos de BigQuery (
roles/bigquery.dataEditor
) - Propietario de datos de BigQuery (
roles/bigquery.dataOwner
) - Administrador de BigQuery (
roles/bigquery.admin
)
Antes de comenzar
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the BigQuery API.
Crea una reserva
Crea una reserva de la edición Enterprise o Enterprise Plus y, luego, crea una asignación de reserva con un tipo de trabajo CONTINUOUS
.
Exporta a Pub/Sub
Se requieren APIs, permisos de IAM y recursos de Google Cloud adicionales para exportar datos a Pub/Sub. Para obtener más información, consulta Exporta datos a Pub/Sub.
Incorpora atributos personalizados como metadatos en mensajes de Pub/Sub
Puedes usar los atributos de Pub/Sub para proporcionar información adicional sobre el mensaje, como su prioridad, origen, destino o metadatos adicionales. También puedes usar atributos para filtrar mensajes en la suscripción.
Dentro de un resultado de consulta continua, si una columna se llama _ATTRIBUTES
,
sus valores se copian en los atributos del mensaje de Pub/Sub.
Los campos proporcionados dentro de _ATTRIBUTES
se usan como claves de atributos.
La columna _ATTRIBUTES
debe ser del tipo JSON
, en el formato
ARRAY<STRUCT<STRING, STRING>>
o STRUCT<STRING>
.
Para ver un ejemplo, consulta Exporta datos a un tema de Pub/Sub.
Exporta a Bigtable
Se requieren APIs, permisos de IAM y recursos de Google Cloud adicionales para exportar datos a Bigtable. Para obtener más información, consulta Exporta datos a Bigtable.
Escribe datos en una tabla de BigQuery
Puedes escribir datos en una tabla de BigQuery con una declaración INSERT
.
Usa funciones de IA
Se requieren APIs adicionales, permisos de IAM y recursos de Google Cloud para usar una función de IA compatible en una consulta continua. Para obtener más información, consulta uno de los siguientes temas, según tu caso de uso:
- Genera texto con la función
ML.GENERATE_TEXT
- Genera embeddings de texto con la función
ML.GENERATE_EMBEDDING
- Comprende texto con la función
ML.UNDERSTAND_TEXT
- Traduce texto con la función
ML.TRANSLATE
Cuando usas una función de IA en una consulta continua, considera si el resultado de la consulta permanecerá dentro de la cuota de la función. Si excedes la cuota, es posible que debas controlar por separado los registros que no se procesan.
Ejecuta una consulta continua con una cuenta de usuario
En esta sección, se describe cómo ejecutar una consulta continua mediante una cuenta de usuario. Una vez que se ejecuta la consulta continua, puedes cerrar la aplicación, la ventana de la terminal o la consola de Google Cloud sin interrumpir la ejecución de la consulta.
Sigue estos pasos para ejecutar una consulta continua:
Console
En la consola de Google Cloud, ve a la página de BigQuery.
En el editor de consultas, haz clic en Más.
En la sección Elige el modo de consulta, elige Consulta continua.
Haz clic en Confirmar.
En el editor de consultas, escribe la instrucción de SQL para la consulta continua. La instrucción de SQL solo debe contener operaciones compatibles.
Haz clic en Ejecutar.
bq
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
En Cloud Shell, ejecuta la consulta continua con el comando
bq query
con la marca--continuous
:bq query --use_legacy_sql=false --continuous=true 'QUERY'
Reemplaza
QUERY
por la instrucción de SQL para la consulta continua. La instrucción de SQL solo debe contener operaciones compatibles.
API
Ejecuta la consulta continua con una llamada al
método jobs.insert
.
Debes configurar el campo continuous
como true
en el JobConfigurationQuery
del recurso Job
que pasas dentro.
curl --request POST \ 'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs --header 'Authorization: Bearer $(gcloud auth application-default print-access-token)' \ --header 'Accept: application/json' \ --header 'Content-Type: application/json' \ --data '("configuration":("continuous":true,"useLegacySql":false,"query":"QUERY"))' --compressed
Reemplaza lo siguiente:
PROJECT_ID
: el ID de tu proyectoQUERY
: Es la instrucción de SQL para la consulta continua. La instrucción de SQL solo debe contener operaciones compatibles.
Ejecuta una consulta continua con una cuenta de servicio
En esta sección, se describe cómo ejecutar una consulta continua mediante una cuenta de servicio. Una vez que se ejecuta la consulta continua, puedes cerrar la aplicación, la ventana de la terminal o la consola de Google Cloud sin interrumpir la ejecución de la consulta.
Sigue estos pasos para usar una cuenta de servicio a fin de ejecutar una consulta continua:
Console
- Crea una cuenta de servicio.
- Otorga permisos obligatorios a la cuenta de servicio:
En la consola de Google Cloud, ve a la página de BigQuery.
En el editor de consultas, haz clic en Más.
En la sección Elige el modo de consulta, elige Consulta continua.
Haz clic en Confirmar.
En el editor de consultas, haz clic en Más > Configuración de consulta.
En la sección Consulta continua, usa la casilla Cuenta de servicio para seleccionar la cuenta de servicio que creaste.
Haz clic en Guardar.
En el editor de consultas, escribe la instrucción de SQL para la consulta continua. La instrucción de SQL solo debe contener operaciones compatibles.
Haz clic en Ejecutar.
bq
- Crea una cuenta de servicio.
- Otorga permisos obligatorios a la cuenta de servicio:
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
En la línea de comandos, ejecuta la consulta continua con el comando
bq query
con las siguientes marcas:- Establece la marca
--continuous
entrue
para que la consulta sea continua. - Usa la marca
--connection_property
para especificar una cuenta de servicio para usar.
bq query --project_id=PROJECT_ID --use_legacy_sql=false \ --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \ 'QUERY'
Reemplaza lo siguiente:
PROJECT_ID
: el ID de tu proyectoSERVICE_ACCOUNT_EMAIL
: Es el correo electrónico de la cuenta de servicio. Puedes obtener el correo electrónico de la cuenta de servicio en la página Cuentas de servicio de la consola de Google Cloud.QUERY
: Es la instrucción de SQL para la consulta continua. La instrucción de SQL solo debe contener operaciones compatibles.
- Establece la marca
API
- Crea una cuenta de servicio.
- Otorga permisos obligatorios a la cuenta de servicio:
Ejecuta la consulta continua con una llamada al método
jobs.insert
. Configura los siguientes campos en el recursoJobConfigurationQuery
del recursoJob
que pasas:- Establece el campo
continuous
entrue
para que la consulta sea continua. - Usa el campo
connection_property
para especificar la cuenta de servicio que se usará.
curl --request POST \ 'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs --header 'Authorization: Bearer $(gcloud auth print-access-token) \ --header 'Accept: application/json' \ --header 'Content-Type: application/json' \ --data '("configuration":("query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \ --compressed
Reemplaza lo siguiente:
PROJECT_ID
: el ID de tu proyectoQUERY
: Es la instrucción de SQL para la consulta continua. La instrucción de SQL solo debe contener operaciones compatibles.SERVICE_ACCOUNT_EMAIL
: Es el correo electrónico de la cuenta de servicio. Puedes obtener el correo electrónico de la cuenta de servicio en la página Cuentas de servicio de la consola de Google Cloud.
- Establece el campo
Ejemplos
En los siguientes ejemplos de SQL, se muestran casos de uso comunes para las consultas continuas.
Exporta datos a un tema de Pub/Sub
En el siguiente ejemplo, se muestra una consulta continua que filtra datos de una tabla de BigQuery que recibe información de transmisión de viajes en taxi y los publica en un tema de Pub/Sub en tiempo real con atributos del mensaje:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude)) AS message, TO_JSON( STRUCT( CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES FROM `myproject.real_time_taxi_streaming.taxi_rides` WHERE ride_status = 'enroute' );
Exporta datos a una tabla de Bigtable
En el siguiente ejemplo, se muestra una consulta continua que filtra los datos de una tabla de BigQuery que recibe información de transmisión de viajes en taxi y, luego, exporta los datos a la tabla de Bigtable en tiempo real:
EXPORT DATA OPTIONS ( format = 'CLOUD_BIGTABLE', truncate = TRUE, overwrite = TRUE, uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides') AS ( SELECT CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey, STRUCT( timestamp, latitude, longitude, meter_reading, ride_status, passenger_count) AS features FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'enroute' );
Escribe datos en una tabla de BigQuery
En el siguiente ejemplo, se muestra una consulta continua que filtra y transforma datos de una tabla de BigQuery que recibe información de viajes en taxi de transmisión y, luego, escribe los datos en otra tabla de BigQuery en tiempo real. Esto hace que los datos estén disponibles para un análisis posterior.
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'dropoff';
Procesa datos mediante un modelo de Vertex AI
En el siguiente ejemplo, se muestra una consulta continua que usa un modelo de Vertex AI para generar un anuncio de pasajeros de taxi según su latitud y longitud actuales y, luego, exporta los resultados a un tema de Pub/Sub en tiempo real:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude, prompt, ml_generate_text_llm_result)) AS message FROM ML.GENERATE_TEXT( MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`, ( SELECT timestamp, ride_id, latitude, longitude, CONCAT( 'Generate an ad based on the current latitude of ', latitude, ' and longitude of ', longitude) AS prompt FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'enroute' ), STRUCT( 50 AS max_output_tokens, 1.0 AS temperature, 40 AS top_k, 1.0 AS top_p, TRUE AS flatten_json_output)) AS ml_output );
Inicia una consulta continua desde un momento en particular
Cuando inicias una consulta continua, esta procesa todas las filas de la tabla desde la que seleccionas y, luego, procesa las filas nuevas a medida que llegan. Si quieres omitir el procesamiento de algunos o todos los datos existentes, puedes usar la función del historial de cambios APPENDS
para comenzar a procesar desde un momento determinado. .
En el siguiente ejemplo, se muestra cómo iniciar una consulta continua desde un momento determinado con la función APPENDS
:
EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude)) AS message FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, '2024-06-12 01:23:03.652423 UTC', NULL) WHERE ride_status = 'enroute');
Modifica el SQL de una consulta continua
No puedes actualizar el SQL que se usa en una consulta continua mientras se ejecuta el trabajo de consulta continua. Debes cancelar el trabajo de consulta continua, modificar el SQL y, luego, iniciar un nuevo trabajo de consulta continua desde el punto en el que detuviste el trabajo de consulta continua original.
Sigue estos pasos para modificar el SQL usado en una consulta continua:
- Visualiza los detalles del trabajo del trabajo de consulta continua que deseas actualizar y anota el ID del trabajo.
- Si es posible, detén la recopilación de datos ascendentes. Si no puedes hacerlo, es posible que obtengas una duplicación de datos cuando se reinicie la consulta continua.
- Cancela la consulta continua que deseas modificar.
Obtén el valor
end_time
para el trabajo de consulta continua original con la vistaJOBS
INFORMATION_SCHEMA
:SELECT end_time FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT WHERE EXTRACT(DATE FROM creation_time) = current_date() AND error_result.reason = 'stopped' AND job_id = 'JOB_ID';
Reemplaza lo siguiente:
PROJECT_ID
: el ID de tu proyectoREGION
: la región que usa tu proyecto.JOB_ID
: el ID del trabajo de consulta continua que identificaste en el paso 1.
Modifica la instrucción de SQL de consulta continua para iniciar la consulta continua desde un momento determinado, con el valor
end_time
que recuperaste en el paso 5 como el valor inicial punto.Modifica la instrucción de SQL de consulta continua para reflejar los cambios necesarios.
Ejecuta la consulta continua modificada.
Cancela una consulta continua
Puedes cancelar un trabajo de consulta continua como cualquier otro trabajo. La consulta puede tardar hasta un minuto en dejar de ejecutarse después de que se cancela el trabajo.