Criar consultas contínuas

Neste documento, descrevemos como executar uma consulta contínua no BigQuery.

As consultas contínuas do BigQuery são instruções SQL executadas continuamente. As consultas contínuas permitem analisar dados recebidos no BigQuery em tempo real e depois exportar os resultados para o Bigtable ou Pub/Sub, ou gravar os resultados em uma tabela do BigQuery.

Escolher um tipo de conta

É possível criar e executar um job de consulta contínua usando uma conta de usuário ou criar um job de consulta contínua usando uma conta de usuário e executá-lo usando uma conta de serviço. Use uma conta de serviço para executar uma consulta contínua que exporta resultados para um tópico do Pub/Sub.

Quando você usa uma conta de usuário, uma consulta contínua é executada por dois dias. Quando você usa uma conta de serviço, a consulta contínua é executada até ser explicitamente cancelada. Para mais informações, consulte Autorização.

Permissões necessárias

Esta seção descreve as permissões necessárias para criar e executar uma consulta contínua. Como alternativa aos papéis do Identity and Access Management (IAM), você pode conseguir as permissões necessárias através de papéis personalizados.

Permissões ao usar uma conta de usuário

Nesta seção, você encontra informações sobre os papéis e as permissões necessários para criar e executar uma consulta contínua usando uma conta de usuário.

Para criar um job no BigQuery, a conta de usuário deve ter a permissão bigquery.jobs.create do IAM. Cada um dos seguintes papéis do IAM concedem a permissão bigquery.jobs.create:

Para exportar dados de uma tabela do BigQuery, a conta de usuário precisa ter a permissão do IAM bigquery.tables.export. Cada um dos seguintes papéis do IAM concedem a permissão bigquery.tables.export:

Para atualizar dados em uma tabela do BigQuery, a conta de usuário precisa ter a permissão do IAM bigquery.tables.updateData. Cada um dos seguintes papéis do IAM concedem a permissão bigquery.tables.updateData:

Se a conta de usuário precisar ativar as APIs necessárias para o caso de uso de consulta contínua, a conta de usuário deve ter o papel Administrador do Service Usage (roles/serviceusage.serviceUsageAdmin).

Permissões ao usar uma conta de serviço

Nesta seção, você encontra informações sobre os papéis e as permissões exigidos pela conta de usuário que cria a consulta contínua e a conta de serviço que executa a consulta contínua.

Permissões da conta de usuário

Para criar um job no BigQuery, a conta de usuário deve ter a permissão bigquery.jobs.create do IAM. Cada um dos seguintes papéis do IAM concedem a permissão bigquery.jobs.create:

Para enviar um job executado usando uma conta de serviço, a conta de usuário precisa ter o papel Usuário da conta de serviço (roles/iam.serviceAccountUser). Se você estiver usando a mesma conta de usuário para criar a conta de serviço, a conta de usuário deverá ter o papel Administrador da conta de serviço (roles/iam.serviceAccountAdmin). Para saber como limitar o acesso de um usuário a uma única conta de serviço e não a todas as contas de serviço dentro de um projeto, consulte Conceder um único papel.

Se a conta de usuário precisar ativar as APIs necessárias para o caso de uso de consulta contínua, a conta de usuário deve ter o papel Administrador do Service Usage (roles/serviceusage.serviceUsageAdmin).

Permissões de conta de serviço

Para exportar dados de uma tabela do BigQuery, a conta de serviço precisa ter a permissão bigquery.tables.export do IAM. Cada um dos seguintes papéis do IAM concedem a permissão bigquery.tables.export:

Para atualizar dados em uma tabela do BigQuery, a conta de serviço precisa ter a permissão bigquery.tables.updateData do IAM. Cada um dos seguintes papéis do IAM concedem a permissão bigquery.tables.updateData:

Antes de começar

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the BigQuery API.

    Enable the API

Criar uma reserva

Crie uma reserva do Enterprise ou Enterprise Plus, e depois crie uma atribuição de reserva com um tipo de job CONTINUOUS.

Quando você cria uma atribuição de reserva para uma consulta contínua, a reserva associada é limitada a 500 slots ou menos e não pode ser configurada para usar o escalonamento automático.

Exportar para Pub/Sub

Outras APIs, permissões do IAM e recursos do Google Cloud são necessários para exportar os dados para o Pub/Sub. Para mais informações, consulte Exportar para o Pub/Sub.

Incorporar atributos personalizados como metadados em mensagens do Pub/Sub

É possível usar atributos do Pub/Sub para fornecer mais informações sobre a mensagem, como prioridade, origem, destino ou outros metadados. Também é possível usar atributos para filtrar mensagens na assinatura.

Em um resultado de consulta contínua, se uma coluna for denominada _ATTRIBUTES, os valores dela são copiados para os atributos de mensagem do Pub/Sub. Os campos fornecidos em _ATTRIBUTES são usados como chaves de atributos.

A coluna _ATTRIBUTES precisa ser do tipo JSON, no formato ARRAY<STRUCT<STRING, STRING>> ou STRUCT<STRING>.

Para conferir um exemplo, consulte exportar dados para um tópico do Pub/Sub.

