Dataflow를 사용하여 Kafka에서 BigQuery로 데이터 쓰기

이 페이지에서는 Dataflow를 사용하여 Apache Kafka용 Google Cloud 관리형 서비스에서 데이터를 읽고 BigQuery 테이블에 레코드를 쓰는 방법을 보여줍니다. 이 튜토리얼에서는 Apache Kafka to BigQuery 템플릿을 사용하여 Dataflow 작업을 만듭니다.

개요

Apache Kafka는 이벤트 스트리밍을 위한 오픈소스 플랫폼입니다. Kafka는 분산 아키텍처에서 느슨하게 결합된 구성요소 간의 통신을 사용 설정하기 위해 일반적으로 사용됩니다. Dataflow를 사용하여 Kafka에서 이벤트를 읽고, 이를 처리하고, 추가 분석을 위해 BigQuery 테이블에 결과를 기록할 수 있습니다.

Apache Kafka용 관리형 서비스는 안전하고 확장 가능한 Kafka 클러스터를 실행하는 데 도움이 되는 Google Cloud Platform 서비스입니다.

Kafka 이벤트를 BigQuery로 읽기
Apache Kafka를 사용한 이벤트 기반 아키텍처

필수 권한

Dataflow 작업자 서비스 계정에는 다음 Identity and Access Management(IAM) 역할이 있어야 합니다.

  • 관리형 Kafka 클라이언트(roles/managedkafka.client)
  • BigQuery 데이터 편집자(roles/bigquery.dataEditor)

자세한 내용은 Dataflow 보안 및 권한을 참조하세요.

Kafka 클러스터 만들기

이 단계에서는 Apache Kafka용 관리형 서비스 클러스터를 만듭니다. 자세한 내용은 Apache Kafka용 관리형 서비스 클러스터 만들기를 참조하세요.

콘솔

  1. Apache Kafka용 관리형 서비스 > 클러스터 페이지로 이동합니다.

    클러스터로 이동

  2. 만들기를 클릭합니다.

  3. 클러스터 이름 체크박스에 클러스터 이름을 입력합니다.

  4. 리전 목록에서 클러스터의 위치를 선택합니다.

  5. 만들기를 클릭합니다.

gcloud

managed-kafka clusters create 명령어를 사용합니다.

gcloud managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

다음을 바꿉니다.

  • CLUSTER: 클러스터 이름
  • REGION: 서브넷을 만든 리전
  • PROJECT_ID: 프로젝트 ID
  • SUBNET_NAME: 클러스터를 배포할 서브넷

클러스터를 만드는 데는 일반적으로 20~30분 정도 걸립니다.

Kafka 주제 만들기

Apache Kafka용 관리형 서비스 클러스터를 만든 후 주제를 만듭니다.

콘솔

  1. Apache Kafka용 관리형 서비스 > 클러스터 페이지로 이동합니다.

    클러스터로 이동

  2. 클러스터 이름을 클릭합니다.

  3. 클러스터 세부정보 페이지에서 주제 만들기를 클릭합니다.

  4. 주제 이름 체크박스에 주제 이름을 입력합니다.

  5. 만들기를 클릭합니다.

gcloud

managed-kafka topics create 명령어를 사용합니다.

gcloud managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3

다음을 바꿉니다.

  • TOPIC_NAME: 만들 주제의 이름

BigQuery 테이블 만들기

이 단계에서는 다음 스키마를 사용하여 BigQuery 테이블을 만듭니다.

열 이름 데이터 유형
name STRING
customer_id INTEGER

BigQuery 데이터 세트가 아직 없으면 먼저 만듭니다. 자세한 내용은 데이터 세트 만들기를 참조하세요. 그런 다음 빈 테이블을 새로 만듭니다.

콘솔

  1. BigQuery 페이지로 이동합니다.

    BigQuery로 이동

  2. 탐색기 창에서 프로젝트를 펼친 후 데이터 세트를 선택합니다.

  3. 데이터 세트 정보 섹션에서 테이블 만들기를 클릭합니다.

  4. 테이블을 만들 소스 목록에서 빈 테이블을 선택합니다.

  5. 테이블 체크박스에 테이블 이름을 입력합니다.

  6. 스키마 섹션에서 텍스트로 수정을 클릭합니다.

  7. 다음 스키마 정의를 붙여넣습니다.

    name:STRING,
    customer_id:INTEGER
    
  8. 테이블 만들기를 클릭합니다.

gcloud

