継続的クエリを作成する

このドキュメントでは、BigQuery で継続的クエリを実行する方法について説明します。

BigQuery の継続的クエリは、継続的に実行される SQL ステートメントです。継続的クエリを使用すると、BigQuery で受信データをリアルタイムで分析し、結果を Bigtable または Pub/Sub にエクスポートしたり、BigQuery テーブルに書き込むことができます。

アカウントの種類を選択する

ユーザー アカウントを使用して継続的クエリジョブを作成して実行できます。また、ユーザー アカウントを使用して継続的クエリジョブを作成し、サービス アカウントを使用して実行することもできます。結果を Pub/Sub トピックにエクスポートする継続的クエリを実行するには、サービス アカウントを使用する必要があります。

ユーザー アカウントを使用する場合、継続的クエリは 2 日間実行されます。サービス アカウントを使用する場合、継続的クエリは明示的にキャンセルされるまで実行されます。詳しくは、認可をご覧ください。

必要な権限

このセクションでは、継続的クエリの作成と実行に必要な権限について説明します。前述の Identity and Access Management(IAM)ロールの代わりに、カスタムロールを使用して必要な権限を取得することもできます。

ユーザー アカウントを使用する場合の権限

このセクションでは、ユーザー アカウントを使用して継続的クエリを作成して実行するために必要なロールと権限について説明します。

BigQuery でジョブを作成するには、ユーザー アカウントに bigquery.jobs.create IAM 権限が必要です。次の各 IAM ロールには、bigquery.jobs.create 権限が付与されています。

BigQuery テーブルからデータをエクスポートするには、ユーザー アカウントに bigquery.tables.export IAM 権限が必要です。次の各 IAM ロールには、bigquery.tables.export 権限が付与されています。

BigQuery テーブルのデータを更新するには、ユーザー アカウントに bigquery.tables.updateData IAM 権限が必要です。次の各 IAM ロールには、bigquery.tables.updateData 権限が付与されています。

