Pub/Sub to BigQuery 템플릿

Pub/Sub to BigQuery 템플릿은 Pub/Sub에서 JSON 형식의 메시지를 읽고 BigQuery 테이블에 쓰는 스트리밍 파이프라인입니다. 선택적으로 수신 메시지를 처리하기 위해 JavaScript로 작성된 사용자 정의 함수 (UDF)를 제공할 수 있습니다.

파이프라인 요구사항

  • BigQuery 테이블이 있어야 하며 스키마가 포함되어야 합니다.
  • Pub/Sub 메시지 데이터는 JSON 형식을 사용해야 합니다. 또는 메시지 데이터를 JSON으로 변환하는 UDF를 제공해야 합니다. JSON 데이터는 BigQuery 테이블 스키마와 일치해야 합니다. 예를 들어 JSON 페이로드가 {"k1":"v1", "k2":"v2"} 형식인 경우 BigQuery 테이블에는 k1k2라는 두 개의 문자열 열이 있어야 합니다.
  • inputSubscription 또는 inputTopic 매개변수 중 하나만 지정합니다.

템플릿 매개변수

필수 매개변수

  • outputTableSpec: 작성할 BigQuery 테이블이며 PROJECT_ID:DATASET_NAME.TABLE_NAME 형식입니다.

선택적 매개변수

  • inputTopic: 읽어올 Pub/Sub 주제이며 projects/<PROJECT_ID>/topics/<TOPIC_NAME> 형식입니다.
  • inputSubscription: 읽어올 Pub/Sub 구독이며 projects/<PROJECT_ID>/subscriptions/<SUBCRIPTION_NAME> 형식입니다.
  • outputDeadletterTable: 출력 테이블에 도달하지 못한 메시지에 사용할 BigQuery 테이블이며 PROJECT_ID:DATASET_NAME.TABLE_NAME 형식입니다. 테이블이 없으면 파이프라인이 실행될 때 생성됩니다. 이 매개변수를 지정하지 않으면 OUTPUT_TABLE_SPEC_error_records 값이 대신 사용됩니다.
  • useStorageWriteApiAtLeastOnce: Storage Write API를 사용할 때 쓰기 시맨틱스를 지정합니다. 1회 이상 실행되는 시맨틱스 (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)를 사용하려면 이 매개변수를 true로 설정합니다. 1회만 실행되는 시맨틱스를 사용하려면 매개변수를 false로 설정합니다. 이 매개변수는 useStorageWriteApitrue인 경우에만 적용됩니다. 기본값은 false입니다.
  • useStorageWriteApi: true인 경우 파이프라인은 BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api)를 사용합니다. 기본값은 false입니다. 자세한 내용은 Storage Write API(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api) 사용을 참조하세요.
  • numStorageWriteApiStreams: Storage Write API를 사용할 때 쓰기 스트림 수를 지정합니다. useStorageWriteApitrue이고 useStorageWriteApiAtLeastOncefalse이면 이 매개변수를 설정해야 합니다. 기본값은 0입니다.
  • storageWriteApiTriggeringFrequencySec: Storage Write API를 사용할 때 트리거 빈도를 초 단위로 지정합니다. useStorageWriteApitrue이고 useStorageWriteApiAtLeastOncefalse이면 이 매개변수를 설정해야 합니다.
  • javascriptTextTransformGcsPath: 사용할 JavaScript 사용자 정의 함수 (UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • javascriptTextTransformFunctionName: 사용할 JavaScript 사용자 정의 함수 (UDF)의 이름입니다. 예를 들어 JavaScript 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 JavaScript UDF는 UDF 예시(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)를 참조하세요.
  • javascriptTextTransformReloadIntervalMinutes: UDF를 새로고침할 빈도(분)를 지정합니다. 값이 0보다 크면 Dataflow가 Cloud Storage에서 UDF 파일을 주기적으로 검사하고 파일이 수정된 경우 UDF를 새로고침합니다. 이 매개변수를 사용하면 파이프라인이 실행 중일 때 작업을 다시 시작할 필요 없이 UDF를 업데이트할 수 있습니다. 값이 0이면 UDF 새로고침이 사용 중지됩니다. 기본값은 0입니다.

사용자 정의 함수

선택적으로 사용자 정의 함수(UDF)를 작성하여 이 템플릿을 확장할 수 있습니다. 템플릿이 각 입력 요소에 대해 UDF를 호출합니다. 요소 페이로드는 JSON 문자열로 직렬화됩니다. 자세한 내용은 Dataflow 템플릿에 대한 사용자 정의 함수 만들기를 참조하세요.

함수 사양

UDF의 사양은 다음과 같습니다.

  • 입력: JSON 문자열로 직렬화된 Pub/Sub 메시지 데이터 필드입니다.
  • 출력: BigQuery 대상 테이블의 스키마와 일치하는 JSON 문자열입니다.
  • 템플릿 실행

    콘솔

    1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
    2. 템플릿에서 작업 만들기로 이동
    3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
    4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

      Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

    5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to BigQuery template을 선택합니다.
    6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
    7. 선택사항: 정확히 한 번 처리에서 적어도 한 번 스트리밍 모드로 전환하려면 적어도 한 번를 선택합니다.
    8. 작업 실행을 클릭합니다.

    gcloud

    셸 또는 터미널에서 템플릿을 실행합니다.

    gcloud dataflow flex-template run JOB_NAME \
        --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_BigQuery_Flex \
        --template-file-gcs-location REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

    다음을 바꿉니다.

    • JOB_NAME: 선택한 고유한 작업 이름
    • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
    • VERSION: 사용할 템플릿 버전

      다음 값을 사용할 수 있습니다.

    • STAGING_LOCATION: 로컬 파일의 스테이징 위치입니다(예: gs://your-bucket/staging).
    • TOPIC_NAME: Pub/Sub 주제 이름
    • DATASET: BigQuery 데이터 세트
    • TABLE_NAME: BigQuery 테이블 이름

    API

    REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
           "inputTopic": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_BigQuery_Flex",
       }
    }

    다음을 바꿉니다.

    • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
    • JOB_NAME: 선택한 고유한 작업 이름
    • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
    • VERSION: 사용할 템플릿 버전

      다음 값을 사용할 수 있습니다.

    • STAGING_LOCATION: 로컬 파일의 스테이징 위치입니다(예: gs://your-bucket/staging).
    • TOPIC_NAME: Pub/Sub 주제 이름
    • DATASET: BigQuery 데이터 세트
    • TABLE_NAME: BigQuery 테이블 이름

    다음 단계