Apache Kafka는 실시간 데이터 파이프라인 및 데이터 통합을 위한 오픈소스 분산 스트리밍 플랫폼입니다. 다음과 같은 다양한 애플리케이션에서 사용할 수 있는 효율적이고 확장 가능한 스트리밍 시스템을 제공합니다.
- 실시간 분석
- 스트림 처리
- 로그 집계
- 분산 메시징
- 이벤트 스트리밍
목표
ZooKeeper를 사용하여 Dataproc HA 클러스터에 Kafka를 설치합니다(이 튜토리얼에서는 'Dataproc Kafka 클러스터'라고 함).
가상의 고객 데이터를 만든 후 데이터를 Kafka 주제에 게시합니다.
Cloud Storage에서 Hive parquet 및 ORC 테이블을 만들어 스트리밍된 Kafka 주제 데이터를 수신합니다.
PySpark 작업을 제출하여 Kafka 주제를 구독하고 Parquet 및 ORC 형식으로 Cloud Storage에 스트리밍합니다.
스트리밍된 Hive 테이블 데이터에서 쿼리를 실행하여 스트리밍된 Kafka 메시지 수를 계산합니다.
비용
이 문서에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.
프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요.
이 문서에 설명된 태스크를 완료했으면 만든 리소스를 삭제하여 청구가 계속되는 것을 방지할 수 있습니다. 자세한 내용은 삭제를 참조하세요.
시작하기 전에
Google Cloud 프로젝트를 아직 만들지 않았으면 지금 만듭니다.
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create bucket.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
- For Name your bucket, enter a name that meets the bucket naming requirements.
-
For Choose where to store your data, do the following:
- Select a Location type option.
- Select a Location option.
- For Choose a default storage class for your data, select a storage class.
- For Choose how to control access to objects, select an Access control option.
- For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
- Click Create.
튜토리얼 단계
다음 단계를 수행하여 Dataproc Kafka 클러스터를 만들어 Parquet 또는 ORC 형식으로 Cloud Storage에 Kafka 주제를 읽어옵니다.
Kafka 설치 스크립트를 Cloud Storage에 복사
kafka.sh
초기화 작업 스크립트는 Dataproc 클러스터에 Kafka를 설치합니다.
코드를 찾아봅니다.
kafka.sh
초기화 작업 스크립트를 Cloud Storage 버킷에 복사합니다. 이 스크립트는 Dataproc 클러스터에 Kafka를 설치합니다.Cloud Shell을 열고 다음 명령어를 실행합니다.
gcloud storage cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
다음을 바꿉니다.
- REGION:
kafka.sh
는 Cloud Storage의 리전별로 태그가 지정된 공개 버킷에 저장됩니다. 지리적으로 가까운 Compute Engine 리전(예:us-central1
)을 지정합니다. - BUCKET_NAME: Cloud Storage 버킷 이름입니다.
- REGION:
Dataproc Kafka 클러스터 만들기
Cloud Shell을 열고 다음
gcloud dataproc clusters create
명령어를 실행하여 Kafka 및 ZooKeeper 구성요소를 설치하는 Dataproc HA 클러스터를 만듭니다.gcloud dataproc clusters create KAFKA_CLUSTER \ --project=PROJECT_ID \ --region=REGION \ --image-version=2.1-debian11 \ --num-masters=3 \ --enable-component-gateway \ --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
참고:
- KAFKA_CLUSTER: 프로젝트 내에서 고유해야 하는 클러스터 이름입니다. 이름은 소문자로 시작해야 하며, 최대 51자(영문 기준)의 소문자, 숫자, 하이픈을 포함할 수 있습니다. 하이픈으로 끝나면 안 됩니다. 삭제된 클러스터의 이름을 재사용할 수 있습니다.
- PROJECT_ID: 클러스터와 연결할 프로젝트입니다.
- REGION: 클러스터가 있는 Compute Engine 리전(예:
us-central1
)입니다.- 선택사항인
--zone=ZONE
플래그를 추가하여us-central1-a
와 같이 지정된 리전 내의 영역을 지정할 수 있습니다. 영역을 지정하지 않으면 Dataproc 자동 영역 배치 기능에서 지정된 리전이 있는 영역을 선택합니다.
- 선택사항인
--image-version
: 이 튜토리얼에서는 Dataproc 이미지 버전2.1-debian11
을 사용하는 것이 좋습니다. 참고: 각 이미지 버전에는 이 튜토리얼에 사용된 Hive 구성요소를 포함하여 사전 설치된 구성요소 집합이 포함되어 있습니다(지원되는 Dataproc 이미지 버전 참조).--num-master
:3
마스터 노드는 HA 클러스터를 만듭니다. Kafka에 필요한 Zookeeper 구성요소는 HA 클러스터에 사전 설치되어 있습니다.--enable-component-gateway
: Dataproc 구성요소 게이트웨이를 사용 설정합니다.- BUCKET_NAME:
/scripts/kafka.sh
초기화 스크립트가 포함된 Cloud Storage 버킷의 이름입니다(Kafka 설치 스크립트를 Cloud Storage에 복사 참조).
Kafka custdata
주제 만들기
Dataproc Kafka 클러스터에서 Kafka 주제를 만들려면 다음 안내를 따르세요.
SSH 유틸리티를 사용하여 클러스터 마스터 VM의 터미널 창을 엽니다.
Kafka
custdata
주제를 만듭니다./usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --create --topic custdata
참고:
KAFKA_CLUSTER: Kafka 클러스터의 이름을 삽입합니다.
-w-0:9092
는worker-0
노드의 포트9092
에서 실행 중인 Kafka 브로커를 나타냅니다.custdata
주제를 만든 후 다음 명령어를 실행할 수 있습니다.# List all topics. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --list
# Consume then display topic data. /usr/lib/kafka/bin/kafka-console-consumer.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --topic custdata
# Count the number of messages in the topic. /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata # Delete topic. /usr/lib/kafka/bin/kafka-topics.sh \ --bootstrap-server KAFKA_CLUSTER-w-0:9092 \ --delete --topic custdata
Kafka custdata
주제에 콘텐츠 게시
다음 스크립트는 kafka-console-producer.sh
Kafka 도구를 사용하여 CSV 형식으로 가상의 고객 데이터를 생성합니다.
스크립트를 복사한 후 Kafka 클러스터의 마스터 노드에 있는 SSH 터미널에 붙여넣습니다. <Return>키를 눌러 스크립트를 실행합니다.
for i in {1..10000}; do \ custname="cust name${i}" uuid=$(dbus-uuidgen) age=$((45 + $RANDOM % 45)) amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))") message="${uuid}:${custname},${age},${amount}" echo ${message} done | /usr/lib/kafka/bin/kafka-console-producer.sh \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata \ --property "parse.key=true" \ --property "key.separator=:"
참고:
- KAFKA_CLUSTER: Kafka 클러스터의 이름입니다.
다음 Kafka 명령어를 실행하여
custdata
주제에 메시지 10,000개가 포함되어 있는지 확인합니다./usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list KAFKA_CLUSTER-w-0:9092 \ --topic custdata
참고:
- KAFKA_CLUSTER: Kafka 클러스터의 이름입니다.
예상 출력:
custdata:0:10000
Cloud Storage에서 Hive 테이블 만들기
스트리밍된 Kafka 주제 데이터를 수신할 Hive 테이블을 만듭니다.
다음 단계를 수행하여 Cloud Storage 버킷에서 cust_parquet
(parquet) 및 cust_orc
(ORC) Hive 테이블을 만듭니다.
다음 스크립트에서 BUCKET_NAME을 삽입한 다음 Kafka 클러스터 마스터 노드의 SSH 터미널에 스크립트를 복사하여 붙여넣은 후<Return>키를 눌러
~/hivetables.hql
(Hive 쿼리 언어) 스크립트를 만듭니다.다음 단계에서
~/hivetables.hql
스크립트를 실행하여 Cloud Storage 버킷에 Parquet 및 ORC Hive 테이블을 만듭니다.cat > ~/hivetables.hql <<EOF drop table if exists cust_parquet; create external table if not exists cust_parquet (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as parquet location "gs://BUCKET_NAME/tables/cust_parquet"; drop table if exists cust_orc; create external table if not exists cust_orc (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as orc location "gs://BUCKET_NAME/tables/cust_orc"; EOF
Kafka 클러스터의 마스터 노드에 있는 SSH 터미널에서
~/hivetables.hql
Hive 작업을 제출하여 Cloud Storage 버킷에cust_parquet
(parquet) 및cust_orc
(ORC) Hive 테이블을 만듭니다.gcloud dataproc jobs submit hive \ --cluster=KAFKA_CLUSTER \ --region=REGION \ -f ~/hivetables.hql
참고:
- Hive 구성요소는 Dataproc Kafka 클러스터에 사전 설치되어 있습니다. 최근 출시된 2.1 이미지에 포함된 Hive 구성요소 버전 목록은 2.1.x 출시 버전을 참조하세요.
- KAFKA_CLUSTER: Kafka 클러스터의 이름입니다.
- REGION: Kafka 클러스터가 있는 리전입니다.
Kafka custdata
를 Hive 테이블로 스트리밍
- Kafka 클러스터의 마스터 노드에 있는 SSH 터미널에서 다음 명령어를 실행하여
kafka-python
라이브러리를 설치합니다. Kafka 주제 데이터를 Cloud Storage로 스트리밍하려면 Kafka 클라이언트가 필요합니다.
pip install kafka-python
BUCKET_NAME을 삽입하고 Kafka 클러스터 마스터 노드의 SSH 터미널에 다음 PySpark 코드를 복사하여 붙여넣은 후 <Return> 키를 눌러
streamdata.py
파일을 만듭니다.스크립트는 Kafka
custdata
주제를 구독한 후 데이터를 Cloud Storage의 Hive 테이블로 스트리밍합니다. parquet 또는 ORC일 수 있는 출력 형식은 매개변수로 스크립트에 전달됩니다.cat > streamdata.py <<EOF #!/bin/python import sys from pyspark.sql.functions import * from pyspark.sql.types import * from pyspark.sql import SparkSession from kafka import KafkaConsumer def getNameFn (data): return data.split(",")[0] def getAgeFn (data): return data.split(",")[1] def getAmtFn (data): return data.split(",")[2] def main(cluster, outputfmt): spark = SparkSession.builder.appName("APP").getOrCreate() spark.sparkContext.setLogLevel("WARN") Logger = spark._jvm.org.apache.log4j.Logger logger = Logger.getLogger(__name__) rows = spark.readStream.format("kafka") \ .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \ .option("startingOffsets", "earliest")\ .load() getNameUDF = udf(getNameFn, StringType()) getAgeUDF = udf(getAgeFn, StringType()) getAmtUDF = udf(getAmtFn, StringType()) logger.warn("Params passed in are cluster name: " + cluster + " output format(sink): " + outputfmt) query = rows.select (col("key").cast("string").alias("uuid"),\ getNameUDF (col("value").cast("string")).alias("custname"),\ getAgeUDF (col("value").cast("string")).alias("age"),\ getAmtUDF (col("value").cast("string")).alias("amount")) writer = query.writeStream.format(outputfmt)\ .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\ .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \ .outputMode("append")\ .start() writer.awaitTermination() if __name__=="__main__": if len(sys.argv) < 2: print ("Invalid number of arguments passed ", len(sys.argv)) print ("Usage: ", sys.argv[0], " cluster format") print ("e.g.: ", sys.argv[0], " <cluster_name> orc") print ("e.g.: ", sys.argv[0], " <cluster_name> parquet") main(sys.argv[1], sys.argv[2]) EOF
Kafka 클러스터의 마스터 노드에 있는 SSH 터미널에서
spark-submit
을 실행하여 Cloud Storage의 Hive 테이블로 데이터를 스트리밍합니다.KAFKA_CLUSTER 및 출력 FORMAT의 이름을 삽입한 후 다음 코드를 복사하여 Kafka 클러스터의 마스터 노드에 있는 SSH 터미널에 붙여넣은 다음 <Return> 키를 눌러 코드를 실행하고 Kafka
custdata
데이터를 Parquet 형식으로 Cloud Storage의 Hive 테이블로 스트리밍합니다.spark-submit --packages \ org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \ --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \ --conf spark.driver.memory=4096m \ --conf spark.executor.cores=2 \ --conf spark.executor.instances=2 \ --conf spark.executor.memory=6144m \ streamdata.py KAFKA_CLUSTER FORMAT
참고:
- KAFKA_CLUSTER: Kafka 클러스터의 이름을 삽입합니다.
- FORMAT:
parquet
또는orc
를 출력 형식으로 지정합니다. 명령어를 연속적으로 실행하여 두 형식을 Hive 테이블로 스트리밍할 수 있습니다. 예를 들어 첫 번째 호출에서parquet
을 지정하여 Kafkacustdata
주제를 Hive parquet 테이블로 스트리밍합니다. 그런 다음 두 번째 호출에서orc
형식을 지정하여custdata
를 Hive ORC 테이블로 스트리밍합니다.
모든
custdata
가 스트리밍되었음을 나타내는 SSH 터미널에서 표준 출력이 중지되면 SSH 터미널에서 <control-c>를 눌러 프로세스를 중지합니다.Cloud Storage에서 Hive 테이블을 나열합니다.
gcloud storage ls gs://BUCKET_NAME/tables/* --recursive
참고:
- BUCKET_NAME: Hive 테이블이 포함된 Cloud Storage 버킷의 이름을 삽입합니다(Hive 테이블 만들기 참조).
스트리밍 데이터 쿼리
Kafka 클러스터의 마스터 노드에 있는 SSH 터미널에서 다음
hive
명령어를 실행하여 Cloud Storage의 Hive 테이블에서 스트리밍된 Kafkacustdata
메시지를 계산합니다.hive -e "select count(1) from TABLE_NAME"
참고:
- TABLE_NAME:
cust_parquet
또는cust_orc
를 Hive 테이블 이름으로 지정합니다.
예상 출력 스니펫:
- TABLE_NAME:
...
Status: Running (Executing on YARN cluster with App id application_....)
----------------------------------------------------------------------------------------------
VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container SUCCEEDED 1 1 0 0 0 0
Reducer 2 ...... container SUCCEEDED 1 1 0 0 0 0
----------------------------------------------------------------------------------------------
VERTICES: 02/02 [==========================>>] 100% ELAPSED TIME: 9.89 s
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)
삭제
프로젝트 삭제
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
리소스 삭제
-
버킷을 삭제합니다.
gcloud storage buckets delete BUCKET_NAME
- Kafka 클러스터를 삭제합니다.
gcloud dataproc clusters delete KAFKA_CLUSTER \ --region=${REGION}