Menstreaming topik Kafka ke Hive


Apache Kafka adalah platform streaming terdistribusi open source untuk pipeline data real-time dan integrasi data. Layanan ini menyediakan sistem streaming yang efisien dan skalabel untuk digunakan di berbagai aplikasi, termasuk:

  • Analisis real-time
  • Stream processing
  • Agregasi log
  • Pesan terdistribusi
  • Streaming acara

Tujuan

  1. Instal Kafka di cluster HA Dataproc dengan ZooKeeper (dalam tutorial ini disebut sebagai "cluster Dataproc Kafka").

  2. Buat data pelanggan fiktif, lalu publikasikan data tersebut ke topik Kafka.

  3. Buat tabel parquet Hive dan ORC di Cloud Storage untuk menerima data topik Kafka yang di-streaming.

  4. Kirimkan tugas PySpark untuk berlangganan dan melakukan streaming topik Kafka ke Cloud Storage dalam format parquet dan ORC.

  5. Jalankan kueri pada data tabel Hive yang di-streaming untuk menghitung pesan Kafka yang di-streaming.

Biaya

Dalam dokumen ini, Anda menggunakan komponen Google Cloud yang dapat ditagih berikut:

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga. Pengguna baru Google Cloud mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Setelah menyelesaikan tugas yang dijelaskan dalam dokumen ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Untuk mengetahui informasi selengkapnya, lihat Pembersihan.

Sebelum memulai

Jika belum melakukannya, buat project Google Cloud.

  1. Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
  2. Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.

    Buka pemilih project

  3. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  4. Aktifkan API Dataproc, Compute Engine, and Cloud Storage.

    Mengaktifkan API

  5. Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.

    Buka pemilih project

  6. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  7. Aktifkan API Dataproc, Compute Engine, and Cloud Storage.

    Mengaktifkan API

  8. Di Konsol Google Cloud, buka halaman Bucket Cloud Storage.

    Buka halaman Bucket

  9. Klik Buat bucket.
  10. Di halaman Buat bucket, masukkan informasi bucket Anda. Untuk melanjutkan ke langkah berikutnya, klik Lanjutkan.
    • Untuk Beri nama bucket, masukkan nama yang memenuhi persyaratan penamaan bucket.
    • Untuk Pilih tempat untuk menyimpan data, lakukan tindakan berikut:
      • Pilih opsi Jenis lokasi.
      • Pilih opsi Lokasi.
    • Untuk Memilih kelas penyimpanan default untuk data Anda, pilih kelas penyimpanan.
    • Untuk Memilih cara mengontrol akses ke objek, pilih opsi Kontrol akses.
    • Untuk Setelan lanjutan (opsional), tentukan metode enkripsi, kebijakan retensi, atau label bucket.
  11. Klik Buat.

Langkah-langkah tutorial

Lakukan langkah-langkah berikut untuk membuat cluster Dataproc Kafka guna membaca topik Kafka ke dalam Cloud Storage dalam format parquet ORC (ORC).

Menyalin skrip penginstalan Kafka ke Cloud Storage

