Dataproc での Apache Hive の使用

Last reviewed 2022-04-06 UTC

このチュートリアルでは、DataprocApache Hive を効率的かつ柔軟に使用するために、Hive データを Cloud Storage に保存し、Hive メタストアを Cloud SQL 上の MySQL データベースでホストする方法を説明します。このようにコンピューティング リソースとストレージ リソースを分離すると、次のような利点があります。

  • 柔軟性とアジリティ: 特定の Hive ワークロードに合わせてクラスタ構成をカスタマイズし、必要に応じて各クラスタを個別にスケールアップまたはスケールダウンできます。
  • コストの削減: Hive ジョブを実行するときにエフェメラル クラスタを起動し、ジョブが完了したら削除できます。ジョブに必要なリソースは使用されるときにのみアクティブになるため、使用した分に限り課金されます。重要性の低いデータ処理や低コストでの大規模なクラスタ作成に、プリエンプティブル VM も使用できます。

Hive は、Apache Hadoop 上に構築された人気のあるオープンソースのデータ ウェアハウス システムです。Hive には SQL に似た HiveQL というクエリ言語が用意されており、これを使用して大規模な構造化データセットを分析します。Hive メタストアには、Hive テーブルに関するスキーマやロケーションなどのメタデータが格納されます。Cloud SQL を使用すると、Google Cloud 上のリレーショナル データベースのセットアップ、メンテナンス、管理、運用が容易になります。Cloud SQL では、MySQL が Hive メタストアのバックエンドとしてよく使用されます。

目標

  • Cloud SQL 上に Hive メタストア用の MySQL インスタンスを作成します。
  • Dataproc に Hive サーバーをデプロイします。
  • Dataproc クラスタ インスタンスに Cloud SQL Proxy をインストールします。
  • Hive データを Cloud Storage にアップロードします。
  • 複数の Dataproc クラスタで Hive クエリを実行します。

費用

このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。

  • Dataproc
  • Cloud Storage
  • Cloud SQL

料金計算ツールを使用すると、予想使用量に基づいて費用の見積もりを作成できます。

新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

新しいプロジェクトを作成する

  1. Google Cloud コンソールでプロジェクトの選択ページに移動します。

    プロジェクト セレクタに移動

  2. Google Cloud プロジェクトを選択または作成します。

課金を有効にする

環境を初期化する

  1. Cloud Shell インスタンスを起動します。

    Cloud Shell に移動

  2. Cloud Shell で、デフォルトの Compute Engine ゾーンを、Dataproc クラスタを作成するゾーンに設定します。

    export PROJECT=$(gcloud info --format='value(config.project)')
    export REGION=REGION
    export ZONE=ZONE
    gcloud config set compute/zone ${ZONE}

    以下を置き換えます。

    • REGION: クラスタを作成するリージョン(us-central1 など)。
    • ZONE: クラスタを作成するゾーン(us-central1-a など)。
  3. Cloud Shell で次のコマンドを実行して、Dataproc と Cloud SQL Admin API を有効にします。

    gcloud services enable dataproc.googleapis.com sqladmin.googleapis.com

リファレンス アーキテクチャ

わかりやすくするため、このチュートリアルではコンピューティング サービスとストレージ サービスをすべて同じ Google Cloud リージョンにデプロイし、ネットワークのレイテンシとネットワーク転送コストを最小限に抑えています。図 1 に、このチュートリアルのアーキテクチャを示します。

シングルリージョン アーキテクチャの図
図 1: シングルリージョンの Hive アーキテクチャの例

このアーキテクチャでは、Hive クエリのライフサイクルは次のステップに従います。

  1. Hive クライアントが、エフェメラル Dataproc クラスタで動作する Hive サーバーにクエリを送信します。
  2. サーバーがクエリを処理し、メタストア サービスにメタデータをリクエストします。
  3. メタストア サービスは、Cloud SQL Proxy を介して Cloud SQL から Hive メタデータをフェッチします。
  4. サーバーが、Cloud Storage のリージョン バケットに存在する Hive ウェアハウスからデータを読み込みます。
  5. サーバーからクライアントに結果が返されます。

マルチリージョン アーキテクチャに関する考慮事項

