Pub/Sub Proto to BigQuery with Python UDF 템플릿

Pub/Sub proto to BigQuery 템플릿은 Pub/Sub 구독에서 BigQuery 테이블로 proto 데이터를 수집하는 스트리밍 파이프라인입니다. BigQuery 테이블에 쓰는 동안 발생하는 모든 오류는 Pub/Sub 처리되지 않은 주제로 스트리밍됩니다.

Python 사용자 정의 함수(UDF)를 제공하여 데이터를 변환할 수 있습니다. UDF 실행 중 오류는 개별 Pub/Sub 주제 또는 BigQuery 오류와 동일한 미처리 주제로 전송될 수 있습니다.

파이프라인 요구사항

  • 입력 Pub/Sub 구독이 있어야 합니다.
  • Proto 레코드의 스키마 파일이 Cloud Storage에 있어야 합니다.
  • 출력 Pub/Sub 주제가 있어야 합니다.
  • 출력 BigQuery 데이터 세트가 있어야 합니다.
  • BigQuery 테이블이 있으면 createDisposition 값에 관계없이 Proto 데이터와 일치하는 스키마가 있어야 합니다.

템플릿 매개변수

매개변수 설명
protoSchemaPath 자체 포함된 Proto 스키마 파일의 Cloud Storage 위치입니다. 예를 들면 gs://path/to/my/file.pb입니다. 이 파일은 protoc 명령어의 --descriptor_set_out 플래그를 사용하여 생성할 수 있습니다. --include_imports 플래그는 파일을 독립 실행형 파일로 만듭니다.
fullMessageName 전체 Proto 메시지 이름입니다. 예를 들면 package.name.MessageName입니다. 여기서 package.namejava_package 문이 아닌 package 문에 대해 제공된 값입니다.
inputSubscription 읽어올 Pub/Sub 입력 구독입니다. 예를 들면 projects/<project>/subscriptions/<subscription>입니다.
outputTopic 처리되지 않은 레코드에 사용할 Pub/Sub 주제입니다. 예를 들면 projects/<project-id>/topics/<topic-name>입니다.
outputTableSpec BigQuery 출력 테이블 위치입니다. 예를 들면 my-project:my_dataset.my_table입니다. 지정된 createDisposition에 따라 입력 스키마 파일을 사용해서 출력 테이블을 자동으로 만들 수 있습니다.
preserveProtoFieldNames (선택사항) JSON에서 원본 Proto 필드를 보존하기 위한 true입니다. 더 많은 표준 JSON 이름을 사용하기 위한 false입니다. 예를 들어 falsefield_namefieldName으로 바꿉니다. (기본값: false)
bigQueryTableSchemaPath (선택사항) BigQuery 스키마 경로에 대한 Cloud Storage 경로입니다. 예를 들면 gs://path/to/my/schema.json입니다. 제공되지 않는 경우 스키마가 Proto 스키마에서 유추됩니다.
pythonExternalTextTransformGcsPath (선택사항) 사용할 사용자 정의 함수(UDF)를 정의하는 Python 코드 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.py입니다.
pythonExternalTextTransformFunctionName (선택사항) 사용할 Python 사용자 정의 함수(UDF)의 이름입니다.
udfOutputTopic (선택사항) UDF 오류를 저장하는 Pub/Sub 주제입니다. 예를 들면 projects/<project-id>/topics/<topic-name>입니다. 제공되지 않는 경우 UDF 오류가 outputTopic과 동일한 주제로 전송됩니다.
writeDisposition (선택사항) BigQuery WriteDisposition입니다. 예를 들면 WRITE_APPEND, WRITE_EMPTY, WRITE_TRUNCATE입니다. 기본값: WRITE_APPEND
createDisposition (선택사항) BigQuery CreateDisposition입니다. 예를 들면 CREATE_IF_NEEDED, CREATE_NEVER입니다. 기본값: CREATE_IF_NEEDED
useStorageWriteApi (선택사항): true이면 파이프라인에서 BigQuery Storage Write API를 사용합니다. 기본값은 false입니다. 자세한 내용은 Storage Write API 사용을 참조하세요.
useStorageWriteApiAtLeastOnce (선택사항): Storage Write API를 사용할 때 쓰기 시맨틱스를 지정합니다. 1회 이상 실행되는 시맨틱스를 사용하려면 이 매개변수를 true으로 설정합니다. 1회만 실행되는 시맨틱스를 사용하려면 매개변수를 false로 설정합니다. 이 매개변수는 useStorageWriteApitrue인 경우에만 적용됩니다. 기본값은 false입니다.
numStorageWriteApiStreams (선택사항): Storage Write API를 사용할 때 쓰기 스트림 수를 지정합니다. useStorageWriteApitrue이고 useStorageWriteApiAtLeastOncefalse이면 이 매개변수를 설정해야 합니다.
storageWriteApiTriggeringFrequencySec (선택사항): Storage Write API를 사용할 때 트리거 빈도를 초 단위로 지정합니다. useStorageWriteApitrue이고 useStorageWriteApiAtLeastOncefalse이면 이 매개변수를 설정해야 합니다.