Skrip tindakan inisialisasi kafka.sh menginstal Kafka di cluster Dataproc.

  1. Jelajahi kode.

    #!/bin/bash
    #    Copyright 2015 Google, Inc.
    #
    #    Licensed under the Apache License, Version 2.0 (the "License");
    #    you may not use this file except in compliance with the License.
    #    You may obtain a copy of the License at
    #
    #        http://www.apache.org/licenses/LICENSE-2.0
    #
    #    Unless required by applicable law or agreed to in writing, software
    #    distributed under the License is distributed on an "AS IS" BASIS,
    #    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #    See the License for the specific language governing permissions and
    #    limitations under the License.
    #
    # This script installs Apache Kafka (http://kafka.apache.org) on a Google Cloud
    # Dataproc cluster.
    
    set -euxo pipefail
    
    readonly ZOOKEEPER_HOME=/usr/lib/zookeeper
    readonly KAFKA_HOME=/usr/lib/kafka
    readonly KAFKA_PROP_FILE='/etc/kafka/conf/server.properties'
    readonly ROLE="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
    readonly RUN_ON_MASTER="$(/usr/share/google/get_metadata_value attributes/run-on-master || echo false)"
    readonly KAFKA_ENABLE_JMX="$(/usr/share/google/get_metadata_value attributes/kafka-enable-jmx || echo false)"
    readonly KAFKA_JMX_PORT="$(/usr/share/google/get_metadata_value attributes/kafka-jmx-port || echo 9999)"
    readonly INSTALL_KAFKA_PYTHON="$(/usr/share/google/get_metadata_value attributes/install-kafka-python || echo false)"
    
    # The first ZooKeeper server address, e.g., "cluster1-m-0:2181".
    ZOOKEEPER_ADDRESS=''
    # Integer broker ID of this node, e.g., 0
    BROKER_ID=''
    
    function retry_apt_command() {
      cmd="$1"
      for ((i = 0; i < 10; i++)); do
        if eval "$cmd"; then
          return 0
        fi
        sleep 5
      done
      return 1
    }
    
    function recv_keys() {
      retry_apt_command "apt-get install -y gnupg2 &&\
                         apt-key adv --keyserver keyserver.ubuntu.com --recv-keys B7B3B788A8D3785C"
    }
    
    function update_apt_get() {
      retry_apt_command "apt-get update"
    }
    
    function install_apt_get() {
      pkgs="$@"
      retry_apt_command "apt-get install -y $pkgs"
    }
    
    function err() {
      echo "[$(date +'%Y-%m-%dT%H:%M:%S%z')]: $@" >&2
      return 1
    }
    
    # Returns the list of broker IDs registered in ZooKeeper, e.g., " 0, 2, 1,".
    function get_broker_list() {
      ${KAFKA_HOME}/bin/zookeeper-shell.sh "${ZOOKEEPER_ADDRESS}" \
        <<<"ls /brokers/ids" |
        grep '\[.*\]' |
        sed 's/\[/ /' |
        sed 's/\]/,/'
    }
    
    # Waits for zookeeper to be up or time out.
    function wait_for_zookeeper() {
      for i in {1..20}; do
        if "${ZOOKEEPER_HOME}/bin/zkCli.sh" -server "${ZOOKEEPER_ADDRESS}" ls /; then
          return 0
        else
          echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}, retry ${i}..."
          sleep 5
        fi
      done
      echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}" >&2
      exit 1
    }
    
    # Wait until the current broker is registered or time out.
    function wait_for_kafka() {
      for i in {1..20}; do
        local broker_list=$(get_broker_list || true)
        if [[ "${broker_list}" == *" ${BROKER_ID},"* ]]; then
          return 0
        else
          echo "Kafka broker ${BROKER_ID} is not registered yet, retry ${i}..."
          sleep 5
        fi
      done
      echo "Failed to start Kafka broker ${BROKER_ID}." >&2
      exit 1
    }
    
    function install_and_configure_kafka_server() {
      # Find zookeeper list first, before attempting any installation.
      local zookeeper_client_port
      zookeeper_client_port=$(grep 'clientPort' /etc/zookeeper/conf/zoo.cfg |
        tail -n 1 |
        cut -d '=' -f 2)
    
      local zookeeper_list
      zookeeper_list=$(grep '^server\.' /etc/zookeeper/conf/zoo.cfg |
        cut -d '=' -f 2 |
        cut -d ':' -f 1 |
        sort |
        uniq |
        sed "s/$/:${zookeeper_client_port}/" |
        xargs echo |
        sed "s/ /,/g")
    
      if [[ -z "${zookeeper_list}" ]]; then
        # Didn't find zookeeper quorum in zoo.cfg, but possibly workers just didn't
        # bother to populate it. Check if YARN HA is configured.
        zookeeper_list=$(bdconfig get_property_value --configuration_file \
          /etc/hadoop/conf/yarn-site.xml \
          --name yarn.resourcemanager.zk-address 2>/dev/null)
      fi
    
      # If all attempts failed, error out.
      if [[ -z "${zookeeper_list}" ]]; then
        err 'Failed to find configured Zookeeper list; try "--num-masters=3" for HA'
      fi
    
      ZOOKEEPER_ADDRESS="${zookeeper_list%%,*}"
    
      # Install Kafka from Dataproc distro.
      install_apt_get kafka-server || dpkg -l kafka-server ||
        err 'Unable to install and find kafka-server.'
    
      mkdir -p /var/lib/kafka-logs
      chown kafka:kafka -R /var/lib/kafka-logs
    
      if [[ "${ROLE}" == "Master" ]]; then
        # For master nodes, broker ID starts from 10,000.
        if [[ "$(hostname)" == *-m ]]; then
          # non-HA
          BROKER_ID=10000
        else
          # HA
          BROKER_ID=$((10000 + $(hostname | sed 's/.*-m-\([0-9]*\)$/\1/g')))
        fi
      else
        # For worker nodes, broker ID is the worker ID.
        BROKER_ID=$(hostname | sed 's/.*-w-\([0-9]*\)$/\1/g')
      fi
      sed -i 's|log.dirs=/tmp/kafka-logs|log.dirs=/var/lib/kafka-logs|' \
        "${KAFKA_PROP_FILE}"
      sed -i 's|^\(zookeeper\.connect=\).*|\1'${zookeeper_list}'|' \
        "${KAFKA_PROP_FILE}"
      sed -i 's,^\(broker\.id=\).*,\1'${BROKER_ID}',' \
        "${KAFKA_PROP_FILE}"
      echo -e '\nreserved.broker.max.id=100000' >>"${KAFKA_PROP_FILE}"
      echo -e '\ndelete.topic.enable=true' >>"${KAFKA_PROP_FILE}"
    
      if [[ "${KAFKA_ENABLE_JMX}" == "true" ]]; then
        sed -i '/kafka-run-class.sh/i export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Djava.net.preferIPv4Stack=true"' /usr/lib/kafka/bin/kafka-server-start.sh
        sed -i "/kafka-run-class.sh/i export JMX_PORT=${KAFKA_JMX_PORT}" /usr/lib/kafka/bin/kafka-server-start.sh
      fi
    
      wait_for_zookeeper
    
      # Start Kafka.
      service kafka-server restart
    
      wait_for_kafka
    }
    
    function install_kafka_python_package() {
      KAFKA_PYTHON_PACKAGE="kafka-python==2.0.2"
      if [[ "${INSTALL_KAFKA_PYTHON}" != "true" ]]; then
        return
      fi
    
      if [[ "$(echo "${DATAPROC_IMAGE_VERSION} > 2.0" | bc)" -eq 1 ]]; then
        /opt/conda/default/bin/pip install "${KAFKA_PYTHON_PACKAGE}" || { sleep 10; /opt/conda/default/bin/pip install "${KAFKA_PYTHON_PACKAGE}"; }
      else
        OS=$(. /etc/os-release && echo "${ID}")
        if [[ "${OS}" == "rocky" ]]; then
          yum install -y python2-pip
        else
          apt-get install -y python-pip
        fi
        pip2 install "${KAFKA_PYTHON_PACKAGE}" || { sleep 10; pip2 install "${KAFKA_PYTHON_PACKAGE}"; } || { sleep 10; pip install "${KAFKA_PYTHON_PACKAGE}"; }
      fi
    }
    
    function main() {
      recv_keys || err 'Unable to receive keys.'
      update_apt_get || err 'Unable to update packages lists.'
      install_kafka_python_package
    
      # Only run the installation on workers; verify zookeeper on master(s).
      if [[ "${ROLE}" == 'Master' ]]; then
        service zookeeper-server status ||
          err 'Required zookeeper-server not running on master!'
        if [[ "${RUN_ON_MASTER}" == "true" ]]; then
          # Run installation on masters.
          install_and_configure_kafka_server
        else
          # On master nodes, just install kafka command-line tools and libs but not
          # kafka-server.
          install_apt_get kafka ||
            err 'Unable to install kafka libraries on master!'
        fi
      else
        # Run installation on workers.
        install_and_configure_kafka_server
      fi
    }
    
    main
    

  2. Salin skrip tindakan inisialisasi kafka.sh ke bucket Cloud Storage Anda. Skrip ini menginstal Kafka pada cluster Dataproc.

    1. Buka Cloud Shell, lalu jalankan perintah berikut:

      gsutil cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
      

      Lakukan penggantian berikut:

      • REGION: kafka.sh disimpan di bucket publik yang diberi tag regional di Cloud Storage. Tentukan region Compute Engine yang dekat secara geografis, (contoh: us-central1).
      • BUCKET_NAME: Nama bucket Cloud Storage Anda.