Exportar para o Bigtable

Outras APIs, permissões do IAM e recursos do Google Cloud são necessários para exportar dados para o Bigtable. Para mais informações, consulte Exportar para o Bigtable.

Gravar dados em uma tabela do BigQuery

Você pode gravar dados em uma tabela do BigQuery usando uma Instrução INSERT.

Usar funções de IA

Outras APIs, permissões do IAM e recursos do Google Cloud são necessários para usar uma função de IA compatível em uma consulta contínua. Para saber mais, consulte um dos tópicos a seguir, com base no seu caso de uso:

Ao usar uma função de IA em uma consulta contínua, considere se a consulta permanecerá dentro da cota para a função. Se você exceder a cota, precisará processar separadamente os registros que não são processados.

Executar uma consulta contínua usando uma conta de usuário

Esta seção descreve como executar uma consulta contínua usando uma conta de usuário. Depois que a consulta contínua estiver em execução, feche o console do Google Cloud, a janela do terminal ou o aplicativo, sem interromper a execução da consulta.

Siga estas etapas para executar uma consulta contínua:

Console

  1. No Console do Google Cloud, acesse a página BigQuery.

    Ir para o BigQuery

  2. No editor de consultas, clique em Mais.

  3. Na seção Escolher o modo de consulta, escolha Consulta contínua.

  4. Clique em Confirmar.

  5. No editor de consultas, digite a instrução SQL para a consulta contínua. A instrução SQL deve conter apenas operações compatíveis.

  6. Clique em Executar.

bq

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. No Cloud Shell, execute a consulta contínua usando o comando bq query com a sinalização --continuous:

    bq query --use_legacy_sql=false --continuous=true
    'QUERY'

    Substitua QUERY pela instrução SQL da consulta contínua. A instrução SQL deve conter apenas operações compatíveis.

API

Execute a consulta contínua chamando o método jobs.insert. Defina o campo continuous como true no JobConfigurationQuery do recurso Job transmitido.

curl --request POST \
  'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs
  --header 'Authorization: Bearer $(gcloud auth application-default print-access-token)' \
  --header 'Accept: application/json' \
  --header 'Content-Type: application/json' \
  --data '("configuration":("continuous":true,"useLegacySql":false,"query":"QUERY"))'
  --compressed

Substitua:

  • PROJECT_ID: o ID do projeto.
  • QUERY: a instrução SQL da consulta contínua. A instrução SQL deve conter apenas operações compatíveis.

Executar uma consulta contínua usando uma conta de serviço

Nesta seção, descrevemos como executar uma consulta contínua usando uma conta de serviço. Depois que a consulta contínua estiver em execução, feche o console do Google Cloud, a janela do terminal ou o aplicativo, sem interromper a execução da consulta.

Siga estas etapas para executar uma consulta contínua com uma conta de serviço:

Console

  1. Crie uma conta de serviço.
  2. Conceda as permissões necessárias à conta de serviço.
  3. No Console do Google Cloud, acesse a página BigQuery.

    Ir para o BigQuery

  4. No editor de consultas, clique em Mais.

  5. Na seção Escolher o modo de consulta, escolha Consulta contínua.

  6. Clique em Confirmar.

  7. No editor de consultas, clique em Mais > Configurações de consulta.

  8. Na seção Consulta contínua, use a caixa Conta de serviço para selecionar a conta de serviço que você criou.

  9. Clique em Salvar.

  10. No editor de consultas, digite a instrução SQL para a consulta contínua. A instrução SQL deve conter apenas operações compatíveis.

  11. Clique em Executar.

bq

  1. Crie uma conta de serviço.
  2. Conceda as permissões necessárias à conta de serviço.
  3. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  4. Na linha de comando, execute a consulta contínua usando o comando bq query com as seguintes sinalizações:

    • Defina a sinalização --continuous como true para tornar a consulta contínua.
    • Use a sinalização --connection_property para especificar uma conta de serviço a ser usada.
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'QUERY'

    Substitua:

    • PROJECT_ID: o ID do projeto.
    • SERVICE_ACCOUNT_EMAIL: o e-mail da conta de serviço. Você pode acessar o e-mail da conta de serviço na página Contas de serviço do console do Google Cloud.
    • QUERY: a instrução SQL da consulta contínua. A instrução SQL deve conter apenas operações compatíveis.

