Python UDF를 사용하는 Cloud Storage Text to BigQuery 템플릿

Python UDF를 사용하는 Cloud Storage Text to BigQuery 파이프라인은 Cloud Storage에 저장된 텍스트 파일을 읽고 Python 사용자 정의 함수(UDF)를 사용하여 변환한 후 결과를 BigQuery 테이블에 추가하는 일괄 파이프라인입니다.

파이프라인 요구사항

  • BigQuery 스키마를 설명하는 JSON 파일을 만듭니다.

    최상위 JSON 배열의 이름이 BigQuery Schema이고 해당 콘텐츠는 {"name": "COLUMN_NAME", "type": "DATA_TYPE"} 패턴을 따라야 합니다.

    Cloud Storage Text to BigQuery 일괄 템플릿은 대상 BigQuery 테이블에서 STRUCT(레코드) 필드로 데이터 가져오기를 지원하지 않습니다.

    다음 JSON은 예시 BigQuery 스키마를 설명합니다.

    {
      "BigQuery Schema": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        },
      ]
    }
  • 텍스트 줄을 변환하는 논리를 제공하는 UDF 함수를 사용하여 Python(.py) 파일을 만듭니다. 함수는 JSON 문자열을 반환해야 합니다.

    예를 들어 이 함수는 CSV 파일의 각 줄을 분할하고, 값을 변환한 후에 JSON 문자열을 반환합니다.

    import json
    def process(value):
      data = value.split(',')
      obj = { 'name': data[0], 'age': int(data[1]) }
      return json.dumps(obj)

템플릿 매개변수

매개변수 설명
JSONPath Cloud Storage에 저장된 BigQuery 스키마를 정의하는 JSON 파일의 gs:// 경로입니다. 예를 들면 gs://path/to/my/schema.json입니다.
pythonExternalTextTransformGcsPath 사용할 사용자 정의 함수(UDF)를 정의하는 Python 코드 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.py입니다.
pythonExternalTextTransformFunctionName 사용할 Python 사용자 정의 함수(UDF)의 이름입니다.
inputFilePattern Cloud Storage에서 처리하려는 텍스트의 gs:// 경로입니다. 예를 들면 gs://path/to/my/text/data.txt입니다.
outputTable 처리된 데이터를 저장하기 위해 만들 BigQuery 테이블 이름입니다. 기존 BigQuery 테이블을 다시 사용하면 데이터가 대상 테이블에 추가됩니다. 예를 들면 my-project-name:my-dataset.my-table입니다.
bigQueryLoadingTemporaryDirectory BigQuery 로드 프로세스를 위한 임시 디렉터리입니다. 예를 들면 gs://my-bucket/my-files/temp_dir입니다.
useStorageWriteApi (선택사항): true이면 파이프라인에서 BigQuery Storage Write API를 사용합니다. 기본값은 false입니다. 자세한 내용은 Storage Write API 사용을 참조하세요.
useStorageWriteApiAtLeastOnce (선택사항): Storage Write API를 사용할 때 쓰기 시맨틱스를 지정합니다. 1회 이상 실행되는 시맨틱스를 사용하려면 이 매개변수를 true으로 설정합니다. 1회만 실행되는 시맨틱스를 사용하려면 매개변수를 false로 설정합니다. 이 매개변수는 useStorageWriteApitrue인 경우에만 적용됩니다. 기본값은 false입니다.

사용자 정의 함수

선택적으로 사용자 정의 함수(UDF)를 작성하여 이 템플릿을 확장할 수 있습니다. 템플릿이 각 입력 요소에 대해 UDF를 호출합니다. 요소 페이로드는 JSON 문자열로 직렬화됩니다. 자세한 내용은 Dataflow 템플릿에 대한 사용자 정의 함수 만들기를 참조하세요.

함수 사양

UDF의 사양은 다음과 같습니다.

  • 입력: Cloud Storage 입력 파일의 텍스트 줄입니다.
  • 출력: BigQuery 대상 테이블의 스키마와 일치하는 JSON 문자열입니다.

템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Text Files on Cloud Storage to BigQuery with Python UDF (Batch) template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_Text_to_BigQuery_Xlang \
    --region REGION_NAME \
    --parameters \
pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID입니다.
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • PYTHON_FUNCTION: 사용할 Python 사용자 정의 함수(UDF)의 이름
  • PATH_TO_BIGQUERY_SCHEMA_JSON: 스키마 정의가 포함된 JSON 파일의 Cloud Storage 경로
  • PATH_TO_PYTHON_UDF_FILE: 사용할 사용자 정의 함수(UDF)를 정의하는 Python 코드 파일의 Cloud Storage URI. 예를 들면 gs://my-bucket/my-udfs/my_file.py입니다.
  • PATH_TO_TEXT_DATA: 텍스트 데이터 세트의 Cloud Storage 경로
  • BIGQUERY_TABLE: BigQuery 테이블 이름
  • PATH_TO_TEMP_DIR_ON_GCS: 임시 디렉터리의 Cloud Storage 경로

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
        "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION",
        "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
        "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE",
        "inputFilePattern":"PATH_TO_TEXT_DATA",
        "outputTable":"BIGQUERY_TABLE",
        "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_Text_to_BigQuery_Xlang",
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID입니다.
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • PYTHON_FUNCTION: 사용할 Python 사용자 정의 함수(UDF)의 이름
  • PATH_TO_BIGQUERY_SCHEMA_JSON: 스키마 정의가 포함된 JSON 파일의 Cloud Storage 경로
  • PATH_TO_PYTHON_UDF_FILE: 사용할 사용자 정의 함수(UDF)를 정의하는 Python 코드 파일의 Cloud Storage URI. 예를 들면 gs://my-bucket/my-udfs/my_file.py입니다.
  • PATH_TO_TEXT_DATA: 텍스트 데이터 세트의 Cloud Storage 경로
  • BIGQUERY_TABLE: BigQuery 테이블 이름
  • PATH_TO_TEMP_DIR_ON_GCS: 임시 디렉터리의 Cloud Storage 경로

다음 단계