このチュートリアルでは、シングルリージョン アーキテクチャに焦点を当てています。ただし、Hive サーバーを異なる地理的地域で実行しなければならない場合は、マルチリージョン アーキテクチャを検討できます。その場合は、Cloud SQL インスタンスと同じリージョンに、メタストア サービスのホスト専用の Dataproc クラスタを別に作成する必要があります。メタストア サービスから MySQL データベースに送信されるリクエストの量は非常に多くなる可能性があるため、パフォーマンスへの影響を最小限に抑えるために、メタストア サービスを地理的に MySQL データベースに近づけることが重要です。それに比べて、Hive サーバーからメタストア サービスに送信されるリクエストの量は通常、はるかに少数です。したがって、レイテンシが増加しても、Hive サーバーとメタストア サービスは異なるリージョンに配置してかまわない場合があります。

メタストア サービスは Dataproc マスターノードでのみ実行でき、ワーカーノードでは実行できません。Dataproc の標準クラスタおよび高可用性クラスタでは、最低 2 つのワーカーノードが強制的に作成されます。未使用のワーカーノードでのリソースの浪費を避けるため、代わりにメタストア サービス用の単一ノードクラスタを作成できます。高可用性を実現するため、複数の単一ノードクラスタを作成できます。

Cloud SQL インスタンスに直接接続しなければならないのはメタストア サービス クラスタだけなので、Cloud SQL Proxy はメタストア サービス クラスタにのみインストールする必要があります。Hive サーバーからアクセスするメタストア サービス クラスタを指定するには、hive.metastore.uris プロパティの値として URI のカンマ区切りのリストを設定します。次に例を示します。

thrift://metastore1:9083,thrift://metastore2:9083

複数のロケーションにある Hive サーバーから Hive データにアクセスする必要がある場合は、デュアルリージョン バケットまたはマルチリージョン バケットを使用することも検討できます。バケット ロケーションのタイプはユースケースによって異なります。レイテンシ、可用性、コストのバランスをとる必要があります。

図 2 に、マルチリージョン アーキテクチャの例を示します。

マルチリージョン Hive アーキテクチャの図
図 2: マルチリージョンの Hive アーキテクチャの例

ご覧のように、マルチリージョン シナリオはやや複雑です。簡潔にするため、このチュートリアルではシングルリージョン アーキテクチャを使用します。

(省略可)ウェアハウス バケットの作成

Hive データを保存するための Cloud Storage バケットがない場合は、BUCKET_NAME バケットを一意のバケット名に置き換えて、ウェアハウス バケットを作成します(Cloud Shell で次のコマンドを実行できます)。

export WAREHOUSE_BUCKET=BUCKET_NAME
gsutil mb -l ${REGION} gs://${WAREHOUSE_BUCKET}

Cloud SQL インスタンスの作成

このセクションでは、後で Hive メタストアのホスティングに使用する新しい Cloud SQL インスタンスを作成します。

Cloud Shell で、新しい Cloud SQL インスタンスを作成します。

gcloud sql instances create hive-metastore \
    --database-version="MYSQL_5_7" \
    --activation-policy=ALWAYS \
    --zone ${ZONE}

このコマンドが完了するまで数分かかる場合があります。

Dataproc クラスタの作成

最初の Dataproc クラスタを作成します。CLUSTER_NAMEhive-cluster などの名前に置き換えます。

gcloud dataproc clusters create CLUSTER_NAME \
    --scopes sql-admin \
    --region ${REGION} \
    --initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/cloud-sql-proxy/cloud-sql-proxy.sh \
    --properties "hive:hive.metastore.warehouse.dir=gs://${WAREHOUSE_BUCKET}/datasets" \
    --metadata "hive-metastore-instance=${PROJECT}:${REGION}:hive-metastore" \
    --metadata "enable-cloud-sql-proxy-on-workers=false"

