Databricks と BigQuery の接続


このチュートリアルでは、Databricks ノートブックのデータを読み取り / 書き込みするために BigQuery テーブルやビューを接続する方法を説明します。ここでは、Google Cloud コンソールDatabricks ワークスペースを使用する手順を説明します。この処理は gclouddatabricks コマンドライン ツールでも可能ですが、このチュートリアルでは説明しません。

Google Cloud 上の Databricks は Google Cloud にホストされる Databricks 環境です。Google Kubernetes(GKE)上で実行され、BigQuery などの Google Cloud テクノロジーとの統合機能が事前に組み込まれています。Databricks を初めて使用する場合は、Databricks Unified Data Platform の概要にある動画で Databricks レイクハウス プラットフォームの概要をご覧ください。

目標

  • Databricks に接続するように Google Cloud を構成する。
  • Google Cloud に Databricks をデプロイする。
  • Databricks から BigQuery にクエリを実行する。

料金

このチュートリアルでは、Google Cloud コンソールの課金対象となるコンポーネント(BigQuery、GKE など)を使用します。BigQuery の料金GKE の料金が適用されます。Google Cloud で実行する Databricks アカウントに関連する費用については、Databricks ドキュメントで Set up your account and create a workspace をご覧ください。

始める前に

Databricks を BigQuery に接続する前に、次の手順を完了する必要があります。

  1. BigQuery Storage API を有効にする。
  2. Databricks のサービス アカウントを作成する。
  3. 一時ストレージとして Cloud Storage バケットを作成する。

BigQuery Storage API を有効にする

BigQuery を使用する新しいプロジェクトでは、BigQuery Storage API がデフォルトで有効になります。既存のプロジェクトで API が有効になっていない場合は、次の手順を行います。

  1. Google Cloud コンソールで [BigQuery Storage API] ページに移動します。

    [BigQuery Storage API] に移動

  2. BigQuery Storage API が有効になっていることを確認します。

    BigQuery Storage API が有効になっている

Databricks のサービス アカウントを作成する

次に、Identity and Access Management(IAM)サービス アカウントを作成して、Databricks クラスタから BigQuery にクエリを実行できるようにします。このサービス アカウントには、タスクの実行に必要な最小権限を付与することをおすすめします。BigQuery のロールと権限をご覧ください。

  1. Google Cloud コンソールで、[サービス アカウント] ページに移動します。

    [サービス アカウント] に移動

  2. [サービス アカウントを作成] をクリックし、サービス アカウントに「databricks-bigquery」という名前を付けます。「Databricks tutorial service account」などの簡単な説明を入力して、[作成して続行] をクリックします。

  3. [このサービス アカウントにプロジェクトへのアクセスを許可する] で、サービス アカウントのロールを指定します。同じプロジェクトの Databricks ワークスペースと BigQuery テーブルのデータを読み取る権限をサービス アカウントに付与するには、次のロールを付与します(特に、マテリアライズド ビューを参照しない場合)。

    • BigQuery 読み取りセッション ユーザー
    • BigQuery データ閲覧者

    データの書き込み権限を付与するには、次のロールを付与します。

    • BigQuery ジョブユーザー
    • BigQuery データ編集者
  4. 以降のステップで使用するため、新しいサービス アカウントのメールアドレスを記録しておきます。

  5. [完了] をクリックします。

Cloud Storage バケットを作成する

BigQuery に書き込むには、Databricks クラスタが Cloud Storage バケットにアクセスして、書き込まれたデータをバッファに格納できるようにする必要があります。

  1. Google Cloud コンソールで Cloud Storage のブラウザページに移動します。

    Storage の [ブラウザ] に移動

  2. [バケットを作成] をクリックして、[バケットの作成] ダイアログを開きます。

  3. BigQuery へのデータの書き込みに使用するバケットの名前を指定します。バケット名は、グローバルに一意の名前にする必要があります。すでに存在するバケット名を指定すると、Cloud Storage はエラー メッセージを返します。これが発生した場合は、バケットに別の名前を指定します。

    バケット ダイアログで「databricks-bq-123」という名前を付ける

  4. このチュートリアルでは、ストレージのロケーション、ストレージ クラス、アクセス制御、詳細設定にデフォルトの設定を使用します。

  5. [作成] をクリックして Cloud Storage バケットを作成します。

  6. [権限] をクリックして [追加] をクリックし、サービス アカウントのページで Databricks のアクセス用に作成したサービス アカウントのメールアドレスを指定します。

    画像

  7. [ロールを選択] をクリックし、ストレージ管理者ロールを追加します。

  8. [保存] をクリックします。

