Text Files on Cloud Storage to Pub/Sub(스트림) 템플릿

이 템플릿은 Cloud Storage에 업로드된 새 텍스트 파일을 지속적으로 폴링하고, 각 파일을 한 줄씩 읽고, Pub/Sub 주제에 문자열을 게시하는 스트리밍 파이프라인을 만듭니다. 이 템플릿은 JSON 레코드를 포함하는 줄바꿈 구분 파일 또는 CSV 파일의 레코드를 Pub/Sub 주제에 게시하여 실시간으로 처리합니다. 이 템플릿을 사용하여 Pub/Sub에 데이터를 다시 재생할 수 있습니다.

파이프라인은 무한정으로 실행하며, 배출을 지원하지 않는 'SplittableDoFn'인 '관찰' 변환 사용으로 인해 '배출'이 아닌 '취소'를 통해 수동으로 종료되어야 합니다.

현재 폴링 간격은 고정되어 있으며 10초로 설정되어 있습니다. 이 템플릿은 개별 레코드에 타임스탬프를 설정하지 않기 때문에 이벤트 시간이 실행 중 게시 시간과 일치하게 됩니다. 파이프라인 처리를 위해 정확한 이벤트 시간이 필요한 경우 이 파이프라인을 사용해서는 안 됩니다.

파이프라인 요구사항

  • 입력 파일은 줄바꿈으로 구분되는 JSON 또는 CSV 형식이어야 합니다. 소스 파일에서 여러 줄에 걸쳐 있는 레코드는 다운스트림 문제를 일으킬 수 있습니다. 파일 안의 각 줄이 Pub/Sub에 메시지로 게시되기 때문입니다.
  • 실행하기 전에 Pub/Sub 주제가 있어야 합니다.
  • 파이프라인은 무기한으로 실행되며 수동으로 종료해야 합니다.

템플릿 매개변수

필수 매개변수

  • inputFilePattern: 읽을 입력 파일 패턴입니다. 예를 들면 gs://bucket-name/files/*.json입니다.
  • outputTopic: 작성할 Pub/Sub 입력 주제입니다. 이름은 projects/<PROJECT_ID>/topics/<TOPIC_NAME> 형식이어야 합니다. 예를 들면 projects/your-project-id/topics/your-topic-name입니다.

템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항) 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Text Files on Cloud Storage to Pub/Sub (Stream) template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. (선택사항) 정확히 한 번 처리에서 적어도 한 번 스트리밍 모드로 전환하려면 적어도 한 번를 선택합니다.
  8. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME\
    --staging-location STAGING_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • STAGING_LOCATION: 로컬 파일의 스테이징 위치(예: gs://your-bucket/staging)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름
  • FILE_PATTERN: Cloud Storage 버킷에서 읽을 파일 패턴 glob(예: path/*.csv)

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • STAGING_LOCATION: 로컬 파일의 스테이징 위치(예: gs://your-bucket/staging)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름
  • FILE_PATTERN: Cloud Storage 버킷에서 읽을 파일 패턴 glob(예: path/*.csv)

다음 단계