メモ:

  • sql-admin アクセス スコープを指定し、クラスタ インスタンスが Cloud SQL Admin API にアクセスできるようにします。
  • Cloud Storage バケットに格納するスクリプトに初期化アクションを入れて、--initialization-actions フラグを使用してそのバケットを参照します。詳しくは、初期化アクション - 重要な考慮事項とガイドラインをご覧ください。
  • hive:hive.metastore.warehouse.dir プロパティで、Hive ウェアハウス バケットの URI を指定します。これにより、Hive サーバーから読み取り / 書き込みを行う場所が正しく構成されます。 このプロパティには少なくとも 1 つのディレクトリ(例: gs://my-bucket/my-directory)を含めることが必須です。このプロパティをディレクトリ(例: gs://my-bucket)なしでバケット名に設定する場合、Hive が正しく機能しません。
  • Cloud SQL Proxy がマスターノードでのみ実行されるように enable-cloud-sql-proxy-on-workers=false を指定します。これにより、Hive メタストア サービスが機能し、Cloud SQL への不要な負荷を回避できます。
  • Dataproc がすべてのクラスタ インスタンスで自動的に実行する、Cloud SQL Proxy 初期化アクションを指定します。このアクションは次の処理を行います。

    • Cloud SQL Proxy をインストールします。
    • hive-metastore-instance メタデータ パラメータで指定された Cloud SQL インスタンスへの安全な接続を確立します。
    • hive ユーザーと Hive メタストアのデータベースを作成します。

    この Cloud SQL Proxy 初期化アクションの完全なコードを GitHub で見ることができます。

  • わかりやすくするため、このチュートリアルではマスター インスタンスを 1 つだけ使用します。本番環境ワークロードの復元力を強化するには、Dataproc の高可用性モードを使用し、3 つのマスター インスタンスを持つクラスタを作成することを検討してください。

  • このチュートリアルでは、パブリック IP アドレスを持つ Cloud SQL インスタンスを使用します。プライベート IP アドレスのみのインスタンスを使用する場合は、--metadata "use-cloud-sql-private-ip=true" パラメータを渡すことで、プロキシがプライベート IP アドレスを使用するように強制できます。

Hive テーブルの作成

このセクションでは、サンプル データセットをウェアハウス バケットにアップロードし、新しい Hive テーブルを作成して、そのデータセットに対する HiveQL クエリをいくつか実行します。

  1. サンプル データセットをウェアハウス バケットにコピーします。

    gsutil cp gs://hive-solution/part-00000.parquet \
    gs://${WAREHOUSE_BUCKET}/datasets/transactions/part-00000.parquet

    サンプル データセットは Parquet 形式で圧縮されており、日付、金額、取引タイプの 3 つの列を持つ架空の銀行取引記録が数千件含まれています。

  2. このデータセット用の外部 Hive テーブルを作成します。

    gcloud dataproc jobs submit hive \
        --cluster CLUSTER_NAME \
        --region ${REGION} \
        --execute "
          CREATE EXTERNAL TABLE transactions
          (SubmissionDate DATE, TransactionAmount DOUBLE, TransactionType STRING)
          STORED AS PARQUET
          LOCATION 'gs://${WAREHOUSE_BUCKET}/datasets/transactions';"

Hive クエリの実行

Dataproc 内のさまざまなツールを使用して Hive クエリを実行できます。このセクションでは、次のツールを使用してクエリを実行する方法を説明します。

  • Dataproc の Hive Jobs API
  • BeelineSQLLine に基づく、よく利用されているコマンドライン クライアント)。
  • SparkSQL(構造化データへのクエリを実行する Apache Spark の API)。

各セクションで、サンプルクエリを実行します。

Dataproc Jobs API を使用した Hive に対するクエリ

次のシンプルな HiveQL クエリを実行して、parquet ファイルが Hive テーブルに正しくリンクされていることを確認します。

gcloud dataproc jobs submit hive \
    --cluster CLUSTER_NAME \
    --region ${REGION} \
    --execute "
      SELECT *
      FROM transactions
      LIMIT 10;"

次のような出力が表示されます。

+-----------------+--------------------+------------------+
| submissiondate  | transactionamount  | transactiontype  |
+-----------------+--------------------+------------------+
| 2017-12-03      | 1167.39            | debit            |
| 2017-09-23      | 2567.87            | debit            |
| 2017-12-22      | 1074.73            | credit           |
| 2018-01-21      | 5718.58            | debit            |
| 2017-10-21      | 333.26             | debit            |
| 2017-09-12      | 2439.62            | debit            |
| 2017-08-06      | 5885.08            | debit            |
| 2017-12-05      | 7353.92            | authorization    |
| 2017-09-12      | 4710.29            | authorization    |
| 2018-01-05      | 9115.27            | debit            |
+-----------------+--------------------+------------------+

