Apache Spark ストアド プロシージャを操作する

このドキュメントは、BigQuery で Spark ストアド プロシージャを作成して呼び出すデータ エンジニア、データ サイエンティスト、データ アナリストを対象としています。

BigQuery では、Python、Java、Scala で記述された Spark ストアド プロシージャを作成できます。これらのストアド プロシージャは、SQL ストアド プロシージャと同様に、GoogleSQL クエリを使用して BigQuery で実行できます。

始める前に

Spark 用ストアド プロシージャを作成するには、管理者に Spark 接続の作成と共有を依頼します。また、管理者は、接続に関連付けされたサービス アカウントに必要な Identity and Access Management(IAM)権限を付与する必要があります。

必要なロール

このドキュメントのタスクの実行に必要な権限を取得するには、管理者に次の IAM のロールを付与するよう依頼してください。

ロールの付与の詳細については、アクセス権の管理をご覧ください。

これらの事前定義ロールには、このドキュメントのタスクを実行するために必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

このドキュメントのタスクを実行するには、次の権限が必要です。

  • 接続を作成する:
    • bigquery.connections.create
    • bigquery.connections.list
  • Spark ストアド プロシージャを作成する:
    • bigquery.routines.create
    • bigquery.connections.delegate
    • bigquery.jobs.create
  • Spark ストアド プロシージャを呼び出す:
    • bigquery.routines.get
    • bigquery.connections.use
    • bigquery.jobs.create

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

ロケーションに関する考慮事項

ストアド プロシージャは接続と同じロケーションで実行されるため、接続と同じロケーションに Spark 用ストアド プロシージャを作成する必要があります。たとえば、US マルチリージョンにストアド プロシージャを作成するには、US マルチリージョンにある接続を使用します。

料金

  • BigQuery で Spark プロシージャを実行する場合の料金は、Dataproc Serverless で Spark プロシージャを実行する場合の料金と同じです。詳細については、Dataproc Serverless の料金をご覧ください。

  • Spark ストアド プロシージャは、オンデマンド料金モデルBigQuery エディションのいずれでも使用できます。Spark プロシージャの課金は、プロジェクトで使用されているコンピューティング料金モデルに関係なく、常に BigQuery Enterprise エディションの従量課金モデルに基づいて行われます。

  • BigQuery 用の Spark ストアド プロシージャは、予約やコミットメントの使用をサポートしていません。既存の予約とコミットメントは、サポートされている他のクエリとプロシージャで引き続き使用されます。Spark ストアド プロシージャの使用料金は、Enterprise エディションの従量課金制の料金で請求されます。組織の割引が適用されます(該当する場合)。

  • Spark ストアド プロシージャは Spark 実行エンジンを使用しますが、Spark の実行に対しては別途料金は発生しません。前述のように、対応する料金は BigQuery Enterprise エディションの従量課金制 SKU として報告されます。

  • Spark ストアド プロシージャに無料枠はありません。

Spark ストアド プロシージャを作成する

ストアド プロシージャは、使用する接続と同じロケーションに作成する必要があります。

ストアド プロシージャの本文が 1 MB を超える場合は、インライン コードを使用する代わりに、Cloud Storage バケット内のファイルにストアド プロシージャを記述することをおすすめします。BigQuery には、Python を使用して Spark ストアド プロシージャを作成するための 2 つの方法が用意されています。

SQL クエリエディタを使用する

