Google 제공 스트리밍 템플릿

Google은 오픈소스 Dataflow 템플릿 세트를 제공합니다. 템플릿에 대한 일반 정보는 Dataflow 템플릿을 참조하세요. 모든 Google 제공 템플릿 목록은 Google 제공 템플릿 시작을 참조하세요.

이 가이드에서는 스트리밍 템플릿을 다룹니다.

Pub/Sub Subscription to BigQuery

Pub/Sub Subscription to BigQuery 템플릿은 Pub/Sub 구독에서 JSON 형식의 메시지를 읽고 BigQuery 테이블에 쓰는 스트리밍 파이프라인입니다. 템플릿을 사용하여 Pub/Sub 데이터를 BigQuery로 빠르게 이동할 수 있습니다. 이 템플릿은 Pub/Sub에서 JSON 형식의 메시지를 읽고 BigQuery 요소로 변환합니다.

파이프라인 요구사항:

  • Pub/Sub 메시지가 여기에 설명된 대로 JSON 형식이어야 합니다. 예를 들어 {"k1":"v1", "k2":"v2"} 형식으로 된 메시지는 k1k2라는 두 개의 열 그리고 문자열 데이터 유형으로 BigQuery 테이블에 삽입될 수 있습니다.
  • 파이프라인을 실행하기 전에 출력 테이블이 있어야 합니다. 테이블 스키마는 입력 JSON 객체와 일치해야 합니다.

템플릿 매개변수