Google Cloud に Databricks をデプロイする

次の手順を行い、Google Cloud に Databricks をデプロイする準備をします。

  1. Databricks アカウントを設定するには、Databricks のドキュメント Set up your Databricks on Google Cloud account の手順を行います。
  2. 登録後は、Databricks アカウントの管理方法を確認してください。

Databricks ワークスペース、クラスタ、ノートブックを作成する

以下では、BigQuery にアクセスするコードを記述するために、Databricks ワークスペース、クラスタ、Python ノートブックを作成する方法について説明します。

  1. Databrick の前提条件を確認します。

  2. 最初のワークスペースを作成します。Databricks アカウント コンソールで、[Create Workspace] をクリックします。

  3. [Workspace name] に gcp-bq を指定し、リージョンを選択します。

    ワークスペース名、リージョン、Google Cloud プロジェクト ID が表示されている [Create Workspace] 画面

  4. Google Cloud プロジェクト ID を指定するには、Google Cloud コンソールでプロジェクト ID をコピーし、[Google Cloud project ID] フィールドに貼り付けます。

    Google Cloud コンソールに移動

  5. [Save] をクリックして、Databricks ワークスペースを作成します。

  6. Databricks ランタイム 7.6 以降で Databricks クラスタを作成するには、左側のメニューバーで [Clusters] を選択し、上部にある [Create Cluster] をクリックします。

  7. クラスタの名前とサイズを指定して、[Advanced Options] をクリックし、Google Cloud サービス アカウントのメールアドレスを指定します。

    Google サービス アカウントの詳細が表示されている [New Cluster] 画面

  8. [Create Cluster] をクリックします。

  9. Databricks 用の Python ノートブックを作成するには、ノートブックを作成するの手順を行ってください。

Databricks から BigQuery へのクエリ

前述の構成を使用すると、Databricks を BigQuery に安全に接続できます。Databricks は、オープンソースの Google Spark アダプタの fork を使用して BigQuery にアクセスします。

Databricks は、ネストされた列のフィルタリングなど、特定のクエリ述語を自動的に BigQuery にプッシュダウンすることで、データ転送を削減してクエリを高速化します。また、query() API を使用して BigQuery で最初に SQL クエリを実行する追加機能により、結果のデータセットの転送サイズが削減されます。

以降の手順では、BigQuery のデータセットにアクセスして独自のデータを BigQuery に書き込む方法について説明します。

BigQuery の一般公開データセットにアクセスする