SQL クエリエディタで Spark ストアド プロシージャを作成するには、次の操作を行います。

  1. [BigQuery] ページに移動します。

    [BigQuery] に移動

  2. クエリエディタで、表示された CREATE PROCEDURE ステートメントのサンプルコードを追加します。

    または、[エクスプローラ] ペインで、接続リソースの作成に使用したプロジェクトの接続をクリックします。[ ストアド プロシージャを作成] をクリックして、Spark ストアド プロシージャを作成します。

    Python

    Python で Spark ストアド プロシージャを作成するには、次のサンプルを使用します。

    CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_PYTHON_FILE_URI"]);
     LANGUAGE PYTHON [AS PYSPARK_CODE]
    

    Java または Scala

    main_file_uri オプションを使用して Java または Scala で Spark ストアド プロシージャを作成するには、次のサンプルコードを使用します。

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_file_uri=["MAIN_JAR_URI"]);
     LANGUAGE JAVA|SCALA
    

    main_class オプションと jar_uris オプションを使用して Java または Scala で Spark ストアド プロシージャを作成するには、次のサンプルコードを使用します。

    CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
     WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
     OPTIONS (
         engine="SPARK", runtime_version="RUNTIME_VERSION",
         main_class=["CLASS_NAME"],
         jar_uris=["URI"]);
     LANGUAGE JAVA|SCALA
    

    次のように置き換えます。

    • PROJECT_ID: ストアド プロシージャを作成するプロジェクト。例: myproject
    • DATASET: ストアド プロシージャを作成するデータセット。例: mydataset
    • PROCEDURE_NAME: BigQuery で実行するストアド プロシージャの名前。例: mysparkprocedure
    • PROCEDURE_ARGUMENT: 入力引数を入力するパラメータ。

      このパラメータで、次のフィールドを指定します。

      • ARGUMENT_MODE: 引数のモード。

        有効な値は、INOUTINOUT です。デフォルトの値は IN です。

      • ARGUMENT_NAME: 引数の名前。
      • ARGUMENT_TYPE: 引数のタイプ。

      例: myproject.mydataset.mysparkproc(num INT64)

      詳細については、このドキュメントの「IN パラメータとして値を渡す」または「OUT パラメータと INOUT パラメータとして値を渡す」をご覧ください。

    • CONNECTION_PROJECT_ID: Spark プロシージャを実行するための接続を含むプロジェクト。
    • CONNECTION_REGION: Spark プロシージャを実行するための接続を含むリージョン。例: us
    • CONNECTION_ID: 接続 ID。例: myconnection

      Google Cloud コンソールで接続の詳細を表示する場合、接続 ID は接続 ID に表示される完全修飾接続 ID の最後のセクションの値です。例: projects/myproject/locations/connection_location/connections/myconnection

    • RUNTIME_VERSION: Spark のランタイム バージョン。例: 1.1
    • MAIN_PYTHON_FILE_URI: PySpark ファイルのパス。例: gs://mybucket/mypysparkmain.py

      また、CREATE PROCEDURE ステートメントでストアド プロシージャの本文を追加する場合は、このドキュメントのインライン コードを使用するの例に示すように、LANGUAGE PYTHON AS の後に PYSPARK_CODE を追加します。

    • PYSPARK_CODE: プロシージャの本文をインラインで渡す場合の CREATE PROCEDURE ステートメント内の PySpark アプリケーションの定義

      値は文字列リテラルです。コードに引用符やバックスラッシュが含まれる場合は、エスケープするか、元の文字列として表す必要があります。たとえば、コード "\n"; は次のいずれかで表されます。

      • 引用符付き文字列: "return \"\\n\";"。引用符とバックスラッシュはエスケープされます。
      • 三重引用符付き文字列: """return "\\n";"""。バックスラッシュはエスケープされますが、引用符はエスケープされません。
      • 元の文字列: r"""return "\n";"""。エスケープは不要です。
      インラインの PySpark コードを追加する方法については、インライン コードを使用するをご覧ください。
    • MAIN_JAR_URI: main クラスを含む JAR ファイルのパス。例: gs://mybucket/my_main.jar
    • CLASS_NAME: jar_uris オプションが設定された JAR セットのクラスの完全修飾名。例: com.example.wordcount
    • URI: main クラスで指定されたクラスを含む JAR ファイルのパス。例: gs://mybucket/mypysparkmain.jar

    OPTIONS で指定できるその他のオプションについては、プロシージャ オプションのリストをご覧ください。

PySpark エディタを使用する

PySpark エディタを使用してプロシージャを作成する場合は、CREATE PROCEDURE ステートメントを使用する必要はありません。代わりに、Pyspark エディタで直接 Python コードを追加し、コードを保存または実行します。

