部署 Apache Hive on Dataproc

Last reviewed 2023-05-08 UTC

本文档介绍了如何在使用 Apache Hive on Dataproc 中部署架构。

本文档适用于希望部署 Apache Hive on Dataproc 和 Cloud SQL 中的 Hive Metastore 的云架构师和数据工程师。

架构

在本部署指南中,您将在同一 Google Cloud 区域中部署所有计算和存储服务,以最大限度地减少网络延迟时间和网络传输费用。

下图展示了 Hive 查询的生命周期。

单区域架构的示意图。

在该图中,Hive 客户端提交查询,系统会处理、提取并返回该查询。处理操作在 Hive 服务器中进行。系统会从存储在 Cloud Storage 的区域级存储桶中的 Hive 仓库请求和返回数据。

目标

  • 在 Cloud SQL 上为 Hive Metastore 创建一个 MySQL 实例。
  • 在 Dataproc 上部署 Hive 服务器。
  • 在 Dataproc 集群实例上安装 Cloud SQL 代理
  • 将 Hive 数据上传到 Cloud Storage。
  • 在多个 Dataproc 集群上运行 Hive 查询。

费用

本部署使用 Google Cloud 的以下收费组件:

  • Dataproc
  • Cloud Storage
  • Cloud SQL

您可使用价格计算器根据您的预计用量来估算费用。

Google Cloud 新用户可能有资格申请免费试用

完成本部署后,您可以通过删除您创建的资源来避免继续计费。如需了解详情,请参阅清理

准备工作

  1. 在 Google Cloud 控制台的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  2. 确保您的 Google Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能

初始化环境

  1. 启动 Cloud Shell 实例:

    转到 Cloud Shell

  2. 在 Cloud Shell 中,将默认 Compute Engine 区域设置为要创建 Dataproc 集群的区域。

    export PROJECT=$(gcloud info --format='value(config.project)')
    export REGION=REGION
    export ZONE=ZONE
    gcloud config set compute/zone ${ZONE}

    请替换以下内容:

    • REGION:要在其中创建集群的地区,例如 us-central1
    • ZONE:要在其中创建集群的区域,例如 us-central1-a
  3. 通过在 Cloud Shell 中运行以下命令来启用 Dataproc 和 Cloud SQL Admin API:

    gcloud services enable dataproc.googleapis.com sqladmin.googleapis.com

(可选)创建仓库存储桶

如果您没有 Cloud Storage 存储桶来存储 Hive 数据,请创建一个仓库存储桶(可在 Cloud Shell 中运行以下命令),并将 BUCKET_NAME 替换为唯一存储桶名称:

export WAREHOUSE_BUCKET=BUCKET_NAME
gsutil mb -l ${REGION} gs://${WAREHOUSE_BUCKET}

创建 Cloud SQL 实例

在本部分,您将创建一个新的 Cloud SQL 实例,该实例稍后将用于托管 Hive Metastore。

在 Cloud Shell 中,创建一个新的 Cloud SQL 实例:

gcloud sql instances create hive-metastore \
    --database-version="MYSQL_5_7" \
    --activation-policy=ALWAYS \
    --zone ${ZONE}

此命令可能需要几分钟时间才能完成。

创建 Dataproc 集群

创建第一个 Dataproc 集群,将 CLUSTER_NAME 替换为 hive-cluster 等名称:

gcloud dataproc clusters create CLUSTER_NAME \
    --scopes sql-admin \
    --region ${REGION} \
    --initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/cloud-sql-proxy/cloud-sql-proxy.sh \
    --properties "hive:hive.metastore.warehouse.dir=gs://${WAREHOUSE_BUCKET}/datasets" \
    --metadata "hive-metastore-instance=${PROJECT}:${REGION}:hive-metastore" \
    --metadata "enable-cloud-sql-proxy-on-workers=false"