Beeline を使用した Hive のクエリ

  1. Dataproc のマスター インスタンス(CLUSTER_NAME-m)との SSH セッションを開きます。

    gcloud compute ssh CLUSTER_NAME-m
  2. マスター インスタンスのコマンド プロンプトで、Beeline セッションを開きます。

    beeline -u "jdbc:hive2://localhost:10000"

    注:

    • localhost の代わりに、次のようにマスター インスタンスの名前をホストとして参照することもできます。

      beeline -u "jdbc:hive2://CLUSTER_NAME-m:10000"
    • 3 つのマスターを持つ高可用性モードを使用している場合、代わりに次のコマンドを使用する必要があります。

      beeline -u "jdbc:hive2://CLUSTER_NAME-m-0:2181,CLUSTER_NAME-m-1:2181,CLUSTER_NAME-m-2:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2"
  3. Beeline プロンプトが表示されたら、次の HiveQL クエリを実行します。

    SELECT TransactionType, AVG(TransactionAmount) AS AverageAmount
    FROM transactions
    WHERE SubmissionDate = '2017-12-22'
    GROUP BY TransactionType;

    次のような出力が表示されます。

    +------------------+--------------------+
    | transactiontype  |   averageamount    |
    +------------------+--------------------+
    | authorization    | 4890.092525252529  |
    | credit           | 4863.769269565219  |
    | debit            | 4982.781458176331  |
    +------------------+--------------------+
  4. Beeline セッションを閉じます。

    !quit
  5. SSH 接続を閉じます。

    exit

SparkSQL を使用した Hive のクエリ

  1. Dataproc のマスター インスタンスとの SSH セッションを開きます。

    gcloud compute ssh CLUSTER_NAME-m
  2. マスター インスタンスのコマンド プロンプトで、新しい PySpark シェル セッションを開きます。

    pyspark
  3. PySpark シェル プロンプトが表示されたら、次の Python コードを入力します。

    from pyspark.sql import HiveContext
    hc = HiveContext(sc)
    hc.sql("""
    SELECT SubmissionDate, AVG(TransactionAmount) as AvgDebit
    FROM transactions
    WHERE TransactionType = 'debit'
    GROUP BY SubmissionDate
    HAVING SubmissionDate >= '2017-10-01' AND SubmissionDate < '2017-10-06'
    ORDER BY SubmissionDate
    """).show()

    次のような出力が表示されます。

    +-----------------+--------------------+
    | submissiondate  |      avgdebit      |
    +-----------------+--------------------+
    | 2017-10-01      | 4963.114920399849  |
    | 2017-10-02      | 5021.493300510582  |
    | 2017-10-03      | 4982.382279569891  |
    | 2017-10-04      | 4873.302702503676  |
    | 2017-10-05      | 4967.696333583777  |
    +-----------------+--------------------+
  4. PySpark セッションを閉じます。

    exit()
  5. SSH 接続を閉じます。

    exit

Hive メタストアの検査