PySpark エディタで Spark ストアド プロシージャを作成するには、次の操作を行います。

  1. [BigQuery] ページに移動します。

    [BigQuery] に移動

  2. PySpark コードを直接入力する場合は、PySpark エディタを開きます。PySpark エディタを開くには、[ SQL クエリを作成] の横にある メニューをクリックし、[PySpark プロシージャを作成] を選択します。

  3. オプションを設定するには、[詳細] > [PySpark オプション] をクリックして、次の操作を行います。

    1. PySpark コードを実行する場所を指定します。

    2. [接続] フィールドで、Spark 接続を指定します。

    3. [ストアド プロシージャの呼び出し] セクションで、生成される一時ストアド プロシージャを保存するデータセットを指定します。PySpark コードを呼び出すために、特定のデータセットを設定することも、一時的なデータセットを使用することもできます。

      前の手順で指定したロケーションで一時データセットが生成されます。データセット名を指定する場合は、データセットと Spark 接続が同じロケーションにある必要があります。

    4. [パラメータ] セクションで、ストアド プロシージャのパラメータを定義します。パラメータの値は、セッション中に PySpark コードが実行されるときにのみ使用されますが、宣言自体はプロシージャに格納されます。

    5. [詳細オプション] セクションで、プロシージャのオプションを指定します。プロシージャのオプションの詳細については、プロシージャのオプションのリストをご覧ください。

    6. [プロパティ] セクションで、Key-Value ペアを追加してジョブを構成します。Dataproc Serverless Spark プロパティの任意の Key-Value ペアを使用できます。

    7. [サービス アカウントの設定] で、セッション内での PySpark コードの実行中に使用するカスタム サービス アカウント、CMEK、ステージング データセット、ステージング Cloud Storage フォルダを指定します。

    8. [保存] をクリックします。

Spark 用ストアド プロシージャを保存する

PySpark エディタを使用してストアド プロシージャを作成した後、ストアド プロシージャを保存できます。方法は次のとおりです。

  1. Google Cloud コンソールで [BigQuery] ページに移動します。

    [BigQuery] に移動

  2. クエリエディタで、Python と PySpark エディタを使用して Spark 用ストアド プロシージャを作成します。

  3. [保存] > [プロシージャを保存] をクリックします。

  4. [ストアド プロシージャの保存] ダイアログで、ストアド プロシージャを保存するデータセット名とストアド プロシージャの名前を指定します。

  5. [保存] をクリックします。

    PySpark コードをストアド プロシージャとして保存するのではなく、実行するだけの場合は、[保存] ではなく [実行] をクリックします。

カスタム コンテナを使用する

カスタム コンテナは、ワークロードのドライバとエグゼキュータのプロセス用のランタイム環境を提供します。カスタム コンテナを使用するには、次のサンプルコードを使用します。

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      container_image="CONTAINER_IMAGE", main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

