Python UDF를 사용하는 Cloud Storage Text to BigQuery(스트리밍) 템플릿

Cloud Storage Text to BigQuery 파이프라인은 Cloud Storage에 저장된 텍스트 파일을 스트리밍하고, 사용자가 제공한 Python 사용자 정의 함수(UDF)를 사용하여 변환하고, 결과를 BigQuery에 추가하는 스트리밍 파이프라인입니다.

파이프라인은 무한히 실행되며 드레이닝을 지원하지 않는 분할 가능한 DoFnWatch 변환 사용으로 인해 드레이닝이 아닌 취소를 통해 수동으로 종료되어야 합니다.

파이프라인 요구사항

  • BigQuery에서 출력 테이블의 스키마를 설명하는 JSON 파일을 만드세요.

    최상위 JSON 배열의 이름이 fields이고 해당 콘텐츠는 {"name": "COLUMN_NAME", "type": "DATA_TYPE"} 패턴을 따라야 합니다. 예를 들면 다음과 같습니다.

    {
      "fields": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        }
      ]
    }
    
  • 텍스트 줄을 변환하는 논리를 제공하는 UDF 함수를 사용하여 Python(.py) 파일을 만듭니다. 함수는 JSON 문자열을 반환해야 합니다.

    다음 예시에서는 CSV 파일의 각 줄을 분할하고, 값이 있는 JSON 객체를 만들고, JSON 문자열을 반환합니다.

    import json
    def process(value):
      data = value.split(',')
      obj = { 'name': data[0], 'age': int(data[1]) }
      return json.dumps(obj)
    

템플릿 매개변수

매개변수 설명
pythonExternalTextTransformGcsPath 사용할 사용자 정의 함수(UDF)를 정의하는 Python 코드 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.py입니다.
pythonExternalTextTransformFunctionName 사용할 Python 사용자 정의 함수(UDF)의 이름입니다.
JSONPath JSON으로 설명된 BigQuery 스키마 파일의 Cloud Storage 위치입니다. 예를 들면 gs://path/to/my/schema.json입니다.
outputTable 정규화된 BigQuery 테이블입니다. 예를 들면 my-project:dataset.table입니다.
inputFilePattern 처리하려는 텍스트의 Cloud Storage 위치입니다. 예를 들면 gs://my-bucket/my-files/text.txt입니다.
bigQueryLoadingTemporaryDirectory BigQuery 로딩 프로세스를 위한 임시 디렉터리입니다. 예를 들면 gs://my-bucket/my-files/temp_dir입니다.
outputDeadletterTable 출력 테이블에 도달하지 못한 메시지 테이블입니다. 예를 들면 my-project:dataset.my-unprocessed-table입니다. 없으면 파이프라인 실행 중에 생성됩니다. 지정하지 않은 경우 <outputTableSpec>_error_records가 대신 사용됩니다.

사용자 정의 함수

이 템플릿에는 파이프라인 요구사항에 설명된 대로 입력 파일을 파싱하는 UDF가 필요합니다. 템플릿은 각 입력 파일의 모든 텍스트 줄에 대해 UDF를 호출합니다. UDF 만들기에 대한 자세한 내용은 Dataflow 템플릿에 대한 사용자 정의 함수 만들기를 참조하세요.

함수 사양

UDF의 사양은 다음과 같습니다.

  • 입력: 입력 파일의 텍스트 한 줄입니다.
  • 출력: BigQuery 대상 테이블의 스키마와 일치하는 JSON 문자열입니다.

템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Cloud Storage Text to BigQuery (Stream) with Python UDF template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Xlang \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\
pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • STAGING_LOCATION: 로컬 파일의 스테이징 위치(예: gs://your-bucket/staging)
  • PYTHON_FUNCTION: 사용할 Python 사용자 정의 함수(UDF)의 이름
  • PATH_TO_BIGQUERY_SCHEMA_JSON: 스키마 정의가 포함된 JSON 파일의 Cloud Storage 경로
  • PATH_TO_PYTHON_UDF_FILE: 사용할 사용자 정의 함수(UDF)를 정의하는 Python 코드 파일의 Cloud Storage URI. 예를 들면 gs://my-bucket/my-udfs/my_file.py입니다.
  • PATH_TO_TEXT_DATA: 텍스트 데이터 세트의 Cloud Storage 경로
  • BIGQUERY_TABLE: BigQuery 테이블 이름
  • BIGQUERY_UNPROCESSED_TABLE: 처리되지 않은 메시지에 대한 BigQuery 테이블의 이름
  • PATH_TO_TEMP_DIR_ON_GCS: 임시 디렉터리의 Cloud Storage 경로

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
       "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Xlang",
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • STAGING_LOCATION: 로컬 파일의 스테이징 위치(예: gs://your-bucket/staging)
  • PYTHON_FUNCTION: 사용할 Python 사용자 정의 함수(UDF)의 이름
  • PATH_TO_BIGQUERY_SCHEMA_JSON: 스키마 정의가 포함된 JSON 파일의 Cloud Storage 경로
  • PATH_TO_PYTHON_UDF_FILE: 사용할 사용자 정의 함수(UDF)를 정의하는 Python 코드 파일의 Cloud Storage URI. 예를 들면 gs://my-bucket/my-udfs/my_file.py입니다.
  • PATH_TO_TEXT_DATA: 텍스트 데이터 세트의 Cloud Storage 경로
  • BIGQUERY_TABLE: BigQuery 테이블 이름
  • BIGQUERY_UNPROCESSED_TABLE: 처리되지 않은 메시지에 대한 BigQuery 테이블의 이름
  • PATH_TO_TEMP_DIR_ON_GCS: 임시 디렉터리의 Cloud Storage 경로

다음 단계