연속 쿼리 만들기

이 문서에서는 BigQuery에서 연속 쿼리를 실행하는 방법을 설명합니다.

BigQuery 연속 쿼리는 지속적으로 실행되는 SQL 문입니다. 연속 쿼리를 사용하면 BigQuery에서 수신 데이터를 실시간으로 분석한 후 결과를 Bigtable 또는 Pub/Sub로 내보내거나 결과를 BigQuery 테이블에 기록할 수 있습니다.

계정 유형 선택

사용자 계정을 사용하여 연속 쿼리 작업을 만들고 실행하거나 사용자 계정을 사용하여 연속 쿼리 작업을 만든 후 서비스 계정을 사용하여 실행할 수 있습니다. 결과를 Pub/Sub 주제로 내보내는 연속 쿼리를 실행하려면 서비스 계정을 사용해야 합니다.

사용자 계정을 사용하면 연속 쿼리가 2일 동안 실행됩니다. 서비스 계정을 사용하면 명시적으로 취소될 때까지 연속 쿼리가 실행됩니다. 자세한 내용은 승인을 참조하세요.

필수 권한

이 섹션에서는 연속 쿼리를 만들고 실행하는 데 필요한 권한에 대해 설명합니다. 언급된 Identity and Access Management(IAM) 역할 대신 커스텀 역할을 통해 필요한 권한을 가져올 수 있습니다.

사용자 계정 사용 시 권한

이 섹션에서는 사용자 계정을 사용하여 연속 쿼리를 만들고 실행하는 데 필요한 역할 및 권한에 대한 정보를 제공합니다.

BigQuery에서 작업을 만들려면 사용자 계정에 bigquery.jobs.create IAM 권한이 있어야 합니다. 다음 각 IAM 역할은 bigquery.jobs.create 권한을 부여합니다.

BigQuery 테이블에서 데이터를 내보내려면 사용자 계정에 bigquery.tables.export IAM 권한이 있어야 합니다 . 다음 각 IAM 역할은 bigquery.tables.export 권한을 부여합니다.

BigQuery 테이블의 데이터를 업데이트하려면 사용자 계정에 bigquery.tables.updateData IAM 권한이 있어야 합니다. 다음 각 IAM 역할은 bigquery.tables.updateData 권한을 부여합니다.

사용자 계정에 연속 쿼리 사용 사례에 필요한 API를 사용 설정해야 하는 경우 사용자 계정에 서비스 사용량 관리자(roles/serviceusage.serviceUsageAdmin) 역할이 있어야 합니다.

서비스 계정 사용 시 권한

이 섹션에서는 연속 쿼리를 만드는 사용자 계정과 연속 쿼리를 실행하는 서비스 계정에 필요한 역할 및 권한에 대한 정보를 제공합니다.

사용자 계정 권한

BigQuery에서 작업을 만들려면 사용자 계정에 bigquery.jobs.create IAM 권한이 있어야 합니다. 다음 각 IAM 역할은 bigquery.jobs.create 권한을 부여합니다.

서비스 계정을 사용하여 실행되는 작업을 제출하려면 사용자 계정에 서비스 계정 사용자(roles/iam.serviceAccountUser) 역할이 있어야 합니다. 동일한 사용자 계정을 사용하여 서비스 계정을 만드는 경우 사용자 계정에 서비스 계정 관리자(roles/iam.serviceAccountAdmin) 역할이 있어야 합니다. 사용자의 액세스를 프로젝트 내의 모든 서비스 계정이 아닌 단일 서비스 계정으로 제한하는 방법은 단일 역할 부여를 참조하세요.

사용자 계정에 연속 쿼리 사용 사례에 필요한 API를 사용 설정해야 하는 경우 사용자 계정에 서비스 사용량 관리자(roles/serviceusage.serviceUsageAdmin) 역할이 있어야 합니다.

서비스 계정 권한

BigQuery 테이블에서 데이터를 내보내려면 서비스 계정에 bigquery.tables.export IAM 권한이 있어야 합니다. 다음 각 IAM 역할은 bigquery.tables.export 권한을 부여합니다.

BigQuery 테이블의 데이터를 업데이트하려면 서비스 계정에 bigquery.tables.updateData IAM 권한이 있어야 합니다. 다음 각 IAM 역할은 bigquery.tables.updateData 권한을 부여합니다.

시작하기 전에

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  3. Enable the BigQuery API.

    Enable the API