Membuat cluster Dataproc Kafka

  1. Buka Cloud Shell, lalu jalankan perintah gcloud dataproc clusters create berikut untuk membuat cluster cluster HA Dataproc yang menginstal komponen Kafka dan ZooKeeper:

    gcloud dataproc clusters create KAFKA_CLUSTER \
        --project=PROJECT_ID \
        --region=REGION \
        --image-version=2.1-debian11 \
        --num-masters=3 \
        --enable-component-gateway \
        --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
    

    Catatan:

    • KAFKA_CLUSTER: Nama cluster, yang harus unik dalam project. Nama harus diawali dengan huruf kecil, dan dapat berisi hingga 51 huruf kecil, angka, dan tanda hubung. Bagian ini tidak boleh diakhiri dengan tanda hubung. Nama cluster yang dihapus dapat digunakan kembali.
    • PROJECT_ID: Project yang akan dikaitkan dengan cluster ini.
    • REGION: Region Compute Engine tempat cluster akan berada, seperti us-central1.
      • Anda dapat menambahkan flag --zone=ZONE opsional untuk menentukan zona dalam region yang ditentukan, seperti us-central1-a. Jika Anda tidak menentukan zona, fitur penempatan autozone Dataproc akan memilih zona dengan region yang ditentukan.
    • --image-version: Versi gambar Dataproc 2.1-debian11 direkomendasikan untuk tutorial ini. Catatan: Setiap versi image berisi kumpulan komponen bawaan, termasuk komponen Hive yang digunakan dalam tutorial ini (lihat Versi gambar Dataproc yang didukung).
    • --num-master: node master 3 membuat cluster Ha. Komponen Zookeeper, yang diperlukan oleh Kafka, sudah diinstal di cluster HA.
    • --enable-component-gateway: Mengaktifkan Gateway Komponen Dataproc.
    • BUCKET_NAME: Nama bucket Cloud Storage Anda yang berisi skrip inisialisasi /scripts/kafka.sh (lihat Menyalin skrip penginstalan Kafka ke Cloud Storage).