Cloud SQL の Hive メタストアに transactions テーブルに関する情報が含まれていることを確認します。

  1. Cloud Shell で、Cloud SQL インスタンスの新しい MySQL セッションを開始します。

    gcloud sql connect hive-metastore --user=root

    root ユーザーのパスワードを求めるプロンプトが表示されたら、何も入力せずに RETURN キーを押します。わかりやすくするために、このチュートリアルでは root ユーザーのパスワードは設定していません。パスワードを設定してメタストア データベースの保護を強化する方法については、Cloud SQL のドキュメントをご覧ください。また、Cloud SQL Proxy 初期化アクションで、暗号化によってパスワードを保護するメカニズムを提供することもできます。詳細については、アクションのコード リポジトリをご覧ください。

  2. MySQL コマンド プロンプトで、残りのセッションのデフォルト データベースを hive_metastore に設定します。

    USE hive_metastore;
  3. ウェアハウス バケットのロケーションがメタストアに記録されていることを確認します。

    SELECT DB_LOCATION_URI FROM DBS;

    出力は次のようになります。

    +-------------------------------------+
    | DB_LOCATION_URI                     |
    +-------------------------------------+
    | gs://[WAREHOUSE_BUCKET]/datasets   |
    +-------------------------------------+
  4. テーブルがメタストア内で正しく参照されていることを確認します。

    SELECT TBL_NAME, TBL_TYPE FROM TBLS;

    出力は次のようになります。

    +--------------+----------------+
    | TBL_NAME     | TBL_TYPE       |
    +--------------+----------------+
    | transactions | EXTERNAL_TABLE |
    +--------------+----------------+
  5. テーブルの列も正しく参照されていることを確認します。

    SELECT COLUMN_NAME, TYPE_NAME
    FROM COLUMNS_V2 c, TBLS t
    WHERE c.CD_ID = t.SD_ID AND t.TBL_NAME = 'transactions';

    出力は次のようになります。

    +-------------------+-----------+
    | COLUMN_NAME       | TYPE_NAME |
    +-------------------+-----------+
    | submissiondate    | date      |
    | transactionamount | double    |
    | transactiontype   | string    |
    +-------------------+-----------+
  6. 入力形式と場所も正しく参照されていることを確認します。

    SELECT INPUT_FORMAT, LOCATION
    FROM SDS s, TBLS t
    WHERE s.SD_ID = t.SD_ID AND t.TBL_NAME = 'transactions';

    出力は次のようになります。

    +---------------------------------------------------------------+------------------------------------------------+
    | INPUT_FORMAT                                                  | LOCATION                                       |
    +---------------------------------------------------------------+------------------------------------------------+
    | org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat | gs://[WAREHOUSE_BUCKET]/datasets/transactions |
    +---------------------------------------------------------------+------------------------------------------------+
    
  7. MySQL セッションを閉じます。

    exit

別の Dataproc クラスタの作成

このセクションでは、別の Dataproc クラスタを作成して、Hive データと Hive メタストアを複数のクラスタ間で共有できることを確認します。

  1. 新しい Dataproc クラスタを作成します。

    gcloud dataproc clusters create other-CLUSTER_NAME \
        --scopes cloud-platform \
        --image-version 2.0 \
        --region ${REGION} \
        --initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/cloud-sql-proxy/cloud-sql-proxy.sh \
        --properties "hive:hive.metastore.warehouse.dir=gs://${WAREHOUSE_BUCKET}/datasets" \
        --metadata "hive-metastore-instance=${PROJECT}:${REGION}:hive-metastore"\
        --metadata "enable-cloud-sql-proxy-on-workers=false"
  2. 新しいクラスタがデータにアクセスできることを確認します。

    gcloud dataproc jobs submit hive \
        --cluster other-CLUSTER_NAME \
        --region ${REGION} \
        --execute "
          SELECT TransactionType, COUNT(TransactionType) as Count
          FROM transactions
          WHERE SubmissionDate = '2017-08-22'
          GROUP BY TransactionType;"

    次のような出力が表示されます。

    +------------------+--------+
    | transactiontype  | count  |
    +------------------+--------+
    | authorization    | 696    |
    | credit           | 1722   |
    | debit            | 2599   |
    +------------------+--------+

チュートリアルはこれで完了です。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにする手順は次のとおりです。

  • 作成したすべてのリソースをクリーンアップして、これ以後リソースの料金が発生しないようにします。課金を停止する最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。
  • あるいは、リソースを個別に削除することもできます。

プロジェクトの削除

  1. Google Cloud コンソールで、[リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

リソースを個別に削除する

プロジェクト全体ではなく個々のリソースを削除するには、Cloud Shell で次のコマンドを実行します。

gcloud dataproc clusters delete CLUSTER_NAME --region ${REGION} --quiet
gcloud dataproc clusters delete other-CLUSTER_NAME --region ${REGION} --quiet
gcloud sql instances delete hive-metastore --quiet
gsutil rm -r gs://${WAREHOUSE_BUCKET}/datasets

次のステップ

  • Google のサーバーレス、高スケーラビリティ、低コストのエンタープライズ データ ウェアハウスである BigQuery を試す。
  • Hadoop ワークロードの Google Cloud への移行に関するガイドを読む。
  • 初期化アクションを確認して、Dataproc で Hive HCatalog を使用する方法の詳細を確認する。
  • 高可用性対応の Cloud SQL を構成してサービスの信頼性を向上させる方法を学ぶ。
  • Google Cloud に関するリファレンス アーキテクチャ、図、ベスト プラクティスを確認する。Cloud Architecture Center をご覧ください。