bq mk 명령어를 사용합니다.

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  name:STRING,customer_id:INTEGER

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID
  • DATASET_NAME: 데이터 세트의 이름
  • TABLE_NAME: 만들 테이블의 이름

Dataflow 작업 실행

Kafka 클러스터와 BigQuery 테이블을 만든 후 Dataflow 템플릿을 실행합니다.

콘솔

먼저 클러스터의 부트스트랩 서버 주소를 가져옵니다.

  1. Google Cloud 콘솔에서 클러스터 페이지로 이동합니다.

    클러스터로 이동

  2. 클러스터 이름을 클릭합니다.

  3. 구성 탭을 클릭합니다.

  4. 부트스트랩 URL에서 부트스트랩 서버 주소를 복사합니다.

그런 다음 템플릿을 실행하여 Dataflow 작업을 만듭니다.

  1. Dataflow > 작업 페이지로 이동합니다.

    작업으로 이동

  2. 템플릿에서 작업 만들기를 클릭합니다.

  3. 작업 이름 필드에 kafka-to-bq를 입력합니다.

  4. 리전 엔드포인트에서 Apache Kafka용 관리형 서비스 클러스터가 있는 리전을 선택합니다.

  5. 'Kafka-BigQuery' 템플릿을 선택합니다.

  6. 다음 템플릿 파라미터를 입력합니다.

    • Kafka 부트스트랩 서버: 부트스트랩 서버 주소
    • 소스 Kafka 주제: 읽어올 주제의 이름
    • Kafka 소스 인증 모드: APPLICATION_DEFAULT_CREDENTIALS
    • Kafka 메시지 형식: JSON
    • 테이블 이름 전략: SINGLE_TABLE_NAME
    • BigQuery 출력 테이블: 다음과 같은 형식의 BigQuery 테이블입니다. PROJECT_ID:DATASET_NAME.TABLE_NAME
  7. 데드 레터 큐에서 BigQuery에 오류 쓰기를 선택합니다.

  8. 다음과 같은 형식의 데드 레터 큐의 BigQuery 테이블 이름을 입력합니다. PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME

    이 테이블을 미리 만들지 마세요. 파이프라인에서 만듭니다.

  9. 작업 실행을 클릭합니다.

gcloud

dataflow flex-template run 명령어를 사용합니다.

gcloud dataflow flex-template run kafka-to-bq \
--template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
--region LOCATION \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
persistKafkaKey=false,\
writeMode=SINGLE_TABLE_NAME,\
kafkaReadOffset=earliest,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME\
useBigQueryDLQ=true,\
outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME

다음 변수를 바꿉니다.

  • LOCATION: Apache Kafka용 관리형 서비스가 있는 리전
  • PROJECT_ID: Google Cloud Platform 프로젝트 이름
  • CLUSTER_ID: 클러스터의 이름
  • TOPIC: Kafka 주제의 이름
  • DATASET_NAME: 데이터 세트의 이름
  • TABLE_NAME: 테이블의 이름
  • ERROR_TABLE_NAME: 데드 레터 큐의 BigQuery 테이블 이름

데드 레터 큐의 테이블을 미리 만들지 마세요. 파이프라인에서 만듭니다.

Kafka로 메시지 전송

Dataflow 작업이 시작되면 Kafka에 메시지를 보낼 수 있으며 파이프라인이 이를 BigQuery에 씁니다.

  1. Kafka 클러스터와 동일한 서브넷에 VM을 만들고 Kafka 명령줄 도구를 설치합니다. 자세한 안내는 CLI에서 메시지 게시 및 사용클라이언트 머신 설정을 참조하세요.

  2. 다음 명령어를 실행하여 Kafka 주제에 메시지를 씁니다.

    kafka-console-producer.sh \
     --topic TOPIC \
     --bootstrap-server bootstrap.CLUSTER_ID.LOCATION.managedkafka.PROJECT_ID.cloud.goog:9092 \
     --producer.config client.properties

    다음 변수를 바꿉니다.

    • TOPIC: Kafka 주제의 이름
    • CLUSTER_ID: 클러스터의 이름입니다.
    • LOCATION: 클러스터가 있는 리전
    • PROJECT_ID: Google Cloud Platform 프로젝트 이름
  3. 프롬프트에서 다음 텍스트 줄을 입력하여 Kafka에 메시지를 전송합니다.

    {"name": "Alice", "customer_id": 1}
    {"name": "Bob", "customer_id": 2}
    {"name": "Charles", "customer_id": 3}
    

데드 레터 큐 사용