Membuat topik custdata Kafka

Untuk membuat topik Kafka di cluster Dataproc Kafka:

  1. Gunakan utilitas SSH untuk membuka jendela terminal pada VM master cluster.

  2. Buat topik custdata Kafka.

    /usr/lib/kafka/bin/kafka-topics.sh \
        --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
        --create --topic custdata
    

    Catatan:

    • KAFKA_CLUSTER: Masukkan nama cluster Kafka Anda. -w-0:9092 menandakan broker Kafka yang berjalan di port 9092 pada node worker-0.

    • Anda dapat menjalankan perintah berikut setelah membuat topik custdata:

      # List all topics.
      /usr/lib/kafka/bin/kafka-topics.sh \
          --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
          --list
      
      # Consume then display topic data. /usr/lib/kafka/bin/kafka-console-consumer.sh \     --bootstrap-server KAFKA_CLUSTER-w-0:9092 \     --topic custdata
      # Count the number of messages in the topic. /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \     --broker-list KAFKA_CLUSTER-w-0:9092 \     --topic custdata
      # Delete topic. /usr/lib/kafka/bin/kafka-topics.sh \     --bootstrap-server KAFKA_CLUSTER-w-0:9092 \     --delete --topic custdata

Memublikasikan konten ke topik custdata Kafka

