Dataproc で Trino を使用する


Trino(旧 Presto)は、1 つ以上の異種混合データソースに分散された大規模なデータセットをクエリするために設計された分散 SQL クエリエンジンです。Trino は、コネクタを介して Hive、MySQL、Kafka などのデータソースに対してクエリを実行できます。このチュートリアルでは、次の方法について説明します。

  • Dataproc クラスタに Trino サービスをインストールする
  • クラスタ上の Trino サービスと通信するローカルマシンにインストールされた Trino クライアントから一般公開データをクエリする
  • Trino Java JDBC ドライバを介してクラスタの Trino サービスと通信する Java アプリケーションからクエリを実行する

目標

  • Trino がインストールされた Dataproc クラスタを作成します
  • データを準備します。このチュートリアルでは、BigQuery で利用可能な Chicago Taxi Trips の一般公開データセットを使用します。
    1. BigQuery からデータを抽出する
    2. データを CSV ファイルとして Cloud Storage にデータを読み込む
    3. データを変換する
      1. データを Hive 外部テーブルとして公開し、Trino でデータをクエリ可能にする
      2. データを CSV 形式から Parquet 形式に変換してクエリを高速化する
  • SSH トンネルまたは Trino JDBC ドライバを個別に使用して、クラスタで実行されている Trino コーディネーターに Trino CLI またはアプリケーション コードクエリを送信します
  • ログを確認し、Trino Web UI から Trino サービスを監視します
  • 費用

    このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

    料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

    始める前に

    まだ作成していない場合は、このチュートリアルで使用するデータを格納する Google Cloud プロジェクトと Cloud Storage バケットを作成します。 1. プロジェクトの設定
    1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
    2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

      プロジェクト セレクタに移動

    3. Google Cloud プロジェクトで課金が有効になっていることを確認します

    4. Dataproc, Compute Engine, Cloud Storage, and BigQuery API を有効にします。

      API を有効にする

    5. Google Cloud CLI をインストールします。
    6. gcloud CLI を初期化するには:

      gcloud init
    7. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

      プロジェクト セレクタに移動

    8. Google Cloud プロジェクトで課金が有効になっていることを確認します

    9. Dataproc, Compute Engine, Cloud Storage, and BigQuery API を有効にします。

      API を有効にする

    10. Google Cloud CLI をインストールします。
    11. gcloud CLI を初期化するには:

      gcloud init
    1. プロジェクトに Cloud Storage バケットを作成します。作成したバケットには、このチュートリアルで使用するデータを保存します。
    1. Google Cloud コンソールで、Cloud Storage の [バケット] ページに移動します。

      [バケット] ページに移動

    2. [バケットを作成] をクリックします。
    3. [バケットの作成] ページでユーザーのバケット情報を入力します。次のステップに進むには、[続行] をクリックします。
      • [バケットに名前を付ける] で、バケット名の要件を満たす名前を入力します。
      • [データの保存場所の選択] で、次の操作を行います。
        • [ロケーション タイプ] オプションを選択します。
        • [ロケーション] オプションを選択します。
      • [データのデフォルトのストレージ クラスを選択する] で、ストレージ クラスを選択します。
      • [オブジェクトへのアクセスを制御する方法を選択する] で [アクセス制御] オプションを選択します。
      • [詳細設定(省略可)] には、暗号化メソッド保持ポリシー、またはバケットラベルを指定します。
    4. [作成] をクリックします。

    Dataproc クラスタを作成する

    optional-components フラグ(イメージ バージョン 2.1 以降で使用可能)を使用して Dataproc クラスタを作成して、クラスタに Trino オプション コンポーネントをインストールし、コンポーネント ゲートウェイを有効にして Google Cloud コンソールから Trino Web UI にアクセスできるようにする enable-component-gateway フラグを作成します。

    1. 環境変数を設定する
      • PROJECT: プロジェクト ID
      • BUCKET_NAME: 始める前にで作成した Cloud Storage バケットの名前
      • REGION: このチュートリアルで使用するクラスタが作成されるリージョン。例: 「us-west1」
      • WORKERS: このチュートリアルでは 3~5 人のワーカーをおすすめします
      export PROJECT=project-id
      export WORKERS=number
      export REGION=region
      export BUCKET_NAME=bucket-name
      
    2. ローカルマシンで Google Cloud CLI を実行して、クラスタを作成します。
      gcloud beta dataproc clusters create trino-cluster \
          --project=${PROJECT} \
          --region=${REGION} \
          --num-workers=${WORKERS} \
          --scopes=cloud-platform \
          --optional-components=TRINO \
          --image-version=2.1  \
          --enable-component-gateway
      

    データの準備

    bigquery-public-datachicago_taxi_trips データセットを CSV ファイルとして Cloud Storage にエクスポートし、データを参照する Hive 外部テーブルを作成します。

    1. ローカルマシンで次のコマンドを実行して、始める前にで作成した Cloud Storage バケットに、BigQuery のタクシーデータをヘッダーのない CSV ファイルとしてインポートします。
      bq --location=us extract --destination_format=CSV \
           --field_delimiter=',' --print_header=false \
             "bigquery-public-data:chicago_taxi_trips.taxi_trips" \
             gs://${BUCKET_NAME}/chicago_taxi_trips/csv/shard-*.csv
      
    2. Cloud Storage バケット内の CSV ファイルと Parquet ファイルに基づいて Hive 外部テーブルを作成します。
      1. Hive 外部テーブル chicago_taxi_trips_csv を作成します。
        gcloud dataproc jobs submit hive \
            --cluster trino-cluster \
            --region=${REGION} \
            --execute "
                CREATE EXTERNAL TABLE chicago_taxi_trips_csv(
                  unique_key   STRING,
                  taxi_id  STRING,
                  trip_start_timestamp  TIMESTAMP,
                  trip_end_timestamp  TIMESTAMP,
                  trip_seconds  INT,
                  trip_miles   FLOAT,
                  pickup_census_tract  INT,
                  dropoff_census_tract  INT,
                  pickup_community_area  INT,
                  dropoff_community_area  INT,
                  fare  FLOAT,
                  tips  FLOAT,
                  tolls  FLOAT,
                  extras  FLOAT,
                  trip_total  FLOAT,
                  payment_type  STRING,
                  company  STRING,
                  pickup_latitude  FLOAT,
                  pickup_longitude  FLOAT,
                  pickup_location  STRING,
                  dropoff_latitude  FLOAT,
                  dropoff_longitude  FLOAT,
                  dropoff_location  STRING)
                ROW FORMAT DELIMITED
                FIELDS TERMINATED BY ','
                STORED AS TEXTFILE
                location 'gs://${BUCKET_NAME}/chicago_taxi_trips/csv/';"
        
      2. Hive 外部テーブルの作成を確認します。
        gcloud dataproc jobs submit hive \
            --cluster trino-cluster \
            --region=${REGION} \
            --execute "SELECT COUNT(*) FROM chicago_taxi_trips_csv;"
        
      3. 同じ列に別の Hive 外部テーブル chicago_taxi_trips_parquet を作成します。ただし、クエリ パフォーマンスを向上させるために、データは Parquet 形式で保存します。
        gcloud dataproc jobs submit hive \
            --cluster trino-cluster \
            --region=${REGION} \
            --execute "
                CREATE EXTERNAL TABLE chicago_taxi_trips_parquet(
                  unique_key   STRING,
                  taxi_id  STRING,
                  trip_start_timestamp  TIMESTAMP,
                  trip_end_timestamp  TIMESTAMP,
                  trip_seconds  INT,
                  trip_miles   FLOAT,
                  pickup_census_tract  INT,
                  dropoff_census_tract  INT,
                  pickup_community_area  INT,
                  dropoff_community_area  INT,
                  fare  FLOAT,
                  tips  FLOAT,
                  tolls  FLOAT,
                  extras  FLOAT,
                  trip_total  FLOAT,
                  payment_type  STRING,
                  company  STRING,
                  pickup_latitude  FLOAT,
                  pickup_longitude  FLOAT,
                  pickup_location  STRING,
                  dropoff_latitude  FLOAT,
                  dropoff_longitude  FLOAT,
                  dropoff_location  STRING)
                STORED AS PARQUET
                location 'gs://${BUCKET_NAME}/chicago_taxi_trips/parquet/';"
        
      4. Hive CSV テーブルから Hive Parquet テーブルにデータを読み込みます。
        gcloud dataproc jobs submit hive \
            --cluster trino-cluster \
            --region=${REGION} \
            --execute "
                INSERT OVERWRITE TABLE chicago_taxi_trips_parquet
                SELECT * FROM chicago_taxi_trips_csv;"
        
      5. データが正しく読み込まれたことを確認します。
        gcloud dataproc jobs submit hive \
            --cluster trino-cluster \
            --region=${REGION} \
            --execute "SELECT COUNT(*) FROM chicago_taxi_trips_parquet;"
        

    クエリを実行する

    クエリは、Trino CLI またはアプリケーションからローカルで実行できます。

    Trino CLI のクエリ

    このセクションでは、Trino CLI を使用して Hive Parquet タクシー データセットをクエリする方法を説明します。

    1. ローカルマシンで次のコマンドを実行して、クラスタのマスターノードに SSH 接続します。コマンドの実行中、ローカル ターミナルは応答しなくなります。
      gcloud compute ssh trino-cluster-m
      
    2. クラスタのマスターノードの SSH ターミナルウィンドウで、マスターノードで実行されている Trino サーバーに接続する Trino CLI を実行します。
      trino --catalog hive --schema default
      
    3. trino:default プロンプトで、Trino で Hive テーブルの検出が可能なことを確認します。
      show tables;
      
      Table
      ‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐
       chicago_taxi_trips_csv
       chicago_taxi_trips_parquet
      (2 rows)
      
    4. trino:default プロンプトからクエリを実行し、Parquet と CSV データのクエリのパフォーマンスを比較します。
      • Parquet データのクエリ
        select count(*) from chicago_taxi_trips_parquet where trip_miles > 50;
        
         _col0
        ‐‐‐‐‐‐‐‐
         117957
        (1 row)
        Query 20180928_171735_00006_2sz8c, FINISHED, 3 nodes Splits: 308 total, 308 done (100.00%) 0:16 [113M rows, 297MB] [6.91M rows/s, 18.2MB/s]
      • CSV データのクエリ
        select count(*) from chicago_taxi_trips_csv where trip_miles > 50;
        
        _col0
        ‐‐‐‐‐‐‐‐
         117957
        (1 row)
        Query 20180928_171936_00009_2sz8c, FINISHED, 3 nodes Splits: 881 total, 881 done (100.00%) 0:47 [113M rows, 41.5GB] [2.42M rows/s, 911MB/s]

    Java アプリケーションのクエリ

    Trino Java JDBC ドライバを介して Java アプリケーションからクエリを実行する場合:Trino Java JDBC ドライバをダウンロードします。1. Maven pom.xml にある trino-jdbc 依存関係を追加します。

    <dependency>
      <groupId>io.trino</groupId>
      <artifactId>trino-jdbc</artifactId>
      <version>376</version>
    </dependency>
    
    Java コードのサンプル
    package dataproc.codelab.trino;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.Properties;
    public class TrinoQuery {
      private static final String URL = "jdbc:trino://trino-cluster-m:8080/hive/default";
      private static final String SOCKS_PROXY = "localhost:1080";
      private static final String USER = "user";
      private static final String QUERY =
          "select count(*) as count from chicago_taxi_trips_parquet where trip_miles > 50";
      public static void main(String[] args) {
        try {
          Properties properties = new Properties();
          properties.setProperty("user", USER);
          properties.setProperty("socksProxy", SOCKS_PROXY);
          Connection connection = DriverManager.getConnection(URL, properties);
          try (Statement stmt = connection.createStatement()) {
            ResultSet rs = stmt.executeQuery(QUERY);
            while (rs.next()) {
              int count = rs.getInt("count");
              System.out.println("The number of long trips: " + count);
            }
          }
        } catch (SQLException e) {
          e.printStackTrace();
        }
      }
    }
    

    ロギングとモニタリング

    ロギング

    Trino のログは、クラスタのマスターノードとワーカーノードの /var/log/trino/ にあります。

    ウェブ UI

    クラスタのマスターノード上で実行されている Trino ウェブ UI をローカルブラウザで開くには、コンポーネント ゲートウェイの URL を表示してアクセスするをご覧ください。

    モニタリング

    Trino は、ランタイム テーブルを介してクラスタのランタイム情報を公開します。Trino セッション(trino:default から)のプロンプトで、次のクエリを実行してランタイム テーブルのデータを表示します。

    select * FROM system.runtime.nodes;
    

    クリーンアップ

    チュートリアルが終了したら、作成したリソースをクリーンアップして、割り当ての使用を停止し、課金されないようにできます。次のセクションで、リソースを削除または無効にする方法を説明します。

    プロジェクトの削除

    課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

    プロジェクトを削除するには:

    1. Google Cloud コンソールで、[リソースの管理] ページに移動します。

      [リソースの管理] に移動

    2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
    3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

    クラスタの削除

    • クラスタを削除するには:
      gcloud dataproc clusters delete --project=${PROJECT} trino-cluster \
          --region=${REGION}
      

    バケットの削除

    • 始める前にで作成した Cloud Storage バケットを削除する(バケットに保存されているデータファイルを含む)には:
      gsutil -m rm -r gs://${BUCKET_NAME}