BigQuery에 데이터 스트리밍

BigQuery에 데이터를 로드하는 작업을 사용하지 않고 tabledata.insertAll 메서드를 사용하여 한 번에 레코드 한 개씩 데이터를 BigQuery로 스트리밍할 수 있습니다. 이 접근 방식을 사용하면 로드 작업 실행으로 인한 지연 없이 데이터를 쿼리할 수 있습니다.

이 문서에서는 스트리밍 할당량, 데이터 가용성, 데이터 일관성을 포함하여 접근 방법을 선택하기 전에 고려해야 할 여러 가지 중요한 장단점을 설명합니다.

시작하기 전에

  1. 대상 테이블이 포함된 데이터세트에 대한 쓰기 액세스 권한이 있는지 확인합니다. 템플릿 테이블을 사용하는 경우를 제외하고 데이터를 쓰기 전에 테이블이 존재해야 합니다. 템플릿 테이블에 대한 자세한 내용은 템플릿 테이블을 사용하여 자동으로 테이블 만들기를 참조하세요.

  2. 스트리밍 데이터의 할당량 정책을 확인합니다.

  3. Google Cloud Platform 프로젝트에 결제가 사용 설정되어 있는지 확인하세요.

    결제 사용 설정 방법 알아보기

    무료 등급으로는 스트리밍을 사용할 수 없습니다. 결제를 사용 설정하지 않고 스트리밍을 사용하려고 하면 BigQuery: Streaming insert is not allowed in the free tier. 오류가 발생합니다.

데이터 가용성 확인

스트리밍되는 데이터는 테이블에 첫 번째 스트리밍 삽입이 실행된 이후 몇 초 이내에 실시간 분석에서 사용할 수 있습니다. 드문 경우(예를 들어 장애 발생) 스트리밍 버퍼의 데이터를 일시적으로 사용할 수 없는 경우가 발생할 수 있습니다. 데이터를 사용할 수 없는 경우 쿼리는 계속 성공적으로 실행되지만 아직 스트리밍 버퍼에 있는 일부 데이터를 건너뜁니다. 이러한 쿼리에서는 bigquery.jobs.getQueryResultserrors 필드, bigquery.jobs.query에 대한 응답 또는 bigquery.jobs.getstatus.errors 필드에 경고가 포함됩니다.

복사 및 내보내기 작업에 데이터를 사용할 수 있게 될 때까지 최대 90분이 소요될 수 있습니다. 또한 파티션을 나눈 테이블에 스트리밍하는 경우 스트리밍 버퍼의 데이터는 _PARTITIONTIME 유사 열의 NULL 값을 가집니다. 데이터를 복사 및 내보내기에 사용할 수 있는지 여부를 보려면 streamingBuffer 섹션의 tables.get 응답을 확인하세요. 이 섹션이 없는 경우 데이터를 복사 또는 내보내기에 사용할 수 있으며 _PARTITIONTIME 유사 열에 Null이 아닌 값이 있습니다. 또한 streamingBuffer.oldestEntryTime 필드를 활용하여 스트리밍 버퍼 레코드의 존재 기간을 확인할 수 있습니다.

데이터 일관성 확인

데이터 일관성을 확보하기 위해 삽입된 각 행에 insertId를 제공할 수 있습니다. BigQuery는 최소 1분 동안 이 ID를 기억합니다. 이 시간 내에 동일한 행 조합을 스트리밍하려는 경우 insertId 속성이 설정되어 있으면 BigQuery는 insertId 속성을 사용하여 최선의 노력으로 데이터 중복을 삭제합니다.

예를 들어 시스템과 BigQuery 사이의 네트워크 오류 또는 BigQuery 내부 오류와 같은 특정 오류 상태에서는 스트리밍 삽입의 상태를 확인할 방법이 없으므로 재시도해야 할 수 있습니다. 삽입을 재시도하는 경우 BigQuery가 데이터 중복 삭제를 시도할 수 있도록 동일한 행 조합에 동일한 insertId를 사용하세요. 자세한 내용은 스트리밍 삽입 문제 해결을 참조하세요.

