Google 제공 Dataflow 스트리밍 템플릿

Google은 오픈소스 Dataflow 템플릿 세트를 제공합니다.

이러한 Dataflow 템플릿을 사용하면 전용 개발 환경을 사용하지 않고도 데이터 가져오기, 데이터 내보내기, 데이터 백업, 데이터 복원, 일괄 API 작업 등의 대규모 데이터 태스크를 해결하는 데 도움이 될 수 있습니다. 이 템플릿은 Apache Beam을 기반으로 하며 Dataflow를 사용하여 데이터를 변환합니다.

템플릿에 대한 일반 정보는 Dataflow 템플릿을 참조하세요. 모든 Google 제공 템플릿 목록은 Google 제공 템플릿 시작을 참조하세요.

이 가이드에서는 스트리밍 템플릿을 다룹니다.

Pub/Sub Subscription to BigQuery

Pub/Sub Subscription to BigQuery 템플릿은 Pub/Sub 구독에서 JSON 형식의 메시지를 읽고 BigQuery 테이블에 쓰는 스트리밍 파이프라인입니다. 템플릿을 사용하여 Pub/Sub 데이터를 BigQuery로 빠르게 이동할 수 있습니다. 이 템플릿은 Pub/Sub에서 JSON 형식의 메시지를 읽고 BigQuery 요소로 변환합니다.

파이프라인 요구사항:

  • Pub/Sub 메시지의 data 필드는 이 JSON 가이드에 설명된 JSON 형식을 사용해야 합니다. 예를 들어 {"k1":"v1", "k2":"v2"} 형식으로 된 data 필드 값이 있는 메시지는 k1k2라는 두 개의 열 그리고 문자열 데이터 유형으로 BigQuery 테이블에 삽입될 수 있습니다.
  • 파이프라인을 실행하기 전에 출력 테이블이 있어야 합니다. 테이블 스키마는 입력 JSON 객체와 일치해야 합니다.

템플릿 매개변수

