継続的クエリを作成する

このドキュメントでは、BigQuery で継続的クエリを実行する方法について説明します。

BigQuery の継続的クエリは、継続的に実行される SQL ステートメントです。継続的クエリを使用すると、BigQuery で受信データをリアルタイムで分析し、結果を Bigtable または Pub/Sub にエクスポートするか、BigQuery テーブルに書き込むことができます。

アカウントの種類を選択する

ユーザー アカウントを使用して継続的クエリジョブを作成して実行できます。また、ユーザー アカウントを使用して継続的クエリジョブを作成し、サービス アカウントを使用して実行することもできます。結果を Pub/Sub トピックにエクスポートする継続的クエリを実行するには、サービス アカウントを使用する必要があります。

ユーザー アカウントを使用する場合、継続的クエリは最大 2 日間実行されます。サービス アカウントを使用する場合、継続的クエリは最大 150 日間実行されます。詳しくは、認可をご覧ください。

必要な権限

このセクションでは、継続的クエリの作成と実行に必要な権限について説明します。前述の Identity and Access Management(IAM)ロールの代わりに、カスタムロールを使用して必要な権限を取得することもできます。

ユーザー アカウントを使用する場合の権限

このセクションでは、ユーザー アカウントを使用して継続的クエリを作成して実行するために必要なロールと権限について説明します。

BigQuery でジョブを作成するには、ユーザー アカウントに bigquery.jobs.create IAM 権限が必要です。次の各 IAM ロールには、bigquery.jobs.create 権限が付与されています。

BigQuery テーブルからデータをエクスポートするには、ユーザー アカウントに bigquery.tables.export IAM 権限が必要です。次の各 IAM ロールには、bigquery.tables.export 権限が付与されています。

BigQuery テーブルのデータを更新するには、ユーザー アカウントに bigquery.tables.updateData IAM 権限が必要です。次の各 IAM ロールには、bigquery.tables.updateData 権限が付与されています。