BigQuery には、利用可能な一般公開データセットのリストが用意されています。一般公開データセットの一部である BigQuery Shakespeare データセットにクエリを実行するには、次の手順を行います。

  1. BigQuery テーブルを読み取るには、Databricks ノートブックで次のコード スニペットを使用します。

    table = "bigquery-public-data.samples.shakespeare"
    df = spark.read.format("bigquery").option("table",table).load()
    df.createOrReplaceTempView("shakespeare")
    

    Shift+Return を押してコードを実行します。

    これで、Spark DataFrame(df)を介して BigQuery テーブルにクエリを実行できるようになりました。たとえば、データフレームの最初の 3 行を表示するには、次のように入力します。

    df.show(3)
    

    別のテーブルにクエリを実行するには、table 変数を更新します。

  2. Databricks ノートブックの主な特長は、Scala、Python、SQL などのさまざまな言語のセルを 1 つのノートブックに混在できる点です。

    次の SQL クエリを使用すると、一時ビューを作成する前のセルを実行した後に、Shakespeare 内の単語数を可視化できます。

    %sql
    SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC LIMIT 12
    
    

    Shakespeare ワードカウントの棒グラフ

    上述のセルは、BigQuery ではなく、Databricks クラスタ内のデータフレームに Spark SQL クエリを実行します。このアプローチの利点は、データ分析が Spark レベルで行われるため、それ以上 BigQuery API 呼び出しは発行されず、追加の BigQuery コストが発生しない点です。

  3. また、query() API を使用して SQL クエリの実行を BigQuery に委任することで、結果として生じるデータフレームの転送サイズを小さくすることもできます。この方法を使用すると、Spark で処理を行う上述の例とは異なり、BigQuery でのクエリの実行に対して料金とクエリの最適化が適用されます。

    次の例では、Scala、query() API、および 一般公開の Shakespeare データセットを BigQuery で使用して、シェイクスピア作品の中で出現頻度の高い上位 5 つの単語を計算します。コードを実行する前に、BigQuery で mdataset という空のデータセットを作成し、コードから参照できるようにします。詳細については、BigQuery へのデータの書き込みをご覧ください。

    %scala
    // public dataset
    val table = "bigquery-public-data.samples.shakespeare"
    
    // existing dataset where the Google Cloud user has table creation permission
    val tempLocation = "mdataset"
    // query string
    val q = s"""SELECT word, SUM(word_count) AS word_count FROM ${table}
        GROUP BY word ORDER BY word_count DESC LIMIT 10 """
    
    // read the result of a GoogleSQL query into a DataFrame
    val df2 =
      spark.read.format("bigquery")
      .option("query", q)
      .option("materializationDataset", tempLocation)
      .load()
    
    // show the top 5 common words in Shakespeare
    df2.show(5)
    

    その他のコード例については、Databricks BigQuery サンプル ノートブックをご覧ください。

BigQuery へのデータの書き込み

BigQuery テーブルは、データセットに存在します。BigQuery テーブルにデータを書き込む前に、BigQuery に新しいデータセットを作成する必要があります。Databricks Python ノートブックのデータセットを作成するには、次の手順を行います。

  1. Google Cloud コンソールの [BigQuery] ページに移動します。

    [BigQuery] に移動

  2. [アクションを表示] オプションを開き、[データセットを作成] をクリックして、「together」という名前を付けます。

  3. Databricks Python ノートブックで次のコード スニペットを使用して、Python リストから 3 つの文字列エントリを含むシンプルな Spark データフレームを作成します。

    from pyspark.sql.types import StringType
    mylist = ["Google", "Databricks", "better together"]
    
    df = spark.createDataFrame(mylist, StringType())
    
  4. 別のセルをノートブックに追加し、前の手順で作成した Spark データフレームをデータセット together の BigQuery テーブル myTable に書き込みます。テーブルが作成または上書きされます。前に指定したバケット名が使用されます。

    bucket = YOUR_BUCKET_NAME
    table = "together.myTable"
    
    df.write
      .format("bigquery")
      .option("temporaryGcsBucket", bucket)
      .option("table", table)
      .mode("overwrite").save()
    
  5. データが正常に書き込まれたことを確認するには、Spark データフレーム(df)を介して BigQuery テーブルにクエリを実行し、結果を表示します。

    display(spark.read.format("bigquery").option("table", table).load)
    

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

Databricks を削除する前に、データとノートブックのバックアップを必ず行ってください。Databricks をクリーンアップして完全に削除するには、Google Cloud コンソールで Databricks の登録をキャンセルし、Google Cloud コンソールから作成した関連リソースをすべて削除します。

Cloud Storage バケットが空でない場合、Databricks ワークスペースを削除しても、Databricks によって作成された databricks-WORKSPACE_IDdatabricks-WORKSPACE_ID-system という名前の 2 つの Cloud Storage バケットが削除されないことがあります。このようなオブジェクトは、ワークスペースを削除した後に、Google Cloud コンソールでプロジェクトから手動で削除できます。

次のステップ

このセクションでは、追加のドキュメントとチュートリアルを紹介します。