Datastream 및 Dataflow를 사용하여 BigQuery로 데이터 스트리밍

이 페이지에는 Datastream 및 Dataflow를 사용하여 BigQuery로 데이터를 스트리밍하기 위한 권장사항이 나와 있습니다.

사용자 정의 키의 복제본 데이터 세트 파티션 나누기

BigQuery의 스테이징 데이터 세트는 자동으로 파티셔닝됩니다. 하지만 복제본 테이블의 파티션 키는 Datastream 및 Dataflow에서 적용되는 대신 특정 비즈니스 로직에 따라 정의되어야 하므로 복제본 데이터 세트는 기본적으로 파티셔닝되지 않습니다.

복제본 데이터 세트에서 파티셔닝이 필요한 각 테이블에 대해 다음을 수행합니다.

  1. Dataflow 작업을 중지하고 드레이닝합니다.

  2. BigQuery의 SQL 편집기를 사용하여 복제본 데이터 세트의 각 테이블에 대해 다음 SQL 스크립트를 실행합니다. 이 예시에서는 datastream_cdc 데이터 세트의 actor 테이블에 파티션 키로 설정할 last_update 열이 있습니다. 스크립트를 실행하여 올바른 파티션 키로 테이블을 다시 만듭니다.

    create table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' partition by date(last_update)
    as SELECT * FROM '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    drop table '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    alter table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' rename to 'actor'
    
    
  3. Datastream to BigQuery 템플릿을 사용하여 Dataflow 작업을 다시 만듭니다.

사용자 정의 함수를 실행하여 이벤트 데이터 조작

Datastream to BigQuery 템플릿을 사용하여 자바스크립트 사용자 정의 함수를 실행할 수 있습니다. 이렇게 하려면 먼저 함수가 포함된 파일을 Cloud Storage의 특정 위치에 배치합니다. 그런 후 다음 작업을 수행합니다.

  • 템플릿에서 javascriptTextTransformGcsPath 매개변수를 사용하여 Cloud Storage에서 사용자 정의 함수가 포함된 파일의 위치를 지정합니다.
  • javascriptTextTransformFunctionName 매개변수를 사용하여 사용자 정의 함수로 호출할 자바스크립트 함수의 이름을 지정합니다.

예를 들어 삭제된 레코드를 BigQuery 내의 복제본 데이터 세트 테이블에 보관하는 사용자 정의 함수를 실행할 수 있습니다. 이 프로세스를 소프트 삭제라고 합니다.

이를 위해 _metadata_deleted 열의 값을 is_deleted라는 새 열에 복사한 후 _metadata_deleted 열 값을 false로 재설정하는 함수를 만듭니다. 이렇게 하면 BigQuery에서 복제본 데이터 세트를 업데이트할 때 Dataflow 작업에서 삭제 이벤트를 무시하고 삭제된 레코드를 보관합니다.

다음은 이 사용자 정의 함수의 샘플 코드입니다.

/**
* This function changes the behavior of the Datastream to
* BigQuery template to allow soft deletes.
* @param {string} messageString from DatastreamIO data
* @return {string} same as an input message with an added property
*/
function transform(messageString) {
   // messageString is a JSON object as a string
   var messageJson = JSON.parse(messageString);
    // Moving the deleted flag to a new column will cause the pipeline to soft delete data.
   messageJson['is_deleted'] = messageJson['_metadata_deleted'];
   messageJson['_metadata_deleted'] = false;
    return JSON.stringify(messageJson);
 }

병합 빈도 설정

Datastream to BigQuery 템플릿mergeFrequencyMinutes 매개변수를 사용하여 병합 빈도를 설정합니다. 이 값은 BigQuery의 복제본 데이터 세트에서 지정된 테이블의 병합 간격(분)입니다. 이전 데이터가 백필되는 동안에는 병합 빈도를 낮게 유지하여(12시간 또는 24시간) 비용을 통제하는 것이 좋습니다.

예를 들어 이 매개변수의 값을 10분으로 설정하면 Dataflow가 템플릿을 사용하는 작업을 10분마다 실행합니다. 하지만 작업이 처음 실행될 때는 지연 시간 5분이 적용됩니다. 이 예시에서는 작업이 오전 9시 14분에 실행되는 경우 첫 번째 병합이 오전 9시 29분(병합 10분, 지연 시간 5분)에 발생합니다. 두 번째 병합은 오전 9시 39분에 발생하고, 모든 후속 병합은 10분 간격으로 발생합니다(오전 9시 49분, 오전 9시 59분, 오전 10시 9분 등).

병합 빈도를 60분으로 설정하면 작업 최초 실행에 대한 지연 시간 5분이 적용된 후 1시간마다 작업이 실행됩니다. 작업이 오전 10시에 실행되도록 예약하면 지연 시간 5분으로 인해 실제로는 오전 10시 5분에 실행됩니다. 모든 후속 병합은 60분 간격으로 발생합니다(오전 11시 5분, 오후 12시 5분, 오후 1시 5분 등).

비용 통제 등의 이유로 인해 비즈니스 요구사항에 맞는 빈도로 병합을 수행하지 못할 수 있습니다. 따라서 최신 데이터를 이용하지 못할 수도 있습니다. 최신 데이터에 액세스하려는 경우 BigQuery의 스테이징 및 복제본 데이터 세트 테이블 위에 뷰를 만들면 뷰가 병합을 모방합니다. 이 뷰는 스테이징 및 복제본 데이터 세트 모두에 대해 하나의 논리 테이블로 생성됩니다. 병합 빈도가 낮은 상황에서 데이터에 더 빠르게 액세스하려는 경우 뷰를 사용하세요.