Pub/Sub to MongoDB with Python UDF 템플릿

Pub/Sub to MongoDB with Python UDF 템플릿은 Pub/Sub 구독에서 JSON 인코딩 메시지를 읽고 이를 MongoDB에 문서로 쓰는 스트리밍 파이프라인입니다. 필요한 경우 이 파이프라인은 Python 사용자 정의 함수(UDF)를 사용하여 포함할 수 있는 추가 변환을 지원합니다.

레코드를 처리하는 동안 오류가 발생하면 템플릿은 입력 메시지와 함께 BigQuery 테이블에 레코드를 씁니다. 예를 들어 스키마 불일치나 잘못된 형식의 JSON으로 인해 또는 변환 실행 중에 오류가 발생할 수 있습니다. deadletterTable 매개변수에 테이블 이름을 지정합니다. 테이블이 없으면 파이프라인에서 자동으로 만듭니다.

파이프라인 요구사항

  • Pub/Sub 구독이 있어야 하며 메시지가 유효한 JSON 형식으로 인코딩되어야 합니다.
  • MongoDB 클러스터가 있어야 하며 Dataflow 작업자 머신에서 액세스할 수 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputSubscription Pub/Sub 구독의 이름입니다. 예를 들면 projects/my-project-id/subscriptions/my-subscription-id입니다.
mongoDBUri 쉼표로 구분된 MongoDB 서버 목록입니다. 예를 들면 192.285.234.12:27017,192.287.123.11:27017입니다.
database 컬렉션을 저장하기 위한 MongoDB의 데이터베이스입니다. 예를 들면 my-db입니다.
collection MongoDB 데이터베이스 내부의 컬렉션 이름입니다. 예를 들면 my-collection입니다.
deadletterTable 오류(스키마 불일치, 잘못된 형식의 JSON 등)로 인해 메시지를 저장하는 BigQuery 테이블입니다. 예를 들면 project-id:dataset-name.table-name입니다.
pythonExternalTextTransformGcsPath (선택사항) 사용할 사용자 정의 함수(UDF)를 정의하는 Python 코드 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.py입니다.
pythonExternalTextTransformFunctionName (선택사항) 사용할 Python 사용자 정의 함수(UDF)의 이름입니다.
batchSize (선택사항) MongoDB에 문서를 일괄 삽입하기 위해 사용되는 배치 크기입니다. 기본값: 1000
batchSizeBytes (선택사항) 배치 크기(바이트)입니다. 기본값: 5242880
maxConnectionIdleTime (선택사항) 연결이 시간 초과되기 전까지 허용되는 최대 유휴 시간(초)입니다. 기본값: 60000
sslEnabled (선택사항) MongoDB 연결에 SSL이 사용 설정되었는지 여부를 나타내는 불리언 값입니다. 기본값: true
ignoreSSLCertificate (선택사항) SSL 인증서를 무시해야 하는지 여부를 나타내는 불리언 값입니다. 기본값: true
withOrdered (선택사항) MongoDB에 순서대로 대량 삽입할 수 있게 해주는 불리언 값입니다. 기본값: true
withSSLInvalidHostNameAllowed (선택사항) SSL 연결에 잘못된 호스트 이름이 허용되었는지 여부를 나타내는 불리언 값입니다. 기본값: true

사용자 정의 함수

선택적으로 사용자 정의 함수(UDF)를 작성하여 이 템플릿을 확장할 수 있습니다. 템플릿이 각 입력 요소에 대해 UDF를 호출합니다. 요소 페이로드는 JSON 문자열로 직렬화됩니다. 자세한 내용은 Dataflow 템플릿에 대한 사용자 정의 함수 만들기를 참조하세요.

함수 사양

UDF의 사양은 다음과 같습니다.

  • 입력: 입력 CSV 파일의 한 줄입니다.
  • 출력: MongoDB에 삽입할 문자열화된 JSON 문서입니다.

템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항) 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to MongoDB with Python UDFs template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_MongoDB_Xlang \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • INPUT_SUBSCRIPTION: Pub/Sub 구독(예: projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: MongoDB 서버 주소(예: 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: MongoDB 데이터베이스의 이름(예: users)
  • COLLECTION: MongoDB 컬렉션의 이름(예: profiles)
  • UNPROCESSED_TABLE: BigQuery 테이블의 이름(예: your-project:your-dataset.your-table-name)

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_MongoDB_Xlang",
   }
}
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • INPUT_SUBSCRIPTION: Pub/Sub 구독(예: projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: MongoDB 서버 주소(예: 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: MongoDB 데이터베이스의 이름(예: users)
  • COLLECTION: MongoDB 컬렉션의 이름(예: profiles)
  • UNPROCESSED_TABLE: BigQuery 테이블의 이름(예: your-project:your-dataset.your-table-name)

다음 단계