사용자 정의 함수

선택적으로 사용자 정의 함수(UDF)를 작성하여 이 템플릿을 확장할 수 있습니다. 템플릿이 각 입력 요소에 대해 UDF를 호출합니다. 요소 페이로드는 JSON 문자열로 직렬화됩니다. 자세한 내용은 Dataflow 템플릿에 대한 사용자 정의 함수 만들기를 참조하세요.

함수 사양

UDF의 사양은 다음과 같습니다.

  • 입력: JSON 문자열로 직렬화된 Pub/Sub 메시지 데이터 필드입니다.
  • 출력: BigQuery 대상 테이블의 스키마와 일치하는 JSON 문자열입니다.
  • 템플릿 실행

    콘솔

    1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
    2. 템플릿에서 작업 만들기로 이동
    3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
    4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

      Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

    5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub Proto to BigQuery with Python UDF template을 선택합니다.
    6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
    7. 작업 실행을 클릭합니다.

    gcloud

    셸 또는 터미널에서 템플릿을 실행합니다.

    gcloud dataflow flex-template run JOB_NAME \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang \
        --parameters \
    schemaPath=SCHEMA_PATH,\
    fullMessageName=PROTO_MESSAGE_NAME,\
    inputSubscription=SUBSCRIPTION_NAME,\
    outputTableSpec=BIGQUERY_TABLE,\
    outputTopic=UNPROCESSED_TOPIC
      

    다음을 바꿉니다.

    • JOB_NAME: 선택한 고유한 작업 이름
    • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
    • VERSION: 사용할 템플릿 버전

      다음 값을 사용할 수 있습니다.

    • SCHEMA_PATH: Proto 스키마 파일의 Cloud Storage 경로(예: gs://MyBucket/file.pb).
    • PROTO_MESSAGE_NAME: Proto 메시지 이름(예: package.name.MessageName)
    • SUBSCRIPTION_NAME: Pub/Sub 입력 구독 이름
    • BIGQUERY_TABLE: BigQuery 출력 테이블 이름
    • UNPROCESSED_TOPIC: 처리되지 않은 큐에 사용할 Pub/Sub 주제

    API

    REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang",
          "parameters": {
              "schemaPath": "SCHEMA_PATH",
              "fullMessageName": "PROTO_MESSAGE_NAME",
              "inputSubscription": "SUBSCRIPTION_NAME",
              "outputTableSpec": "BIGQUERY_TABLE",
              "outputTopic": "UNPROCESSED_TOPIC"
          }
       }
    }
      

    다음을 바꿉니다.

    • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
    • JOB_NAME: 선택한 고유한 작업 이름
    • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
    • VERSION: 사용할 템플릿 버전

      다음 값을 사용할 수 있습니다.

    • SCHEMA_PATH: Proto 스키마 파일의 Cloud Storage 경로(예: gs://MyBucket/file.pb).
    • PROTO_MESSAGE_NAME: Proto 메시지 이름(예: package.name.MessageName)
    • SUBSCRIPTION_NAME: Pub/Sub 입력 구독 이름
    • BIGQUERY_TABLE: BigQuery 출력 테이블 이름
    • UNPROCESSED_TOPIC: 처리되지 않은 큐에 사용할 Pub/Sub 주제

    다음 단계