Google 데이터 센터에서 예기치 못하게 연결이 끊기는 드문 경우 자동 중복 삭제가 불가능할 수 있습니다.

데이터 요구사항이 엄격한 경우 트랜잭션을 지원하는 대체 서비스로 Google Cloud Datastore를 사용할 수 있습니다.

최선형 중복 삭제 사용 중지

삽입된 각 행에 대해 insertId 필드를 입력하지 않음으로써 최선형 중복 삭제를 사용 중지할 수 있습니다. insertId를 입력하지 않으면 US 리전에 대해 훨씬 더 많은 스트리밍 수집 할당량을 얻을 수 있습니다. 자세한 내용은 할당량 및 제한사항 페이지를 참조하세요.

여러 데이터 위치에 걸친 데이터 스트리밍

미국과 EU의 데이터세트에 데이터를 스트리밍할 수 있습니다. BigQuery에서 insertAll 요청을 처리하는 동안 데이터는 데이터세트 위치 외부의 머신을 통과하여 흐를 수 있습니다. 데이터세트 위치 외부의 위치에서 데이터를 스트리밍하는 경우 지연 및 오류율이 높아질 수 있습니다.

내부 데이터화-시간 파티션으로 나눈 테이블로 스트리밍

insertAll 요청을 사용해 파티션을 나눈 테이블로 개별 행을 스트리밍할 수 있습니다. 기본적으로 삽입되는 데이터의 대상 파티션은 UTC 시간을 기반으로 한 현재 날짜에서 추론됩니다.

데이터를 내부 데이터화-시간 파티션으로 나눈 테이블로 스트리밍하는 경우 파티션 데코레이터를 insertAll 요청에 포함시켜 추가하면 날짜 추론을 재정의할 수 있습니다. 예를 들어 mydataset.table 테이블에서 파티션 데코레이터를 사용하면 2017-03-01에 해당하는 파티션에 스트리밍할 수 있습니다.

mydataset.table$20170301

새로 도착하는 데이터는 스트리밍 버퍼에 있는 동안 임시로 UNPARTITIONED 파티션과 연결됩니다. 따라서 쿼리는 유사 열(선호하는 데이터 유형에 따라 [_PARTITIONTIME] 또는 [_PARTITIONDATE]) 중 하나를 사용해 NULL 값을 UNPARTITIONED 파티션에서 필터링함으로써 쿼리에서 스트리밍 버퍼의 데이터를 제외할 수 있습니다.

파티션 데코레이터를 사용하여 스트리밍하는 경우 현재 UTC 시간을 기준으로 지난 31일 및 향후 16일 이내의 파티션으로 스트리밍할 수 있습니다. 이러한 허용 범위 이외의 날짜에 파티션에 기록하려면 파티션을 나눈 테이블 데이터에 추가 및 덮어쓰기에 설명된 대로 로드 또는 쿼리 작업을 사용할 수 있습니다.

파티션을 나눈 테이블로 스트리밍

지난 1년부터 향후 6개월 이내의 DATE 또는 TIMESTAMP 열을 기준으로 파티션을 나눈 테이블로 데이터를 스트리밍할 수 있습니다. 이 범위를 벗어나는 데이터는 거부됩니다.

데이터가 스트리밍되면 지난 7일에서 향후 3일 사이의 데이터는 스트리밍 버퍼에 배치된 다음 해당 파티션으로 추출됩니다. 이 기간(1년 6개월 이내의 범위)을 벗어나는 데이터는 스트리밍 버퍼에 저장된 후 UNPARTITIONED 파티션으로 추출됩니다. 파티션으로 나누어지지 않은 데이터가 충분하면 해당 파티션으로 로드됩니다.

템플릿 테이블을 사용하여 자동으로 테이블 만들기

BigQuery로 데이터를 스트리밍하는 경우의 일반적인 사용 패턴은 논리적 테이블을 다수의 더 작은 테이블로 분할하여 더 작은 데이터 조합을 만드는 것입니다(예를 들어 사용자 ID별). 날짜별로 더 작은 데이터 조합을 만들려면 파티션을 나눈 테이블을 사용하세요. 날짜 기반이 아닌 작은 테이블을 만들려는 경우 템플릿 테이블을 사용하면 BigQuery에서 테이블이 생성됩니다.