예약 만들기

Enterprise 또는 Enterprise Plus 버전 예약을 만든CONTINUOUS 작업 유형으로 예약 할당을 만듭니다.

Pub/Sub로 내보내기

Pub/Sub로 데이터를 내보내려면 추가 API, IAM 권한, Google Cloud 리소스가 필요합니다. 자세한 내용은 Pub/Sub로 내보내기를 참조하세요.

Pub/Sub 메시지에 맞춤 속성을 메타데이터로 삽입

Pub/Sub 속성을 사용하여 우선순위, 출처, 대상 또는 추가 메타데이터와 같은 메시지에 관한 추가 정보를 제공할 수 있습니다. 속성을 사용하여 구독에서 메시지를 필터링할 수도 있습니다.

연속 쿼리 결과 내에서 열 이름이 _ATTRIBUTES인 경우 값이 Pub/Sub 메시지 속성에 복사됩니다. _ATTRIBUTES 내에 제공된 필드는 속성 키로 사용됩니다.

_ATTRIBUTES 열은 ARRAY<STRUCT<STRING, STRING>> 또는 STRUCT<STRING> 형식의 JSON 유형이어야 합니다.

예시는 Pub/Sub 주제로 데이터 내보내기를 참조하세요.

Bigtable로 내보내기

Bigtable로 데이터를 내보내려면 추가 API, IAM 권한, Google Cloud 리소스가 필요합니다. 자세한 내용은 Bigtable로 내보내기를 참조하세요.

BigQuery 테이블에 데이터 쓰기

INSERT을 사용하여 BigQuery 테이블에 데이터를 쓸 수 있습니다.

AI 함수 사용

연속 쿼리에서 지원되는 AI 함수를 사용하려면 추가 API, IAM 권한, Google Cloud 리소스가 필요합니다. 자세한 내용은 사용 사례에 따라 다음 주제 중 하나를 참조하세요.

연속 쿼리에서 AI 함수를 사용할 때 쿼리 출력이 함수의 할당량 내에 있는지 여부를 고려하세요. 할당량을 초과하면 처리되지 않는 레코드를 별도로 처리해야 할 수 있습니다.

사용자 계정을 사용하여 연속 쿼리 실행

이 섹션에서는 사용자 계정을 사용하여 연속 쿼리를 실행하는 방법을 설명합니다. 연속 쿼리가 실행된 후 쿼리 실행을 중단하지 않고 Google Cloud 콘솔, 터미널 창 또는 애플리케이션을 닫을 수 있습니다.

연속 쿼리를 실행하려면 다음 단계를 따르세요.

콘솔

  1. Google Cloud 콘솔에서 BigQuery 페이지로 이동합니다.

    BigQuery로 이동

  2. 쿼리 편집기에서 더보기를 클릭합니다.

  3. 쿼리 모드 선택 섹션에서 연속 쿼리를 선택합니다.

  4. 확인을 클릭합니다.

  5. 쿼리 편집기에서 연속 쿼리에 대한 SQL 문을 입력합니다. SQL 문에는 지원되는 작업만 포함되어야 합니다.

  6. 실행을 클릭합니다.

bq

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Cloud Shell에서 --continuous 플래그와 함께 bq query 명령어를 사용하여 연속 쿼리를 실행합니다.

    bq query --use_legacy_sql=false --continuous=true
    'QUERY'

    QUERY를 연속 쿼리의 SQL 문으로 바꿉니다. SQL 문에는 지원되는 작업만 포함되어야 합니다.

API

jobs.insert 메서드를 호출하여 연속 쿼리를 실행합니다. 전달하는 Job 리소스JobConfigurationQuery에서 continuous 필드를 true로 설정해야 합니다. .

curl --request POST \
  'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs
  --header 'Authorization: Bearer $(gcloud auth application-default print-access-token)' \
  --header 'Accept: application/json' \
  --header 'Content-Type: application/json' \
  --data '("configuration":("continuous":true,"useLegacySql":false,"query":"QUERY"))'
  --compressed

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID입니다.
  • QUERY: 연속 쿼리의 SQL 문입니다. SQL 문에는 지원되는 작업만 포함되어야 합니다.

서비스 계정을 사용하여 연속 쿼리 실행

이 섹션에서는 서비스 계정을 사용하여 연속 쿼리를 실행하는 방법을 설명합니다. 연속 쿼리가 실행된 후 쿼리 실행을 중단하지 않고 Google Cloud 콘솔, 터미널 창 또는 애플리케이션을 닫을 수 있습니다.

