Pub/Sub to MongoDB 템플릿

Pub/Sub to MongoDB 템플릿은 Pub/Sub 구독에서 JSON 인코딩 메시지를 읽고 이를 MongoDB에 문서로 쓰는 스트리밍 파이프라인입니다. 필요한 경우 이 파이프라인은 자바스크립트 사용자 정의 함수(UDF)를 사용하여 포함할 수 있는 추가 변환을 지원합니다.

레코드를 처리하는 중에 오류가 발생하면 템플릿은 입력 메시지와 함께 BigQuery 테이블에 레코드를 씁니다. 예를 들어 스키마 불일치나 잘못된 형식의 JSON으로 인해 또는 변환 실행 중에 오류가 발생할 수 있습니다. deadletterTable 매개변수에 테이블 이름을 지정합니다. 테이블이 없으면 파이프라인에서 자동으로 만듭니다.

파이프라인 요구사항

  • Pub/Sub 구독이 있어야 하며 메시지가 유효한 JSON 형식으로 인코딩되어야 합니다.
  • MongoDB 클러스터가 있어야 하며 Dataflow 작업자 머신에서 액세스할 수 있어야 합니다.

템플릿 매개변수

필수 매개변수

  • inputSubscription: Pub/Sub 구독의 이름입니다. 예를 들면 projects/your-project-id/subscriptions/your-subscription-name입니다.
  • mongoDBUri: 쉼표로 구분된 MongoDB 서버 목록입니다. 예를 들면 host1:port,host2:port,host3:port입니다.
  • database: 컬렉션을 저장하기 위한 MongoDB의 데이터베이스입니다. 예를 들면 my-db입니다.
  • collection: MongoDB 데이터베이스의 컬렉션 이름입니다. 예를 들면 my-collection입니다.
  • deadletterTable: 스키마 불일치, 잘못된 형식의 JSON 등 오류로 인한 메시지를 저장하는 BigQuery 테이블입니다. 예를 들면 your-project-id:your-dataset.your-table-name입니다.

선택적 매개변수

  • batchSize: MongoDB에 문서를 일괄 삽입하기 위해 사용되는 배치 크기입니다. 기본값은 1000입니다.
  • batchSizeBytes: 바이트 단위의 일괄 처리 크기입니다. 기본값은 5242880입니다.
  • maxConnectionIdleTime: 연결이 시간 초과되기 전까지 허용되는 최대 유휴 시간(초)입니다. 기본값은 60000입니다.
  • sslEnabled: MongoDB 연결에 SSL이 사용 설정되었는지 여부를 나타내는 불리언 값입니다. 기본값은 true입니다.
  • ignoreSSLCertificate: SSL 인증서를 무시할지 여부를 나타내는 불리언 값입니다. 기본값은 true입니다.
  • withOrdered: MongoDB에 순서대로 대량 삽입할 수 있게 해주는 불리언 값입니다. 기본값은 true입니다.
  • withSSLInvalidHostNameAllowed: SSL 연결에 잘못된 호스트 이름이 허용되는지 여부를 나타내는 불리언 값입니다. 기본값은 true입니다.
  • javascriptTextTransformGcsPath: 사용할 JavaScript 사용자 정의 함수 (UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • javascriptTextTransformFunctionName: 사용할 JavaScript 사용자 정의 함수 (UDF)의 이름입니다. 예를 들어 JavaScript 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 JavaScript UDF는 UDF 예시(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)를 참조하세요.
  • javascriptTextTransformReloadIntervalMinutes: UDF를 새로고침할 빈도(분)를 지정합니다. 값이 0보다 크면 Dataflow가 Cloud Storage에서 UDF 파일을 주기적으로 검사하고 파일이 수정된 경우 UDF를 새로고침합니다. 이 매개변수를 사용하면 파이프라인이 실행 중일 때 작업을 다시 시작할 필요 없이 UDF를 업데이트할 수 있습니다. 값이 0이면 UDF 새로고침이 사용 중지됩니다. 기본값은 0입니다.

사용자 정의 함수

선택적으로 사용자 정의 함수(UDF)를 작성하여 이 템플릿을 확장할 수 있습니다. 템플릿이 각 입력 요소에 대해 UDF를 호출합니다. 요소 페이로드는 JSON 문자열로 직렬화됩니다. 자세한 내용은 Dataflow 템플릿에 대한 사용자 정의 함수 만들기를 참조하세요.

함수 사양

UDF의 사양은 다음과 같습니다.

  • 입력: 입력 CSV 파일의 한 줄입니다.
  • 출력: MongoDB에 삽입할 문자열화된 JSON 문서입니다.

템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to MongoDB template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • INPUT_SUBSCRIPTION: Pub/Sub 구독(예: projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: MongoDB 서버 주소(예: 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: MongoDB 데이터베이스의 이름(예: users)
  • COLLECTION: MongoDB 컬렉션의 이름(예: profiles)
  • UNPROCESSED_TABLE: BigQuery 테이블의 이름(예: your-project:your-dataset.your-table-name)

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • INPUT_SUBSCRIPTION: Pub/Sub 구독(예: projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: MongoDB 서버 주소(예: 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: MongoDB 데이터베이스의 이름(예: users)
  • COLLECTION: MongoDB 컬렉션의 이름(예: profiles)
  • UNPROCESSED_TABLE: BigQuery 테이블의 이름(예: your-project:your-dataset.your-table-name)

다음 단계