Kafka 주제를 Hive로 스트리밍


Apache Kafka는 실시간 데이터 파이프라인 및 데이터 통합을 위한 오픈소스 분산 스트리밍 플랫폼입니다. 다음과 같은 다양한 애플리케이션에서 사용할 수 있는 효율적이고 확장 가능한 스트리밍 시스템을 제공합니다.

  • 실시간 분석
  • 스트림 처리
  • 로그 집계
  • 분산형 메시지
  • 이벤트 스트리밍

목표

  1. ZooKeeper를 사용하여 Dataproc HA 클러스터에 Kafka를 설치합니다(이 튜토리얼에서는 'Dataproc Kafka 클러스터'라고 함).

  2. 가상의 고객 데이터를 만든 후 Kafka 주제에 데이터를 게시합니다.

  3. 스트리밍된 Kafka 주제 데이터를 수신하도록 Cloud Storage에 Hive Parquet 및 ORC 테이블을 만듭니다.

  4. PySpark 작업을 제출하여 Kafka 주제를 구독하고 Parquet 및 ORC 형식으로 Cloud Storage에 스트리밍합니다.

  5. 스트리밍된 Hive 테이블 데이터에서 쿼리를 실행하여 스트리밍된 Kafka 메시지를 계산합니다.

비용

이 문서에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용하세요. Google Cloud를 처음 사용하는 사용자는 무료 체험판을 사용할 수 있습니다.

이 문서에 설명된 태스크를 완료했으면 만든 리소스를 삭제하여 청구가 계속되는 것을 방지할 수 있습니다. 자세한 내용은 삭제를 참조하세요.

시작하기 전에

아직 Google Cloud 프로젝트를 만들지 않았으면 지금 만듭니다.

  1. Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
  2. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  3. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  4. API Dataproc, Compute Engine, and Cloud Storage 사용 설정

    API 사용 설정

  5. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  6. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  7. API Dataproc, Compute Engine, and Cloud Storage 사용 설정

    API 사용 설정

  8. Google Cloud Console에서 Cloud Storage 버킷 페이지로 이동합니다.

    버킷 페이지로 이동

  9. 버킷 만들기를 클릭합니다.
  10. 버킷 만들기 페이지에서 버킷 정보를 입력합니다. 다음 단계로 이동하려면 계속을 클릭합니다.
    • 버킷 이름 지정에서 버킷 이름 지정 요구사항을 충족하는 이름을 입력합니다.
    • 데이터를 저장할 위치 선택에서 다음을 수행합니다.
      • 위치 유형 옵션을 선택합니다.
      • 위치 옵션을 선택합니다.
    • 데이터의 기본 스토리지 클래스 선택에서 스토리지 클래스를 선택합니다.
    • 객체 액세스를 제어하는 방식 선택에서 액세스 제어 옵션을 선택합니다.
    • 고급 설정(선택사항)에서 암호화 방법, 보관 정책 또는 버킷 라벨을 지정합니다.
  11. 만들기를 클릭합니다.

튜토리얼 단계

다음 단계를 수행하여 Dataproc Kafka 클러스터를 만들어 Parquet 또는 ORC 형식으로 Cloud Storage에 Kafka 주제를 읽어옵니다.

Kafka 설치 스크립트를 Cloud Storage에 복사