次のように置き換えます。

  • PROJECT_ID: ストアド プロシージャを作成するプロジェクト。例: myproject
  • DATASET: ストアド プロシージャを作成するデータセット。例: mydataset
  • PROCEDURE_NAME: BigQuery で実行するストアド プロシージャの名前。例: mysparkprocedure
  • PROCEDURE_ARGUMENT: 入力引数を入力するパラメータ。

    このパラメータで、次のフィールドを指定します。

    • ARGUMENT_MODE: 引数のモード。

      有効な値は、INOUTINOUT です。デフォルトの値は IN です。

    • ARGUMENT_NAME: 引数の名前。
    • ARGUMENT_TYPE: 引数のタイプ。

    例: myproject.mydataset.mysparkproc(num INT64)

    詳細については、このドキュメントの「IN パラメータとして値を渡す」または「OUT パラメータと INOUT パラメータとして値を渡す」をご覧ください。

  • CONNECTION_PROJECT_ID: Spark プロシージャを実行するための接続を含むプロジェクト。
  • CONNECTION_REGION: Spark プロシージャを実行するための接続を含むリージョン。例: us
  • CONNECTION_ID: 接続 ID。例: myconnection

    Google Cloud コンソールで接続の詳細を表示する場合、接続 ID は接続 ID に表示される完全修飾接続 ID の最後のセクションの値です。例: projects/myproject/locations/connection_location/connections/myconnection

  • RUNTIME_VERSION: Spark のランタイム バージョン。例: 1.1
  • MAIN_PYTHON_FILE_URI: PySpark ファイルのパス。例: gs://mybucket/mypysparkmain.py

    また、CREATE PROCEDURE ステートメントでストアド プロシージャの本文を追加する場合は、このドキュメントのインライン コードを使用するの例に示すように、LANGUAGE PYTHON AS の後に PYSPARK_CODE を追加します。

  • PYSPARK_CODE: プロシージャの本文をインラインで渡す場合の CREATE PROCEDURE ステートメント内の PySpark アプリケーションの定義

    値は文字列リテラルです。コードに引用符やバックスラッシュが含まれる場合は、エスケープするか、元の文字列として表す必要があります。たとえば、コード "\n"; は次のいずれかで表されます。

    • 引用符付き文字列: "return \"\\n\";"。引用符とバックスラッシュはエスケープされます。
    • 三重引用符付き文字列: """return "\\n";"""。バックスラッシュはエスケープされますが、引用符はエスケープされません。
    • 元の文字列: r"""return "\n";"""。エスケープは不要です。
    インラインの PySpark コードを追加する方法については、インライン コードを使用するをご覧ください。
  • CONTAINER_IMAGE: Artifact Registry 内のイメージのパス。プロシージャで使用するライブラリのみを含める必要があります。指定しない場合、ランタイム バージョンに関連付けられたシステムのデフォルトのコンテナ イメージが使用されます。

Spark を使用してカスタム コンテナ イメージをビルドする方法については、カスタム コンテナ イメージをビルドするをご覧ください。

Spark ストアド プロシージャを呼び出す

ストアド プロシージャを作成したら、次のいずれかの方法で呼び出すことができます。

コンソール

  1. [BigQuery] ページに移動します。

    [BigQuery] に移動

  2. [エクスプローラ] ペインでプロジェクトを開き、実行する Spark のストアド プロシージャを選択します。

  3. [ストアド プロシージャ情報] ウィンドウで、[ストアド プロシージャを呼び出す] をクリックします。あるいは、[アクションを表示] オプションを開いて [呼び出し] をクリックします。

  4. [実行] をクリックします。

  5. [すべての結果] セクションで、[結果を表示] をクリックします。

  6. 省略可: [クエリ結果] セクションで、次の操作を行います。

    • Spark ドライバのログを表示するには、[実行の詳細] をクリックします。

    • Cloud Logging でログを表示する場合、[ジョブ情報] をクリックし、[ログ] フィールドで [ログ] をクリックします。

    • Spark History Server エンドポイントを取得する場合は、[ジョブ情報]、[Spark History Server] の順にクリックします。

SQL

ストアド プロシージャを呼び出すには、CALL PROCEDURE ステートメントを使用します。

  1. Google Cloud コンソールで [BigQuery] ページに移動します。

    [BigQuery] に移動

  2. クエリエディタで次のステートメントを入力します。

    CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()
    

  3. [実行] をクリックします。

クエリの実行方法については、インタラクティブ クエリを実行するをご覧ください。

カスタム サービス アカウントを使用する

Spark コード内のデータにアクセスする場合、データアクセスに Spark 接続のサービス ID を使用する代わりに、カスタム サービス アカウントを使用できます。

カスタム サービス アカウントを使用するには、Spark ストアド プロシージャの作成時に INVOKER セキュリティ モードを指定します(EXTERNAL SECURITY INVOKER ステートメントを使用)。また、ストアド プロシージャを呼び出すときに、サービス アカウントを指定します。