매개변수 설명
inputSubscription 읽어올 Pub/Sub 입력 구독으로, projects/<project>/subscriptions/<subscription> 형식입니다.
outputTableSpec BigQuery 출력 테이블 위치로, <my-project>:<my-dataset>.<my-table> 형식입니다.
outputDeadletterTable 출력 테이블에 도달하지 못한 메시지의 BigQuery 테이블로 <my-project>:<my-dataset>.<my-table> 형식입니다. 존재하지 않을 경우 파이프라인 실행 중에 생성됩니다. 지정하지 않은 경우 OUTPUT_TABLE_SPEC_error_records가 대신 사용됩니다.
javascriptTextTransformGcsPath (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
javascriptTextTransformFunctionName (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다. 예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.

Pub/Sub Subscription to BigQuery 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub Subscription to BigQuery template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • STAGING_LOCATION: 로컬 파일의 스테이징 위치(예: gs://your-bucket/staging)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • DATASET: BigQuery 데이터 세트
  • TABLE_NAME: BigQuery 테이블 이름

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • STAGING_LOCATION: 로컬 파일의 스테이징 위치(예: gs://your-bucket/staging)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • DATASET: BigQuery 데이터 세트
  • TABLE_NAME: BigQuery 테이블 이름

Pub/Sub Topic to BigQuery

Pub/Sub Topic to BigQuery 템플릿은 Pub/Sub 주제에서 JSON 형식의 메시지를 읽고 BigQuery 테이블에 쓰는 스트리밍 파이프라인입니다. 템플릿을 사용하여 Pub/Sub 데이터를 BigQuery로 빠르게 이동할 수 있습니다. 이 템플릿은 Pub/Sub에서 JSON 형식의 메시지를 읽고 BigQuery 요소로 변환합니다.

파이프라인 요구사항:

  • Pub/Sub 메시지의 data 필드는 이 JSON 가이드에 설명된 JSON 형식을 사용해야 합니다. 예를 들어 {"k1":"v1", "k2":"v2"} 형식으로 된 data 필드 값이 있는 메시지는 k1k2라는 두 개의 열 그리고 문자열 데이터 유형으로 BigQuery 테이블에 삽입될 수 있습니다.
  • 파이프라인을 실행하기 전에 출력 테이블이 있어야 합니다. 테이블 스키마는 입력 JSON 객체와 일치해야 합니다.

템플릿 매개변수

매개변수 설명
inputTopic 읽어올 Pub/Sub 입력 주제로, projects/<project>/topics/<topic> 형식입니다.
outputTableSpec BigQuery 출력 테이블 위치로, <my-project>:<my-dataset>.<my-table> 형식입니다.
outputDeadletterTable 출력 테이블에 도달하지 못한 메시지의 BigQuery 테이블이며, <my-project>:<my-dataset>.<my-table> 형식이어야 합니다. 존재하지 않을 경우 파이프라인 실행 중에 생성됩니다. 지정하지 않은 경우 <outputTableSpec>_error_records가 대신 사용됩니다.
javascriptTextTransformGcsPath (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
javascriptTextTransformFunctionName (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다. 예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.

Pub/Sub Topic to BigQuery 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub Topic to BigQuery template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • STAGING_LOCATION: 로컬 파일의 스테이징 위치(예: gs://your-bucket/staging)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • DATASET: BigQuery 데이터 세트
  • TABLE_NAME: BigQuery 테이블 이름

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • STAGING_LOCATION: 로컬 파일의 스테이징 위치(예: gs://your-bucket/staging)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • DATASET: BigQuery 데이터 세트
  • TABLE_NAME: BigQuery 테이블 이름

Pub/Sub Avro to BigQuery

Pub/Sub Avro to BigQuery 템플릿은 Pub/Sub 구독에서 BigQuery 테이블로 Avro 데이터를 수집하는 스트리밍 파이프라인입니다. BigQuery 테이블에 쓰는 동안 발생하는 모든 오류는 Pub/Sub 처리되지 않은 주제로 스트리밍됩니다.

파이프라인 요구사항

  • 입력 Pub/Sub 구독이 있어야 합니다.
  • Avro 레코드의 스키마 파일이 Cloud Storage에 있어야 합니다.
  • 처리되지 않은 Pub/Sub 주제가 있어야 합니다.
  • 출력 BigQuery 데이터 세트가 있어야 합니다.

템플릿 매개변수

매개변수 설명
schemaPath Avro 스키마 파일의 Cloud Storage 위치입니다. 예를 들면 gs://path/to/my/schema.avsc입니다.
inputSubscription 읽어올 Pub/Sub 입력 구독입니다. 예를 들면 projects/<project>/subscriptions/<subscription>입니다.
outputTopic 처리되지 않은 레코드에 사용할 Pub/Sub 주제입니다. 예를 들면 projects/<project-id>/topics/<topic-name>입니다.
outputTableSpec BigQuery 출력 테이블 위치입니다. 예를 들면 <my-project>:<my-dataset>.<my-table>입니다. 지정된 createDisposition에 따라 사용자가 제공한 Avro 스키마를 사용하여 출력 테이블을 자동으로 생성할 수 있습니다.
writeDisposition (선택사항) BigQuery WriteDisposition. 예를 들면 WRITE_APPEND, WRITE_EMPTY, WRITE_TRUNCATE입니다. 기본값: WRITE_APPEND
createDisposition (선택사항) BigQuery CreateDisposition. 예를 들면 CREATE_IF_NEEDED, CREATE_NEVER입니다. 기본값: CREATE_IF_NEEDED

Pub/Sub Avro to BigQuery 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub Avro to BigQuery template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • SCHEMA_PATH: Avro 스키마 파일의 Cloud Storage 경로(예: gs://MyBucket/file.avsc).
  • SUBSCRIPTION_NAME: Pub/Sub 입력 구독 이름
  • BIGQUERY_TABLE: BigQuery 출력 테이블 이름
  • DEADLETTER_TOPIC: 처리되지 않은 큐에 사용할 Pub/Sub 주제

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • SCHEMA_PATH: Avro 스키마 파일의 Cloud Storage 경로(예: gs://MyBucket/file.avsc).
  • SUBSCRIPTION_NAME: Pub/Sub 입력 구독 이름
  • BIGQUERY_TABLE: BigQuery 출력 테이블 이름
  • DEADLETTER_TOPIC: 처리되지 않은 큐에 사용할 Pub/Sub 주제

Pub/Sub Proto to BigQuery

Pub/Sub proto to BigQuery 템플릿은 Pub/Sub 구독에서 BigQuery 테이블로 proto 데이터를 수집하는 스트리밍 파이프라인입니다. BigQuery 테이블에 쓰는 동안 발생하는 모든 오류는 Pub/Sub 처리되지 않은 주제로 스트리밍됩니다.

자바스크립트 사용자 정의 함수(UDF)를 제공하여 데이터를 변환할 수 있습니다. UDF 실행 중 오류는 개별 Pub/Sub 주제 또는 BigQuery 오류와 동일한 미처리 주제로 전송될 수 있습니다.

파이프라인 요구사항:

  • 입력 Pub/Sub 구독이 있어야 합니다.
  • Proto 레코드의 스키마 파일이 Cloud Storage에 있어야 합니다.
  • 출력 Pub/Sub 주제가 있어야 합니다.
  • 출력 BigQuery 데이터 세트가 있어야 합니다.
  • BigQuery 테이블이 있으면 createDisposition 값에 관계없이 Proto 데이터와 일치하는 스키마가 있어야 합니다.

템플릿 매개변수

매개변수 설명
protoSchemaPath 자체 포함된 Proto 스키마 파일의 Cloud Storage 위치입니다. 예를 들면 gs://path/to/my/file.pb입니다. 이 파일은 protoc 명령어의 --descriptor_set_out 플래그를 사용하여 생성할 수 있습니다. --include_imports 플래그는 파일을 독립 실행형 파일로 만듭니다.
fullMessageName 전체 Proto 메시지 이름입니다. 예를 들면 package.name.MessageName입니다. 여기서 package.namejava_package 문이 아닌 package 문에 대해 제공된 값입니다.
inputSubscription 읽어올 Pub/Sub 입력 구독입니다. 예를 들면 projects/<project>/subscriptions/<subscription>입니다.
outputTopic 처리되지 않은 레코드에 사용할 Pub/Sub 주제입니다. 예를 들면 projects/<project-id>/topics/<topic-name>입니다.
outputTableSpec BigQuery 출력 테이블 위치입니다. 예를 들면 my-project:my_dataset.my_table입니다. 지정된 createDisposition에 따라 입력 스키마 파일을 사용해서 출력 테이블을 자동으로 만들 수 있습니다.
preserveProtoFieldNames (선택사항) JSON에서 원본 Proto 필드를 보존하기 위한 true입니다. 더 많은 표준 JSON 이름을 사용하기 위한 false입니다. 예를 들어 falsefield_namefieldName으로 바꿉니다. (기본값: false)
bigQueryTableSchemaPath (선택사항) BigQuery 스키마 경로에 대한 Cloud Storage 경로입니다. 예를 들면 gs://path/to/my/schema.json입니다. 제공되지 않는 경우 스키마가 Proto 스키마에서 유추됩니다.
javascriptTextTransformGcsPath (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
javascriptTextTransformFunctionName (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다. 예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.
udfOutputTopic (선택사항) UDF 오류를 저장하는 Pub/Sub 주제입니다. 예를 들면 projects/<project-id>/topics/<topic-name>입니다. 제공되지 않는 경우 UDF 오류가 outputTopic과 동일한 주제로 전송됩니다.
writeDisposition (선택사항) BigQuery WriteDisposition입니다. 예를 들면 WRITE_APPEND, WRITE_EMPTY, WRITE_TRUNCATE입니다. 기본값: WRITE_APPEND
createDisposition (선택사항) BigQuery CreateDisposition입니다. 예를 들면 CREATE_IF_NEEDED, CREATE_NEVER입니다. 기본값: CREATE_IF_NEEDED

Pub/Sub Proto to BigQuery 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub Proto to BigQuery template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
fullMessageName=PROTO_MESSAGE_NAME,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=UNPROCESSED_TOPIC
  

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • SCHEMA_PATH: Proto 스키마 파일의 Cloud Storage 경로(예: gs://MyBucket/file.pb).
  • PROTO_MESSAGE_NAME: Proto 메시지 이름(예: package.name.MessageName)
  • SUBSCRIPTION_NAME: Pub/Sub 입력 구독 이름
  • BIGQUERY_TABLE: BigQuery 출력 테이블 이름
  • UNPROCESSED_TOPIC: 처리되지 않은 큐에 사용할 Pub/Sub 주제

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "fullMessageName": "PROTO_MESSAGE_NAME",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "UNPROCESSED_TOPIC"
      }
   }
}
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • SCHEMA_PATH: Proto 스키마 파일의 Cloud Storage 경로(예: gs://MyBucket/file.pb).
  • PROTO_MESSAGE_NAME: Proto 메시지 이름(예: package.name.MessageName)
  • SUBSCRIPTION_NAME: Pub/Sub 입력 구독 이름
  • BIGQUERY_TABLE: BigQuery 출력 테이블 이름
  • UNPROCESSED_TOPIC: 처리되지 않은 큐에 사용할 Pub/Sub 주제

Pub/Sub to Pub/Sub

Pub/Sub to Pub/Sub 템플릿은 Pub/Sub 구독에서 메시지를 읽고 다른 Pub/Sub 주제에 메시지를 쓰는 스트리밍 파이프라인입니다. 또한 파이프라인은 선택적인 메시지 속성 키와 Pub/Sub 주제에 쓰여질 메시지를 필터링하는 데 사용할 수 있는 값을 허용합니다. 이 템플릿을 사용하여 선택적 메시지 필터로 Pub/Sub 구독에서 다른 Pub/Sub 주제로 메시지를 복사할 수 있습니다.

파이프라인 요구사항:

  • 실행하기 전에 소스 Pub/Sub 구독이 있어야 합니다.
  • 소스 Pub/Sub 구독은 풀 구독이어야 합니다.
  • 실행하기 전에 대상 Pub/Sub 주제가 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputSubscription 입력을 읽어올 Pub/Sub 구독입니다. 예를 들면 projects/<project-id>/subscriptions/<subscription-name>입니다.
outputTopic 출력을 작성할 Cloud Pub/Sub 주제입니다. 예를 들면 projects/<project-id>/topics/<topic-name>입니다.
filterKey (선택사항) 속성 키를 기반으로 하는 필터 이벤트입니다. filterKey가 지정되지 않은 경우 필터가 적용되지 않습니다.
filterValue (선택사항) filterKey가 제공된 경우에 사용하는 필터 속성 값입니다. 기본적으로 null인 filterValue가 사용됩니다.

Pub/Sub to Pub/Sub 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to Pub/Sub template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • STAGING_LOCATION: 로컬 파일의 스테이징 위치(예: gs://your-bucket/staging)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • TOPIC_NAME: Pub/Sub 주제 이름
  • FILTER_KEY: 이벤트가 필터링되는 속성 키입니다. 키를 지정하지 않으면 필터가 적용되지 않습니다.
  • FILTER_VALUE: 이벤트 필터 키가 제공된 경우 사용할 필터 속성 값입니다. 유효한 자바 정규식 문자열을 이벤트 필터 값으로 사용합니다. 정규식이 제공된 경우 메시지 필터링을 위해 전체 표현식이 일치해야 합니다. 부분 일치(예: 하위 문자열)는 필터링되지 않습니다. 기본적으로 null 이벤트 필터 값이 사용됩니다.

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • STAGING_LOCATION: 로컬 파일의 스테이징 위치(예: gs://your-bucket/staging)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • TOPIC_NAME: Pub/Sub 주제 이름
  • FILTER_KEY: 이벤트가 필터링되는 속성 키입니다. 키를 지정하지 않으면 필터가 적용되지 않습니다.
  • FILTER_VALUE: 이벤트 필터 키가 제공된 경우 사용할 필터 속성 값입니다. 유효한 자바 정규식 문자열을 이벤트 필터 값으로 사용합니다. 정규식이 제공된 경우 메시지 필터링을 위해 전체 표현식이 일치해야 합니다. 부분 일치(예: 하위 문자열)는 필터링되지 않습니다. 기본적으로 null 이벤트 필터 값이 사용됩니다.

Pub/Sub to Splunk

Pub/Sub to Splunk 템플릿은 Splunk의 HTTP Event Collector(HEC)를 통해 Pub/Sub 구독에서 메시지를 읽고 Splunk에 메시지 페이로드를 쓰는 스트리밍 파이프라인입니다. 이 템플릿의 가장 일반적인 사용 사례는 Splunk로 로그를 내보내는 것입니다. 기본 워크플로의 예시를 보려면 Dataflow를 사용하여 Splunk로 프로덕션에 즉시 사용 가능한 로그 내보내기 배포를 참조하세요.

Splunk에 쓰기 전에 자바스크립트 사용자 정의 함수를 메시지 페이로드에 적용할 수도 있습니다. 처리 실패가 발생한 메시지는 추가적인 문제 해결 및 재처리를 위해 Pub/Sub 처리 불가 주제로 전달됩니다.

HEC 토큰의 추가 보안 레이어로 Cloud KMS 키로 암호화된 base64 인코딩 HEC 토큰 매개변수와 함께 Cloud KMS 키를 전달할 수도 있습니다. HEC 토큰 매개변수 암호화에 대한 자세한 내용은 Cloud KMS API 암호화 엔드포인트를 참조하세요.

파이프라인 요구사항:

  • 파이프라인을 실행하기 전에 소스 Pub/Sub 구독이 있어야 합니다.
  • 파이프라인을 실행하기 전에 Pub/Sub 처리되지 않은 주제가 있어야 합니다.
  • Splunk HEC 엔드포인트는 Dataflow 작업자 네트워크에서 액세스할 수 있어야 합니다.
  • Splunk HEC 토큰이 생성되고 사용 가능해야 합니다.

템플릿 매개변수

매개변수 설명
inputSubscription 입력을 읽어올 Pub/Sub 구독입니다. 예를 들면 projects/<project-id>/subscriptions/<subscription-name>입니다.
token (선택사항) Splunk HEC 인증 토큰입니다. tokenSource가 PLAINTEXT 또는 KMS로 설정된 경우 제공해야 합니다.
url Splunk HEC URL입니다. 이 값은 파이프라인이 실행되는 VPC에서 라우팅할 수 있어야 합니다. 예를 들면 https://splunk-hec-host:8088입니다.
outputDeadletterTopic 전달할 수 없는 메시지를 전달할 Pub/Sub 주제입니다. 예를 들면 projects/<project-id>/topics/<topic-name>입니다.
javascriptTextTransformGcsPath (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
javascriptTextTransformFunctionName (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다. 예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.
batchCount (선택사항) Splunk에 여러 이벤트를 전송하기 위한 배치 크기입니다. 기본값은 1(일괄 처리 없음)입니다.
parallelism (선택사항) 최대 동시 요청 수입니다. 기본값은 1(동시 처리 없음)입니다.
disableCertificateValidation (선택사항) SSL 인증서 유효성 검사를 사용 중지합니다. 기본값은 false(유효성 검사 사용)입니다. true인 경우 인증서가 검증되지 않고(모든 인증서를 신뢰할 수 있음) `rootCaCertificatePath` 매개변수가 무시됩니다.
includePubsubMessage (선택사항) 페이로드의 전체 Pub/Sub 메시지를 포함합니다. 기본값은 false입니다(데이터 요소만 페이로드에 포함됨).
tokenSource 토큰의 소스입니다. PLAINTEXT, KMS 또는 SECRET_Manager 중 하나입니다. Secret Manager를 사용하는 경우 이 매개변수를 제공해야 합니다. tokenSource가 KMS로 설정된 경우 tokenKMSEncryptionKey 및 암호화된 token제공해야 합니다. tokenSource가 SECRET_MANAGER로 설정된 경우 tokenSecretId제공해야 합니다. tokenSource가 PLAINTEXT로 설정된 경우 token제공해야 합니다.
tokenKMSEncryptionKey (선택사항) HEC 토큰 문자열을 복호화할 Cloud KMS 키입니다. tokenSource가 KMS로 설정된 경우 이 매개변수를 제공해야 합니다. Cloud KMS 키가 제공되면 HEC 토큰 문자열이 암호화되어 전달되어야 합니다.
tokenSecretId (선택사항) 토큰의 Secret Manager 보안 비밀 ID입니다. tokenSource기 SECRET_MANAGER로 설정된 경우 이 매개변수를 제공해야 합니다. 다음 형식이어야 합니다. projects/<project-id>/secrets/<secret-name>/versions/<secret-version>
rootCaCertificatePath (선택사항) Cloud Storage의 루트 CA 인증서에 대한 전체 URL입니다. 예를 들면 gs://mybucket/mycerts/privateCA.crt입니다. Cloud Storage에서 제공하는 인증서는 DER로 인코딩되어야 하며 바이너리 또는 인쇄 가능한 (Base64) 인코딩으로 제공될 수 있습니다. 인증서가 Base64 인코딩으로 제공되는 경우 시작 부분에 -----BEGIN CERTIFICATE-----로 바인딩되고 마지막에 -----END CERTIFICATE-----로 바인딩되어야 합니다. 이 매개변수가 제공되면 Splunk HEC 엔드포인트의 SSL 인증서를 확인하기 위해 이 비공개 CA 인증서 파일을 가져와 Dataflow 작업자의 트러스트 저장소에 추가합니다. 이 매개변수를 제공하지 않으면 기본 트러스트 저장소가 사용됩니다.
enableBatchLogs (선택사항) Splunk에 기록된 배치에 대해 로그를 사용 설정할지 여부를 지정합니다. 기본값: true.
enableGzipHttpCompression (선택사항) Splunk HEC로 전송된 HTTP 요청을 압축할지 여부를 지정합니다(gzip 콘텐츠 인코딩). 기본값: true.

Pub/Sub to Splunk 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to Splunk template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION,\
rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • TOKEN: Splunk의 Http Event Collector 토큰
  • URL: Splunk의 Http Event Collector의 URL 경로(예: https://splunk-hec-host:8088)입니다.
  • DEADLETTER_TOPIC_NAME: Pub/Sub 주제 이름
  • JAVASCRIPT_FUNCTION: 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.

    예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.

  • PATH_TO_JAVASCRIPT_UDF_FILE: 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • BATCH_COUNT: Splunk에 여러 이벤트를 전송하기 위해 사용할 배치 크기
  • PARALLELISM: Splunk에 이벤트를 전송하기 위해 사용할 동시 요청 수
  • DISABLE_VALIDATION: SSL 인증서 검증을 사용 중지하려는 경우 true
  • ROOT_CA_CERTIFICATE_PATH: Cloud Storage의 루트 CA 인증서 경로(예: gs://your-bucket/privateCA.crt)

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION",
       "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH"
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • TOKEN: Splunk의 Http Event Collector 토큰
  • URL: Splunk의 Http Event Collector의 URL 경로(예: https://splunk-hec-host:8088)입니다.
  • DEADLETTER_TOPIC_NAME: Pub/Sub 주제 이름
  • JAVASCRIPT_FUNCTION: 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.

    예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.

  • PATH_TO_JAVASCRIPT_UDF_FILE: 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • BATCH_COUNT: Splunk에 여러 이벤트를 전송하기 위해 사용할 배치 크기
  • PARALLELISM: Splunk에 이벤트를 전송하기 위해 사용할 동시 요청 수
  • DISABLE_VALIDATION: SSL 인증서 검증을 사용 중지하려는 경우 true
  • ROOT_CA_CERTIFICATE_PATH: Cloud Storage의 루트 CA 인증서 경로(예: gs://your-bucket/privateCA.crt)

Pub/Sub to Avro Files on Cloud Storage

Pub/Sub to Avro files on Cloud Storage 템플릿은 Pub/Sub 주제에서 데이터를 읽고 지정된 Cloud Storage 버킷에 Avro 파일을 쓰는 스트리밍 파이프라인입니다.

파이프라인 요구사항:

  • 파이프라인을 실행하기 전에 입력 Pub/Sub 주제가 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputTopic 메시지 소비를 위해 구독할 Pub/Sub 주제입니다. 주제 이름은 projects/<project-id>/topics/<topic-name> 형식이어야 합니다.
outputDirectory 출력 Avro 파일이 보관처리되는 출력 디렉터리입니다. 마지막에 /를 포함해야 합니다. 예를 들면 gs://example-bucket/example-directory/입니다.
avroTempDirectory 임시 Avro 파일의 디렉터리입니다. 마지막에 /를 포함해야 합니다. 예를 들면 gs://example-bucket/example-directory/입니다.
outputFilenamePrefix (선택사항) Avro 파일의 출력 파일 이름 프리픽스입니다.
outputFilenameSuffix (선택사항) Avro 파일의 출력 파일 이름 서픽스입니다.
outputShardTemplate (선택사항) 출력 파일의 샤드 템플릿입니다. S 또는 N 문자의 반복 시퀀스로 지정됩니다. 예를 들면 SSS-NNN입니다. 이는 각각 샤드 번호 또는 총 샤드 개수로 바뀝니다. 이 매개변수가 지정되지 않았으면 기본 템플릿 형식이 W-P-SS-of-NN입니다.

Pub/Sub to Cloud Storage Avro 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to Avro Files on Cloud Storage template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
avroTempDirectory=gs://BUCKET_NAME/temp/

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • STAGING_LOCATION: 로컬 파일의 스테이징 위치(예: gs://your-bucket/staging)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름
  • FILENAME_PREFIX: 선호하는 출력 파일 이름 프리픽스
  • FILENAME_SUFFIX: 선호하는 출력 파일 이름 서픽스
  • SHARD_TEMPLATE: 선호하는 출력 샤드 템플릿

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • STAGING_LOCATION: 로컬 파일의 스테이징 위치(예: gs://your-bucket/staging)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름
  • FILENAME_PREFIX: 선호하는 출력 파일 이름 프리픽스
  • FILENAME_SUFFIX: 선호하는 출력 파일 이름 서픽스
  • SHARD_TEMPLATE: 선호하는 출력 샤드 템플릿

Pub/Sub Topic to Text Files on Cloud Storage

Pub/Sub to Cloud Storage Text 템플릿은 Pub/Sub 주제에서 레코드를 읽고 텍스트 형식으로 된 일련의 Cloud Storage 파일로 저장하는 스트리밍 파이프라인입니다. 나중에 사용하기 위해 Pub/Sub에 데이터를 빠르게 저장하는 수단으로 템플릿을 사용할 수 있습니다. 기본적으로 템플릿은 5분마다 새 파일을 생성합니다.

파이프라인 요구사항:

  • 실행하기 전에 Pub/Sub 주제가 있어야 합니다.
  • 주제에 게시되는 메시지는 텍스트 형식이어야 합니다.
  • 주제에 게시되는 메시지에는 줄바꿈을 사용할 수 없습니다. 각 Pub/Sub 메시지는 출력 파일에 한 줄로 저장됩니다.

템플릿 매개변수

매개변수 설명
inputTopic 입력을 읽어올 Pub/Sub 주제입니다. 주제 이름은 projects/<project-id>/topics/<topic-name> 형식이어야 합니다.
outputDirectory 출력 파일을 쓰기 위한 경로 및 파일 이름 프리픽스입니다. 예를 들면 gs://bucket-name/path/입니다. 이 값은 슬래시로 끝나야 합니다.
outputFilenamePrefix 윈도우 설정된 각 파일에 넣을 프리픽스입니다. 예를 들면 output-입니다.
outputFilenameSuffix 윈도우 설정된 각 파일에 넣을 서픽스입니다. 일반적으로 .txt 또는 .csv와 같은 파일 확장자입니다.
outputShardTemplate 샤드 템플릿은 윈도우 설정된 각 파일의 동적 부분을 정의합니다. 기본적으로, 파이프라인은 각 윈도우 내에서 단일 샤드를 사용하여 파일 시스템에 출력합니다. 따라서 모든 데이터가 윈도우별로 한 파일에 출력됩니다. outputShardTemplate의 기본값은 W-P-SS-of-NN입니다. 여기에서 W은 윈도우 기간, P는 창 정보, S는 샤드 번호, N은 샤드 개수입니다. 단일 파일의 경우 outputShardTemplateSS-of-NN 부분이 00-of-01입니다.

Pub/Sub Topic to Text Files on Cloud Storage 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to Text Files on Cloud Storage template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • STAGING_LOCATION: 로컬 파일의 스테이징 위치(예: gs://your-bucket/staging)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • STAGING_LOCATION: 로컬 파일의 스테이징 위치(예: gs://your-bucket/staging)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름

Pub/Sub Topic 또는 Subscription to Text Files on Cloud Storage

Pub/Sub Topic 또는 Subscription to Cloud Storage Text 템플릿은 Pub/Sub에서 레코드를 읽고 텍스트 형식으로 된 일련의 Cloud Storage 파일로 저장하는 스트리밍 파이프라인입니다. 나중에 사용하기 위해 Pub/Sub에 데이터를 빠르게 저장하는 수단으로 템플릿을 사용할 수 있습니다. 기본적으로 템플릿은 5분마다 새 파일을 생성합니다.

파이프라인 요구사항:

  • 실행하기 전에 Pub/Sub 주제 또는 구독이 있어야 합니다.
  • 주제에 게시되는 메시지는 텍스트 형식이어야 합니다.
  • 주제에 게시되는 메시지에는 줄바꿈을 사용할 수 없습니다. 각 Pub/Sub 메시지는 출력 파일에 한 줄로 저장됩니다.

템플릿 매개변수

매개변수 설명
inputTopic 입력을 읽어올 Pub/Sub 주제입니다. 주제 이름은 projects/<project-id>/topics/<topic-name> 형식이어야 합니다. 이 매개변수가 제공되면 inputSubscription을 제공해서는 안 됩니다.
inputSubscription 입력을 읽어올 Pub/Sub 구독입니다. 구독 이름은 projects/<project-id>/subscription/<subscription-name> 형식이어야 합니다. 이 매개변수가 제공되면 inputTopic을 제공해서는 안 됩니다.
outputDirectory 출력 파일을 쓰기 위한 경로 및 파일 이름 프리픽스입니다. 예를 들면 gs://bucket-name/path/입니다. 이 값은 슬래시로 끝나야 합니다.
outputFilenamePrefix 윈도우 설정된 각 파일에 넣을 프리픽스입니다. 예를 들면 output-입니다.
outputFilenameSuffix 윈도우 설정된 각 파일에 넣을 서픽스입니다. 일반적으로 .txt 또는 .csv와 같은 파일 확장자입니다.
outputShardTemplate 샤드 템플릿은 윈도우 설정된 각 파일의 동적 부분을 정의합니다. 기본적으로, 파이프라인은 각 윈도우 내에서 단일 샤드를 사용하여 파일 시스템에 출력합니다. 따라서 모든 데이터가 윈도우별로 한 파일에 출력됩니다. outputShardTemplate의 기본값은 W-P-SS-of-NN입니다. 여기에서 W은 윈도우 기간, P는 창 정보, S는 샤드 번호, N은 샤드 개수입니다. 단일 파일의 경우 outputShardTemplateSS-of-NN 부분이 00-of-01입니다.
windowDuration (선택사항) 범위 기간은 데이터가 출력 디렉터리에 기록되는 간격입니다. 파이프라인의 처리량을 기준으로 기간을 구성합니다. 예를 들어 처리량이 높을수록 데이터가 메모리에 적합하도록 더 작은 범위가 필요할 수 있습니다. 기본값은 5m이며 최소 1초 이상이어야 합니다. 허용되는 형식은 [int]s(초 단위, 예: 5s), [int]m(분 단위, 예: 12m), [int]h(시간 단위, 예: 2h)입니다.

Pub/Sub Topic 또는 Subscription to Text Files on Cloud Storage 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud beta dataflow flex-template jobs run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region REGION_NAME \
    --template-file-gcs-location gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
    },
    "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex",
  }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름

Pub/Sub to MongoDB

Pub/Sub to MongoDB 템플릿은 Pub/Sub 구독에서 JSON 인코딩 메시지를 읽고 이를 MongoDB에 문서로 쓰는 스트리밍 파이프라인입니다. 필요한 경우 이 파이프라인은 자바스크립트 사용자 정의 함수(UDF)를 사용하여 포함할 수 있는 추가 변환을 지원합니다. 스키마 불일치, 잘못된 형식의 JSON으로 인해 오류가 생기거나 변환 실행 중에 오류가 발생하는 경우 입력 메시지와 함께 처리되지 않은 메시지가 BigQuery 테이블에 기록됩니다. 처리되지 않은 레코드의 테이블이 실행되기 전에 존재하지 않으면 파이프라인은 자동으로 이 테이블을 생성합니다.

파이프라인 요구사항:

  • Pub/Sub 구독이 있어야 하며 메시지가 유효한 JSON 형식으로 인코딩되어야 합니다.
  • MongoDB 클러스터가 있어야 하며 Dataflow 작업자 머신에서 액세스할 수 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputSubscription Pub/Sub 구독의 이름입니다. 예를 들면 projects/my-project-id/subscriptions/my-subscription-id입니다.
mongoDBUri 쉼표로 구분된 MongoDB 서버 목록입니다. 예를 들면 192.285.234.12:27017,192.287.123.11:27017입니다.
database 컬렉션을 저장하기 위한 MongoDB의 데이터베이스입니다. 예를 들면 my-db입니다.
collection MongoDB 데이터베이스 내부의 컬렉션 이름입니다. 예를 들면 my-collection입니다.
deadletterTable 오류(스키마 불일치, 잘못된 형식의 json 등)로 인해 메시지를 저장하는 BigQuery 테이블입니다. 예를 들면 project-id:dataset-name.table-name입니다.
javascriptTextTransformGcsPath (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
javascriptTextTransformFunctionName (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다. 예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.
batchSize (선택사항) MongoDB에 문서를 일괄 삽입하기 위해 사용되는 배치 크기입니다. 기본값: 1000.
batchSizeBytes (선택사항) 배치 크기(바이트)입니다. 기본값: 5242880
maxConnectionIdleTime (선택사항) 연결이 시간 초과되기 전까지 허용되는 최대 유휴 시간(초)입니다. 기본값: 60000
sslEnabled (선택사항) MongoDB 연결에 SSL이 사용 설정되었는지 여부를 나타내는 부울 값입니다. 기본값: true.
ignoreSSLCertificate (선택사항) SSL 인증서를 무시해야 하는지 여부를 나타내는 부울 값입니다. 기본값: true.
withOrdered (선택사항) MongoDB에 순서대로 대량 삽입할 수 있게 해주는 부울 값입니다. 기본값: true.
withSSLInvalidHostNameAllowed (선택사항) SSL 연결에 잘못된 호스트 이름이 허용되었는지 여부를 나타내는 부울 값입니다. 기본값: true.

Pub/Sub to MongoDB 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to MongoDB template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • INPUT_SUBSCRIPTION: Pub/Sub 구독(예: projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: MongoDB 서버 주소(예: 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: MongoDB 데이터베이스의 이름(예: users)
  • COLLECTION: MongoDB 컬렉션의 이름(예: profiles)
  • UNPROCESSED_TABLE: BigQuery 테이블의 이름(예: your-project:your-dataset.your-table-name)

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • INPUT_SUBSCRIPTION: Pub/Sub 구독(예: projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: MongoDB 서버 주소(예: 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: MongoDB 데이터베이스의 이름(예: users)
  • COLLECTION: MongoDB 컬렉션의 이름(예: profiles)
  • UNPROCESSED_TABLE: BigQuery 테이블의 이름(예: your-project:your-dataset.your-table-name)

Pub/Sub to Elasticsearch

Pub/Sub to Elasticsearch 템플릿은 Pub/Sub 구독에서 메시지를 읽고 사용자 정의 함수(UDF)를 실행하고 Elasticsearch에 문서로 쓰는 스트리밍 파이프라인입니다. Dataflow 템플릿은 Elasticsearch의 데이터 스트림 기능을 사용하여 여러 색인에 시계열 데이터를 저장하면서 요청에 대한 이름이 지정된 단일 리소스를 제공합니다. 데이터 스트림은 로그, 측정항목, trace, Pub/Sub에 저장된 기타 지속적으로 생성된 데이터에 적합합니다.

파이프라인 요구사항

  • 소스 Pub/Sub 구독이 있어야 하며 메시지가 유효한 JSON 형식으로 인코딩되어야 합니다.
  • GCP 인스턴스 또는 Elasticsearch 버전 7.0 이상을 사용하는 Elastic Cloud에 공개적으로 연결 가능한 Elasticsearch 호스트가 있어야 합니다. 자세한 내용은 Google Cloud의 Elastic 통합을 참조하세요.
  • 오류 출력을 위한 Pub/Sub 주제가 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputSubscription 사용할 Pub/Sub 구독입니다. 이름은 projects/<project-id>/subscriptions/<subscription-name> 형식이어야 합니다.
connectionUrl Elasticsearch URL(https://hostname:[port] 형식) 또는 Elastic Cloud를 사용하는 경우 CloudID를 지정합니다.
apiKey 인증에 사용되는 Base64로 인코딩된 API 키입니다.
errorOutputTopic 실패한 레코드를 projects/<project-id>/topics/<topic-name> 형식으로 게시하는 Pub/Sub 출력 주제입니다.
dataset (선택사항) 즉시 사용 가능한 대시보드가 있는 Pub/Sub를 통해 전송된 로그 유형입니다. 알려진 로그 유형 값은 audit, vpcflow, firewall입니다. 기본값: pubsub
namespace (선택사항) 환경(dev, prod, qa), 팀 또는 전략적 사업부와 같은 임의의 그룹화입니다. 기본값: default
batchSize (선택사항) 문서 수의 배치 크기입니다. 기본값: 1000.
batchSizeBytes (선택사항) 바이트 수의 배치 크기입니다. 기본값은 5242880(5MB)입니다.
maxRetryAttempts (선택사항) 최대 재시도 횟수이며 0 이상이어야 합니다. 기본값: no retries.
maxRetryDuration (선택사항) 밀리초 단위의 최대 재시도 시간이며 0 이상이어야 합니다. 기본값: no retries.
javascriptTextTransformGcsPath (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
javascriptTextTransformFunctionName (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다. 예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.
propertyAsIndex (선택사항) 일괄 요청에서 문서에 포함할 _index 메타데이터를 지정하는 값이 있는 문서의 속성입니다(_index UDF보다 우선 적용됨). 기본값: none.
propertyAsId (선택사항) 일괄 요청에서 문서에 포함할 _id 메타데이터를 지정하는 값이 있는 문서의 속성입니다(_id UDF보다 우선 적용됨). 기본값: none.
javaScriptIndexFnGcsPath (선택사항) 일괄 요청에서 문서에 포함할 _index 메타데이터를 지정할 자바스크립트 UDF 소스의 Cloud Storage 경로입니다. 기본값: none.
javaScriptIndexFnName (선택사항) 일괄 요청에서 문서에 포함할 _index 메타데이터를 지정할 함수의 UDF 자바스크립트 함수 이름입니다. 기본값: none.
javaScriptIdFnGcsPath (선택사항) 일괄 요청에서 문서에 포함할 _id 메타데이터를 지정할 자바스크립트 UDF 소스의 Cloud Storage 경로입니다. 기본값: none.
javaScriptIdFnName (선택사항) 일괄 요청에서 문서에 포함할 _id 메타데이터를 지정할 함수의 UDF 자바스크립트 함수 이름입니다. 기본값: none.
javaScriptTypeFnGcsPath (선택사항) 일괄 요청에서 문서에 포함할 _type 메타데이터를 지정할 자바스크립트 UDF 소스의 Cloud Storage 경로입니다. 기본값: none.
javaScriptTypeFnName (선택사항) 일괄 요청에서 문서에 포함할 _type 메타데이터를 지정할 함수의 UDF 자바스크립트 함수 이름입니다. 기본값: none.
javaScriptIsDeleteFnGcsPath (선택사항) 문서를 삽입하거나 업데이트하는 대신 삭제해야 하는지 여부를 결정하는 함수의 자바스크립트 UDF 소스에 대한 Cloud Storage 경로입니다. 이 함수는 문자열 값 "true" 또는 "false"를 반환해야 합니다. 기본값: none.
javaScriptIsDeleteFnName (선택사항) 문서를 삽입하거나 업데이트하지 않고 삭제할지 여부를 결정하는 함수의 UDF 자바스크립트 함수 이름입니다. 이 함수는 문자열 값 "true" 또는 "false"를 반환해야 합니다. 기본값: none.
usePartialUpdate (선택사항) Elasticsearch 요청에서 부분 업데이트 (만들기 또는 색인 대신 업데이트, 부분 문서 허용)를 사용할지 여부입니다. 기본값: false.
bulkInsertMethod (선택사항) Elasticsearch 일괄 요청에서 INDEX(색인, 업데이트 허용) 또는 CREATE(만들기, 중복 _id 오류)를 사용할지 여부입니다. 기본값: CREATE

Pub/Sub to Elasticsearch 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to Elasticsearch template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • ERROR_OUTPUT_TOPIC: 오류 출력을 위한 Pub/Sub 주제
  • SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • CONNECTION_URL: Elasticsearch URL
  • DATASET: 로그 유형
  • NAMESPACE: 데이터 세트의 네임스페이스
  • APIKEY: 인증을 위한 base64로 인코딩된 API 키

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch",
   }
}
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • ERROR_OUTPUT_TOPIC: 오류 출력을 위한 Pub/Sub 주제
  • SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • CONNECTION_URL: Elasticsearch URL
  • DATASET: 로그 유형
  • NAMESPACE: 데이터 세트의 네임스페이스
  • APIKEY: 인증을 위한 base64로 인코딩된 API 키

Datastream to Cloud Spanner

Datastream to Cloud Spanner 템플릿은 Cloud Storage 버킷에서 Datastream 이벤트를 읽고 Cloud Spanner 데이터베이스에 쓰는 스트리밍 파이프라인입니다. 데이터를 Datastream 소스에서 Cloud Spanner로 마이그레이션합니다.

마이그레이션에 필요한 모든 테이블은 템플릿을 실행하기 전에 대상 Cloud Spanner 데이터베이스에 있어야 합니다. 따라서 데이터 마이그레이션 전에 소스 데이터베이스에서 대상 Cloud Spanner로 스키마 마이그레이션을 완료해야 합니다. 마이그레이션하기 전에 데이터가 테이블에 있을 수 있습니다. 이 템플릿은 Datastream 스키마 변경사항을 Cloud Spanner 데이터베이스에 전파하지 않습니다.

데이터 일관성은 모든 데이터가 Cloud Spanner에 기록될 때 마이그레이션 종료 시에만 보장됩니다. Cloud Spanner에 쓰인 각 레코드의 순서 정보를 저장하도록 이 템플릿은 Cloud Spanner 데이터베이스의 각 테이블에 추가 테이블(섀도 테이블이라고 함)을 만듭니다. 이 테이블은 마이그레이션 종료 시 일관성을 보장하는 데 사용됩니다. 섀도 테이블은 마이그레이션 후에 삭제되지 않으며 마이그레이션 종료 시 유효성 검사 목적으로 사용될 수 있습니다.

스키마 불일치, 잘못된 형식의 JSON 파일 또는 변환 실행으로 발생하는 오류와 같은 작업 중에 발생하는 모든 오류는 오류 큐에 기록됩니다. 오류 큐는 오류와 함께 오류가 발생한 모든 Datastream 이벤트를 텍스트 형식으로 저장하는 Cloud Storage 폴더입니다. 오류는 일시적이거나 영구적일 수 있으며 오류 큐의 적절한 Cloud Storage 폴더에 저장됩니다. 일시적인 오류는 자동으로 재시도되지만 영구 오류는 그렇지 않습니다. 영구적인 오류가 발생할 경우 변경 이벤트를 수정하고 템플릿이 실행되는 동안 재시도 가능한 버킷으로 이동할 수 있습니다.

파이프라인 요구사항:

  • 실행 중 또는 시작되지 않음 상태의 Datastream 스트림
  • Datastream 이벤트가 복제되는 Cloud Storage 버킷
  • 기존 테이블이 있는 Cloud Spanner 데이터베이스 이러한 테이블은 비어 있거나 데이터를 포함할 수 있습니다.

템플릿 매개변수

매개변수 설명
inputFilePattern Cloud Storage에서 복제할 Datastream 파일의 파일 위치입니다. 일반적으로 이는 스트림의 루트 경로입니다.
streamName 스키마 정보와 소스 유형을 폴링할 스키마의 이름이나 템플릿입니다.
instanceId 변경사항이 복제된 Cloud Spanner 인스턴스입니다.
databaseId 변경사항이 복제된 Cloud Spanner 데이터베이스입니다.
projectId Cloud Spanner 프로젝트 ID입니다.
deadLetterQueueDirectory (선택사항) 오류 큐 출력을 저장할 파일 경로입니다. 기본값은 Dataflow 작업의 임시 위치 아래에 있는 디렉터리입니다.
inputFileFormat (선택사항) Datastream에서 생성한 출력 파일의 형식입니다. 예를 들면 avro,json입니다. 기본값은 avro입니다.
shadowTablePrefix (선택사항) 섀도우 테이블의 이름을 지정하는 데 사용되는 프리픽스입니다. 기본값: shadow_

Datastream to Cloud Spanner 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Cloud Datastream to Spanner template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • GCS_FILE_PATH: Datastream 이벤트를 저장하는 데 사용되는 Cloud Storage 경로입니다. 예를 들면 gs://bucket/path/to/data/입니다.
  • CLOUDSPANNER_INSTANCE: Cloud Spanner 인스턴스
  • CLOUDSPANNER_DATABASE: Cloud Spanner 데이터베이스
  • DLQ: 오류 큐 디렉터리의 Cloud Storage 경로

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • GCS_FILE_PATH: Datastream 이벤트를 저장하는 데 사용되는 Cloud Storage 경로입니다. 예를 들면 gs://bucket/path/to/data/입니다.
  • CLOUDSPANNER_INSTANCE: Cloud Spanner 인스턴스
  • CLOUDSPANNER_DATABASE: Cloud Spanner 데이터베이스
  • DLQ: 오류 큐 디렉터리의 Cloud Storage 경로

Text Files on Cloud Storage to BigQuery(스트리밍)

Text Files on Cloud Storage to BigQuery 파이프라인은 Cloud Storage에 저장된 텍스트 파일을 스트리밍하고, 사용자가 제공한 자바스크립트 사용자 정의 함수(UDF)를 사용하여 변환하고, 결과를 BigQuery에 추가할 수 있게 해주는 스트리밍 파이프라인입니다.

파이프라인은 무한히 실행되며 드레이닝을 지원하지 않는 분할 가능한 DoFnWatch 변환 사용으로 인해 드레이닝이 아닌 취소를 통해 수동으로 종료되어야 합니다.

파이프라인 요구사항:

  • BigQuery에서 출력 테이블의 스키마를 설명하는 JSON 파일을 만드세요.

    최상위 JSON 배열의 이름이 fields이고 해당 콘텐츠는 {"name": "COLUMN_NAME", "type": "DATA_TYPE"} 패턴을 따라야 합니다. 예를 들면 다음과 같습니다.

    {
      "fields": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING",
          "mode": "REQUIRED"
        },
        {
          "name": "coffee",
          "type": "STRING",
          "mode": "REQUIRED"
        }
      ]
    }
    
  • 텍스트 줄을 변환하는 논리를 제공하는 UDF 함수를 사용하여 자바스크립트(.js) 파일을 만듭니다. 함수는 JSON 문자열을 반환해야 합니다.

    예를 들어 이 함수는 CSV 파일의 각 줄을 분할하고, 값을 변환한 후에 JSON 문자열을 반환합니다.

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

템플릿 매개변수

매개변수 설명
javascriptTextTransformGcsPath 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
JSONPath JSON으로 설명된 BigQuery 스키마 파일의 Cloud Storage 위치입니다. 예를 들면 gs://path/to/my/schema.json입니다.
javascriptTextTransformFunctionName 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다. 예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.
outputTable 정규화된 BigQuery 테이블입니다. 예를 들면 my-project:dataset.table입니다.
inputFilePattern 처리하려는 텍스트의 Cloud Storage 위치입니다. 예를 들면 gs://my-bucket/my-files/text.txt입니다.
bigQueryLoadingTemporaryDirectory BigQuery 로딩 프로세스를 위한 임시 디렉터리입니다. 예를 들면 gs://my-bucket/my-files/temp_dir입니다.
outputDeadletterTable 출력 테이블에 도달하지 못한 메시지 테이블입니다. 예를 들면 my-project:dataset.my-unprocessed-table입니다. 없으면 파이프라인 실행 중에 생성됩니다. 지정하지 않은 경우 <outputTableSpec>_error_records가 대신 사용됩니다.

Cloud Storage Text to BigQuery(스트리밍) 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Text Files on Cloud Storage to BigQuery template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • STAGING_LOCATION: 로컬 파일의 스테이징 위치(예: gs://your-bucket/staging)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION: 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.

    예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: 스키마 정의가 포함된 JSON 파일의 Cloud Storage 경로
  • PATH_TO_JAVASCRIPT_UDF_FILE: 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • PATH_TO_TEXT_DATA: 텍스트 데이터 세트의 Cloud Storage 경로
  • BIGQUERY_TABLE: BigQuery 테이블 이름
  • BIGQUERY_UNPROCESSED_TABLE: 처리되지 않은 메시지에 대한 BigQuery 테이블의 이름
  • PATH_TO_TEMP_DIR_ON_GCS: 임시 디렉터리의 Cloud Storage 경로

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • STAGING_LOCATION: 로컬 파일의 스테이징 위치(예: gs://your-bucket/staging)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION: 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.

    예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: 스키마 정의가 포함된 JSON 파일의 Cloud Storage 경로
  • PATH_TO_JAVASCRIPT_UDF_FILE: 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • PATH_TO_TEXT_DATA: 텍스트 데이터 세트의 Cloud Storage 경로
  • BIGQUERY_TABLE: BigQuery 테이블 이름
  • BIGQUERY_UNPROCESSED_TABLE: 처리되지 않은 메시지에 대한 BigQuery 테이블의 이름
  • PATH_TO_TEMP_DIR_ON_GCS: 임시 디렉터리의 Cloud Storage 경로

Text Files on Cloud Storage to Pub/Sub(스트리밍)

이 템플릿은 Cloud Storage에 업로드된 새 텍스트 파일을 지속적으로 폴링하고, 각 파일을 한 줄씩 읽고, Pub/Sub 주제에 문자열을 게시하는 스트리밍 파이프라인을 만듭니다. 이 템플릿은 JSON 레코드를 포함하는 줄바꿈 구분 파일 또는 CSV 파일의 레코드를 Pub/Sub 주제에 게시하여 실시간으로 처리합니다. 이 템플릿을 사용하여 Pub/Sub에 데이터를 다시 재생할 수 있습니다.

파이프라인은 무한정으로 실행하며, 배출을 지원하지 않는 'SplittableDoFn'인 '관찰' 변환 사용으로 인해 '배출'이 아닌 '취소'를 통해 수동으로 종료되어야 합니다.

현재 폴링 간격은 고정되어 있으며 10초로 설정되어 있습니다. 이 템플릿은 개별 레코드에 타임스탬프를 설정하지 않기 때문에 이벤트 시간이 실행 중 게시 시간과 일치하게 됩니다. 파이프라인을 처리하기 위해 정확한 이벤트 시간이 필요한 경우에는 이 파이프라인을 사용해서는 안 됩니다.

파이프라인 요구사항:

  • 입력 파일은 줄바꿈으로 구분되는 JSON 또는 CSV 형식이어야 합니다. 소스 파일에서 여러 줄에 걸쳐 있는 레코드는 다운스트림 문제를 일으킬 수 있습니다. 파일 안의 각 줄이 Pub/Sub에 메시지로 게시되기 때문입니다.
  • 실행하기 전에 Pub/Sub 주제가 있어야 합니다.
  • 파이프라인은 무기한으로 실행되며 수동으로 종료해야 합니다.

템플릿 매개변수

매개변수 설명
inputFilePattern 읽을 입력 파일 패턴입니다. 예를 들면 gs://bucket-name/files/*.json 또는 gs://bucket-name/path/*.csv입니다.
outputTopic 작성할 Pub/Sub 입력 주제입니다. 이름은 projects/<project-id>/topics/<topic-name> 형식이어야 합니다.

Text Files on Cloud Storage to Pub/Sub(스트리밍) 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Text Files on Cloud Storage to Pub/Sub (Stream) template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME\
    --staging-location STAGING_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • STAGING_LOCATION: 로컬 파일의 스테이징 위치(예: gs://your-bucket/staging)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름
  • FILE_PATTERN: Cloud Storage 버킷에서 읽을 파일 패턴 glob(예: path/*.csv)

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • STAGING_LOCATION: 로컬 파일의 스테이징 위치(예: gs://your-bucket/staging)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름
  • FILE_PATTERN: Cloud Storage 버킷에서 읽을 파일 패턴 glob(예: path/*.csv)

Data Masking/Tokenization from Cloud Storage to BigQuery(Cloud DLP 사용)

Data Masking/Tokenization from Cloud Storage to BigQuery(Cloud DLP 사용) 템플릿은 Cloud Storage 버킷에서 csv 파일을 읽고, 익명화를 위해 Cloud Data Loss Prevention(Cloud DLP) API를 호출하며, 익명화된 데이터를 지정된 BigQuery 테이블에 쓰는 스트리밍 파이프라인입니다. 이 템플릿은 Cloud DLP 검사 템플릿과 Cloud DLP 익명화 템플릿 사용을 지원합니다. 따라서 사용자가 잠재적으로 민감한 정보를 검사하고 식별을 익명화할 수 있을 뿐만 아니라 열을 익명화하도록 지정된 구조화된 데이터를 익명화하며 검사가 필요 없습니다. 또한 이 템플릿은 익명화 템플릿 위치에 대한 리전 경로를 지원하지 않습니다. 전역 경로만 지원됩니다.

파이프라인 요구사항:

  • 토큰화할 입력 데이터가 있어야 합니다.
  • Cloud DLP 템플릿이 있어야 합니다(예: DeidentifyTemplate 및 InspectTemplate). 자세한 내용은 Cloud DLP 템플릿을 참조하세요.
  • BigQuery 데이터 세트가 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputFilePattern 입력 데이터 레코드를 읽어 들일 csv 파일입니다. 와일드 카드 사용도 허용됩니다. 예를 들면 gs://mybucket/my_csv_filename.csv 또는 gs://mybucket/file-*.csv입니다.
dlpProjectId Cloud DLP API 리소스를 소유하는 Cloud DLP 프로젝트 ID입니다. 이 Cloud DLP 프로젝트는 Cloud DLP 템플릿을 소유하는 동일한 프로젝트이거나 별도의 프로젝트일 수 있습니다. 예를 들면 my_dlp_api_project입니다.
deidentifyTemplateName API 요청에 사용할 Cloud DLP 익명화 템플릿으로, projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId} 패턴을 사용하여 지정됩니다. 예를 들면 projects/my_project/deidentifyTemplates/100입니다.
datasetName 토큰화된 결과를 보내기 위한 BigQuery 데이터 세트입니다.
batchSize 검사하거나 익명화할 데이터를 보내는 데 사용할 청크/배치 크기입니다. csv 파일의 경우 batchSize는 배치의 행 수입니다. 사용자는 레코드 크기 및 파일 크기에 따라 배치 크기를 결정해야 합니다. Cloud DLP API의 페이로드 크기 제한은 API 호출당 524KB입니다.
inspectTemplateName (선택사항) API 요청에 사용할 Cloud DLP 검사 템플릿으로, projects/{template_project_id}/identifyTemplates/{idTemplateId} 패턴을 사용하여 지정됩니다. 예를 들면 projects/my_project/identifyTemplates/100입니다.

Data Masking/Tokenization from Cloud Storage to BigQuery(Cloud DLP 사용) 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputFilePattern=INPUT_DATA,\
datasetName=DATASET_NAME,\
batchSize=BATCH_SIZE_VALUE,\
dlpProjectId=DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER

다음을 바꿉니다.

  • DLP_API_PROJECT_ID: Cloud DLP API 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • STAGING_LOCATION: 로컬 파일의 스테이징 위치(예: gs://your-bucket/staging)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • INPUT_DATA: 입력 파일 경로
  • DEIDENTIFY_TEMPLATE: Cloud DLPDeidentify 템플릿 번호
  • DATASET_NAME: BigQuery 데이터 세트 이름
  • INSPECT_TEMPLATE_NUMBER: Cloud DLPInspect 템플릿 번호
  • BATCH_SIZE_VALUE: 배치 크기(csv의 API당 행 수)

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
      "inputFilePattern":INPUT_DATA,
      "datasetName": "DATASET_NAME",
      "batchSize": "BATCH_SIZE_VALUE",
      "dlpProjectId": "DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE",
      "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER"
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • DLP_API_PROJECT_ID: Cloud DLP API 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • STAGING_LOCATION: 로컬 파일의 스테이징 위치(예: gs://your-bucket/staging)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • INPUT_DATA: 입력 파일 경로
  • DEIDENTIFY_TEMPLATE: Cloud DLPDeidentify 템플릿 번호
  • DATASET_NAME: BigQuery 데이터 세트 이름
  • INSPECT_TEMPLATE_NUMBER: Cloud DLPInspect 템플릿 번호
  • BATCH_SIZE_VALUE: 배치 크기(csv의 API당 행 수)

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub(스트리밍)

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub 템플릿은 Pub/Sub 메시지와 MySQL의 변경 데이터를 읽고 BigQuery에 레코드를 작성하는 스트리밍 파이프라인입니다. Debezium 커넥터는 MySQL 데이터베이스의 변경사항을 캡처하고 변경된 데이터를 Pub/Sub에 게시합니다. 그런 다음 템플릿이 Pub/Sub 메시지를 읽고 BigQuery에 씁니다.

이 템플릿을 사용하여 MySQL 데이터베이스와 BigQuery 테이블을 동기화할 수 있습니다. 파이프라인은 변경된 데이터를 BigQuery 스테이징 테이블에 쓰고 MySQL 데이터베이스를 복제하는 BigQuery 테이블을 간헐적으로 업데이트합니다.

파이프라인 요구사항:

  • Debezium 커넥터가 배포되어야 합니다.
  • Pub/Sub 메시지는 빔 행으로 직렬화되어야 합니다.

템플릿 매개변수

매개변수 설명
inputSubscriptions 읽어올 Pub/Sub 입력 구독의 쉼표로 구분된 목록이며 <subscription>,<subscription>, ... 형식입니다.
changeLogDataset 스테이징 테이블을 저장할 BigQuery 데이터 세트이며 <my-dataset> 형식입니다.
replicaDataset 복제 테이블을 저장할 BigQuery 데이터 세트의 위치이며 <my-dataset> 형식입니다.
updateFrequencySecs 파이프라인이 MySQL 데이터베이스를 복제하는 BigQuery 테이블을 업데이트하는 간격입니다.

Change Data Capture using Debezium and MySQL from Pub/Sub to BigQuery 템플릿 실행

이 템플릿을 실행하려면 다음 단계를 따르세요.

  1. 로컬 머신에서 DataflowTemplates 저장소를 복제합니다.
  2. v2/cdc-parent 디렉터리로 변경합니다.
  3. Debezium 커넥터가 배포되어 있는지 확인합니다.
  4. Maven을 사용하여 Dataflow 템플릿을 실행합니다.
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
        --inputSubscriptions=SUBSCRIPTIONS \
        --updateFrequencySecs=300 \
        --changeLogDataset=CHANGELOG_DATASET \
        --replicaDataset=REPLICA_DATASET \
        --project=PROJECT_ID \
        --region=REGION_NAME"
      

    다음을 바꿉니다.

    • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
    • SUBSCRIPTIONS: 쉼표로 구분된 Pub/Sub 구독 이름
    • CHANGELOG_DATASET: 변경 로그 데이터의 BigQuery 데이터 세트
    • REPLICA_DATASET: 복제본 테이블의 BigQuery 데이터 세트

Apache Kafka to BigQuery

Apache Kafka to BigQuery 템플릿은 Apache Kafka에서 텍스트 데이터를 수집하고 사용자 정의 함수(UDF)를 실행하고, 결과 레코드를 BigQuery에 출력하는 스트리밍 파이프라인입니다. 데이터 변환, UDF 실행, 출력 테이블에 삽입 중 발생하는 모든 오류는 BigQuery의 개별 오류 테이블에 삽입됩니다. 실행 전 오류 테이블이 없으면 생성됩니다.

파이프라인 요구사항

  • 출력 BigQuery 테이블이 있어야 합니다.
  • Apache Kafka 브로커 서버가 실행 중이며 Dataflow 작업자 머신에서 연결할 수 있어야 합니다.
  • Apache Kafka 주제가 있어야 하며 메시지는 유효한 JSON 형식으로 인코딩해야 합니다.

템플릿 매개변수

매개변수 설명
outputTableSpec Apache Kafka 메시지를 my-project:dataset.table 형식으로 작성할 BigQuery 출력 테이블 위치입니다.
inputTopics 읽을 Apache Kafka 입력 주제를 쉼표로 구분된 목록으로 표시한 것입니다. 예를 들면 messages입니다.
bootstrapServers 실행 중인 Apache Kafka 브로커 서버의 호스트 주소를 쉼표로 구분된 목록으로 표시한 것이며 각 호스트 주소는 35.70.252.199:9092 형식입니다.
javascriptTextTransformGcsPath (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
javascriptTextTransformFunctionName (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다. 예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.
outputDeadletterTable (선택사항) 출력 테이블에 도달하지 못한 my-project:dataset.my-deadletter-table 형식의 BigQuery 테이블입니다. 테이블이 없으면 파이프라인 실행 중에 테이블이 생성됩니다. 지정하지 않은 경우 <outputTableSpec>_error_records가 대신 사용됩니다.

Apache Kafka to BigQuery 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Kafka to BigQuery template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • BIGQUERY_TABLE: BigQuery 테이블 이름
  • KAFKA_TOPICS: Apache Kakfa 주제 목록. 주제가 여러 개 제공된 경우 쉼표를 이스케이프하는 방법에 대한 안내를 따르세요.
  • PATH_TO_JAVASCRIPT_UDF_FILE: 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • JAVASCRIPT_FUNCTION: 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.

    예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.

  • KAFKA_SERVER_ADDRESSES: Apache Kafka 브로커 서버 IP 주소 목록. 각 IP 주소에는 서버가 액세스할 수 있는 포트 번호가 함께 있어야 합니다. 예를 들면 35.70.252.199:9092입니다. 주소가 여러 개 제공된 경우 쉼표를 이스케이프하는 방법에 대한 안내를 따르세요.

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery",
   }
}
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • BIGQUERY_TABLE: BigQuery 테이블 이름
  • KAFKA_TOPICS: Apache Kakfa 주제 목록. 주제가 여러 개 제공된 경우 쉼표를 이스케이프하는 방법에 대한 안내를 따르세요.
  • PATH_TO_JAVASCRIPT_UDF_FILE: 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • JAVASCRIPT_FUNCTION: 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.

    예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.

  • KAFKA_SERVER_ADDRESSES: Apache Kafka 브로커 서버 IP 주소 목록. 각 IP 주소에는 서버가 액세스할 수 있는 포트 번호가 함께 있어야 합니다. 예를 들면 35.70.252.199:9092입니다. 주소가 여러 개 제공된 경우 쉼표를 이스케이프하는 방법에 대한 안내를 따르세요.

자세한 내용은 Dataflow로 Kafka에서 BigQuery로 데이터 쓰기를 참조하세요.

Datastream to BigQuery(스트림)

Datastream to BigQuery 템플릿은 Datastream 데이터를 읽고 BigQuery에 복제하는 스트리밍 파이프라인입니다. 이 템플릿은 Pub/Sub 알림을 사용하여 Cloud Storage에서 데이터를 읽고 시간으로 파티션을 나눈 BigQuery 스테이징 테이블에 복제합니다. 복제 후 템플릿은 BigQuery에서 MERGE를 실행하여 모든 변경 데이터 캡처(CDC) 변경사항을 소스 테이블의 복제본에 적용합니다.

템플릿은 복제에서 관리하는 BigQuery 테이블의 생성 및 업데이트를 처리합니다. 데이터 정의 언어(DDL)가 필요한 경우 Datastream에 대한 콜백은 소스 테이블 스키마를 추출하여 BigQuery 데이터 유형으로 변환합니다. 지원되는 작업은 다음과 같습니다.

  • 데이터가 삽입될 때 새 테이블이 생성됩니다.
  • 초기 값이 null인 새 열이 BigQuery 테이블에 추가됩니다.
  • 삭제된 열은 BigQuery에서 무시되며 향후 값은 null입니다.
  • 이름이 변경된 열은 BigQuery에 새 열로 추가됩니다.
  • 형식 변경사항은 BigQuery로 전파되지 않습니다.

파이프라인 요구사항:

  • 데이터 복제가 가능하거나 이미 복제하고 있는 Datastream 스트림
  • Datastream 데이터에 Cloud Storage Pub/Sub 알림이 사용 설정되어 있습니다.
  • BigQuery 대상 데이터 세트가 생성되었고 Compute Engine 서비스 계정에 이에 대한 관리 액세스 권한이 부여되었습니다.
  • 대상 복제본 테이블을 만들 소스 테이블에 기본 키가 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputFilePattern Cloud Storage에서 복제할 Datastream 파일의 파일 위치입니다. 일반적으로 이 파일 위치는 스트림의 루트 경로입니다.
gcsPubSubSubscription Datastream 파일 알림을 사용하는 Pub/Sub 구독입니다. 예를 들면 projects/my-project-id/subscriptions/my-subscription-id입니다.
inputFileFormat Datastream에서 생성한 출력 파일의 형식입니다. 예를 들면 avro,json입니다. 기본값은 avro입니다.
outputStagingDatasetTemplate 스테이징 테이블을 포함할 기존 데이터 세트의 이름입니다. {_metadata_dataset} 템플릿을 소스 데이터 세트/스키마의 이름으로 대체되는 자리표시자(예: {_metadata_dataset}_log)로 포함할 수 있습니다.
outputDatasetTemplate 복제본 테이블을 포함할 기존 데이터 세트의 이름입니다. {_metadata_dataset} 템플릿을 소스 데이터 세트/스키마의 이름으로 대체되는 자리표시자(예: {_metadata_dataset})로 포함할 수 있습니다.
deadLetterQueueDirectory 메시지를 처리할 수 없는 이유와 함께 처리되지 않은 모든 메시지가 저장되는 파일 경로입니다. 기본값은 Dataflow 작업의 임시 위치 아래에 있는 디렉터리입니다. 대부분의 상황에서는 기본값이면 충분합니다.
outputStagingTableNameTemplate (선택사항) 스테이징 테이블 이름의 템플릿입니다. 기본값은 {_metadata_table}_log입니다. 여러 스키마를 복제하는 경우 {_metadata_schema}_{_metadata_table}_log를 사용하는 것이 좋습니다.
outputTableNameTemplate (선택사항) 복제본 테이블 이름의 템플릿입니다. 기본값은 {_metadata_table}입니다. 여러 스키마를 복제하는 경우 {_metadata_schema}_{_metadata_table}을 사용하는 것이 좋습니다.
outputProjectId (선택사항) 데이터를 출력할 BigQuery 데이터 세트의 프로젝트입니다. 이 매개변수의 기본값은 Dataflow 파이프라인이 실행되는 프로젝트입니다.
streamName (선택사항) 스키마 정보를 폴링할 스트림의 이름이나 템플릿입니다. 기본값은 {_metadata_stream}입니다.
mergeFrequencyMinutes (선택사항) 지정된 테이블의 병합 간격(분)입니다. 기본값은 5입니다.
dlqRetryMinutes (선택사항) 데드 레터 큐(DLQ) 재시도 간격(분)입니다. 기본값은 10입니다.
javascriptTextTransformGcsPath (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
javascriptTextTransformFunctionName (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다. 예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.

Datastream to BigQuery 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Datastream to BigQuery template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
outputStagingDatasetTemplate=BIGQUERY_DATASET,\
outputDatasetTemplate=BIGQUERY_DATASET,\
outputStagingTableNameTemplate=BIGQUERY_TABLE,\
outputTableNameTemplate=BIGQUERY_TABLE_log
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH: Datastream 데이터의 Cloud Storage 경로입니다. 예를 들면 gs://bucket/path/to/data/입니다.
  • GCS_SUBSCRIPTION_NAME: 변경된 파일을 읽을 Pub/Sub 구독입니다. 예를 들면 projects/my-project-id/subscriptions/my-subscription-id입니다.
  • BIGQUERY_DATASET: BigQuery 데이터 세트 이름입니다.
  • BIGQUERY_TABLE: BigQuery 테이블 템플릿입니다. 예를 들면 {_metadata_schema}_{_metadata_table}_log입니다.

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
          "outputDatasetTemplate": "BIGQUERY_DATASET",
          "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
          "outputTableNameTemplate": "BIGQUERY_TABLE_log"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery",
   }
}
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH: Datastream 데이터의 Cloud Storage 경로입니다. 예를 들면 gs://bucket/path/to/data/입니다.
  • GCS_SUBSCRIPTION_NAME: 변경된 파일을 읽을 Pub/Sub 구독입니다. 예를 들면 projects/my-project-id/subscriptions/my-subscription-id입니다.
  • BIGQUERY_DATASET: BigQuery 데이터 세트 이름입니다.
  • BIGQUERY_TABLE: BigQuery 테이블 템플릿입니다. 예를 들면 {_metadata_schema}_{_metadata_table}_log입니다.

Datastream to MySQL 또는 PostgreSQL(스트리밍)

Datastream to SQL 템플릿은 Datastream 데이터를 읽고 모든 MySQL 또는 PostgreSQL 데이터베이스에 복제하는 스트리밍 파이프라인입니다. 이 템플릿은 Pub/Sub 알림을 사용하여 Cloud Storage에서 데이터를 읽고 이 데이터를 SQL 복제본 테이블에 복제합니다.

이 템플릿은 데이터 정의 언어(DDL)를 지원하지 않으며 모든 테이블이 이미 데이터베이스에 있어야 합니다. 복제는 Dataflow 스테이트풀(Stateful) 변환을 사용하여 비활성 데이터를 필터링하고 비순차적인 데이터의 일관성을 보장합니다. 예를 들어 행의 최신 버전이 이미 통과되면 해당 행의 최신 도착 버전은 무시됩니다. 실행되는 DML은 소스나 대상 데이터를 완벽하게 복제하려고 합니다. 실행된 DML 문은 다음 규칙을 따릅니다.

  • 기본 키가 존재하는 경우 삽입 및 업데이트 작업에서 upsert 구문을 사용합니다(즉, INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE).
  • 기본 키가 있으면 삭제가 삭제 DML으로 복제됩니다.
  • 기본 키가 없으면 삽입 작업과 업데이트 작업 모두 테이블에 삽입됩니다.
  • 기본 키가 없으면 삭제는 무시됩니다.

Oracle to Postgres 유틸리티를 사용하는 경우 기본 키가 없으면 SQL에서 ROWID를 기본 키로 추가합니다.

이 파이프라인의 요구사항은 다음과 같습니다.

  • 데이터 복제가 가능하거나 이미 복제하고 있는 Datastream 스트림
  • Datastream 데이터에 Cloud Storage Pub/Sub 알림이 사용 설정되어 있습니다.
  • PostgreSQL 데이터베이스에 필요한 스키마가 입력되었습니다.
  • Dataflow 작업자와 PostgreSQL 사이에 네트워크 액세스가 설정됩니다.

템플릿 매개변수

매개변수 설명
inputFilePattern Cloud Storage에서 복제할 Datastream 파일의 파일 위치입니다. 일반적으로 이 파일 위치는 스트림의 루트 경로입니다.
gcsPubSubSubscription Datastream 파일 알림을 사용하는 Pub/Sub 구독입니다. 예를 들면 projects/my-project-id/subscriptions/my-subscription-id입니다.
inputFileFormat Datastream에서 생성한 출력 파일의 형식입니다. 예를 들면 avro,json입니다. 기본값은 avro입니다.
databaseHost 연결할 SQL 호스트입니다.
databaseUser 복제의 모든 테이블에 쓰는 데 필요한 모든 권한이 있는 SQL 사용자입니다.
databasePassword 지정된 SQL 사용자의 비밀번호입니다.
databasePort (선택사항) 연결할 SQL 데이터베이스 포트입니다. 기본값은 5432입니다.
databaseName (선택사항) 연결할 SQL 데이터베이스의 이름입니다. 기본값은 postgres입니다.
streamName (선택사항) 스키마 정보를 폴링할 스트림의 이름이나 템플릿입니다. 기본값은 {_metadata_stream}입니다.

Datastream to SQL 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Cloud Datastream to SQL template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH: Datastream 데이터의 Cloud Storage 경로입니다. 예를 들면 gs://bucket/path/to/data/입니다.
  • GCS_SUBSCRIPTION_NAME: 변경된 파일을 읽을 Pub/Sub 구독입니다. 예를 들면 projects/my-project-id/subscriptions/my-subscription-id입니다.
  • DATABASE_HOST: SQL 호스트 IP입니다.
  • DATABASE_USER: SQL 사용자입니다.
  • DATABASE_PASSWORD: SQL 비밀번호입니다.

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_SQL",
   }
}
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH: Datastream 데이터의 Cloud Storage 경로입니다. 예를 들면 gs://bucket/path/to/data/입니다.
  • GCS_SUBSCRIPTION_NAME: 변경된 파일을 읽을 Pub/Sub 구독입니다. 예를 들면 projects/my-project-id/subscriptions/my-subscription-id입니다.
  • DATABASE_HOST: SQL 호스트 IP입니다.
  • DATABASE_USER: SQL 사용자입니다.
  • DATABASE_PASSWORD: SQL 비밀번호입니다.

Pub/Sub to Java Database Connectivity(JDBC)

Pub/Sub to JDBC(Java Database Connectivity 템플릿은 기존 Cloud Pub/Sub 구독의 데이터를 JSON 문자열로 수집하고 결과 레코드를 JDBC에 쓰는 스트리밍 파이프라인입니다.

파이프라인 요구사항:

  • 파이프라인을 실행하기 전에 Cloud Pub/Sub 구독이 있어야 합니다.
  • 파이프라인을 실행하기 전에 JDBC 소스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Cloud Pub/Sub 출력 주제가 있어야 합니다.

템플릿 매개변수

매개변수 설명
driverClassName JDBC 드라이버 클래스 이름입니다. 예를 들면 com.mysql.jdbc.Driver입니다.
connectionUrl JDBC 연결 URL 문자열입니다. 예를 들면 jdbc:mysql://some-host:3306/sampledb입니다. Base64로 인코딩된 문자열로 전달한 후 Cloud KMS 키로 암호화할 수 있습니다.
driverJars JDBC 드라이버의 쉼표로 구분된 Cloud Storage 경로입니다. 예를 들면 gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar입니다.
username (선택사항) JDBC 연결에 사용할 사용자 이름입니다. Cloud KMS 키로 암호화된 Base64 인코딩 문자열로 전달할 수 있습니다.
password (선택사항) JDBC 연결에 사용할 비밀번호입니다. Cloud KMS 키로 암호화된 Base64 인코딩 문자열로 전달할 수 있습니다.
connectionProperties (선택사항) JDBC 연결에 사용할 속성 문자열입니다. 문자열 형식은 [propertyName=property;]*여야 합니다. 예를 들면 unicode=true;characterEncoding=UTF-8입니다.
statement 데이터베이스에 대해 실행할 문입니다. 이 문은 순서에 관계없이 테이블의 열 이름을 지정해야 합니다. 지정된 열 이름의 값만 JSON에서 읽고 문에 추가됩니다. 예를 들면 INSERT INTO tableName (column1, column2) VALUES (?,?)입니다.
inputSubscription 읽어올 Pub/Sub 입력 구독으로, projects/<project>/subscriptions/<subscription> 형식입니다.
outputDeadletterTopic 전달할 수 없는 메시지를 전달할 Pub/Sub 주제입니다. 예를 들면 projects/<project-id>/topics/<topic-name>입니다.
KMSEncryptionKey (선택사항) 사용자 이름, 비밀번호, 연결 문자열을 복호화하는 Cloud KMS 암호화 키입니다. Cloud KMS 키가 전달되면 사용자 이름, 비밀번호, 연결 문자열이 모두 암호화되어 전달되어야 합니다.
extraFilesToStage 작업자에 스테이징할 파일의 쉼표로 구분된 Cloud Storage 경로 또는 Secret Manager 보안 비밀입니다. 이러한 파일은 각 작업자의 /extra_files 디렉터리에 저장됩니다. gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id>).

Pub/Sub to Java Database Connectivity(JDBC) 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to JDBC template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_to_Jdbc \
    --region REGION_NAME \
    --parameters \
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
driverJars=DRIVER_PATHS,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
connectionProperties=CONNECTION_PROPERTIES,\
statement=SQL_STATEMENT,\
inputSubscription=INPUT_SUBSCRIPTION,\
outputDeadletterTopic=OUTPUT_DEADLETTER_TOPIC,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • DRIVER_CLASS_NAME: 드라이버 클래스 이름
  • JDBC_CONNECTION_URL: JDBC 연결 URL
  • DRIVER_PATHS: JDBC 드라이버의 쉼표로 구분된 Cloud Storage 경로
  • CONNECTION_USERNAME: JDBC 연결 사용자 이름
  • CONNECTION_PASSWORD: JDBC 연결 비밀번호
  • CONNECTION_PROPERTIES: JDBC 연결 속성(필요한 경우)
  • SQL_STATEMENT: 데이터베이스에 대해 실행할 SQL 문
  • INPUT_SUBSCRIPTION: 읽어올 Pub/Sub 입력 구독
  • OUTPUT_DEADLETTER_TOPIC: 전달할 수 없는 메시지를 전달할 Pub/Sub
  • KMS_ENCRYPTION_KEY: Cloud KMS 암호화 키

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_Jdbc
{
   "jobName": "JOB_NAME",
   "parameters": {
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "JDBC_CONNECTION_URL",
       "driverJars": "DRIVER_PATHS",
       "username": "CONNECTION_USERNAME",
       "password": "CONNECTION_PASSWORD",
       "connectionProperties": "CONNECTION_PROPERTIES",
       "statement": "SQL_STATEMENT",
       "inputSubscription": "INPUT_SUBSCRIPTION",
       "outputDeadletterTopic": "OUTPUT_DEADLETTER_TOPIC",
       "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
   },
   "environment": { "zone": "us-central1-f" },
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • DRIVER_CLASS_NAME: 드라이버 클래스 이름
  • JDBC_CONNECTION_URL: JDBC 연결 URL
  • DRIVER_PATHS: JDBC 드라이버의 쉼표로 구분된 Cloud Storage 경로
  • CONNECTION_USERNAME: JDBC 연결 사용자 이름
  • CONNECTION_PASSWORD: JDBC 연결 비밀번호
  • CONNECTION_PROPERTIES: JDBC 연결 속성(필요한 경우)
  • SQL_STATEMENT: 데이터베이스에 대해 실행할 SQL 문
  • INPUT_SUBSCRIPTION: 읽어올 Pub/Sub 입력 구독
  • OUTPUT_DEADLETTER_TOPIC: 전달할 수 없는 메시지를 전달할 Pub/Sub
  • KMS_ENCRYPTION_KEY: Cloud KMS 암호화 키

Cloud Spanner change streams to Cloud Storage

Cloud Spanner change streams to Cloud Storage 템플릿은 Spanner 데이터 변경 레코드를 스트리밍하고 Dataflow Runner V2를 사용하여 Cloud Storage 버킷에 쓰는 스트리밍 파이프라인입니다.

파이프라인은 Spanner 변경 내역 기록을 타임스탬프에 따라 기간으로 그룹화하며, 각 기간은 이 템플릿으로 구성할 수 있는 기간을 나타냅니다. 기간에 속한 타임스탬프가 있는 모든 레코드는 지연될 수 없으므로 기간 내에 존재한다고 보장됩니다. 또한 출력 샤드를 여러 개 정의할 수도 있습니다. 파이프라인은 샤드당 기간별로 하나의 Cloud Storage 출력 파일을 만듭니다. 출력 파일 내에서 레코드는 정렬되지 않습니다. 출력 파일은 사용자 구성에 따라 JSON 또는 AVRO 형식으로 작성될 수 있습니다.

Cloud Spanner 인스턴스 또는 Cloud Storage 버킷과 동일한 리전에서 Dataflow 작업을 실행하면 네트워크 지연 시간과 네트워크 전송 비용을 최소화할 수 있습니다. 작업 리전 외부에 있는 소스, 싱크, 스테이징 파일 위치 또는 임시 파일 위치를 사용하면 데이터가 리전 간에서 전송될 수 있습니다. Dataflow 리전 엔드포인트에 대해 자세히 알아보세요.

변경 스트림, 변경 스트림 Dataflow 파이프라인 빌드 방법, 권장사항에 대해 자세히 알아보세요.

파이프라인 요구사항:

  • 파이프라인을 실행하기 전에 Cloud Spanner 인스턴스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Cloud Spanner 데이터베이스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Cloud Spanner 메타데이터 인스턴스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Cloud Spanner 메타데이터 데이터베이스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Cloud Spanner 변경 내역이 있어야 합니다.
  • 파이프라인을 실행하기 전에 Cloud Storage 출력 버킷이 있어야 합니다.

템플릿 매개변수

매개변수 설명
spannerInstanceId 변경 스트림 데이터를 읽어 올 Cloud Spanner 인스턴스 ID입니다.
spannerDatabase 변경 스트림 데이터를 읽어 올 Cloud Spanner 데이터베이스입니다.
spannerMetadataInstanceId 변경 내역 커넥터 메타데이터 테이블에 사용할 Cloud Spanner 인스턴스 ID입니다.
spannerMetadataDatabase 변경 내역 커넥터 메타데이터 테이블에 사용할 Cloud Spanner 데이터베이스입니다.
spannerChangeStreamName 읽어 올 Cloud Spanner 변경 스트림의 이름입니다.
gcsOutputDirectory Cloud Storage의 변경 스트림 출력 파일 위치는 'gs://${BUCKET}/${ROOT_PATH}/' 형식입니다.
outputFilenamePrefix (선택사항) 작성할 파일의 파일 이름 프리픽스입니다. 기본 파일 프리픽스는 'output'으로 설정됩니다.
spannerProjectId (선택사항) 변경 내역을 읽어 올 프로젝트입니다. 변경 내역 커넥터 메타데이터 테이블이 생성되는 프로젝트이기도 합니다. 이 매개변수의 기본값은 Dataflow 파이프라인이 실행되는 프로젝트입니다.
startTimestamp (선택사항) 변경 내역을 읽는 데 사용할 시작 DateTime(경계 포함)입니다. Ex-2021-10-12T07:20:50.52Z. 기본값은 파이프라인이 시작되는 시점의 타임스탬프, 즉 현재 시간입니다.
endTimestamp (선택사항) 변경 내역을 읽는 데 사용할 종료 DateTime(경계 포함)입니다. Ex-2021-10-12T07:20:50.52Z. 기본값은 미래의 무한대 시간입니다.
outputFileFormat (선택사항) 출력 Cloud Storage 파일의 형식입니다. 허용되는 형식은 TEXT, AVRO입니다. 기본값은 AVRO입니다.
windowDuration (선택사항) 범위 기간은 데이터가 출력 디렉터리에 기록되는 간격입니다. 파이프라인의 처리량을 기준으로 기간을 구성합니다. 예를 들어 처리량이 높을수록 데이터가 메모리에 적합하도록 더 작은 범위가 필요할 수 있습니다. 기본값은 5m이며 최소 1초 이상이어야 합니다. 허용되는 형식은 [int]s(초 단위, 예: 5s), [int]m(분 단위, 예: 12m), [int]h(시간 단위, 예: 2h)입니다.
rpcPriority (선택사항) Cloud Spanner 호출의 요청 우선순위입니다. 값은 [HIGH,MEDIUM,LOW] 중 하나여야 합니다. (기본값: HIGH)
numShards (선택사항) 쓰는 동안 생성된 출력 분할의 최대 개수입니다. 기본값은 20입니다. 샤드 수가 많을수록 Cloud Storage 쓰기 처리량이 높아지지만 출력 Cloud Storage 파일 처리 시 샤드 간에 데이터 집계 비용이 늘어날 수 있습니다.
spannerMetadataTableName (선택사항) 사용할 Cloud Spanner 변경 내역 커넥터 메타데이터 테이블 이름입니다. 제공하지 않으면 파이프라인 흐름 중에 Cloud Spanner 변경 내역 메타데이터 테이블이 자동으로 생성됩니다. 기존 파이프라인을 업데이트할 때는 이 매개변수를 반드시 제공해야 하며, 이외의 경우에는 제공해서는 안 됩니다.

Cloud Spanner change streams to Cloud Storage 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Cloud Spanner change streams to Google Cloud Storage template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud beta dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • SPANNER_INSTANCE_ID: Cloud Spanner 인스턴스 ID
  • SPANNER_DATABASE: Cloud Spanner 데이터베이스
  • SPANNER_METADATA_INSTANCE_ID: Cloud Spanner 메타데이터 인스턴스 ID
  • SPANNER_METADATA_DATABASE: Cloud Spanner 메타데이터 데이터베이스
  • SPANNER_CHANGE_STREAM: Cloud Spanner 변경 내역
  • GCS_OUTPUT_DIRECTORY: 변경 스트림 출력용 파일 위치

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage",
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • SPANNER_INSTANCE_ID: Cloud Spanner 인스턴스 ID
  • SPANNER_DATABASE: Cloud Spanner 데이터베이스
  • SPANNER_METADATA_INSTANCE_ID: Cloud Spanner 메타데이터 인스턴스 ID
  • SPANNER_METADATA_DATABASE: Cloud Spanner 메타데이터 데이터베이스
  • SPANNER_CHANGE_STREAM: Cloud Spanner 변경 내역
  • GCS_OUTPUT_DIRECTORY: 변경 스트림 출력용 파일 위치

Cloud Spanner change streams to BigQuery

Cloud Spanner change stream to BigQuery 템플릿은 Cloud Spanner 데이터 변경 레코드를 스트리밍하고 Dataflow Runner V2를 사용하여 BigQuery 테이블에 쓰는 스트리밍 파이프라인입니다.

필요한 BigQuery 테이블이 없으면 파이프라인이 테이블을 만듭니다. 그렇지 않으면 기존 BigQuery 테이블이 사용됩니다. 기존 BigQuery 테이블의 스키마에는 Cloud Spanner 테이블의 해당 추적 열과 추가 메타데이터 열(다음 목록의 메타데이터 필드 설명 참조)이 'ignoreFields' 옵션으로 인해 명시적으로 무시되지 않아야 합니다. 각각의 새 BigQuery 행에는 변경 레코드의 타임스탬프에 있는 Cloud Spanner 테이블의 해당 행에서 변경 내역이 감시하는 모든 열이 포함됩니다.

Cloud Spanner 트랜잭션의 수정 여부와 관계없이 변경 내역이 감시하는 모든 열이 각 BigQuery 테이블 행에 포함됩니다. 감시 대상이 아닌 열은 BigQuery 행에 포함되지 않습니다. Dataflow 워터마크보다 낮은 Cloud Spanner 변경사항은 BigQuery 테이블에 적용되거나 재시도를 위해 데드 레터 큐에 저장됩니다. BigQuery 행이 저장되는 순서는 원본 Cloud Spanner 커밋 타임스탬프의 순서와 다릅니다.

다음 메타데이터 필드가 BigQuery 테이블에 추가됩니다.

  • _metadata_spanner_mod_type: 변경 내역의 데이터 변경 레코드에서 추출됩니다.
  • _metadata_spanner_table_name: Cloud Spanner 테이블 이름입니다. 커넥터의 메타데이터 테이블 이름이 아닙니다.
  • _metadata_spanner_commit_timestamp: 변경 내역의 데이터 변경 레코드에서 추출됩니다.
  • _metadata_spanner_server_transaction_id: 변경 내역의 데이터 변경 레코드에서 추출됩니다.
  • _metadata_spanner_record_sequence: 변경 내역의 데이터 변경 레코드에서 추출됩니다.
  • _metadata_spanner_is_last_record_in_transaction_in_partition: 변경 내역의 데이터 변경 레코드에서 추출됩니다.
  • _metadata_spanner_number_of_records_in_transaction: 변경 내역의 데이터 변경 레코드에서 추출됩니다.
  • _metadata_spanner_number_of_partitions_in_transaction: 변경 내역의 데이터 변경 레코드에서 추출됩니다.
  • _metadata_big_query_commit_timestamp: BigQuery에 행이 삽입된 시점의 커밋 타임스탬프입니다.

참고:

  • 이 템플릿은 스키마 변경사항을 Cloud Spanner에서 BigQuery로 전파하지 않습니다. Cloud Spanner에서 스키마 변경을 수행하면 파이프라인이 중단될 가능성이 높으므로 스키마 변경 후 파이프라인을 다시 만들어야 할 수 있습니다.
  • OLD_AND_NEW_VALUESNEW_VALUES 값 캡처 유형의 경우 데이터 변경 레코드에 UPDATE 변경사항이 있으면 템플릿은 변경되지 않았지만 감시된 열을 검색하기 위해 데이터 변경 레코드의 커밋 타임스탬프에서 Cloud Spanner에 대한 비활성 읽기를 수행해야 합니다. 비활성 읽기에 대해 데이터베이스 'version_retention_period'를 올바르게 구성해야 합니다. NEW_ROW 값 캡처 유형의 경우 데이터 변경 레코드가 UPDATE에서 업데이트되지 않는 열을 포함하여 전체 새 행을 캡처하기 때문에 더 효율적이므로 템플릿이 비활성 읽기를 수행할 필요가 없습니다.
  • Cloud Spanner 인스턴스 또는 BigQuery 테이블과 동일한 리전에서 Dataflow 작업을 실행하면 네트워크 지연 시간과 네트워크 전송 비용을 최소화할 수 있습니다. 작업 리전 외부에 있는 소스, 싱크, 스테이징 파일 위치 또는 임시 파일 위치를 사용하면 데이터가 리전 간에서 전송될 수 있습니다. Dataflow 리전 엔드포인트에 대해 자세히 알아보세요.
  • 이 템플릿은 모든 유효한 Cloud Spanner 데이터 유형을 지원하지만, BigQuery 유형이 Cloud Spanner 유형보다 더 정확한 경우 변환 중에 정밀도 손실이 발생할 수 있습니다. 구체적으로는 다음과 같습니다.
    • Cloud Spanner JSON 유형의 경우 객체 멤버의 순서는 사전순으로 정렬되지만 BigQuery JSON 유형은 보장되지 않습니다.
    • Cloud Spanner는 나노초 TIMESTAMP 유형만 지원하고 BigQuery는 마이크로초 TIMESTAMP 유형만 지원합니다.

변경 스트림, 변경 스트림 Dataflow 파이프라인 빌드 방법, 권장사항에 대해 자세히 알아보세요.

파이프라인 요구사항:

  • 파이프라인을 실행하기 전에 Cloud Spanner 인스턴스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Cloud Spanner 데이터베이스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Cloud Spanner 메타데이터 인스턴스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Cloud Spanner 메타데이터 데이터베이스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Cloud Spanner 변경 내역이 있어야 합니다.
  • 파이프라인을 실행하기 전에 BigQuery 데이터 세트가 있어야 합니다.

템플릿 매개변수

매개변수 설명
spannerInstanceId 변경 스트림을 읽어올 Cloud Spanner 인스턴스입니다.
spannerDatabase 변경 스트림을 읽어올 Cloud Spanner 데이터베이스입니다.
spannerMetadataInstanceId 변경 내역 커넥터 메타데이터 테이블에 사용할 Cloud Spanner 인스턴스입니다.
spannerMetadataDatabase 변경 내역 커넥터 메타데이터 테이블에 사용할 Cloud Spanner 데이터베이스입니다.
spannerChangeStreamName 읽어 올 Cloud Spanner 변경 스트림의 이름입니다.
bigQueryDataSet 변경 내역 출력을 위한 BigQuery 데이터 세트입니다.
spannerProjectId (선택사항) 변경 내역을 읽어 올 프로젝트입니다. 변경 내역 커넥터 메타데이터 테이블이 생성되는 프로젝트이기도 합니다. 이 매개변수의 기본값은 Dataflow 파이프라인이 실행되는 프로젝트입니다.
spannerMetadataTableName (선택사항) 사용할 Cloud Spanner 변경 내역 커넥터 메타데이터 테이블 이름입니다. 제공하지 않으면 파이프라인 흐름 중에 Cloud Spanner 변경 내역 커넥터 메타데이터 테이블이 자동으로 생성됩니다. 기존 파이프라인을 업데이트할 때는 이 매개변수를 반드시 제공해야 하며, 이외의 경우에는 제공해서는 안 됩니다.
rpcPriority (선택사항) Cloud Spanner 호출의 요청 우선순위입니다. 값은 [HIGH,MEDIUM,LOW] 중 하나여야 합니다. (기본값: HIGH)
startTimestamp (선택사항) 변경 내역을 읽는 데 사용할 시작 DateTime(경계 포함)입니다. Ex-2021-10-12T07:20:50.52Z. 기본값은 파이프라인이 시작되는 시점의 타임스탬프, 즉 현재 시간입니다.
endTimestamp (선택사항) 변경 내역을 읽는 데 사용할 종료 DateTime(경계 포함)입니다. Ex-2021-10-12T07:20:50.52Z. 기본값은 미래의 무한대 시간입니다.
bigQueryProjectId (선택사항) BigQuery 프로젝트입니다. 기본값은 Dataflow 작업의 프로젝트입니다.
bigQueryChangelogTableNameTemplate (선택사항) BigQuery 변경 로그 테이블 이름의 템플릿입니다. 기본값은 {_metadata_spanner_table_name}_changelog입니다.
deadLetterQueueDirectory (선택사항) 메시지를 처리할 수 없는 이유와 함께 처리되지 않은 모든 레코드가 저장되는 파일 경로입니다. 기본값은 Dataflow 작업의 임시 위치 아래에 있는 디렉터리입니다. 대부분의 상황에서는 기본값이면 충분합니다.
dlqRetryMinutes (선택사항) 데드 레터 큐 재시도 간격(분)입니다. 기본값은 10입니다.
ignoreFields (선택사항) 무시될 쉼표로 구분된 필드 목록(대소문자 구분)입니다. 이러한 필드는 감시 테이블의 필드이거나 파이프라인에 의해 추가된 메타데이터 필드입니다. 무시된 필드는 BigQuery에 삽입되지 않습니다.

Cloud Spanner change streams to BigQuery 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Cloud Spanner change streams to BigQuery template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud beta dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • SPANNER_INSTANCE_ID: Cloud Spanner 인스턴스 ID
  • SPANNER_DATABASE: Cloud Spanner 데이터베이스
  • SPANNER_METADATA_INSTANCE_ID: Cloud Spanner 메타데이터 인스턴스 ID
  • SPANNER_METADATA_DATABASE: Cloud Spanner 메타데이터 데이터베이스
  • SPANNER_CHANGE_STREAM: Cloud Spanner 변경 내역
  • BIGQUERY_DATASET: 변경 내역 출력을 위한 BigQuery 데이터 세트

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • SPANNER_INSTANCE_ID: Cloud Spanner 인스턴스 ID
  • SPANNER_DATABASE: Cloud Spanner 데이터베이스
  • SPANNER_METADATA_INSTANCE_ID: Cloud Spanner 메타데이터 인스턴스 ID
  • SPANNER_METADATA_DATABASE: Cloud Spanner 메타데이터 데이터베이스
  • SPANNER_CHANGE_STREAM: Cloud Spanner 변경 내역
  • BIGQUERY_DATASET: 변경 내역 출력을 위한 BigQuery 데이터 세트

Cloud Spanner change streams to Pub/Sub

Cloud Spanner change stream to Pub/Sub 템플릿은 Cloud Spanner 데이터 변경 레코드를 스트리밍하고 Dataflow Runner V2를 사용하여 Pub/Sub 주제에 쓰는 스트리밍 파이프라인입니다.

새 Pub/Sub 주제로 데이터를 출력하려면 먼저 주제를 만들어야 합니다. 생성 후 Pub/Sub는 자동으로 구독을 생성하고 새 주제에 연결합니다. 존재하지 않는 Pub/Sub 주제에 데이터를 출력하려고 하면 Dataflow 파이프라인에서 예외가 발생하고 파이프라인이 연결을 시도하는 동안 파이프라인이 중단됩니다.

필요한 Pub/Sub 주제가 이미 있으면 데이터를 해당 주제로 출력할 수 있습니다.

자세한 내용은 변경 내역 정보, Dataflow를 사용하여 변경 내역 연결 빌드, 변경 내역 권장사항을 참조하세요.

파이프라인 요구사항:

  • 파이프라인을 실행하기 전에 Cloud Spanner 인스턴스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Cloud Spanner 데이터베이스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Cloud Spanner 메타데이터 인스턴스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Cloud Spanner 메타데이터 데이터베이스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Cloud Spanner 변경 내역이 있어야 합니다.
  • 파이프라인을 실행하기 전에 Pub/Sub 주제가 있어야 합니다.

템플릿 매개변수

매개변수 설명
spannerInstanceId 변경 스트림을 읽어올 Cloud Spanner 인스턴스입니다.
spannerDatabase 변경 스트림을 읽어올 Cloud Spanner 데이터베이스입니다.
spannerMetadataInstanceId 변경 내역 커넥터 메타데이터 테이블에 사용할 Cloud Spanner 인스턴스입니다.
spannerMetadataDatabase 변경 내역 커넥터 메타데이터 테이블에 사용할 Cloud Spanner 데이터베이스입니다.
spannerChangeStreamName 읽어 올 Cloud Spanner 변경 스트림의 이름입니다.
pubsubTopic 변경 내역 출력을 위한 Pub/Sub 주제입니다.
spannerProjectId (선택사항) 변경 내역을 읽어 올 프로젝트입니다. 변경 내역 커넥터 메타데이터 테이블이 생성되는 프로젝트이기도 합니다. 이 매개변수의 기본값은 Dataflow 파이프라인이 실행되는 프로젝트입니다.
spannerMetadataTableName (선택사항) 사용할 Cloud Spanner 변경 내역 커넥터 메타데이터 테이블 이름입니다. 제공되지 않으면 Cloud Spanner는 파이프라인 흐름 변경 중에 스트림 커넥터 메타데이터 테이블을 자동으로 만듭니다. 기존 파이프라인을 업데이트할 때 이 매개변수를 제공해야 합니다. 다른 경우에는 이 매개변수를 사용하지 마세요.
rpcPriority (선택사항) Cloud Spanner 호출의 요청 우선순위입니다. 값은 [HIGH,MEDIUM,LOW] 중 하나여야 합니다. (기본값: HIGH)
startTimestamp (선택사항) 변경 내역을 읽는 데 사용할 시작 DateTime(경계 포함)입니다. 예를 들면 ex-2021-10-12T07:20:50.52Z입니다. 기본값은 파이프라인이 시작되는 시점의 타임스탬프, 즉 현재 시간입니다.
endTimestamp (선택사항) 변경 내역을 읽는 데 사용할 종료 DateTime(경계 포함)입니다. 예를 들면 ex-2021-10-12T07:20:50.52Z입니다. 기본값은 미래의 무한대 시간입니다.
outputFileFormat (선택사항) 출력의 형식입니다. 출력은 여러 PubsubMessages로 래핑되고 Pub/Sub 주제로 전송됩니다. 허용되는 형식은 JSON 및 AVRO입니다. 기본값은 JSON입니다.
pubsubAPI (선택사항)파이프라인을 구현하는 데 사용되는 Pub/Sub API입니다. 허용되는 API는 pubsubionative_client입니다. 소수의 초당 쿼리 수(QPS)에서는 native_client 지연 시간이 더 짧습니다. 많은 수의 QPS에서 pubsubio가 더 우수하고 안정적인 성능을 제공합니다. 기본값은 pubsubio입니다.

Cloud Spanner change streams to the Pub/Sub 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Cloud Spanner change streams to Pub/Sub template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

    gcloud beta dataflow flex-template run JOB_NAME \
        --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_PubSub \
        --region REGION_NAME \
        --parameters \
    spannerInstanceId=SPANNER_INSTANCE_ID,\
    spannerDatabase=SPANNER_DATABASE,\
    spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
    spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
    spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
    pubsubTopic=PUBSUB_TOPIC
    

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • SPANNER_INSTANCE_ID: Cloud Spanner 인스턴스 ID
  • SPANNER_DATABASE: Cloud Spanner 데이터베이스
  • SPANNER_METADATA_INSTANCE_ID: Cloud Spanner 메타데이터 인스턴스 ID
  • SPANNER_METADATA_DATABASE: Cloud Spanner 메타데이터 데이터베이스
  • SPANNER_CHANGE_STREAM: Cloud Spanner 변경 내역
  • PUBSUB_TOPIC: 변경 내역 출력을 위한 Pub/Sub 주제

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
    "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "spannerInstanceId": "SPANNER_INSTANCE_ID",
            "spannerDatabase": "SPANNER_DATABASE",
            "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
            "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
            "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
            "pubsubTopic": "PUBSUB_TOPIC"
        },
        "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_PubSub",
    }
  }
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • SPANNER_INSTANCE_ID: Cloud Spanner 인스턴스 ID
  • SPANNER_DATABASE: Cloud Spanner 데이터베이스
  • SPANNER_METADATA_INSTANCE_ID: Cloud Spanner 메타데이터 인스턴스 ID
  • SPANNER_METADATA_DATABASE: Cloud Spanner 메타데이터 데이터베이스
  • SPANNER_CHANGE_STREAM: Cloud Spanner 변경 내역
  • PUBSUB_TOPIC: 변경 내역 출력을 위한 Pub/Sub 주제

MongoDB to BigQuery(CDC)

MongoDB to BigQuery CDC(변경 데이터 캡처) 템플릿은 MongoDB 변경 내역과 함께 작동하는 스트리밍 파이프라인입니다. 파이프라인은 MongoDB 변경 내역을 통해 Pub/Sub로 푸시된 JSON 레코드를 읽고 userOption 매개변수에서 지정한 대로 BigQuery에 씁니다.

파이프라인 요구사항

  • 대상 BigQuery 데이터 세트가 있어야 합니다.
  • Dataflow 작업자 머신에서 소스 MongoDB 인스턴스에 액세스할 수 있어야 합니다.
  • MongoDB에서 Pub/Sub로 변경사항을 푸시하는 변경 내역이 실행되고 있어야 합니다.

템플릿 매개변수

매개변수 설명
mongoDbUri mongodb+srv://:@ 형식의 MongoDB 연결 URI입니다.
database 컬렉션을 읽을 MongoDB의 데이터베이스입니다. 예를 들면 my-db입니다.
collection MongoDB 데이터베이스 내부의 컬렉션 이름입니다. 예를 들면 my-collection입니다.
outputTableSpec 쓸 BigQuery 테이블입니다. 예를 들면 bigquery-project:dataset.output_table입니다.
userOption FLATTEN 또는 NONE입니다. FLATTEN은 문서를 첫 번째 수준으로 평면화합니다. NONE은 전체 문서를 JSON 문자열로 저장합니다.
inputTopic 읽어올 Pub/Sub 입력 주제로, projects/<project>/topics/<topic> 형식입니다.

MongoDB to BigQuery(CDC) 템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the MongoDB to BigQuery (CDC) template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery_CDC \
    --parameters \
outputTableSpec=OUTPUT_TABLE_SPEC,\
mongoDbUri=MONGO_DB_URI,\
database=DATABASE,\
collection=COLLECTION,\
userOption=USER_OPTION,\
inputTopic=INPUT_TOPIC

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • OUTPUT_TABLE_SPEC: 대상 BigQuery 테이블 이름
  • MONGO_DB_URI: MongoDB URI
  • DATABASE: MongoDB 데이터베이스
  • COLLECTION: MongoDB 컬렉션
  • USER_OPTION: FLATTEN 또는 NONE
  • INPUT_TOPIC: Pub/Sub 입력 주제

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "mongoDbUri": "MONGO_DB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "userOption": "USER_OPTION",
          "inputTopic": "INPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery_CDC",
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • OUTPUT_TABLE_SPEC: 대상 BigQuery 테이블 이름
  • MONGO_DB_URI: MongoDB URI
  • DATABASE: MongoDB 데이터베이스
  • COLLECTION: MongoDB 컬렉션
  • USER_OPTION: FLATTEN 또는 NONE
  • INPUT_TOPIC: Pub/Sub 입력 주제