Use Trino with Dataproc


Trino (formerly Presto) is a distributed SQL query engine designed to query large data sets distributed over one or more heterogeneous data sources. Trino can query Hive, MySQL, Kafka and other data sources through connectors. This tutorial shows you how to:

  • Install the Trino service on a Dataproc cluster
  • Query public data from a Trino client installed on your local machine that communicates with a Trino service on your cluster
  • Run queries from a Java application that communicates with the Trino service on your cluster through the Trino Java JDBC driver

Objectives

  • Create a Dataproc cluster with Trino installed
  • Prepare data. This tutorial uses the Chicago Taxi Trips public dataset, available in BigQuery.
    1. Extract the data from BigQuery
    2. Load the data into Cloud Storage as CSV files
    3. Transform data:
      1. Expose the data as a Hive external table to make the data queryable by Trino
      2. Convert the data from CSV format into Parquet format to make querying faster
  • Send queries to the Trino coordinator running on the cluster, either from the Trino CLI through an SSH tunnel or from application code through the Trino JDBC driver
  • Check logs and monitor the Trino service through the Trino Web UI
Costs

    In this document, you use billable components of Google Cloud, including Dataproc, Cloud Storage, and BigQuery.

    To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

    Before you begin

    If you haven't already done so, create a Google Cloud project and a Cloud Storage bucket to hold the data used in this tutorial.

    1. Set up your project
    1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    3. Make sure that billing is enabled for your Google Cloud project.

    4. Enable the Dataproc, Compute Engine, Cloud Storage, and BigQuery APIs.

      Enable the APIs

    5. Install the Google Cloud CLI.
    6. To initialize the gcloud CLI, run the following command:

      gcloud init
    2. Create a Cloud Storage bucket in your project to hold the data used in this tutorial
    1. In the Google Cloud console, go to the Cloud Storage Buckets page.

      Go to Buckets page

    2. Click Create bucket.
    3. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
      • For Name your bucket, enter a name that meets the bucket naming requirements.
      • For Choose where to store your data, do the following:
        • Select a Location type option.
        • Select a Location option.
      • For Choose a default storage class for your data, select a storage class.
      • For Choose how to control access to objects, select an Access control option.
      • For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
    4. Click Create.

    Create a Dataproc cluster

    Create a Dataproc cluster with the --optional-components flag (available on image version 2.1 and later) to install the Trino optional component on the cluster. Also set the --enable-component-gateway flag to enable the Component Gateway, which lets you access the Trino Web UI from the Google Cloud console.

    1. Set environment variables:
      • PROJECT: your project ID
      • BUCKET_NAME: the name of the Cloud Storage bucket you created in Before you begin
      • REGION: the region where the cluster used in this tutorial is created, for example, "us-west1"
      • WORKERS: the number of worker nodes; 3 to 5 workers are recommended for this tutorial
      export PROJECT=project-id
      export WORKERS=number
      export REGION=region
      export BUCKET_NAME=bucket-name
      
    2. Run the Google Cloud CLI on your local machine to create the cluster.
      gcloud beta dataproc clusters create trino-cluster \
          --project=${PROJECT} \
          --region=${REGION} \
          --num-workers=${WORKERS} \
          --scopes=cloud-platform \
          --optional-components=TRINO \
          --image-version=2.1  \
          --enable-component-gateway
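
The create command above depends on all four environment variables from step 1, and an unset one only surfaces as a gcloud error. A minimal pre-flight check, assuming a POSIX shell, might look like the following sketch. The function name `check_env` is hypothetical and not part of the tutorial.

```shell
# Hypothetical helper: report every tutorial variable that is unset or empty.
# Returns 0 when all variables are set, 1 otherwise.
# Note: the loop variables (missing, name, value) are global in POSIX sh.
check_env() {
  missing=0
  for name in PROJECT REGION WORKERS BUCKET_NAME; do
    # Indirect lookup of the variable whose name is stored in $name.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "Missing environment variable: $name" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

Running `check_env && echo "ready"` before the `gcloud` command prints the name of each missing variable to standard error, or `ready` when nothing is missing.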
      

    Prepare data

    Export the bigquery-public-data chicago_taxi_trips dataset to Cloud Storage as CSV files, then create a Hive external table to reference the data.

    1. On your local machine, run the following command to export the taxi data from BigQuery as CSV files without headers into the Cloud Storage bucket you created in Before you begin. The destination URI is quoted so that your shell does not try to expand the * wildcard.
      bq --location=us extract --destination_format=CSV \
          --field_delimiter=',' --print_header=false \
          "bigquery-public-data:chicago_taxi_trips.taxi_trips" \
          "gs://${BUCKET_NAME}/chicago_taxi_trips/csv/shard-*.csv"
      
    2. Create Hive external tables that are backed by the CSV and Parquet files in your Cloud Storage bucket.
      1. Create the Hive external table chicago_taxi_trips_csv.
        gcloud dataproc jobs submit hive \
            --cluster trino-cluster \
            --region=${REGION} \
            --execute "
                CREATE EXTERNAL TABLE chicago_taxi_trips_csv(
                  unique_key   STRING,
                  taxi_id  STRING,
                  trip_start_timestamp  TIMESTAMP,
                  trip_end_timestamp  TIMESTAMP,
                  trip_seconds  INT,
                  trip_miles   FLOAT,
                  pickup_census_tract  INT,
                  dropoff_census_tract  INT,
                  pickup_community_area  INT,
                  dropoff_community_area  INT,
                  fare  FLOAT,
                  tips  FLOAT,
                  tolls  FLOAT,
                  extras  FLOAT,
                  trip_total  FLOAT,
                  payment_type  STRING,
                  company  STRING,
                  pickup_latitude  FLOAT,
                  pickup_longitude  FLOAT,
                  pickup_location  STRING,
                  dropoff_latitude  FLOAT,
                  dropoff_longitude  FLOAT,
                  dropoff_location  STRING)
                ROW FORMAT DELIMITED
                FIELDS TERMINATED BY ','
                STORED AS TEXTFILE
                location 'gs://${BUCKET_NAME}/chicago_taxi_trips/csv/';"
        
      2. Verify the creation of the Hive external table.
        gcloud dataproc jobs submit hive \
            --cluster trino-cluster \
            --region=${REGION} \
            --execute "SELECT COUNT(*) FROM chicago_taxi_trips_csv;"
        
      3. Create another Hive external table, chicago_taxi_trips_parquet, with the same columns but with the data stored in Parquet format for better query performance.
        gcloud dataproc jobs submit hive \
            --cluster trino-cluster \
            --region=${REGION} \
            --execute "
                CREATE EXTERNAL TABLE chicago_taxi_trips_parquet(
                  unique_key   STRING,
                  taxi_id  STRING,
                  trip_start_timestamp  TIMESTAMP,
                  trip_end_timestamp  TIMESTAMP,
                  trip_seconds  INT,
                  trip_miles   FLOAT,
                  pickup_census_tract  INT,
                  dropoff_census_tract  INT,
                  pickup_community_area  INT,
                  dropoff_community_area  INT,
                  fare  FLOAT,
                  tips  FLOAT,
                  tolls  FLOAT,
                  extras  FLOAT,
                  trip_total  FLOAT,
                  payment_type  STRING,
                  company  STRING,
                  pickup_latitude  FLOAT,
                  pickup_longitude  FLOAT,
                  pickup_location  STRING,
                  dropoff_latitude  FLOAT,
                  dropoff_longitude  FLOAT,
                  dropoff_location  STRING)
                STORED AS PARQUET
                location 'gs://${BUCKET_NAME}/chicago_taxi_trips/parquet/';"
        
      4. Load the data from the Hive CSV table into the Hive Parquet table.
        gcloud dataproc jobs submit hive \
            --cluster trino-cluster \
            --region=${REGION} \
            --execute "
                INSERT OVERWRITE TABLE chicago_taxi_trips_parquet
                SELECT * FROM chicago_taxi_trips_csv;"
        
      5. Verify that the data loaded correctly.
        gcloud dataproc jobs submit hive \
            --cluster trino-cluster \
            --region=${REGION} \
            --execute "SELECT COUNT(*) FROM chicago_taxi_trips_parquet;"
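
The two CREATE EXTERNAL TABLE statements above repeat the same 23 column definitions. One way to keep them in sync is to generate both statements from a single column list. The sketch below illustrates that idea and is not part of the original tutorial: the names `TAXI_COLUMNS` and `hive_ddl` are hypothetical, and the function reads the `BUCKET_NAME` variable set earlier.

```shell
# Column definitions shared by both tables, copied from the statements above.
TAXI_COLUMNS='unique_key STRING,
taxi_id STRING,
trip_start_timestamp TIMESTAMP,
trip_end_timestamp TIMESTAMP,
trip_seconds INT,
trip_miles FLOAT,
pickup_census_tract INT,
dropoff_census_tract INT,
pickup_community_area INT,
dropoff_community_area INT,
fare FLOAT,
tips FLOAT,
tolls FLOAT,
extras FLOAT,
trip_total FLOAT,
payment_type STRING,
company STRING,
pickup_latitude FLOAT,
pickup_longitude FLOAT,
pickup_location STRING,
dropoff_latitude FLOAT,
dropoff_longitude FLOAT,
dropoff_location STRING'

# Hypothetical helper: print a CREATE EXTERNAL TABLE statement.
#   $1: table name
#   $2: storage clause, for example "STORED AS PARQUET"
#   $3: subdirectory under gs://${BUCKET_NAME}/chicago_taxi_trips/
hive_ddl() {
  printf "CREATE EXTERNAL TABLE %s(\n%s)\n%s\nLOCATION 'gs://%s/chicago_taxi_trips/%s/';\n" \
    "$1" "${TAXI_COLUMNS}" "$2" "${BUCKET_NAME}" "$3"
}
```

The printed statement can be passed to the same job command used above, for example with `--execute "$(hive_ddl chicago_taxi_trips_parquet 'STORED AS PARQUET' parquet)"`. For the CSV table, the storage clause would be `ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE` and the subdirectory `csv`.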
        

    Run queries

    You can run queries locally from the Trino CLI or from an application.

    Trino CLI queries

    This section demonstrates how to query the Hive Parquet taxi dataset using the Trino CLI.

    1. Run the following command on your local machine to SSH into your cluster's master node. The local terminal will stop responding during the execution of the command.
      gcloud compute ssh trino-cluster-m
      
    2. In the SSH terminal window on your cluster's master node, run the Trino CLI, which connects to the Trino server running on the master node.
      trino --catalog hive --schema default
      
    3. At the trino:default prompt, verify that Trino can find the Hive tables.
      show tables;
      
      Table
      ----------------------------
       chicago_taxi_trips_csv
       chicago_taxi_trips_parquet
      (2 rows)
      
    4. Run queries from the trino:default prompt, and compare the performance of querying Parquet versus CSV data.
      • Parquet data query
        select count(*) from chicago_taxi_trips_parquet where trip_miles > 50;
        
         _col0
        --------
         117957
        (1 row)

        Query 20180928_171735_00006_2sz8c, FINISHED, 3 nodes
        Splits: 308 total, 308 done (100.00%)
        0:16 [113M rows, 297MB] [6.91M rows/s, 18.2MB/s]
      • CSV data query
        select count(*) from chicago_taxi_trips_csv where trip_miles > 50;
        
         _col0
        --------
         117957
        (1 row)

        Query 20180928_171936_00009_2sz8c, FINISHED, 3 nodes
        Splits: 881 total, 881 done (100.00%)
        0:47 [113M rows, 41.5GB] [2.42M rows/s, 911MB/s]
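
The elapsed times in the two statistics lines above (0:16 for Parquet, 0:47 for CSV) give a rough measure of the difference. The following sketch works through that arithmetic, assuming the times are in the M:SS form shown in the sample output. The helper names `to_seconds` and `speedup` are hypothetical.

```shell
# Convert an elapsed time in M:SS form (as printed by the Trino CLI above)
# into a number of seconds.
to_seconds() {
  echo "$1" | awk -F: '{ print $1 * 60 + $2 }'
}

# Print how many times longer the first query ran than the second,
# rounded to one decimal place.
#   $1: elapsed time of the slower query   $2: elapsed time of the faster query
speedup() {
  awk -v slow="$(to_seconds "$1")" -v fast="$(to_seconds "$2")" \
    'BEGIN { printf "%.1f\n", slow / fast }'
}

# Sample values from the output above: CSV took 0:47, Parquet took 0:16.
speedup "0:47" "0:16"   # prints 2.9
```

With the sample figures, the Parquet query finishes about 2.9 times faster while reading 297MB instead of 41.5GB. Your own timings depend on cluster size and load.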

    Java application queries

    To run queries from a Java application through the Trino Java JDBC driver:

    1. Download the Trino Java JDBC driver.
    2. Add a trino-jdbc dependency in Maven pom.xml.

    <dependency>
      <groupId>io.trino</groupId>
      <artifactId>trino-jdbc</artifactId>
      <version>376</version>
    </dependency>
    
    Sample Java code
    package dataproc.codelab.trino;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.Properties;

    public class TrinoQuery {
      // Trino coordinator on the cluster's master node, hive catalog, default schema.
      private static final String URL = "jdbc:trino://trino-cluster-m:8080/hive/default";
      // Local SOCKS proxy through which the driver reaches the cluster.
      private static final String SOCKS_PROXY = "localhost:1080";
      private static final String USER = "user";
      private static final String QUERY =
          "select count(*) as count from chicago_taxi_trips_parquet where trip_miles > 50";

      public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("user", USER);
        properties.setProperty("socksProxy", SOCKS_PROXY);

        // try-with-resources closes the result set, statement, and connection.
        try (Connection connection = DriverManager.getConnection(URL, properties);
            Statement stmt = connection.createStatement();
            ResultSet rs = stmt.executeQuery(QUERY)) {
          while (rs.next()) {
            // count(*) returns a BIGINT, so read it as a long.
            long count = rs.getLong("count");
            System.out.println("The number of long trips: " + count);
          }
        } catch (SQLException e) {
          e.printStackTrace();
        }
      }
    }
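
The sample above sends its traffic through a SOCKS proxy on localhost:1080, so something must be listening on that port before the application runs. One common way to provide it is an SSH tunnel with dynamic port forwarding to the master node. The sketch below assumes a POSIX shell, that the `PROJECT` variable from earlier is set, and that you know the cluster's zone, which this tutorial does not define. The function name `open_socks_tunnel` and the `DRY_RUN` switch are hypothetical.

```shell
# Hypothetical helper: open a SOCKS proxy to the cluster's master node.
#   $1: Compute Engine zone of the master node (an assumption; not set in the tutorial)
#   $2: local port for the proxy (defaults to 1080, matching SOCKS_PROXY above)
# With DRY_RUN=1 the command is printed instead of executed.
open_socks_tunnel() {
  zone="$1"
  port="${2:-1080}"
  # ssh flags after "--": -D opens dynamic (SOCKS) forwarding, -N runs no remote command.
  set -- gcloud compute ssh trino-cluster-m \
    --project="${PROJECT}" --zone="${zone}" -- -D "${port}" -N
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}
```

Calling `open_socks_tunnel us-west1-a` in a separate terminal keeps the tunnel open until you interrupt it. The zone shown here is only an example value.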
    

    Logging and monitoring

    Logging

    The Trino logs are located at /var/log/trino/ on the cluster's master and worker nodes.

    Web UI

    See Viewing and Accessing Component Gateway URLs to open the Trino Web UI running on the cluster's master node in your local browser.

    Monitoring

    Trino exposes cluster runtime information through runtime tables. In a Trino session (from the trino:default prompt), run the following query to view runtime table data:

    SELECT * FROM system.runtime.nodes;
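
The same system catalog also has a runtime table of queries, which can be useful for seeing what the cluster is working on. The sketch below wraps one such query in a shell function so that it can be passed to the Trino CLI without opening an interactive session. The function name `running_queries_sql` is hypothetical, and the column selection is only one possible choice.

```shell
# Hypothetical helper: print a query that lists the statements
# the cluster is currently running.
running_queries_sql() {
  cat <<'SQL'
SELECT query_id, state, query
FROM system.runtime.queries
WHERE state = 'RUNNING'
SQL
}
```

On the master node, `trino --execute "$(running_queries_sql)"` runs the statement once and exits.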
    

    Clean up

    After you finish the tutorial, you can clean up the resources that you created so that they stop using quota and incurring charges. The following sections describe how to delete or turn off these resources.

    Delete the project

    The easiest way to eliminate billing is to delete the project that you created for the tutorial.

    To delete the project:

    1. In the Google Cloud console, go to the Manage resources page.

      Go to Manage resources

    2. In the project list, select the project that you want to delete, and then click Delete.
    3. In the dialog, type the project ID, and then click Shut down to delete the project.

    Delete the cluster

    • To delete your cluster:
      gcloud dataproc clusters delete --project=${PROJECT} trino-cluster \
          --region=${REGION}
      

    Delete the bucket

    • To delete the Cloud Storage bucket you created in Before you begin, including the data files stored in the bucket:
      gcloud storage rm gs://${BUCKET_NAME} --recursive