Criar consultas contínuas
Neste documento, descrevemos como executar uma consulta contínua no BigQuery.
As consultas contínuas do BigQuery são instruções SQL executadas continuamente. As consultas contínuas permitem analisar dados recebidos no BigQuery em tempo real e depois exportar os resultados para o Bigtable ou Pub/Sub, ou gravar os resultados em uma tabela do BigQuery.
Escolher um tipo de conta
É possível criar e executar um job de consulta contínua usando uma conta de usuário ou criar um job de consulta contínua usando uma conta de usuário e executá-lo usando uma conta de serviço. Use uma conta de serviço para executar uma consulta contínua que exporta resultados para um tópico do Pub/Sub.
Quando você usa uma conta de usuário, uma consulta contínua é executada por dois dias. Quando você usa uma conta de serviço, a consulta contínua é executada até ser explicitamente cancelada. Para mais informações, consulte Autorização.
Permissões necessárias
Esta seção descreve as permissões necessárias para criar e executar uma consulta contínua. Como alternativa aos papéis do Identity and Access Management (IAM), você pode conseguir as permissões necessárias através de papéis personalizados.
Permissões ao usar uma conta de usuário
Nesta seção, você encontra informações sobre os papéis e as permissões necessários para criar e executar uma consulta contínua usando uma conta de usuário.
Para criar um job no BigQuery, a conta de usuário deve ter a permissão
bigquery.jobs.create
do IAM. Cada um dos
seguintes papéis do IAM concedem a permissão bigquery.jobs.create
:
- Usuário do BigQuery (
roles/bigquery.user
) - Usuário de jobs do BigQuery(
roles/bigquery.jobUser
) - Administrador do BigQuery (
roles/bigquery.admin
)
Para exportar dados de uma tabela do BigQuery, a conta de usuário precisa ter
a permissão do IAM bigquery.tables.export
. Cada um dos
seguintes papéis do IAM concedem a permissão bigquery.tables.export
:
- Leitor de dados do BigQuery (
roles/bigquery.dataViewer
) - Editor de dados do BigQuery (
roles/bigquery.dataEditor
) - Proprietário de dados do BigQuery (
roles/bigquery.dataOwner
) - Administrador do BigQuery (
roles/bigquery.admin
)
Para atualizar dados em uma tabela do BigQuery, a conta de usuário precisa ter
a permissão do IAM bigquery.tables.updateData
. Cada um dos
seguintes papéis do IAM concedem a permissão bigquery.tables.updateData
:
- Editor de dados do BigQuery (
roles/bigquery.dataEditor
) - Proprietário de dados do BigQuery (
roles/bigquery.dataOwner
) - Administrador do BigQuery (
roles/bigquery.admin
)
Se a conta de usuário precisar ativar as APIs necessárias para o
caso de uso de consulta contínua, a conta de usuário deve ter o papel
Administrador do Service Usage (roles/serviceusage.serviceUsageAdmin
).
Permissões ao usar uma conta de serviço
Nesta seção, você encontra informações sobre os papéis e as permissões exigidos pela conta de usuário que cria a consulta contínua e a conta de serviço que executa a consulta contínua.
Permissões da conta de usuário
Para criar um job no BigQuery, a conta de usuário deve ter a permissão
bigquery.jobs.create
do IAM. Cada um dos seguintes
papéis do IAM concedem a permissão bigquery.jobs.create
:
- Usuário do BigQuery (
roles/bigquery.user
) - Usuário de jobs do BigQuery(
roles/bigquery.jobUser
) - Administrador do BigQuery (
roles/bigquery.admin
)
Para enviar um job executado usando uma conta de serviço, a conta de usuário precisa ter o papel
Usuário da conta de serviço (roles/iam.serviceAccountUser
). Se você estiver usando a mesma conta de usuário para criar a conta de serviço,
a conta de usuário deverá ter o papel
Administrador da conta de serviço (roles/iam.serviceAccountAdmin
). Para saber como limitar o acesso de um usuário a uma única conta de serviço
e não a todas as contas de serviço dentro de um projeto, consulte
Conceder um único papel.
Se a conta de usuário precisar ativar as APIs necessárias para o
caso de uso de consulta contínua, a conta de usuário deve ter o papel
Administrador do Service Usage (roles/serviceusage.serviceUsageAdmin
).
Permissões de conta de serviço
Para exportar dados de uma tabela do BigQuery, a conta de serviço precisa
ter a permissão bigquery.tables.export
do IAM. Cada um dos
seguintes papéis do IAM concedem a permissão bigquery.tables.export
:
- Leitor de dados do BigQuery (
roles/bigquery.dataViewer
) - Editor de dados do BigQuery (
roles/bigquery.dataEditor
) - Proprietário de dados do BigQuery (
roles/bigquery.dataOwner
) - Administrador do BigQuery (
roles/bigquery.admin
)
bigquery.tables.updateData
do IAM. Cada um dos
seguintes papéis do IAM concedem a permissão bigquery.tables.updateData
:
- Editor de dados do BigQuery (
roles/bigquery.dataEditor
) - Proprietário de dados do BigQuery (
roles/bigquery.dataOwner
) - Administrador do BigQuery (
roles/bigquery.admin
)
Antes de começar
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the BigQuery API.
Criar uma reserva
Crie uma reserva do Enterprise ou Enterprise Plus,
e depois
crie uma atribuição de reserva
com um tipo de job CONTINUOUS
.
Quando você cria uma atribuição de reserva para uma consulta contínua, a reserva associada é limitada a 500 slots ou menos e não pode ser configurada para usar o escalonamento automático.
Exportar para Pub/Sub
Outras APIs, permissões do IAM e recursos do Google Cloud são necessários para exportar dados para o Pub/Sub. Para mais informações, consulte Exportar para o Pub/Sub.
Incorporar atributos personalizados como metadados em mensagens do Pub/Sub
É possível usar atributos do Pub/Sub para fornecer mais informações sobre a mensagem, como prioridade, origem, destino ou outros metadados. Também é possível usar atributos para filtrar mensagens na assinatura.
Em um resultado de consulta contínua, se uma coluna for denominada _ATTRIBUTES
,
os valores dela são copiados para os atributos de mensagem do Pub/Sub.
Os campos fornecidos em _ATTRIBUTES
são usados como chaves de atributos.
A coluna _ATTRIBUTES
precisa ser do tipo JSON
, no formato
ARRAY<STRUCT<STRING, STRING>>
ou STRUCT<STRING>
.
Para conferir um exemplo, consulte exportar dados para um tópico do Pub/Sub.
Exportar para o Bigtable
Outras APIs, permissões do IAM e recursos do Google Cloud são necessários para exportar dados para o Bigtable. Para mais informações, consulte Exportar para o Bigtable.
Gravar dados em uma tabela do BigQuery
Você pode gravar dados em uma tabela do BigQuery usando uma
Instrução INSERT
.
Usar funções de IA
Outras APIs, permissões do IAM e recursos do Google Cloud são necessários para usar uma função de IA compatível em uma consulta contínua. Para saber mais, consulte um dos tópicos a seguir, com base no seu caso de uso:
- Gerar texto usando a função
ML.GENERATE_TEXT
- Gerar embeddings de texto usando a função
ML.GENERATE_EMBEDDING
- Entender o texto com a função
ML.UNDERSTAND_TEXT
- Traduzir texto com a função
ML.TRANSLATE
Ao usar uma função de IA em uma consulta contínua, considere se a consulta permanecerá dentro da cota para a função. Se você exceder a cota, precisará processar separadamente os registros que não são processados.
Executar uma consulta contínua usando uma conta de usuário
Esta seção descreve como executar uma consulta contínua usando uma conta de usuário. Depois que a consulta contínua estiver em execução, feche o console do Google Cloud , a janela do terminal ou o aplicativo sem interromper a execução da consulta.
Siga estas etapas para executar uma consulta contínua:
Console
No console do Google Cloud , acesse a página BigQuery.
No editor de consultas, clique em Mais.
Na seção Escolher o modo de consulta, escolha Consulta contínua.
Clique em Confirmar.
No editor de consultas, digite a instrução SQL para a consulta contínua. A instrução SQL deve conter apenas operações compatíveis.
Clique em Executar.
bq
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
No Cloud Shell, execute a consulta contínua usando o comando
bq query
com a sinalização--continuous
:bq query --use_legacy_sql=false --continuous=true 'QUERY'
Substitua
QUERY
pela instrução SQL da consulta contínua. A instrução SQL deve conter apenas operações compatíveis.
API
Execute a consulta contínua chamando o
método jobs.insert
.
Defina o campo continuous
como true
no
JobConfigurationQuery
do recurso Job
transmitido.
curl --request POST \ 'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs --header 'Authorization: Bearer $(gcloud auth application-default print-access-token)' \ --header 'Accept: application/json' \ --header 'Content-Type: application/json' \ --data '("configuration":("continuous":true,"useLegacySql":false,"query":"QUERY"))' --compressed
Substitua:
PROJECT_ID
: o ID do projeto.QUERY
: a instrução SQL da consulta contínua. A instrução SQL deve conter apenas operações compatíveis.
Executar uma consulta contínua usando uma conta de serviço
Nesta seção, descrevemos como executar uma consulta contínua usando uma conta de serviço. Depois que a consulta contínua estiver em execução, feche o console do Google Cloud , a janela do terminal ou o aplicativo sem interromper a execução da consulta.
Siga estas etapas para executar uma consulta contínua com uma conta de serviço:
Console
- Crie uma conta de serviço.
- Conceda as permissões necessárias à conta de serviço.
No console do Google Cloud , acesse a página BigQuery.
No editor de consultas, clique em Mais.
Na seção Escolher o modo de consulta, escolha Consulta contínua.
Clique em Confirmar.
No editor de consultas, clique em Mais > Configurações de consulta.
Na seção Consulta contínua, use a caixa Conta de serviço para selecionar a conta de serviço que você criou.
Clique em Salvar.
No editor de consultas, digite a instrução SQL para a consulta contínua. A instrução SQL deve conter apenas operações compatíveis.
Clique em Executar.
bq
- Crie uma conta de serviço.
- Conceda as permissões necessárias à conta de serviço.
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Na linha de comando, execute a consulta contínua usando o comando
bq query
com as seguintes sinalizações:- Defina a sinalização
--continuous
comotrue
para tornar a consulta contínua. - Use a sinalização
--connection_property
para especificar uma conta de serviço a ser usada.
bq query --project_id=PROJECT_ID --use_legacy_sql=false \ --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \ 'QUERY'
Substitua:
PROJECT_ID
: o ID do projeto.SERVICE_ACCOUNT_EMAIL
: o e-mail da conta de serviço. Você pode acessar o e-mail da conta de serviço na página Contas de serviço do console Google Cloud .QUERY
: a instrução SQL da consulta contínua. A instrução SQL deve conter apenas operações compatíveis.
- Defina a sinalização
API
- Crie uma conta de serviço.
- Conceda as permissões necessárias à conta de serviço.
Execute a consulta contínua chamando o método
jobs.insert
. Defina os seguintes campos no recursoJobConfigurationQuery
do recursoJob
transmitido:- Defina o campo
continuous
comotrue
para tornar a consulta contínua. - Use o campo
connection_property
para especificar a conta de serviço que será usada.
curl --request POST \ 'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs --header 'Authorization: Bearer $(gcloud auth print-access-token) \ --header 'Accept: application/json' \ --header 'Content-Type: application/json' \ --data '("configuration":("query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \ --compressed
Substitua:
PROJECT_ID
: o ID do projeto.QUERY
: a instrução SQL da consulta contínua. A instrução SQL deve conter apenas operações compatíveis.SERVICE_ACCOUNT_EMAIL
: o e-mail da conta de serviço. Você pode acessar o e-mail da conta de serviço na página Contas de serviço do console Google Cloud .
- Defina o campo
Exemplos
Os exemplos de SQL a seguir mostram casos de uso comuns para consultas contínuas.
Exportar dados para um tópico do Pub/Sub
O exemplo a seguir mostra uma consulta contínua que filtra dados de uma tabela do BigQuery que está recebendo informações de streaming de corridas de táxi e publica os dados em um tópico do Pub/Sub em tempo real com atributos de mensagem:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude)) AS message, TO_JSON( STRUCT( CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES FROM `myproject.real_time_taxi_streaming.taxi_rides` WHERE ride_status = 'enroute' );
Exportar dados para uma tabela do Bigtable
O exemplo a seguir mostra uma consulta contínua que filtra dados de uma tabela do BigQuery que está recebendo informações de streaming de corridas de táxi, e exporta os dados para a tabela do Bigtable em tempo real:
EXPORT DATA OPTIONS ( format = 'CLOUD_BIGTABLE', truncate = TRUE, overwrite = TRUE, uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides') AS ( SELECT CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey, STRUCT( timestamp, latitude, longitude, meter_reading, ride_status, passenger_count) AS features FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'enroute' );
Gravar dados em uma tabela do BigQuery
O exemplo a seguir mostra uma consulta contínua que filtra e transforma dados de uma tabela do BigQuery que está recebendo streaming de corridas de táxi e grava os dados em outra tabela do BigQuery em tempo real. Isso disponibiliza os dados para outras análises downstream.
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'dropoff';
Processar dados usando um modelo da Vertex AI
O exemplo a seguir mostra uma consulta contínua que usa um modelo da Vertex AI para gerar um anúncio para passageiros de táxi com base na latitude e longitude atuais e exporta os resultados em um tópico do Pub/Sub em tempo real:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude, prompt, ml_generate_text_llm_result)) AS message FROM ML.GENERATE_TEXT( MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`, ( SELECT timestamp, ride_id, latitude, longitude, CONCAT( 'Generate an ad based on the current latitude of ', latitude, ' and longitude of ', longitude) AS prompt FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'enroute' ), STRUCT( 50 AS max_output_tokens, 1.0 AS temperature, 40 AS top_k, 1.0 AS top_p, TRUE AS flatten_json_output)) AS ml_output );
Iniciar uma consulta contínua de um momento específico
Quando você inicia uma consulta contínua, ela processa todas as linhas da tabela
que você está selecionando e processa novas linhas à medida que elas chegam. Se
quiser pular o processamento de alguns ou de todos os dados existentes, você poderá usar a função do histórico de alterações
APPENDS
para iniciar o processamento de um momento específico.
O exemplo a seguir mostra como iniciar uma consulta contínua a partir de um determinado
momento usando a função APPENDS
:
EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude)) AS message FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, '2024-06-12 01:23:03.652423 UTC', NULL) WHERE ride_status = 'enroute');
Modificar o SQL de uma consulta contínua
Não é possível atualizar o SQL usado em uma consulta contínua enquanto o job de consulta contínua está em execução. Você precisa cancelar o job de consulta contínua, modificar o SQL, e iniciar um novo job de consulta contínua do ponto em que parou o job de consulta contínua original.
Siga estas etapas para modificar o SQL usado em uma consulta contínua:
- Confira os detalhes do job para o job de consulta contínua que você quer atualizar e anote o ID do job.
- Se possível, pause a coleta de dados upstream. Se não puder fazer isso, você pode ter duplicação de dados quando a consulta contínua é reiniciada.
- Cancelar a consulta contínua que você quer modificar.
Encontre o valor
end_time
do job de consulta contínua original usando aINFORMATION_SCHEMA
visualizaçãoJOBS
:SELECT end_time FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT WHERE EXTRACT(DATE FROM creation_time) = current_date() AND error_result.reason = 'stopped' AND job_id = 'JOB_ID';
Substitua:
PROJECT_ID
: o ID do projeto.REGION
: a região usada pelo seu projeto.JOB_ID
: o ID do job de consulta contínua que você identificou na Etapa 1.
Modifique a instrução SQL de consulta contínua para iniciar a consulta contínua de um ponto específico no tempo, usando o valor
end_time
recuperado na etapa 5 como o ponto de partida.Modifique a instrução SQL de consulta contínua para refletir as alterações necessárias.
Execute a consulta contínua modificada.
Cancelar uma consulta contínua
Você pode cancelar um job de consulta contínua como com qualquer outro trabalho. Pode levar até um minuto para que a consulta deixe de ser executada após o cancelamento do job.