Cloud Storage から Spark コードにアクセスして使用するには、Spark 接続のサービス ID に必要な権限を付与する必要があります。接続のサービス アカウントに storage.objects.get IAM 権限または storage.objectViewer IAM ロールを付与する必要があります。

接続で Dataproc Metastore と Dataproc Persistent History Server を指定した場合、これらに対するアクセス権を接続のサービス アカウントに付与できます。詳細については、サービス アカウントへのアクセス権を付与するをご覧ください。

CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  EXTERNAL SECURITY INVOKER
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
      engine="SPARK", runtime_version="RUNTIME_VERSION",
      main_file_uri=["MAIN_PYTHON_FILE_URI"]);
  LANGUAGE PYTHON [AS PYSPARK_CODE]

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

必要に応じて、次のコードに次の引数を追加できます。

SET @@spark_proc_properties.staging_bucket='BUCKET_NAME';
SET @@spark_proc_properties.staging_dataset_id='DATASET';

次のように置き換えます。

  • CUSTOM_SERVICE_ACCOUNT: 必須。ユーザーが提供するカスタム サービス アカウント。
  • BUCKET_NAME: 省略可。デフォルトの Spark アプリケーション ファイル システムとして使用される Cloud Storage バケット。指定しない場合、プロジェクトにデフォルトの Cloud Storage バケットが作成され、同じプロジェクトで実行されるすべてのジョブでバケットが共有されます。
  • DATASET: 省略可。プロシージャの呼び出しによって生成される一時データを格納するデータセット。ジョブが完了すると、データはクリーンアップされます。指定しなかった場合、ジョブ用にデフォルトの一時データセットが作成されます。

カスタム サービス アカウントには、次の権限が必要です。

  • デフォルトの Spark アプリケーション ファイル システムとして使用されるステージング バケットに読み書きするには:

    • 指定したステージング バケットに対する storage.objects.* 権限または roles/storage.objectAdmin IAM ロール。
    • また、ステージング バケットが指定されていない場合は、プロジェクトに対する storage.buckets.* 権限または roles/storage.Admin IAM ロール。
  • (省略可)BigQuery との間でデータを読み書きするには:

    • BigQuery テーブルに対する bigquery.tables.*
    • プロジェクトに対する bigquery.readsessions.*
    • roles/bigquery.admin IAM ロールには、以前の権限が含まれています。
  • (省略可)Cloud Storage との間でデータの読み取りと書き込みを行うには:

    • Cloud Storage オブジェクトに対する storage.objects.* 権限または roles/storage.objectAdmin IAM ロール。
  • (省略可)INOUT/OUT パラメータに使用されるステージング データセットの読み取りと書き込みを行うには:

    • 指定したステージング データセットに対する bigquery.tables.* または roles/bigquery.dataEditor IAM ロール。
    • また、ステージング データセットが指定されていない場合は、プロジェクトに対する bigquery.datasets.create 権限または roles/bigquery.dataEditor IAM ロール。

Spark ストアド プロシージャの例

このセクションでは、Apache Spark 用ストアド プロシージャの作成例を示します。

Cloud Storage で PySpark または JAR ファイルを使用する

次の例は、my-project-id.us.my-connection 接続と、Cloud Storage バケットに保存されている PySpark または JAR ファイルを使用して、Spark 用ストアド プロシージャを作成する方法を示しています。

Python

CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-pyspark-main.py")
LANGUAGE PYTHON

Java または Scala

main_file_uri を使用してストアド プロシージャを作成します。

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_wtih_main_jar()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-scala-main.jar")
LANGUAGE SCALA

main_class を使用してストアド プロシージャを作成します。

CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1",
main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"])
LANGUAGE SCALA

インライン コードを使用する

次の例は、接続 my-project-id.us.my-connection とインライン PySpark コードを使用して Spark 用ストアド プロシージャを作成する方法を示しています。

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Load data from BigQuery.
words = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data:samples.shakespeare") \
  .load()
words.createOrReplaceTempView("words")

# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count")
word_count.show()
word_count.printSchema()

