이 페이지에서는 Dataflow를 사용하여 Apache Kafka용 Google Cloud 관리형 서비스에서 데이터를 읽고 BigQuery 테이블에 레코드를 쓰는 방법을 보여줍니다. 이 튜토리얼에서는 Apache Kafka to BigQuery 템플릿을 사용하여 Dataflow 작업을 만듭니다.
개요
Apache Kafka는 이벤트 스트리밍을 위한 오픈소스 플랫폼입니다. Kafka는 분산 아키텍처에서 느슨하게 결합된 구성요소 간의 통신을 사용 설정하기 위해 일반적으로 사용됩니다. Dataflow를 사용하여 Kafka에서 이벤트를 읽고, 이를 처리하고, 추가 분석을 위해 BigQuery 테이블에 결과를 기록할 수 있습니다.
Apache Kafka용 관리형 서비스는 안전하고 확장 가능한 Kafka 클러스터를 실행하는 데 도움이 되는 Google Cloud Platform 서비스입니다.

필수 권한
Dataflow 작업자 서비스 계정에는 다음 Identity and Access Management(IAM) 역할이 있어야 합니다.
- 관리형 Kafka 클라이언트(
roles/managedkafka.client
) - BigQuery 데이터 편집자(
roles/bigquery.dataEditor
)
자세한 내용은 Dataflow 보안 및 권한을 참조하세요.
Kafka 클러스터 만들기
이 단계에서는 Apache Kafka용 관리형 서비스 클러스터를 만듭니다. 자세한 내용은 Apache Kafka용 관리형 서비스 클러스터 만들기를 참조하세요.
콘솔
Apache Kafka용 관리형 서비스 > 클러스터 페이지로 이동합니다.
만들기를 클릭합니다.
클러스터 이름 체크박스에 클러스터 이름을 입력합니다.
리전 목록에서 클러스터의 위치를 선택합니다.
만들기를 클릭합니다.
gcloud
managed-kafka clusters create
명령어를 사용합니다.
gcloud managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
다음을 바꿉니다.
CLUSTER
: 클러스터 이름REGION
: 서브넷을 만든 리전PROJECT_ID
: 프로젝트 IDSUBNET_NAME
: 클러스터를 배포할 서브넷
클러스터를 만드는 데는 일반적으로 20~30분 정도 걸립니다.
Kafka 주제 만들기
Apache Kafka용 관리형 서비스 클러스터를 만든 후 주제를 만듭니다.
콘솔
Apache Kafka용 관리형 서비스 > 클러스터 페이지로 이동합니다.
클러스터 이름을 클릭합니다.
클러스터 세부정보 페이지에서 주제 만들기를 클릭합니다.
주제 이름 체크박스에 주제 이름을 입력합니다.
만들기를 클릭합니다.
gcloud
managed-kafka topics create
명령어를 사용합니다.
gcloud managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3
다음을 바꿉니다.
TOPIC_NAME
: 만들 주제의 이름
BigQuery 테이블 만들기
이 단계에서는 다음 스키마를 사용하여 BigQuery 테이블을 만듭니다.
열 이름 | 데이터 유형 |
---|---|
name |
STRING |
customer_id |
INTEGER |
BigQuery 데이터 세트가 아직 없으면 먼저 만듭니다. 자세한 내용은 데이터 세트 만들기를 참조하세요. 그런 다음 빈 테이블을 새로 만듭니다.
콘솔
BigQuery 페이지로 이동합니다.
탐색기 창에서 프로젝트를 펼친 후 데이터 세트를 선택합니다.
데이터 세트 정보 섹션에서
테이블 만들기를 클릭합니다.테이블을 만들 소스 목록에서 빈 테이블을 선택합니다.
테이블 체크박스에 테이블 이름을 입력합니다.
스키마 섹션에서 텍스트로 수정을 클릭합니다.
다음 스키마 정의를 붙여넣습니다.
name:STRING, customer_id:INTEGER
테이블 만들기를 클릭합니다.
gcloud
bq mk
명령어를 사용합니다.
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
name:STRING,customer_id:INTEGER
다음을 바꿉니다.
PROJECT_ID
: 프로젝트 IDDATASET_NAME
: 데이터 세트의 이름TABLE_NAME
: 만들 테이블의 이름
Dataflow 작업 실행
Kafka 클러스터와 BigQuery 테이블을 만든 후 Dataflow 템플릿을 실행합니다.
콘솔
먼저 클러스터의 부트스트랩 서버 주소를 가져옵니다.
Google Cloud 콘솔에서 클러스터 페이지로 이동합니다.
클러스터 이름을 클릭합니다.
구성 탭을 클릭합니다.
부트스트랩 URL에서 부트스트랩 서버 주소를 복사합니다.
그런 다음 템플릿을 실행하여 Dataflow 작업을 만듭니다.
Dataflow > 작업 페이지로 이동합니다.
템플릿에서 작업 만들기를 클릭합니다.
작업 이름 필드에
kafka-to-bq
를 입력합니다.리전 엔드포인트에서 Apache Kafka용 관리형 서비스 클러스터가 있는 리전을 선택합니다.
'Kafka-BigQuery' 템플릿을 선택합니다.
다음 템플릿 파라미터를 입력합니다.
- Kafka 부트스트랩 서버: 부트스트랩 서버 주소
- 소스 Kafka 주제: 읽어올 주제의 이름
- Kafka 소스 인증 모드:
APPLICATION_DEFAULT_CREDENTIALS
- Kafka 메시지 형식:
JSON
- 테이블 이름 전략:
SINGLE_TABLE_NAME
- BigQuery 출력 테이블: 다음과 같은 형식의 BigQuery 테이블입니다.
PROJECT_ID
:DATASET_NAME
.TABLE_NAME
데드 레터 큐에서 BigQuery에 오류 쓰기를 선택합니다.
다음과 같은 형식의 데드 레터 큐의 BigQuery 테이블 이름을 입력합니다.
PROJECT_ID
:DATASET_NAME
.ERROR_TABLE_NAME
이 테이블을 미리 만들지 마세요. 파이프라인에서 만듭니다.
작업 실행을 클릭합니다.
gcloud
dataflow flex-template run
명령어를 사용합니다.
gcloud dataflow flex-template run kafka-to-bq \ --template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \ --region LOCATION \ --parameters \ readBootstrapServerAndTopic=projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\ persistKafkaKey=false,\ writeMode=SINGLE_TABLE_NAME,\ kafkaReadOffset=earliest,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ messageFormat=JSON,\ outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME\ useBigQueryDLQ=true,\ outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME
다음 변수를 바꿉니다.
LOCATION
: Apache Kafka용 관리형 서비스가 있는 리전PROJECT_ID
: Google Cloud Platform 프로젝트 이름CLUSTER_ID
: 클러스터의 이름TOPIC
: Kafka 주제의 이름DATASET_NAME
: 데이터 세트의 이름TABLE_NAME
: 테이블의 이름ERROR_TABLE_NAME
: 데드 레터 큐의 BigQuery 테이블 이름
데드 레터 큐의 테이블을 미리 만들지 마세요. 파이프라인에서 만듭니다.
Kafka로 메시지 전송
Dataflow 작업이 시작되면 Kafka에 메시지를 보낼 수 있으며 파이프라인이 이를 BigQuery에 씁니다.
Kafka 클러스터와 동일한 서브넷에 VM을 만들고 Kafka 명령줄 도구를 설치합니다. 자세한 안내는 CLI에서 메시지 게시 및 사용의 클라이언트 머신 설정을 참조하세요.
다음 명령어를 실행하여 Kafka 주제에 메시지를 씁니다.
kafka-console-producer.sh \ --topic TOPIC \ --bootstrap-server bootstrap.CLUSTER_ID.LOCATION.managedkafka.PROJECT_ID.cloud.goog:9092 \ --producer.config client.properties
다음 변수를 바꿉니다.
TOPIC
: Kafka 주제의 이름CLUSTER_ID
: 클러스터의 이름입니다.LOCATION
: 클러스터가 있는 리전PROJECT_ID
: Google Cloud Platform 프로젝트 이름
프롬프트에서 다음 텍스트 줄을 입력하여 Kafka에 메시지를 전송합니다.
{"name": "Alice", "customer_id": 1} {"name": "Bob", "customer_id": 2} {"name": "Charles", "customer_id": 3}
데드 레터 큐 사용
작업이 실행되는 동안 파이프라인이 개별 메시지를 BigQuery에 쓰지 못할 수 있습니다. 가능한 오류는 다음과 같습니다.
- 잘못된 형식의 JSON을 포함하는 직렬화 오류
- 테이블 스키마 및 JSON 데이터의 불일치로 인한 유형 변환 오류
- 테이블 스키마에 없는 JSON 데이터의 추가 필드
이러한 오류로 인해 작업이 실패하지는 않으며, Dataflow 작업 로그에 오류로 표시되지 않습니다. 대신 파이프라인은 데드 레터 큐를 사용하여 이러한 유형의 오류를 처리합니다.
템플릿을 실행할 때 데드 레터 큐를 사용 설정하려면 다음 템플릿 파라미터를 설정하세요.
useBigQueryDLQ
:true
outputDeadletterTable
: 정규화된 BigQuery 테이블 이름(예:my-project:dataset1.errors
)
파이프라인에서 테이블을 자동으로 만듭니다. Kafka 메시지를 처리할 때 오류가 발생하면 파이프라인이 테이블에 오류 항목을 기록합니다.
오류 메시지 예시:
오류 유형 | 이벤트 데이터 | errorMessage |
---|---|---|
직렬화 오류 | 'Hello world' | json을 테이블 행으로 직렬화할 수 없음: 'Hello world' |
유형 변환 오류 | {"name":"Emily","customer_id":"abc"} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 } |
알 수 없는 입력란 | {"name":"Zoe","age":34} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 } |
BigQuery 데이터 유형 작업
내부적으로 Kafka I/O 커넥터는 JSON 메시지 페이로드를 Apache Beam TableRow
객체로 변환하고 TableRow
필드 값을 BigQuery 유형으로 변환합니다.
다음 표에는 BigQuery 데이터 유형의 JSON 표현이 나와 있습니다.
BigQuery 유형 | JSON 표현 |
---|---|
ARRAY |
[1.2,3] |
BOOL |
true |
DATE |
"2022-07-01" |
DATETIME |
"2022-07-01 12:00:00.00" |
DECIMAL |
5.2E11 |
FLOAT64 |
3.142 |
GEOGRAPHY |
"POINT(1 2)" 잘 알려진 텍스트(WKT) 또는 문자열로 형식이 지정된 GeoJSON을 사용하여 지역을 지정합니다. 자세한 내용은 지리 공간 데이터 로드를 참조하세요. |
INT64 |
10 |
INTERVAL |
"0-13 370 48:61:61" |
STRING |
"string_val" |
TIMESTAMP |
"2022-07-01T12:00:00.00Z" JavaScript |
구조화된 데이터
JSON 메시지가 일관된 스키마를 따르는 경우 BigQuery에서 STRUCT
데이터 유형을 사용하여 JSON 객체를 나타낼 수 있습니다.
다음 예에서 answers
필드는 a
및 b
의 두 하위 필드가 있는 JSON 객체입니다.
{"name":"Emily","answers":{"a":"yes","b":"no"}}
다음 SQL 문은 호환되는 스키마를 사용하여 BigQuery 테이블을 만듭니다.
CREATE TABLE my_dataset.kafka_events (name STRING, answers STRUCT<a STRING, b STRING>);
결과 테이블은 다음과 같습니다.
+-------+----------------------+
| name | answers |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+
반정형 데이터
JSON 메시지가 엄격한 스키마를 따르지 않는 경우 BigQuery에 JSON
데이터 유형으로 저장하는 것이 좋습니다.
JSON 데이터를 JSON
유형으로 저장하면 스키마를 미리 정의할 필요가 없습니다. 데이터 수집 후 GoogleSQL에서 필드 액세스(점 표기법) 및 배열 액세스 연산자를 사용하여 데이터를 쿼리할 수 있습니다. 자세한 내용은 GoogleSQL의 JSON 데이터 작업을 참조하세요.
UDF를 사용하여 데이터 변환
이 튜토리얼에서는 Kafka 메시지가 JSON 형식으로 지정되어 있고 BigQuery 테이블 스키마가 데이터에 변환을 적용하지 않고 JSON 데이터와 일치한다고 가정합니다.
선택적으로 BigQuery에 기록되기 전 데이터를 변환하는 JavaScript 사용자 정의 함수(UDF)를 제공할 수 있습니다. UDF는 필터링, 개인 식별 정보(PII) 삭제 또는 추가 필드로 데이터 강화와 같은 추가 처리도 수행할 수 있습니다.
자세한 내용은 Dataflow 템플릿에 대한 사용자 정의 함수 만들기를 참조하세요.