Kafka 주제를 Hive로 스트리밍


Apache Kafka는 실시간 데이터 파이프라인 및 데이터 통합을 위한 오픈소스 분산 스트리밍 플랫폼입니다. 다음과 같은 다양한 애플리케이션에서 사용할 수 있는 효율적이고 확장 가능한 스트리밍 시스템을 제공합니다.

  • 실시간 분석
  • 스트림 처리
  • 로그 집계
  • 분산 메시징
  • 이벤트 스트리밍

목표

  1. ZooKeeper를 사용하여 Dataproc HA 클러스터에 Kafka를 설치합니다(이 튜토리얼에서는 'Dataproc Kafka 클러스터'라고 함).

  2. 가상의 고객 데이터를 만든 후 데이터를 Kafka 주제에 게시합니다.

  3. Cloud Storage에서 Hive parquet 및 ORC 테이블을 만들어 스트리밍된 Kafka 주제 데이터를 수신합니다.

  4. PySpark 작업을 제출하여 Kafka 주제를 구독하고 Parquet 및 ORC 형식으로 Cloud Storage에 스트리밍합니다.

  5. 스트리밍된 Hive 테이블 데이터에서 쿼리를 실행하여 스트리밍된 Kafka 메시지 수를 계산합니다.

비용

이 문서에서는 비용이 청구될 수 있는 Google Cloud구성요소( )를 사용합니다.

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용합니다.

Google Cloud 신규 사용자는 무료 체험판을 사용할 수 있습니다.

이 문서에 설명된 태스크를 완료했으면 만든 리소스를 삭제하여 청구가 계속되는 것을 방지할 수 있습니다. 자세한 내용은 삭제를 참조하세요.

시작하기 전에

