Google 제공 스트리밍 템플릿

Google은 오픈소스 Dataflow 템플릿 세트를 제공합니다. 템플릿에 대한 일반 정보는 개요 페이지를 참조하세요. 모든 Google 제공 템플릿 목록은 Google 제공 템플릿 시작 페이지를 참조하세요.

이 페이지는 스트리밍 템플릿을 다룹니다.

Pub/Sub Subscription to BigQuery

Pub/Sub Subscription to BigQuery 템플릿은 Pub/Sub 구독에서 JSON 형식의 메시지를 읽고 BigQuery 테이블에 쓰는 스트리밍 파이프라인입니다. 템플릿을 사용하여 Pub/Sub 데이터를 BigQuery로 빠르게 이동할 수 있습니다. 이 템플릿은 Pub/Sub에서 JSON 형식의 메시지를 읽고 BigQuery 요소로 변환합니다.

파이프라인 요구사항

  • Pub/Sub 메시지가 여기에 설명된 대로 JSON 형식이어야 합니다. 예를 들어 {"k1":"v1", "k2":"v2"} 형식으로 된 메시지는 k1k2라는 두 개의 열 그리고 문자열 데이터 유형으로 BigQuery 테이블에 삽입될 수 있습니다.
  • 파이프라인을 실행하기 전에 출력 테이블이 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputSubscription 읽어올 Pub/Sub 입력 구독으로, projects/<project>/subscriptions/<subscription> 형식입니다.
outputTableSpec BigQuery 출력 테이블 위치로, <my-project>:<my-dataset>.<my-table> 형식입니다.
outputDeadletterTable 출력 테이블에 도달하지 못한 <my-project>:<my-dataset>.<my-table> 형식의 BigQuery 테이블입니다. 존재하지 않을 경우 파이프라인 실행 중에 생성됩니다. 지정하지 않은 경우 <outputTableSpec>_error_records가 대신 사용됩니다.

Pub/Sub Subscription to BigQuery 템플릿 실행

콘솔

Google Cloud Console에서 실행
  1. Cloud Console에서 Dataflow 페이지로 이동합니다.
  2. Dataflow 페이지로 이동
  3. 템플릿에서 작업 만들기를 클릭합니다.
  4. 템플릿에서 Cloud Platform 콘솔 생성 작업 버튼
  5. Cloud Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub Subscription to BigQuery template을 선택합니다.
  6. 작업 이름 필드에 작업 이름을 입력합니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  7. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  8. 작업 실행을 클릭합니다.

GCLOUD

gcloud 명령줄 도구에서 실행

참고: gcloud 명령줄 도구로 템플릿을 실행하려면 Cloud SDK 버전이 138.0.0 이상이어야 합니다.

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID
  • JOB_NAME: 선택한 작업 이름입니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • REGION: 리전 엔드포인트(예: us-west1)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • DATASET: BigQuery 데이터 세트
  • TABLE_NAME: BigQuery 테이블 이름
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

API

REST API에서 실행

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

REST API 요청으로 이 템플릿을 실행하려면 프로젝트 ID와 함께 HTTP POST 요청을 보냅니다. 이 요청에는 승인이 필요합니다.

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID
  • JOB_NAME: 선택한 작업 이름입니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • REGION: 리전 엔드포인트(예: us-west1)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • DATASET: BigQuery 데이터 세트
  • TABLE_NAME: BigQuery 테이블 이름
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

Pub/Sub Topic to BigQuery

Pub/Sub Topic to BigQuery 템플릿은 Pub/Sub 주제에서 JSON 형식의 메시지를 읽고 BigQuery 테이블에 쓰는 스트리밍 파이프라인입니다. 템플릿을 사용하여 Pub/Sub 데이터를 BigQuery로 빠르게 이동할 수 있습니다. 이 템플릿은 Pub/Sub에서 JSON 형식의 메시지를 읽고 BigQuery 요소로 변환합니다.

파이프라인 요구사항

  • Pub/Sub 메시지가 여기에 설명된 대로 JSON 형식이어야 합니다. 예를 들어 {"k1":"v1", "k2":"v2"} 형식으로 된 메시지는 k1k2라는 두 개의 열 그리고 문자열 데이터 유형으로 BigQuery 테이블에 삽입될 수 있습니다.
  • 파이프라인을 실행하기 전에 출력 테이블이 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputTopic 읽어올 Pub/Sub 입력 주제로, projects/<project>/topics/<topic> 형식입니다.
outputTableSpec BigQuery 출력 테이블 위치로, <my-project>:<my-dataset>.<my-table> 형식입니다.
outputDeadletterTable 출력 테이블에 도달하지 못한 메시지의 BigQuery 테이블입니다. <my-project>:<my-dataset>.<my-table> 형식이어야 합니다. 존재하지 않을 경우 파이프라인 실행 중에 생성됩니다. 지정하지 않은 경우 <outputTableSpec>_error_records가 대신 사용됩니다.

Pub/Sub Topic to BigQuery 템플릿 실행

콘솔

Google Cloud Console에서 실행
  1. Cloud Console에서 Dataflow 페이지로 이동합니다.
  2. Dataflow 페이지로 이동
  3. 템플릿에서 작업 만들기를 클릭합니다.
  4. 템플릿에서 Cloud Platform 콘솔 생성 작업 버튼
  5. Cloud Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub Topic to BigQuery template을 선택합니다.
  6. 작업 이름 필드에 작업 이름을 입력합니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  7. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  8. 작업 실행을 클릭합니다.

GCLOUD

gcloud 명령줄 도구에서 실행

참고: gcloud 명령줄 도구로 템플릿을 실행하려면 Cloud SDK 버전이 138.0.0 이상이어야 합니다.

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID
  • JOB_NAME: 선택한 작업 이름입니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • REGION: 리전 엔드포인트(예: us-west1)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • DATASET: BigQuery 데이터 세트
  • TABLE_NAME: BigQuery 테이블 이름
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

API

REST API에서 실행

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

REST API 요청으로 이 템플릿을 실행하려면 프로젝트 ID와 함께 HTTP POST 요청을 보냅니다. 이 요청에는 승인이 필요합니다.

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID
  • JOB_NAME: 선택한 작업 이름입니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • REGION: 리전 엔드포인트(예: us-west1)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • DATASET: BigQuery 데이터 세트
  • TABLE_NAME: BigQuery 테이블 이름
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

Pub/Sub Avro to BigQuery

Pub/Sub Avro to BigQuery 템플릿은 Pub/Sub 구독에서 BigQuery 테이블로 Avro 데이터를 수집하는 스트리밍 파이프라인입니다. BigQuery 테이블에 쓰는 동안 발생하는 모든 오류는 Pub/Sub 처리되지 않은 주제로 스트리밍됩니다.

파이프라인 요구사항

  • 입력 Pub/Sub 구독이 있어야 합니다.
  • Avro 레코드의 스키마 파일이 Cloud Storage에 있어야 합니다.
  • 처리되지 않은 Pub/Sub 주제가 있어야 합니다.
  • 출력 BigQuery 데이터 세트가 있어야 합니다.

템플릿 매개변수

