Cloud Storage Text to BigQuery(스트리밍) 템플릿

Cloud Storage Text to BigQuery 파이프라인은 Cloud Storage에 저장된 텍스트 파일을 스트리밍하고, 사용자가 제공한 JavaScript 사용자 정의 함수(UDF)를 사용하여 변환하고, 결과를 BigQuery에 추가하는 스트리밍 파이프라인입니다.

파이프라인은 무한히 실행되며 드레이닝을 지원하지 않는 분할 가능한 DoFnWatch 변환 사용으로 인해 드레이닝이 아닌 취소를 통해 수동으로 종료되어야 합니다.

파이프라인 요구사항

  • BigQuery에서 출력 테이블의 스키마를 설명하는 JSON 파일을 만드세요.

    최상위 JSON 배열의 이름이 fields이고 해당 콘텐츠는 {"name": "COLUMN_NAME", "type": "DATA_TYPE"} 패턴을 따라야 합니다. 예를 들면 다음과 같습니다.

    {
      "fields": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        }
      ]
    }
  • 텍스트 줄을 변환하는 논리를 제공하는 UDF 함수를 사용하여 자바스크립트(.js) 파일을 만듭니다. 함수는 JSON 문자열을 반환해야 합니다.

    다음 예시에서는 CSV 파일의 각 줄을 분할하고, 값이 있는 JSON 객체를 만들고, JSON 문자열을 반환합니다.

    function process(inJson) {
      val = inJson.split(",");
    
      const obj = {
        "name": val[0],
        "age": parseInt(val[1])
      };
      return JSON.stringify(obj);
    }

템플릿 매개변수

필수 매개변수

  • inputFilePattern: 처리하려는 Cloud Storage의 텍스트의 gs:// 경로입니다. 예를 들면 gs://your-bucket/your-file.txt입니다.
  • JSONPath: Cloud Storage에 저장된 BigQuery 스키마를 정의하는 JSON 파일의 gs:// 경로입니다. 예를 들면 gs://your-bucket/your-schema.json입니다.
  • outputTable: 처리된 데이터를 저장하는 데 사용할 BigQuery 테이블의 위치입니다. 기존 테이블을 재사용하면 테이블이 덮어쓰기됩니다. 예를 들면 <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>입니다.
  • javascriptTextTransformGcsPath: 사용할 JavaScript 사용자 정의 함수 (UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://your-bucket/your-transforms/*.js입니다.
  • javascriptTextTransformFunctionName: 사용할 JavaScript 사용자 정의 함수 (UDF)의 이름입니다. 예를 들어 JavaScript 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 JavaScript UDF는 UDF 예시 (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)를 참고하세요. 예를 들어 transform_udf1를 참고하세요.
  • bigQueryLoadingTemporaryDirectory: BigQuery 로드 프로세스를 위한 임시 디렉터리입니다. 예를 들면 gs://your-bucket/your-files/temp-dir입니다.

선택적 매개변수

  • outputDeadletterTable: 출력 테이블에 도달하지 못한 메시지 테이블입니다. 테이블이 없으면 파이프라인 실행 중에 생성됩니다. 지정하지 않으면 <outputTableSpec>_error_records이 사용됩니다. 예를 들면 <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>입니다.
  • useStorageWriteApiAtLeastOnce: 이 매개변수는 Use BigQuery Storage Write API가 사용 설정된 경우에만 적용됩니다. 사용 설정하면 Storage Write API에 최소 1회의 시맨틱스가 사용되고 그렇지 않은 경우 정확히 한 번의 시맨틱스가 사용됩니다. 기본값은 false입니다.
  • useStorageWriteApi: true인 경우 파이프라인은 BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api)를 사용합니다. 기본값은 false입니다. 자세한 내용은 Storage Write API(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api) 사용을 참조하세요.
  • numStorageWriteApiStreams: Storage Write API를 사용할 때 쓰기 스트림 수를 지정합니다. useStorageWriteApitrue이고 useStorageWriteApiAtLeastOncefalse이면 이 매개변수를 설정해야 합니다. 기본값은 0입니다.
  • storageWriteApiTriggeringFrequencySec: Storage Write API를 사용할 때 트리거 빈도를 초 단위로 지정합니다. useStorageWriteApitrue이고 useStorageWriteApiAtLeastOncefalse이면 이 매개변수를 설정해야 합니다.
  • pythonExternalTextTransformGcsPath: 사용자 정의 함수가 포함된 Python 코드의 Cloud Storage 경로 패턴입니다. 예를 들면 gs://your-bucket/your-function.py입니다.
  • javascriptTextTransformReloadIntervalMinutes: UDF를 새로고침할 빈도(분)를 지정합니다. 값이 0보다 크면 Dataflow가 Cloud Storage에서 UDF 파일을 주기적으로 검사하고 파일이 수정된 경우 UDF를 새로고침합니다. 이 매개변수를 사용하면 파이프라인이 실행 중일 때 작업을 다시 시작할 필요 없이 UDF를 업데이트할 수 있습니다. 값이 0이면 UDF 새로고침이 사용 중지됩니다. 기본값은 0입니다.

사용자 정의 함수

이 템플릿에는 파이프라인 요구사항에 설명된 대로 입력 파일을 파싱하는 UDF가 필요합니다. 템플릿은 각 입력 파일의 모든 텍스트 줄에 대해 UDF를 호출합니다. UDF 만들기에 대한 자세한 내용은 Dataflow 템플릿에 대한 사용자 정의 함수 만들기를 참조하세요.

함수 사양

UDF의 사양은 다음과 같습니다.

  • 입력: 입력 파일의 텍스트 한 줄입니다.
  • 출력: BigQuery 대상 테이블의 스키마와 일치하는 JSON 문자열입니다.

템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Cloud Storage Text to BigQuery (Stream) template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • STAGING_LOCATION: 로컬 파일의 스테이징 위치(예: gs://your-bucket/staging)
  • JAVASCRIPT_FUNCTION: 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.

    예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 JavaScript UDF는 UDF 예시를 참조하세요.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: 스키마 정의가 포함된 JSON 파일의 Cloud Storage 경로
  • PATH_TO_JAVASCRIPT_UDF_FILE: 사용할 JavaScript 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • PATH_TO_TEXT_DATA: 텍스트 데이터 세트의 Cloud Storage 경로
  • BIGQUERY_TABLE: BigQuery 테이블 이름
  • BIGQUERY_UNPROCESSED_TABLE: 처리되지 않은 메시지에 대한 BigQuery 테이블의 이름
  • PATH_TO_TEMP_DIR_ON_GCS: 임시 디렉터리의 Cloud Storage 경로

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex",
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • STAGING_LOCATION: 로컬 파일의 스테이징 위치(예: gs://your-bucket/staging)
  • JAVASCRIPT_FUNCTION: 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.

    예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 JavaScript UDF는 UDF 예시를 참조하세요.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: 스키마 정의가 포함된 JSON 파일의 Cloud Storage 경로
  • PATH_TO_JAVASCRIPT_UDF_FILE: 사용할 JavaScript 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • PATH_TO_TEXT_DATA: 텍스트 데이터 세트의 Cloud Storage 경로
  • BIGQUERY_TABLE: BigQuery 테이블 이름
  • BIGQUERY_UNPROCESSED_TABLE: 처리되지 않은 메시지에 대한 BigQuery 테이블의 이름
  • PATH_TO_TEMP_DIR_ON_GCS: 임시 디렉터리의 Cloud Storage 경로

다음 단계