将 Presto 与 Dataproc 搭配使用

Presto 是一个分布式 SQL 查询引擎,可用于查询分布在一个或多个异构数据源上的大型数据集。Presto 可以通过连接器查询 Hive、MySQL、Kafka 和其他数据源。本教程将介绍如何执行以下操作:

  • 在 Dataproc 集群上安装 Presto 服务
  • 从安装在与集群上的 Presto 服务通信的本地机器上的 Presto 客户端查询公共数据
  • 从通过 Presto Java JDBC 驱动程序与集群上的 Presto 服务进行通信的 Java 应用运行查询。

目标

  1. 创建 Dataproc 集群并安装 Presto

  2. 准备数据。本教程使用 BigQuery 中提供的 Chicago Taxi Trips(芝加哥出租车行程)公共数据集。

    1. 从 BigQuery 中提取数据
    2. 以 CSV 文件的形式将数据加载到 Cloud Storage
    3. 转换数据:
      1. 将数据公开为 Hive 外部表,让 Presto 可以查询这些数据
      2. 将 CSV 格式的数据转换为 Parquet 格式,以加快查询速度
  3. 向集群上运行的 Presto 协调器发送 Presto CLI 查询(通过 SSH 隧道发送)或应用代码查询(通过 Presto JDBC 驱动程序发送)

  4. 通过 Presto 网页界面检查日志并监控 Presto 服务

费用

本教程使用 Google Cloud 的如下计费组件:

请使用价格计算器根据您的预计使用情况来估算费用。Google Cloud 新用户可能有资格申请免费试用

准备工作

如果您尚未执行此操作,请创建 Google Cloud 项目和 Cloud Storage 存储分区以保留此教程中使用的数据。

  1. 设置项目

    1. 登录您的 Google 帐号。

      如果您还没有 Google 帐号,请注册一个新帐号

    2. 在 Cloud Console 的项目选择器页面上,选择或创建 Cloud 项目。

      转到项目选择器页面

    3. 确保您的 Google Cloud 项目已启用结算功能。 了解如何确认您的项目已启用结算功能

    4. 启用 Dataproc, Compute Engine, Cloud Storage, and BigQuery API。

      启用 API

    5. 安装并初始化 Cloud SDK

  2. 在项目中创建 Cloud Storage 存储分区以保留此教程中使用的数据。

    1. 在 Cloud Console 中,转到 Cloud Storage 浏览器页面。

      转到“Cloud Storage 浏览器”页面

    2. 点击创建存储分区
    3. 创建存储分区对话框中,指定以下属性:
    4. 点击创建

创建 Dataproc 集群

使用 optional-components 标志(在映像版本 1.3 及更高版本上提供)创建 Dataproc 集群以在该集群上安装 Presto 可选组件,并使用 enable-component-gateway 标志来启用组件网关,让您能够从 Cloud Console 访问 Presto 网页界面。

  1. 设置环境变量:
    • PROJECT:您的项目 ID
    • BUCKET_NAME:您在准备工作中创建的 Cloud Storage 存储分区的名称
    • REGION:将在其中创建此教程所使用集群的区域,例如“us-west1”
    • WORKERS:此教程推荐配备 3 到 5 个工作器
    export PROJECT=project-id
    export WORKERS=number
    export REGION=region
    export BUCKET_NAME=bucket-name
    
  2. 在本地机器上运行 gcloud 命令行工具以创建集群。
    gcloud beta dataproc clusters create presto-cluster \
        --project=${PROJECT} \
        --region=${REGION} \
        --num-workers=${WORKERS} \
        --scopes=cloud-platform \
        --optional-components=PRESTO \
        --image-version=1.3  \
        --enable-component-gateway
    

准备数据

