Dataproc で Presto を使用する

Presto は、複数の異種混合データソースに分散された大規模なデータセットをクエリするために設計された分散 SQL クエリエンジンです。Presto は、コネクタを介して Hive、MySQL、Kafka などのデータソースに対してクエリを実行できます。このチュートリアルでは、次の方法について説明します。

  • Dataproc クラスタに Presto サービスをインストールする
  • クラスタ上の Presto サービスと通信するローカルマシンにインストールされた Presto クライアントから一般公開データをクエリする
  • Presto Java JDBC ドライバを介してクラスタの Presto サービスと通信する Java アプリケーションからクエリを実行する

目標

  1. Presto がインストールされた Dataproc クラスタを作成します

  2. データを準備します。このチュートリアルでは、BigQuery で利用可能な Chicago Taxi Trips の一般公開データセットを使用します。

    1. BigQuery からデータを抽出する
    2. データを CSV ファイルとして Cloud Storage にデータを読み込む
    3. データを変換する
      1. データを Hive 外部テーブルとして公開し、Presto でデータをクエリ可能にする
      2. データを CSV 形式から Parquet 形式に変換してクエリを高速化する
  3. SSH トンネルまたは Presto JDBC ドライバを個別に使用して、クラスタで実行されている Presto コーディネーターに Presto CLI またはアプリケーション コードクエリを送信します

  4. ログを確認し、Presto Web UI から Presto サービスを監視します

料金

このチュートリアルでは、Google Cloud の課金対象となる以下のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを出すことができます。Google Cloud の新規ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

まだ作成していない場合は、このチュートリアルで使用するデータを格納する Google Cloud プロジェクトと Cloud Storage バケットを作成します。

  1. プロジェクトを設定する

    1. Google アカウントにログインします。

      Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

    2. GCP Console のプロジェクト セレクタのページで、GCP プロジェクトを選択または作成します。

      プロジェクト セレクタのページに移動

    3. Google Cloud Platform プロジェクトに対して課金が有効になっていることを確認します。 プロジェクトに対して課金が有効になっていることを確認する方法を学習する

    4. Dataproc, Compute Engine, Cloud Storage, and BigQuery必要な API を有効にします。

      API を有効にする

    5. Cloud SDK をインストールして初期化します。

  2. プロジェクトに Cloud Storage バケットを作成します。作成したバケットには、このチュートリアルで使用するデータを格納します。

    1. GCP Console で、Cloud Storage ブラウザページに移動します。

      Cloud Storage ブラウザページに移動

    2. [バケットを作成] をクリックします。
    3. [バケットを作成] ダイアログ内で、以下の属性を指定します。
    4. [作成] をクリックします。

Presto クライアントをインストールする

ローカルマシンに Presto クライアント ソフトウェアをインストールします。

  1. ローカルマシンに presto-cli-nnn-executable.jar をダウンロードし、名前を presto-cli に変更して実行可能にします。
    chmod a+x presto-cli
    

Dataproc クラスタを作成する

オプションのコンポーネントベータ版の更新: 初期化アクションを使用せずに、クラスタに Presto をインストールするよう Dataproc に指示できるようになりました(オプションのコンポーネント→Presto を参照)。これを行うには、以下の gcloud dataproc clusters create コマンドを gcloud beta dataproc clusters create に変更し、--initialization-actions フラグを --optional-components=PRESTO および --image-version=1.3-deb9 フラグに置き換えます。オプションのコンポーネントを使用する場合、Presto サーバーはポート 8060 で実行されます。

