将 Kafka 主题流式传输到 Hive


Apache Kafka 是一个开源分布式流式处理平台,可用于实时数据流水线和数据集成。它提供了高效且可伸缩的流式传输系统,可用于各种应用,包括:

  • 实时分析
  • 流处理
  • 日志汇总
  • 分布式消息传递
  • 事件流式传输

目标

  1. Dataproc 高可用性集群(本教程中称为“Dataproc Kafka 集群”)上安装 Kafka。

  2. 创建虚构的客户数据,然后将数据发布到 Kafka 主题。

  3. 在 Cloud Storage 中创建 Hive Parquet 和 ORC 表,以接收流式 Kafka 主题数据。

  4. 提交 PySpark 作业以订阅 Kafka 主题并以 Parquet 和 ORC 格式将 Kafka 主题流式传输到 Cloud Storage。

  5. 对流式传输的 Hive 表数据运行查询,以统计流式传输的 Kafka 消息。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理

准备工作

创建 Google Cloud 项目(如果尚未创建)。

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. 启用 Dataproc, Compute Engine, and Cloud Storage API。

    启用 API

  5. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  6. 确保您的 Google Cloud 项目已启用结算功能

  7. 启用 Dataproc, Compute Engine, and Cloud Storage API。

    启用 API

  8. 在 Google Cloud 控制台中,进入 Cloud Storage 存储桶页面。

    进入“存储桶”页面

  9. 点击创建存储分区
  10. 创建存储分区页面上,输入您的存储分区信息。要转到下一步,请点击继续
    • 指定存储分区的名称中,输入符合存储分区命名要求的名称。
    • 对于选择数据存储位置,执行以下操作:
      • 选择位置类型选项。
      • 选择位置选项。
    • 对于为数据选择一个默认存储类别,请选择一个存储类别
    • 对于选择如何控制对象的访问权限,请选择访问权限控制选项。
    • 对于高级设置(可选),请指定加密方法保留政策存储分区标签
  11. 点击创建

教程步骤

执行以下步骤,创建一个 Dataproc Kafka 集群,以便以 Parquet OR ORC 格式将 Kafka 主题读入 Cloud Storage。

将 Kafka 安装脚本复制到 Cloud Storage

