使用 Dataproc 上的 Apache Hive

Last reviewed 2022-04-06 UTC

本教程介绍如何通过将 Hive 数据存储在 Cloud Storage 中以及将 Hive Metastore 托管在 Cloud SQL 上的 MySQL 数据库中,在 Dataproc 上高效灵活地使用 Apache Hive。计算资源和存储资源之间的这种分离提供了以下优势:

  • 灵活性和敏捷性:您可以为特定 Hive 工作负载定制集群配置,以及根据需要独立扩缩每个集群。
  • 节省费用:您可以在需要运行 Hive 作业时启动临时集群,然后在作业完成时将其删除。 作业所需的资源仅在使用时才有效,因此您只需为您使用的资源付费。 您还可以使用抢占式虚拟机进行非关键数据处理,或者以较低的总费用创建大型集群。

Hive 是一种基于 Apache Hadoop 的主流开源数据仓库系统。 Hive 提供了一种类似于 SQL 的查询语言,名为 HiveQL,用于分析大型结构化数据集。Hive Metastore 保存有关 Hive 表的元数据,例如其架构和位置。在 MySQL 通常用作 Hive Metastore 的后端的情况下,Cloud SQL 可让您轻松在 Google Cloud 上设置、维护、代管和管理关系型数据库。

目标

  • 在 Cloud SQL 上为 Hive Metastore 创建一个 MySQL 实例。
  • 在 Dataproc 上部署 Hive 服务器。
  • 在 Dataproc 集群实例上安装 Cloud SQL 代理
  • 将 Hive 数据上传到 Cloud Storage。
  • 在多个 Dataproc 集群上运行 Hive 查询。

费用

本教程使用 Google Cloud 的以下收费组件:

  • Dataproc
  • Cloud Storage
  • Cloud SQL

您可使用价格计算器根据您的预计用量来估算费用。

Google Cloud 新用户可能有资格申请免费试用

准备工作

创建新项目

  1. 在 Google Cloud Console 中,转到项目选择器页面。

    转到“项目选择器”

  2. 选择或创建 Google Cloud 项目。

启用结算功能

初始化环境

  1. 启动 Cloud Shell 实例:

    转到 Cloud Shell

  2. 在 Cloud Shell 中,将默认 Compute Engine 区域设置为要创建 Dataproc 集群的区域。

    export PROJECT=$(gcloud info --format='value(config.project)')
    export REGION=REGION
    export ZONE=ZONE
    gcloud config set compute/zone ${ZONE}

    请替换以下内容:

    • REGION:要在其中创建集群的地区,例如 us-central1
    • ZONE:要在其中创建集群的区域,例如 us-central1-a
  3. 通过在 Cloud Shell 中运行以下命令来启用 Dataproc 和 Cloud SQL Admin API:

    gcloud services enable dataproc.googleapis.com sqladmin.googleapis.com

参考架构

为简单起见,在本教程中,您将在同一 Google Cloud 区域中部署所有计算和存储服务,以最大限度地减少网络延迟时间和网络传输费用。图 1 展示了本教程的架构。

单区域架构的示意图。
图 1:单区域 Hive 架构的示例

使用此架构,Hive 查询的生命周期遵循以下步骤:

  1. Hive 客户端将查询提交到在临时 Dataproc 集群中运行的 Hive 服务器。
  2. 服务器处理查询并从 Metastore 服务请求元数据。
  3. Metastore 服务通过 Cloud SQL 代理从 Cloud SQL 获取 Hive 元数据。
  4. 服务器从 Cloud Storage 的区域存储桶中的 Hive 仓库加载数据。
  5. 服务器将结果返回给客户端。

多区域架构的注意事项