bigquery-public-data chicago_taxi_trips 数据集作为 CSV 文件导出到 Cloud Storage,然后创建 Hive 外部表以引用数据。

  1. 在本地机器上,运行以下命令,以 CSV 文件(不含标题)形式将出租车数据从 BigQuery 导入您在准备工作中创建的 Cloud Storage 存储分区。
    bq --location=us extract --destination_format=CSV \
         --field_delimiter=',' --print_header=false \
           "bigquery-public-data:chicago_taxi_trips.taxi_trips" \
           gs://${BUCKET_NAME}/chicago_taxi_trips/csv/shard-*.csv
    
  2. 创建由您的 Cloud Storage 存储分区中的 CSV 和 Parquet 文件支持的 Hive 外部表。
    1. 创建 Hive 外部表 chicago_taxi_trips_csv
      gcloud dataproc jobs submit hive \
          --cluster presto-cluster \
          --region=${REGION} \
          --execute "
              CREATE EXTERNAL TABLE chicago_taxi_trips_csv(
                unique_key   STRING,
                taxi_id  STRING,
                trip_start_timestamp  TIMESTAMP,
                trip_end_timestamp  TIMESTAMP,
                trip_seconds  INT,
                trip_miles   FLOAT,
                pickup_census_tract  INT,
                dropoff_census_tract  INT,
                pickup_community_area  INT,
                dropoff_community_area  INT,
                fare  FLOAT,
                tips  FLOAT,
                tolls  FLOAT,
                extras  FLOAT,
                trip_total  FLOAT,
                payment_type  STRING,
                company  STRING,
                pickup_latitude  FLOAT,
                pickup_longitude  FLOAT,
                pickup_location  STRING,
                dropoff_latitude  FLOAT,
                dropoff_longitude  FLOAT,
                dropoff_location  STRING)
              ROW FORMAT DELIMITED
              FIELDS TERMINATED BY ','
              STORED AS TEXTFILE
              location 'gs://${BUCKET_NAME}/chicago_taxi_trips/csv/';"
      
    2. 验证 Hive 外部表的创建过程。
      gcloud dataproc jobs submit hive \
          --cluster presto-cluster \
          --region=${REGION} \
          --execute "SELECT COUNT(*) FROM chicago_taxi_trips_csv;"
      
    3. 创建另一个具有相同列的 Hive 外部表 chicago_taxi_trips_parquet,但以 Parquet 格式存储数据可提高查询性能。
      gcloud dataproc jobs submit hive \
          --cluster presto-cluster \
          --region=${REGION} \
          --execute "
              CREATE EXTERNAL TABLE chicago_taxi_trips_parquet(
                unique_key   STRING,
                taxi_id  STRING,
                trip_start_timestamp  TIMESTAMP,
                trip_end_timestamp  TIMESTAMP,
                trip_seconds  INT,
                trip_miles   FLOAT,
                pickup_census_tract  INT,
                dropoff_census_tract  INT,
                pickup_community_area  INT,
                dropoff_community_area  INT,
                fare  FLOAT,
                tips  FLOAT,
                tolls  FLOAT,
                extras  FLOAT,
                trip_total  FLOAT,
                payment_type  STRING,
                company  STRING,
                pickup_latitude  FLOAT,
                pickup_longitude  FLOAT,
                pickup_location  STRING,
                dropoff_latitude  FLOAT,
                dropoff_longitude  FLOAT,
                dropoff_location  STRING)
              STORED AS PARQUET
              location 'gs://${BUCKET_NAME}/chicago_taxi_trips/parquet/';"
      
    4. 将 Hive CSV 表中的数据加载到 Hive Parquet 表中。
      gcloud dataproc jobs submit hive \
          --cluster presto-cluster \
          --region=${REGION} \
          --execute "
              INSERT OVERWRITE TABLE chicago_taxi_trips_parquet
              SELECT * FROM chicago_taxi_trips_csv;"
      
    5. 验证数据已正确加载。
      gcloud dataproc jobs submit hive \
          --cluster presto-cluster \
          --region=${REGION} \
          --execute "SELECT COUNT(*) FROM chicago_taxi_trips_parquet;"
      

运行查询

您可以在本地通过 Presto CLI 或通过应用运行查询。

Presto CLI 查询

