Pub/Sub to BigQuery 템플릿

Pub/Sub to BigQuery 템플릿은 Pub/Sub에서 JSON 형식의 메시지를 읽고 BigQuery 테이블에 쓰는 스트리밍 파이프라인입니다. 선택적으로 수신 메시지를 처리하기 위해 JavaScript로 작성된 사용자 정의 함수(UDF)를 제공할 수 있습니다.

이 시나리오에 Dataflow 파이프라인을 실행하기 전에 UDF가 있는 Pub/Sub BigQuery 구독이 요구사항을 충족하는지 고려하세요.

파이프라인 요구사항

  • BigQuery 테이블이 있어야 하며 스키마가 포함되어야 합니다.
  • Pub/Sub 메시지 데이터는 JSON 형식을 사용해야 합니다. 또는 메시지 데이터를 JSON으로 변환하는 UDF를 제공해야 합니다. JSON 데이터는 BigQuery 테이블 스키마와 일치해야 합니다. 예를 들어 JSON 페이로드가 {"k1":"v1", "k2":"v2"} 형식인 경우 BigQuery 테이블에는 k1k2라는 두 개의 문자열 열이 있어야 합니다.
  • inputSubscription 또는 inputTopic 매개변수 중 하나만 지정합니다.

템플릿 매개변수

필수 매개변수

  • outputTableSpec : 작성할 BigQuery 테이블이며 PROJECT_ID:DATASET_NAME.TABLE_NAME 형식입니다.

선택적 매개변수

  • inputTopic: 읽어올 Pub/Sub 주제이며 projects/<PROJECT_ID>/topics/<TOPIC_NAME> 형식입니다.
  • inputSubscription: 읽어올 Pub/Sub 구독이며 projects/<PROJECT_ID>/subscriptions/<SUBCRIPTION_NAME> 형식입니다.
  • outputDeadletterTable: 출력 테이블에 도달할 수 없는 메시지에 사용할 BigQuery 테이블로, PROJECT_ID:DATASET_NAME.TABLE_NAME 형식입니다. 테이블이 없으면 파이프라인이 실행될 때 생성됩니다. 이 매개변수를 지정하지 않으면 OUTPUT_TABLE_SPEC_error_records 값이 대신 사용됩니다.
  • useStorageWriteApiAtLeastOnce: Storage Write API를 사용할 경우 쓰기 시맨틱스를 지정합니다. 1회 이상 실행되는 시맨틱스(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)를 사용하려면 이 파라미터를 true로 설정합니다. 정확히 1회만 실행되는 시맨틱스를 사용하려면 파라미터를 false로 설정합니다. 이 매개변수는 useStorageWriteApitrue인 경우에만 적용됩니다. 기본값은 false입니다.
  • useStorageWriteApi: true이면 파이프라인에서 BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)를 사용합니다. 기본값은 false입니다. 자세한 내용은 Storage Write API(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api) 사용을 참조하세요.
  • numStorageWriteApiStreams: Storage Write API를 사용할 경우 쓰기 스트림 수를 지정합니다. useStorageWriteApitrue이고 useStorageWriteApiAtLeastOncefalse이면 이 매개변수를 설정해야 합니다. 기본값은 0입니다.
  • storageWriteApiTriggeringFrequencySec: Storage Write API를 사용할 경우 트리거 빈도를 초 단위로 지정합니다. useStorageWriteApitrue이고 useStorageWriteApiAtLeastOncefalse이면 이 매개변수를 설정해야 합니다.
  • javascriptTextTransformGcsPath: 사용할 JavaScript 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • javascriptTextTransformFunctionName: 사용할 JavaScript 사용자 정의 함수(UDF) 이름입니다. 예를 들어 JavaScript 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 JavaScript UDF는 UDF 예시(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)를 참조하세요.
  • javascriptTextTransformReloadIntervalMinutes: UDF를 새로고침할 빈도(분)를 지정합니다. 값이 0보다 크면 Dataflow가 Cloud Storage에서 UDF 파일을 주기적으로 검사하고 파일이 수정된 경우 UDF를 새로고침합니다. 이 매개변수를 사용하면 파이프라인이 실행 중일 때 작업을 다시 시작할 필요 없이 UDF를 업데이트할 수 있습니다. 값이 0이면 UDF 새로고침이 사용 중지됩니다. 기본값은 0입니다.

사용자 정의 함수

선택적으로 사용자 정의 함수(UDF)를 작성하여 이 템플릿을 확장할 수 있습니다. 템플릿이 각 입력 요소에 대해 UDF를 호출합니다. 요소 페이로드는 JSON 문자열로 직렬화됩니다. 자세한 내용은 Dataflow 템플릿에 대한 사용자 정의 함수 만들기를 참조하세요.

함수 사양

UDF의 사양은 다음과 같습니다.

  • 입력: JSON 문자열로 직렬화된 Pub/Sub 메시지 데이터 필드입니다.
  • 출력: BigQuery 대상 테이블의 스키마와 일치하는 JSON 문자열입니다.
  • 템플릿 실행

    콘솔

    1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
    2. 템플릿에서 작업 만들기로 이동
    3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
    4. (선택사항) 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

      Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

    5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to BigQuery template을 선택합니다.
    6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
    7. (선택사항) 정확히 한 번 처리에서 적어도 한 번 스트리밍 모드로 전환하려면 적어도 한 번를 선택합니다.
    8. 작업 실행을 클릭합니다.

    gcloud

    셸 또는 터미널에서 템플릿을 실행합니다.

    gcloud dataflow flex-template run JOB_NAME \
        --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_BigQuery_Flex \
        --template-file-gcs-location REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

    다음을 바꿉니다.

    • JOB_NAME: 선택한 고유한 작업 이름
    • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
    • VERSION: 사용할 템플릿 버전

      다음 값을 사용할 수 있습니다.

    • STAGING_LOCATION: 로컬 파일의 스테이징 위치입니다(예: gs://your-bucket/staging).
    • TOPIC_NAME: Pub/Sub 주제 이름
    • DATASET: BigQuery 데이터 세트
    • TABLE_NAME: BigQuery 테이블 이름

    API

    REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
           "inputTopic": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_BigQuery_Flex",
       }
    }

    다음을 바꿉니다.

    • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
    • JOB_NAME: 선택한 고유한 작업 이름
    • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
    • VERSION: 사용할 템플릿 버전

      다음 값을 사용할 수 있습니다.

    • STAGING_LOCATION: 로컬 파일의 스테이징 위치입니다(예: gs://your-bucket/staging).
    • TOPIC_NAME: Pub/Sub 주제 이름
    • DATASET: BigQuery 데이터 세트
    • TABLE_NAME: BigQuery 테이블 이름

    다음 단계