BigQuery API를 통해 템플릿 테이블을 사용하려면 insertAll 요청에 templateSuffix 매개변수를 추가합니다. bq 명령줄 도구의 경우 insert 명령어에 template_suffix 플래그를 추가합니다. BigQuery는 templateSuffix 매개변수 또는 template_suffix 플래그를 감지하면 타겟 테이블을 기본 템플릿으로 취급하며 타겟 테이블과 동일한 스키마를 공유하며 지정된 서픽스를 포함하는 이름을 가진 새 테이블을 만듭니다.

<targeted_table_name> + <templateSuffix>

템플릿 테이블을 사용하면 각 테이블을 개별적으로 만들고 각 테이블의 스키마를 지정하는 오버헤드가 방지됩니다. 하나의 템플릿만 만들고 다양한 서픽스를 제공하여 BigQuery에서 새 테이블을 만들 수 있도록 하면 됩니다. BigQuery는 동일한 프로젝트 및 데이터세트에 테이블을 배치합니다. 또한 템플릿을 사용하면 템플릿 테이블만 업데이트하면 되므로 스키마 업데이트도 더 쉬워집니다.

템플릿 테이블을 통해 만들어진 테이블은 일반적으로 몇 초 내에 사용할 수 있습니다. 드물지만 사용할 수 있게 되기까지 시간이 더 걸리는 경우도 있습니다.

템플릿 테이블 스키마 변경

템플릿 테이블 스키마를 변경하면 이후에 생성되는 모든 테이블은 업데이트된 스키마를 사용합니다. 기존 테이블에 스트리밍 버퍼가 여전히 있는 경우를 제외하고 이전에 생성된 테이블은 영향을 받지 않습니다.

아직 스트리밍 버퍼가 있는 기존 테이블의 경우 역호환 가능한 방식으로 템플릿 테이블 스키마를 수정하면 이러한 활성 스트리밍 생성 테이블의 스키마도 업데이트됩니다. 그러나 역호환되지 않는 방식으로 템플릿 테이블 스키마를 수정하는 경우 기존 스키마를 사용하는 버퍼링된 데이터는 손실됩니다. 또한 현재 호환되지 않는 이전 스키마를 사용하는 기존에 생성된 테이블로 새 데이터를 스트리밍할 수 없습니다.

템플릿 테이블 스키마를 변경한 후 쿼리 생성 테이블에 새 데이터를 삽입하기 전에 변경 사항이 전파될 때까지 기다리세요. 새 필드 삽입 요청은 몇 분 이내에 성공적으로 수행됩니다. 새 필드를 쿼리하려면 최대 90분까지 기다려야 할 수 있습니다.

생성된 테이블의 스키마를 변경하려는 경우, 템플릿 테이블을 통한 스트리밍이 중단되고 생성된 테이블의 스트리밍 통계 섹션이 tables.get() 응답에 없는 경우(테이블에 버퍼링된 데이터가 없음을 의미) 외에는 스키마를 변경하지 마세요.

템플릿 테이블 세부정보

템플릿 서픽스 값
templateSuffix(또는 --template_suffix) 값은 문자(a-z, A-Z), 숫자(0-9), 밑줄(_)만 포함해야 합니다. 테이블 이름과 테이블 서픽스를 결합한 최대 길이는 1024자입니다.
할당량
템플릿 기반이든 수동으로 만들었든 모든 테이블에는 동일한 할당량이 적용됩니다.
라이브까지의 시간
생성된 테이블은 데이터세트에서 만료 시간을 상속합니다. 일반적인 스트리밍 데이터와 마찬가지로 생성된 테이블을 즉시 복사하거나 내보낼 수는 없습니다.
중복 삭제
중복 삭제는 대상 테이블에 대한 동일한 참조 간에만 발생합니다. 예를 들어 템플릿 테이블과 정규 insertAll 명령어를 모두 사용하여 생성된 테이블로 동시에 스트리밍하는 경우 템플릿 테이블과 정규 insertAll 명령어로 삽입된 행 간에는 중복 삭제가 발생하지 않습니다.
템플릿 테이블과 생성된 테이블은 뷰일 수 없습니다.

