Pub/Sub to MongoDB 템플릿

Pub/Sub to MongoDB 템플릿은 Pub/Sub 구독에서 JSON 인코딩 메시지를 읽고 이를 MongoDB에 문서로 쓰는 스트리밍 파이프라인입니다. 필요한 경우 이 파이프라인은 자바스크립트 사용자 정의 함수(UDF)를 사용하여 포함할 수 있는 추가 변환을 지원합니다. 스키마 불일치, 잘못된 형식의 JSON으로 인해 오류가 생기거나 변환 실행 중에 오류가 발생하는 경우 입력 메시지와 함께 처리되지 않은 메시지가 BigQuery 테이블에 기록됩니다. 처리되지 않은 레코드의 테이블이 실행되기 전에 존재하지 않으면 파이프라인은 자동으로 이 테이블을 생성합니다.

파이프라인 요구사항

  • Pub/Sub 구독이 있어야 하며 메시지가 유효한 JSON 형식으로 인코딩되어야 합니다.
  • MongoDB 클러스터가 있어야 하며 Dataflow 작업자 머신에서 액세스할 수 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputSubscription Pub/Sub 구독의 이름입니다. 예를 들면 projects/my-project-id/subscriptions/my-subscription-id입니다.
mongoDBUri 쉼표로 구분된 MongoDB 서버 목록입니다. 예를 들면 192.285.234.12:27017,192.287.123.11:27017입니다.
database 컬렉션을 저장하기 위한 MongoDB의 데이터베이스입니다. 예를 들면 my-db입니다.
collection MongoDB 데이터베이스 내부의 컬렉션 이름입니다. 예를 들면 my-collection입니다.
deadletterTable 오류(스키마 불일치, 잘못된 형식의 json 등)로 인해 메시지를 저장하는 BigQuery 테이블입니다. 예를 들면 project-id:dataset-name.table-name입니다.
javascriptTextTransformGcsPath (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
javascriptTextTransformFunctionName (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다. 예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.
batchSize (선택사항) MongoDB에 문서를 일괄 삽입하기 위해 사용되는 배치 크기입니다. 기본값: 1000.
batchSizeBytes (선택사항) 배치 크기(바이트)입니다. 기본값: 5242880
maxConnectionIdleTime (선택사항) 연결이 시간 초과되기 전까지 허용되는 최대 유휴 시간(초)입니다. 기본값: 60000
sslEnabled (선택사항) MongoDB 연결에 SSL이 사용 설정되었는지 여부를 나타내는 부울 값입니다. 기본값: true.
ignoreSSLCertificate (선택사항) SSL 인증서를 무시해야 하는지 여부를 나타내는 부울 값입니다. 기본값: true.
withOrdered (선택사항) MongoDB에 순서대로 대량 삽입할 수 있게 해주는 부울 값입니다. 기본값: true.
withSSLInvalidHostNameAllowed (선택사항) SSL 연결에 잘못된 호스트 이름이 허용되었는지 여부를 나타내는 부울 값입니다. 기본값: true

템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to MongoDB template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • INPUT_SUBSCRIPTION: Pub/Sub 구독(예: projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: MongoDB 서버 주소(예: 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: MongoDB 데이터베이스의 이름(예: users)
  • COLLECTION: MongoDB 컬렉션의 이름(예: profiles)
  • UNPROCESSED_TABLE: BigQuery 테이블의 이름(예: your-project:your-dataset.your-table-name)

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • INPUT_SUBSCRIPTION: Pub/Sub 구독(예: projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: MongoDB 서버 주소(예: 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: MongoDB 데이터베이스의 이름(예: users)
  • COLLECTION: MongoDB 컬렉션의 이름(예: profiles)
  • UNPROCESSED_TABLE: BigQuery 테이블의 이름(예: your-project:your-dataset.your-table-name)