아직 만들지 않았으면 Google Cloud 프로젝트를 만듭니다.

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

    Enable the APIs

  8. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  9. Click Create.
  10. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
    1. In the Get started section, do the following:
      • Enter a globally unique name that meets the bucket naming requirements.
      • To add a bucket label, expand the Labels section (), click Add label, and specify a key and a value for your label.
    2. In the Choose where to store your data section, do the following:
      1. Select a Location type.
      2. Choose a location where your bucket's data is permanently stored from the Location type drop-down menu.
      3. To set up cross-bucket replication, select Add cross-bucket replication via Storage Transfer Service and follow these steps:

        Set up cross-bucket replication

        1. In the Bucket menu, select a bucket.
        2. In the Replication settings section, click Configure to configure settings for the replication job.

          The Configure cross-bucket replication pane appears.

          • To filter objects to replicate by object name prefix, enter a prefix that you want to include or exclude objects from, then click Add a prefix.
          • To set a storage class for the replicated objects, select a storage class from the Storage class menu. If you skip this step, the replicated objects will use the destination bucket's storage class by default.
          • Click Done.
    3. In the Choose how to store your data section, do the following:
      1. Select a default storage class for the bucket or Autoclass for automatic storage class management of your bucket's data.
      2. To enable hierarchical namespace, in the Optimize storage for data-intensive workloads section, select Enable hierarchical namespace on this bucket.
    4. In the Choose how to control access to objects section, select whether or not your bucket enforces public access prevention, and select an access control method for your bucket's objects.
    5. In the Choose how to protect object data section, do the following:
      • Select any of the options under Data protection that you want to set for your bucket.
        • To enable soft delete, click the Soft delete policy (For data recovery) checkbox, and specify the number of days you want to retain objects after deletion.
        • To set Object Versioning, click the Object versioning (For version control) checkbox, and specify the maximum number of versions per object and the number of days after which the noncurrent versions expire.
        • To enable the retention policy on objects and buckets, click the Retention (For compliance) checkbox, and then do the following:
          • To enable Object Retention Lock, click the Enable object retention checkbox.
          • To enable Bucket Lock, click the Set bucket retention policy checkbox, and choose a unit of time and a length of time for your retention period.
      • To choose how your object data will be encrypted, expand the Data encryption section (), and select a Data encryption method.
  11. Click Create.
  12. 튜토리얼 단계

    다음 단계를 수행하여 Dataproc Kafka 클러스터를 만들어 Parquet 또는 ORC 형식으로 Cloud Storage에 Kafka 주제를 읽어옵니다.

    Kafka 설치 스크립트를 Cloud Storage에 복사

    kafka.sh 초기화 작업 스크립트는 Dataproc 클러스터에 Kafka를 설치합니다.

    1. 코드를 찾아봅니다.

      #!/bin/bash
      #    Copyright 2015 Google, Inc.
      #
      #    Licensed under the Apache License, Version 2.0 (the "License");
      #    you may not use this file except in compliance with the License.
      #    You may obtain a copy of the License at
      #
      #        http://www.apache.org/licenses/LICENSE-2.0
      #
      #    Unless required by applicable law or agreed to in writing, software
      #    distributed under the License is distributed on an "AS IS" BASIS,
      #    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      #    See the License for the specific language governing permissions and
      #    limitations under the License.
      #
      # This script installs Apache Kafka (http://kafka.apache.org) on a Google Cloud
      # Dataproc cluster.
      
      set -euxo pipefail
      
      readonly ZOOKEEPER_HOME=/usr/lib/zookeeper
      readonly KAFKA_HOME=/usr/lib/kafka
      readonly KAFKA_PROP_FILE='/etc/kafka/conf/server.properties'
      readonly ROLE="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
      readonly RUN_ON_MASTER="$(/usr/share/google/get_metadata_value attributes/run-on-master || echo false)"
      readonly KAFKA_ENABLE_JMX="$(/usr/share/google/get_metadata_value attributes/kafka-enable-jmx || echo false)"
      readonly KAFKA_JMX_PORT="$(/usr/share/google/get_metadata_value attributes/kafka-jmx-port || echo 9999)"
      readonly INSTALL_KAFKA_PYTHON="$(/usr/share/google/get_metadata_value attributes/install-kafka-python || echo false)"
      
      # The first ZooKeeper server address, e.g., "cluster1-m-0:2181".
      ZOOKEEPER_ADDRESS=''
      # Integer broker ID of this node, e.g., 0
      BROKER_ID=''
      
      function retry_apt_command() {
        cmd="$1"
        for ((i = 0; i < 10; i++)); do
          if eval "$cmd"; then
            return 0
          fi
          sleep 5
        done
        return 1
      }
      
      function recv_keys() {
        retry_apt_command "apt-get install -y gnupg2 &&\
                           apt-key adv --keyserver keyserver.ubuntu.com --recv-keys B7B3B788A8D3785C"
      }
      
      function update_apt_get() {
        retry_apt_command "apt-get update"
      }
      
      function install_apt_get() {
        pkgs="$@"
        retry_apt_command "apt-get install -y $pkgs"
      }
      
      function err() {
        echo "[$(date +'%Y-%m-%dT%H:%M:%S%z')]: $@" >&2
        return 1
      }
      
      # Returns the list of broker IDs registered in ZooKeeper, e.g., " 0, 2, 1,".
      function get_broker_list() {
        ${KAFKA_HOME}/bin/zookeeper-shell.sh "${ZOOKEEPER_ADDRESS}" \
          <<<"ls /brokers/ids" |
          grep '\[.*\]' |
          sed 's/\[/ /' |
          sed 's/\]/,/'
      }
      
      # Waits for zookeeper to be up or time out.
      function wait_for_zookeeper() {
        for i in {1..20}; do
          if "${ZOOKEEPER_HOME}/bin/zkCli.sh" -server "${ZOOKEEPER_ADDRESS}" ls /; then
            return 0
          else
            echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}, retry ${i}..."
            sleep 5
          fi
        done
        echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}" >&2
        exit 1
      }
      
      # Wait until the current broker is registered or time out.
      function wait_for_kafka() {
        for i in {1..20}; do
          local broker_list=$(get_broker_list || true)
          if [[ "${broker_list}" == *" ${BROKER_ID},"* ]]; then
            return 0
          else
            echo "Kafka broker ${BROKER_ID} is not registered yet, retry ${i}..."
            sleep 5
          fi
        done
        echo "Failed to start Kafka broker ${BROKER_ID}." >&2
        exit 1
      }
      
      function install_and_configure_kafka_server() {
        # Find zookeeper list first, before attempting any installation.
        local zookeeper_client_port
        zookeeper_client_port=$(grep 'clientPort' /etc/zookeeper/conf/zoo.cfg |
          tail -n 1 |
          cut -d '=' -f 2)
      
        local zookeeper_list
        zookeeper_list=$(grep '^server\.' /etc/zookeeper/conf/zoo.cfg |
          cut -d '=' -f 2 |
          cut -d ':' -f 1 |
          sort |
          uniq |
          sed "s/$/:${zookeeper_client_port}/" |
          xargs echo |
          sed "s/ /,/g")
      
        if [[ -z "${zookeeper_list}" ]]; then
          # Didn't find zookeeper quorum in zoo.cfg, but possibly workers just didn't
          # bother to populate it. Check if YARN HA is configured.
          zookeeper_list=$(bdconfig get_property_value --configuration_file \
            /etc/hadoop/conf/yarn-site.xml \
            --name yarn.resourcemanager.zk-address 2>/dev/null)
        fi
      
        # If all attempts failed, error out.
        if [[ -z "${zookeeper_list}" ]]; then
          err 'Failed to find configured Zookeeper list; try "--num-masters=3" for HA'
        fi
      
        ZOOKEEPER_ADDRESS="${zookeeper_list%%,*}"
      
        # Install Kafka from Dataproc distro.
        install_apt_get kafka-server || dpkg -l kafka-server ||
          err 'Unable to install and find kafka-server.'
      
        mkdir -p /var/lib/kafka-logs
        chown kafka:kafka -R /var/lib/kafka-logs
      
        if [[ "${ROLE}" == "Master" ]]; then
          # For master nodes, broker ID starts from 10,000.
          if [[ "$(hostname)" == *-m ]]; then
            # non-HA
            BROKER_ID=10000
          else
            # HA
            BROKER_ID=$((10000 + $(hostname | sed 's/.*-m-\([0-9]*\)$/\1/g')))
          fi
        else
          # For worker nodes, broker ID is a random number generated less than 10000.
          # 10000 is choosen since the max broker ID allowed being set is 10000.
          BROKER_ID=$((RANDOM % 10000))
        fi
        sed -i 's|log.dirs=/tmp/kafka-logs|log.dirs=/var/lib/kafka-logs|' \
          "${KAFKA_PROP_FILE}"
        sed -i 's|^\(zookeeper\.connect=\).*|\1'${zookeeper_list}'|' \
          "${KAFKA_PROP_FILE}"
        sed -i 's,^\(broker\.id=\).*,\1'${BROKER_ID}',' \
          "${KAFKA_PROP_FILE}"
        echo -e '\nreserved.broker.max.id=100000' >>"${KAFKA_PROP_FILE}"
        echo -e '\ndelete.topic.enable=true' >>"${KAFKA_PROP_FILE}"
      
        if [[ "${KAFKA_ENABLE_JMX}" == "true" ]]; then
          sed -i '/kafka-run-class.sh/i export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Djava.net.preferIPv4Stack=true"' /usr/lib/kafka/bin/kafka-server-start.sh
          sed -i "/kafka-run-class.sh/i export JMX_PORT=${KAFKA_JMX_PORT}" /usr/lib/kafka/bin/kafka-server-start.sh
        fi
      
        wait_for_zookeeper
      
        # Start Kafka.
        service kafka-server restart
      
        wait_for_kafka
      }
      
      function install_kafka_python_package() {
        KAFKA_PYTHON_PACKAGE="kafka-python==2.0.2"
        if [[ "${INSTALL_KAFKA_PYTHON}" != "true" ]]; then
          return
        fi
      
        if [[ "$(echo "${DATAPROC_IMAGE_VERSION} > 2.0" | bc)" -eq 1 ]]; then
          /opt/conda/default/bin/pip install "${KAFKA_PYTHON_PACKAGE}" || { sleep 10; /opt/conda/default/bin/pip install "${KAFKA_PYTHON_PACKAGE}"; }
        else
          OS=$(. /etc/os-release && echo "${ID}")
          if [[ "${OS}" == "rocky" ]]; then
            yum install -y python2-pip
          else
            apt-get install -y python-pip
          fi
          pip2 install "${KAFKA_PYTHON_PACKAGE}" || { sleep 10; pip2 install "${KAFKA_PYTHON_PACKAGE}"; } || { sleep 10; pip install "${KAFKA_PYTHON_PACKAGE}"; }
        fi
      }
      
      function remove_old_backports {
        # This script uses 'apt-get update' and is therefore potentially dependent on
        # backports repositories which have been archived.  In order to mitigate this
        # problem, we will remove any reference to backports repos older than oldstable
      
        # https://github.com/GoogleCloudDataproc/initialization-actions/issues/1157
        oldstable=$(curl -s https://deb.debian.org/debian/dists/oldstable/Release | awk '/^Codename/ {print $2}');
        stable=$(curl -s https://deb.debian.org/debian/dists/stable/Release | awk '/^Codename/ {print $2}');
      
        matched_files="$(grep -rsil '\-backports' /etc/apt/sources.list*)"
        if [[ -n "$matched_files" ]]; then
          for filename in "$matched_files"; do
            grep -e "$oldstable-backports" -e "$stable-backports" "$filename" || \
              sed -i -e 's/^.*-backports.*$//' "$filename"
          done
        fi
      }
      
      function main() {
        OS=$(. /etc/os-release && echo "${ID}")
        if [[ ${OS} == debian ]] && [[ $(echo "${DATAPROC_IMAGE_VERSION} <= 2.1" | bc -l) == 1 ]]; then
          remove_old_backports
        fi
        recv_keys || err 'Unable to receive keys.'
        update_apt_get || err 'Unable to update packages lists.'
        install_kafka_python_package
      
        # Only run the installation on workers; verify zookeeper on master(s).
        if [[ "${ROLE}" == 'Master' ]]; then
          service zookeeper-server status ||
            err 'Required zookeeper-server not running on master!'
          if [[ "${RUN_ON_MASTER}" == "true" ]]; then
            # Run installation on masters.
            install_and_configure_kafka_server
          else
            # On master nodes, just install kafka command-line tools and libs but not
            # kafka-server.
            install_apt_get kafka ||
              err 'Unable to install kafka libraries on master!'
          fi
        else
          # Run installation on workers.
          install_and_configure_kafka_server
        fi
      }
      
      main
      

    2. kafka.sh 초기화 작업 스크립트를 Cloud Storage 버킷에 복사합니다. 이 스크립트는 Dataproc 클러스터에 Kafka를 설치합니다.

      1. Cloud Shell을 열고 다음 명령어를 실행합니다.

        gcloud storage cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
        

        다음을 바꿉니다.

        • REGION: kafka.sh는 Cloud Storage의 리전별로 태그가 지정된 공개 버킷에 저장됩니다. 지리적으로 가까운 Compute Engine 리전(예: us-central1)을 지정합니다.
        • BUCKET_NAME: Cloud Storage 버킷 이름입니다.

    Dataproc Kafka 클러스터 만들기

    1. Cloud Shell을 열고 다음 gcloud dataproc clusters create 명령어를 실행하여 Kafka 및 ZooKeeper 구성요소를 설치하는 Dataproc HA 클러스터를 만듭니다.

      gcloud dataproc clusters create KAFKA_CLUSTER \
          --project=PROJECT_ID \
          --region=REGION \
          --image-version=2.1-debian11 \
          --num-masters=3 \
          --enable-component-gateway \
          --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
      

      참고:

      • KAFKA_CLUSTER: 프로젝트 내에서 고유해야 하는 클러스터 이름입니다. 이름은 소문자로 시작해야 하며, 최대 51자(영문 기준)의 소문자, 숫자, 하이픈을 포함할 수 있습니다. 하이픈으로 끝나면 안 됩니다. 삭제된 클러스터의 이름을 재사용할 수 있습니다.
      • PROJECT_ID: 클러스터와 연결할 프로젝트입니다.
      • REGION: 클러스터가 있는 Compute Engine 리전(예: us-central1)입니다.
        • 선택사항인 --zone=ZONE 플래그를 추가하여 us-central1-a와 같이 지정된 리전 내의 영역을 지정할 수 있습니다. 영역을 지정하지 않으면 Dataproc 자동 영역 배치 기능에서 지정된 리전이 있는 영역을 선택합니다.
      • --image-version: 이 튜토리얼에서는 Dataproc 이미지 버전 2.1-debian11을 사용하는 것이 좋습니다. 참고: 각 이미지 버전에는 이 튜토리얼에 사용된 Hive 구성요소를 포함하여 사전 설치된 구성요소 집합이 포함되어 있습니다(지원되는 Dataproc 이미지 버전 참조).
      • --num-master: 3 마스터 노드는 HA 클러스터를 만듭니다. Kafka에 필요한 Zookeeper 구성요소는 HA 클러스터에 사전 설치되어 있습니다.
      • --enable-component-gateway: Dataproc 구성요소 게이트웨이를 사용 설정합니다.
      • BUCKET_NAME: /scripts/kafka.sh 초기화 스크립트가 포함된 Cloud Storage 버킷의 이름입니다(Kafka 설치 스크립트를 Cloud Storage에 복사 참조).

    Kafka custdata 주제 만들기

    Dataproc Kafka 클러스터에서 Kafka 주제를 만들려면 다음 안내를 따르세요.

    1. SSH 유틸리티를 사용하여 클러스터 마스터 VM의 터미널 창을 엽니다.

    2. Kafka custdata 주제를 만듭니다.

      /usr/lib/kafka/bin/kafka-topics.sh \
          --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
          --create --topic custdata
      

      참고:

      • KAFKA_CLUSTER: Kafka 클러스터의 이름을 삽입합니다. -w-0:9092worker-0 노드의 포트 9092에서 실행 중인 Kafka 브로커를 나타냅니다.

      • custdata 주제를 만든 후 다음 명령어를 실행할 수 있습니다.

        # List all topics.
        /usr/lib/kafka/bin/kafka-topics.sh \
            --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
            --list
        
        # Consume then display topic data. /usr/lib/kafka/bin/kafka-console-consumer.sh \     --bootstrap-server KAFKA_CLUSTER-w-0:9092 \     --topic custdata
        # Count the number of messages in the topic. /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \     --broker-list KAFKA_CLUSTER-w-0:9092 \     --topic custdata
        # Delete topic. /usr/lib/kafka/bin/kafka-topics.sh \     --bootstrap-server KAFKA_CLUSTER-w-0:9092 \     --delete --topic custdata

    Kafka custdata 주제에 콘텐츠 게시

    다음 스크립트는 kafka-console-producer.sh Kafka 도구를 사용하여 CSV 형식으로 가상의 고객 데이터를 생성합니다.

    1. 스크립트를 복사한 후 Kafka 클러스터의 마스터 노드에 있는 SSH 터미널에 붙여넣습니다. <Return>키를 눌러 스크립트를 실행합니다.

      for i in {1..10000}; do \
      custname="cust name${i}"
      uuid=$(dbus-uuidgen)
      age=$((45 + $RANDOM % 45))
      amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))")
      message="${uuid}:${custname},${age},${amount}"
      echo ${message}
      done | /usr/lib/kafka/bin/kafka-console-producer.sh \
      --broker-list KAFKA_CLUSTER-w-0:9092 \
      --topic custdata \
      --property "parse.key=true" \
      --property "key.separator=:"
      

      참고:

      • KAFKA_CLUSTER: Kafka 클러스터의 이름입니다.
    2. 다음 Kafka 명령어를 실행하여 custdata 주제에 메시지 10,000개가 포함되어 있는지 확인합니다.

      /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
      --broker-list KAFKA_CLUSTER-w-0:9092 \
      --topic custdata
      

      참고:

      • KAFKA_CLUSTER: Kafka 클러스터의 이름입니다.

      예상 출력:

      custdata:0:10000
      

    Cloud Storage에서 Hive 테이블 만들기

    스트리밍된 Kafka 주제 데이터를 수신할 Hive 테이블을 만듭니다. 다음 단계를 수행하여 Cloud Storage 버킷에서 cust_parquet(parquet) 및 cust_orc(ORC) Hive 테이블을 만듭니다.

    1. 다음 스크립트에서 BUCKET_NAME을 삽입한 다음 Kafka 클러스터 마스터 노드의 SSH 터미널에 스크립트를 복사하여 붙여넣은 후<Return>키를 눌러 ~/hivetables.hql(Hive 쿼리 언어) 스크립트를 만듭니다.

      다음 단계에서 ~/hivetables.hql 스크립트를 실행하여 Cloud Storage 버킷에 Parquet 및 ORC Hive 테이블을 만듭니다.

      cat > ~/hivetables.hql <<EOF
      drop table if exists cust_parquet;
      create external table if not exists cust_parquet
      (uuid string, custname string, age string, amount string)
      row format delimited fields terminated by ','
      stored as parquet
      location "gs://BUCKET_NAME/tables/cust_parquet";
      

      drop table if exists cust_orc; create external table if not exists cust_orc (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as orc location "gs://BUCKET_NAME/tables/cust_orc"; EOF
    2. Kafka 클러스터의 마스터 노드에 있는 SSH 터미널에서 ~/hivetables.hql Hive 작업을 제출하여 Cloud Storage 버킷에 cust_parquet(parquet) 및 cust_orc(ORC) Hive 테이블을 만듭니다.

      gcloud dataproc jobs submit hive \
          --cluster=KAFKA_CLUSTER \
          --region=REGION \
          -f ~/hivetables.hql
      

      참고:

      • Hive 구성요소는 Dataproc Kafka 클러스터에 사전 설치되어 있습니다. 최근 출시된 2.1 이미지에 포함된 Hive 구성요소 버전 목록은 2.1.x 출시 버전을 참조하세요.
      • KAFKA_CLUSTER: Kafka 클러스터의 이름입니다.
      • REGION: Kafka 클러스터가 있는 리전입니다.

    Kafka custdata를 Hive 테이블로 스트리밍

    1. Kafka 클러스터의 마스터 노드에 있는 SSH 터미널에서 다음 명령어를 실행하여 kafka-python 라이브러리를 설치합니다. Kafka 주제 데이터를 Cloud Storage로 스트리밍하려면 Kafka 클라이언트가 필요합니다.
      pip install kafka-python
      
    2. BUCKET_NAME을 삽입하고 Kafka 클러스터 마스터 노드의 SSH 터미널에 다음 PySpark 코드를 복사하여 붙여넣은 후 <return> 키를 눌러 streamdata.py 파일을 만듭니다.

      스크립트는 Kafka custdata 주제를 구독한 후 데이터를 Cloud Storage의 Hive 테이블로 스트리밍합니다. parquet 또는 ORC일 수 있는 출력 형식은 매개변수로 스크립트에 전달됩니다.

      cat > streamdata.py <<EOF
      #!/bin/python
      
      import sys
      from pyspark.sql.functions import *
      from pyspark.sql.types import *
      from pyspark.sql import SparkSession
      from kafka import KafkaConsumer
      
      def getNameFn (data): return data.split(",")[0]
      def getAgeFn  (data): return data.split(",")[1]
      def getAmtFn  (data): return data.split(",")[2]
      
      def main(cluster, outputfmt):
          spark = SparkSession.builder.appName("APP").getOrCreate()
          spark.sparkContext.setLogLevel("WARN")
          Logger = spark._jvm.org.apache.log4j.Logger
          logger = Logger.getLogger(__name__)
      
          rows = spark.readStream.format("kafka") \
          .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \
          .option("startingOffsets", "earliest")\
          .load()
      
          getNameUDF = udf(getNameFn, StringType())
          getAgeUDF  = udf(getAgeFn,  StringType())
          getAmtUDF  = udf(getAmtFn,  StringType())
      
          logger.warn("Params passed in are cluster name: " + cluster + "  output format(sink): " + outputfmt)
      
          query = rows.select (col("key").cast("string").alias("uuid"),\
              getNameUDF      (col("value").cast("string")).alias("custname"),\
              getAgeUDF       (col("value").cast("string")).alias("age"),\
              getAmtUDF       (col("value").cast("string")).alias("amount"))
      
          writer = query.writeStream.format(outputfmt)\
                  .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\
                  .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \
              .outputMode("append")\
              .start()
      
          writer.awaitTermination()
      
      if __name__=="__main__":
          if len(sys.argv) < 2:
              print ("Invalid number of arguments passed ", len(sys.argv))
              print ("Usage: ", sys.argv[0], " cluster  format")
              print ("e.g.:  ", sys.argv[0], " <cluster_name>  orc")
              print ("e.g.:  ", sys.argv[0], " <cluster_name>  parquet")
          main(sys.argv[1], sys.argv[2])
      
      EOF
      
    3. Kafka 클러스터의 마스터 노드에 있는 SSH 터미널에서 spark-submit을 실행하여 Cloud Storage의 Hive 테이블로 데이터를 스트리밍합니다.

      1. KAFKA_CLUSTER 및 출력 FORMAT의 이름을 삽입한 후 다음 코드를 복사하여 Kafka 클러스터의 마스터 노드에 있는 SSH 터미널에 붙여넣은 다음 <Return> 키를 눌러 코드를 실행하고 Kafka custdata 데이터를 Parquet 형식으로 Cloud Storage의 Hive 테이블로 스트리밍합니다.

        spark-submit --packages \
        org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \
            --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \
            --conf spark.driver.memory=4096m \
            --conf spark.executor.cores=2 \
            --conf spark.executor.instances=2 \
            --conf spark.executor.memory=6144m \
            streamdata.py KAFKA_CLUSTER FORMAT
            

        참고:

        • KAFKA_CLUSTER: Kafka 클러스터의 이름을 삽입합니다.
        • FORMAT: parquet 또는 orc를 출력 형식으로 지정합니다. 명령어를 연속적으로 실행하여 두 형식을 Hive 테이블로 스트리밍할 수 있습니다. 예를 들어 첫 번째 호출에서 parquet을 지정하여 Kafka custdata 주제를 Hive parquet 테이블로 스트리밍합니다. 그런 다음 두 번째 호출에서 orc 형식을 지정하여 custdata를 Hive ORC 테이블로 스트리밍합니다.
    4. 모든 custdata가 스트리밍되었음을 나타내는 SSH 터미널에서 표준 출력이 중지되면 SSH 터미널에서 <control-c>를 눌러 프로세스를 중지합니다.

    5. Cloud Storage에서 Hive 테이블을 나열합니다.

      gcloud storage ls gs://BUCKET_NAME/tables/* --recursive
      

      참고:

      • BUCKET_NAME: Hive 테이블이 포함된 Cloud Storage 버킷의 이름을 삽입합니다(Hive 테이블 만들기 참조).

    스트리밍 데이터 쿼리

    1. Kafka 클러스터의 마스터 노드에 있는 SSH 터미널에서 다음 hive 명령어를 실행하여 Cloud Storage의 Hive 테이블에서 스트리밍된 Kafka custdata 메시지를 계산합니다.

      hive -e "select count(1) from TABLE_NAME"
      

      참고:

      • TABLE_NAME: cust_parquet 또는 cust_orc를 Hive 테이블 이름으로 지정합니다.

      예상 출력 스니펫:

    ...
    Status: Running (Executing on YARN cluster with App id application_....)
    
    ----------------------------------------------------------------------------------------------
            VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
    ----------------------------------------------------------------------------------------------
    Map 1 .......... container     SUCCEEDED      1          1        0        0       0       0
    Reducer 2 ...... container     SUCCEEDED      1          1        0        0       0       0
    ----------------------------------------------------------------------------------------------
    VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 9.89 s
    ----------------------------------------------------------------------------------------------
    OK
    10000
    Time taken: 21.394 seconds, Fetched: 1 row(s)
    

    삭제

    프로젝트 삭제

    1. In the Google Cloud console, go to the Manage resources page.

      Go to Manage resources

    2. In the project list, select the project that you want to delete, and then click Delete.
    3. In the dialog, type the project ID, and then click Shut down to delete the project.

    리소스 삭제

    • In the Google Cloud console, go to the Cloud Storage Buckets page.

      Go to Buckets

    • Click the checkbox for the bucket that you want to delete.
    • To delete the bucket, click Delete, and then follow the instructions.
    • Kafka 클러스터를 삭제합니다.
      gcloud dataproc clusters delete KAFKA_CLUSTER \
          --region=${REGION}