매개변수 설명
inputSubscription 읽어올 Pub/Sub 입력 구독으로, projects/<project>/subscriptions/<subscription> 형식입니다.
outputTableSpec BigQuery 출력 테이블 위치로, <my-project>:<my-dataset>.<my-table> 형식입니다.
outputDeadletterTable 출력 테이블에 도달하지 못한 메시지의 BigQuery 테이블로 <my-project>:<my-dataset>.<my-table> 형식입니다. 존재하지 않을 경우 파이프라인 실행 중에 생성됩니다. 지정하지 않은 경우 OUTPUT_TABLE_SPEC_error_records가 대신 사용됩니다.
javascriptTextTransformGcsPath (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
javascriptTextTransformFunctionName (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다. 예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.

Pub/Sub Subscription to BigQuery 템플릿 실행

Console

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub Subscription to BigQuery template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • DATASET: BigQuery 데이터 세트
  • TABLE_NAME: BigQuery 테이블 이름

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • DATASET: BigQuery 데이터 세트
  • TABLE_NAME: BigQuery 테이블 이름

Pub/Sub Topic to BigQuery

Pub/Sub Topic to BigQuery 템플릿은 Pub/Sub 주제에서 JSON 형식의 메시지를 읽고 BigQuery 테이블에 쓰는 스트리밍 파이프라인입니다. 템플릿을 사용하여 Pub/Sub 데이터를 BigQuery로 빠르게 이동할 수 있습니다. 이 템플릿은 Pub/Sub에서 JSON 형식의 메시지를 읽고 BigQuery 요소로 변환합니다.

파이프라인 요구사항:

  • Pub/Sub 메시지가 여기에 설명된 대로 JSON 형식이어야 합니다. 예를 들어 {"k1":"v1", "k2":"v2"} 형식으로 된 메시지는 k1k2라는 두 개의 열 그리고 문자열 데이터 유형으로 BigQuery 테이블에 삽입될 수 있습니다.
  • 파이프라인을 실행하기 전에 출력 테이블이 있어야 합니다. 테이블 스키마는 입력 JSON 객체와 일치해야 합니다.

템플릿 매개변수

매개변수 설명
inputTopic 읽어올 Pub/Sub 입력 주제로, projects/<project>/topics/<topic> 형식입니다.
outputTableSpec BigQuery 출력 테이블 위치로, <my-project>:<my-dataset>.<my-table> 형식입니다.
outputDeadletterTable 출력 테이블에 도달하지 못한 메시지의 BigQuery 테이블이며, <my-project>:<my-dataset>.<my-table> 형식이어야 합니다. 존재하지 않을 경우 파이프라인 실행 중에 생성됩니다. 지정하지 않은 경우 <outputTableSpec>_error_records가 대신 사용됩니다.
javascriptTextTransformGcsPath (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
javascriptTextTransformFunctionName (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다. 예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.

Pub/Sub Topic to BigQuery 템플릿 실행

Console

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub Topic to BigQuery template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • DATASET: BigQuery 데이터 세트
  • TABLE_NAME: BigQuery 테이블 이름

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • DATASET: BigQuery 데이터 세트
  • TABLE_NAME: BigQuery 테이블 이름

Pub/Sub Avro to BigQuery

Pub/Sub Avro to BigQuery 템플릿은 Pub/Sub 구독에서 BigQuery 테이블로 Avro 데이터를 수집하는 스트리밍 파이프라인입니다. BigQuery 테이블에 쓰는 동안 발생하는 모든 오류는 Pub/Sub 처리되지 않은 주제로 스트리밍됩니다.

파이프라인 요구사항

  • 입력 Pub/Sub 구독이 있어야 합니다.
  • Avro 레코드의 스키마 파일이 Cloud Storage에 있어야 합니다.
  • 처리되지 않은 Pub/Sub 주제가 있어야 합니다.
  • 출력 BigQuery 데이터 세트가 있어야 합니다.

템플릿 매개변수

매개변수 설명
schemaPath Avro 스키마 파일의 Cloud Storage 위치입니다. 예를 들면 gs://path/to/my/schema.avsc입니다.
inputSubscription 읽어올 Pub/Sub 입력 구독입니다. 예를 들면 projects/<project>/subscriptions/<subscription>입니다.
outputTopic 처리되지 않은 레코드에 사용할 Pub/Sub 주제입니다. 예를 들면 projects/<project-id>/topics/<topic-name>입니다.
outputTableSpec BigQuery 출력 테이블 위치입니다. 예를 들면 <my-project>:<my-dataset>.<my-table>입니다. 지정된 createDisposition에 따라 사용자가 제공한 Avro 스키마를 사용하여 출력 테이블을 자동으로 생성할 수 있습니다.
writeDisposition (선택사항) BigQuery WriteDisposition. 예를 들면 WRITE_APPEND, WRITE_EMPTY, WRITE_TRUNCATE입니다. 기본값: WRITE_APPEND
createDisposition (선택사항) BigQuery CreateDisposition. 예를 들면 CREATE_IF_NEEDED, CREATE_NEVER입니다. 기본값: CREATE_IF_NEEDED

Pub/Sub Avro to BigQuery 템플릿 실행

Console

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub Avro to BigQuery template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • SCHEMA_PATH: Avro 스키마 파일의 Cloud Storage 경로(예: gs://MyBucket/file.avsc).
  • SUBSCRIPTION_NAME: Pub/Sub 입력 구독 이름
  • BIGQUERY_TABLE: BigQuery 출력 테이블 이름
  • DEADLETTER_TOPIC: 처리되지 않은 큐에 사용할 Pub/Sub 주제

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • SCHEMA_PATH: Avro 스키마 파일의 Cloud Storage 경로(예: gs://MyBucket/file.avsc).
  • SUBSCRIPTION_NAME: Pub/Sub 입력 구독 이름
  • BIGQUERY_TABLE: BigQuery 출력 테이블 이름
  • DEADLETTER_TOPIC: 처리되지 않은 큐에 사용할 Pub/Sub 주제

Pub/Sub to Pub/Sub

Pub/Sub to Pub/Sub 템플릿은 Pub/Sub 구독에서 메시지를 읽고 다른 Pub/Sub 주제에 메시지를 쓰는 스트리밍 파이프라인입니다. 또한 파이프라인은 선택적인 메시지 속성 키와 Pub/Sub 주제에 쓰여질 메시지를 필터링하는 데 사용할 수 있는 값을 허용합니다. 이 템플릿을 사용하여 선택적 메시지 필터로 Pub/Sub 구독에서 다른 Pub/Sub 주제로 메시지를 복사할 수 있습니다.

파이프라인 요구사항:

  • 실행하기 전에 소스 Pub/Sub 구독이 있어야 합니다.
  • 실행하기 전에 대상 Pub/Sub 주제가 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputSubscription 입력을 읽어올 Pub/Sub 구독입니다. 예를 들면 projects/<project-id>/subscriptions/<subscription-name>입니다.
outputTopic 출력을 작성할 Cloud Pub/Sub 주제입니다. 예를 들면 projects/<project-id>/topics/<topic-name>입니다.
filterKey (선택사항) 속성 키를 기반으로 하는 필터 이벤트입니다. filterKey가 지정되지 않은 경우 필터가 적용되지 않습니다.
filterValue (선택사항) filterKey가 제공된 경우에 사용하는 필터 속성 값입니다. 기본적으로 null인 filterValue가 사용됩니다.

Pub/Sub to Pub/Sub 템플릿 실행

Console

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to Pub/Sub template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • TOPIC_NAME: Pub/Sub 주제 이름
  • FILTER_KEY: 이벤트가 필터링되는 속성 키입니다. 키를 지정하지 않으면 필터가 적용되지 않습니다.
  • FILTER_VALUE: 이벤트 필터 키가 제공된 경우 사용할 필터 속성 값입니다. 유효한 자바 정규식 문자열을 이벤트 필터 값으로 사용합니다. 정규식이 제공된 경우 메시지 필터링을 위해 전체 표현식이 일치해야 합니다. 부분 일치(예: 하위 문자열)는 필터링되지 않습니다. 기본적으로 null 이벤트 필터 값이 사용됩니다.

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • TOPIC_NAME: Pub/Sub 주제 이름
  • FILTER_KEY: 이벤트가 필터링되는 속성 키입니다. 키를 지정하지 않으면 필터가 적용되지 않습니다.
  • FILTER_VALUE: 이벤트 필터 키가 제공된 경우 사용할 필터 속성 값입니다. 유효한 자바 정규식 문자열을 이벤트 필터 값으로 사용합니다. 정규식이 제공된 경우 메시지 필터링을 위해 전체 표현식이 일치해야 합니다. 부분 일치(예: 하위 문자열)는 필터링되지 않습니다. 기본적으로 null 이벤트 필터 값이 사용됩니다.

Pub/Sub to Splunk

Pub/Sub to Splunk 템플릿은 Splunk의 HTTP Event Collector(HEC)를 통해 Pub/Sub 구독에서 메시지를 읽고 Splunk에 메시지 페이로드를 쓰는 스트리밍 파이프라인입니다. 이 템플릿의 가장 일반적인 사용 사례는 Splunk로 로그를 내보내는 것입니다. 기본 워크플로의 예시를 보려면 Dataflow를 사용하여 Splunk로 프로덕션에 즉시 사용 가능한 로그 내보내기 배포를 참조하세요.

Splunk에 쓰기 전에 자바스크립트 사용자 정의 함수를 메시지 페이로드에 적용할 수도 있습니다. 처리 실패가 발생한 메시지는 추가적인 문제 해결 및 재처리를 위해 Pub/Sub 처리 불가 주제로 전달됩니다.

HEC 토큰의 추가 보안 레이어로 Cloud KMS 키로 암호화된 base64 인코딩 HEC 토큰 매개변수와 함께 Cloud KMS 키를 전달할 수도 있습니다. HEC 토큰 매개변수 암호화에 대한 자세한 내용은 Cloud KMS API 암호화 엔드포인트를 참조하세요.

파이프라인 요구사항:

  • 파이프라인을 실행하기 전에 소스 Pub/Sub 구독이 있어야 합니다.
  • 파이프라인을 실행하기 전에 Pub/Sub 처리되지 않은 주제가 있어야 합니다.
  • Splunk HEC 엔드포인트는 Dataflow 작업자 네트워크에서 액세스할 수 있어야 합니다.
  • Splunk HEC 토큰이 생성되고 사용 가능해야 합니다.

템플릿 매개변수

매개변수 설명
inputSubscription 입력을 읽어올 Pub/Sub 구독입니다. 예를 들면 projects/<project-id>/subscriptions/<subscription-name>입니다.
token Splunk HEC 인증 토큰입니다. 이 base64 인코딩 문자열은 보안을 강화하기 위해 Cloud KMS 키로 암호화할 수 있습니다.
url Splunk HEC URL입니다. 이 값은 파이프라인이 실행되는 VPC에서 라우팅할 수 있어야 합니다. 예를 들면 https://splunk-hec-host:8088입니다.
outputDeadletterTopic 전달할 수 없는 메시지를 전달할 Pub/Sub 주제입니다. 예를 들면 projects/<project-id>/topics/<topic-name>입니다.
javascriptTextTransformGcsPath (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
javascriptTextTransformFunctionName (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다. 예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.
batchCount (선택사항) Splunk에 여러 이벤트를 전송하기 위한 배치 크기입니다. 기본값은 1(일괄 처리 없음)입니다.
parallelism (선택사항) 최대 동시 요청 수입니다. 기본값은 1(동시 처리 없음)입니다.
disableCertificateValidation (선택사항) SSL 인증서 유효성 검사를 사용 중지합니다. 기본값은 false(유효성 검사 사용)입니다. true인 경우 인증서가 검증되지 않고(모든 인증서를 신뢰할 수 있음) `rootCaCertificatePath` 매개변수가 무시됩니다.
includePubsubMessage (선택사항) 페이로드의 전체 Pub/Sub 메시지를 포함합니다. 기본값은 false입니다(데이터 요소만 페이로드에 포함됨).
tokenKMSEncryptionKey (선택사항) HEC 토큰 문자열을 복호화할 Cloud KMS 키입니다. Cloud KMS 키가 제공되면 HEC 토큰 문자열이 암호화되어 전달되어야 합니다.
rootCaCertificatePath (선택사항) Cloud Storage의 루트 CA 인증서에 대한 전체 URL입니다. 예를 들면 gs://mybucket/mycerts/privateCA.crt입니다. Cloud Storage에서 제공하는 인증서는 DER로 인코딩되어야 하며 바이너리 또는 인쇄 가능한(Base64) 인코딩으로 제공될 수 있습니다. 인증서가 Base64 인코딩으로 제공되는 경우 시작 부분이 -----BEGIN CERTIFICATE-----로 제한되고, 끝부분은 -----END CERTIFICATE-----로 제한되어야 합니다. 이 매개변수가 제공되면 Splunk HEC 엔드포인트의 SSL 인증서를 확인하기 위해 이 비공개 CA 인증서 파일을 가져와서 Dataflow 작업자의 트러스트 저장소에 추가합니다. 이 매개변수를 제공하지 않을 경우 기본 트러스트 저장소가 사용됩니다.

Pub/Sub to Splunk 템플릿 실행

Console

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to Splunk template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION,\
rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • TOKEN: Splunk의 Http Event Collector 토큰
  • URL: Splunk의 Http Event Collector의 URL 경로(예: https://splunk-hec-host:8088)입니다.
  • DEADLETTER_TOPIC_NAME: Pub/Sub 주제 이름
  • JAVASCRIPT_FUNCTION: 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.

    예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.

  • PATH_TO_JAVASCRIPT_UDF_FILE: 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • BATCH_COUNT: Splunk에 여러 이벤트를 전송하기 위해 사용할 배치 크기
  • PARALLELISM: Splunk에 이벤트를 전송하기 위해 사용할 동시 요청 수
  • DISABLE_VALIDATION: SSL 인증서 검증을 사용 중지하려는 경우 true
  • ROOT_CA_CERTIFICATE_PATH: Cloud Storage의 루트 CA 인증서 경로(예: gs://your-bucket/privateCA.crt)

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION",
       "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH"
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • TOKEN: Splunk의 Http Event Collector 토큰
  • URL: Splunk의 Http Event Collector의 URL 경로(예: https://splunk-hec-host:8088)입니다.
  • DEADLETTER_TOPIC_NAME: Pub/Sub 주제 이름
  • JAVASCRIPT_FUNCTION: 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.

    예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.

  • PATH_TO_JAVASCRIPT_UDF_FILE: 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • BATCH_COUNT: Splunk에 여러 이벤트를 전송하기 위해 사용할 배치 크기
  • PARALLELISM: Splunk에 이벤트를 전송하기 위해 사용할 동시 요청 수
  • DISABLE_VALIDATION: SSL 인증서 검증을 사용 중지하려는 경우 true
  • ROOT_CA_CERTIFICATE_PATH: Cloud Storage의 루트 CA 인증서 경로(예: gs://your-bucket/privateCA.crt)

Pub/Sub to Avro Files on Cloud Storage

Pub/Sub to Avro files on Cloud Storage 템플릿은 Pub/Sub 주제에서 데이터를 읽고 지정된 Cloud Storage 버킷에 Avro 파일을 쓰는 스트리밍 파이프라인입니다.

파이프라인 요구사항:

  • 파이프라인을 실행하기 전에 입력 Pub/Sub 주제가 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputTopic 메시지 소비를 위해 구독할 Pub/Sub 주제입니다. 주제 이름은 projects/<project-id>/topics/<topic-name> 형식이어야 합니다.
outputDirectory 출력 Avro 파일이 보관처리되는 출력 디렉터리입니다. 마지막에 /를 포함해야 합니다. 예를 들면 gs://example-bucket/example-directory/입니다.
avroTempDirectory 임시 Avro 파일의 디렉터리입니다. 마지막에 /를 포함해야 합니다. 예를 들면 gs://example-bucket/example-directory/입니다.
outputFilenamePrefix (선택사항) Avro 파일의 출력 파일 이름 프리픽스입니다.
outputFilenameSuffix (선택사항) Avro 파일의 출력 파일 이름 서픽스입니다.
outputShardTemplate (선택사항) 출력 파일의 샤드 템플릿입니다. S 또는 N 문자의 반복 시퀀스로 지정됩니다. 예를 들면 SSS-NNN입니다. 이는 각각 샤드 번호 또는 총 샤드 개수로 바뀝니다. 이 매개변수가 지정되지 않았으면 기본 템플릿 형식이 W-P-SS-of-NN입니다.

Pub/Sub to Cloud Storage Avro 템플릿 실행

Console

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to Avro Files on Cloud Storage template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
avroTempDirectory=gs://BUCKET_NAME/temp/

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름
  • FILENAME_PREFIX: 선호하는 출력 파일 이름 프리픽스
  • FILENAME_SUFFIX: 선호하는 출력 파일 이름 서픽스
  • SHARD_TEMPLATE: 선호하는 출력 샤드 템플릿

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름
  • FILENAME_PREFIX: 선호하는 출력 파일 이름 프리픽스
  • FILENAME_SUFFIX: 선호하는 출력 파일 이름 서픽스
  • SHARD_TEMPLATE: 선호하는 출력 샤드 템플릿

Pub/Sub to Text Files on Cloud Storage

Pub/Sub to Cloud Storage Text 템플릿은 Pub/Sub에서 레코드를 읽고 텍스트 형식으로 된 일련의 Cloud Storage 파일로 저장하는 스트리밍 파이프라인입니다. 나중에 사용하기 위해 Pub/Sub에 데이터를 빠르게 저장하는 수단으로 템플릿을 사용할 수 있습니다. 기본적으로 템플릿은 5분마다 새 파일을 생성합니다.

파이프라인 요구사항:

  • 실행하기 전에 Pub/Sub 주제가 있어야 합니다.
  • 주제에 게시되는 메시지는 텍스트 형식이어야 합니다.
  • 주제에 게시되는 메시지에는 줄바꿈을 사용할 수 없습니다. 각 Pub/Sub 메시지는 출력 파일에 한 줄로 저장됩니다.

템플릿 매개변수

매개변수 설명
inputTopic 입력을 읽어올 Pub/Sub 주제입니다. 주제 이름은 projects/<project-id>/topics/<topic-name> 형식이어야 합니다.
outputDirectory 출력 파일을 쓰기 위한 경로 및 파일 이름 프리픽스입니다. 예를 들면 gs://bucket-name/path/입니다. 이 값은 슬래시로 끝나야 합니다.
outputFilenamePrefix 윈도우 설정된 각 파일에 넣을 프리픽스입니다. 예를 들면 output-입니다.
outputFilenameSuffix 윈도우 설정된 각 파일에 넣을 서픽스입니다. 일반적으로 .txt 또는 .csv와 같은 파일 확장자입니다.
outputShardTemplate 샤드 템플릿은 윈도우 설정된 각 파일의 동적 부분을 정의합니다. 기본적으로, 파이프라인은 각 윈도우 내에서 단일 샤드을 사용하여 파일 시스템에 출력합니다. 따라서 모든 데이터가 윈도우별로 한 파일 안에 들어갑니다. outputShardTemplate의 기본값은 W-P-SS-of-NN입니다. 여기에서 W은 윈도우 기간, P는 창 정보, S는 샤드 번호, N은 샤드 개수입니다. 단일 파일의 경우에는 outputShardTemplateSS-of-NN 부분이 00-of-01이 됩니다.

Pub/Sub to Text Files on Cloud Storage 템플릿 실행

Console

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to Text Files on Cloud Storage template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름

Pub/Sub to MongoDB

Pub/Sub to MongoDB 템플릿은 Pub/Sub 구독에서 JSON 인코딩 메시지를 읽고 이를 MongoDB에 문서로 쓰는 스트리밍 파이프라인입니다. 필요한 경우 이 파이프라인은 자바스크립트 사용자 정의 함수(UDF)를 사용하여 포함할 수 있는 추가 변환을 지원합니다. 스키마 불일치, 잘못된 형식의 JSON으로 인해 오류가 생기거나 변환 실행 중에 오류가 발생하는 경우 입력 메시지와 함께 처리되지 않은 메시지가 BigQuery 테이블에 기록됩니다. 처리되지 않은 레코드의 테이블이 실행되기 전에 존재하지 않으면 파이프라인은 자동으로 이 테이블을 생성합니다.

파이프라인 요구사항:

  • Pub/Sub 구독이 있어야 하며 메시지가 유효한 JSON 형식으로 인코딩되어야 합니다.
  • MongoDB 클러스터가 있어야 하며 Dataflow 작업자 머신에서 액세스할 수 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputSubscription Pub/Sub 구독의 이름입니다. 예를 들면 projects/my-project-id/subscriptions/my-subscription-id입니다.
mongoDBUri 쉼표로 구분된 MongoDB 서버 목록입니다. 예를 들면 192.285.234.12:27017,192.287.123.11:27017입니다.
database 컬렉션을 저장하기 위한 MongoDB의 데이터베이스입니다. 예를 들면 my-db입니다.
collection MongoDB 데이터베이스 내부의 컬렉션 이름입니다. 예를 들면 my-collection입니다.
deadletterTable 오류(스키마 불일치, 잘못된 형식의 json 등)로 인해 메시지를 저장하는 BigQuery 테이블입니다. 예를 들면 project-id:dataset-name.table-name입니다.
javascriptTextTransformGcsPath (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
javascriptTextTransformFunctionName (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다. 예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.
batchSize (선택사항) MongoDB에 문서를 일괄 삽입하기 위해 사용되는 배치 크기입니다. 기본값: 1000
batchSizeBytes (선택사항) 배치 크기(바이트)입니다. 기본값: 5242880
maxConnectionIdleTime (선택사항) 연결이 시간 초과되기 전까지 허용되는 최대 유휴 시간(초)입니다. 기본값: 60000
sslEnabled (선택사항) MongoDB 연결에 SSL이 사용 설정되었는지 여부를 나타내는 부울 값입니다. 기본값: true
ignoreSSLCertificate (선택사항) SSL 인증서를 무시해야 하는지 여부를 나타내는 부울 값입니다. 기본값: true
withOrdered (선택사항) MongoDB에 순서대로 대량 삽입할 수 있게 해주는 부울 값입니다. 기본값: true
withSSLInvalidHostNameAllowed (선택사항) SSL 연결에 잘못된 호스트 이름이 허용되었는지 여부를 나타내는 부울 값입니다. 기본값: true

Pub/Sub to MongoDB 템플릿 실행

Console

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to MongoDB template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • INPUT_SUBSCRIPTION: Pub/Sub 구독(예: projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: MongoDB 서버 주소(예: 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: MongoDB 데이터베이스의 이름(예: users)
  • COLLECTION: MongoDB 컬렉션의 이름(예: profiles)
  • UNPROCESSED_TABLE: BigQuery 테이블의 이름(예: your-project:your-dataset.your-table-name)

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • INPUT_SUBSCRIPTION: Pub/Sub 구독(예: projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: MongoDB 서버 주소(예: 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: MongoDB 데이터베이스의 이름(예: users)
  • COLLECTION: MongoDB 컬렉션의 이름(예: profiles)
  • UNPROCESSED_TABLE: BigQuery 테이블의 이름(예: your-project:your-dataset.your-table-name)

Pub/Sub to Elasticsearch

Pub/Sub to Elasticsearch 템플릿은 Pub/Sub 구독에서 메시지를 읽고 사용자 정의 함수(UDF)를 실행하고 Elasticsearch에 문서로 쓰는 스트리밍 파이프라인입니다. Dataflow 템플릿은 Elasticsearch의 데이터 스트림 기능을 사용하여 여러 색인에 시계열 데이터를 저장하면서 요청에 대한 이름이 지정된 단일 리소스를 제공합니다. 데이터 스트림은 로그, 측정항목, trace, Pub/Sub에 저장된 기타 지속적으로 생성된 데이터에 적합합니다.

파이프라인 요구사항

  • 소스 Pub/Sub 구독이 있어야 하며 메시지가 유효한 JSON 형식으로 인코딩되어야 합니다.
  • GCP 인스턴스 또는 Elasticsearch 버전 7.0 이상을 사용하는 Elastic Cloud에 공개적으로 연결 가능한 Elasticsearch 호스트가 있어야 합니다. 자세한 내용은 Google Cloud의 Elastic 통합을 참조하세요.
  • 오류 출력을 위한 Pub/Sub 주제가 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputSubscription 사용할 Pub/Sub 구독입니다. 이름은 projects/<project-id>/subscriptions/<subscription-name> 형식이어야 합니다.
connectionUrl Elasticsearch URL(https://hostname:[port] 형식) 또는 Elastic Cloud를 사용하는 경우 CloudID를 지정합니다.
apiKey 인증에 사용되는 Base64로 인코딩된 API 키입니다.
errorOutputTopic 실패한 레코드를 projects/<project-id>/topics/<topic-name> 형식으로 게시하는 Pub/Sub 출력 주제입니다.
dataset (선택사항) 즉시 사용 가능한 대시보드가 있는 Pub/Sub를 통해 전송된 로그 유형입니다. 알려진 로그 유형 값은 audit, vpcflow, firewall입니다. 기본값: pubsub
namespace (선택사항) 환경(dev, prod, qa), 팀 또는 전략적 사업부와 같은 임의의 그룹화입니다. 기본값: default
batchSize (선택사항) 문서 수의 배치 크기입니다. 기본값: 1000
batchSizeBytes (선택사항) 바이트 수의 배치 크기입니다. 기본값은 5242880(5MB)입니다.
maxRetryAttempts (선택사항) 최대 재시도 횟수이며 0 이상이어야 합니다. 기본값: no retries
maxRetryDuration (선택사항) 밀리초 단위의 최대 재시도 시간이며 0 이상이어야 합니다. 기본값: no retries
javascriptTextTransformGcsPath (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
javascriptTextTransformFunctionName (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다. 예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.

Pub/Sub to Elasticsearch 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to Elasticsearch template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • ERROR_OUTPUT_TOPIC: 오류 출력을 위한 Pub/Sub 주제
  • SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • CONNECTION_URL: Elasticsearch URL
  • DATASET: 로그 유형
  • NAMESPACE: 데이터 세트의 네임스페이스
  • APIKEY: 인증을 위한 base64로 인코딩된 API 키

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch",
   }
}
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • ERROR_OUTPUT_TOPIC: 오류 출력을 위한 Pub/Sub 주제
  • SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • CONNECTION_URL: Elasticsearch URL
  • DATASET: 로그 유형
  • NAMESPACE: 데이터 세트의 네임스페이스
  • APIKEY: 인증을 위한 base64로 인코딩된 API 키

Datastream to Cloud Spanner

Datastream to Cloud Spanner 템플릿은 Cloud Storage 버킷에서 Datastream 이벤트를 읽고 Cloud Spanner 데이터베이스에 쓰는 스트리밍 파이프라인입니다. 데이터를 Datastream 소스에서 Cloud Spanner로 마이그레이션합니다.

마이그레이션에 필요한 모든 테이블은 템플릿을 실행하기 전에 대상 Cloud Spanner 데이터베이스에 있어야 합니다. 따라서 데이터 마이그레이션 전에 소스 데이터베이스에서 대상 Cloud Spanner로 스키마 마이그레이션을 완료해야 합니다. 마이그레이션하기 전에 데이터가 테이블에 있을 수 있습니다. 이 템플릿은 Datastream 스키마 변경사항을 Cloud Spanner 데이터베이스에 전파하지 않습니다.

데이터 일관성은 모든 데이터가 Cloud Spanner에 기록될 때 마이그레이션 종료 시에만 보장됩니다. Cloud Spanner에 쓰인 각 레코드의 순서 정보를 저장하도록 이 템플릿은 Cloud Spanner 데이터베이스의 각 테이블에 추가 테이블(섀도 테이블이라고 함)을 만듭니다. 이 테이블은 마이그레이션 종료 시 일관성을 보장하는 데 사용됩니다. 섀도 테이블은 마이그레이션 후에 삭제되지 않으며 마이그레이션 종료 시 유효성 검사 목적으로 사용될 수 있습니다.

스키마 불일치, 잘못된 형식의 JSON 파일 또는 변환 실행으로 발생하는 오류와 같은 작업 중에 발생하는 모든 오류는 오류 큐에 기록됩니다. 오류 큐는 오류와 함께 오류가 발생한 모든 Datastream 이벤트를 텍스트 형식으로 저장하는 Cloud Storage 폴더입니다. 오류는 일시적이거나 영구적일 수 있으며 오류 큐의 적절한 Cloud Storage 폴더에 저장됩니다. 일시적인 오류는 자동으로 재시도되지만 영구 오류는 그렇지 않습니다. 영구적인 오류가 발생할 경우 변경 이벤트를 수정하고 템플릿이 실행되는 동안 재시도 가능한 버킷으로 이동할 수 있습니다.

파이프라인 요구사항:

  • 실행 중 또는 시작되지 않음 상태의 Datastream 스트림
  • Datastream 이벤트가 복제되는 Cloud Storage 버킷
  • 기존 테이블이 있는 Cloud Spanner 데이터베이스 이러한 테이블은 비어 있거나 데이터를 포함할 수 있습니다.

템플릿 매개변수

매개변수 설명
inputFilePattern Cloud Storage에서 복제할 Datastream 파일의 파일 위치입니다. 일반적으로 이는 스트림의 루트 경로입니다.
streamName 스키마 정보와 소스 유형을 폴링할 스키마의 이름이나 템플릿입니다.
instanceId 변경사항이 복제된 Cloud Spanner 인스턴스입니다.
databaseId 변경사항이 복제된 Cloud Spanner 데이터베이스입니다.
projectId Cloud Spanner 프로젝트 ID입니다.
deadLetterQueueDirectory (선택사항) 오류 큐 출력을 저장할 파일 경로입니다. 기본값은 Dataflow 작업의 임시 위치 아래에 있는 디렉터리입니다.
inputFileFormat (선택사항) Datastream에서 생성한 출력 파일의 형식입니다. 예를 들면 avro,json입니다. 기본값은 avro입니다.
shadowTablePrefix (선택사항) 섀도우 테이블의 이름을 지정하는 데 사용되는 프리픽스입니다. 기본값: shadow_

Datastream to Cloud Spanner 템플릿 실행

Console

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Cloud Datastream to Spanner template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Datastream_to_CloudSpanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • GCS_FILE_PATH: Datastream 이벤트를 저장하는 데 사용되는 Cloud Storage 경로입니다. 예를 들면 gs://bucket/path/to/data/입니다.
  • CLOUDSPANNER_INSTANCE: Cloud Spanner 인스턴스
  • CLOUDSPANNER_DATABASE: Cloud Spanner 데이터베이스
  • DLQ: 오류 큐 디렉터리의 Cloud Storage 경로

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Datastream_to_CloudSpanner",
   }
}
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • GCS_FILE_PATH: Datastream 이벤트를 저장하는 데 사용되는 Cloud Storage 경로입니다. 예를 들면 gs://bucket/path/to/data/입니다.
  • CLOUDSPANNER_INSTANCE: Cloud Spanner 인스턴스
  • CLOUDSPANNER_DATABASE: Cloud Spanner 데이터베이스
  • DLQ: 오류 큐 디렉터리의 Cloud Storage 경로

Text Files on Cloud Storage to BigQuery(스트리밍)

Text Files on Cloud Storage to BigQuery 파이프라인은 Cloud Storage에 저장된 텍스트 파일을 스트리밍하고, 사용자가 제공한 자바스크립트 사용자 정의 함수(UDF)를 사용하여 변환하고, 결과를 BigQuery에 추가할 수 있게 해주는 스트리밍 파이프라인입니다.

파이프라인은 무한히 실행되며 드레이닝을 지원하지 않는 분할 가능한 DoFnWatch 변환 사용으로 인해 드레이닝이 아닌 취소를 통해 수동으로 종료되어야 합니다.

파이프라인 요구사항:

  • BigQuery에서 출력 테이블의 스키마를 설명하는 JSON 파일을 만드세요.

    최상위 JSON 배열의 이름이 BigQuery Schema이고 해당 콘텐츠는 {"name": "COLUMN_NAME", "type": "DATA_TYPE"} 패턴을 따라야 합니다. 예를 들면 다음과 같습니다.

    {
      "BigQuery Schema": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING",
          "mode": "REQUIRED"
        },
        {
          "name": "coffee",
          "type": "STRING",
          "mode": "REQUIRED"
        }
      ]
    }
    
  • 텍스트 줄을 변환하는 논리를 제공하는 UDF 함수를 사용하여 자바스크립트(.js) 파일을 만듭니다. 함수는 JSON 문자열을 반환해야 합니다.

    예를 들어 이 함수는 CSV 파일의 각 줄을 분할하고, 값을 변환한 후에 JSON 문자열을 반환합니다.

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

템플릿 매개변수

매개변수 설명
javascriptTextTransformGcsPath 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
JSONPath JSON으로 설명된 BigQuery 스키마 파일의 Cloud Storage 위치입니다. 예를 들면 gs://path/to/my/schema.json입니다.
javascriptTextTransformFunctionName 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다. 예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.
outputTable 정규화된 BigQuery 테이블입니다. 예를 들면 my-project:dataset.table입니다.
inputFilePattern 처리하려는 텍스트의 Cloud Storage 위치입니다. 예를 들면 gs://my-bucket/my-files/text.txt입니다.
bigQueryLoadingTemporaryDirectory BigQuery 로딩 프로세스를 위한 임시 디렉터리입니다. 예를 들면 gs://my-bucket/my-files/temp_dir입니다.
outputDeadletterTable 출력 테이블에 도달하지 못한 메시지 테이블입니다. 예를 들면 my-project:dataset.my-unprocessed-table입니다. 존재하지 않을 경우 파이프라인 실행 중에 생성됩니다. 지정하지 않은 경우 <outputTableSpec>_error_records가 대신 사용됩니다.

Cloud Storage Text to BigQuery(스트리밍) 템플릿 실행

Console

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Text Files on Cloud Storage to BigQuery template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION: 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.

    예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: 스키마 정의가 포함된 JSON 파일의 Cloud Storage 경로
  • PATH_TO_JAVASCRIPT_UDF_FILE: 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • PATH_TO_TEXT_DATA: 텍스트 데이터 세트의 Cloud Storage 경로
  • BIGQUERY_TABLE: BigQuery 테이블 이름
  • BIGQUERY_UNPROCESSED_TABLE: 처리되지 않은 메시지에 대한 BigQuery 테이블의 이름
  • PATH_TO_TEMP_DIR_ON_GCS: 임시 디렉터리의 Cloud Storage 경로

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION: 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.

    예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: 스키마 정의가 포함된 JSON 파일의 Cloud Storage 경로
  • PATH_TO_JAVASCRIPT_UDF_FILE: 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • PATH_TO_TEXT_DATA: 텍스트 데이터 세트의 Cloud Storage 경로
  • BIGQUERY_TABLE: BigQuery 테이블 이름
  • BIGQUERY_UNPROCESSED_TABLE: 처리되지 않은 메시지에 대한 BigQuery 테이블의 이름
  • PATH_TO_TEMP_DIR_ON_GCS: 임시 디렉터리의 Cloud Storage 경로

Text Files on Cloud Storage to Pub/Sub(스트리밍)

이 템플릿은 Cloud Storage에 업로드된 새 텍스트 파일을 지속적으로 폴링하고, 각 파일을 한 줄씩 읽고, Pub/Sub 주제에 문자열을 게시하는 스트리밍 파이프라인을 만듭니다. 이 템플릿은 JSON 레코드를 포함하는 줄바꿈 구분 파일 또는 CSV 파일의 레코드를 Pub/Sub 주제에 게시하여 실시간으로 처리합니다. 이 템플릿을 사용하여 Pub/Sub에 데이터를 다시 재생할 수 있습니다.

파이프라인은 무한정으로 실행하며, 배출을 지원하지 않는 'SplittableDoFn'인 '관찰' 변환 사용으로 인해 '배출'이 아닌 '취소'를 통해 수동으로 종료되어야 합니다.

현재 폴링 간격은 고정되어 있으며 10초로 설정되어 있습니다. 이 템플릿은 개별 레코드에 타임스탬프를 설정하지 않기 때문에 이벤트 시간이 실행 중 게시 시간과 일치하게 됩니다. 파이프라인을 처리하기 위해 정확한 이벤트 시간이 필요한 경우에는 이 파이프라인을 사용해서는 안 됩니다.

파이프라인 요구사항:

  • 입력 파일은 줄바꿈으로 구분되는 JSON 또는 CSV 형식이어야 합니다. 소스 파일에서 여러 줄에 걸쳐 있는 레코드는 다운스트림 문제를 일으킬 수 있습니다. 파일 안의 각 줄이 Pub/Sub에 메시지로 게시되기 때문입니다.
  • 실행하기 전에 Pub/Sub 주제가 있어야 합니다.
  • 파이프라인은 무기한으로 실행되며 수동으로 종료해야 합니다.

템플릿 매개변수

매개변수 설명
inputFilePattern 읽을 입력 파일 패턴입니다. 예를 들면 gs://bucket-name/files/*.json 또는 gs://bucket-name/path/*.csv입니다.
outputTopic 작성할 Pub/Sub 입력 주제입니다. 이름은 projects/<project-id>/topics/<topic-name> 형식이어야 합니다.

Text Files on Cloud Storage to Pub/Sub(스트리밍) 템플릿 실행

Console

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Text Files on Cloud Storage to Pub/Sub (Stream) template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME\
    --staging-location TEMP_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름
  • FILE_PATTERN: Cloud Storage 버킷에서 읽을 파일 패턴 glob(예: path/*.csv)

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름
  • FILE_PATTERN: Cloud Storage 버킷에서 읽을 파일 패턴 glob(예: path/*.csv)

Data Masking/Tokenization from Cloud Storage to BigQuery(Cloud DLP 사용)

Data Masking/Tokenization from Cloud Storage to BigQuery(Cloud DLP 사용) 템플릿은 Cloud Storage 버킷에서 csv 파일을 읽고, 익명화를 위해 Cloud Data Loss Prevention(Cloud DLP) API를 호출하며, 익명화된 데이터를 지정된 BigQuery 테이블에 쓰는 스트리밍 파이프라인입니다. 이 템플릿은 Cloud DLP 검사 템플릿과 Cloud DLP 익명화 템플릿 사용을 지원합니다. 따라서 사용자가 잠재적으로 민감한 정보를 검사하고 식별을 익명화할 수 있을 뿐만 아니라 열을 익명화하도록 지정된 구조화된 데이터를 익명화하며 검사가 필요 없습니다.

파이프라인 요구사항:

  • 토큰화할 입력 데이터가 있어야 합니다.
  • Cloud DLP 템플릿이 있어야 합니다(예: DeidentifyTemplate 및 InspectTemplate). 자세한 내용은 Cloud DLP 템플릿을 참조하세요.
  • BigQuery 데이터 세트가 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputFilePattern 입력 데이터 레코드를 읽어 들일 csv 파일입니다. 와일드 카드 사용도 허용됩니다. 예를 들면 gs://mybucket/my_csv_filename.csv 또는 gs://mybucket/file-*.csv입니다.
dlpProjectId Cloud DLP API 리소스를 소유하는 Cloud DLP 프로젝트 ID입니다. 이 Cloud DLP 프로젝트는 Cloud DLP 템플릿을 소유하는 동일한 프로젝트이거나 별도의 프로젝트일 수 있습니다. 예를 들면 my_dlp_api_project입니다.
deidentifyTemplateName API 요청에 사용할 Cloud DLP 익명화 템플릿으로, projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId} 패턴을 사용하여 지정됩니다. 예를 들면 projects/my_project/deidentifyTemplates/100입니다.
datasetName 토큰화된 결과를 보내기 위한 BigQuery 데이터 세트입니다.
batchSize 검사하거나 익명화할 데이터를 보내는 데 사용할 청크/배치 크기입니다. csv 파일의 경우 batchSize는 배치의 행 수입니다. 사용자는 레코드 크기 및 파일 크기에 따라 배치 크기를 결정해야 합니다. Cloud DLP API의 페이로드 크기 제한은 API 호출당 524KB입니다.
inspectTemplateName (선택사항) API 요청에 사용할 Cloud DLP 검사 템플릿으로, projects/{template_project_id}/identifyTemplates/{idTemplateId} 패턴을 사용하여 지정됩니다. 예를 들면 projects/my_project/identifyTemplates/100입니다.

Data Masking/Tokenization from Cloud Storage to BigQuery(Cloud DLP 사용) 템플릿 실행

Console

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputFilePattern=INPUT_DATA,\
datasetName=DATASET_NAME,\
batchSize=BATCH_SIZE_VALUE,\
dlpProjectId=DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER

다음을 바꿉니다.

  • DLP_API_PROJECT_ID: Cloud DLP API 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • INPUT_DATA: 입력 파일 경로
  • DEIDENTIFY_TEMPLATE: Cloud DLPDeidentify 템플릿 번호
  • DATASET_NAME: BigQuery 데이터 세트 이름
  • INSPECT_TEMPLATE_NUMBER: Cloud DLPInspect 템플릿 번호
  • BATCH_SIZE_VALUE: 배치 크기(csv의 API당 행 수)

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
      "inputFilePattern":INPUT_DATA,
      "datasetName": "DATASET_NAME",
      "batchSize": "BATCH_SIZE_VALUE",
      "dlpProjectId": "DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE",
      "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER"
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • DLP_API_PROJECT_ID: Cloud DLP API 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • INPUT_DATA: 입력 파일 경로
  • DEIDENTIFY_TEMPLATE: Cloud DLPDeidentify 템플릿 번호
  • DATASET_NAME: BigQuery 데이터 세트 이름
  • INSPECT_TEMPLATE_NUMBER: Cloud DLPInspect 템플릿 번호
  • BATCH_SIZE_VALUE: 배치 크기(csv의 API당 행 수)