本教程重点介绍单区域架构。但是,如果需要在不同地理区域中运行 Hive 服务器,则可以考虑多区域架构。在这种情况下,您应创建专用于托管 Metastore 服务并且与 Cloud SQL 实例位于同一区域的单独 Dataproc 集群。Metastore 服务有时会向 MySQL 数据库发送大量请求,因此请务必在地理上靠近 MySQL 数据库的位置保存 Metastore 服务,以便最大限度地降低对性能的影响。 相比之下,Hive 服务器向 Metastore 服务发送的请求通常要少得多。 因此,虽然延迟时间有所增加,但 Hive 服务器和 Metastore 服务位于不同区域可能更易于接受。

Metastore 服务只能在 Dataproc 主节点上运行,不能在工作器节点上运行。Dataproc 会在标准集群和高可用性集群中强制执行至少 2 个工作器节点。为避免在未使用的工作器节点上浪费资源,您可以为 Metastore 服务创建单节点集群。要实现高可用性,您可以创建多个单节点集群。

由于只有 Metastore 服务集群才需要直接连接到 Cloud SQL 实例,因此只需在 Metastore 服务集群上安装 Cloud SQL 代理。然后,通过将 hive.metastore.uris 属性设置为 URI 的英文逗号分隔列表,Hive 服务器会指向 Metastore 服务集群。例如:

thrift://metastore1:9083,thrift://metastore2:9083

如果需要从位于多个位置的 Hive 服务器访问 Hive 数据,您还可以考虑使用双区域或多区域存储桶。如何在各种存储桶位置类型之间进行选择取决于您的具体用例。您必须在延迟时间、可用性和费用之间找到平衡。

图 2 展示了一个多区域架构的示例。

多区域 Hive 架构的示意图。
图 2:多区域 Hive 架构的示例

您可以看到,多区域场景稍微复杂一些。为保持简洁,本教程使用单区域架构。

(可选)创建仓库存储桶

如果您没有 Cloud Storage 存储桶来存储 Hive 数据,请创建一个仓库存储桶(可在 Cloud Shell 中运行以下命令),并将 BUCKET_NAME 替换为唯一存储桶名称:

export WAREHOUSE_BUCKET=BUCKET_NAME
gsutil mb -l ${REGION} gs://${WAREHOUSE_BUCKET}

创建 Cloud SQL 实例

在本部分,您将创建一个新的 Cloud SQL 实例,该实例稍后将用于托管 Hive Metastore。

在 Cloud Shell 中,创建一个新的 Cloud SQL 实例:

gcloud sql instances create hive-metastore \
    --database-version="MYSQL_5_7" \
    --activation-policy=ALWAYS \
    --zone ${ZONE}

此命令可能需要几分钟时间才能完成。

创建 Dataproc 集群

创建第一个 Dataproc 集群,将 CLUSTER_NAME 替换为 hive-cluster 等名称:

gcloud dataproc clusters create CLUSTER_NAME \
    --scopes sql-admin \
    --region ${REGION} \
    --initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/cloud-sql-proxy/cloud-sql-proxy.sh \
    --properties "hive:hive.metastore.warehouse.dir=gs://${WAREHOUSE_BUCKET}/datasets" \
    --metadata "hive-metastore-instance=${PROJECT}:${REGION}:hive-metastore" \
    --metadata "enable-cloud-sql-proxy-on-workers=false"

