연속 쿼리 만들기
이 문서에서는 BigQuery에서 연속 쿼리를 실행하는 방법을 설명합니다.
BigQuery 연속 쿼리는 지속적으로 실행되는 SQL 문입니다. 연속 쿼리를 사용하면 BigQuery에서 수신 데이터를 실시간으로 분석한 후 결과를 Bigtable 또는 Pub/Sub로 내보내거나 결과를 BigQuery 테이블에 기록할 수 있습니다.
계정 유형 선택
사용자 계정을 사용하여 연속 쿼리 작업을 만들고 실행하거나 사용자 계정을 사용하여 연속 쿼리 작업을 만든 후 서비스 계정을 사용하여 실행할 수 있습니다. 결과를 Pub/Sub 주제로 내보내는 연속 쿼리를 실행하려면 서비스 계정을 사용해야 합니다.
사용자 계정을 사용하면 연속 쿼리가 2일 동안 실행됩니다. 서비스 계정을 사용하면 명시적으로 취소될 때까지 연속 쿼리가 실행됩니다. 자세한 내용은 승인을 참조하세요.
필수 권한
이 섹션에서는 연속 쿼리를 만들고 실행하는 데 필요한 권한에 대해 설명합니다. 언급된 Identity and Access Management(IAM) 역할 대신 커스텀 역할을 통해 필요한 권한을 가져올 수 있습니다.
사용자 계정 사용 시 권한
이 섹션에서는 사용자 계정을 사용하여 연속 쿼리를 만들고 실행하는 데 필요한 역할 및 권한에 대한 정보를 제공합니다.
BigQuery에서 작업을 만들려면 사용자 계정에 bigquery.jobs.create
IAM 권한이 있어야 합니다. 다음 각 IAM 역할은 bigquery.jobs.create
권한을 부여합니다.
- BigQuery 사용자(
roles/bigquery.user
) - BigQuery 작업 사용자(
roles/bigquery.jobUser
) - BigQuery 관리자(
roles/bigquery.admin
)
BigQuery 테이블에서 데이터를 내보내려면 사용자 계정에 bigquery.tables.export
IAM 권한이 있어야 합니다 . 다음 각 IAM 역할은 bigquery.tables.export
권한을 부여합니다.
- BigQuery 데이터 뷰어(
roles/bigquery.dataViewer
) - BigQuery 데이터 편집자(
roles/bigquery.dataEditor
) - BigQuery 데이터 소유자(
roles/bigquery.dataOwner
) - BigQuery 관리자(
roles/bigquery.admin
)
BigQuery 테이블의 데이터를 업데이트하려면 사용자 계정에 bigquery.tables.updateData
IAM 권한이 있어야 합니다. 다음 각 IAM 역할은 bigquery.tables.updateData
권한을 부여합니다.
- BigQuery 데이터 편집자(
roles/bigquery.dataEditor
) - BigQuery 데이터 소유자(
roles/bigquery.dataOwner
) - BigQuery 관리자(
roles/bigquery.admin
)
사용자 계정에 연속 쿼리 사용 사례에 필요한 API를 사용 설정해야 하는 경우 사용자 계정에 서비스 사용량 관리자(roles/serviceusage.serviceUsageAdmin
) 역할이 있어야 합니다.
서비스 계정 사용 시 권한
이 섹션에서는 연속 쿼리를 만드는 사용자 계정과 연속 쿼리를 실행하는 서비스 계정에 필요한 역할 및 권한에 대한 정보를 제공합니다.
사용자 계정 권한
BigQuery에서 작업을 만들려면 사용자 계정에 bigquery.jobs.create
IAM 권한이 있어야 합니다. 다음 각 IAM 역할은 bigquery.jobs.create
권한을 부여합니다.
- BigQuery 사용자(
roles/bigquery.user
) - BigQuery 작업 사용자(
roles/bigquery.jobUser
) - BigQuery 관리자(
roles/bigquery.admin
)
서비스 계정을 사용하여 실행되는 작업을 제출하려면 사용자 계정에 서비스 계정 사용자(roles/iam.serviceAccountUser
) 역할이 있어야 합니다. 동일한 사용자 계정을 사용하여 서비스 계정을 만드는 경우 사용자 계정에 서비스 계정 관리자(roles/iam.serviceAccountAdmin
) 역할이 있어야 합니다. 사용자의 액세스를 프로젝트 내의 모든 서비스 계정이 아닌 단일 서비스 계정으로 제한하는 방법은 단일 역할 부여를 참조하세요.
사용자 계정에 연속 쿼리 사용 사례에 필요한 API를 사용 설정해야 하는 경우 사용자 계정에 서비스 사용량 관리자(roles/serviceusage.serviceUsageAdmin
) 역할이 있어야 합니다.
서비스 계정 권한
BigQuery 테이블에서 데이터를 내보내려면 서비스 계정에 bigquery.tables.export
IAM 권한이 있어야 합니다. 다음 각 IAM 역할은 bigquery.tables.export
권한을 부여합니다.
- BigQuery 데이터 뷰어(
roles/bigquery.dataViewer
) - BigQuery 데이터 편집자(
roles/bigquery.dataEditor
) - BigQuery 데이터 소유자(
roles/bigquery.dataOwner
) - BigQuery 관리자(
roles/bigquery.admin
)
bigquery.tables.updateData
IAM 권한이 있어야 합니다. 다음 각 IAM 역할은 bigquery.tables.updateData
권한을 부여합니다.
- BigQuery 데이터 편집자(
roles/bigquery.dataEditor
) - BigQuery 데이터 소유자(
roles/bigquery.dataOwner
) - BigQuery 관리자(
roles/bigquery.admin
)
시작하기 전에
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Enable the BigQuery API.
예약 만들기
Enterprise 또는 Enterprise Plus 버전 예약을 만든 후 CONTINUOUS
작업 유형으로 예약 할당을 만듭니다.
Pub/Sub로 내보내기
Pub/Sub로 데이터를 내보내려면 추가 API, IAM 권한, Google Cloud 리소스가 필요합니다. 자세한 내용은 Pub/Sub로 내보내기를 참조하세요.
Pub/Sub 메시지에 맞춤 속성을 메타데이터로 삽입
Pub/Sub 속성을 사용하여 우선순위, 출처, 대상 또는 추가 메타데이터와 같은 메시지에 관한 추가 정보를 제공할 수 있습니다. 속성을 사용하여 구독에서 메시지를 필터링할 수도 있습니다.
연속 쿼리 결과 내에서 열 이름이 _ATTRIBUTES
인 경우 값이 Pub/Sub 메시지 속성에 복사됩니다.
_ATTRIBUTES
내에 제공된 필드는 속성 키로 사용됩니다.
_ATTRIBUTES
열은 ARRAY<STRUCT<STRING, STRING>>
또는 STRUCT<STRING>
형식의 JSON
유형이어야 합니다.
예시는 Pub/Sub 주제로 데이터 내보내기를 참조하세요.
Bigtable로 내보내기
Bigtable로 데이터를 내보내려면 추가 API, IAM 권한, Google Cloud 리소스가 필요합니다. 자세한 내용은 Bigtable로 내보내기를 참조하세요.
BigQuery 테이블에 데이터 쓰기
INSERT
문을 사용하여 BigQuery 테이블에 데이터를 쓸 수 있습니다.
AI 함수 사용
연속 쿼리에서 지원되는 AI 함수를 사용하려면 추가 API, IAM 권한, Google Cloud 리소스가 필요합니다. 자세한 내용은 사용 사례에 따라 다음 주제 중 하나를 참조하세요.
ML.GENERATE_TEXT
함수를 사용하여 텍스트 생성ML.GENERATE_EMBEDDING
함수를 사용하여 텍스트 임베딩 생성ML.UNDERSTAND_TEXT
함수를 사용하여 텍스트 이해하기ML.TRANSLATE
함수를 사용하여 텍스트 번역
연속 쿼리에서 AI 함수를 사용할 때 쿼리 출력이 함수의 할당량 내에 있는지 여부를 고려하세요. 할당량을 초과하면 처리되지 않는 레코드를 별도로 처리해야 할 수 있습니다.
사용자 계정을 사용하여 연속 쿼리 실행
이 섹션에서는 사용자 계정을 사용하여 연속 쿼리를 실행하는 방법을 설명합니다. 연속 쿼리가 실행된 후 쿼리 실행을 중단하지 않고 Google Cloud 콘솔, 터미널 창 또는 애플리케이션을 닫을 수 있습니다.
연속 쿼리를 실행하려면 다음 단계를 따르세요.
콘솔
Google Cloud 콘솔에서 BigQuery 페이지로 이동합니다.
쿼리 편집기에서 더보기를 클릭합니다.
쿼리 모드 선택 섹션에서 연속 쿼리를 선택합니다.
확인을 클릭합니다.
쿼리 편집기에서 연속 쿼리에 대한 SQL 문을 입력합니다. SQL 문에는 지원되는 작업만 포함되어야 합니다.
실행을 클릭합니다.
bq
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Cloud Shell에서
--continuous
플래그와 함께bq query
명령어를 사용하여 연속 쿼리를 실행합니다.bq query --use_legacy_sql=false --continuous=true 'QUERY'
QUERY
를 연속 쿼리의 SQL 문으로 바꿉니다. SQL 문에는 지원되는 작업만 포함되어야 합니다.
API
jobs.insert
메서드를 호출하여 연속 쿼리를 실행합니다.
전달하는 Job
리소스의 JobConfigurationQuery
에서 continuous
필드를 true
로 설정해야 합니다. .
curl --request POST \ 'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs --header 'Authorization: Bearer $(gcloud auth application-default print-access-token)' \ --header 'Accept: application/json' \ --header 'Content-Type: application/json' \ --data '("configuration":("continuous":true,"useLegacySql":false,"query":"QUERY"))' --compressed
다음을 바꿉니다.
PROJECT_ID
: 프로젝트 ID입니다.QUERY
: 연속 쿼리의 SQL 문입니다. SQL 문에는 지원되는 작업만 포함되어야 합니다.
서비스 계정을 사용하여 연속 쿼리 실행
이 섹션에서는 서비스 계정을 사용하여 연속 쿼리를 실행하는 방법을 설명합니다. 연속 쿼리가 실행된 후 쿼리 실행을 중단하지 않고 Google Cloud 콘솔, 터미널 창 또는 애플리케이션을 닫을 수 있습니다.
서비스 계정을 사용하여 연속 쿼리를 실행하려면 다음 단계를 따르세요.
콘솔
- 서비스 계정을 만듭니다.
- 서비스 계정에 필요한 권한을 부여합니다.
Google Cloud 콘솔에서 BigQuery 페이지로 이동합니다.
쿼리 편집기에서 더보기를 클릭합니다.
쿼리 모드 선택 섹션에서 연속 쿼리를 선택합니다.
확인을 클릭합니다.
쿼리 편집기에서 더보기 > 쿼리 설정을 클릭합니다.
연속 쿼리 섹션에서 서비스 계정 상자를 사용하여 생성한 서비스 계정을 선택합니다.
저장을 클릭합니다.
쿼리 편집기에서 연속 쿼리에 대한 SQL 문을 입력합니다. SQL 문에는 지원되는 작업만 포함되어야 합니다.
실행을 클릭합니다.
bq
- 서비스 계정을 만듭니다.
- 서비스 계정에 필요한 권한을 부여합니다.
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
명령줄에서 다음 플래그와 함께
bq query
명령어를 사용하여 연속 쿼리를 실행합니다.- 쿼리를 연속으로 만들려면
--continuous
플래그를true
로 설정합니다. --connection_property
플래그를 사용하여 사용할 서비스 계정을 지정합니다.
bq query --project_id=PROJECT_ID --use_legacy_sql=false \ --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \ 'QUERY'
다음을 바꿉니다.
PROJECT_ID
: 프로젝트 ID입니다.SERVICE_ACCOUNT_EMAIL
: 서비스 계정 이메일입니다. Google Cloud 콘솔의 서비스 계정 페이지에서 서비스 계정 이메일을 가져올 수 있습니다.QUERY
: 연속 쿼리의 SQL 문입니다. SQL 문에는 지원되는 작업만 포함되어야 합니다.
- 쿼리를 연속으로 만들려면
API
- 서비스 계정을 만듭니다.
- 서비스 계정에 필요한 권한을 부여합니다.
jobs.insert
메서드를 호출하여 연속 쿼리를 실행합니다. 전달하는Job
리소스의JobConfigurationQuery
리소스에서 다음 필드를 설정합니다.- 쿼리를 연속으로 만들려면
continuous
필드를true
로 설정합니다. connection_property
필드를 사용하여 사용할 서비스 계정을 지정합니다.
curl --request POST \ 'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs --header 'Authorization: Bearer $(gcloud auth print-access-token) \ --header 'Accept: application/json' \ --header 'Content-Type: application/json' \ --data '("configuration":("query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \ --compressed
다음을 바꿉니다.
PROJECT_ID
: 프로젝트 ID입니다.QUERY
: 연속 쿼리의 SQL 문입니다. SQL 문에는 지원되는 작업만 포함되어야 합니다.SERVICE_ACCOUNT_EMAIL
: 서비스 계정 이메일입니다. Google Cloud 콘솔의 서비스 계정 페이지에서 서비스 계정 이메일을 가져올 수 있습니다.
- 쿼리를 연속으로 만들려면
예시
다음 SQL 예시는 연속 쿼리의 일반적인 사용 사례를 보여줍니다.
Pub/Sub 주제로 데이터 내보내기
다음 예시는 스트리밍 택시 탑승 정보를 수신하는 BigQuery 테이블에서 데이터를 필터링하고 메시지 속성과 함께 데이터를 실시간으로 Pub/Sub 주제에 게시하는 연속 쿼리를 보여줍니다.
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude)) AS message, TO_JSON( STRUCT( CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES FROM `myproject.real_time_taxi_streaming.taxi_rides` WHERE ride_status = 'enroute' );
Bigtable 테이블로 데이터 내보내기
다음 예시는 스트리밍 택시 탑승 정보를 수신하는 BigQuery 테이블의 데이터를 필터링하고 데이터를 실시간으로 Bigtable 테이블로 내보내는 연속 쿼리를 보여줍니다.
EXPORT DATA OPTIONS ( format = 'CLOUD_BIGTABLE', truncate = TRUE, overwrite = TRUE, uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides') AS ( SELECT CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey, STRUCT( timestamp, latitude, longitude, meter_reading, ride_status, passenger_count) AS features FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'enroute' );
BigQuery 테이블에 데이터 쓰기
다음 예시는 스트리밍 택시 탑승 정보를 수신하는 BigQuery 테이블의 데이터를 필터링 및 변환하고 데이터를 다른 BigQuery 테이블에 실시간으로 쓰는 연속 쿼리를 보여줍니다. 이렇게 하면 추가 다운스트림 분석에 데이터를 사용할 수 있습니다.
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'dropoff';
Vertex AI 모델을 사용하여 데이터 처리
다음 예시는 Vertex AI 모델을 사용하여 현재 위도 및 경도를 기준으로 택시 탑승자를 위한 광고를 생성한 후 결과를 실시간으로 Pub/Sub 주제로 내보내는 연속 쿼리를 보여줍니다.
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude, prompt, ml_generate_text_llm_result)) AS message FROM ML.GENERATE_TEXT( MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`, ( SELECT timestamp, ride_id, latitude, longitude, CONCAT( 'Generate an ad based on the current latitude of ', latitude, ' and longitude of ', longitude) AS prompt FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'enroute' ), STRUCT( 50 AS max_output_tokens, 1.0 AS temperature, 40 AS top_k, 1.0 AS top_p, TRUE AS flatten_json_output)) AS ml_output );
특정 시점에서 연속 쿼리 시작
연속 쿼리를 시작하면 선택한 테이블의 모든 행을 처리한 후 새 행이 들어오면 처리합니다. 기존 데이터의 일부 또는 전체 처리를 건너뛰려면 APPENDS
변경 내역 함수를 사용하여 특정 시점에서 처리를 시작할 수 있습니다.
다음 예시에서는 APPENDS
함수를 사용하여 특정 시점에서 연속 쿼리를 시작하는 방법을 보여줍니다.
EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude)) AS message FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, '2024-06-12 01:23:03.652423 UTC', NULL) WHERE ride_status = 'enroute');
연속 쿼리의 SQL 수정
연속 쿼리 작업이 실행되는 동안에는 연속 쿼리에 사용되는 SQL을 업데이트할 수 없습니다. 연속 쿼리 작업을 취소하고 SQL을 수정한 다음 원래의 연속 쿼리 작업이 중지된 지점에서 새로운 연속 쿼리 작업을 시작해야 합니다.
연속 쿼리에 사용되는 SQL을 수정하려면 다음 단계를 따르세요.
- 업데이트할 연속 쿼리 작업의 작업 세부정보를 확인하고 작업 ID를 기록해 둡니다.
- 가능하면 업스트림 데이터 수집을 일시중지합니다. 이렇게 할 수 없으면 연속 쿼리가 다시 시작될 때 일부 데이터 중복이 발생할 수 있습니다.
- 수정할 연속 쿼리를 취소합니다.
INFORMATION_SCHEMA
JOBS
뷰를 사용하여 원래의 연속 쿼리 작업의end_time
값을 가져옵니다.SELECT end_time FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT WHERE EXTRACT(DATE FROM creation_time) = current_date() AND error_result.reason = 'stopped' AND job_id = 'JOB_ID';
다음을 바꿉니다.
PROJECT_ID
: 프로젝트 ID입니다.REGION
: 프로젝트에 사용되는 리전입니다.JOB_ID
: 1단계에서 식별한 연속 쿼리 작업 ID입니다.
5단계에서 가져온
end_time
값을 시작 값으로 사용하여 특정 시점에서 연속 쿼리를 시작하도록 연속 쿼리 SQL 문을 수정합니다.필요한 변경사항을 반영하도록 연속 쿼리 SQL 문을 수정합니다.
수정된 연속 쿼리를 실행합니다.
연속 쿼리 취소
다른 작업과 마찬가지로 연속 쿼리 작업을 취소할 수 있습니다. 작업이 취소된 후 쿼리가 실행을 중지하는 데 최대 1분이 걸릴 수 있습니다.