Apache Kafka to BigQuery 템플릿

Apache Kafka to BigQuery 템플릿은 Apache Kafka에서 텍스트 데이터를 수집하고 사용자 정의 함수(UDF)를 실행하고, 결과 레코드를 BigQuery에 출력하는 스트리밍 파이프라인입니다. 데이터 변환, UDF 실행, 출력 테이블에 삽입 중 발생하는 모든 오류는 BigQuery의 개별 오류 테이블에 삽입됩니다. 실행 전 오류 테이블이 없으면 생성됩니다.

파이프라인 요구사항

  • 출력 BigQuery 테이블이 있어야 합니다.
  • Apache Kafka 브로커 서버가 실행 중이며 Dataflow 작업자 머신에서 연결할 수 있어야 합니다.
  • Apache Kafka 주제가 있어야 하며 메시지는 유효한 JSON 형식으로 인코딩해야 합니다.

템플릿 매개변수

매개변수 설명
outputTableSpec Apache Kafka 메시지를 my-project:dataset.table 형식으로 작성할 BigQuery 출력 테이블 위치입니다.
inputTopics 읽을 Apache Kafka 입력 주제를 쉼표로 구분된 목록으로 표시한 것입니다. 예를 들면 messages입니다.
bootstrapServers 실행 중인 Apache Kafka 브로커 서버의 호스트 주소를 쉼표로 구분된 목록으로 표시한 것이며 각 호스트 주소는 35.70.252.199:9092 형식입니다.
javascriptTextTransformGcsPath (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
javascriptTextTransformFunctionName (선택사항) 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다. 예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.
outputDeadletterTable (선택사항) 출력 테이블에 도달하지 못한 my-project:dataset.my-deadletter-table 형식의 BigQuery 테이블입니다. 테이블이 없으면 파이프라인 실행 중에 테이블이 생성됩니다. 지정하지 않은 경우 <outputTableSpec>_error_records가 대신 사용됩니다.

템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. 선택사항: 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전 엔드포인트는 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Kafka to BigQuery template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • BIGQUERY_TABLE: BigQuery 테이블 이름
  • KAFKA_TOPICS: Apache Kakfa 주제 목록. 주제가 여러 개 제공된 경우 쉼표를 이스케이프하는 방법에 대한 안내를 따르세요.
  • PATH_TO_JAVASCRIPT_UDF_FILE: 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • JAVASCRIPT_FUNCTION: 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.

    예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.

  • KAFKA_SERVER_ADDRESSES: Apache Kafka 브로커 서버 IP 주소 목록. 각 IP 주소에는 서버가 액세스할 수 있는 포트 번호가 함께 있어야 합니다. 예를 들면 35.70.252.199:9092입니다. 주소가 여러 개 제공된 경우 쉼표를 이스케이프하는 방법에 대한 안내를 따르세요.

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery",
   }
}
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행할 Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전 엔드포인트(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

    • latest: 버킷의 날짜가 지정되지 않은 상위 폴더(gs://dataflow-templates/latest/)에서 사용할 수 있는 최신 버전의 템플릿을 사용합니다.
    • 버전 이름(예: 2021-09-20-00_RC00): 버킷의 날짜가 지정된 해당 상위 폴더(gs://dataflow-templates/)에 중첩되어 있는 특정 버전의 템플릿을 사용합니다.
  • BIGQUERY_TABLE: BigQuery 테이블 이름
  • KAFKA_TOPICS: Apache Kakfa 주제 목록. 주제가 여러 개 제공된 경우 쉼표를 이스케이프하는 방법에 대한 안내를 따르세요.
  • PATH_TO_JAVASCRIPT_UDF_FILE: 사용할 자바스크립트 사용자 정의 함수(UDF)를 정의하는 .js 파일의 Cloud Storage URI입니다. 예를 들면 gs://my-bucket/my-udfs/my_file.js입니다.
  • JAVASCRIPT_FUNCTION: 사용할 자바스크립트 사용자 정의 함수(UDF)의 이름입니다.

    예를 들어 자바스크립트 함수가 myTransform(inJson) { /*...do stuff...*/ }이면 함수 이름은 myTransform입니다. 샘플 자바스크립트 UDF는 UDF 예시를 참조하세요.

  • KAFKA_SERVER_ADDRESSES: Apache Kafka 브로커 서버 IP 주소 목록. 각 IP 주소에는 서버가 액세스할 수 있는 포트 번호가 함께 있어야 합니다. 예를 들면 35.70.252.199:9092입니다. 주소가 여러 개 제공된 경우 쉼표를 이스케이프하는 방법에 대한 안내를 따르세요.

자세한 내용은 Dataflow로 Kafka에서 BigQuery로 데이터 쓰기를 참조하세요.