注意

  • 请提供 sql-admin 访问权限范围,以允许集群实例访问 Cloud SQL Admin API。
  • 请将初始化操作放入存储在 Cloud Storage 存储桶中的脚本,并使用 --initialization-actions 标志引用该存储桶。如需了解详情,请参阅初始化操作 - 重要注意事项和指南
  • 请在 hive:hive.metastore.warehouse.dir 属性中提供 Hive 仓库存储桶的 URI。此操作会将 Hive 服务器配置为在正确位置读取和写入。 此属性必须至少包含一个目录(例如 gs://my-bucket/my-directory);如果将此属性设置为没有目录的存储桶名称(例如 gs://my-bucket),则 Hive 将无法正常运行。
  • 您可以指定 enable-cloud-sql-proxy-on-workers=false 以确保 Cloud SQL 代理仅在主节点上运行,这足以让 Hive Metastore 服务正常运行,并避免在 Cloud SQL 上产生不必要的负载。
  • 请提供 Dataproc 在所有集群实例上自动运行的 Cloud SQL 代理初始化操作。该操作执行以下任务:

    • 安装 Cloud SQL 代理。
    • 建立与 hive-metastore-instance 元数据参数中指定的 Cloud SQL 实例的安全连接。
    • 创建 hive 用户和 Hive Metastore 数据库。

    您可以在 GitHub 上看到 Cloud SQL 代理初始化操作的完整代码

  • 为简单起见,本教程仅使用一个主实例。要提高生产工作负载的弹性,应考虑使用 Dataproc 的高可用性模式创建包含三个主实例的集群。

  • 本教程使用具有公共 IP 地址的 Cloud SQL 实例。如果您使用仅具有专用 IP 地址的实例,则可以通过传递 --metadata "use-cloud-sql-private-ip=true" 参数强制代理使用专用 IP 地址。

创建 Hive 表

在本部分,您将向仓库存储桶上传样本数据集、创建新的 Hive 表,并对该数据集运行一些 HiveQL 查询。

  1. 将样本数据集复制到仓库存储桶:

    gsutil cp gs://hive-solution/part-00000.parquet \
    gs://${WAREHOUSE_BUCKET}/datasets/transactions/part-00000.parquet

    样本数据集以 Parquet 格式压缩,包含数千条虚拟银行交易记录,这些记录分为三列:日期、金额和交易类型。

  2. 为数据集创建外部 Hive 表:

    gcloud dataproc jobs submit hive \
        --cluster CLUSTER_NAME \
        --region ${REGION} \
        --execute "
          CREATE EXTERNAL TABLE transactions
          (SubmissionDate DATE, TransactionAmount DOUBLE, TransactionType STRING)
          STORED AS PARQUET
          LOCATION 'gs://${WAREHOUSE_BUCKET}/datasets/transactions';"

运行 Hive 查询

您可以使用 Dataproc 中的不同工具来运行 Hive 查询。在本部分,您将学习如何使用以下工具执行查询:

在每个部分中,您将运行一个示例查询。

使用 Dataproc Jobs API 查询 Hive

运行以下简单 HiveQL 查询,以验证 parquet 文件是否已正确链接到 Hive 表:

gcloud dataproc jobs submit hive \
    --cluster CLUSTER_NAME \
    --region ${REGION} \
    --execute "
      SELECT *
      FROM transactions
      LIMIT 10;"

输出包括以下内容:

+-----------------+--------------------+------------------+
| submissiondate  | transactionamount  | transactiontype  |
+-----------------+--------------------+------------------+
| 2017-12-03      | 1167.39            | debit            |
| 2017-09-23      | 2567.87            | debit            |
| 2017-12-22      | 1074.73            | credit           |
| 2018-01-21      | 5718.58            | debit            |
| 2017-10-21      | 333.26             | debit            |
| 2017-09-12      | 2439.62            | debit            |
| 2017-08-06      | 5885.08            | debit            |
| 2017-12-05      | 7353.92            | authorization    |
| 2017-09-12      | 4710.29            | authorization    |
| 2018-01-05      | 9115.27            | debit            |
+-----------------+--------------------+------------------+

使用 Beeline 查询 Hive

  1. 使用 Dataproc 的主实例打开 SSH 会话(CLUSTER_NAME-m):

    gcloud compute ssh CLUSTER_NAME-m
  2. 在主实例的命令提示符中,打开 Beeline 会话:

    beeline -u "jdbc:hive2://localhost:10000"

    注意:

    • 您还可以将主实例的名称引用为主机而不是 localhost

      beeline -u "jdbc:hive2://CLUSTER_NAME-m:10000"
    • 如果您使用具有 3 个主实例的高可用性模式,则必须改用以下命令:

      beeline -u "jdbc:hive2://CLUSTER_NAME-m-0:2181,CLUSTER_NAME-m-1:2181,CLUSTER_NAME-m-2:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2"
  3. 出现 Beeline 提示时,运行以下 HiveQL 查询:

    SELECT TransactionType, AVG(TransactionAmount) AS AverageAmount
    FROM transactions
    WHERE SubmissionDate = '2017-12-22'
    GROUP BY TransactionType;

    输出包括以下内容:

    +------------------+--------------------+
    | transactiontype  |   averageamount    |
    +------------------+--------------------+
    | authorization    | 4890.092525252529  |
    | credit           | 4863.769269565219  |
    | debit            | 4982.781458176331  |
    +------------------+--------------------+
  4. 关闭 Beeline 会话:

    !quit
  5. 关闭 SSH 连接:

    exit

使用 SparkSQL 查询 Hive

  1. 使用 Dataproc 的主实例打开 SSH 会话:

    gcloud compute ssh CLUSTER_NAME-m
  2. 在主实例的命令提示符中,打开一个新的 PySpark shell 会话:

    pyspark
  3. 出现 PySpark shell 提示时,键入以下 Python 代码:

    from pyspark.sql import HiveContext
    hc = HiveContext(sc)
    hc.sql("""
    SELECT SubmissionDate, AVG(TransactionAmount) as AvgDebit
    FROM transactions
    WHERE TransactionType = 'debit'
    GROUP BY SubmissionDate
    HAVING SubmissionDate >= '2017-10-01' AND SubmissionDate < '2017-10-06'
    ORDER BY SubmissionDate
    """).show()

    输出包括以下内容:

    +-----------------+--------------------+
    | submissiondate  |      avgdebit      |
    +-----------------+--------------------+
    | 2017-10-01      | 4963.114920399849  |
    | 2017-10-02      | 5021.493300510582  |
    | 2017-10-03      | 4982.382279569891  |
    | 2017-10-04      | 4873.302702503676  |
    | 2017-10-05      | 4967.696333583777  |
    +-----------------+--------------------+
  4. 关闭 PySpark 会话:

    exit()
  5. 关闭 SSH 连接:

    exit

检查 Hive Metastore

现在,您可以验证 Cloud SQL 中的 Hive Metastore 是否包含有关 transactions 表的信息。

  1. 在 Cloud Shell 中,在 Cloud SQL 实例上启动新的 MySQL 会话:

    gcloud sql connect hive-metastore --user=root

    当系统提示您输入 root 用户密码时,请勿键入任何内容,只需按 RETURN 键即可。为简化本教程,您没有为 root 用户设置任何密码。如需了解如何设置密码以进一步保护 Metastore 数据库,请参阅 Cloud SQL 文档。Cloud SQL 代理初始化操作还提供了一种通过加密保护密码的机制 - 如需了解详情,请参阅操作的代码库

  2. 在 MySQL 命令提示符中,将 hive_metastore 设置为会话其余部分的默认数据库:

    USE hive_metastore;
  3. 验证仓库存储桶的位置是否已记录在 Metastore 中:

    SELECT DB_LOCATION_URI FROM DBS;

    输出如下所示:

    +-------------------------------------+
    | DB_LOCATION_URI                     |
    +-------------------------------------+
    | gs://[WAREHOUSE_BUCKET]/datasets   |
    +-------------------------------------+
  4. 验证在 Metastore 中是否正确引用了表:

    SELECT TBL_NAME, TBL_TYPE FROM TBLS;

    输出如下所示:

    +--------------+----------------+
    | TBL_NAME     | TBL_TYPE       |
    +--------------+----------------+
    | transactions | EXTERNAL_TABLE |
    +--------------+----------------+
  5. 验证是否还正确引用了表的列:

    SELECT COLUMN_NAME, TYPE_NAME
    FROM COLUMNS_V2 c, TBLS t
    WHERE c.CD_ID = t.SD_ID AND t.TBL_NAME = 'transactions';

    输出如下所示:

    +-------------------+-----------+
    | COLUMN_NAME       | TYPE_NAME |
    +-------------------+-----------+
    | submissiondate    | date      |
    | transactionamount | double    |
    | transactiontype   | string    |
    +-------------------+-----------+
  6. 验证是否还正确引用了输入格式和位置:

    SELECT INPUT_FORMAT, LOCATION
    FROM SDS s, TBLS t
    WHERE s.SD_ID = t.SD_ID AND t.TBL_NAME = 'transactions';

    输出如下所示:

    +---------------------------------------------------------------+------------------------------------------------+
    | INPUT_FORMAT                                                  | LOCATION                                       |
    +---------------------------------------------------------------+------------------------------------------------+
    | org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat | gs://[WAREHOUSE_BUCKET]/datasets/transactions |
    +---------------------------------------------------------------+------------------------------------------------+
    
  7. 关闭 MySQL 会话:

    exit

创建另一个 Dataproc 集群

在本部分,您将创建另一个 Dataproc 集群,以验证是否可以跨多个集群共享 Hive 数据和 Hive Metastore。

  1. 创建新的 Dataproc 集群:

    gcloud dataproc clusters create other-CLUSTER_NAME \
        --scopes cloud-platform \
        --image-version 2.0 \
        --region ${REGION} \
        --initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/cloud-sql-proxy/cloud-sql-proxy.sh \
        --properties "hive:hive.metastore.warehouse.dir=gs://${WAREHOUSE_BUCKET}/datasets" \
        --metadata "hive-metastore-instance=${PROJECT}:${REGION}:hive-metastore"\
        --metadata "enable-cloud-sql-proxy-on-workers=false"
  2. 验证新集群是否可以访问数据:

    gcloud dataproc jobs submit hive \
        --cluster other-CLUSTER_NAME \
        --region ${REGION} \
        --execute "
          SELECT TransactionType, COUNT(TransactionType) as Count
          FROM transactions
          WHERE SubmissionDate = '2017-08-22'
          GROUP BY TransactionType;"

    输出包括以下内容:

    +------------------+--------+
    | transactiontype  | count  |
    +------------------+--------+
    | authorization    | 696    |
    | credit           | 1722   |
    | debit            | 2599   |
    +------------------+--------+

恭喜,您已学完教程!

清除数据

为避免因本教程中使用的资源导致您的 Google Cloud 帐号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

为避免因本教程中使用的资源导致您的 Google Cloud 帐号产生费用,请执行以下操作:

  • 清理您创建的所有资源,避免日后再为这些资源支付费用。要避免产生费用,最简单的方法是删除您为本教程创建的项目。
  • 或者,您可以删除单个资源。

删除项目

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

逐个删除资源

在 Cloud Shell 中运行以下命令以删除单个资源而不是删除整个项目:

gcloud dataproc clusters delete CLUSTER_NAME --region ${REGION} --quiet
gcloud dataproc clusters delete other-CLUSTER_NAME --region ${REGION} --quiet
gcloud sql instances delete hive-metastore --quiet
gsutil rm -r gs://${WAREHOUSE_BUCKET}/datasets

后续步骤

  • 试用 BigQuery,它是 Google 推出的可扩缩性强、费用低廉的无服务器企业数据仓库。
  • 查看有关如何将 Hadoop 工作负载迁移到 Google Cloud 的指南
  • 查看此初始化操作,详细了解如何在 Dataproc 上使用 Hive HCatalog
  • 了解如何配置 Cloud SQL 以实现高可用性并提高服务可靠性。
  • 探索有关 Google Cloud 的参考架构、图表和最佳实践。查看我们的 Cloud Architecture Center