매개변수 설명
schemaPath Avro 스키마 파일의 Cloud Storage 위치입니다. 예를 들면 gs://path/to/my/schema.avsc입니다.
inputSubscription 읽어올 Pub/Sub 입력 구독입니다. projects/<project>/subscriptions/<subscription>).
outputTopic 처리되지 않은 레코드에 사용할 Pub/Sub 주제입니다. projects/<project-id>/topics/<topic-name>).
outputTableSpec BigQuery 출력 테이블 위치입니다. 예를 들면 <my-project>:<my-dataset>.<my-table>입니다. 지정된 createDisposition에 따라 사용자가 제공한 Avro 스키마를 사용하여 출력 테이블을 자동으로 생성할 수 있습니다.
writeDisposition (선택사항) BigQuery WriteDisposition. 예를 들면 WRITE_APPEND, WRITE_EMPTY, WRITE_TRUNCATE입니다. 기본값: WRITE_APPEND
createDisposition (선택사항) BigQuery CreateDisposition. 예를 들면 CREATE_IF_NEEDED, CREATE_NEVER입니다. 기본값: CREATE_IF_NEEDED

Pub/Sub Avro to BigQuery 템플릿 실행

콘솔

Google Cloud Console에서 실행
  1. Cloud Console에서 Dataflow 페이지로 이동합니다.
  2. Dataflow 페이지로 이동
  3. 템플릿에서 작업 만들기를 클릭합니다.
  4. 템플릿에서 Cloud Platform 콘솔 생성 작업 버튼
  5. Cloud Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub Avro to BigQuery template을 선택합니다.
  6. 작업 이름 필드에 작업 이름을 입력합니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  7. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  8. 작업 실행을 클릭합니다.

GCLOUD

gcloud 도구 명령줄 도구에서 실행

참고: gcloud 도구 명령줄 도구를 사용하여 템플릿을 실행하려면 Cloud SDK 버전 284.0.0 이상이 필요합니다.

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