kafka.sh 초기화 작업 스크립트는 Dataproc 클러스터에 Kafka를 설치합니다.

  1. 코드 둘러보기

    #!/bin/bash
    #    Copyright 2015 Google, Inc.
    #
    #    Licensed under the Apache License, Version 2.0 (the "License");
    #    you may not use this file except in compliance with the License.
    #    You may obtain a copy of the License at
    #
    #        http://www.apache.org/licenses/LICENSE-2.0
    #
    #    Unless required by applicable law or agreed to in writing, software
    #    distributed under the License is distributed on an "AS IS" BASIS,
    #    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #    See the License for the specific language governing permissions and
    #    limitations under the License.
    #
    # This script installs Apache Kafka (http://kafka.apache.org) on a Google Cloud
    # Dataproc cluster.
    
    set -euxo pipefail
    
    readonly ZOOKEEPER_HOME=/usr/lib/zookeeper
    readonly KAFKA_HOME=/usr/lib/kafka
    readonly KAFKA_PROP_FILE='/etc/kafka/conf/server.properties'
    readonly ROLE="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
    readonly RUN_ON_MASTER="$(/usr/share/google/get_metadata_value attributes/run-on-master || echo false)"
    readonly KAFKA_ENABLE_JMX="$(/usr/share/google/get_metadata_value attributes/kafka-enable-jmx || echo false)"
    readonly KAFKA_JMX_PORT="$(/usr/share/google/get_metadata_value attributes/kafka-jmx-port || echo 9999)"
    readonly INSTALL_KAFKA_PYTHON="$(/usr/share/google/get_metadata_value attributes/install-kafka-python || echo false)"
    
    # The first ZooKeeper server address, e.g., "cluster1-m-0:2181".
    ZOOKEEPER_ADDRESS=''
    # Integer broker ID of this node, e.g., 0
    BROKER_ID=''
    
    function retry_apt_command() {
      cmd="$1"
      for ((i = 0; i < 10; i++)); do
        if eval "$cmd"; then
          return 0
        fi
        sleep 5
      done
      return 1
    }
    
    function recv_keys() {
      retry_apt_command "apt-get install -y gnupg2 &&\
                         apt-key adv --keyserver keyserver.ubuntu.com --recv-keys B7B3B788A8D3785C"
    }
    
    function update_apt_get() {
      retry_apt_command "apt-get update"
    }
    
    function install_apt_get() {
      pkgs="$@"
      retry_apt_command "apt-get install -y $pkgs"
    }
    
    function err() {
      echo "[$(date +'%Y-%m-%dT%H:%M:%S%z')]: $@" >&2
      return 1
    }
    
    # Returns the list of broker IDs registered in ZooKeeper, e.g., " 0, 2, 1,".
    function get_broker_list() {
      ${KAFKA_HOME}/bin/zookeeper-shell.sh "${ZOOKEEPER_ADDRESS}" \
        <<<"ls /brokers/ids" |
        grep '\[.*\]' |
        sed 's/\[/ /' |
        sed 's/\]/,/'
    }
    
    # Waits for zookeeper to be up or time out.
    function wait_for_zookeeper() {
      for i in {1..20}; do
        if "${ZOOKEEPER_HOME}/bin/zkCli.sh" -server "${ZOOKEEPER_ADDRESS}" ls /; then
          return 0
        else
          echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}, retry ${i}..."
          sleep 5
        fi
      done
      echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}" >&2
      exit 1
    }
    
    # Wait until the current broker is registered or time out.
    function wait_for_kafka() {
      for i in {1..20}; do
        local broker_list=$(get_broker_list || true)
        if [[ "${broker_list}" == *" ${BROKER_ID},"* ]]; then
          return 0
        else
          echo "Kafka broker ${BROKER_ID} is not registered yet, retry ${i}..."
          sleep 5
        fi
      done
      echo "Failed to start Kafka broker ${BROKER_ID}." >&2
      exit 1
    }
    
    function install_and_configure_kafka_server() {
      # Find zookeeper list first, before attempting any installation.
      local zookeeper_client_port
      zookeeper_client_port=$(grep 'clientPort' /etc/zookeeper/conf/zoo.cfg |
        tail -n 1 |
        cut -d '=' -f 2)
    
      local zookeeper_list
      zookeeper_list=$(grep '^server\.' /etc/zookeeper/conf/zoo.cfg |
        cut -d '=' -f 2 |
        cut -d ':' -f 1 |
        sort |
        uniq |
        sed "s/$/:${zookeeper_client_port}/" |
        xargs echo |
        sed "s/ /,/g")
    
      if [[ -z "${zookeeper_list}" ]]; then
        # Didn't find zookeeper quorum in zoo.cfg, but possibly workers just didn't
        # bother to populate it. Check if YARN HA is configured.
        zookeeper_list=$(bdconfig get_property_value --configuration_file \
          /etc/hadoop/conf/yarn-site.xml \
          --name yarn.resourcemanager.zk-address 2>/dev/null)
      fi
    
      # If all attempts failed, error out.
      if [[ -z "${zookeeper_list}" ]]; then
        err 'Failed to find configured Zookeeper list; try "--num-masters=3" for HA'
      fi
    
      ZOOKEEPER_ADDRESS="${zookeeper_list%%,*}"
    
      # Install Kafka from Dataproc distro.
      install_apt_get kafka-server || dpkg -l kafka-server ||
        err 'Unable to install and find kafka-server.'
    
      mkdir -p /var/lib/kafka-logs
      chown kafka:kafka -R /var/lib/kafka-logs
    
      if [[ "${ROLE}" == "Master" ]]; then
        # For master nodes, broker ID starts from 10,000.
        if [[ "$(hostname)" == *-m ]]; then
          # non-HA
          BROKER_ID=10000
        else
          # HA
          BROKER_ID=$((10000 + $(hostname | sed 's/.*-m-\([0-9]*\)$/\1/g')))
        fi
      else
        # For worker nodes, broker ID is the worker ID.
        BROKER_ID=$(hostname | sed 's/.*-w-\([0-9]*\)$/\1/g')
      fi
      sed -i 's|log.dirs=/tmp/kafka-logs|log.dirs=/var/lib/kafka-logs|' \
        "${KAFKA_PROP_FILE}"
      sed -i 's|^\(zookeeper\.connect=\).*|\1'${zookeeper_list}'|' \
        "${KAFKA_PROP_FILE}"
      sed -i 's,^\(broker\.id=\).*,\1'${BROKER_ID}',' \
        "${KAFKA_PROP_FILE}"
      echo -e '\nreserved.broker.max.id=100000' >>"${KAFKA_PROP_FILE}"
      echo -e '\ndelete.topic.enable=true' >>"${KAFKA_PROP_FILE}"
    
      if [[ "${KAFKA_ENABLE_JMX}" == "true" ]]; then
        sed -i '/kafka-run-class.sh/i export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Djava.net.preferIPv4Stack=true"' /usr/lib/kafka/bin/kafka-server-start.sh
        sed -i "/kafka-run-class.sh/i export JMX_PORT=${KAFKA_JMX_PORT}" /usr/lib/kafka/bin/kafka-server-start.sh
      fi
    
      wait_for_zookeeper
    
      # Start Kafka.
      service kafka-server restart
    
      wait_for_kafka
    }
    
    function install_kafka_python_package() {
      KAFKA_PYTHON_PACKAGE="kafka-python==2.0.2"
      if [[ "${INSTALL_KAFKA_PYTHON}" != "true" ]]; then
        return
      fi
    
      if [[ "$(echo "${DATAPROC_IMAGE_VERSION} > 2.0" | bc)" -eq 1 ]]; then
        /opt/conda/default/bin/pip install "${KAFKA_PYTHON_PACKAGE}" || { sleep 10; /opt/conda/default/bin/pip install "${KAFKA_PYTHON_PACKAGE}"; }
      else
        OS=$(. /etc/os-release && echo "${ID}")
        if [[ "${OS}" == "rocky" ]]; then
          yum install -y python2-pip
        else
          apt-get install -y python-pip
        fi
        pip2 install "${KAFKA_PYTHON_PACKAGE}" || { sleep 10; pip2 install "${KAFKA_PYTHON_PACKAGE}"; } || { sleep 10; pip install "${KAFKA_PYTHON_PACKAGE}"; }
      fi
    }
    
    function remove_old_backports {
      # This script uses 'apt-get update' and is therefore potentially dependent on
      # backports repositories which have been archived.  In order to mitigate this
      # problem, we will remove any reference to backports repos older than oldstable
    
      # https://github.com/GoogleCloudDataproc/initialization-actions/issues/1157
      oldstable=$(curl -s https://deb.debian.org/debian/dists/oldstable/Release | awk '/^Codename/ {print $2}');
      stable=$(curl -s https://deb.debian.org/debian/dists/stable/Release | awk '/^Codename/ {print $2}');
    
      matched_files="$(grep -rsil '\-backports' /etc/apt/sources.list*)"
      if [[ -n "$matched_files" ]]; then
        for filename in "$matched_files"; do
          grep -e "$oldstable-backports" -e "$stable-backports" "$filename" || \
            sed -i -e 's/^.*-backports.*$//' "$filename"
        done
      fi
    }
    
    function main() {
      OS=$(. /etc/os-release && echo "${ID}")
      if [[ ${OS} == debian ]] && [[ $(echo "${DATAPROC_IMAGE_VERSION} <= 2.1" | bc -l) == 1 ]]; then
        remove_old_backports
      fi
      recv_keys || err 'Unable to receive keys.'
      update_apt_get || err 'Unable to update packages lists.'
      install_kafka_python_package
    
      # Only run the installation on workers; verify zookeeper on master(s).
      if [[ "${ROLE}" == 'Master' ]]; then
        service zookeeper-server status ||
          err 'Required zookeeper-server not running on master!'
        if [[ "${RUN_ON_MASTER}" == "true" ]]; then
          # Run installation on masters.
          install_and_configure_kafka_server
        else
          # On master nodes, just install kafka command-line tools and libs but not
          # kafka-server.
          install_apt_get kafka ||
            err 'Unable to install kafka libraries on master!'
        fi
      else
        # Run installation on workers.
        install_and_configure_kafka_server
      fi
    }
    
    main
    

  2. kafka.sh 초기화 작업 스크립트를 Cloud Storage 버킷에 복사합니다. 이 스크립트는 Dataproc 클러스터에 Kafka를 설치합니다.

    1. Cloud Shell을 열고 다음 명령어를 실행합니다.

      gsutil cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
      

      다음을 바꿉니다.

      • REGION: kafka.sh는 리전별로 태그된 공개 Cloud Storage 버킷에 저장됩니다. 지리적으로 가까운 Compute Engine 리전(예: us-central1)을 지정합니다.
      • BUCKET_NAME: Cloud Storage 버킷 이름입니다.

Dataproc Kafka 클러스터 만들기

  1. Cloud Shell을 열고 다음 gcloud dataproc clusters create 명령어를 실행하여 Kafka 및 ZooKeeper 구성요소를 설치하는 Dataproc HA 클러스터를 만듭니다.

    gcloud dataproc clusters create KAFKA_CLUSTER \
        --project=PROJECT_ID \
        --region=REGION \
        --image-version=2.1-debian11 \
        --num-masters=3 \
        --enable-component-gateway \
        --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
    

    참고:

    • KAFKA_CLUSTER: 프로젝트 내에서 고유해야 하는 클러스터 이름입니다. 이름은 소문자로 시작해야 하며, 최대 51자(영문 기준)의 소문자, 숫자, 하이픈을 포함할 수 있습니다. 하이픈으로 끝나면 안 됩니다. 삭제된 클러스터의 이름은 재사용할 수 있습니다.
    • PROJECT_ID: 이 클러스터와 연결할 프로젝트입니다.
    • REGION: 클러스터가 위치할 Compute Engine 리전입니다(예: us-central1).
      • 선택사항인 --zone=ZONE 플래그를 추가하여 us-central1-a와 같이 지정된 리전 내의 영역을 지정할 수 있습니다. 영역을 지정하지 않으면 Dataproc 자동 영역 배치 기능이 지정된 리전이 있는 영역을 선택합니다.
    • --image-version: 이 튜토리얼에서는 Dataproc 이미지 버전 2.1-debian11을 사용하는 것이 좋습니다. 참고: 각 이미지 버전에는 이 튜토리얼에 사용된 Hive 구성요소를 포함하여 사전 설치된 구성요소 집합이 포함되어 있습니다(지원되는 Dataproc 이미지 버전 참조).
    • --num-master: 3 마스터 노드는 HA 클러스터를 만듭니다. Kafka에 필요한 Zookeeper 구성요소는 HA 클러스터에 사전 설치되어 있습니다.
    • --enable-component-gateway: Dataproc 구성요소 게이트웨이를 사용 설정합니다.
    • BUCKET_NAME: /scripts/kafka.sh 초기화 스크립트가 포함된 Cloud Storage 버킷의 이름입니다(Kafka 설치 스크립트를 Cloud Storage에 복사 참조).

Kafka custdata 주제 만들기

Dataproc Kafka 클러스터에서 Kafka 주제를 만들려면 다음 안내를 따르세요.

  1. SSH 유틸리티를 사용하여 클러스터 마스터 VM의 터미널 창을 엽니다.

  2. Kafka custdata 주제를 만듭니다.

    /usr/lib/kafka/bin/kafka-topics.sh \
        --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
        --create --topic custdata
    

    참고:

    • KAFKA_CLUSTER: Kafka 클러스터의 이름을 삽입합니다. -w-0:9092worker-0 노드의 포트 9092에서 실행 중인 Kafka 브로커를 나타냅니다.

    • custdata 주제를 만든 후 다음 명령어를 실행할 수 있습니다.

      # List all topics.
      /usr/lib/kafka/bin/kafka-topics.sh \
          --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
          --list
      
      # Consume then display topic data. /usr/lib/kafka/bin/kafka-console-consumer.sh \     --bootstrap-server KAFKA_CLUSTER-w-0:9092 \     --topic custdata
      # Count the number of messages in the topic. /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \     --broker-list KAFKA_CLUSTER-w-0:9092 \     --topic custdata
      # Delete topic. /usr/lib/kafka/bin/kafka-topics.sh \     --bootstrap-server KAFKA_CLUSTER-w-0:9092 \     --delete --topic custdata

Kafka custdata 주제에 콘텐츠 게시

다음 스크립트는 kafka-console-producer.sh Kafka 도구를 사용하여 CSV 형식으로 가상의 고객 데이터를 생성합니다.

  1. 스크립트를 복사한 후 Kafka 클러스터의 마스터 노드에 있는 SSH 터미널에 붙여넣습니다. <Return>키를 눌러 스크립트를 실행합니다.

    for i in {1..10000}; do \
    custname="cust name${i}"
    uuid=$(dbus-uuidgen)
    age=$((45 + $RANDOM % 45))
    amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))")
    message="${uuid}:${custname},${age},${amount}"
    echo ${message}
    done | /usr/lib/kafka/bin/kafka-console-producer.sh \
    --broker-list KAFKA_CLUSTER-w-0:9092 \
    --topic custdata \
    --property "parse.key=true" \
    --property "key.separator=:"
    

    참고:

    • KAFKA_CLUSTER: Kafka 클러스터의 이름입니다.
  2. 다음 Kafka 명령어를 실행하여 custdata 주제에 10,000개의 메시지가 포함되어 있는지 확인합니다.

    /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list KAFKA_CLUSTER-w-0:9092 \
    --topic custdata
    

    참고:

    • KAFKA_CLUSTER: Kafka 클러스터의 이름입니다.

    예상 출력:

    custdata:0:10000
    