서비스 계정을 사용하여 연속 쿼리를 실행하려면 다음 단계를 따르세요.

콘솔

  1. 서비스 계정을 만듭니다.
  2. 서비스 계정에 필요한 권한부여합니다.
  3. Google Cloud 콘솔에서 BigQuery 페이지로 이동합니다.

    BigQuery로 이동

  4. 쿼리 편집기에서 더보기를 클릭합니다.

  5. 쿼리 모드 선택 섹션에서 연속 쿼리를 선택합니다.

  6. 확인을 클릭합니다.

  7. 쿼리 편집기에서 더보기 > 쿼리 설정을 클릭합니다.

  8. 연속 쿼리 섹션에서 서비스 계정 상자를 사용하여 생성한 서비스 계정을 선택합니다.

  9. 저장을 클릭합니다.

  10. 쿼리 편집기에서 연속 쿼리에 대한 SQL 문을 입력합니다. SQL 문에는 지원되는 작업만 포함되어야 합니다.

  11. 실행을 클릭합니다.

bq

  1. 서비스 계정을 만듭니다.
  2. 서비스 계정에 필요한 권한부여합니다.
  3. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  4. 명령줄에서 다음 플래그와 함께 bq query 명령어를 사용하여 연속 쿼리를 실행합니다.

    • 쿼리를 연속으로 만들려면 --continuous 플래그를 true로 설정합니다.
    • --connection_property 플래그를 사용하여 사용할 서비스 계정을 지정합니다.
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'QUERY'

    다음을 바꿉니다.

    • PROJECT_ID: 프로젝트 ID입니다.
    • SERVICE_ACCOUNT_EMAIL: 서비스 계정 이메일입니다. Google Cloud 콘솔의 서비스 계정 페이지에서 서비스 계정 이메일을 가져올 수 있습니다.
    • QUERY: 연속 쿼리의 SQL 문입니다. SQL 문에는 지원되는 작업만 포함되어야 합니다.