注意

  • 请提供 sql-admin 访问权限范围,以允许集群实例访问 Cloud SQL Admin API。
  • 请将初始化操作放入存储在 Cloud Storage 存储桶中的脚本,并使用 --initialization-actions 标志引用该存储桶。如需了解详情,请参阅初始化操作 - 重要注意事项和指南
  • 请在 hive:hive.metastore.warehouse.dir 属性中提供 Hive 仓库存储桶的 URI。此操作会将 Hive 服务器配置为在正确位置读取和写入。 此属性必须至少包含一个目录(例如 gs://my-bucket/my-directory);如果将此属性设置为没有目录的存储桶名称(例如 gs://my-bucket),则 Hive 将无法正常运行。
  • 您可以指定 enable-cloud-sql-proxy-on-workers=false 以确保 Cloud SQL 代理仅在主节点上运行,这足以让 Hive Metastore 服务正常运行,并避免在 Cloud SQL 上产生不必要的负载。
  • 请提供 Dataproc 在所有集群实例上自动运行的 Cloud SQL 代理初始化操作。该操作执行以下任务:

    • 安装 Cloud SQL 代理。
    • 建立与 hive-metastore-instance 元数据参数中指定的 Cloud SQL 实例的安全连接。
    • 创建 hive 用户和 Hive Metastore 数据库。

    您可以在 GitHub 上看到 Cloud SQL 代理初始化操作的完整代码

  • 本部署使用具有公共 IP 地址的 Cloud SQL 实例。如果您使用仅具有专用 IP 地址的实例,则可以通过传递 --metadata "use-cloud-sql-private-ip=true" 参数强制代理使用专用 IP 地址。

创建 Hive 表

在本部分,您将向仓库存储桶上传样本数据集、创建新的 Hive 表,并对该数据集运行一些 HiveQL 查询。

  1. 将样本数据集复制到仓库存储桶:

    gsutil cp gs://hive-solution/part-00000.parquet \
    gs://${WAREHOUSE_BUCKET}/datasets/transactions/part-00000.parquet

    样本数据集以 Parquet 格式压缩,包含数千条虚拟银行交易记录,这些记录分为三列:日期、金额和交易类型。

  2. 为数据集创建外部 Hive 表:

    gcloud dataproc jobs submit hive \
        --cluster CLUSTER_NAME \
        --region ${REGION} \
        --execute "
          CREATE EXTERNAL TABLE transactions
          (SubmissionDate DATE, TransactionAmount DOUBLE, TransactionType STRING)
          STORED AS PARQUET
          LOCATION 'gs://${WAREHOUSE_BUCKET}/datasets/transactions';"

运行 Hive 查询

您可以使用 Dataproc 中的不同工具来运行 Hive 查询。在本部分,您将学习如何使用以下工具执行查询:

在每个部分中,您将运行一个示例查询。

使用 Dataproc Jobs API 查询 Hive

运行以下简单 HiveQL 查询,以验证 parquet 文件是否已正确链接到 Hive 表:

gcloud dataproc jobs submit hive \
    --cluster CLUSTER_NAME \
    --region ${REGION} \
    --execute "
      SELECT *
      FROM transactions
      LIMIT 10;"

输出包括以下内容:

+-----------------+--------------------+------------------+
| submissiondate  | transactionamount  | transactiontype  |
+-----------------+--------------------+------------------+
| 2017-12-03      | 1167.39            | debit            |
| 2017-09-23      | 2567.87            | debit            |
| 2017-12-22      | 1074.73            | credit           |
| 2018-01-21      | 5718.58            | debit            |
| 2017-10-21      | 333.26             | debit            |
| 2017-09-12      | 2439.62            | debit            |
| 2017-08-06      | 5885.08            | debit            |
| 2017-12-05      | 7353.92            | authorization    |
| 2017-09-12      | 4710.29            | authorization    |
| 2018-01-05      | 9115.27            | debit            |
+-----------------+--------------------+------------------+

使用 Beeline 查询 Hive

  1. 使用 Dataproc 的主实例打开 SSH 会话(CLUSTER_NAME-m):

    gcloud compute ssh CLUSTER_NAME-m
  2. 在主实例的命令提示符中,打开 Beeline 会话:

    beeline -u "jdbc:hive2://localhost:10000"

    注意:

    • 您还可以将主实例的名称引用为主机而不是 localhost

      beeline -u "jdbc:hive2://CLUSTER_NAME-m:10000"
    • 如果您使用具有 3 个主实例的高可用性模式,则必须改用以下命令:

      beeline -u "jdbc:hive2://CLUSTER_NAME-m-0:2181,CLUSTER_NAME-m-1:2181,CLUSTER_NAME-m-2:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2"
  3. 出现 Beeline 提示时,运行以下 HiveQL 查询:

    SELECT TransactionType, AVG(TransactionAmount) AS AverageAmount
    FROM transactions
    WHERE SubmissionDate = '2017-12-22'
    GROUP BY TransactionType;

    输出包括以下内容:

    +------------------+--------------------+
    | transactiontype  |   averageamount    |
    +------------------+--------------------+
    | authorization    | 4890.092525252529  |
    | credit           | 4863.769269565219  |
    | debit            | 4982.781458176331  |
    +------------------+--------------------+
  4. 关闭 Beeline 会话:

    !quit
  5. 关闭 SSH 连接:

    exit

使用 SparkSQL 查询 Hive

  1. 使用 Dataproc 的主实例打开 SSH 会话:

    gcloud compute ssh CLUSTER_NAME-m
  2. 在主实例的命令提示符中,打开一个新的 PySpark shell 会话:

    pyspark
  3. 出现 PySpark shell 提示时,键入以下 Python 代码:

    from pyspark.sql import HiveContext
    hc = HiveContext(sc)
    hc.sql("""
    SELECT SubmissionDate, AVG(TransactionAmount) as AvgDebit
    FROM transactions
    WHERE TransactionType = 'debit'
    GROUP BY SubmissionDate
    HAVING SubmissionDate >= '2017-10-01' AND SubmissionDate < '2017-10-06'
    ORDER BY SubmissionDate
    """).show()

    输出包括以下内容:

    +-----------------+--------------------+
    | submissiondate  |      avgdebit      |
    +-----------------+--------------------+
    | 2017-10-01      | 4963.114920399849  |
    | 2017-10-02      | 5021.493300510582  |
    | 2017-10-03      | 4982.382279569891  |
    | 2017-10-04      | 4873.302702503676  |
    | 2017-10-05      | 4967.696333583777  |
    +-----------------+--------------------+
  4. 关闭 PySpark 会话:

    exit()
  5. 关闭 SSH 连接:

    exit

检查 Hive Metastore

现在,您可以验证 Cloud SQL 中的 Hive Metastore 是否包含有关 transactions 表的信息。

  1. 在 Cloud Shell 中,在 Cloud SQL 实例上启动新的 MySQL 会话:

    gcloud sql connect hive-metastore --user=root

    当系统提示您输入 root 用户密码时,请勿键入任何内容,只需按 RETURN 键即可。为简化本部署,您没有为 root 用户设置任何密码。如需了解如何设置密码以进一步保护 Metastore 数据库,请参阅 Cloud SQL 文档。Cloud SQL 代理初始化操作还提供了一种通过加密保护密码的机制 - 如需了解详情,请参阅操作的代码库

  2. 在 MySQL 命令提示符中,将 hive_metastore 设置为会话其余部分的默认数据库:

    USE hive_metastore;
  3. 验证仓库存储桶的位置是否已记录在 Metastore 中:

    SELECT DB_LOCATION_URI FROM DBS;

    输出如下所示:

    +-------------------------------------+
    | DB_LOCATION_URI                     |
    +-------------------------------------+
    | gs://[WAREHOUSE_BUCKET]/datasets   |
    +-------------------------------------+
  4. 验证在 Metastore 中是否正确引用了表:

    SELECT TBL_NAME, TBL_TYPE FROM TBLS;

    输出如下所示:

    +--------------+----------------+
    | TBL_NAME     | TBL_TYPE       |
    +--------------+----------------+
    | transactions | EXTERNAL_TABLE |
    +--------------+----------------+
  5. 验证是否还正确引用了表的列:

    SELECT COLUMN_NAME, TYPE_NAME
    FROM COLUMNS_V2 c, TBLS t
    WHERE c.CD_ID = t.SD_ID AND t.TBL_NAME = 'transactions';

    输出如下所示:

    +-------------------+-----------+
    | COLUMN_NAME       | TYPE_NAME |
    +-------------------+-----------+
    | submissiondate    | date      |
    | transactionamount | double    |
    | transactiontype   | string    |
    +-------------------+-----------+
  6. 验证是否还正确引用了输入格式和位置:

    SELECT INPUT_FORMAT, LOCATION
    FROM SDS s, TBLS t
    WHERE s.SD_ID = t.SD_ID AND t.TBL_NAME = 'transactions';

    输出如下所示:

    +---------------------------------------------------------------+------------------------------------------------+
    | INPUT_FORMAT                                                  | LOCATION                                       |
    +---------------------------------------------------------------+------------------------------------------------+
    | org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat | gs://[WAREHOUSE_BUCKET]/datasets/transactions |
    +---------------------------------------------------------------+------------------------------------------------+
    
  7. 关闭 MySQL 会话:

    exit

创建另一个 Dataproc 集群

在本部分,您将创建另一个 Dataproc 集群,以验证是否可以跨多个集群共享 Hive 数据和 Hive Metastore。

  1. 创建新的 Dataproc 集群:

    gcloud dataproc clusters create other-CLUSTER_NAME \
        --scopes cloud-platform \
        --image-version 2.0 \
        --region ${REGION} \
        --initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/cloud-sql-proxy/cloud-sql-proxy.sh \
        --properties "hive:hive.metastore.warehouse.dir=gs://${WAREHOUSE_BUCKET}/datasets" \
        --metadata "hive-metastore-instance=${PROJECT}:${REGION}:hive-metastore"\
        --metadata "enable-cloud-sql-proxy-on-workers=false"
  2. 验证新集群是否可以访问数据:

    gcloud dataproc jobs submit hive \
        --cluster other-CLUSTER_NAME \
        --region ${REGION} \
        --execute "
          SELECT TransactionType, COUNT(TransactionType) as Count
          FROM transactions
          WHERE SubmissionDate = '2017-08-22'
          GROUP BY TransactionType;"

    输出包括以下内容:

    +------------------+--------+
    | transactiontype  | count  |
    +------------------+--------+
    | authorization    | 696    |
    | credit           | 1722   |
    | debit            | 2599   |
    +------------------+--------+

恭喜,您已完成本部署中的步骤。

清理

以下部分介绍了如何避免 Google Cloud 项目以及您在本部署中使用的 Apache Hive 和 Dataproc 资源日后产生费用。

删除 Google Cloud 项目

为避免系统因此部署中使用的资源向您的 Google Cloud 账号收取费用,您可以删除 Google Cloud 项目。

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

逐个删除资源

在 Cloud Shell 中运行以下命令以删除单个资源而不是删除整个项目:

gcloud dataproc clusters delete CLUSTER_NAME --region ${REGION} --quiet
gcloud dataproc clusters delete other-CLUSTER_NAME --region ${REGION} --quiet
gcloud sql instances delete hive-metastore --quiet
gsutil rm -r gs://${WAREHOUSE_BUCKET}/datasets

后续步骤

  • 试用 BigQuery,它是 Google 推出的可扩缩性强、费用低廉的无服务器企业数据仓库。
  • 查看有关如何将 Hadoop 工作负载迁移到 Google Cloud 的指南
  • 查看此初始化操作,详细了解如何在 Dataproc 上使用 Hive HCatalog
  • 了解如何配置 Cloud SQL 以实现高可用性并提高服务可靠性。
  • 如需查看更多参考架构、图表和最佳实践,请浏览云架构中心