此部分演示如何使用 Presto CLI 查询 Hive Parquet 出租车数据集。

  1. 在本地机器上运行以下命令,以通过 SSH 连接到集群的主节点。在执行命令期间,本地终端将停止响应。
    gcloud compute ssh presto-cluster-m
    
  2. 在集群主节点上的 SSH 终端窗口中,运行 Presto CLI 以便连接到在该主节点上运行的 Presto 服务器。
    presto --catalog hive --schema default
    
  3. presto:default 提示符下,验证 Presto 可以找到 Hive 表。
    show tables;
    
    Table
    ‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐
     chicago_taxi_trips_csv
     chicago_taxi_trips_parquet
    (2 rows)
    
  4. presto:default 提示符运行查询,并比较查询 Parquet 与 CSV 数据的性能。
    • Parquet 数据查询
      select count(*) from chicago_taxi_trips_parquet where trip_miles > 50;
      
       _col0
      ‐‐‐‐‐‐‐‐
       117957
      (1 row)
      Query 20180928_171735_00006_2sz8c, FINISHED, 3 nodes Splits: 308 total, 308 done (100.00%) 0:16 [113M rows, 297MB] [6.91M rows/s, 18.2MB/s]
    • CSV 数据查询
      select count(*) from chicago_taxi_trips_csv where trip_miles > 50;
      
      _col0
      ‐‐‐‐‐‐‐‐
       117957
      (1 row)
      Query 20180928_171936_00009_2sz8c, FINISHED, 3 nodes Splits: 881 total, 881 done (100.00%) 0:47 [113M rows, 41.5GB] [2.42M rows/s, 911MB/s]

Java 应用查询

如需通过 Presto Java JDBC 驱动程序从 Java 应用运行查询,请执行以下操作:

  1. 下载 Presto Java JDBC 驱动程序

  2. Maven pom.xml 中添加 presto-jdbc 依赖项。

    <dependency>
    <groupId>com.facebook.presto</groupId>
    <artifactId>presto-jdbc</artifactId>
    <version>0.206</version>
    </dependency>
    

Java 代码示例

package dataproc.codelab.presto;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class PrestoQuery {
  private static final String URL = "jdbc:presto://presto-cluster-m:8080/hive/default";
  private static final String SOCKS_PROXY = "localhost:1080";
  private static final String USER = "user";
  private static final String QUERY =
      "select count(*) as count from chicago_taxi_trips_parquet where trip_miles > 50";

  public static void main(String[] args) {
    try {
      Properties properties = new Properties();
      properties.setProperty("user", USER);
      properties.setProperty("socksProxy", SOCKS_PROXY);
      Connection connection = DriverManager.getConnection(URL, properties);
      try (Statement stmt = connection.createStatement()) {
        ResultSet rs = stmt.executeQuery(QUERY);
        while (rs.next()) {
          int count = rs.getInt("count");
          System.out.println("The number of long trips: " + count);
        }
      }
    } catch (SQLException e) {
      e.printStackTrace();
    }
  }
}

日志记录和监控

日志

Presto 日志位于集群主节点和工作器节点上的 /var/log/presto/

网页界面

请参阅查看和访问组件网关网址,从而在本地浏览器中打开集群的主节点上运行的 Presto 网页界面。

监控

Presto 通过运行时表公开集群运行时信息。 在 Presto 会话(从 presto:default)提示符中,运行以下查询以查看运行时表数据:

select * FROM system.runtime.nodes;

清理

完成"将 Presto 与 Cloud Dataproc 结合使用"教程后,您可以清理在 Google Cloud 上创建的资源,以避免占用配额,日后产生费用。以下部分介绍如何删除或关闭这些资源。

删除项目

为了避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”页面

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除集群

  • 如需删除您的集群,请输入以下命令:
    gcloud dataproc clusters delete --project=${PROJECT} presto-cluster \
        --region=${REGION}
    

删除存储分区

  • 如需删除您在准备工作中创建的 Cloud Storage 存储分区(包括存储在存储分区中的数据文件),请输入以下命令:
    gsutil -m rm -r gs://${BUCKET_NAME}