Cloud Storage에서 Hive 테이블 만들기

스트리밍된 Kafka 주제 데이터를 수신할 Hive 테이블을 만듭니다. 다음 단계를 수행하여 Cloud Storage 버킷에서 cust_parquet(parquet) 및 cust_orc(ORC) Hive 테이블을 만듭니다.

  1. 다음 스크립트에서 BUCKET_NAME을 삽입한 다음 Kafka 클러스터 마스터 노드의 SSH 터미널에 스크립트를 복사하여 붙여넣은 후<Return>키를 눌러 ~/hivetables.hql(Hive 쿼리 언어) 스크립트를 만듭니다.

    다음 단계에서 ~/hivetables.hql 스크립트를 실행하여 Cloud Storage 버킷에 parquet 및 ORC Hive 테이블을 만듭니다.

    cat > ~/hivetables.hql <<EOF
    drop table if exists cust_parquet;
    create external table if not exists cust_parquet
    (uuid string, custname string, age string, amount string)
    row format delimited fields terminated by ','
    stored as parquet
    location "gs://BUCKET_NAME/tables/cust_parquet";
    

    drop table if exists cust_orc; create external table if not exists cust_orc (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as orc location "gs://BUCKET_NAME/tables/cust_orc"; EOF
  2. Kafka 클러스터의 마스터 노드에 있는 SSH 터미널에서 ~/hivetables.hql Hive 작업을 제출하여 Cloud Storage 버킷에 cust_parquet(parquet) 및 cust_orc(ORC) Hive 테이블을 만듭니다.

    gcloud dataproc jobs submit hive \
        --cluster=KAFKA_CLUSTER \
        --region=REGION \
        -f ~/hivetables.hql
    

    참고:

    • Hive 구성요소는 Dataproc Kafka 클러스터에 사전 설치되어 있습니다. 최근에 출시된 2.1 이미지에 포함된 Hive 구성요소 버전 목록은 2.1.x 출시 버전을 참조하세요.
    • KAFKA_CLUSTER: Kafka 클러스터의 이름입니다.
    • REGION: Kafka 클러스터가 위치한 리전입니다.

Kafka custdata를 Hive 테이블로 스트리밍

  1. Kafka 클러스터의 마스터 노드에 있는 SSH 터미널에서 다음 명령어를 실행하여 kafka-python 라이브러리를 설치합니다. Kafka 주제 데이터를 Cloud Storage로 스트리밍하려면 Kafka 클라이언트가 필요합니다.
    pip install kafka-python
    
  2. BUCKET_NAME을 삽입하고 Kafka 클러스터 마스터 노드의 SSH 터미널에 다음 PySpark 코드를 복사하여 붙여넣은 후 <Return> 키를 눌러 streamdata.py 파일을 만듭니다.

    스크립트는 Kafka custdata 주제를 구독한 다음 Cloud Storage의 Hive 테이블에 데이터를 스트리밍합니다. parquet 또는 ORC일 수 있는 출력 형식은 매개변수로 스크립트에 전달됩니다.

    cat > streamdata.py <<EOF
    #!/bin/python
    
    import sys
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from pyspark.sql import SparkSession
    from kafka import KafkaConsumer
    
    def getNameFn (data): return data.split(",")[0]
    def getAgeFn  (data): return data.split(",")[1]
    def getAmtFn  (data): return data.split(",")[2]
    
    def main(cluster, outputfmt):
        spark = SparkSession.builder.appName("APP").getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        Logger = spark._jvm.org.apache.log4j.Logger
        logger = Logger.getLogger(__name__)
    
        rows = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \
        .option("startingOffsets", "earliest")\
        .load()
    
        getNameUDF = udf(getNameFn, StringType())
        getAgeUDF  = udf(getAgeFn,  StringType())
        getAmtUDF  = udf(getAmtFn,  StringType())
    
        logger.warn("Params passed in are cluster name: " + cluster + "  output format(sink): " + outputfmt)
    
        query = rows.select (col("key").cast("string").alias("uuid"),\
            getNameUDF      (col("value").cast("string")).alias("custname"),\
            getAgeUDF       (col("value").cast("string")).alias("age"),\
            getAmtUDF       (col("value").cast("string")).alias("amount"))
    
        writer = query.writeStream.format(outputfmt)\
                .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\
                .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \
            .outputMode("append")\
            .start()
    
        writer.awaitTermination()
    
    if __name__=="__main__":
        if len(sys.argv) < 2:
            print ("Invalid number of arguments passed ", len(sys.argv))
            print ("Usage: ", sys.argv[0], " cluster  format")
            print ("e.g.:  ", sys.argv[0], " <cluster_name>  orc")
            print ("e.g.:  ", sys.argv[0], " <cluster_name>  parquet")
        main(sys.argv[1], sys.argv[2])
    
    EOF
    
  3. Kafka 클러스터의 마스터 노드에 있는 SSH 터미널에서 spark-submit을 실행하여 Cloud Storage의 Hive 테이블로 데이터를 스트리밍합니다.

    1. KAFKA_CLUSTER 및 출력 FORMAT의 이름을 삽입한 후 다음 코드를 복사하여 Kafka 클러스터의 마스터 노드에 있는 SSH 터미널에 붙여넣은 다음 <Return> 키를 눌러 코드를 실행하고 Kafka custdata 데이터를 Parquet 형식으로 Cloud Storage의 Hive 테이블로 스트리밍합니다.

      spark-submit --packages \
      org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \
          --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \
          --conf spark.driver.memory=4096m \
          --conf spark.executor.cores=2 \
          --conf spark.executor.instances=2 \
          --conf spark.executor.memory=6144m \
          streamdata.py KAFKA_CLUSTER FORMAT
          

      참고:

      • KAFKA_CLUSTER: Kafka 클러스터의 이름을 삽입합니다.
      • FORMAT: parquet 또는 orc를 출력 형식으로 지정합니다. 명령어를 연속적으로 실행하여 두 형식을 Hive 테이블로 스트리밍할 수 있습니다. 예를 들어 첫 번째 호출에서 parquet을 지정하여 Kafka custdata 주제를 Hive parquet 테이블로 스트리밍합니다. 그런 다음 두 번째 호출에서 orc 형식을 지정하여 custdata를 Hive ORC 테이블로 스트리밍합니다.
  4. 모든 custdata가 스트리밍되었음을 나타내는 SSH 터미널에서 표준 출력이 중지되면 SSH 터미널에서 <control-c>를 눌러 프로세스를 중지합니다.

  5. Cloud Storage에 Hive 테이블을 나열합니다.

    gsutil ls -r gs://BUCKET_NAME/tables/*
    

    참고:

    • BUCKET_NAME: Hive 테이블이 포함된 Cloud Storage 버킷의 이름을 삽입합니다(Hive 테이블 만들기 참조).

스트리밍된 데이터 쿼리

  1. Kafka 클러스터의 마스터 노드에 있는 SSH 터미널에서 다음 hive 명령어를 실행하여 Cloud Storage의 Hive 테이블에 있는 스트리밍 Kafka custdata 메시지를 계산합니다.

    hive -e "select count(1) from TABLE_NAME"
    

    참고:

    • TABLE_NAME: cust_parquet 또는 cust_orc를 Hive 테이블 이름으로 지정합니다.

    예상 출력 스니펫:

...
Status: Running (Executing on YARN cluster with App id application_....)

----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container     SUCCEEDED      1          1        0        0       0       0
Reducer 2 ...... container     SUCCEEDED      1          1        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 9.89 s
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)

삭제

프로젝트 삭제

    Google Cloud 프로젝트를 삭제합니다.

    gcloud projects delete PROJECT_ID

리소스 삭제

  • 버킷을 삭제합니다.
    gcloud storage buckets delete BUCKET_NAME
  • Kafka 클러스터를 삭제합니다.
    gcloud dataproc clusters delete KAFKA_CLUSTER \
        --region=${REGION}