# Saving the data to BigQuery
word_count.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("wordcount_dataset.wordcount_output")
"""

入力パラメータとして値を渡す

次の例では、Python で入力パラメータとして値を渡す 2 つの方法を示しています。

方法 1: 環境変数を使用する

PySpark コードでは、Spark ドライバとエグゼキュータで環境変数を使用して、Spark 用ストアド プロシージャの入力パラメータを取得できます。環境変数の名前の形式は BIGQUERY_PROC_PARAM.PARAMETER_NAME です。ここで、PARAMETER_NAME は入力パラメータの名前です。たとえば、入力パラメータの名前が var の場合、対応する環境変数の名前は BIGQUERY_PROC_PARAM.var です。入力パラメータは JSON でエンコードされています。PySpark コードで、環境変数から JSON 文字列の入力パラメータ値を取得し、Python 変数にデコードできます。

次の例は、INT64 型の入力パラメータの値を PySpark コードに取得する方法を示しています。

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
import os
import json

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
sc = spark.sparkContext

# Get the input parameter num in JSON string and convert to a Python variable
num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"]))

"""

方法 2: 組み込みライブラリを使用する

PySpark コードで組み込みライブラリをインポートして、すべてのタイプのパラメータを自動的に入力します。パラメータをエグゼキュータに渡すには、Spark ドライバのパラメータを Python 変数として設定し、その値をエグゼキュータに渡します。組み込みライブラリは、INTERVALGEOGRAPHYNUMERICBIGNUMERIC を除く、ほとんどの BigQuery データ型をサポートしています。

BigQuery のデータ型 Python のデータ型
BOOL bool
STRING str
FLOAT64 float
INT64 int
BYTES bytes
DATE datetime.date
TIMESTAMP datetime.datetime
TIME datetime.time
DATETIME datetime.datetime
Array Array
Struct Struct
JSON Object
NUMERIC サポート対象外
BIGNUMERIC サポート対象外
INTERVAL サポート対象外
GEOGRAPHY サポート対象外

次の例は、組み込みライブラリをインポートして、INT64 型の入力パラメータと ARRAY<STRUCT<a INT64, b STRING>> 型の入力パラメータを PySpark コードに設定する方法を示しています。

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>)
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
from bigquery.spark.procedure import SparkProcParamContext

def check_in_param(x, num):
  return x['a'] + num

def main():
  spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
  sc=spark.sparkContext
  spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

  # Get the input parameter num of type INT64
  num = spark_proc_param_context.num

  # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>>
  info = spark_proc_param_context.info

  # Pass the parameter to executors
  df = sc.parallelize(info)
  value = df.map(lambda x : check_in_param(x, num)).sum()

main()
"""

Java または Scala のコードでは、Spark ドライバとエグゼキュータの環境変数を使用して、Spark 用ストアド プロシージャの入力パラメータを取得できます。環境変数の名前の形式は BIGQUERY_PROC_PARAM.PARAMETER_NAME です。ここで、PARAMETER_NAME は入力パラメータの名前です。たとえば、入力パラメータの名前が var の場合、対応する環境変数の名前は BIGQUERY_PROC_PARAM.var です。Java または Scala のコードでは、環境変数から入力パラメータ値を取得できます。

次の例は、環境変数から Scala コードへの入力パラメータの値を取得する方法を示しています。

val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get

次の例は、環境変数から Java コードへの入力パラメータを取得する方法を示しています。

String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");

OUT パラメータと INOUT パラメータとして値を渡す

出力パラメータは Spark プロシージャからの値を返します。INOUT パラメータは、プロシージャの値を受け取って、プロシージャからの値を返します。OUT パラメータと INOUT パラメータを使用するには、Spark プロシージャを作成するときにパラメータ名の前に OUT キーワードまたは INOUT キーワードを追加します。PySpark コードでは、組み込みライブラリを使用して値を OUT または INOUT パラメータとして返します。入力パラメータと同様に、組み込みライブラリは INTERVALGEOGRAPHYNUMERICBIGNUMERIC を除くほとんどの BigQuery データ型をサポートしています。TIME 型と DATETIME 型の値は、OUT または INOUT パラメータとして返されるときに UTC タイムゾーンに変換されます。


CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME,OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON)
WITH CONNECTION `my_bq_project.my_dataset.my_connection`
OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS
R"""
from pyspark.sql.session import SparkSession
import datetime
from bigquery.spark.procedure import SparkProcParamContext

spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate()
spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

# Reading the IN and INOUT parameter values.
int = spark_proc_param_context.int
dt = spark_proc_param_context.datetime
print("IN parameter value: ", int, ", INOUT parameter value: ", dt)

# Returning the value of the OUT and INOUT parameters.
spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.b = True
spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}]
spark_proc_param_context.time = datetime.time(23, 20, 50, 520000)
spark_proc_param_context.f = 20.23
spark_proc_param_context.bs = b"hello"
spark_proc_param_context.date = datetime.date(1985, 4, 12)
spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.js = {"name": "Alice", "age": 30}
""";

Hive メタストア テーブルから読み取って結果を BigQuery に書き込む

次の例は、Hive メタストア テーブルを変換し、結果を BigQuery に書き込む方法を示しています。

CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
WITH CONNECTION `my-project-id.us.my-connection`
OPTIONS(engine="SPARK", runtime_version="1.1")
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession \
   .builder \
   .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \
   .enableHiveSupport() \
   .getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS records")

spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)")

spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)")

df = spark.sql("SELECT * FROM records.student")

df.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("records_dataset.student")
"""

ログフィルタを表示する

Spark 用ストアド プロシージャを呼び出した後に、ログ情報を表示できます。Cloud Logging のフィルタ情報と Spark History Cluster エンドポイントを取得するには、bq show コマンドを使用します。フィルタ情報は、子ジョブの SparkStatistics フィールドで確認できます。ログフィルタを取得するには、次の操作を行います。

  1. [BigQuery] ページに移動します。

    [BigQuery] に移動

  2. クエリエディタで、ストアド プロシージャのスクリプト ジョブの子ジョブを一覧表示します。

    bq ls -j --parent_job_id=$parent_job_id
    

    ジョブ ID を取得する方法については、ジョブの詳細を表示するをご覧ください。

    出力は次のようになります。

                    jobId                         Job Type     State       Start Time         Duration
    ---------------------------------------------- ---------   ---------  ---------------  ----------------
    script_job_90fb26c32329679c139befcc638a7e71_0   query      SUCCESS   07 Sep 18:00:27   0:05:15.052000
    
  3. ストアド プロシージャに対する jobId を特定し、bq show コマンドを使用して、ジョブの詳細を表示します。

    bq show --format=prettyjson --job $child_job_id
    

    別の手順で必要になるため、sparkStatistics フィールドをコピーします。

    出力は次のようになります。

    {
    "configuration": {...}
    …
    "statistics": {
     …
      "query": {
        "sparkStatistics": {
          "loggingInfo": {
            "projectId": "myproject",
            "resourceType": "myresource"
          },
          "sparkJobId": "script-job-90f0",
          "sparkJobLocation": "us-central1"
        },
        …
      }
    }
    }
    

  4. Logging 用に、SparkStatistics フィールドを使用してログフィルタを生成します。

    resource.type = sparkStatistics.loggingInfo.resourceType
    resource.labels.resource_container=sparkStatistics.loggingInfo.projectId
    resource.labels.spark_job_id=sparkStatistics.sparkJobId
    resource.labels.location=sparkStatistics.sparkJobLocation
    
    

    ログは、bigquery.googleapis.com/SparkJob モニタリング対象リソースに書き込まれます。ログには、INFODRIVEREXECUTOR コンポーネントのラベルが付けられます。Spark ドライバからログをフィルタするには、ログフィルタに labels.component = "DRIVER" コンポーネントを追加します。Spark エグゼキュータからログをフィルタするには、ログフィルタに labels.component = "EXECUTOR" コンポーネントを追加します。