ユーザー アカウントが継続的クエリのユースケースに必要な API を有効にしなければならない場合は、ユーザー アカウントに Service Usage 管理者(roles/serviceusage.serviceUsageAdminロールが必要です。

サービス アカウントを使用する場合の権限

このセクションでは、継続的クエリを作成するユーザー アカウントと、継続的クエリを実行するサービス アカウントに必要なロールと権限について説明します。

ユーザー アカウント権限

BigQuery でジョブを作成するには、ユーザー アカウントに bigquery.jobs.create IAM 権限が必要です。次の各 IAM ロールには、bigquery.jobs.create 権限が付与されています。

サービス アカウントを使用して実行されるジョブを送信するには、ユーザー アカウントにサービス アカウント ユーザー(roles/iam.serviceAccountUserロールが必要です。同じユーザー アカウントを使用してサービス アカウントを作成する場合、ユーザー アカウントにはサービス アカウント管理者(roles/iam.serviceAccountAdminロールが必要です。プロジェクト内のすべてのサービス アカウントではなく、単一のサービス アカウントへのユーザーのアクセスを制限する方法については、単一のロールを付与するをご覧ください。

ユーザー アカウントが継続的クエリのユースケースに必要な API を有効にしなければならない場合は、ユーザー アカウントに Service Usage 管理者(roles/serviceusage.serviceUsageAdminロールが必要です。

サービス アカウントの権限

BigQuery テーブルからデータをエクスポートするには、サービス アカウントに bigquery.tables.export IAM 権限が必要です。次の IAM ロールにはそれぞれ bigquery.tables.export 権限が付与されています。

BigQuery テーブルのデータを更新するには、サービス アカウントに bigquery.tables.updateData IAM 権限が必要です。次の各 IAM ロールには、bigquery.tables.updateData 権限が付与されています。

始める前に

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the BigQuery API.

    Enable the API

予約を作成する

Enterprise または Enterprise Plus エディションの予約を作成し、CONTINUOUS ジョブタイプで予約割り当てを作成します。

Pub/Sub へのエクスポート

Pub/Sub にデータをエクスポートするには、追加の API、IAM 権限、Google Cloud リソースが必要です。詳細については、Pub/Sub にエクスポートするをご覧ください。

カスタム属性を Pub/Sub メッセージにメタデータとして埋め込む

Pub/Sub 属性を使用して、優先度、送信元、宛先、追加のメタデータなど、メッセージに関する追加情報を指定できます。属性を使用して、サブスクリプションでメッセージをフィルタすることもできます。

継続的クエリの結果で、列の名前が _ATTRIBUTES の場合、その値は Pub/Sub メッセージ属性にコピーされます。_ATTRIBUTES 内に指定されたフィールドは、属性キーとして使用されます。

_ATTRIBUTES 列は JSON 型で、ARRAY<STRUCT<STRING, STRING>> または STRUCT<STRING> の形式にする必要があります。

例については、Pub/Sub トピックにデータをエクスポートするをご覧ください。

Bigtable にエクスポートする

Bigtable にデータをエクスポートするには、追加の API、IAM 権限、Google Cloud リソースが必要です。詳細については、Bigtable にエクスポートするをご覧ください。

BigQuery テーブルにデータを書き込む

BigQuery テーブルにデータを書き込むには、INSERT ステートメントを使用します。

AI 関数を使用する

継続的クエリでサポートされている AI 関数を使用するには、追加の API、IAM 権限、Google Cloud リソースが必要です。詳細については、ユースケースに応じて次のいずれかをご覧ください。

継続的クエリで AI 関数を使用する場合は、クエリ出力が関数の割り当て内かどうか検討してください。割り当てを超えた場合は、処理されないレコードを個別に処理する必要があります。

ユーザー アカウントを使用して継続的クエリを実行する

このセクションでは、ユーザー アカウントを使用して継続的クエリを実行する方法について説明します。継続的クエリの実行後、クエリの実行を中断することなく、Google Cloud コンソール、ターミナル ウィンドウ、またはアプリケーションを閉じることができます。

継続的クエリを実行する手順は次のとおりです。

コンソール

  1. Google Cloud コンソールで [BigQuery] ページに移動します。

    [BigQuery] に移動

  2. クエリエディタで [展開] をクリックします。

  3. [クエリモードを選択] で、[継続的クエリ] を選択します。

  4. [確認] をクリックします。

  5. クエリエディタで、継続的クエリの SQL ステートメントを入力します。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。

  6. [実行] をクリックします。

bq

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Cloud Shell で、--continuous フラグを指定して bq query コマンドを使用し、継続的クエリを実行します。

    bq query --use_legacy_sql=false --continuous=true
    'QUERY'

    QUERY は、継続的クエリの SQL ステートメントに置き換えます。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。

API

jobs.insert メソッドを呼び出して、継続的クエリを実行します。Job リソースJobConfigurationQuerycontinuous フィールドを true に設定する必要があります。

curl --request POST \
  'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs
  --header 'Authorization: Bearer $(gcloud auth application-default print-access-token)' \
  --header 'Accept: application/json' \
  --header 'Content-Type: application/json' \
  --data '("configuration":("continuous":true,"useLegacySql":false,"query":"QUERY"))'
  --compressed

次のように置き換えます。

サービス アカウントを使用して継続的クエリを実行する

このセクションでは、サービス アカウントを使用して継続的クエリを実行する方法について説明します。継続的クエリの実行後、クエリの実行を中断することなく、Google Cloud コンソール、ターミナル ウィンドウ、またはアプリケーションを閉じることができます。

サービス アカウントを使用して継続的クエリを実行する手順は次のとおりです。

コンソール

  1. サービス アカウントを作成します
  2. サービス アカウントに必要な権限付与します。
  3. Google Cloud コンソールで [BigQuery] ページに移動します。

    [BigQuery] に移動

  4. クエリエディタで [展開] をクリックします。

  5. [クエリモードを選択] で、[継続的クエリ] を選択します。

  6. [確認] をクリックします。

  7. クエリエディタで、[展開] > [クエリ設定] の順にクリックします。

  8. [継続的クエリ] セクションで、[サービス アカウント] ボックスを使用して、作成したサービス アカウントを選択します。

  9. [保存] をクリックします。

  10. クエリエディタで、継続的クエリの SQL ステートメントを入力します。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。

  11. [実行] をクリックします。

bq

  1. サービス アカウントを作成します
  2. サービス アカウントに必要な権限付与します。
  3. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  4. コマンドラインで、次のフラグを指定して bq query コマンドを使用し、継続的クエリを実行します。

    • クエリを連続的に実行するには、--continuous フラグを true に設定します。
    • --connection_property フラグを使用して、使用するサービス アカウントを指定します。
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'QUERY'

    次のように置き換えます。

    • PROJECT_ID: プロジェクト ID。
    • SERVICE_ACCOUNT_EMAIL: サービス アカウントのメールアドレス。サービス アカウントのメールアドレスは、Google Cloud コンソールの [サービス アカウント] ページで確認できます。
    • QUERY: 継続的クエリの SQL ステートメント。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。

API

  1. サービス アカウントを作成します
  2. サービス アカウントに必要な権限付与します。
  3. jobs.insert メソッドを呼び出して、継続的クエリを実行します。Job リソースJobConfigurationQuery リソースで、次のフィールドを設定します。

    • クエリを継続的に実行するには、continuous フィールドを true に設定します。
    • connection_property フィールドに、使用するサービス アカウントを指定します。
    curl --request POST \
      'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs
      --header 'Authorization: Bearer $(gcloud auth print-access-token) \
      --header 'Accept: application/json' \
      --header 'Content-Type: application/json' \
      --data '("configuration":("query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \
      --compressed

    次のように置き換えます。

    • PROJECT_ID: プロジェクト ID。
    • QUERY: 継続的クエリの SQL ステートメント。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。
    • SERVICE_ACCOUNT_EMAIL: サービス アカウントのメールアドレス。サービス アカウントのメールアドレスは、Google Cloud コンソールの [サービス アカウント] ページで確認できます。

次の SQL の例は、継続的クエリの一般的なユースケースを示しています。

データを Pub/Sub トピックにエクスポートする

次の例では、タクシー乗車情報のストリーミングを受信している BigQuery テーブルのデータをフィルタし、メッセージ属性を使用してリアルタイムで Pub/Sub トピックにデータをパブリッシュする継続的クエリを示します。

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message,
    TO_JSON(
      STRUCT(
        CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES
  FROM `myproject.real_time_taxi_streaming.taxi_rides`
  WHERE ride_status = 'enroute'
);

データを Bigtable テーブルにエクスポートする

次の例は、タクシーの乗車情報のストリーミングを受信している BigQuery テーブルからデータをフィルタし、Bigtable テーブルにデータをリアルタイムでエクスポートする継続的クエリを示しています。

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = TRUE,
    overwrite = TRUE,
    uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides')
AS (
  SELECT
    CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey,
    STRUCT(
      timestamp,
      latitude,
      longitude,
      meter_reading,
      ride_status,
      passenger_count) AS features
  FROM `myproject.real_time_taxi_streaming.taxirides`
  WHERE ride_status = 'enroute'
);

BigQuery テーブルにデータを書き込む

次の例は、タクシー乗車情報のストリーミングを受信している BigQuery テーブルからデータをフィルタして変換し、データを別の BigQuery テーブルにリアルタイムで書き込む継続的クエリを示しています。これにより、ダウンストリームの詳細な分析にデータを使用できるようになります。

INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
    SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM `myproject.real_time_taxi_streaming.taxirides`
WHERE
  ride_status = 'dropoff';

Vertex AI モデルを使用してデータを処理する

次の例は、Vertex AI モデルを使用して、現在の緯度と経度に基づいてタクシー乗客向けの広告を生成し、結果を Pub/Sub トピックにリアルタイムでエクスポートする継続的クエリを示しています。

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude,
        prompt,
        ml_generate_text_llm_result)) AS message
  FROM
    ML.GENERATE_TEXT(
      MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`,
      (
        SELECT
          timestamp,
          ride_id,
          latitude,
          longitude,
          CONCAT(
            'Generate an ad based on the current latitude of ',
            latitude,
            ' and longitude of ',
            longitude) AS prompt
        FROM `myproject.real_time_taxi_streaming.taxirides`
        WHERE ride_status = 'enroute'
      ),
      STRUCT(
        50 AS max_output_tokens,
        1.0 AS temperature,
        40 AS top_k,
        1.0 AS top_p,
        TRUE AS flatten_json_output))
      AS ml_output
);

特定の時点から継続的クエリを開始する

継続的クエリを開始すると、選択したテーブル内のすべての行が処理され、新しい行が追加されると処理されます。既存データの一部またはすべてを処理しない場合、APPENDS 変更履歴関数を使用して、特定の時点から処理を開始できます。

次の例は、APPENDS 関数を使用して特定の時点から継続的クエリを開始する方法を示しています。

EXPORT DATA
  OPTIONS (format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, '2024-06-12 01:23:03.652423 UTC', NULL)
  WHERE
    ride_status = 'enroute');

継続的クエリの SQL を変更する

継続的クエリジョブの実行中に、継続的クエリで使用される SQL を更新することはできません。継続的クエリジョブをキャンセルし、SQL を変更してから、元の継続的クエリジョブを停止したポイントから新しい継続的クエリジョブを開始する必要があります。

継続的クエリで使用される SQL を変更する手順は次のとおりです。

  1. 更新する継続的クエリジョブのジョブの詳細を表示し、ジョブ ID をメモします。
  2. 可能であれば、アップストリーム データの収集を一時停止します。これを行わないと、継続的クエリが再起動したときにデータが重複する可能性があります。
  3. 変更する継続的クエリをキャンセルします。
  4. INFORMATION_SCHEMA JOBS ビューを使用して、元の継続的クエリジョブの end_time 値を取得します。

    SELECT end_time
    FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE
      EXTRACT(DATE FROM creation_time) = current_date()
    AND error_result.reason = 'stopped'
    AND job_id = 'JOB_ID';

    次のように置き換えます。

    • PROJECT_ID: プロジェクト ID。
    • REGION: プロジェクトで使用されるリージョン。
    • JOB_ID: 手順 1 で特定した継続的クエリジョブ ID。
  5. 手順 5 で取得した end_time 値を開始点として、特定の時点から継続的クエリが開始するように、継続的クエリの SQL ステートメントを変更します。

  6. 必要な変更を反映するように、継続的クエリの SQL ステートメントを変更します。

  7. 変更した継続的クエリを実行します。

継続的クエリをキャンセルする

他のジョブと同様に、継続的クエリジョブをキャンセルできます。ジョブがキャンセルされてからクエリの実行が停止するまでに、最大で 1 分かかることがあります。

次のステップ