Skrip berikut menggunakan alat Kafka kafka-console-producer.sh untuk menghasilkan data pelanggan fiktif dalam format CSV.

  1. Salin, lalu tempel skrip di terminal SSH pada node master cluster Kafka Anda. Tekan <return> untuk menjalankan skrip.

    for i in {1..10000}; do \
    custname="cust name${i}"
    uuid=$(dbus-uuidgen)
    age=$((45 + $RANDOM % 45))
    amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))")
    message="${uuid}:${custname},${age},${amount}"
    echo ${message}
    done | /usr/lib/kafka/bin/kafka-console-producer.sh \
    --broker-list KAFKA_CLUSTER-w-0:9092 \
    --topic custdata \
    --property "parse.key=true" \
    --property "key.separator=:"
    

    Catatan:

    • KAFKA_CLUSTER: Nama cluster Kafka Anda.
  2. Jalankan perintah Kafka berikut untuk mengonfirmasi bahwa topik custdata berisi 10.000 pesan.

    /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list KAFKA_CLUSTER-w-0:9092 \
    --topic custdata
    

    Catatan:

    • KAFKA_CLUSTER: Nama cluster Kafka Anda.

    Output yang diharapkan:

    custdata:0:10000
    

Membuat tabel Hive di Cloud Storage