API

  1. Crie uma conta de serviço.
  2. Conceda as permissões necessárias à conta de serviço.
  3. Execute a consulta contínua chamando o método jobs.insert. Defina os seguintes campos no recurso JobConfigurationQuery do recurso Job transmitido:

    • Defina o campo continuous como true para tornar a consulta contínua.
    • Use o campo connection_property para especificar a conta de serviço que será usada.
    curl --request POST \
      'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs
      --header 'Authorization: Bearer $(gcloud auth print-access-token) \
      --header 'Accept: application/json' \
      --header 'Content-Type: application/json' \
      --data '("configuration":("query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \
      --compressed

    Substitua:

    • PROJECT_ID: o ID do projeto.
    • QUERY: a instrução SQL da consulta contínua. A instrução SQL deve conter apenas operações compatíveis.
    • SERVICE_ACCOUNT_EMAIL: o e-mail da conta de serviço. Você pode acessar o e-mail da conta de serviço na página Contas de serviço do console do Google Cloud.

Exemplos

Os exemplos de SQL a seguir mostram casos de uso comuns para consultas contínuas.

Exportar dados para um tópico do Pub/Sub

O exemplo a seguir mostra uma consulta contínua que filtra dados de uma tabela do BigQuery que está recebendo informações de streaming de corridas de táxi e publica os dados em um tópico do Pub/Sub em tempo real com atributos de mensagem:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message,
    TO_JSON(
      STRUCT(
        CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES
  FROM `myproject.real_time_taxi_streaming.taxi_rides`
  WHERE ride_status = 'enroute'
);

Exportar dados para uma tabela do Bigtable

O exemplo a seguir mostra uma consulta contínua que filtra dados de uma tabela do BigQuery que está recebendo informações de streaming de corridas de táxi, e exporta os dados para a tabela do Bigtable em tempo real:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = TRUE,
    overwrite = TRUE,
    uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides')
AS (
  SELECT
    CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey,
    STRUCT(
      timestamp,
      latitude,
      longitude,
      meter_reading,
      ride_status,
      passenger_count) AS features
  FROM `myproject.real_time_taxi_streaming.taxirides`
  WHERE ride_status = 'enroute'
);

Gravar dados em uma tabela do BigQuery

O exemplo a seguir mostra uma consulta contínua que filtra e transforma dados de uma tabela do BigQuery que está recebendo streaming de corridas de táxi e grava os dados em outra tabela do BigQuery em tempo real. Isso disponibiliza os dados para outras análises downstream.

INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
    SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM `myproject.real_time_taxi_streaming.taxirides`
WHERE
  ride_status = 'dropoff';

Processar dados usando um modelo da Vertex AI

O exemplo a seguir mostra uma consulta contínua que usa um modelo da Vertex AI para gerar um anúncio para passageiros de táxi com base na latitude e longitude atuais e exporta os resultados em um tópico do Pub/Sub em tempo real:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude,
        prompt,
        ml_generate_text_llm_result)) AS message
  FROM
    ML.GENERATE_TEXT(
      MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`,
      (
        SELECT
          timestamp,
          ride_id,
          latitude,
          longitude,
          CONCAT(
            'Generate an ad based on the current latitude of ',
            latitude,
            ' and longitude of ',
            longitude) AS prompt
        FROM `myproject.real_time_taxi_streaming.taxirides`
        WHERE ride_status = 'enroute'
      ),
      STRUCT(
        50 AS max_output_tokens,
        1.0 AS temperature,
        40 AS top_k,
        1.0 AS top_p,
        TRUE AS flatten_json_output))
      AS ml_output
);

Iniciar uma consulta contínua de um momento específico

Quando você inicia uma consulta contínua, ela processa todas as linhas da tabela que você está selecionando e processa novas linhas à medida que elas chegam. Se quiser pular o processamento de alguns ou de todos os dados existentes, você poderá usar a função do histórico de alterações APPENDS para iniciar o processamento de um momento específico.

O exemplo a seguir mostra como iniciar uma consulta contínua a partir de um determinado momento usando a função APPENDS:

EXPORT DATA
  OPTIONS (format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, '2024-06-12 01:23:03.652423 UTC', NULL)
  WHERE
    ride_status = 'enroute');

Modificar o SQL de uma consulta contínua

Não é possível atualizar o SQL usado em uma consulta contínua enquanto o job de consulta contínua está em execução. Você precisa cancelar o job de consulta contínua, modificar o SQL, e iniciar um novo job de consulta contínua do ponto em que parou o job de consulta contínua original.

Siga estas etapas para modificar o SQL usado em uma consulta contínua:

  1. Confira os detalhes do job para o job de consulta contínua que você quer atualizar e anote o ID do job.
  2. Se possível, pause a coleta de dados upstream. Se não puder fazer isso, você pode ter duplicação de dados quando a consulta contínua é reiniciada.
  3. Cancelar a consulta contínua que você quer modificar.
  4. Encontre o valor end_time do job de consulta contínua original usando a INFORMATION_SCHEMA visualização JOBS:

    SELECT end_time
    FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE
      EXTRACT(DATE FROM creation_time) = current_date()
    AND error_result.reason = 'stopped'
    AND job_id = 'JOB_ID';

    Substitua:

    • PROJECT_ID: o ID do projeto.
    • REGION: a região usada pelo seu projeto.
    • JOB_ID: o ID do job de consulta contínua que você identificou na Etapa 1.
  5. Modifique a instrução SQL de consulta contínua para iniciar a consulta contínua de um ponto específico no tempo, usando o valor end_time recuperado na etapa 5 como o ponto de partida.

  6. Modifique a instrução SQL de consulta contínua para refletir as alterações necessárias.

  7. Execute a consulta contínua modificada.

Cancelar uma consulta contínua

Você pode cancelar um job de consulta contínua como com qualquer outro trabalho. Pode levar até um minuto para que a consulta deixe de ser executada após o cancelamento do job.

A seguir