顧客管理の暗号鍵を使用する

BigQuery Spark プロシージャは、顧客管理の暗号鍵(CMEK)と BigQuery のデフォルトの暗号化を使用してコンテンツを保護します。Spark プロシージャで CMEK を使用するには、まず BigQuery の暗号化サービス アカウントの作成をトリガーし、必要な権限を付与します。プロジェクトに適用されている場合、Spark プロシージャは CMEK の組織のポリシーもサポートします。

ストアド プロシージャが INVOKER セキュリティ モードを使用している場合は、プロシージャを呼び出すときに SQL システム変数を使用して CMEK を指定する必要があります。それ以外の場合は、ストアド プロシージャに関連付けられた接続を介して CMEK を指定できます。

Spark ストアド プロシージャの作成時に接続を介して CMEK を指定するには、次のサンプルコードを使用します。

bq mk --connection --connection_type='SPARK' \
 --properties='{"kms_key_name"="projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \
 --project_id=PROJECT_ID \
 --location=LOCATION \
 CONNECTION_NAME

プロシージャを呼び出すときに SQL システム変数を使用して CMEK を指定するには、次のサンプルコードを使用します。

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME;
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();

VPC Service Controls を使用する

VPC Service Controls を使用すると、データ漏洩を防ぐためのセキュアな境界を設定できます。セキュリティを強化するために、Spark プロシージャで VPC Service Controls を使用するには、まずサービス境界を作成します。

Spark プロシージャ ジョブを完全に保護するには、次の API をサービス境界に追加します。

  • BigQuery API(bigquery.googleapis.com
  • Cloud Logging API(logging.googleapis.com
  • Cloud Storage を使用している場合: Cloud Storage API(storage.googleapis.com
  • カスタム コンテナを使用する場合: Artifact Registry API(artifactregistry.googleapis.com)または Container Registry API(containerregistry.googleapis.com
  • Dataproc Metastore を使用する場合: Dataproc Metastore API(metastore.googleapis.com)、Cloud Run Admin API(run.googleapis.com

Spark プロシージャのクエリ プロジェクトを境界に追加します。Spark コードやデータをホストする他のプロジェクトを境界に追加します。

ベスト プラクティス

  • プロジェクトで初めて接続を使用する場合は、プロビジョニングにさらに 1 分ほどかかります。Spark ストアド プロシージャを作成するときに既存の Spark 接続を再利用すると時間を短縮できます。

  • 本番環境で使用するために Spark プロシージャを作成する場合は、ランタイム バージョンを指定することをおすすめします。サポートされているランタイム バージョンのリストについては、Dataproc Serverless ランタイム バージョンをご覧ください。長期サポート(LTS)バージョンの使用をおすすめします。

  • Spark プロシージャでカスタム コンテナを指定する場合は、Artifact Registry とイメージ ストリーミングを使用することをおすすめします。

  • パフォーマンスを向上させるには、Spark プロシージャでリソース割り当てプロパティを指定します。Spark ストアド プロシージャは、Dataproc Serverless と同じリソース割り当てプロパティのリストをサポートしています。

制限事項

  • Dataproc Metastore に接続できるのは、gRPC エンドポイント プロトコルのみです。他のタイプの Hive メタストアはまだサポートされていません。
  • 顧客管理の暗号鍵(CMEK)は、顧客が単一リージョンの Spark プロシージャを作成する場合にのみ使用できます。グローバル リージョンの CMEK 鍵とマルチリージョンの CMEK 鍵(EUUS など)はサポートされていません。
  • 出力パラメータの引き渡しは PySpark でのみサポートされています。
  • Spark 用ストアド プロシージャに関連付けられたデータセットが、クロスリージョン データセット レプリケーションによって宛先リージョンに複製される場合、ストアド プロシージャは、作成されたリージョン内でのみクエリされます。

割り当てと上限

割り当てと上限の詳細については、Spark 用ストアド プロシージャの割り当てと上限をご覧ください。

次のステップ