다음을 바꿉니다.

  • JOB_NAME: 선택한 작업 이름입니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • REGION_NAME: Dataflow 리전 이름(예: us-central1)
  • SCHEMA_PATH: Avro 스키마 파일의 Cloud Storage 경로(예: gs://MyBucket/file.avsc).
  • SUBSCRIPTION_NAME: Pub/Sub 입력 구독 이름
  • BIGQUERY_TABLE: BigQuery 출력 테이블 이름
  • DEADLETTER_TOPIC: 처리되지 않은 큐에 사용할 Pub/Sub 주제
gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

API

REST API에서 실행

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

REST API 요청으로 이 템플릿을 실행하려면 프로젝트 ID와 함께 HTTP POST 요청을 보냅니다. 이 요청에는 승인이 필요합니다.

다음을 바꿉니다.

  • JOB_NAME: 선택한 작업 이름입니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • LOCATION: Dataflow 리전 이름(예: us-central1)
  • SCHEMA_PATH: Avro 스키마 파일의 Cloud Storage 경로(예: gs://MyBucket/file.avsc).
  • SUBSCRIPTION_NAME: Pub/Sub 입력 구독 이름
  • BIGQUERY_TABLE: BigQuery 출력 테이블 이름
  • DEADLETTER_TOPIC: 처리되지 않은 큐에 사용할 Pub/Sub 주제
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Pub/Sub to Pub/Sub

Pub/Sub to Pub/Sub 템플릿은 Pub/Sub 구독에서 메시지를 읽고 다른 Pub/Sub 주제에 메시지를 쓰는 스트리밍 파이프라인입니다. 또한 파이프라인은 선택적인 메시지 속성 키와 Pub/Sub 주제에 쓰여질 메시지를 필터링하는 데 사용할 수 있는 값을 허용합니다. 이 템플릿을 사용하여 선택적 메시지 필터로 Pub/Sub 구독에서 다른 Pub/Sub 주제로 메시지를 복사할 수 있습니다.

파이프라인 요구사항:

  • 실행하기 전에 소스 Pub/Sub 구독이 있어야 합니다.
  • 실행하기 전에 대상 Pub/Sub 주제가 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputSubscription 입력을 읽어올 Pub/Sub 구독입니다. 예를 들면 projects/<project-id>/subscriptions/<subscription-name>입니다.
outputTopic 출력을 작성할 Cloud Pub/Sub 주제입니다. 예를 들면 projects/<project-id>/topics/<topic-name>입니다.
filterKey [선택사항] 속성 키를 기반으로 하는 필터 이벤트입니다. filterKey가 지정되지 않은 경우 필터가 적용되지 않습니다.
filterValue [선택사항] filterKey가 제공된 경우에 사용하는 필터 속성 값입니다. 기본적으로 null인 filterValue가 사용됩니다.

Pub/Sub to Pub/Sub 템플릿 실행

콘솔

Google Cloud Console에서 실행
  1. Cloud Console에서 Dataflow 페이지로 이동합니다.
  2. Dataflow 페이지로 이동
  3. 템플릿에서 작업 만들기를 클릭합니다.
  4. 템플릿에서 Cloud Platform 콘솔 생성 작업 버튼
  5. Cloud Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to Pub/Sub template을 선택합니다.
  6. 작업 이름 필드에 작업 이름을 입력합니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  7. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  8. 작업 실행을 클릭합니다.

GCLOUD

gcloud 명령줄 도구에서 실행

참고: gcloud 명령줄 도구로 템플릿을 실행하려면 Cloud SDK 버전이 138.0.0 이상이어야 합니다.

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID
  • JOB_NAME: 선택한 작업 이름입니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • REGION: 리전 엔드포인트(예: us-west1)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • TOPIC_NAME: Pub/Sub 주제 이름
  • FILTER_KEY: 이벤트가 필터링되는 속성 키입니다. 키를 지정하지 않으면 필터가 적용되지 않습니다.
  • FILTER_VALUE: 이벤트 필터 키가 제공된 경우 사용할 필터 속성 값입니다. 유효한 자바 정규식 문자열을 이벤트 필터 값으로 사용합니다. 정규식이 제공된 경우 메시지 필터링을 위해 전체 표현식이 일치해야 합니다. 부분 일치(예: 하위 문자열)는 필터링되지 않습니다. 기본적으로 null 이벤트 필터 값이 사용됩니다.
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

API

REST API에서 실행

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

REST API 요청으로 이 템플릿을 실행하려면 프로젝트 ID와 함께 HTTP POST 요청을 보냅니다. 이 요청에는 승인이 필요합니다.

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID
  • JOB_NAME: 선택한 작업 이름입니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • REGION: 리전 엔드포인트(예: us-west1)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • TOPIC_NAME: Pub/Sub 주제 이름
  • FILTER_KEY: 이벤트가 필터링되는 속성 키입니다. 키를 지정하지 않으면 필터가 적용되지 않습니다.
  • FILTER_VALUE: 이벤트 필터 키가 제공된 경우 사용할 필터 속성 값입니다. 유효한 자바 정규식 문자열을 이벤트 필터 값으로 사용합니다. 정규식이 제공된 경우 메시지 필터링을 위해 전체 표현식이 일치해야 합니다. 부분 일치(예: 하위 문자열)는 필터링되지 않습니다. 기본적으로 null 이벤트 필터 값이 사용됩니다.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Pub/Sub to Splunk

Pub/Sub to Splunk 템플릿은 Splunk의 HTTP Event Collector(HEC)를 통해 Pub/Sub 구독에서 메시지를 읽고 Splunk에 메시지 페이로드를 쓰는 스트리밍 파이프라인입니다. Splunk에 쓰기 전에 자바스크립트 사용자 정의 함수를 메시지 페이로드에 적용할 수도 있습니다. 처리 실패가 발생한 메시지는 추가적인 문제 해결 및 재처리를 위해 Pub/Sub 처리되지 않은 주제로 전달됩니다.

HEC 토큰의 추가 보안 레이어로 Cloud KMS 키로 암호화된 base64 인코딩 HEC 토큰 매개변수와 함께 Cloud KMS 키를 전달할 수도 있습니다. HEC 토큰 매개변수 암호화에 대한 자세한 내용은 Cloud KMS API 암호화 엔드포인트를 참조하세요.

파이프라인 요구사항:

  • 파이프라인을 실행하기 전에 소스 Pub/Sub 구독이 있어야 합니다.
  • 파이프라인을 실행하기 전에 Pub/Sub 처리되지 않은 주제가 있어야 합니다.
  • Splunk HEC 엔드포인트는 Dataflow 작업자 네트워크에서 액세스할 수 있어야 합니다.
  • Splunk HEC 토큰이 생성되고 사용 가능해야 합니다.

템플릿 매개변수

매개변수 설명
inputSubscription 입력을 읽어올 Pub/Sub 구독입니다. 예를 들면 projects/<project-id>/subscriptions/<subscription-name>입니다.
token Splunk HEC 인증 토큰입니다. 이 base64 인코딩 문자열은 보안을 강화하기 위해 Cloud KMS 키로 암호화할 수 있습니다.
url Splunk HEC URL입니다. 이 값은 파이프라인이 실행되는 VPC에서 라우팅할 수 있어야 합니다. 예를 들면 https://splunk-hec-host:8088입니다.
outputDeadletterTopic 전달할 수 없는 메시지를 전달할 Pub/Sub 주제입니다. 예를 들면 projects/<project-id>/topics/<topic-name>입니다.
javascriptTextTransformGcsPath [선택사항] 모든 자바스크립트 코드를 포함한 Cloud Storage 경로입니다. 예를 들면 gs://mybucket/mytransforms/*.js입니다.
javascriptTextTransformFunctionName [선택사항] 호출할 자바스크립트 함수의 이름입니다. 예를 들어 자바스크립트 함수가 function myTransform(inJson) { ...dostuff...}이면 함수 이름은 myTransform입니다.
batchCount [선택사항] Splunk에 여러 이벤트를 전송하기 위한 배치 크기입니다. 기본값은 1(일괄 처리 없음)입니다.
parallelism [선택사항] 최대 동시 요청 수입니다. 기본값은 1(동시 처리 없음)입니다.
disableCertificateValidation [선택사항] SSL 인증서 유효성 검사를 사용 중지합니다. 기본값은 false(유효성 검사 사용)입니다.
includePubsubMessage [선택사항] 페이로드의 전체 Pub/Sub 메시지를 포함합니다. 기본값은 false입니다(데이터 요소만 페이로드에 포함됨).
tokenKMSEncryptionKey [선택사항] HEC 토큰 문자열을 복호화할 Cloud KMS 키입니다. Cloud KMS 키가 제공되면 HEC 토큰 문자열이 암호화되어 전달되어야 합니다.

Pub/Sub to Splunk 템플릿 실행

콘솔

Google Cloud Console에서 실행
  1. Cloud Console에서 Dataflow 페이지로 이동합니다.
  2. Dataflow 페이지로 이동
  3. 템플릿에서 작업 만들기를 클릭합니다.
  4. 템플릿에서 Cloud Platform 콘솔 생성 작업 버튼
  5. Cloud Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to Splunk template을 선택합니다.
  6. 작업 이름 필드에 작업 이름을 입력합니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  7. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  8. 작업 실행을 클릭합니다.

GCLOUD

gcloud 명령줄 도구에서 실행

참고: gcloud 명령줄 도구로 템플릿을 실행하려면 Cloud SDK 버전이 138.0.0 이상이어야 합니다.

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID
  • JOB_NAME: 선택한 작업 이름입니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • REGION: 리전 엔드포인트(예: us-west1)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • TOKEN: Splunk의 Http Event Collector 토큰
  • URL: Splunk의 Http Event Collector의 URL 경로(예: https://splunk-hec-host:8088)입니다.
  • DEADLETTER_TOPIC_NAME: Pub/Sub 주제 이름
  • JAVASCRIPT_FUNCTION: 자바스크립트 함수 이름
  • PATH_TO_JAVASCRIPT_UDF_FILE: 자바스크립트 코드가 포함된 .js 파일의 Cloud Storage 경로(예: gs://your-bucket/your-function.js)
  • BATCH_COUNT: Splunk에 여러 이벤트를 전송하기 위해 사용할 배치 크기
  • PARALLELISM: Splunk에 이벤트를 전송하기 위해 사용할 동시 요청 수
  • DISABLE_VALIDATION: SSL 인증서 검증을 사용 중지하려는 경우 true
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION

API

REST API에서 실행

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

REST API 요청으로 이 템플릿을 실행하려면 프로젝트 ID와 함께 HTTP POST 요청을 보냅니다. 이 요청에는 승인이 필요합니다.

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID
  • JOB_NAME: 선택한 작업 이름입니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • REGION: 리전 엔드포인트(예: us-west1)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • INPUT_SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • TOKEN: Splunk의 Http Event Collector 토큰
  • URL: Splunk의 Http Event Collector의 URL 경로(예: https://splunk-hec-host:8088)입니다.
  • DEADLETTER_TOPIC_NAME: Pub/Sub 주제 이름
  • JAVASCRIPT_FUNCTION: 자바스크립트 함수 이름
  • PATH_TO_JAVASCRIPT_UDF_FILE: 자바스크립트 코드가 포함된 .js 파일의 Cloud Storage 경로(예: gs://your-bucket/your-function.js)
  • BATCH_COUNT: Splunk에 여러 이벤트를 전송하기 위해 사용할 배치 크기
  • PARALLELISM: Splunk에 이벤트를 전송하기 위해 사용할 동시 요청 수
  • DISABLE_VALIDATION: SSL 인증서 검증을 사용 중지하려는 경우 true
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION"
   }
}

Pub/Sub to Avro Files on Cloud Storage

Pub/Sub to Avro files on Cloud Storage 템플릿은 Pub/Sub 주제에서 데이터를 읽고 지정된 Cloud Storage 버킷에 Avro 파일을 쓰는 스트리밍 파이프라인입니다.

파이프라인 요구사항:

  • 파이프라인을 실행하기 전에 입력 Pub/Sub 주제가 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputTopic 메시지 소비를 위해 구독할 Cloud Pub/Sub 주제입니다. 주제 이름은 projects/<project-id>/topics/<topic-name> 형식이어야 합니다.
outputDirectory 출력 Avro 파일이 보관처리되는 출력 디렉터리입니다. 마지막에 /를 추가합니다. 예를 들면 gs://example-bucket/example-directory/입니다.
avroTempDirectory 임시 Avro 파일의 디렉터리입니다. 마지막에 /를 추가하세요. 예를 들면 gs://example-bucket/example-directory/입니다.
outputFilenamePrefix [선택사항] Avro 파일의 출력 파일 이름 프리픽스입니다.
outputFilenameSuffix [선택사항] Avro 파일의 출력 파일 이름 서픽스입니다.
outputShardTemplate [선택사항] 출력 파일의 분할 템플릿입니다. 문자 'S' 또는 'N'의 반복 시퀀스로 지정됩니다(예: SSS-NNN). 이는 각각 분할 번호 또는 분할 개수로 바뀝니다. 이 매개변수가 지정되지 않은 경우 기본 템플릿 형식은 'W-P-SS-of-NN'입니다.
numShards [선택사항] 쓰는 동안 생성된 출력 분할의 최대 개수입니다. 기본 최대 분할 개수는 1입니다.

Pub/Sub to Cloud Storage Avro 템플릿 실행

콘솔

Google Cloud Console에서 실행
  1. Cloud Console에서 Dataflow 페이지로 이동합니다.
  2. Dataflow 페이지로 이동
  3. 템플릿에서 작업 만들기를 클릭합니다.
  4. 템플릿에서 Cloud Platform 콘솔 생성 작업 버튼
  5. Cloud Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to Cloud Storage Avro template을 선택합니다.
  6. 작업 이름 필드에 작업 이름을 입력합니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  7. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  8. 작업 실행을 클릭합니다.

GCLOUD

gcloud 명령줄 도구에서 실행

참고: gcloud 명령줄 도구로 템플릿을 실행하려면 Cloud SDK 버전이 138.0.0 이상이어야 합니다.

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID
  • JOB_NAME: 원하는 작업 이름입니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • REGION: 리전 엔드포인트(예: us-west1)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름
  • FILENAME_PREFIX: 선호하는 출력 파일 이름 프리픽스
  • FILENAME_SUFFIX: 선호하는 출력 파일 이름 서픽스
  • SHARD_TEMPLATE: 선호하는 출력 샤드 템플릿
  • NUM_SHARDS: 출력 샤드 수
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
numShards=NUM_SHARDS,\
avroTempDirectory=gs://BUCKET_NAME/temp/

API

REST API에서 실행

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

REST API 요청으로 이 템플릿을 실행하려면 프로젝트 ID와 함께 HTTP POST 요청을 보냅니다. 이 요청에는 승인이 필요합니다.

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID
  • JOB_NAME: 원하는 작업 이름입니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • REGION: 리전 엔드포인트(예: us-west1)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름
  • FILENAME_PREFIX: 선호하는 출력 파일 이름 프리픽스
  • FILENAME_SUFFIX: 선호하는 출력 파일 이름 서픽스
  • SHARD_TEMPLATE: 선호하는 출력 샤드 템플릿
  • NUM_SHARDS: 출력 샤드 수
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE",
       "numShards": "NUM_SHARDS",
   }
}

Pub/Sub to Text Files on Cloud Storage

Pub/Sub to Cloud Storage Text 템플릿은 Pub/Sub에서 레코드를 읽고 텍스트 형식으로 된 일련의 Cloud Storage 파일로 저장하는 스트리밍 파이프라인입니다. 나중에 사용하기 위해 Pub/Sub에 데이터를 빠르게 저장하는 수단으로 템플릿을 사용할 수 있습니다. 기본적으로 템플릿은 5분마다 새 파일을 생성합니다.

파이프라인 요구사항:

  • 실행하기 전에 Pub/Sub 주제가 있어야 합니다.
  • 주제에 게시되는 메시지는 텍스트 형식이어야 합니다.
  • 주제에 게시되는 메시지에는 줄바꿈을 사용할 수 없습니다. 각 Pub/Sub 메시지는 출력 파일에 한 줄로 저장됩니다.

템플릿 매개변수

매개변수 설명
inputTopic 입력을 읽어올 Pub/Sub 주제입니다. 주제 이름은 projects/<project-id>/topics/<topic-name> 형식이어야 합니다.
outputDirectory 출력 파일을 쓰기 위한 경로 및 파일 이름 프리픽스입니다. 예를 들면 gs://bucket-name/path/입니다. 이 값은 슬래시로 끝나야 합니다.
outputFilenamePrefix 윈도우 설정된 각 파일에 넣을 프리픽스입니다. 예를 들면 output-입니다.
outputFilenameSuffix 윈도우 설정된 각 파일에 넣을 서픽스입니다. 일반적으로 .txt 또는 .csv와 같은 파일 확장자입니다.
outputShardTemplate 샤드 템플릿은 윈도우 설정된 각 파일의 동적 부분을 정의합니다. 기본적으로, 파이프라인은 각 윈도우 내에서 단일 샤드을 사용하여 파일 시스템에 출력합니다. 따라서 모든 데이터가 윈도우별로 한 파일 안에 들어갑니다. outputShardTemplate의 기본값은 W-P-SS-of-NN입니다. 여기에서 W은 윈도우 기간, P는 창 정보, S는 샤드 번호, N은 샤드 개수입니다. 단일 파일의 경우에는 outputShardTemplateSS-of-NN 부분이 00-of-01이 됩니다.

Pub/Sub to Text Files on Cloud Storage 템플릿 실행

콘솔

Google Cloud Console에서 실행
  1. Cloud Console에서 Dataflow 페이지로 이동합니다.
  2. Dataflow 페이지로 이동
  3. 템플릿에서 작업 만들기를 클릭합니다.
  4. 템플릿에서 Cloud Platform 콘솔 생성 작업 버튼
  5. Cloud Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub to Text Files on Cloud Storage template을 선택합니다.
  6. 작업 이름 필드에 작업 이름을 입력합니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  7. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  8. 작업 실행을 클릭합니다.

GCLOUD

gcloud 명령줄 도구에서 실행

참고: gcloud 명령줄 도구로 템플릿을 실행하려면 Cloud SDK 버전이 138.0.0 이상이어야 합니다.

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID
  • JOB_NAME: 선택한 작업 이름입니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • REGION: 리전 엔드포인트(예: us-west1)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

API

REST API에서 실행

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

REST API 요청으로 이 템플릿을 실행하려면 프로젝트 ID와 함께 HTTP POST 요청을 보냅니다. 이 요청에는 승인이 필요합니다.

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID
  • JOB_NAME: 선택한 작업 이름입니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • REGION: 리전 엔드포인트(예: us-west1)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_IDlocations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Pub/Sub to MongoDB

Pub/Sub to MongoDB 템플릿은 Pub/Sub 구독에서 JSON 인코딩 메시지를 읽고 이를 MongoDB에 문서로 쓰는 스트리밍 파이프라인입니다. 필요한 경우 이 파이프라인은 자바스크립트 사용자 정의 함수(UDF)를 사용하여 포함할 수 있는 추가 변환을 지원합니다. 스키마 불일치, 잘못된 형식의 JSON 또는 변환 실행 중에 오류가 발생하는 경우, 입력 메시지와 함께 처리되지 않은 메시지가 BigQuery 테이블에 기록됩니다. 처리되지 않은 레코드의 테이블이 실행되기 전에 존재하지 않으면 파이프라인은 자동으로 이 테이블을 만듭니다.

파이프라인 요구사항

  • Pub/Sub 구독이 있어야 하며 메시지가 유효한 JSON 형식으로 인코딩되어야 합니다.
  • MongoDB 클러스터가 있어야 하며 Dataflow 작업자 머신에서 액세스할 수 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputSubscription Pub/Sub 구독의 이름입니다. 예를 들면 projects/<project-id>/subscriptions/<subscription-name>입니다.
mongoDBUri 쉼표로 구분된 MongoDB 서버 목록입니다. 예를 들면 192.285.234.12:27017,192.287.123.11:27017입니다.
database 컬렉션을 저장하기 위한 MongoDB의 데이터베이스입니다. 예를 들면 my-db입니다.
collection MongoDB 데이터베이스 내부의 컬렉션 이름입니다. 예를 들면 my-collection입니다.
deadletterTable 오류(스키마 불일치, 잘못된 형식의 json 등)로 인해 메시지를 저장하는 BigQuery 테이블입니다. 예를 들면 project-id:dataset-name.table-name입니다.
javascriptTextTransformGcsPath [선택사항] UDF 변환이 포함된 자바스크립트 파일의 Cloud Storage 위치입니다. 예를 들면 gs://mybucket/filename.json입니다.
javascriptTextTransformFunctionName [선택사항] 자바스크립트 UDF의 이름입니다. 예를 들면 transform입니다.
batchSize [선택사항] MongoDB에 문서를 일괄 삽입하기 위해 사용되는 배치 크기입니다. 기본값: 1000
batchSizeBytes [선택사항] 배치 크기(바이트)입니다. 기본값: 5242880
maxConnectionIdleTime [선택사항] 연결이 시간 초과되기 전까지 허용되는 최대 유휴 시간(초)입니다. 기본값: 60000
sslEnabled [선택사항] MongoDB 연결에 SSL이 사용 설정되었는지 여부를 나타내는 부울 값입니다. 기본값: true
ignoreSSLCertificate [선택사항] SSL 인증서를 무시해야 하는지 여부를 나타내는 부울 값입니다. 기본값: true
withOrdered [선택사항] MongoDB에 순서대로 대량 삽입할 수 있게 해주는 부울 값입니다. 기본값: true
withSSLInvalidHostNameAllowed [선택사항] SSL 연결에 잘못된 호스트 이름이 허용되었는지 여부를 나타내는 부울 값입니다. 기본값: true

Pub/Sub to MongoDB 템플릿 실행

콘솔

Google Cloud Console에서 실행
  1. Cloud Console에서 Dataflow 페이지로 이동합니다.
  2. Dataflow 페이지로 이동
  3. 템플릿에서 작업 만들기를 클릭합니다.
  4. 템플릿에서 Cloud Platform 콘솔 생성 작업 버튼
  5. Cloud Dataflow 템플릿 드롭다운 메뉴에서 Pub/Sub to MongoDB template을 선택합니다.
  6. 작업 이름 필드에 작업 이름을 입력합니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  7. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  8. 작업 실행을 클릭합니다.

GCLOUD

gcloud 명령줄 도구에서 실행

참고: gcloud 명령줄 도구로 템플릿을 실행하려면 Cloud SDK 버전이 284.0.0 이상이어야 합니다.

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID
  • REGION_NAME: Dataflow 리전 이름(예: us-central1)
  • JOB_NAME: 선택한 작업 이름입니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • INPUT_SUBSCRIPTION: Pub/Sub 구독(예: projects/<project-id>/subscriptions/<subscription-name>)
  • MONGODB_URI: MongoDB 서버 주소(예: 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: MongoDB 데이터베이스의 이름(예: users)
  • COLLECTION: MongoDB 컬렉션의 이름(예: profiles)
  • UNPROCESSED_TABLE: BigQuery 테이블의 이름(예: your-project:your-dataset.your-table-name)
gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

API

REST API에서 실행

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

REST API 요청으로 이 템플릿을 실행하려면 프로젝트 ID와 함께 HTTP POST 요청을 보냅니다. 이 요청에는 승인이 필요합니다.

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID
  • LOCATION: Dataflow 리전 이름(예: us-central1)
  • JOB_NAME: 선택한 작업 이름입니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • INPUT_SUBSCRIPTION: Pub/Sub 구독(예: projects/<project-id>/subscriptions/<subscription-name>)
  • MONGODB_URI: MongoDB 서버 주소(예: 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: MongoDB 데이터베이스의 이름(예: users)
  • COLLECTION: MongoDB 컬렉션의 이름(예: profiles)
  • UNPROCESSED_TABLE: BigQuery 테이블의 이름(예: your-project:your-dataset.your-table-name)
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Text Files on Cloud Storage to BigQuery(스트리밍)

Text Files on Cloud Storage to BigQuery 파이프라인은 Cloud Storage에 저장된 텍스트 파일을 스트리밍하고, 사용자가 제공한 자바스크립트 사용자 정의 함수(UDF)를 사용하여 변환하고, 결과를 BigQuery에 출력할 수 있게 해주는 스트리밍 파이프라인입니다.

파이프라인 요구사항:

  • 출력 테이블을 설명하는 JSON 형식의 BigQuery 스키마 파일을 만듭니다.
    {
        'fields': [{
            'name': 'location',
            'type': 'STRING'
        }, {
            'name': 'name',
            'type': 'STRING'
        }, {
            'name': 'age',
            'type': 'STRING',
        }, {
            'name': 'color',
            'type': 'STRING'
        }, {
            'name': 'coffee',
            'type': 'STRING',
            'mode': 'REQUIRED'
        }, {
            'name': 'cost',
            'type': 'NUMERIC',
            'mode': 'REQUIRED'
        }]
    }
    
  • 텍스트 줄을 변환하는 논리를 제공하는 UDF 함수를 사용하여 자바스크립트(.js) 파일을 만듭니다. 함수는 JSON 문자열을 반환해야 합니다.

    예를 들어 이 함수는 CSV 파일의 각 줄을 분할하고, 값을 변환한 후에 JSON 문자열을 반환합니다.

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

템플릿 매개변수

매개변수 설명
javascriptTextTransformGcsPath 자바스크립트 UDF의 Cloud Storage 위치입니다. 예를 들면 gs://my_bucket/my_function.js입니다.
JSONPath JSON으로 설명된 BigQuery 스키마 파일의 Cloud Storage 위치입니다. 예를 들면 gs://path/to/my/schema.json입니다.
javascriptTextTransformFunctionName UDF로 호출할 자바스크립트 함수의 이름입니다. 예를 들면 transform입니다.
outputTable 정규화된 BigQuery 테이블입니다. 예를 들면 my-project:dataset.table입니다.
inputFilePattern 처리하려는 텍스트의 Cloud Storage 위치입니다. 예를 들면 gs://my-bucket/my-files/text.txt입니다.
bigQueryLoadingTemporaryDirectory BigQuery 로딩 프로세스를 위한 임시 디렉터리입니다. 예를 들면 gs://my-bucket/my-files/temp_dir입니다.
outputDeadletterTable 출력 테이블에 도달하지 못한 메시지 테이블입니다. 예를 들면 my-project:dataset.my-unprocessed-table입니다. 존재하지 않을 경우 파이프라인 실행 중에 생성됩니다. 지정하지 않은 경우 <outputTableSpec>_error_records가 대신 사용됩니다.

Cloud Storage Text to BigQuery(스트리밍) 템플릿 실행

콘솔

Google Cloud Console에서 실행
  1. Cloud Console에서 Dataflow 페이지로 이동합니다.
  2. Dataflow 페이지로 이동
  3. 템플릿에서 작업 만들기를 클릭합니다.
  4. 템플릿에서 Cloud Platform 콘솔 생성 작업 버튼
  5. Cloud Dataflow 템플릿 드롭다운 메뉴에서 the Cloud Storage Text to BigQuery template을 선택합니다.
  6. 작업 이름 필드에 작업 이름을 입력합니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  7. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  8. 작업 실행을 클릭합니다.

GCLOUD

gcloud 명령줄 도구에서 실행

참고: gcloud 명령줄 도구로 템플릿을 실행하려면 Cloud SDK 버전이 138.0.0 이상이어야 합니다.

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID
  • JOB_NAME: 선택한 작업 이름입니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • REGION: 리전 엔드포인트(예: us-west1)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION: UDF의 이름
  • PATH_TO_BIGQUERY_SCHEMA_JSON: 스키마 정의가 포함된 JSON 파일의 Cloud Storage 경로
  • PATH_TO_JAVASCRIPT_UDF_FILE: 자바스크립트 코드가 포함된 .js 파일의 Cloud Storage 경로
  • PATH_TO_TEXT_DATA: 텍스트 데이터 세트의 Cloud Storage 경로
  • BIGQUERY_TABLE: BigQuery 테이블 이름
  • BIGQUERY_UNPROCESSED_TABLE: 처리되지 않은 메시지에 대한 BigQuery 테이블의 이름
  • PATH_TO_TEMP_DIR_ON_GCS: 임시 디렉터리의 Cloud Storage 경로
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

API

REST API에서 실행

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

REST API 요청으로 이 템플릿을 실행하려면 프로젝트 ID와 함께 HTTP POST 요청을 보냅니다. 이 요청에는 승인이 필요합니다.

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID
  • JOB_NAME: 선택한 작업 이름입니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • REGION: 리전 엔드포인트(예: us-west1)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • JAVASCRIPT_FUNCTION: UDF의 이름
  • PATH_TO_BIGQUERY_SCHEMA_JSON: 스키마 정의가 포함된 JSON 파일의 Cloud Storage 경로
  • PATH_TO_JAVASCRIPT_UDF_FILE: 자바스크립트 코드가 포함된 .js 파일의 Cloud Storage 경로
  • PATH_TO_TEXT_DATA: 텍스트 데이터 세트의 Cloud Storage 경로
  • BIGQUERY_TABLE: BigQuery 테이블 이름
  • BIGQUERY_UNPROCESSED_TABLE: 처리되지 않은 메시지에 대한 BigQuery 테이블의 이름
  • PATH_TO_TEMP_DIR_ON_GCS: 임시 디렉터리의 Cloud Storage 경로
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

Text Files on Cloud Storage to Pub/Sub(스트리밍)

이 템플릿은 Cloud Storage에 업로드된 새 텍스트 파일을 지속적으로 폴링하고, 각 파일을 한 줄씩 읽고, Pub/Sub 주제에 문자열을 게시하는 스트리밍 파이프라인을 만듭니다. 이 템플릿은 JSON 레코드를 포함하는 줄바꿈 구분 파일 또는 CSV 파일의 레코드를 Pub/Sub 주제에 게시하여 실시간으로 처리합니다. 이 템플릿을 사용하여 Pub/Sub에 데이터를 다시 재생할 수 있습니다.

현재 폴링 간격은 고정되어 있으며 10초로 설정되어 있습니다. 이 템플릿은 개별 레코드에 타임스탬프를 설정하지 않기 때문에 이벤트 시간이 실행 중 게시 시간과 일치하게 됩니다. 파이프라인을 처리하기 위해 정확한 이벤트 시간이 필요한 경우에는 이 파이프라인을 사용해서는 안 됩니다.

파이프라인 요구사항:

  • 입력 파일은 줄바꿈으로 구분되는 JSON 또는 CSV 형식이어야 합니다. 소스 파일에서 여러 줄에 걸쳐 있는 레코드는 다운스트림 문제를 일으킬 수 있습니다. 파일 안의 각 줄이 Pub/Sub에 메시지로 게시되기 때문입니다.
  • 실행하기 전에 Pub/Sub 주제가 있어야 합니다.
  • 파이프라인은 무기한으로 실행되며 수동으로 종료해야 합니다.

템플릿 매개변수

매개변수 설명
inputFilePattern 읽을 입력 파일 패턴입니다. 예를 들면 gs://bucket-name/files/*.json 또는 gs://bucket-name/path/*.csv입니다.
outputTopic 작성할 Pub/Sub 입력 주제입니다. 이름은 projects/<project-id>/topics/<topic-name> 형식이어야 합니다.

Text Files on Cloud Storage to Pub/Sub(스트리밍) 템플릿 실행

콘솔

Google Cloud Console에서 실행
  1. Cloud Console에서 Dataflow 페이지로 이동합니다.
  2. Dataflow 페이지로 이동
  3. 템플릿에서 작업 만들기를 클릭합니다.
  4. 템플릿에서 Cloud Platform 콘솔 생성 작업 버튼
  5. Cloud Dataflow 템플릿 드롭다운 메뉴에서 the Text Files on Cloud Storage to Pub/Sub (Stream) template을 선택합니다.
  6. 작업 이름 필드에 작업 이름을 입력합니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  7. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  8. 작업 실행을 클릭합니다.

GCLOUD

gcloud 명령줄 도구에서 실행

참고: gcloud 명령줄 도구로 템플릿을 실행하려면 Cloud SDK 버전이 138.0.0 이상이어야 합니다.

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID
  • JOB_NAME: 선택한 작업 이름입니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • REGION: 리전 엔드포인트(예: us-west1)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름
  • FILE_PATTERN: Cloud Storage 버킷에서 읽을 파일 패턴 glob(예: path/*.csv)
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION\
    --staging-location TEMP_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

API

REST API에서 실행

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

REST API 요청으로 이 템플릿을 실행하려면 프로젝트 ID와 함께 HTTP POST 요청을 보냅니다. 이 요청에는 승인이 필요합니다.

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID
  • JOB_NAME: 선택한 작업 이름입니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • REGION: 리전 엔드포인트(예: us-west1)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • TOPIC_NAME: Pub/Sub 주제 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름
  • FILE_PATTERN: Cloud Storage 버킷에서 읽을 파일 패턴 glob(예: path/*.csv)
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Data Masking/Tokenization from Cloud Storage to BigQuery(Cloud DLP 사용)

Data Masking/Tokenization from Cloud Storage to BigQuery(Cloud DLP 사용) 템플릿은 Cloud Storage 버킷에서 csv 파일을 읽고, 익명화를 위해 Cloud Data Loss Prevention(Cloud DLP) API를 호출하며, 익명화된 데이터를 지정된 BigQuery 테이블에 쓰는 스트리밍 파이프라인입니다. 이 템플릿은 Cloud DLP 검사 템플릿과 Cloud DLP 익명화 템플릿 사용을 지원합니다. 따라서 사용자가 잠재적으로 민감한 정보를 검사하고 식별을 익명화할 수 있을 뿐만 아니라 열을 익명화하도록 지정된 구조화된 데이터를 익명화하며 검사가 필요 없습니다.

파이프라인 요구사항:

  • 토큰화할 입력 데이터가 있어야 합니다.
  • Cloud DLP 템플릿이 있어야 합니다(예: DeidentifyTemplate 및 InspectTemplate). 자세한 내용은 Cloud DLP 템플릿을 참조하세요.
  • BigQuery 데이터 세트가 있어야 합니다.

템플릿 매개변수

매개변수 설명
inputFilePattern 입력 데이터 레코드를 읽어 들일 csv 파일입니다. 와일드 카드 사용도 허용됩니다. 예를 들면 gs://mybucket/my_csv_filename.csv 또는 gs://mybucket/file-*.csv입니다.
dlpProjectId Cloud DLP API 리소스를 소유하는 Cloud DLP 프로젝트 ID입니다. 이 Cloud DLP 프로젝트는 Cloud DLP 템플릿을 소유하는 동일한 프로젝트이거나 별도의 프로젝트일 수 있습니다. 예를 들면 my_dlp_api_project입니다.
deidentifyTemplateName API 요청에 사용할 Cloud DLP 익명화 템플릿으로, projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId} 패턴을 사용하여 지정됩니다. 예를 들면 projects/my_project/deidentifyTemplates/100입니다.
datasetName 토큰화된 결과를 보내기 위한 BigQuery 데이터 세트입니다.
batchSize 검사하거나 익명화할 데이터를 보내는 데 사용할 청크/배치 크기입니다. csv 파일의 경우 batchSize는 배치의 행 수입니다. 사용자는 레코드 크기 및 파일 크기에 따라 배치 크기를 결정해야 합니다. Cloud DLP API의 페이로드 크기 제한은 API 호출당 524KB입니다.
inspectTemplateName [선택사항] API 요청에 사용할 Cloud DLP 검사 템플릿으로, projects/{template_project_id}/identifyTemplates/{idTemplateId} 패턴을 사용하여 지정됩니다. 예를 들면 projects/my_project/identifyTemplates/100입니다.

Data Masking/Tokenization from Cloud Storage to BigQuery(Cloud DLP 사용) 템플릿 실행

콘솔

Google Cloud Console에서 실행
  1. Cloud Console에서 Dataflow 페이지로 이동합니다.
  2. Dataflow 페이지로 이동
  3. 템플릿에서 작업 만들기를 클릭합니다.
  4. 템플릿에서 Cloud Platform 콘솔 생성 작업 버튼
  5. Cloud Dataflow 템플릿 드롭다운 메뉴에서 the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template을 선택합니다.
  6. 작업 이름 필드에 작업 이름을 입력합니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  7. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  8. 작업 실행을 클릭합니다.

GCLOUD

gcloud 명령줄 도구에서 실행

참고: gcloud 명령줄 도구로 템플릿을 실행하려면 Cloud SDK 버전이 138.0.0 이상이어야 합니다.

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

다음을 바꿉니다.

  • TEMPLATE_PROJECT_ID: 템플릿 프로젝트 ID
  • DLP_API_PROJECT_ID: Cloud DLP API 프로젝트 ID
  • JOB_NAME: 선택한 작업 이름입니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • REGION: 리전 엔드포인트(예: us-west1)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • INPUT_DATA: 입력 파일 경로
  • DEIDENTIFY_TEMPLATE: Cloud DLPDeidentify 템플릿 번호
  • DATASET_NAME: BigQuery 데이터 세트 이름
  • INSPECT_TEMPLATE_NUMBER: Cloud DLPInspect 템플릿 번호
  • BATCH_SIZE_VALUE: 배치 크기(csv의 API당 행 수)
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputFilePattern=INPUT_DATA,\
datasetName=DATASET_NAME,\
batchSize=BATCH_SIZE_VALUE,\
dlpProjectId=DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER

API

REST API에서 실행

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

REST API 요청으로 이 템플릿을 실행하려면 프로젝트 ID와 함께 HTTP POST 요청을 보냅니다. 이 요청에는 승인이 필요합니다.

다음을 바꿉니다.

  • TEMPLATE_PROJECT_ID: 템플릿 프로젝트 ID
  • DLP_API_PROJECT_ID: Cloud DLP API 프로젝트 ID
  • JOB_NAME: 선택한 작업 이름입니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • REGION: 리전 엔드포인트(예: us-west1)
  • TEMP_LOCATION: 임시 파일 쓰기 위치(예: gs://your-bucket/temp)
  • INPUT_DATA: 입력 파일 경로
  • DEIDENTIFY_TEMPLATE: Cloud DLPDeidentify 템플릿 번호
  • DATASET_NAME: BigQuery 데이터 세트 이름
  • INSPECT_TEMPLATE_NUMBER: Cloud DLPInspect 템플릿 번호
  • BATCH_SIZE_VALUE: 배치 크기(csv의 API당 행 수)
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
      "inputFilePattern":INPUT_DATA,
      "datasetName": "DATASET_NAME",
      "batchSize": "BATCH_SIZE_VALUE",
      "dlpProjectId": "DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE",
      "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER"
   }
}

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub(스트리밍)

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub 템플릿은 Pub/Sub 메시지와 MySQL의 변경 데이터를 읽고 BigQuery에 레코드를 작성하는 스트리밍 파이프라인입니다. Debezium 커넥터는 MySQL 데이터베이스의 변경사항을 캡처하고 변경된 데이터를 Pub/Sub에 게시합니다. 그런 다음 템플릿이 Pub/Sub 메시지를 읽고 BigQuery에 씁니다.

이 템플릿을 사용하여 MySQL 데이터베이스와 BigQuery 테이블을 동기화할 수 있습니다. 파이프라인은 변경된 데이터를 BigQuery 스테이징 테이블에 쓰고 MySQL 데이터베이스를 복제하는 BigQuery 테이블을 간헐적으로 업데이트합니다.

파이프라인 요구사항

  • Debezium 커넥터가 배포되어야 합니다.
  • Pub/Sub 메시지는 빔 행으로 직렬화되어야 합니다.

템플릿 매개변수

매개변수 설명
inputSubscriptions 읽어올 Pub/Sub 입력 구독의 쉼표로 구분된 목록이며 <subscription>,<subscription>, ... 형식입니다.
changeLogDataset 스테이징 테이블을 저장할 BigQuery 데이터 세트이며 <my-dataset> 형식입니다.
replicaDataset 복제 테이블을 저장할 BigQuery 데이터 세트의 위치이며 <my-dataset> 형식입니다.
선택사항: updateFrequencySecs 파이프라인이 MySQL 데이터베이스를 복제하는 BigQuery 테이블을 업데이트하는 간격입니다.

Change Data Capture using Debezium and MySQL from Pub/Sub to BigQuery 템플릿 실행

이 템플릿을 실행하려면 다음 단계를 따르세요.

  1. 로컬 머신에서 DataflowTemplates 저장소를 복제합니다.
  2. v2/cdc-parent 디렉터리로 변경합니다.
  3. Debezium 커넥터가 배포되어 있는지 확인합니다.
  4. Maven을 사용하여 Dataflow 템플릿을 실행합니다.

    이 예시에서 다음 값을 바꿔야 합니다.

    • PROJECT_ID를 프로젝트 ID로 바꿉니다.
    • YOUR_SUBSCRIPTIONS를 쉼표로 구분된 Pub/Sub 구독 이름 목록으로 바꿉니다.
    • YOUR_CHANGELOG_DATASET를 변경로그 데이터의 BigQuery 데이터 세트로 바꾸고 YOUR_REPLICA_DATASET를 복제본 테이블의 BigQuery 데이터 세트로 바꿉니다.
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
                --inputSubscriptions=YOUR_SUBSCRIPTIONS \
                --updateFrequencySecs=300 \
                --changeLogDataset=YOUR_CHANGELOG_DATASET \
                --replicaDataset=YOUR_REPLICA_DATASET \
                --project=PROJECT_ID"
      

Apache Kafka to BigQuery

Apache Kafka to BigQuery 템플릿은 Apache Kafka에서 텍스트 데이터를 수집하고 사용자 정의 함수(UDF)를 실행하고, 결과 레코드를 BigQuery에 출력하는 스트리밍 파이프라인입니다. 데이터 변환, UDF 실행, 출력 테이블에 삽입 중 발생하는 모든 오류는 BigQuery의 개별 오류 테이블에 삽입됩니다. 실행 전 오류 테이블이 없으면 생성됩니다.

파이프라인 요구사항

  • 출력 BigQuery 테이블이 있어야 합니다.
  • Apache Kafka 브로커 서버가 실행 중이며 Dataflow 작업자 머신에서 연결할 수 있어야 합니다.
  • Apache Kafka 주제가 있어야 하며 메시지는 유효한 JSON 형식으로 인코딩해야 합니다.

템플릿 매개변수

매개변수 설명
outputTableSpec Apache Kafka 메시지를 my-project:dataset.table 형식으로 작성할 BigQuery 출력 테이블 위치입니다.
inputTopics 읽을 Apache Kafka 입력 주제를 쉼표로 구분된 목록으로 표시한 것입니다. 예를 들면 messages입니다.
bootstrapServers 실행 중인 Apache Kafka 브로커 서버의 호스트 주소를 쉼표로 구분된 목록으로 표시한 것이며 각 호스트 주소는 35.70.252.199:9092 형식입니다.
javascriptTextTransformGcsPath (선택사항) 자바스크립트 UDF의 Cloud Storage 위치 경로입니다. 예를 들면 gs://my_bucket/my_function.js입니다.
javascriptTextTransformFunctionName (선택사항) UDF로 호출할 자바스크립트의 이름입니다. 예: transform
outputDeadletterTable (선택사항) 출력 테이블에 도달하지 못한 my-project:dataset.my-deadletter-table 형식의 BigQuery 테이블입니다. 테이블이 없으면 파이프라인 실행 중에 테이블이 생성됩니다. 지정하지 않은 경우 <outputTableSpec>_error_records가 대신 사용됩니다.

Apache Kafka to BigQuery 템플릿 실행

콘솔

Google Cloud Console에서 실행
  1. Cloud Console에서 Dataflow 페이지로 이동합니다.
  2. Dataflow 페이지로 이동
  3. 템플릿에서 작업 만들기를 클릭합니다.
  4. 템플릿에서 Cloud Platform 콘솔 생성 작업 버튼
  5. Cloud Dataflow 템플릿 드롭다운 메뉴에서 the Apache Kafka to BigQuery template을 선택합니다.
  6. 작업 이름 필드에 작업 이름을 입력합니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  7. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  8. 작업 실행을 클릭합니다.

GCLOUD

gcloud 도구 명령줄 도구에서 실행

참고: gcloud 도구 명령줄 도구를 사용하여 템플릿을 실행하려면 Cloud SDK 버전 284.0.0 이상이 필요합니다.

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

이 예시에서 다음 값을 바꿔야 합니다.

  • YOUR_PROJECT_ID를 프로젝트 ID로 바꿉니다.
  • JOB_NAME을 원하는 작업 이름으로 바꿉니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • YOUR_JAVASCRIPT_FUNCTION을 UDF의 이름으로 바꿉니다.
  • REGION_NAME을 Dataflow 리전 이름으로 바꿉니다. 예를 들면 us-central1.입니다.
  • BIGQUERY_TABLE을 BigQuery 테이블 이름으로 바꿉니다.
  • KAFKA_TOPICS를 Apache Kakfa 주제 목록으로 바꿉니다. 주제가 여러 개 제공된 경우 쉼표를 이스케이프하는 방법에 대한 안내를 따르세요.
  • PATH_TO_JAVASCRIPT_UDF_FILE을 자바스크립트 코드가 있는 .js 파일의 Cloud Storage 경로로 바꿉니다.
  • YOUR_JAVASCRIPT_FUNCTION을 UDF의 이름으로 바꿉니다.
  • KAFKA_SERVER_ADDRESSES를 Apache Kafka 브로커 서버 IP 주소 목록으로 바꿉니다. 각 IP 주소에는 서버가 액세스할 수 있는 포트 번호가 함께 있어야 합니다. 예를 들면 35.70.252.199:9092입니다. 주소가 여러 개 제공된 경우 쉼표를 이스케이프하는 방법에 대한 안내를 따르세요.
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

API

REST API에서 실행

이 템플릿을 실행할 때는 템플릿의 Cloud Storage 경로가 필요합니다.

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

REST API 요청으로 이 템플릿을 실행하려면 프로젝트 ID와 함께 HTTP POST 요청을 보냅니다. 이 요청에는 승인이 필요합니다.

이 예시에서 다음 값을 바꿔야 합니다.

  • YOUR_PROJECT_ID를 프로젝트 ID로 바꿉니다.
  • JOB_NAME을 원하는 작업 이름으로 바꿉니다. 작업 이름이 유효하려면 정규 표현식 [a-z]([-a-z0-9]{0,38}[a-z0-9])?와 일치해야 합니다.
  • YOUR_JAVASCRIPT_FUNCTION을 UDF의 이름으로 바꿉니다.
  • LOCATION을 Dataflow 리전 이름으로 바꿉니다. 예를 들면 us-central1.입니다.
  • BIGQUERY_TABLE을 BigQuery 테이블 이름으로 바꿉니다.
  • KAFKA_TOPICS를 Apache Kakfa 주제 목록으로 바꿉니다. 주제가 여러 개 제공된 경우 쉼표를 이스케이프하는 방법에 대한 안내를 따르세요.
  • PATH_TO_JAVASCRIPT_UDF_FILE을 자바스크립트 코드가 있는 .js 파일의 Cloud Storage 경로로 바꿉니다.
  • YOUR_JAVASCRIPT_FUNCTION을 UDF의 이름으로 바꿉니다.
  • KAFKA_SERVER_ADDRESSES를 Apache Kafka 브로커 서버 IP 주소 목록으로 바꿉니다. 각 IP 주소에는 서버가 액세스할 수 있는 포트 번호가 함께 있어야 합니다. 예를 들면 35.70.252.199:9092입니다. 주소가 여러 개 제공된 경우 쉼표를 이스케이프하는 방법에 대한 안내를 따르세요.
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Kafka_to_BigQuery",
   }
}