初期化アクションを使用して Dataproc クラスタを作成し、Presto サービス ソフトウェアをクラスタにインストールします。

  1. 環境変数を設定します。

    • PROJECT: プロジェクト ID
    • BUCKET_NAME: 始める前にで作成した Cloud Storage バケットの名前
    • REGION: このチュートリアルで使用するクラスタが作成されるリージョン。例:「us-west1」
    • ZONE: このチュートリアルで使用するクラスタが配置されるクラスタ リージョン内のゾーン。例:「us-west1-a」
    • WORKERS: このチュートリアルでは 3~5 人のワーカーをおすすめします
      export PROJECT=project-id
      export WORKERS=number
      export REGION=region
      export ZONE=zone
      export BUCKET_NAME=bucket-name
      
  2. デフォルト スクリプトまたはカスタム初期化スクリプトを使用してクラスタを作成します。

    デフォルト

    1. ローカルの INIT_ACTION 変数を設定して、新しい Dataproc クラスタに Presto サービスをインストールする Cloud Storage の初期化スクリプトの場所を指定します。
      export INIT_ACTION=gs://dataproc-initialization-actions/presto/presto.sh
      
    2. ローカルマシンで gcloud コマンドライン ツールを実行して、クラスタを作成します。
      gcloud dataproc clusters create presto-cluster \
          --project=${PROJECT} \
          --zone=${ZONE} \
          --num-workers=${WORKERS} \
          --scopes=cloud-platform \
          --initialization-actions=${INIT_ACTION}
      

      GitHub の dataproc-initialization-actions リポジトリに配置されている Presto 初期化アクションでは、クラスタのマスターノードの /presto-server-version に Presto がインストールされます。

    カスタム

    1. GitHub の Presto 初期化スクリプトを表示して、変更する初期化設定を確認します。初期化スクリプトの設定には、次のものが含まれます。
      • Presto バージョン(現在設定されているデフォルト バージョンについては、GitHub の Presto 初期化スクリプトを参照)
      • Presto HTTP ポート(デフォルト: 8080)
      • コネクタ(デフォルト: Apache Hive コネクタ)
    2. Presto 初期化アクションを Cloud Storage からローカルマシンにコピーします。
      gsutil cp gs://dataproc-initialization-actions/presto/presto.sh local-path
      
    3. 初期化スクリプトを変更して、Presto 構成をカスタマイズします。
    4. 始める前にで作成した Cloud Storage バケットに、ローカルマシンのカスタム Presto 初期化アクションをアップロードします。
      gsutil cp presto.sh gs://${BUCKET_NAME}/
      
    5. Cloud Storage のカスタム スクリプトの場所を指すように、ローカルの INIT_ACTION 変数を設定します。
      export INIT_ACTION=gs://${BUCKET_NAME}/presto.sh
      
    6. ローカルマシンで gcloud コマンドライン ツールを実行して、クラスタを作成します。
      gcloud dataproc clusters create presto-cluster \
          --project=${PROJECT} \
          --zone=${ZONE} \
          --num-workers=${WORKERS} \
          --scopes=cloud-platform \
          --initialization-actions=${INIT_ACTION}
      

  3. クラスタへの Presto インストールを監視またはトラブルシューティングするには、SSH でクラスタ VM に接続し、初期化アクションログを表示します。

データの準備