ユーザー アカウントが継続的クエリのユースケースに必要な API を有効にしなければならない場合は、ユーザー アカウントに Service Usage 管理者(roles/serviceusage.serviceUsageAdminロールが必要です。

サービス アカウントを使用する場合の権限

このセクションでは、継続的クエリを作成するユーザー アカウントと、継続的クエリを実行するサービス アカウントに必要なロールと権限について説明します。

ユーザー アカウント権限

BigQuery でジョブを作成するには、ユーザー アカウントに bigquery.jobs.create IAM 権限が必要です。次の各 IAM ロールには、bigquery.jobs.create 権限が付与されています。

サービス アカウントを使用して実行されるジョブを送信するには、ユーザー アカウントにサービス アカウント ユーザー(roles/iam.serviceAccountUserロールが必要です。同じユーザー アカウントを使用してサービス アカウントを作成する場合、ユーザー アカウントにはサービス アカウント管理者(roles/iam.serviceAccountAdminロールが必要です。プロジェクト内のすべてのサービス アカウントではなく、単一のサービス アカウントへのユーザーのアクセスを制限する方法については、単一のロールを付与するをご覧ください。

ユーザー アカウントが継続的クエリのユースケースに必要な API を有効にしなければならない場合は、ユーザー アカウントに Service Usage 管理者(roles/serviceusage.serviceUsageAdminロールが必要です。

サービス アカウントの権限

BigQuery テーブルからデータをエクスポートするには、サービス アカウントに bigquery.tables.export IAM 権限が必要です。次の IAM ロールにはそれぞれ bigquery.tables.export 権限が付与されています。

BigQuery テーブルのデータを更新するには、サービス アカウントに bigquery.tables.updateData IAM 権限が必要です。次の各 IAM ロールには、bigquery.tables.updateData 権限が付与されています。

始める前に

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Verify that billing is enabled for your Google Cloud project.

  3. Enable the BigQuery API.

    Enable the API

予約を作成する

Enterprise または Enterprise Plus エディションの予約を作成し、CONTINUOUS ジョブタイプで予約割り当てを作成します。この予約では、自動スケーリングを使用できます。継続的クエリの予約割り当てには、予約の制限が適用されます。

Pub/Sub へのエクスポート

Pub/Sub にデータをエクスポートするには、追加の API、IAM 権限、 Google Cloud リソースが必要です。詳細については、Pub/Sub にエクスポートするをご覧ください。

カスタム属性を Pub/Sub メッセージにメタデータとして埋め込む

Pub/Sub 属性を使用して、優先度、送信元、宛先、追加のメタデータなど、メッセージに関する追加情報を指定できます。属性を使用して、サブスクリプションでメッセージをフィルタすることもできます。

継続的クエリの結果で、列の名前が _ATTRIBUTES の場合、その値は Pub/Sub メッセージ属性にコピーされます。_ATTRIBUTES 内に指定されたフィールドは、属性キーとして使用されます。

_ATTRIBUTES 列は JSON 型で、ARRAY<STRUCT<STRING, STRING>> または STRUCT<STRING> の形式にする必要があります。

例については、Pub/Sub トピックにデータをエクスポートするをご覧ください。

Bigtable にエクスポートする

Bigtable にデータをエクスポートするには、追加の API、IAM 権限、 Google Cloudリソースが必要です。詳細については、Bigtable にエクスポートするをご覧ください。

Spanner にエクスポートする

Spanner にデータをエクスポートするには、追加の API、IAM 権限、 Google Cloudリソースが必要です。詳細については、Spanner にエクスポートする(リバース ETL)をご覧ください。

BigQuery テーブルにデータを書き込む

BigQuery テーブルにデータを書き込むには、INSERT ステートメントを使用します。

AI 関数を使用する

継続的クエリでサポートされている AI 関数を使用するには、追加の API、IAM 権限、 Google Cloudリソースが必要です。詳細については、ユースケースに応じて次のいずれかをご覧ください。

継続的クエリで AI 関数を使用する場合は、クエリ出力が関数の割り当て内かどうか検討してください。割り当てを超えた場合は、処理されないレコードを個別に処理する必要があります。

開始点を指定する

処理する最も古いデータを指定するには、継続的クエリの FROM 句で APPENDS 関数を使用する必要があります。たとえば、APPENDS(TABLE my_table, CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) は、継続的クエリの開始時刻の 10 分前までにテーブル my_table に追加されたデータを処理するように BigQuery に指示します。my_table に追加されたデータは、入力された時点で処理されます。データ処理に遅延は発生しません。継続的クエリで APPENDS 関数を使用する場合は、end_timestamp 引数を指定しないでください。

次の例は、タクシー乗車情報のストリーミングを受信している BigQuery テーブルに対してクエリを実行するときに、APPENDS 関数を使用して特定の時点から継続的クエリを開始する方法を示しています。

EXPORT DATA
  OPTIONS (format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE
    ride_status = 'enroute');

現在の時刻より前の開始時刻を指定する

現在の時点より前のデータを処理する場合は、APPENDS 関数を使用して、クエリの開始点を指定します。指定する開始点は、選択するテーブルのタイムトラベル期間内にある必要があります。デフォルトでは、タイムトラベル期間は過去 7 日間です。

タイムトラベル期間外のデータを含めるには、標準クエリを使用して特定の時点までのデータを挿入またはエクスポートし、その時点から継続的クエリを開始します。

次の例は、タクシー乗車情報のストリーミングを受信している BigQuery テーブルから特定の時点までの古いデータをテーブルに読み込み、古いデータのカットオフ ポイントから継続的クエリを開始する方法を示しています。

  1. 標準クエリを実行して、特定の時点までのデータをバックフィルします。

    INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
    SELECT
      timestamp,
      meter_reading,
      ride_status,
      passenger_count,
      ST_Distance(
        ST_GeogPoint(pickup_longitude, pickup_latitude),
        ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
        SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
    FROM `myproject.real_time_taxi_streaming.taxirides`
      -- Include all data inserted into the table up to this point in time.
      -- This timestamp must be within the time travel window.
      FOR SYSTEM_TIME AS OF '2025-01-01 00:00:00 UTC'
    WHERE
      ride_status = 'dropoff';
  2. クエリが停止した時点から継続的クエリを実行します。

    INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
    SELECT
      timestamp,
      meter_reading,
      ride_status,
      passenger_count,
      ST_Distance(
        ST_GeogPoint(pickup_longitude, pickup_latitude),
        ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
        SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
    FROM
      APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
        -- Configure the APPENDS TVF start_timestamp to start processing
        -- data right where the batch query left off.
        -- This timestamp must be within the time travel window.
        TIMESTAMP '2025-01-01 00:00:00 UTC' + INTERVAL 1 MICROSECOND)
    WHERE
      ride_status = 'dropoff';

ユーザー アカウントを使用して継続的クエリを実行する

このセクションでは、ユーザー アカウントを使用して継続的クエリを実行する方法について説明します。継続的クエリの実行後、クエリの実行を中断することなく、 Google Cloud コンソール、ターミナル ウィンドウ、またはアプリケーションを閉じることができます。ユーザー アカウントで実行される継続的クエリは、最大 2 日間実行され、その後自動的に停止します。新しい受信データの処理を続行するには、新しい継続的クエリを開始し、開始点を指定します。このプロセスを自動化するには、失敗したクエリを再試行するをご覧ください。

継続的クエリを実行する手順は次のとおりです。

コンソール

  1. Google Cloud コンソールで、[BigQuery] ページに移動します。

    [BigQuery] に移動

  2. クエリエディタで [ その他] をクリックします。

    1. [クエリモードを選択] で、[継続的クエリ] を選択します。
    2. [確認] をクリックします。
    3. 省略可: クエリの実行時間を制御するには、[クエリの設定] をクリックして、[ジョブのタイムアウト] をミリ秒単位で設定します。
  3. クエリエディタで、継続的クエリの SQL ステートメントを入力します。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。

  4. [実行] をクリックします。

bq

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

  2. Cloud Shell で、--continuous フラグを指定して bq query コマンドを使用し、継続的クエリを実行します。

    bq query --use_legacy_sql=false --continuous=true
    'QUERY'

    QUERY は、継続的クエリの SQL ステートメントに置き換えます。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。クエリの実行時間を制御するには、--job_timeout_ms フラグを使用します。

  3. API

    jobs.insert メソッドを呼び出して、継続的クエリを実行します。Job リソースJobConfigurationQuerycontinuous フィールドを true に設定する必要があります。必要に応じて、jobTimeoutMs フィールドを設定して、クエリの実行時間を制御できます。

    curl --request POST \
      "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
      --header "Authorization: Bearer $(gcloud auth print-access-token)" \
      --header "Content-Type: application/json; charset=utf-8" \
      --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true}}}' \
      --compressed

    次のように置き換えます。

サービス アカウントを使用して継続的クエリを実行する

このセクションでは、サービス アカウントを使用して継続的クエリを実行する方法について説明します。継続的クエリの実行後、クエリの実行を中断することなく、 Google Cloud コンソール、ターミナル ウィンドウ、またはアプリケーションを閉じることができます。サービス アカウントを使用して実行される継続的クエリは、最大 150 日間実行され、その後自動的に停止します。新しい受信データの処理を続行するには、新しい継続的クエリを開始し、開始点を指定します。このプロセスを自動化するには、失敗したクエリを再試行するをご覧ください。

サービス アカウントを使用して継続的クエリを実行する手順は次のとおりです。

コンソール

  1. サービス アカウントを作成します
  2. サービス アカウントに必要な権限付与します。
  3. Google Cloud コンソールで、[BigQuery] ページに移動します。

    [BigQuery] に移動

  4. クエリエディタで [展開] をクリックします。

  5. [クエリモードを選択] で、[継続的クエリ] を選択します。

  6. [確認] をクリックします。

  7. クエリエディタで、[その他 > クエリ設定] の順にクリックします。

  8. [継続的クエリ] セクションで、[サービス アカウント] ボックスを使用して、作成したサービス アカウントを選択します。

  9. 省略可: クエリの実行時間を制御するには、[ジョブのタイムアウト] をミリ秒単位で設定します。

  10. [保存] をクリックします。

  11. クエリエディタで、継続的クエリの SQL ステートメントを入力します。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。

  12. [実行] をクリックします。

bq

  1. サービス アカウントを作成します
  2. サービス アカウントに必要な権限付与します。
  3. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

  4. コマンドラインで、次のフラグを指定して bq query コマンドを使用し、継続的クエリを実行します。

    • クエリを連続的に実行するには、--continuous フラグを true に設定します。
    • --connection_property フラグを使用して、使用するサービス アカウントを指定します。
    • 省略可: --job_timeout_ms フラグを設定して、クエリの実行時間を制限します。
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'QUERY'

    次のように置き換えます。

    • PROJECT_ID: プロジェクト ID。
    • SERVICE_ACCOUNT_EMAIL: サービス アカウントのメールアドレス。サービス アカウントのメールアドレスは、 Google Cloud コンソールの [サービス アカウント] ページで確認できます。
    • QUERY: 継続的クエリの SQL ステートメント。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。
  5. API

    1. サービス アカウントを作成します
    2. サービス アカウントに必要な権限付与します。
    3. jobs.insert メソッドを呼び出して、継続的クエリを実行します。Job リソースJobConfigurationQuery リソースで、次のフィールドを設定します。

      • クエリを継続的に実行するには、continuous フィールドを true に設定します。
      • connectionProperties フィールドに、使用するサービス アカウントを指定します。

      必要に応じて、JobConfiguration リソースjobTimeoutMs フィールドを設定して、クエリの実行時間を制御できます。

      curl --request POST \
        "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
        --header "Authorization: Bearer $(gcloud auth print-access-token)" \
        --header "Content-Type: application/json; charset=utf-8" \
        --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":[{"key":"service_account","value":"SERVICE_ACCOUNT_EMAIL"}]}}}' \
        --compressed

      次のように置き換えます。

      • PROJECT_ID: プロジェクト ID。
      • QUERY: 継続的クエリの SQL ステートメント。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。
      • SERVICE_ACCOUNT_EMAIL: サービス アカウントのメールアドレス。サービス アカウントのメールアドレスは、 Google Cloud コンソールの [サービス アカウント] ページで確認できます。

カスタムジョブ ID を作成する

すべてのクエリジョブには、ジョブの検索と管理に使用できるジョブ ID が割り当てられます。デフォルトでは、ジョブ ID はランダムに生成されます。ジョブ履歴またはジョブ エクスプローラを使用して継続的クエリのジョブ ID を検索しやすくするには、カスタムジョブ ID 接頭辞を割り当てます。

  1. Google Cloud コンソールで、[BigQuery] ページに移動します。

    [BigQuery] に移動

  2. クエリエディタで [展開] をクリックします。

  3. [クエリモードを選択] で、[継続的クエリ] を選択します。

  4. [確認] をクリックします。

  5. クエリエディタで、[その他 > クエリ設定] の順にクリックします。

  6. [カスタムジョブ ID 接頭辞] セクションに、カスタム名の接頭辞を入力します。

  7. [保存] をクリックします。

次の SQL の例は、継続的クエリの一般的なユースケースを示しています。

データを Pub/Sub トピックにエクスポートする

次の例では、タクシー乗車情報のストリーミングを受信している BigQuery テーブルのデータをフィルタし、メッセージ属性を使用してリアルタイムで Pub/Sub トピックにデータをパブリッシュする継続的クエリを示します。

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message,
    TO_JSON(
      STRUCT(
        CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

データを Bigtable テーブルにエクスポートする

次の例は、タクシーの乗車情報のストリーミングを受信している BigQuery テーブルからデータをフィルタし、Bigtable テーブルにデータをリアルタイムでエクスポートする継続的クエリを示しています。

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = TRUE,
    overwrite = TRUE,
    uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides')
AS (
  SELECT
    CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey,
    STRUCT(
      timestamp,
      latitude,
      longitude,
      meter_reading,
      ride_status,
      passenger_count) AS features
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

データを Spanner テーブルにエクスポートする

次の例は、タクシーの乗車情報のストリーミングを受信している BigQuery テーブルからデータをフィルタし、Spanner テーブルにデータをリアルタイムでエクスポートする継続的クエリを示しています。

EXPORT DATA
 OPTIONS (
   format = 'CLOUD_SPANNER',
   uri = 'https://spanner.googleapis.com/projects/myproject/instances/myspannerinstance/databases/taxi-real-time-rides',
   spanner_options ="""{
      "table": "rides",
      -- To ensure data is written to Spanner in the correct sequence
      -- during a continuous export, use the change_timestamp_column
      -- option. This should be mapped to a timestamp column from your
      -- BigQuery data. If your source data lacks a timestamp, the 
      -- _CHANGE_TIMESTAMP pseudocolumn provided by the APPENDS function 
      -- will be automatically mapped to the "change_timestamp" column.
      "change_timestamp_column": "change_timestamp"
   }"""
  )
  AS (
  SELECT
    ride_id,
    latitude,
    longitude,
    meter_reading,
    ride_status,
    passenger_count
  FROM APPENDS(
        TABLE `myproject.real_time_taxi_streaming.taxirides`,
        -- Configure the APPENDS TVF start_timestamp to specify when you want to
        -- start processing data using your continuous query.
        -- This example starts processing at 10 minutes before the current time.
        CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
  );

BigQuery テーブルにデータを書き込む

次の例は、タクシー乗車情報のストリーミングを受信している BigQuery テーブルからデータをフィルタして変換し、データを別の BigQuery テーブルにリアルタイムで書き込む継続的クエリを示しています。これにより、ダウンストリームの詳細な分析にデータを使用できるようになります。

INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
    SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM
  APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
    -- Configure the APPENDS TVF start_timestamp to specify when you want to
    -- start processing data using your continuous query.
    -- This example starts processing at 10 minutes before the current time.
    CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
WHERE
  ride_status = 'dropoff';

Vertex AI モデルを使用してデータを処理する

次の例は、Vertex AI モデルを使用して、現在の緯度と経度に基づいてタクシー乗客向けの広告を生成し、結果を Pub/Sub トピックにリアルタイムでエクスポートする継続的クエリを示しています。

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude,
        prompt,
        ml_generate_text_llm_result)) AS message
  FROM
    ML.GENERATE_TEXT(
      MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`,
      (
        SELECT
          timestamp,
          ride_id,
          latitude,
          longitude,
          CONCAT(
            'Generate an ad based on the current latitude of ',
            latitude,
            ' and longitude of ',
            longitude) AS prompt
        FROM
          APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
            -- Configure the APPENDS TVF start_timestamp to specify when you
            -- want to start processing data using your continuous query.
            -- This example starts processing at 10 minutes before the current time.
            CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
        WHERE ride_status = 'enroute'
      ),
      STRUCT(
        50 AS max_output_tokens,
        1.0 AS temperature,
        40 AS top_k,
        1.0 AS top_p,
        TRUE AS flatten_json_output))
      AS ml_output
);

継続的クエリの SQL を変更する

継続的クエリジョブの実行中に、継続的クエリで使用される SQL を更新することはできません。継続的クエリジョブをキャンセルし、SQL を変更してから、元の継続的クエリジョブを停止したポイントから新しい継続的クエリジョブを開始する必要があります。

継続的クエリで使用される SQL を変更する手順は次のとおりです。

  1. 更新する継続的クエリジョブのジョブの詳細を表示し、ジョブ ID をメモします。
  2. 可能であれば、アップストリーム データの収集を一時停止します。これを行わないと、継続的クエリが再起動したときにデータが重複する可能性があります。
  3. 変更する継続的クエリをキャンセルします。
  4. INFORMATION_SCHEMA JOBS ビューを使用して、元の継続的クエリジョブの end_time 値を取得します。

    SELECT end_time
    FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE
      EXTRACT(DATE FROM creation_time) = current_date()
    AND error_result.reason = 'stopped'
    AND job_id = 'JOB_ID';

    次のように置き換えます。

    • PROJECT_ID: プロジェクト ID。
    • REGION: プロジェクトで使用されるリージョン。
    • JOB_ID: 手順 1 で特定した継続的クエリジョブ ID。
  5. 手順 5 で取得した end_time 値を開始点として、特定の時点から継続的クエリが開始するように、継続的クエリの SQL ステートメントを変更します。

  6. 必要な変更を反映するように、継続的クエリの SQL ステートメントを変更します。

  7. 変更した継続的クエリを実行します。

継続的クエリをキャンセルする

他のジョブと同様に、継続的クエリジョブをキャンセルできます。ジョブがキャンセルされてからクエリの実行が停止するまでに、最大で 1 分かかることがあります。

クエリをキャンセルしてから再起動すると、再起動されたクエリは新しい独立したクエリのように動作します。再起動されたクエリは、前のジョブが停止した場所からデータ処理を開始するわけではないため、前のクエリの結果を参照できません。特定の時点から継続的クエリを開始するをご覧ください。

クエリをモニタリングしてエラーを処理する

継続的クエリは、データ不整合、スキーマ変更、一時的なサービス停止、メンテナンスなどの要因によって中断されることがあります。BigQuery は一部の一時的なエラーを処理しますが、ジョブの復元力を高めるためのベスト プラクティスは次のとおりです。

次のステップ