Dataproc で Trino を使用する


Trino(旧 Presto)は、1 つ以上の異種混合データソースに分散された大規模なデータセットをクエリするために設計された分散 SQL クエリエンジンです。Trino は、コネクタを介して Hive、MySQL、Kafka などのデータソースに対してクエリを実行できます。このチュートリアルでは、次の方法について説明します。

  • Dataproc クラスタに Trino サービスをインストールする
  • クラスタ上の Trino サービスと通信するローカルマシンにインストールされた Trino クライアントから一般公開データをクエリする
  • Trino Java JDBC ドライバを介してクラスタの Trino サービスと通信する Java アプリケーションからクエリを実行する

目標

  • Trino がインストールされた Dataproc クラスタを作成します
  • データを準備します。このチュートリアルでは、BigQuery で利用可能な Chicago Taxi Trips の一般公開データセットを使用します。
    1. BigQuery からデータを抽出する
    2. データを CSV ファイルとして Cloud Storage にデータを読み込む
    3. データを変換する
      1. データを Hive 外部テーブルとして公開し、Trino でデータをクエリ可能にする
      2. データを CSV 形式から Parquet 形式に変換してクエリを高速化する
  • SSH トンネルまたは Trino JDBC ドライバを個別に使用して、クラスタで実行されている Trino コーディネーターに Trino CLI またはアプリケーション コードクエリを送信します
  • ログを確認し、Trino Web UI から Trino サービスを監視します
  • 費用

    このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

    料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

    始める前に

    まだ作成していない場合は、このチュートリアルで使用するデータを格納する Google Cloud プロジェクトと Cloud Storage バケットを作成します。 1. プロジェクトの設定
    1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
    2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

      プロジェクト セレクタに移動

    3. Google Cloud プロジェクトで課金が有効になっていることを確認します

    4. Dataproc, Compute Engine, Cloud Storage, and BigQuery API を有効にします。

      API を有効にする

    5. Google Cloud CLI をインストールします。
    6. gcloud CLI を初期化するには:

      gcloud init
    7. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

      プロジェクト セレクタに移動

    8. Google Cloud プロジェクトで課金が有効になっていることを確認します

    9. Dataproc, Compute Engine, Cloud Storage, and BigQuery API を有効にします。

      API を有効にする

    10. Google Cloud CLI をインストールします。
    11. gcloud CLI を初期化するには:

      gcloud init
    1. プロジェクトに Cloud Storage バケットを作成します。作成したバケットには、このチュートリアルで使用するデータを保存します。
    1. Google Cloud コンソールで、Cloud Storage の [バケット] ページに移動します。

      [バケット] ページに移動

    2. [バケットを作成] をクリックします。
    3. [バケットの作成] ページでユーザーのバケット情報を入力します。次のステップに進むには、[続行] をクリックします。
      • [バケットに名前を付ける] で、バケット名の要件を満たす名前を入力します。
      • [データの保存場所の選択] で、次の操作を行います。
        • [ロケーション タイプ] オプションを選択します。
        • [ロケーション] オプションを選択します。
      • [データのデフォルトのストレージ クラスを選択する] で、ストレージ クラスを選択します。
      • [オブジェクトへのアクセスを制御する方法を選択する] で [アクセス制御] オプションを選択します。
      • [詳細設定(省略可)] には、暗号化メソッド保持ポリシー、またはバケットラベルを指定します。
    4. [作成] をクリックします。

    Dataproc クラスタを作成する

    optional-components フラグ(イメージ バージョン 2.1 以降で使用可能)を使用して Dataproc クラスタを作成して、クラスタに Trino オプション コンポーネントをインストールし、コンポーネント ゲートウェイを有効にして Google Cloud コンソールから Trino Web UI にアクセスできるようにする enable-component-gateway フラグを作成します。

    1. 環境変数を設定する
      • PROJECT: プロジェクト ID
      • BUCKET_NAME: 始める前にで作成した Cloud Storage バケットの名前
      • REGION: このチュートリアルで使用するクラスタが作成されるリージョン。例: 「us-west1」
      • WORKERS: このチュートリアルでは 3~5 人のワーカーをおすすめします
      export PROJECT=project-id
      export WORKERS=number
      export REGION=region
      export BUCKET_NAME=bucket-name
      
    2. ローカルマシンで Google Cloud CLI を実行して、クラスタを作成します。
      gcloud beta dataproc clusters create trino-cluster \
          --project=${PROJECT} \
          --region=${REGION} \
          --num-workers=${WORKERS} \
          --scopes=cloud-platform \
          --optional-components=TRINO \
          --image-version=2.1  \
          --enable-component-gateway
      

    データの準備

    bigquery-public-datachicago_taxi_trips データセットを CSV ファイルとして Cloud Storage にエクスポートし、データを参照する Hive 外部テーブルを作成します。

    1. ローカルマシンで次のコマンドを実行して、始める前にで作成した Cloud Storage バケットに、BigQuery のタクシーデータをヘッダーのない CSV ファイルとしてインポートします。
      bq --location=us extract --destination_format=CSV \
           --field_delimiter=',' --print_header=false \
             "bigquery-public-data:chicago_taxi_trips.taxi_trips" \
             gs://${BUCKET_NAME}/chicago_taxi_trips/csv/shard-*.csv
      
    2. Cloud Storage バケット内の CSV ファイルと Parquet ファイルに基づいて Hive 外部テーブルを作成します。
      1. Hive 外部テーブル chicago_taxi_trips_csv を作成します。
        gcloud dataproc jobs submit hive \
            --cluster trino-cluster \
            --region=${REGION} \
            --execute "
                CREATE EXTERNAL TABLE chicago_taxi_trips_csv(
                  unique_key   STRING,
                  taxi_id  STRING,
                  trip_start_timestamp  TIMESTAMP,
                  trip_end_timestamp  TIMESTAMP,
                  trip_seconds  INT,
                  trip_miles   FLOAT,
                  pickup_census_tract  INT,
                  dropoff_census_tract  INT,
                  pickup_community_area  INT,
                  dropoff_community_area  INT,
                  fare  FLOAT,
                  tips  FLOAT,
                  tolls  FLOAT,
                  extras  FLOAT,
                  trip_total  FLOAT,
                  payment_type  STRING,
                  company  STRING,
                  pickup_latitude  FLOAT,
                  pickup_longitude  FLOAT,
                  pickup_location  STRING,
                  dropoff_latitude  FLOAT,
                  dropoff_longitude  FLOAT,
                  dropoff_location  STRING)
                ROW FORMAT DELIMITED
                FIELDS TERMINATED BY ','
                STORED AS TEXTFILE
                location 'gs://${BUCKET_NAME}/chicago_taxi_trips/csv/';"
        
      2. Hive 外部テーブルの作成を確認します。
        gcloud dataproc jobs submit hive \
            --cluster trino-cluster \
            --region=${REGION} \
            --execute "SELECT COUNT(*) FROM chicago_taxi_trips_csv;"
        
      3. 同じ列に別の Hive 外部テーブル chicago_taxi_trips_parquet を作成します。ただし、クエリ パフォーマンスを向上させるために、データは Parquet 形式で保存します。
        gcloud dataproc jobs submit hive \
            --cluster trino-cluster \
            --region=${REGION} \
            --execute "
                CREATE EXTERNAL TABLE chicago_taxi_trips_parquet(
                  unique_key   STRING,
                  taxi_id  STRING,
                  trip_start_timestamp  TIMESTAMP,
                  trip_end_timestamp  TIMESTAMP,
                  trip_seconds  INT,
                  trip_miles   FLOAT,
                  pickup_census_tract  INT,
                  dropoff_census_tract  INT,
                  pickup_community_area  INT,
                  dropoff_community_area  INT,
                  fare  FLOAT,
                  tips  FLOAT,
                  tolls  FLOAT,
                  extras  FLOAT,
                  trip_total  FLOAT,
                  payment_type  STRING,
                  company  STRING,
                  pickup_latitude  FLOAT,
                  pickup_longitude  FLOAT,
                  pickup_location  STRING,
                  dropoff_latitude  FLOAT,
                  dropoff_longitude  FLOAT,
                  dropoff_location  STRING)
                STORED AS PARQUET
                location 'gs://${BUCKET_NAME}/chicago_taxi_trips/parquet/';"
        
      4. Hive CSV テーブルから Hive Parquet テーブルにデータを読み込みます。
        gcloud dataproc jobs submit hive \
            --cluster trino-cluster \
            --region=${REGION} \
            --execute "
                INSERT OVERWRITE TABLE chicago_taxi_trips_parquet
                SELECT * FROM chicago_taxi_trips_csv;"
        
      5. データが正しく読み込まれたことを確認します。
        gcloud dataproc jobs submit hive \
            --cluster trino-cluster \
            --region=${REGION} \
            --execute "SELECT COUNT(*) FROM chicago_taxi_trips_parquet;"
        

    クエリを実行する

    クエリは、Trino CLI またはアプリケーションからローカルで実行できます。

    Trino CLI のクエリ

    このセクションでは、Trino CLI を使用して Hive Parquet タクシー データセットをクエリする方法を説明します。

    1. ローカルマシンで次のコマンドを実行して、クラスタのマスターノードに SSH 接続します。コマンドの実行中、ローカル ターミナルは応答しなくなります。
      gcloud compute ssh trino-cluster-m
      
    2. クラスタのマスターノードの SSH ターミナルウィンドウで、マスターノードで実行されている Trino サーバーに接続する Trino CLI を実行します。
      trino --catalog hive --schema default
      
    3. trino:default プロンプトで、Trino で Hive テーブルの検出が可能なことを確認します。
      show tables;
      
      Table
      ‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐
       chicago_taxi_trips_csv
       chicago_taxi_trips_parquet
      (2 rows)
      
    4. trino:default プロンプトからクエリを実行し、Parquet と CSV データのクエリのパフォーマンスを比較します。
      • Parquet データのクエリ
        select count(*) from chicago_taxi_trips_parquet where trip_miles > 50;
        
         _col0
        ‐‐‐‐‐‐‐‐
         117957
        (1 row)
        Query 20180928_171735_00006_2sz8c, FINISHED, 3 nodes Splits: 308 total, 308 done (100.00%) 0:16 [113M rows, 297MB] [6.91M rows/s, 18.2MB/s]
      • CSV データのクエリ
        select count(*) from chicago_taxi_trips_csv where trip_miles > 50;
        
        _col0
        ‐‐‐‐‐‐‐‐
         117957
        (1 row)
        Query 20180928_171936_00009_2sz8c, FINISHED, 3 nodes Splits: 881 total, 881 done (100.00%) 0:47 [113M rows, 41.5GB] [2.42M rows/s, 911MB/s]

    Java アプリケーションのクエリ

    Trino Java JDBC ドライバを介して Java アプリケーションからクエリを実行する場合:Trino Java JDBC ドライバをダウンロードします。1. Maven pom.xml にある trino-jdbc 依存関係を追加します。

    <dependency>
      <groupId>io.trino</groupId>
      <artifactId>trino-jdbc</artifactId>
      <version>376</version>
    </dependency>
    
    Java コードのサンプル
    package dataproc.codelab.trino;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.Properties;
    public class TrinoQuery {
      private static final String URL = "jdbc:trino://trino-cluster-m:8080/hive/default";
      private static final String SOCKS_PROXY = "localhost:1080";
      private static final String USER = "user";
      private static final String QUERY =
          "select count(*) as count from chicago_taxi_trips_parquet where trip_miles > 50";
      public static void main(String[] args) {
        try {
          Properties properties = new Properties();
          properties.setProperty("user", USER);
          properties.setProperty("socksProxy", SOCKS_PROXY);
          Connection connection = DriverManager.getConnection(URL, properties);
          try (Statement stmt = connection.createStatement()) {
            ResultSet rs = stmt.executeQuery(QUERY);
            while (rs.next()) {
              int count = rs.getInt("count");
              System.out.println("The number of long trips: " + count);
            }
          }
        } catch (SQLException e) {
          e.printStackTrace();
        }
      }
    }
    

    ロギングとモニタリング

    ロギング

    Trino のログは、クラスタのマスターノードとワーカーノードの /var/log/trino/ にあります。

    ウェブ UI

    クラスタのマスターノード上で実行されている Trino ウェブ UI をローカルブラウザで開くには、コンポーネント ゲートウェイの URL を表示してアクセスするをご覧ください。

    モニタリング

    Trino は、ランタイム テーブルを介してクラスタのランタイム情報を公開します。Trino セッション(trino:default から)のプロンプトで、次のクエリを実行してランタイム テーブルのデータを表示します。

    select * FROM system.runtime.nodes;
    

    クリーンアップ

    チュートリアルが終了したら、作成したリソースをクリーンアップして、割り当ての使用を停止し、課金されないようにできます。次のセクションで、リソースを削除または無効にする方法を説明します。

    プロジェクトの削除

    課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

    プロジェクトを削除するには:

    1. Google Cloud コンソールで、[リソースの管理] ページに移動します。

      [リソースの管理] に移動

    2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
    3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

    クラスタの削除

    • クラスタを削除するには:
      gcloud dataproc clusters delete --project=${PROJECT} trino-cluster \
          --region=${REGION}
      

    バケットの削除

    • 始める前にで作成した Cloud Storage バケットを削除する(バケットに保存されているデータファイルを含む)には:
      gsutil -m rm -r gs://${BUCKET_NAME}