API

  1. 서비스 계정을 만듭니다.
  2. 서비스 계정에 필요한 권한부여합니다.
  3. jobs.insert 메서드를 호출하여 연속 쿼리를 실행합니다. 전달하는 Job 리소스JobConfigurationQuery 리소스에서 다음 필드를 설정합니다.

    • 쿼리를 연속으로 만들려면 continuous 필드를 true로 설정합니다.
    • connection_property 필드를 사용하여 사용할 서비스 계정을 지정합니다.
    curl --request POST \
      'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs
      --header 'Authorization: Bearer $(gcloud auth print-access-token) \
      --header 'Accept: application/json' \
      --header 'Content-Type: application/json' \
      --data '("configuration":("query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \
      --compressed

    다음을 바꿉니다.

    • PROJECT_ID: 프로젝트 ID입니다.
    • QUERY: 연속 쿼리의 SQL 문입니다. SQL 문에는 지원되는 작업만 포함되어야 합니다.
    • SERVICE_ACCOUNT_EMAIL: 서비스 계정 이메일입니다. Google Cloud 콘솔의 서비스 계정 페이지에서 서비스 계정 이메일을 가져올 수 있습니다.

예시

다음 SQL 예시는 연속 쿼리의 일반적인 사용 사례를 보여줍니다.

Pub/Sub 주제로 데이터 내보내기

다음 예시는 스트리밍 택시 탑승 정보를 수신하는 BigQuery 테이블에서 데이터를 필터링하고 메시지 속성과 함께 데이터를 실시간으로 Pub/Sub 주제에 게시하는 연속 쿼리를 보여줍니다.

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message,
    TO_JSON(
      STRUCT(
        CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES
  FROM `myproject.real_time_taxi_streaming.taxi_rides`
  WHERE ride_status = 'enroute'
);

Bigtable 테이블로 데이터 내보내기

다음 예시는 스트리밍 택시 탑승 정보를 수신하는 BigQuery 테이블의 데이터를 필터링하고 데이터를 실시간으로 Bigtable 테이블로 내보내는 연속 쿼리를 보여줍니다.

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = TRUE,
    overwrite = TRUE,
    uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides')
AS (
  SELECT
    CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey,
    STRUCT(
      timestamp,
      latitude,
      longitude,
      meter_reading,
      ride_status,
      passenger_count) AS features
  FROM `myproject.real_time_taxi_streaming.taxirides`
  WHERE ride_status = 'enroute'
);

BigQuery 테이블에 데이터 쓰기

다음 예시는 스트리밍 택시 탑승 정보를 수신하는 BigQuery 테이블의 데이터를 필터링 및 변환하고 데이터를 다른 BigQuery 테이블에 실시간으로 쓰는 연속 쿼리를 보여줍니다. 이렇게 하면 추가 다운스트림 분석에 데이터를 사용할 수 있습니다.

INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
    SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM `myproject.real_time_taxi_streaming.taxirides`
WHERE
  ride_status = 'dropoff';

Vertex AI 모델을 사용하여 데이터 처리

다음 예시는 Vertex AI 모델을 사용하여 현재 위도 및 경도를 기준으로 택시 탑승자를 위한 광고를 생성한 후 결과를 실시간으로 Pub/Sub 주제로 내보내는 연속 쿼리를 보여줍니다.

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude,
        prompt,
        ml_generate_text_llm_result)) AS message
  FROM
    ML.GENERATE_TEXT(
      MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`,
      (
        SELECT
          timestamp,
          ride_id,
          latitude,
          longitude,
          CONCAT(
            'Generate an ad based on the current latitude of ',
            latitude,
            ' and longitude of ',
            longitude) AS prompt
        FROM `myproject.real_time_taxi_streaming.taxirides`
        WHERE ride_status = 'enroute'
      ),
      STRUCT(
        50 AS max_output_tokens,
        1.0 AS temperature,
        40 AS top_k,
        1.0 AS top_p,
        TRUE AS flatten_json_output))
      AS ml_output
);

특정 시점에서 연속 쿼리 시작

연속 쿼리를 시작하면 선택한 테이블의 모든 행을 처리한 후 새 행이 들어오면 처리합니다. 기존 데이터의 일부 또는 전체 처리를 건너뛰려면 APPENDS 변경 내역 함수를 사용하여 특정 시점에서 처리를 시작할 수 있습니다.

다음 예시에서는 APPENDS 함수를 사용하여 특정 시점에서 연속 쿼리를 시작하는 방법을 보여줍니다.

EXPORT DATA
  OPTIONS (format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, '2024-06-12 01:23:03.652423 UTC', NULL)
  WHERE
    ride_status = 'enroute');

연속 쿼리의 SQL 수정

연속 쿼리 작업이 실행되는 동안에는 연속 쿼리에 사용되는 SQL을 업데이트할 수 없습니다. 연속 쿼리 작업을 취소하고 SQL을 수정한 다음 원래의 연속 쿼리 작업이 중지된 지점에서 새로운 연속 쿼리 작업을 시작해야 합니다.

연속 쿼리에 사용되는 SQL을 수정하려면 다음 단계를 따르세요.

  1. 업데이트할 연속 쿼리 작업의 작업 세부정보를 확인하고 작업 ID를 기록해 둡니다.
  2. 가능하면 업스트림 데이터 수집을 일시중지합니다. 이렇게 할 수 없으면 연속 쿼리가 다시 시작될 때 일부 데이터 중복이 발생할 수 있습니다.
  3. 수정할 연속 쿼리를 취소합니다.
  4. INFORMATION_SCHEMA JOBS를 사용하여 원래의 연속 쿼리 작업의 end_time 값을 가져옵니다.

    SELECT end_time
    FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE
      EXTRACT(DATE FROM creation_time) = current_date()
    AND error_result.reason = 'stopped'
    AND job_id = 'JOB_ID';

    다음을 바꿉니다.

    • PROJECT_ID: 프로젝트 ID입니다.
    • REGION: 프로젝트에 사용되는 리전입니다.
    • JOB_ID: 1단계에서 식별한 연속 쿼리 작업 ID입니다.
  5. 5단계에서 가져온 end_time 값을 시작 값으로 사용하여 특정 시점에서 연속 쿼리를 시작하도록 연속 쿼리 SQL 문을 수정합니다.

  6. 필요한 변경사항을 반영하도록 연속 쿼리 SQL 문을 수정합니다.

  7. 수정된 연속 쿼리를 실행합니다.

연속 쿼리 취소

다른 작업과 마찬가지로 연속 쿼리 작업을 취소할 수 있습니다. 작업이 취소된 후 쿼리가 실행을 중지하는 데 최대 1분이 걸릴 수 있습니다.

다음 단계