이 템플릿은 Cloud Storage에 업로드된 새 텍스트 파일을 지속적으로 폴링하고, 각 파일을 한 줄씩 읽고, Pub/Sub 주제에 문자열을 게시하는 스트리밍 파이프라인을 만듭니다. 이 템플릿은 JSON 레코드를 포함하는 줄바꿈 구분 파일 또는 CSV 파일의 레코드를 Pub/Sub 주제에 게시하여 실시간으로 처리합니다. 이 템플릿을 사용하여 Pub/Sub에 데이터를 다시 재생할 수 있습니다.
파이프라인은 무한정으로 실행하며, 배출을 지원하지 않는 'SplittableDoFn'인 '관찰' 변환 사용으로 인해 '배출'이 아닌 '취소'를 통해 수동으로 종료되어야 합니다.
현재 폴링 간격은 고정되어 있으며 10초로 설정되어 있습니다. 이 템플릿은 개별 레코드에 타임스탬프를 설정하지 않기 때문에 이벤트 시간이 실행 중 게시 시간과 일치하게 됩니다. 파이프라인을 처리하기 위해 정확한 이벤트 시간이 필요한 경우에는 이 파이프라인을 사용해서는 안 됩니다.
파이프라인 요구사항
- 입력 파일은 줄바꿈으로 구분되는 JSON 또는 CSV 형식이어야 합니다. 소스 파일에서 여러 줄에 걸쳐 있는 레코드는 다운스트림 문제를 일으킬 수 있습니다. 파일 안의 각 줄이 Pub/Sub에 메시지로 게시되기 때문입니다.
- 실행하기 전에 Pub/Sub 주제가 있어야 합니다.
- 파이프라인은 무기한으로 실행되며 수동으로 종료해야 합니다.
템플릿 매개변수
매개변수 | 설명 |
---|---|
inputFilePattern |
읽을 입력 파일 패턴입니다. 예를 들면 gs://bucket-name/files/*.json 또는 gs://bucket-name/path/*.csv 입니다. |
outputTopic |
작성할 Pub/Sub 입력 주제입니다. 이름은 projects/<project-id>/topics/<topic-name> 형식이어야 합니다. |
템플릿 실행
콘솔
- Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다. 템플릿에서 작업 만들기로 이동
- 작업 이름 필드에 고유한 작업 이름을 입력합니다.
- 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는
us-central1
입니다.Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.
- Dataflow 템플릿 드롭다운 메뉴에서 the Text Files on Cloud Storage to Pub/Sub (Stream) template을 선택합니다.
- 제공된 매개변수 필드에 매개변수 값을 입력합니다.
- 작업 실행을 클릭합니다.
gcloud
셸 또는 터미널에서 템플릿을 실행합니다.
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub \ --region REGION_NAME\ --staging-location STAGING_LOCATION\ --parameters \ inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME
다음을 바꿉니다.
JOB_NAME
: 선택한 고유한 작업 이름REGION_NAME
: Dataflow 작업을 배포할 리전 엔드포인트(예:us-central1
)STAGING_LOCATION
: 로컬 파일의 스테이징 위치(예:gs://your-bucket/staging
)TEMP_LOCATION
: 임시 파일 쓰기 위치(예:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub 주제 이름BUCKET_NAME
: Cloud Storage 버킷 이름FILE_PATTERN
: Cloud Storage 버킷에서 읽을 파일 패턴 glob(예:path/*.csv
)
API
REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch
를 참조하세요.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "gs://your-bucket/temp", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" } }
다음을 바꿉니다.
PROJECT_ID
: Dataflow 작업을 실행할 Cloud 프로젝트 IDJOB_NAME
: 선택한 고유한 작업 이름LOCATION
: Dataflow 작업을 배포할 리전 엔드포인트(예:us-central1
)STAGING_LOCATION
: 로컬 파일의 스테이징 위치(예:gs://your-bucket/staging
)TEMP_LOCATION
: 임시 파일 쓰기 위치(예:gs://your-bucket/temp
)TOPIC_NAME
: Pub/Sub 주제 이름BUCKET_NAME
: Cloud Storage 버킷 이름FILE_PATTERN
: Cloud Storage 버킷에서 읽을 파일 패턴 glob(예:path/*.csv
)