사용 사례

대량 이벤트 로깅

실시간으로 많은 양의 데이터를 수집하는 앱이 있는 경우 스트리밍 삽입이 좋은 선택일 수 있습니다. 일반적으로 이러한 유형의 앱에는 다음과 같은 기준이 있습니다.

  • 트랜잭션이 아님. 대량, 지속적으로 추가되는 행. 앱은 드물지만 중복 삭제가 발생하거나 데이터를 일시적으로 사용할 수 없는 경우를 용인할 수 있습니다.
  • 집계 분석. 쿼리는 단일 또는 협소한 레코드 선택과 대조적으로 대개 트렌드 분석을 위해 수행됩니다.

대량 이벤트 로깅의 한 예는 이벤트 추적입니다. 이벤트를 추적하는 모바일 앱이 있다고 가정해 보겠습니다. 앱 또는 모바일 서버는 사용자 상호작용 또는 시스템 오류를 독립적으로 기록하고 이를 BigQuery로 스트리밍할 수 있습니다. 이 데이터를 분석하여 상호작용이 활발한 영역 또는 문제 영역과 같은 전체적인 트렌드를 파악하고 오류 상태를 실시간으로 모니터링할 수 있습니다.

수동으로 중복 제거

다음 수동 프로세스를 사용하여 스트리밍이 완료된 후 중복 행이 존재하지 않도록 할 수 있습니다.

  1. insertId를 테이블 스키마에 열로 추가하고 insertId 값을 각 행의 데이터에 포함합니다.
  2. 스트리밍이 중지된 후 다음 쿼리를 수행하여 중복을 확인합니다.

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    결과가 1보다 크면 중복이 존재하는 것입니다.
  3. 중복을 제거하려면 다음 쿼리를 수행합니다. 대상 테이블을 지정하고 대량 결과를 허용하고 결과 평면화를 사용 중지해야 합니다.

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

중복 제거 쿼리 참고 사항:

  • 중복 제거 쿼리를 위한 더 안전한 전략은 새 테이블을 타겟팅하는 것입니다. 또는 쓰기 처리 WRITE_TRUNCATE로 소스 테이블을 타겟팅할 수 있습니다.
  • 중복 제거 쿼리는 값이 1row_number 열을 테이블 스키마 끝에 추가합니다. 쿼리는 표준 SQLSELECT * EXCEPT 문을 사용하여 대상 테이블에서 row_number 열을 제외합니다. #standardSQL 프리픽스는 이 쿼리에 표준 SQL을 사용 설정합니다. 또는 특정 열 이름별로 선택하여 이 열을 생략할 수 있습니다.
  • 중복이 제거된 라이브 데이터를 쿼리하는 경우 중복 제거 쿼리를 사용하여 테이블에서 뷰를 만들 수도 있습니다. 뷰에 대한 쿼리 비용은 뷰에 선택된 열을 기준으로 계산되며 이로 인해 검색된 바이트 크기가 커질 수 있음을 유의하세요.

스트리밍 삽입 문제해결

스트리밍 삽입 중 발생한 오류의 문제해결 방법은 오류 문제해결 페이지의 스트리밍 삽입 문제해결을 참조하세요.

맨 위로

스트리밍 삽입 예

C#

이 샘플을 시도해 보기 전에 BigQuery 빠른 시작: 클라이언트 라이브러리 사용의 C# 설정 안내를 따르세요. 자세한 내용은 BigQuery C# API 참조 문서를 확인하세요.


using Google.Cloud.BigQuery.V2;
using System;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

이 샘플을 시도하기 전에 BigQuery 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 BigQuery Go API 참조 문서를 확인하세요.

// To run this sample, you will need to create (or reuse) a context and
// an instance of the bigquery client.  For example:
// import "cloud.google.com/go/bigquery"
// ctx := context.Background()
// client, err := bigquery.NewClient(ctx, "your-project-id")
u := client.Dataset(datasetID).Table(tableID).Inserter()
items := []*Item{
	// Item implements the ValueSaver interface.
	{Name: "Phred Phlyntstone", Age: 32},
	{Name: "Wylma Phlyntstone", Age: 29},
}
if err := u.Put(ctx, items); err != nil {
	return err
}