kafka.sh 初始化操作脚本会在 Dataproc 集群上安装 Kafka。

  1. 浏览代码。

    #!/bin/bash
    #    Copyright 2015 Google, Inc.
    #
    #    Licensed under the Apache License, Version 2.0 (the "License");
    #    you may not use this file except in compliance with the License.
    #    You may obtain a copy of the License at
    #
    #        http://www.apache.org/licenses/LICENSE-2.0
    #
    #    Unless required by applicable law or agreed to in writing, software
    #    distributed under the License is distributed on an "AS IS" BASIS,
    #    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #    See the License for the specific language governing permissions and
    #    limitations under the License.
    #
    # This script installs Apache Kafka (http://kafka.apache.org) on a Google Cloud
    # Dataproc cluster.
    
    set -euxo pipefail
    
    readonly ZOOKEEPER_HOME=/usr/lib/zookeeper
    readonly KAFKA_HOME=/usr/lib/kafka
    readonly KAFKA_PROP_FILE='/etc/kafka/conf/server.properties'
    readonly ROLE="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
    readonly RUN_ON_MASTER="$(/usr/share/google/get_metadata_value attributes/run-on-master || echo false)"
    readonly KAFKA_ENABLE_JMX="$(/usr/share/google/get_metadata_value attributes/kafka-enable-jmx || echo false)"
    readonly KAFKA_JMX_PORT="$(/usr/share/google/get_metadata_value attributes/kafka-jmx-port || echo 9999)"
    readonly INSTALL_KAFKA_PYTHON="$(/usr/share/google/get_metadata_value attributes/install-kafka-python || echo false)"
    
    # The first ZooKeeper server address, e.g., "cluster1-m-0:2181".
    ZOOKEEPER_ADDRESS=''
    # Integer broker ID of this node, e.g., 0
    BROKER_ID=''
    
    function retry_apt_command() {
      cmd="$1"
      for ((i = 0; i < 10; i++)); do
        if eval "$cmd"; then
          return 0
        fi
        sleep 5
      done
      return 1
    }
    
    function recv_keys() {
      retry_apt_command "apt-get install -y gnupg2 &&\
                         apt-key adv --keyserver keyserver.ubuntu.com --recv-keys B7B3B788A8D3785C"
    }
    
    function update_apt_get() {
      retry_apt_command "apt-get update"
    }
    
    function install_apt_get() {
      pkgs="$@"
      retry_apt_command "apt-get install -y $pkgs"
    }
    
    function err() {
      echo "[$(date +'%Y-%m-%dT%H:%M:%S%z')]: $@" >&2
      return 1
    }
    
    # Returns the list of broker IDs registered in ZooKeeper, e.g., " 0, 2, 1,".
    function get_broker_list() {
      ${KAFKA_HOME}/bin/zookeeper-shell.sh "${ZOOKEEPER_ADDRESS}" \
        <<<"ls /brokers/ids" |
        grep '\[.*\]' |
        sed 's/\[/ /' |
        sed 's/\]/,/'
    }
    
    # Waits for zookeeper to be up or time out.
    function wait_for_zookeeper() {
      for i in {1..20}; do
        if "${ZOOKEEPER_HOME}/bin/zkCli.sh" -server "${ZOOKEEPER_ADDRESS}" ls /; then
          return 0
        else
          echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}, retry ${i}..."
          sleep 5
        fi
      done
      echo "Failed to connect to ZooKeeper ${ZOOKEEPER_ADDRESS}" >&2
      exit 1
    }
    
    # Wait until the current broker is registered or time out.
    function wait_for_kafka() {
      for i in {1..20}; do
        local broker_list=$(get_broker_list || true)
        if [[ "${broker_list}" == *" ${BROKER_ID},"* ]]; then
          return 0
        else
          echo "Kafka broker ${BROKER_ID} is not registered yet, retry ${i}..."
          sleep 5
        fi
      done
      echo "Failed to start Kafka broker ${BROKER_ID}." >&2
      exit 1
    }
    
    function install_and_configure_kafka_server() {
      # Find zookeeper list first, before attempting any installation.
      local zookeeper_client_port
      zookeeper_client_port=$(grep 'clientPort' /etc/zookeeper/conf/zoo.cfg |
        tail -n 1 |
        cut -d '=' -f 2)
    
      local zookeeper_list
      zookeeper_list=$(grep '^server\.' /etc/zookeeper/conf/zoo.cfg |
        cut -d '=' -f 2 |
        cut -d ':' -f 1 |
        sort |
        uniq |
        sed "s/$/:${zookeeper_client_port}/" |
        xargs echo |
        sed "s/ /,/g")
    
      if [[ -z "${zookeeper_list}" ]]; then
        # Didn't find zookeeper quorum in zoo.cfg, but possibly workers just didn't
        # bother to populate it. Check if YARN HA is configured.
        zookeeper_list=$(bdconfig get_property_value --configuration_file \
          /etc/hadoop/conf/yarn-site.xml \
          --name yarn.resourcemanager.zk-address 2>/dev/null)
      fi
    
      # If all attempts failed, error out.
      if [[ -z "${zookeeper_list}" ]]; then
        err 'Failed to find configured Zookeeper list; try "--num-masters=3" for HA'
      fi
    
      ZOOKEEPER_ADDRESS="${zookeeper_list%%,*}"
    
      # Install Kafka from Dataproc distro.
      install_apt_get kafka-server || dpkg -l kafka-server ||
        err 'Unable to install and find kafka-server.'
    
      mkdir -p /var/lib/kafka-logs
      chown kafka:kafka -R /var/lib/kafka-logs
    
      if [[ "${ROLE}" == "Master" ]]; then
        # For master nodes, broker ID starts from 10,000.
        if [[ "$(hostname)" == *-m ]]; then
          # non-HA
          BROKER_ID=10000
        else
          # HA
          BROKER_ID=$((10000 + $(hostname | sed 's/.*-m-\([0-9]*\)$/\1/g')))
        fi
      else
        # For worker nodes, broker ID is the worker ID.
        BROKER_ID=$(hostname | sed 's/.*-w-\([0-9]*\)$/\1/g')
      fi
      sed -i 's|log.dirs=/tmp/kafka-logs|log.dirs=/var/lib/kafka-logs|' \
        "${KAFKA_PROP_FILE}"
      sed -i 's|^\(zookeeper\.connect=\).*|\1'${zookeeper_list}'|' \
        "${KAFKA_PROP_FILE}"
      sed -i 's,^\(broker\.id=\).*,\1'${BROKER_ID}',' \
        "${KAFKA_PROP_FILE}"
      echo -e '\nreserved.broker.max.id=100000' >>"${KAFKA_PROP_FILE}"
      echo -e '\ndelete.topic.enable=true' >>"${KAFKA_PROP_FILE}"
    
      if [[ "${KAFKA_ENABLE_JMX}" == "true" ]]; then
        sed -i '/kafka-run-class.sh/i export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Djava.net.preferIPv4Stack=true"' /usr/lib/kafka/bin/kafka-server-start.sh
        sed -i "/kafka-run-class.sh/i export JMX_PORT=${KAFKA_JMX_PORT}" /usr/lib/kafka/bin/kafka-server-start.sh
      fi
    
      wait_for_zookeeper
    
      # Start Kafka.
      service kafka-server restart
    
      wait_for_kafka
    }
    
    function install_kafka_python_package() {
      KAFKA_PYTHON_PACKAGE="kafka-python==2.0.2"
      if [[ "${INSTALL_KAFKA_PYTHON}" != "true" ]]; then
        return
      fi
    
      if [[ "$(echo "${DATAPROC_IMAGE_VERSION} > 2.0" | bc)" -eq 1 ]]; then
        /opt/conda/default/bin/pip install "${KAFKA_PYTHON_PACKAGE}" || { sleep 10; /opt/conda/default/bin/pip install "${KAFKA_PYTHON_PACKAGE}"; }
      else
        OS=$(. /etc/os-release && echo "${ID}")
        if [[ "${OS}" == "rocky" ]]; then
          yum install -y python2-pip
        else
          apt-get install -y python-pip
        fi
        pip2 install "${KAFKA_PYTHON_PACKAGE}" || { sleep 10; pip2 install "${KAFKA_PYTHON_PACKAGE}"; } || { sleep 10; pip install "${KAFKA_PYTHON_PACKAGE}"; }
      fi
    }
    
    function remove_old_backports {
      # This script uses 'apt-get update' and is therefore potentially dependent on
      # backports repositories which have been archived.  In order to mitigate this
      # problem, we will remove any reference to backports repos older than oldstable
    
      # https://github.com/GoogleCloudDataproc/initialization-actions/issues/1157
      oldstable=$(curl -s https://deb.debian.org/debian/dists/oldstable/Release | awk '/^Codename/ {print $2}');
      stable=$(curl -s https://deb.debian.org/debian/dists/stable/Release | awk '/^Codename/ {print $2}');
    
      matched_files="$(grep -rsil '\-backports' /etc/apt/sources.list*)"
      if [[ -n "$matched_files" ]]; then
        for filename in "$matched_files"; do
          grep -e "$oldstable-backports" -e "$stable-backports" "$filename" || \
            sed -i -e 's/^.*-backports.*$//' "$filename"
        done
      fi
    }
    
    function main() {
      OS=$(. /etc/os-release && echo "${ID}")
      if [[ ${OS} == debian ]] && [[ $(echo "${DATAPROC_IMAGE_VERSION} <= 2.1" | bc -l) == 1 ]]; then
        remove_old_backports
      fi
      recv_keys || err 'Unable to receive keys.'
      update_apt_get || err 'Unable to update packages lists.'
      install_kafka_python_package
    
      # Only run the installation on workers; verify zookeeper on master(s).
      if [[ "${ROLE}" == 'Master' ]]; then
        service zookeeper-server status ||
          err 'Required zookeeper-server not running on master!'
        if [[ "${RUN_ON_MASTER}" == "true" ]]; then
          # Run installation on masters.
          install_and_configure_kafka_server
        else
          # On master nodes, just install kafka command-line tools and libs but not
          # kafka-server.
          install_apt_get kafka ||
            err 'Unable to install kafka libraries on master!'
        fi
      else
        # Run installation on workers.
        install_and_configure_kafka_server
      fi
    }
    
    main
    

  2. kafka.sh 初始化操作脚本复制到您的 Cloud Storage 存储桶。此脚本会在 Dataproc 集群上安装 Kafka。

    1. 打开 Cloud Shell,然后运行以下命令:

      gsutil cp gs://goog-dataproc-initialization-actions-REGION/kafka/kafka.sh gs://BUCKET_NAME/scripts/
      

      进行以下替换:

      • REGIONkafka.sh 存储在 Cloud Storage 中具有区域标记的公开存储分区中。指定地理位置相近的 Compute Engine 区域(例如:us-central1)。
      • BUCKET_NAME - Cloud Storage 存储桶的名称。

创建 Dataproc Kafka 集群

  1. 打开 Cloud Shell,然后运行以下 gcloud dataproc clusters create 命令,以创建安装 Kafka 和 ZooKeeper 组件的 Dataproc 高可用性集群集群:

    gcloud dataproc clusters create KAFKA_CLUSTER \
        --project=PROJECT_ID \
        --region=REGION \
        --image-version=2.1-debian11 \
        --num-masters=3 \
        --enable-component-gateway \
        --initialization-actions=gs://BUCKET_NAME/scripts/kafka.sh
    

    备注:

    • KAFKA_CLUSTER:集群名称,在项目中必须是唯一的。名称必须以小写字母开头,最多可以包含 51 个小写字母、数字和连字符。但不能以连字符结尾。可以重复使用已删除的集群的名称。
    • PROJECT_ID:要与此集群关联的项目。
    • REGION:集群所在的 Compute Engine 区域,例如 us-central1
      • 您可以添加可选的 --zone=ZONE 标志,以指定区域内的一个可用区,例如 us-central1-a。如果您未指定可用区,Dataproc 自动选择可用区功能会选择具有指定区域的可用区。
    • --image-version:本教程中建议使用 Dataproc 映像版本 2.1-debian11。注意:每个映像版本都包含一组预安装的组件,包括本教程中使用的 Hive 组件(请参阅支持的 Dataproc 映像版本)。
    • --num-master3 主节点会创建高可用性集群。Kafka 所需的 Zookeeper 组件预安装在高可用性集群上。
    • --enable-component-gateway:启用 Dataproc 组件网关
    • BUCKET_NAME:包含 /scripts/kafka.sh 初始化脚本的 Cloud Storage 存储桶的名称(请参阅将 Kafka 安装脚本复制到 Cloud Storage)。

创建 Kafka custdata 主题

如需在 Dataproc Kafka 集群上创建 Kafka 主题,请执行以下操作:

  1. 使用 SSH 实用程序在集群主服务器虚拟机上打开一个终端窗口。

  2. 创建一个 Kafka custdata 主题。

    /usr/lib/kafka/bin/kafka-topics.sh \
        --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
        --create --topic custdata
    

    备注:

    • KAFKA_CLUSTER:插入 Kafka 集群的名称。-w-0:9092 表示在 worker-0 节点的端口 9092 上运行的 Kafka 代理。

    • 创建 custdata 主题后,您可以运行以下命令:

      # List all topics.
      /usr/lib/kafka/bin/kafka-topics.sh \
          --bootstrap-server KAFKA_CLUSTER-w-0:9092 \
          --list
      
      # Consume then display topic data. /usr/lib/kafka/bin/kafka-console-consumer.sh \     --bootstrap-server KAFKA_CLUSTER-w-0:9092 \     --topic custdata
      # Count the number of messages in the topic. /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \     --broker-list KAFKA_CLUSTER-w-0:9092 \     --topic custdata
      # Delete topic. /usr/lib/kafka/bin/kafka-topics.sh \     --bootstrap-server KAFKA_CLUSTER-w-0:9092 \     --delete --topic custdata

将内容发布到 Kafka custdata 主题

以下脚本使用 kafka-console-producer.sh Kafka 工具生成 CSV 格式的虚构客户数据。

  1. 复制该脚本,然后将其粘贴到 Kafka 集群主节点上的 SSH 终端中。按 <return> 运行该脚本。

    for i in {1..10000}; do \
    custname="cust name${i}"
    uuid=$(dbus-uuidgen)
    age=$((45 + $RANDOM % 45))
    amount=$(echo "$(( $RANDOM % 99999 )).$(( $RANDOM % 99 ))")
    message="${uuid}:${custname},${age},${amount}"
    echo ${message}
    done | /usr/lib/kafka/bin/kafka-console-producer.sh \
    --broker-list KAFKA_CLUSTER-w-0:9092 \
    --topic custdata \
    --property "parse.key=true" \
    --property "key.separator=:"
    

    备注:

    • KAFKA_CLUSTER:Kafka 集群的名称。
  2. 运行以下 Kafka 命令以确认 custdata 主题包含 10000 条消息。

    /usr/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list KAFKA_CLUSTER-w-0:9092 \
    --topic custdata
    

    备注:

    • KAFKA_CLUSTER:Kafka 集群的名称。

    预期输出:

    custdata:0:10000
    

在 Cloud Storage 中创建 Hive 表

创建 Hive 表以接收流式 Kafka 主题数据。 执行以下步骤,在 Cloud Storage 存储桶中创建 cust_parquet (parquet) 和 cust_orc (ORC) Hive 表。

  1. 在以下脚本中插入 BUCKET_NAME,然后将该脚本复制并粘贴到 Kafka 集群主服务器节点上的 SSH 终端中,然后按 <return> 创建 ~/hivetables.hql(Hive 查询语言)脚本。

    在下一步中,您将运行 ~/hivetables.hql 脚本,以在 Cloud Storage 存储桶中创建 Parquet 和 ORC Hive 表。

    cat > ~/hivetables.hql <<EOF
    drop table if exists cust_parquet;
    create external table if not exists cust_parquet
    (uuid string, custname string, age string, amount string)
    row format delimited fields terminated by ','
    stored as parquet
    location "gs://BUCKET_NAME/tables/cust_parquet";
    

    drop table if exists cust_orc; create external table if not exists cust_orc (uuid string, custname string, age string, amount string) row format delimited fields terminated by ',' stored as orc location "gs://BUCKET_NAME/tables/cust_orc"; EOF
  2. 在 Kafka 集群主节点上的 SSH 终端中,提交 ~/hivetables.hql Hive 作业,以在 Cloud Storage 存储桶中创建 cust_parquet (Parquet) 表和 cust_orc (ORC) Hive 表。

    gcloud dataproc jobs submit hive \
        --cluster=KAFKA_CLUSTER \
        --region=REGION \
        -f ~/hivetables.hql
    

    备注:

    • Hive 组件已预安装在 Dataproc Kafka 集群上。如需查看最近发布的 2.1 映像中包含的 Hive 组件版本列表,请参阅 2.1.x 发布版本
    • KAFKA_CLUSTER:Kafka 集群的名称。
    • REGION:Kafka 集群所在的区域。

将 Kafka custdata 流式传输到 Hive 表

  1. 在 Kafka 集群主节点上的 SSH 终端中运行以下命令,以安装 kafka-python 库。如需将 Kafka 主题数据流式传输到 Cloud Storage,需要使用 Kafka 客户端。
    pip install kafka-python
    
  2. 插入 BUCKET_NAME,然后将以下 PySpark 代码复制并粘贴到 Kafka 集群主服务器节点上的 SSH 终端中,然后按 <return> 创建 streamdata.py 文件。

    该脚本会订阅 Kafka custdata 主题,然后将数据流式传输到 Cloud Storage 中的 Hive 表。输出格式(可以是 Parquet 或 ORC)将作为参数传入脚本中。

    cat > streamdata.py <<EOF
    #!/bin/python
    
    import sys
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from pyspark.sql import SparkSession
    from kafka import KafkaConsumer
    
    def getNameFn (data): return data.split(",")[0]
    def getAgeFn  (data): return data.split(",")[1]
    def getAmtFn  (data): return data.split(",")[2]
    
    def main(cluster, outputfmt):
        spark = SparkSession.builder.appName("APP").getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        Logger = spark._jvm.org.apache.log4j.Logger
        logger = Logger.getLogger(__name__)
    
        rows = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", cluster+"-w-0:9092").option("subscribe", "custdata") \
        .option("startingOffsets", "earliest")\
        .load()
    
        getNameUDF = udf(getNameFn, StringType())
        getAgeUDF  = udf(getAgeFn,  StringType())
        getAmtUDF  = udf(getAmtFn,  StringType())
    
        logger.warn("Params passed in are cluster name: " + cluster + "  output format(sink): " + outputfmt)
    
        query = rows.select (col("key").cast("string").alias("uuid"),\
            getNameUDF      (col("value").cast("string")).alias("custname"),\
            getAgeUDF       (col("value").cast("string")).alias("age"),\
            getAmtUDF       (col("value").cast("string")).alias("amount"))
    
        writer = query.writeStream.format(outputfmt)\
                .option("path","gs://BUCKET_NAME/tables/cust_"+outputfmt)\
                .option("checkpointLocation", "gs://BUCKET_NAME/chkpt/"+outputfmt+"wr") \
            .outputMode("append")\
            .start()
    
        writer.awaitTermination()
    
    if __name__=="__main__":
        if len(sys.argv) < 2:
            print ("Invalid number of arguments passed ", len(sys.argv))
            print ("Usage: ", sys.argv[0], " cluster  format")
            print ("e.g.:  ", sys.argv[0], " <cluster_name>  orc")
            print ("e.g.:  ", sys.argv[0], " <cluster_name>  parquet")
        main(sys.argv[1], sys.argv[2])
    
    EOF
    
  3. 在 Kafka 集群主节点上的 SSH 终端中,运行 spark-submit 以将数据流式传输到 Cloud Storage 中的 Hive 表。

    1. 插入 KAFKA_CLUSTER 的名称和输出 FORMAT,然后复制以下代码并将其粘贴到 Kafka 集群主节点上的 SSH 终端中,然后按 <return> 运行代码并将 Parquet 格式的 Kafka custdata 数据流式传输到 Cloud Storage 中的 Hive 表。

      spark-submit --packages \
      org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3 \
          --conf spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE \
          --conf spark.driver.memory=4096m \
          --conf spark.executor.cores=2 \
          --conf spark.executor.instances=2 \
          --conf spark.executor.memory=6144m \
          streamdata.py KAFKA_CLUSTER FORMAT
          

      备注:

      • KAFKA_CLUSTER:插入 Kafka 集群的名称。
      • FORMAT:指定 parquetorc 作为输出格式。您可以连续运行该命令,以将这两种格式流式传输到 Hive 表:例如,在第一次调用时,指定 parquet 以将 Kafka custdata 主题流式传输到 Hive Parquet 表;然后在第二次调用时,指定 orc 格式,以将 custdata 流式传输到 Hive ORC 表。
  4. 在 SSH 终端中的标准输出停止(表示所有 custdata 都已进行流式传输)后,在 SSH 终端中按 <control-c> 以停止该进程。

  5. 列出 Cloud Storage 中的 Hive 表。

    gsutil ls -r gs://BUCKET_NAME/tables/*
    

    备注:

    • BUCKET_NAME:插入包含 Hive 表的 Cloud Storage 存储桶的名称(请参阅创建 Hive 表)。

查询流式数据

  1. 在 Kafka 集群主节点上的 SSH 终端中,运行以下 hive 命令,以计算 Cloud Storage 中 Hive 表中流式传输的 Kafka custdata 消息。

    hive -e "select count(1) from TABLE_NAME"
    

    备注:

    • TABLE_NAME:指定 cust_parquetcust_orc 作为 Hive 表名称。

    预期输出代码段:

...
Status: Running (Executing on YARN cluster with App id application_....)

----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED  
----------------------------------------------------------------------------------------------
Map 1 .......... container     SUCCEEDED      1          1        0        0       0       0
Reducer 2 ...... container     SUCCEEDED      1          1        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 9.89 s     
----------------------------------------------------------------------------------------------
OK
10000
Time taken: 21.394 seconds, Fetched: 1 row(s)

清理

删除项目

    删除 Google Cloud 项目:

    gcloud projects delete PROJECT_ID

删除资源

  • 删除存储分区:
    gcloud storage buckets delete BUCKET_NAME
  • 删除您的 Kafka 集群:
    gcloud dataproc clusters delete KAFKA_CLUSTER \
        --region=${REGION}