bigquery-public-datachicago_taxi_trips データセットを CSV ファイルとして Cloud Storage にエクスポートし、データを参照する Hive 外部テーブルを作成します。

  1. ローカルマシンで次のコマンドを実行して、始める前にで作成した Cloud Storage バケットに、BigQuery のタクシーデータをヘッダーのない CSV ファイルとしてインポートします。
    bq --location=us extract --destination_format=CSV \
         --field_delimiter=',' --print_header=false \
           "bigquery-public-data:chicago_taxi_trips.taxi_trips" \
           gs://${BUCKET_NAME}/chicago_taxi_trips/csv/shard-*.csv
    
  2. Cloud Storage バケット内の CSV ファイルと Parquet ファイルに基づいて Hive 外部テーブルを作成します。
    1. Hive 外部テーブル chicago_taxi_trips_csv を作成します。
      gcloud dataproc jobs submit hive \
          --cluster presto-cluster \
          --execute "
              CREATE EXTERNAL TABLE chicago_taxi_trips_csv(
                unique_key   STRING,
                taxi_id  STRING,
                trip_start_timestamp  TIMESTAMP,
                trip_end_timestamp  TIMESTAMP,
                trip_seconds  INT,
                trip_miles   FLOAT,
                pickup_census_tract  INT,
                dropoff_census_tract  INT,
                pickup_community_area  INT,
                dropoff_community_area  INT,
                fare  FLOAT,
                tips  FLOAT,
                tolls  FLOAT,
                extras  FLOAT,
                trip_total  FLOAT,
                payment_type  STRING,
                company  STRING,
                pickup_latitude  FLOAT,
                pickup_longitude  FLOAT,
                pickup_location  STRING,
                dropoff_latitude  FLOAT,
                dropoff_longitude  FLOAT,
                dropoff_location  STRING)
              ROW FORMAT DELIMITED
              FIELDS TERMINATED BY ','
              STORED AS TEXTFILE
              location 'gs://${BUCKET_NAME}/chicago_taxi_trips/csv/';"
      
    2. Hive 外部テーブルの作成を確認します。
      gcloud dataproc jobs submit hive \
          --cluster presto-cluster \
          --execute "SELECT COUNT(*) FROM chicago_taxi_trips_csv;"
      
    3. 同じ列に別の Hive 外部テーブル chicago_taxi_trips_parquet を作成します。ただし、クエリ パフォーマンスを向上させるために、データは Parquet 形式で保存します。
      gcloud dataproc jobs submit hive \
          --cluster presto-cluster \
          --execute "
              CREATE EXTERNAL TABLE chicago_taxi_trips_parquet(
                unique_key   STRING,
                taxi_id  STRING,
                trip_start_timestamp  TIMESTAMP,
                trip_end_timestamp  TIMESTAMP,
                trip_seconds  INT,
                trip_miles   FLOAT,
                pickup_census_tract  INT,
                dropoff_census_tract  INT,
                pickup_community_area  INT,
                dropoff_community_area  INT,
                fare  FLOAT,
                tips  FLOAT,
                tolls  FLOAT,
                extras  FLOAT,
                trip_total  FLOAT,
                payment_type  STRING,
                company  STRING,
                pickup_latitude  FLOAT,
                pickup_longitude  FLOAT,
                pickup_location  STRING,
                dropoff_latitude  FLOAT,
                dropoff_longitude  FLOAT,
                dropoff_location  STRING)
              STORED AS PARQUET
              location 'gs://${BUCKET_NAME}/chicago_taxi_trips/parquet/';"
      
    4. Hive CSV テーブルから Hive Parquet テーブルにデータを読み込みます。
      gcloud dataproc jobs submit hive \
          --cluster presto-cluster \
          --execute "
              INSERT OVERWRITE TABLE chicago_taxi_trips_parquet
              SELECT * FROM chicago_taxi_trips_csv;"
      
    5. データが正しく読み込まれたことを確認します。
      gcloud dataproc jobs submit hive \
          --cluster presto-cluster \
          --execute "SELECT COUNT(*) FROM chicago_taxi_trips_parquet;"
      

クエリを実行する

クエリは、Presto CLI またはアプリケーションからローカルで実行できます。

Presto CLI クエリ

このセクションでは、Presto CLI を使用して Hive Parquet タクシー データセットをクエリする方法を説明します。

1. SSH トンネルを作成します。次のコマンドを実行して、動的ポート転送を使用してローカルマシンに localhost:1080 の SOCKS プロキシを作成し、SSH トンネルを介して Dataproc クラスタのマスターノードに接続します。コマンドの実行中、ターミナルは停止します。


gcloud compute ssh presto-cluster-m \
    --project=${PROJECT} \
    --zone=${ZONE} \
    -- -D 1080 -N

  1. ローカルマシンの新しいターミナル ウィンドウで Presto CLI を実行し、クラスタのマスターノードのポート 8080 で実行されている Presto サーバーに接続します。

    ./presto-cli \
        --server presto-cluster-m:8080 \
        --socks-proxy localhost:1080 \
        --catalog hive \
        --schema default
    

  2. presto:default> プロンプトで、Presto で Hive テーブルの検出が可能なことを確認します。

    show tables;
    
           Table
    ‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐
    chicago_taxi_trips_csv
    chicago_taxi_trips_parquet
    (2 rows)
    

  3. presto:default> プロンプトからクエリを実行し、Parquet と CSV データのクエリのパフォーマンスを比較します。

    1. Parquet データのクエリ
      select count(*) from chicago_taxi_trips_parquet where trip_miles > 50;
      
      _col0
      ‐‐‐‐‐‐‐‐
      117957
      (1 row)
      Query 20180928_171735_00006_2sz8c, FINISHED, 3 nodes Splits: 308 total, 308 done (100.00%) 0:16 [113M rows, 297MB] [6.91M rows/s, 18.2MB/s]
      1. CSV データのクエリ
        select count(*) from chicago_taxi_trips_csv where trip_miles > 50;
        
        _col0
        ‐‐‐‐‐‐‐‐
        117957
        (1 row)
        Query 20180928_171936_00009_2sz8c, FINISHED, 3 nodes Splits: 881 total, 881 done (100.00%) 0:47 [113M rows, 41.5GB] [2.42M rows/s, 911MB/s]