자바

이 샘플을 시도하기 전에 BigQuery 빠른 시작: 클라이언트 라이브러리 사용의 자바 설정 안내를 따르세요. 자세한 내용은 BigQuery 자바 API 참조 문서를 확인하세요.

TableId tableId = TableId.of(datasetName, tableName);
// Values of the row to insert
Map<String, Object> rowContent = new HashMap<>();
rowContent.put("booleanField", true);
// Bytes are passed in base64
rowContent.put("bytesField", "Cg0NDg0="); // 0xA, 0xD, 0xD, 0xE, 0xD in base64
// Records are passed as a map
Map<String, Object> recordsContent = new HashMap<>();
recordsContent.put("stringField", "Hello, World!");
rowContent.put("recordField", recordsContent);
InsertAllResponse response =
    bigquery.insertAll(
        InsertAllRequest.newBuilder(tableId)
            .addRow(rowContent)
            // More rows can be added in the same RPC by invoking .addRow() on the builder.
            // You can also supply optional unique row keys to support de-duplication scenarios.
            .build());
if (response.hasErrors()) {
  // If any of the insertions failed, this lets you inspect the errors
  for (Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
    // inspect row error
  }
}

Node.js

이 샘플을 시도해 보기 전에 BigQuery 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 BigQuery Node.js API 참조 문서를 참조하세요.

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  // const rows = [{name: 'Tom', age: 30}, {name: 'Jane', age: 32}];

  // Create a client
  const bigqueryClient = new BigQuery();

  // Insert data into a table
  await bigqueryClient
    .dataset(datasetId)
    .table(tableId)
    .insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}
insertRowsAsStream();

PHP

이 샘플을 시도하기 전에 BigQuery 빠른 시작: 클라이언트 라이브러리 사용의 PHP 설정 안내를 따르세요. 자세한 내용은 BigQuery PHP API 참조 문서를 참조하세요.

use Google\Cloud\BigQuery\BigQueryClient;

/** Uncomment and populate these variables in your code */
// $projectId = 'The Google project ID';
// $datasetId = 'The BigQuery dataset ID';
// $tableId   = 'The BigQuery table ID';
// $data = [
//     "field1" => "value1",
//     "field2" => "value2",
// ];

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$dataset = $bigQuery->dataset($datasetId);
$table = $dataset->table($tableId);

$insertResponse = $table->insertRows([
    ['data' => $data],
    // additional rows can go here
]);
if ($insertResponse->isSuccessful()) {
    print('Data streamed into BigQuery successfully' . PHP_EOL);
} else {
    foreach ($insertResponse->failedRows() as $row) {
        foreach ($row['errors'] as $error) {
            printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
        }
    }
}

Python

이 샘플을 시도하기 전에 BigQuery 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 BigQuery Python API 참조 문서를 확인하세요.

# TODO(developer): Uncomment the lines below and replace with your values.
# from google.cloud import bigquery
# client = bigquery.Client()
# dataset_id = 'my_dataset'  # replace with your dataset ID
# For this sample, the table must already exist and have a defined schema
# table_id = 'my_table'  # replace with your table ID
# table_ref = client.dataset(dataset_id).table(table_id)
# table = client.get_table(table_ref)  # API request

rows_to_insert = [
    (u'Phred Phlyntstone', 32),
    (u'Wylma Phlyntstone', 29),
]

errors = client.insert_rows(table, rows_to_insert)  # API request

assert errors == []

Ruby

이 샘플을 시도하기 전에 BigQuery 빠른 시작: 클라이언트 라이브러리 사용의 Ruby 설정 안내를 따르세요. 자세한 내용은 BigQuery Ruby API 참조 문서를 참조하세요.

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

맨 위로

이 페이지가 도움이 되었나요? 평가를 부탁드립니다.

다음에 대한 의견 보내기...

도움이 필요하시나요? 지원 페이지를 방문하세요.