Google 제공 유틸리티 템플릿

Google은 오픈소스 Dataflow 템플릿 세트를 제공합니다. 템플릿에 대한 일반 정보는 Dataflow 템플릿을 참조하세요. 모든 Google 제공 템플릿 목록은 Google 제공 템플릿 시작을 참조하세요.

이 가이드에서는 유틸리티 템플릿을 다룹니다.

파일 형식 변환(Avro, Parquet, CSV)

파일 형식 변환 템플릿은 Cloud Storage에 저장된 파일을 지원되는 형식에서 다른 형식으로 변환하는 일괄 파이프라인입니다.

다음과 같은 형식 변환이 지원됩니다.

  • CSV에서 Avro로
  • CSV에서 Parquet로
  • Avro에서 Parquet로
  • Parquet에서 Avro로

파이프라인 요구사항:

  • 파이프라인을 실행하기 전에 출력 Cloud Storage 버킷이 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputFileFormat 입력 파일 형식입니다. [csv, avro, parquet] 중 하나여야 합니다.
outputFileFormat 출력 파일 형식입니다. [avro, parquet] 중 하나여야 합니다.
inputFileSpec 입력 파일의 Cloud Storage 경로 패턴입니다. 예를 들면 gs://bucket-name/path/*.csv입니다.
outputBucket 출력 파일을 쓸 Cloud Storage 폴더입니다. 이 경로는 슬래시로 끝나야 합니다. 예를 들면 gs://bucket-name/output/입니다.
schema Avro 스키마 파일의 Cloud Storage 경로입니다(예시: gs://bucket-name/schema/my-schema.avsc).
containsHeaders (선택사항) 입력 CSV 파일에는 헤더 레코드가 포함됩니다(true/false). 기본값은 false입니다. CSV 파일을 읽을 때만 필요합니다.
csvFormat (선택사항) 레코드 파싱에 사용할 CSV 형식 지정입니다. 기본값은 Default입니다. 자세한 내용은 Apache Commons CSV 형식을 참조하세요.
delimiter (선택사항) 입력 CSV 파일에서 사용하는 필드 구분 기호입니다.
outputFilePrefix (선택사항) 출력 파일 프리픽스입니다. 기본값은 output입니다.
numShards (선택사항) 출력 파일 샤드 수입니다.

파일 형식 변환 템플릿 실행

Console

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Convert file formats template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/File_Format_Conversion \
    --parameters \
inputFileFormat=INPUT_FORMAT,\
outputFileFormat=OUTPUT_FORMAT,\
inputFileSpec=INPUT_FILES,\
schema=SCHEMA,\
outputBucket=OUTPUT_FOLDER

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • INPUT_FORMAT: 입력 파일의 파일 형식. [csv, avro, parquet] 중 하나여야 합니다.
  • OUTPUT_FORMAT: 출력 파일의 파일 형식. [avro, parquet] 중 하나여야 합니다.
  • INPUT_FILES: 입력 파일의 경로 패턴
  • OUTPUT_FOLDER: 출력 파일의 Cloud Storage 폴더
  • SCHEMA: Avro 스키마 파일의 경로

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileFormat": "INPUT_FORMAT",
          "outputFileFormat": "OUTPUT_FORMAT",
          "inputFileSpec": "INPUT_FILES",
          "schema": "SCHEMA",
          "outputBucket": "OUTPUT_FOLDER"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/File_Format_Conversion",
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • INPUT_FORMAT: 입력 파일의 파일 형식. [csv, avro, parquet] 중 하나여야 합니다.
  • OUTPUT_FORMAT: 출력 파일의 파일 형식. [avro, parquet] 중 하나여야 합니다.
  • INPUT_FILES: 입력 파일의 경로 패턴
  • OUTPUT_FOLDER: 출력 파일의 Cloud Storage 폴더
  • SCHEMA: Avro 스키마 파일의 경로

Bulk Compress Cloud Storage Files

Bulk Compress Cloud Storage Files 템플릿은 Cloud Storage의 파일을 지정된 위치에 압축하는 일괄 파이프라인입니다. 이 템플릿은 주기적인 보관처리 프로세스의 일환으로 큰 파일 배치를 압축해야 할 때 유용할 수 있습니다. 지원되는 압축 모드는 BZIP2, DEFLATE, GZIP입니다. 대상 위치로의 파일 출력은 원래 파일 이름에 압축 모드 확장자를 추가하는 명명 스키마를 따릅니다. 추가되는 확장자는 .bzip2, .deflate, .gz 중 하나입니다.

압축 프로세스 중에 발생하는 오류는 파일 이름, 오류 메시지의 CSV 형식으로 오류 파일로 출력됩니다. 파이프라인 실행 중에 오류가 발생하지 않는 경우에도 오류 파일은 생성되지만 오류 레코드를 포함하지 않습니다.

파이프라인 요구사항:

  • 압축은 BZIP2, DEFLATE, GZIP 형식 중 하나여야 합니다.
  • 파이프라인을 실행하기 전에 출력 디렉터리가 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputFilePattern 읽을 입력 파일 패턴입니다. 예를 들면 gs://bucket-name/uncompressed/*.txt입니다.
outputDirectory 출력을 쓸 위치입니다. 예를 들면 gs://bucket-name/compressed/입니다.
outputFailureFile 압축 프로세스 중에 발생하는 쓰기 오류에 사용할 오류 로그 출력 파일입니다. 예를 들면 gs://bucket-name/compressed/failed.csv입니다. 오류가 없는 경우에도 파일은 생성되지만 비어 있게 됩니다. 파일 콘텐츠는 CSV 형식(파일 이름, 오류)이며 압축에 실패한 파일이 한 줄에 하나씩 표시됩니다.
compression 일치하는 파일을 압축하는 데 사용된 압축 알고리즘입니다. BZIP2, DEFLATE, GZIP 중 하나여야 합니다.

Bulk Compress Cloud Storage Files 템플릿 실행

Console

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Bulk Compress Files on Cloud Storage template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files \
    --region REGION_NAME \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/uncompressed/*.txt,\
outputDirectory=gs://BUCKET_NAME/compressed,\
outputFailureFile=gs://BUCKET_NAME/failed/failure.csv,\
compression=COMPRESSION

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • BUCKET_NAME: Cloud Storage 버킷 이름
  • COMPRESSION: 원하는 압축 알고리즘

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Bulk_Compress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/uncompressed/*.txt",
       "outputDirectory": "gs://BUCKET_NAME/compressed",
       "outputFailureFile": "gs://BUCKET_NAME/failed/failure.csv",
       "compression": "COMPRESSION"
   },
   "environment": { "zone": "us-central1-f" }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • BUCKET_NAME: Cloud Storage 버킷 이름
  • COMPRESSION: 원하는 압축 알고리즘

Bulk Decompress Cloud Storage Files

Bulk Decompress Cloud Storage Files 템플릿은 Cloud Storage의 파일을 지정된 위치로 압축 해제하는 일괄 파이프라인입니다. 이 기능은 이전 중에는 압축된 데이터를 사용하여 네트워크 대역폭 비용을 최소화하되, 이전 후에는 압축 해제된 데이터를 사용하여 분석 처리 속도를 최대화하려는 경우에 유용합니다. 파이프라인은 단일 실행 중에 여러 압축 모드를 자동으로 처리하며, 파일 확장자(.bzip2, .deflate, .gz, .zip)에 따라 사용할 압축 해제 모드를 결정합니다.

참고: Bulk Decompress Cloud Storage Files 템플릿은 압축된 폴더가 아닌 단일 압축 파일을 대상으로 합니다.

파이프라인 요구사항:

  • 압축 해제할 파일은 Bzip2, Deflate, Gzip, Zip 형식 중 하나여야 합니다.
  • 파이프라인을 실행하기 전에 출력 디렉터리가 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputFilePattern 읽을 입력 파일 패턴입니다. 예를 들면 gs://bucket-name/compressed/*.gz입니다.
outputDirectory 출력을 쓸 위치입니다. 예를 들면 gs://bucket-name/decompressed입니다.
outputFailureFile 압축 해제 프로세스 중에 발생하는 쓰기 오류에 사용할 오류 로그 출력 파일입니다. 예를 들면 gs://bucket-name/decompressed/failed.csv입니다. 오류가 없는 경우에도 파일은 생성되지만 비어 있게 됩니다. 파일 콘텐츠는 CSV 형식(파일 이름, 오류)이며 압축 해제에 실패한 파일이 한 줄에 하나씩 표시됩니다.

Bulk Decompress Cloud Storage Files 템플릿 실행

Console

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Bulk Decompress Files on Cloud Storage template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files \
    --region REGION_NAME \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/compressed/*.gz,\
outputDirectory=gs://BUCKET_NAME/decompressed,\
outputFailureFile=OUTPUT_FAILURE_FILE_PATH

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • BUCKET_NAME: Cloud Storage 버킷 이름
  • OUTPUT_FAILURE_FILE_PATH: 실패 정보를 포함하는 파일 경로

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Bulk_Decompress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/compressed/*.gz",
       "outputDirectory": "gs://BUCKET_NAME/decompressed",
       "outputFailureFile": "OUTPUT_FAILURE_FILE_PATH"
   },
   "environment": { "zone": "us-central1-f" }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • BUCKET_NAME: Cloud Storage 버킷 이름
  • OUTPUT_FAILURE_FILE_PATH: 실패 정보를 포함하는 파일 경로

Datastore 일괄 삭제[지원 중단됨]

이 템플릿은 지원 중단되었으며 2022년 1분기에 삭제됩니다. Firestore 일괄 삭제 템플릿으로 마이그레이션하세요.

Datastore 일괄 삭제 템플릿은 지정된 GQL 쿼리로 Datastore 에서 항목을 읽고 선택한 대상 프로젝트에서 일치하는 모든 항목을 삭제하는 파이프라인입니다. 이 파이프라인은 JSON 인코딩된 Datastore 항목을 자바스크립트 UDF(null 값을 반환하여 항목 필터링)로 선택적으로 전달할 수 있습니다.

파이프라인 요구사항:

  • 템플릿을 실행하기 전에 프로젝트에서 Datastore를 설정해야 합니다.
  • 별도의 Datastore 인스턴스에서 읽고 삭제하는 경우 Dataflow 작업자 서비스 계정에 한 인스턴스에서 읽고 다른 인스턴스에서 삭제할 수 있는 권한이 있어야 합니다.

템플릿 매개변수

매개변수 설명
datastoreReadGqlQuery 삭제 대상과 일치하는 항목을 지정하는 GQL 쿼리입니다. 키 전용 쿼리를 사용하면 성능이 향상될 수 있습니다. 예를 들면 다음과 같습니다. 'SELECT __key__ FROM MyKind'
datastoreReadProjectId 일치하는 데 사용되는 항목(GQL 쿼리 사용)을 읽으려는 Datastore 인스턴스의 GCP 프로젝트 ID입니다.
datastoreDeleteProjectId 일치하는 항목을 삭제할 Datastore 인스턴스의 GCP 프로젝트 ID입니다. Datastore 인스턴스 내에서 읽고 삭제하려는 경우, datastoreReadProjectId와 같을 수 있습니다.
datastoreReadNamespace (선택사항) 요청한 항목의 네임스페이스입니다. 기본 네임스페이스는 ""로 설정되어 있습니다.
datastoreHintNumWorkers (선택사항) Datastore 증가 제한 단계의 예상 작업자 수에 대한 힌트입니다. 기본값은 500입니다.
javascriptTextTransformGcsPath (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
javascriptTextTransformFunctionName (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다. 예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요. 이 함수가 지정된 Datastore 항목에 대해 undefined 또는 null 값을 반환하는 경우 해당 항목은 삭제되지 않습니다.

Datastore 일괄 삭제 템플릿 실행

Console

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Bulk Delete Entities in Datastore template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete \
    --region REGION_NAME \
    --parameters \
datastoreReadGqlQuery="GQL_QUERY",\
datastoreReadProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID,\
datastoreDeleteProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • GQL_QUERY: 삭제 대상을 항목을 대조하는 데 사용할 쿼리
  • DATASTORE_READ_AND_DELETE_PROJECT_ID: Datastore 인스턴스 프로젝트 ID. 이 예시에서는 동일한 Datastore 인스턴스에서 읽고 삭제합니다.

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Datastore_to_Datastore_Delete
{
   "jobName": "JOB_NAME",
   "parameters": {
       "datastoreReadGqlQuery": "GQL_QUERY",
       "datastoreReadProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID",
       "datastoreDeleteProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID"
   },
   "environment": { "zone": "us-central1-f" }
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • GQL_QUERY: 삭제 대상을 항목을 대조하는 데 사용할 쿼리
  • DATASTORE_READ_AND_DELETE_PROJECT_ID: Datastore 인스턴스 프로젝트 ID. 이 예시에서는 동일한 Datastore 인스턴스에서 읽고 삭제합니다.

Firestore 일괄 삭제

Firestore 일괄 삭제 템플릿은 지정된 GQL 쿼리로 Firestore에서 항목을 읽고 선택한 대상 프로젝트에서 일치하는 모든 항목을 삭제하는 파이프라인입니다. 이 파이프라인은 JSON 인코딩된 Firestore 항목을 자바스크립트 UDF(null 값을 반환하여 항목 필터링)로 선택적으로 전달할 수 있습니다.

파이프라인 요구사항:

  • 템플릿을 실행하기 전에 프로젝트에서 Firestore를 설정해야 합니다.
  • 별도의 Firestore 인스턴스에서 읽고 삭제하는 경우 Dataflow 작업자 서비스 계정에 한 인스턴스에서 읽고 다른 인스턴스에서 삭제할 수 있는 권한이 있어야 합니다.

템플릿 매개변수

매개변수 설명
firestoreReadGqlQuery 삭제 대상과 일치하는 항목을 지정하는 GQL 쿼리입니다. 키 전용 쿼리를 사용하면 성능이 향상될 수 있습니다. 예를 들면 다음과 같습니다. 'SELECT __key__ FROM MyKind'
firestoreReadProjectId 일치하는 데 사용되는 항목(GQL 쿼리 사용)을 읽으려는 Firestore 인스턴스의 GCP 프로젝트 ID입니다.
firestoreDeleteProjectId 일치하는 항목을 삭제할 Firestore 인스턴스의 GCP 프로젝트 ID입니다. Firestore 인스턴스 내에서 읽고 삭제하려는 경우, firestoreReadProjectId와 같을 수 있습니다.
firestoreReadNamespace (선택사항) 요청한 항목의 네임스페이스입니다. 기본 네임스페이스는 ""로 설정되어 있습니다.
firestoreHintNumWorkers (선택사항) Firestore 증가 제한 단계의 예상 작업자 수에 대한 힌트입니다. 기본값은 500입니다.
javascriptTextTransformGcsPath (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
javascriptTextTransformFunctionName (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다. 예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요. 이 함수가 지정된 Firestore 항목에 대해 undefined 또는 null 값을 반환하는 경우 해당 항목은 삭제되지 않습니다.

Firestore 일괄 삭제 템플릿 실행

Console

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Bulk Delete Entities in Firestore template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Firestore_to_Firestore_Delete \
    --region REGION_NAME \
    --parameters \
firestoreReadGqlQuery="GQL_QUERY",\
firestoreReadProjectId=FIRESTORE_READ_AND_DELETE_PROJECT_ID,\
firestoreDeleteProjectId=FIRESTORE_READ_AND_DELETE_PROJECT_ID

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • GQL_QUERY: 삭제 대상을 항목을 대조하는 데 사용할 쿼리
  • FIRESTORE_READ_AND_DELETE_PROJECT_ID: Firestore 인스턴스 프로젝트 ID. 이 예시에서는 동일한 Firestore 인스턴스에서 읽고 삭제합니다.

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Firestore_to_Firestore_Delete
{
   "jobName": "JOB_NAME",
   "parameters": {
       "firestoreReadGqlQuery": "GQL_QUERY",
       "firestoreReadProjectId": "FIRESTORE_READ_AND_DELETE_PROJECT_ID",
       "firestoreDeleteProjectId": "FIRESTORE_READ_AND_DELETE_PROJECT_ID"
   },
   "environment": { "zone": "us-central1-f" }
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • GQL_QUERY: 삭제 대상을 항목을 대조하는 데 사용할 쿼리
  • FIRESTORE_READ_AND_DELETE_PROJECT_ID: Firestore 인스턴스 프로젝트 ID. 이 예시에서는 동일한 Firestore 인스턴스에서 읽고 삭제합니다.

Pub/Sub/BigQuery/Cloud Storage에 대한 Streaming Data Generator

Streaming Data Generator 템플릿은 지정된 속도로 사용자가 제공한 스키마를 기반으로 무제한 또는 고정된 수의 종합 레코드 또는 메시지를 생성하는 데 사용됩니다. 호환되는 대상에는 Pub/Sub 주제, BigQuery 테이블, Cloud Storage 버킷이 있습니다.

다음은 몇 가지 가능한 사용 사례입니다.

  • Pub/Sub 주제에 대한 대규모 실시간 이벤트 게시를 시뮬레이션하여 게시된 이벤트를 처리하는 데 필요한 소비자의 수와 크기를 측정하고 결정합니다.
  • BigQuery 테이블 또는 Cloud Storage 버킷에 합성 데이터를 생성하여 성능 벤치마크를 평가하거나 개념 증명을 수행합니다.

지원되는 싱크 및 인코딩 형식

다음 표에서는 이 템플릿에서 지원하는 싱크 및 인코딩 형식을 설명합니다.
JSON Avro Parquet
Pub/Sub 아니요
BigQuery 아니요 아니요
Cloud Storage

파이프라인에서 사용하는 JSON 데이터 생성기 라이브러리를 사용하면 각 스키마 필드에 다양한 가짜 함수를 사용할 수 있습니다. 가짜 함수 및 스키마 형식에 대한 자세한 내용은 json-data-generator 문서를 참조하세요.

파이프라인 요구사항:

  • 메시지 스키마 파일을 만들고 이 파일을 Cloud Storage 위치에 저장합니다.
  • 실행하기 전에 출력 대상이 있어야 합니다. 대상은 싱크 유형에 따라 Pub/Sub 주제, BigQuery 테이블 또는 Cloud Storage 버킷이어야 합니다.
  • 출력 인코딩이 Avro 또는 Parquet이면 Avro 스키마 파일을 만들어 Cloud Storage 위치에 저장합니다.

템플릿 매개변수

매개변수 설명
schemaLocation 스키마 파일의 위치입니다. 예를 들면 gs://mybucket/filename.json입니다.
qps 초당 게시될 메시지 수입니다. 예를 들면 100입니다.
sinkType (선택사항) 출력 싱크 유형입니다. 가능한 값은 PUBSUB, BIGQUERY, GCS입니다. 기본값은 PUBSUB입니다.
outputType (선택사항) 출력 인코딩 유형입니다. 가능한 값은 JSON, AVRO, PARQUET입니다. 기본값은 JSON입니다.
avroSchemaLocation (선택사항) AVRO 스키마 파일의 위치입니다. outputType이 AVRO 또는 PARQUET인 경우 필수입니다. 예를 들면 gs://mybucket/filename.avsc입니다.
topic (선택사항) 파이프라인에서 데이터를 게시해야 하는 Pub/Sub 주제의 이름입니다. sinkType이 Pub/Sub인 경우 필수입니다. 예를 들면 projects/my-project-ID/topics/my-topic-ID입니다.
outputTableSpec (선택사항) 출력 BigQuery 테이블의 이름입니다. sinkType이 BigQuery인 경우 필수입니다. 예를 들면 my-project-ID:my_dataset_name.my-table-name입니다.
writeDisposition (선택사항) BigQuery 쓰기 처리입니다. 가능한 값은 WRITE_APPEND, WRITE_EMPTY 또는 WRITE_TRUNCATE입니다. 기본값은 WRITE_APPEND입니다.
outputDeadletterTable (선택사항) 실패한 레코드를 저장할 출력 BigQuery 테이블의 이름입니다. 제공되지 않으면 파이프라인은 실행 중에 이름이 {output_table_name}_error_records인 테이블을 만듭니다. 예를 들면 my-project-ID:my_dataset_name.my-table-name입니다.
outputDirectory (선택사항) 출력 Cloud Storage 위치의 경로입니다. sinkType이 Cloud Storage인 경우 필수입니다. 예를 들면 gs://mybucket/pathprefix/입니다.
outputFilenamePrefix (선택사항) Cloud Storage에 기록된 출력 파일의 파일 이름 프리픽스입니다. 기본값은 output-입니다.
windowDuration (선택사항) Cloud Storage에 출력이 기록되는 기간 간격입니다. 기본값은 1m입니다. 즉, 1분입니다.
numShards (선택사항) 최대 출력 샤드 수입니다. sinkType이 Cloud Storage이고 1 이상의 숫자로 설정되어야 하는 경우 필수입니다.
messagesLimit (선택사항) 출력 메시지의 최대 개수입니다. 기본값은 0이며 무제한을 의미합니다.
autoscalingAlgorithm (선택사항) 작업자 자동 확장에 사용되는 알고리즘입니다. 가능한 값은 자동 확장을 사용 설정하는 THROUGHPUT_BASED 또는 중지하는 NONE입니다.
maxNumWorkers (선택사항) 최대 작업자 머신 수입니다. 예를 들면 10입니다.

Streaming Data Generator 템플릿 실행

Console

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Streaming Data Generator template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • SCHEMA_LOCATION: Cloud Storage에서 스키마 파일의 경로. 예를 들면 gs://mybucket/filename.json입니다.
  • QPS: 초당 게시되는 메시지 수
  • PUBSUB_TOPIC: 출력 Pub/Sub 주제. 예를 들면 projects/my-project-ID/topics/my-topic-ID입니다.

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Streaming_Data_Generator",
   }
}
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • SCHEMA_LOCATION: Cloud Storage에서 스키마 파일의 경로. 예를 들면 gs://mybucket/filename.json입니다.
  • QPS: 초당 게시되는 메시지 수
  • PUBSUB_TOPIC: 출력 Pub/Sub 주제. 예를 들면 projects/my-project-ID/topics/my-topic-ID입니다.