Java アプリケーションのクエリ

Presto Java JDBC ドライバを介して Java アプリケーションからクエリを実行する場合:

  1. Presto Java JDBC ドライバをダウンロードします

  2. Maven pom.xml にある presto-jdbc 依存関係を追加します。

    <dependency>
    <groupId>com.facebook.presto</groupId>
    <artifactId>presto-jdbc</artifactId>
    <version>0.206</version>
    </dependency>
    

Java コードのサンプル

package dataproc.codelab.presto;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class PrestoQuery {
  private static final String URL = "jdbc:presto://presto-cluster-m:8080/hive/default";
  private static final String SOCKS_PROXY = "localhost:1080";
  private static final String USER = "user";
  private static final String QUERY =
      "select count(*) as count from chicago_taxi_trips_parquet where trip_miles > 50";

  public static void main(String[] args) {
    try {
      Properties properties = new Properties();
      properties.setProperty("user", USER);
      properties.setProperty("socksProxy", SOCKS_PROXY);
      Connection connection = DriverManager.getConnection(URL, properties);
      try (Statement stmt = connection.createStatement()) {
        ResultSet rs = stmt.executeQuery(QUERY);
        while (rs.next()) {
          int count = rs.getInt("count");
          System.out.println("The number of long trips: " + count);
        }
      }
    } catch (SQLException e) {
      e.printStackTrace();
    }
  }
}

ロギングとモニタリング

ロギング

デフォルトでは、Presto 初期化アクションにより、クラスタのマスターノードとワーカーノードの /var/presto/data が Presto のデータ ディレクトリとして構成されます。Presto のログは /var/presto/data/var/log/ にあります。

ウェブ UI

デフォルトでは、Presto 初期化アクションにより、クラスタのマスターノードのポート 8080 で Presto サービスが実行されるように構成されます。マスターノードの Presto ウェブ インターフェースに接続するには:

  1. SOCKS プロキシを使用して、ローカルマシンからマスターノードのポート 1080 への SSH トンネルを作成します

  2. socks5://localhost:1080 プロキシを使用して Chrome ブラウザを起動します。

    Linux

    /usr/bin/google-chrome \
        --proxy-server="socks5://localhost:1080" \
        --host-resolver-rules="MAP * 0.0.0.0 , EXCLUDE localhost" \
        --user-data-dir=/tmp/presto-cluster-m
    

    macOS

    "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome" \
        --proxy-server="socks5://localhost:1080" \
        --host-resolver-rules="MAP * 0.0.0.0 , EXCLUDE localhost" \
        --user-data-dir=/tmp/presto-cluster-m
    

  3. ブラウザで、次の場所に移動します。

    http://presto-cluster-m:8080
    

モニタリング

Presto は、ランタイム テーブルを介してクラスタのランタイム情報を公開します。Presto セッション(presto:default> から)のプロンプトで、次のクエリを実行してランタイム テーブルのデータを表示します。

select * FROM system.runtime.nodes;

クリーンアップ

「Presto と Cloud Dataproc の併用」チュートリアルを完了したら、GCP で作成したリソースをクリーンアップすることで、割り当てを使い果たすことなく、それらのリソースに課金されないようにできます。次のセクションで、リソースを削除または無効にする方法を説明します。

プロジェクトの削除

課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

プロジェクトを削除するには:

  1. GCP Console で [リソースの管理] ページに移動します。

    [リソースの管理] ページに移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

クラスタの削除

  • クラスタを削除するには:
    gcloud dataproc clusters delete --project=${PROJECT} presto-cluster
    

バケットの削除

  • 始める前にで作成した Cloud Storage バケットを削除する(バケットに保存されているデータファイルを含む)には:
    gsutil -m rm -r gs://${BUCKET_NAME}