작업이 실행되는 동안 파이프라인이 개별 메시지를 BigQuery에 쓰지 못할 수 있습니다. 가능한 오류는 다음과 같습니다.

  • 잘못된 형식의 JSON을 포함하는 직렬화 오류
  • 테이블 스키마 및 JSON 데이터의 불일치로 인한 유형 변환 오류
  • 테이블 스키마에 없는 JSON 데이터의 추가 필드

이러한 오류로 인해 작업이 실패하지는 않으며, Dataflow 작업 로그에 오류로 표시되지 않습니다. 대신 파이프라인은 데드 레터 큐를 사용하여 이러한 유형의 오류를 처리합니다.

템플릿을 실행할 때 데드 레터 큐를 사용 설정하려면 다음 템플릿 파라미터를 설정하세요.

  • useBigQueryDLQ: true
  • outputDeadletterTable: 정규화된 BigQuery 테이블 이름(예: my-project:dataset1.errors)

파이프라인에서 테이블을 자동으로 만듭니다. Kafka 메시지를 처리할 때 오류가 발생하면 파이프라인이 테이블에 오류 항목을 기록합니다.

오류 메시지 예시:

오류 유형 이벤트 데이터 errorMessage
직렬화 오류 'Hello world' json을 테이블 행으로 직렬화할 수 없음: 'Hello world'
유형 변환 오류 {"name":"Emily","customer_id":"abc"} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 }
알 수 없는 입력란 {"name":"Zoe","age":34} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 }

BigQuery 데이터 유형 작업

내부적으로 Kafka I/O 커넥터는 JSON 메시지 페이로드를 Apache Beam TableRow 객체로 변환하고 TableRow 필드 값을 BigQuery 유형으로 변환합니다.

다음 표에는 BigQuery 데이터 유형의 JSON 표현이 나와 있습니다.

BigQuery 유형 JSON 표현
ARRAY [1.2,3]
BOOL true
DATE "2022-07-01"
DATETIME "2022-07-01 12:00:00.00"
DECIMAL 5.2E11
FLOAT64 3.142
GEOGRAPHY "POINT(1 2)"

잘 알려진 텍스트(WKT) 또는 문자열로 형식이 지정된 GeoJSON을 사용하여 지역을 지정합니다. 자세한 내용은 지리 공간 데이터 로드를 참조하세요.

INT64 10
INTERVAL "0-13 370 48:61:61"
STRING "string_val"
TIMESTAMP "2022-07-01T12:00:00.00Z"

JavaScript Date.toJSON 메서드를 사용하여 값 형식을 지정합니다.

구조화된 데이터

JSON 메시지가 일관된 스키마를 따르는 경우 BigQuery에서 STRUCT 데이터 유형을 사용하여 JSON 객체를 나타낼 수 있습니다.

다음 예에서 answers 필드는 ab의 두 하위 필드가 있는 JSON 객체입니다.

{"name":"Emily","answers":{"a":"yes","b":"no"}}

다음 SQL 문은 호환되는 스키마를 사용하여 BigQuery 테이블을 만듭니다.

CREATE TABLE my_dataset.kafka_events (name STRING, answers STRUCT<a STRING, b STRING>);

결과 테이블은 다음과 같습니다.

+-------+----------------------+
| name  |       answers        |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+

반정형 데이터

JSON 메시지가 엄격한 스키마를 따르지 않는 경우 BigQuery에 JSON 데이터 유형으로 저장하는 것이 좋습니다. JSON 데이터를 JSON 유형으로 저장하면 스키마를 미리 정의할 필요가 없습니다. 데이터 수집 후 GoogleSQL에서 필드 액세스(점 표기법) 및 배열 액세스 연산자를 사용하여 데이터를 쿼리할 수 있습니다. 자세한 내용은 GoogleSQL의 JSON 데이터 작업을 참조하세요.

UDF를 사용하여 데이터 변환

이 튜토리얼에서는 Kafka 메시지가 JSON 형식으로 지정되어 있고 BigQuery 테이블 스키마가 데이터에 변환을 적용하지 않고 JSON 데이터와 일치한다고 가정합니다.

선택적으로 BigQuery에 기록되기 전 데이터를 변환하는 JavaScript 사용자 정의 함수(UDF)를 제공할 수 있습니다. UDF는 필터링, 개인 식별 정보(PII) 삭제 또는 추가 필드로 데이터 강화와 같은 추가 처리도 수행할 수 있습니다.

자세한 내용은 Dataflow 템플릿에 대한 사용자 정의 함수 만들기를 참조하세요.

다음 단계