Buat tabel Hive untuk menerima data topik Kafka yang di-streaming. Lakukan langkah-langkah berikut untuk membuat tabel Hive cust_parquet (parquet) dan cust_orc (ORC) di bucket Cloud Storage Anda.

  1. Masukkan BUCKET_NAME Anda di skrip berikut, lalu salin dan tempel skrip ke terminal SSH pada node master cluster Kafka, lalu tekan <return> untuk membuat skrip ~/hivetables.hql (Hive Query Language).

    Anda akan menjalankan skrip ~/hivetables.hql pada langkah berikutnya untuk membuat tabel parquet dan Hive ORC di bucket Cloud Storage.

    cat > ~/hivetables.hql <<EOF
    drop table if exists cust_parquet;
    create external table if not exists cust_parquet
    (uuid string, custname string, age string, amount string)
    row format delimited fields terminated by ','
    stored as parquet
    location "gs://BUCKET_NAME/tables/cust_parquet";
    

    drop table if exists cust_orc; create external table if not exists cust_orc (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as orc location "gs://BUCKET_NAME/tables/cust_orc"; EOF
  2. Di terminal SSH pada node master cluster Kafka, kirim tugas Hive ~/hivetables.hql untuk membuat tabel Hive cust_parquet (parquet) dan cust_orc (ORC) dalam bucket Cloud Storage.

    gcloud dataproc jobs submit hive \
        --cluster=KAFKA_CLUSTER \
        --region=REGION \
        -f ~/hivetables.hql
    

    Catatan:

    • Komponen Hive sudah diinstal sebelumnya di cluster Dataproc Kafka. Lihat versi rilis 2.1.x untuk daftar versi komponen Hive yang disertakan dalam image 2.1 yang baru dirilis.
    • KAFKA_CLUSTER: Nama cluster Kafka Anda.
    • REGION: Region tempat cluster Kafka Anda berada.

Streaming Kafka custdata ke tabel Hive

  1. Jalankan perintah berikut di terminal SSH pada node master cluster Kafka untuk menginstal library kafka-python. Klien Kafka diperlukan untuk melakukan streaming data topik Kafka ke Cloud Storage.
    pip install kafka-python
    
  2. Masukkan BUCKET_NAME, lalu salin dan tempel kode PySpark berikut ke terminal SSH pada node master cluster Kafka, lalu tekan <return> untuk membuat file streamdata.py.

    Skrip ini berlangganan topik custdata Kafka, lalu mengalirkan data ke tabel Hive Anda di Cloud Storage. Format output, yang dapat berupa parquet atau ORC, diteruskan ke skrip sebagai parameter.

    cat > streamdata.py <<EOF
    #!/bin/python
    
    import sys
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from pyspark.sql import SparkSession
    from kafka import KafkaConsumer
    
    def getNameFn (data): return data.split(",")[0]
    def getAgeFn  (data): return data.split(",")[1]
    def getAmtFn  (data): return data.split(",")[2]
    
    def main(cluster, outputfmt):
        spark = SparkSession.builder.appName("APP").getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        Logger = spark._jvm.org.apache.log4j.Logger
        logger = Logger.getLogger(__name__)
    
        rows = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \
        .option("startingOffsets", "earliest")\
        .load()
    
        getNameUDF = udf(getNameFn, StringType())
        getAgeUDF  = udf(getAgeFn,  StringType())
        getAmtUDF  = udf(getAmtFn,  StringType())
    
        logger.warn("Params passed in are cluster name: " + cluster + "  output format(sink): " + outputfmt)
    
        query = rows.select (col("key").cast("string").alias("uuid"),\
            getNameUDF      (col("value").cast("string")).alias("custname"),\
            getAgeUDF       (col("value").cast("string")).alias("age"),\
            getAmtUDF       (col("value").cast("string")).alias("amount"))
    
        writer = query.writeStream.format(outputfmt)\
                .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\
                .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \
            .outputMode("append")\
            .start()
    
        writer.awaitTermination()
    
    if __name__=="__main__":
        if len(sys.argv) < 2:
            print ("Invalid number of arguments passed ", len(sys.argv))
            print ("Usage: ", sys.argv[0], " cluster  format")
            print ("e.g.:  ", sys.argv[0], " <cluster_name>  orc")
            print ("e.g.:  ", sys.argv[0], " <cluster_name>  parquet")
        main(sys.argv[1], sys.argv[2])
    
    EOF
    
  3. Di terminal SSH pada node master cluster Kafka, jalankan spark-submit untuk mengalirkan data ke tabel Hive Anda di Cloud Storage.

    1. Masukkan nama KAFKA_CLUSTER dan output FORMAT Anda, lalu salin dan tempel kode berikut ke terminal SSH di node master cluster Kafka Anda, lalu tekan <return> untuk menjalankan kode dan melakukan streaming data custdata Kafka dalam format parquet ke tabel Hive Anda di Cloud Storage.

      spark-submit --packages \
      org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \
          --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \
          --conf spark.driver.memory=4096m \
          --conf spark.executor.cores=2 \
          --conf spark.executor.instances=2 \
          --conf spark.executor.memory=6144m \
          streamdata.py KAFKA_CLUSTER FORMAT
          

      Catatan:

      • KAFKA_CLUSTER: Masukkan nama cluster Kafka Anda.
      • FORMAT: Menentukan parquet atau orc sebagai format output. Anda dapat menjalankan perintah secara berurutan untuk melakukan streaming kedua format ke tabel Hive: misalnya, pada pemanggilan pertama, tentukan parquet untuk mengalirkan topik custdata Kafka ke tabel parquet Hive; lalu, pada pemanggilan kedua, tentukan format orc untuk mengalirkan custdata ke tabel Hive ORC.
  4. Setelah output standar berhenti di terminal SSH, yang menandakan bahwa semua custdata telah di-streaming, tekan <control-c> di terminal SSH untuk menghentikan proses.

  5. Mencantumkan tabel Hive di Cloud Storage.

    gsutil ls -r gs://BUCKET_NAME/tables/*
    

    Catatan:

    • BUCKET_NAME: Masukkan nama bucket Cloud Storage yang berisi tabel Hive Anda (lihat Membuat tabel Hive).

Kueri data yang di-streaming

  1. Di terminal SSH pada node master cluster Kafka, jalankan perintah hive berikut untuk menghitung pesan custdata Kafka yang di-streaming dalam tabel Hive di Cloud Storage.

    hive -e "select count(1) from TABLE_NAME"
    

    Catatan:

    • TABLE_NAME: Tentukan cust_parquet atau cust_orc sebagai nama tabel Hive.

    Cuplikan output yang diharapkan:

...
Status: Running (Executing on YARN cluster with App id application_....)

----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container     SUCCEEDED      1          1        0        0       0       0
Reducer 2 ...... container     SUCCEEDED      1          1        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 9.89 s
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)

Pembersihan

Menghapus project

    Menghapus project Google Cloud:

    gcloud projects delete PROJECT_ID

Menghapus resource

  • Hapus bucket:
    gcloud storage buckets delete BUCKET_NAME
  • Hapus cluster Kafka Anda:
    gcloud dataproc clusters delete KAFKA_CLUSTER \
        --region=${REGION}