リモート関数を使用する

BigQuery リモート関数を使用すると、SQL と JavaScript 以外の言語や、BigQuery ユーザー定義関数で許可されていないライブラリやサービスを使用して関数を実装できます。

概要

BigQuery リモート関数は、Cloud Run 関数Cloud Run と直接統合することで、GoogleSQL 機能を BigQuery 以外のソフトウェアに組み込むことができます。BigQuery リモート関数を使用すると、サポートされている言語を使用して実装された Cloud Run 関数または Cloud Run に関数をデプロイしてから、その関数を GoogleSQL クエリから呼び出すことができます。

ワークフロー

  1. Cloud Run 関数または Cloud Run で HTTP エンドポイントを作成します。
  2. BigQuery にリモート関数を作成します。
    1. CLOUD_RESOURCE 型の接続を作成します。
    2. リモート関数を作成します。
  3. 他のユーザー定義関数と同様に、クエリ内でリモート関数を使用します。

制限事項

  • リモート関数は、引数の型または戻り値の型として、次のデータ型のいずれかのみをサポートします。

    • ブール値
    • バイト
    • 数値
    • 文字列
    • 日付
    • 日時
    • 時間
    • タイムスタンプ
    • JSON

    リモート関数では、ARRAYSTRUCTINTERVALGEOGRAPHY 型がサポートされていません。

  • 一時的なリモート関数は作成できません。

  • テーブル値のリモート関数は作成できません。

  • マテリアライズド ビューの作成時には、リモート関数を使用できません。

  • リモート関数の戻り値は変動すると常に想定されるため、リモート関数を呼び出したクエリの結果はキャッシュに保存されません。

  • 一時的なネットワーク エラーや BigQuery の内部エラーが原因で、レスポンスが成功した後も、同じデータのリクエストがエンドポイントに対して繰り返し表示されることがあります。

  • 条件式MERGE ステートメントWHEN [NOT] MATCHED を使うなど、短絡により一部の行でリモート関数の評価がスキップされる場合、リモート関数でバッチ処理は使用されません。この場合、HTTP リクエスト本文calls フィールドには 1 つだけ要素が含まれます。

  • リモート関数に関連付けられたデータセットが、クロスリージョン データセット レプリケーションによって宛先リージョンに複製されたものである場合、リモート関数は、その関数が作成されたリージョン内でのみクエリされます。

エンドポイントを作成する

ビジネス ロジックを実装できるリモート関数を作成するには、Cloud Run 関数または Cloud Run を使用して HTTP エンドポイントを作成する必要があります。エンドポイントは、単一の HTTP POST リクエストで行のバッチを処理し、バッチの結果を HTTP レスポンスとして返す必要があります。

BigQuery DataFrames を使用してリモート関数を作成する場合、HTTP エンドポイントを手動で作成する必要はありません。この処理は自動的に行われます。

Cloud Run 関数の作成、デプロイ、テスト、メンテナンスの方法については、Cloud Run 関数のチュートリアルとその他の Cloud Run 関数のドキュメントをご覧ください。

Cloud Run サービスの作成、デプロイ、テスト、メンテナンスの方法については、Cloud Run のクイックスタートCloud Run のドキュメントをご覧ください。

Cloud Run 関数または Cloud Run サービスの未認証の呼び出しを許可するのではなく、デフォルトの認証を使用することをおすすめします。

入力形式

BigQuery は、次の形式の JSON 本文を含む HTTP POST リクエストを送信します。

フィールド名 説明 フィールド タイプ
requestId リクエストの ID。GoogleSQL クエリでこのエンドポイントに送信される複数のリクエスト全体で一意です。 常に提供されます。文字列。
caller リモート関数を呼び出す GoogleSQL クエリのジョブの完全なリソース名。 常に提供されます。文字列。
sessionUser GoogleSQL クエリを実行するユーザーのメールアドレス。 常に提供されます。文字列。
userDefinedContext BigQuery でリモート関数を作成するときに使用されたユーザー定義のコンテキスト。 省略可。Key-Value ペアを含む JSON オブジェクト。
calls 入力データのバッチ。 常に提供されます。JSON 配列。

各要素自体は JSON 配列です。これは、1 つのリモート関数呼び出しの JSON エンコードされた引数リストです。

リクエストの例:

{
 "requestId": "124ab1c",
 "caller": "//bigquery.googleapis.com/projects/myproject/jobs/myproject:US.bquxjob_5b4c112c_17961fafeaf",
 "sessionUser": "test-user@test-company.com",
 "userDefinedContext": {
  "key1": "value1",
  "key2": "v2"
 },
 "calls": [
  [null, 1, "", "abc"],
  ["abc", "9007199254740993", null, null]
 ]
}

出力形式

BigQuery では、エンドポイントで次の形式で HTTP レスポンスが返されることが想定されています。そうでない場合、BigQuery はそれを消費できず、リモート関数を呼び出すクエリが失敗します。

フィールド名 説明 値の範囲
replies 戻り値のバッチ。 レスポンスを成功させるには必須。JSON 配列。

各要素は、外部関数の JSON エンコードされた戻り値に対応しています。

配列のサイズは、HTTP リクエスト内の calls の JSON 配列のサイズと一致させる必要があります。たとえば、calls の JSON 配列に 4 つの要素がある場合、この JSON 配列にも 4 つの要素が必要です。

errorMessage 200 以外の HTTP レスポンス コードが返される場合のエラー メッセージ。再試行不可能なエラーの場合、これは BigQuery ジョブのエラー メッセージの一部としてユーザーに返されます。 省略可。文字列。サイズは 1 KB 未満にする必要があります。

成功したレスポンスの例:

{
  "replies": [
    1,
    0
  ]
}

失敗レスポンスの例:

{
  "errorMessage": "Received but not expected that the argument 0 be null".
}

HTTP レスポンス コード

成功したレスポンスの場合、エンドポイントから HTTP レスポンス コード 200 が返されます。それ以外の値を受け取ると、BigQuery はレスポンスを失敗とみなし、HTTP レスポンス コードが 408、429、500、503、504 の場合は、なんらかの内部上限に達するまで再試行します。

SQL データ型の JSON エンコード

HTTP リクエスト/レスポンスでの JSON エンコードは、TO_JSON_STRING 関数用の既存の BigQuery JSON エンコードに従います。

Cloud Run 関数のコードサンプル

次のサンプル Python コードは、リモート関数のすべての整数引数の追加を実装します。バッチ呼び出しの引数を使用してリクエストを処理し、レスポンスですべての結果を返します。

import functions_framework

from flask import jsonify

# Max INT64 value encoded as a number in JSON by TO_JSON_STRING. Larger values are encoded as
# strings.
# See https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions#json_encodings
_MAX_LOSSLESS=9007199254740992

@functions_framework.http
def batch_add(request):
  try:
    return_value = []
    request_json = request.get_json()
    calls = request_json['calls']
    for call in calls:
      return_value.append(sum([int(x) if isinstance(x, str) else x for x in call if x is not None]))
    replies = [str(x) if x > _MAX_LOSSLESS or x < -_MAX_LOSSLESS else x for x in return_value]
    return_json = jsonify( { "replies":  replies } )
    return return_json
  except Exception as e:
    return jsonify( { "errorMessage": str(e) } ), 400

関数が関数名 remote_add としてリージョン us-east1 のプロジェクト my_gcf_project にデプロイされていると想定する場合、エンドポイント https://us-east1-my_gcf_project.cloudfunctions.net/remote_add を介してアクセスできます。

Cloud Run のサンプルコード

次のサンプル Python コードは、同じ機能用に Cloud Run にビルドしてデプロイできるウェブサービスを実装します。

import os

from flask import Flask, request, jsonify

# Max INT64 value encoded as a number in JSON by TO_JSON_STRING. Larger values are encoded as
# strings.
# See https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions#json_encodings
_MAX_LOSSLESS=9007199254740992

app = Flask(__name__)

@app.route("/", methods=['POST'])
def batch_add():
  try:
    return_value = []
    request_json = request.get_json()
    calls = request_json['calls']
    for call in calls:
      return_value.append(sum([int(x) if isinstance(x, str) else x for x in call if x is not None]))
    replies = [str(x) if x > _MAX_LOSSLESS or x < -_MAX_LOSSLESS else x for x in return_value]
    return jsonify( { "replies" :  replies } )
  except Exception as e:
    return jsonify( { "errorMessage": str(e) } ), 400

if __name__ == "__main__":
    app.run(debug=True, host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))

コードをビルドしてデプロイする方法については、ガイドをご覧ください。

Cloud Run サービスが、サービス名 remote_add としてリージョン us-east1 内のプロジェクト my_gcf_project にデプロイされていることを前提とすると、そのサービスはエンドポイント https://remote_add-<project_id_hash>-ue.a.run.app からアクセスできます。

リモート関数を作成する

BigQuery は CLOUD_RESOURCE 接続を使用して、Cloud Run 関数とやり取りします。リモート関数を作成するには、CLOUD_RESOURCE 接続を作成する必要があります。プロジェクト IAM 管理者(roles/resourcemanager.projectIamAdmin)ロールが付与されている場合、BigQuery DataFrames を使用してリモート関数を作成するときには、接続を手動で作成してアクセス権限を付与する必要はありません。この処理は自動的に行われます。

接続を作成する

Cloud Run 関数と Cloud Run に接続するには、Cloud リソースの接続が必要です。

次のオプションのいずれかを選択します。

コンソール

  1. [BigQuery] ページに移動します。

    [BigQuery] に移動

  2. 接続を作成するには、[追加] をクリックし、続いて [外部データソースへの接続] をクリックします。

  3. [接続タイプ] リストで、[Vertex AI リモートモデル、リモート関数、BigLake(Cloud リソース)] を選択します。

  4. [接続 ID] フィールドに接続の名前を入力します。

  5. [接続を作成] をクリックします。

  6. [接続へ移動] をクリックします。

  7. [接続情報] ペインで、次の手順で使用するサービス アカウント ID をコピーします。

bq

  1. コマンドライン環境で接続を作成します。

    bq mk --connection --location=REGION --project_id=PROJECT_ID \
        --connection_type=CLOUD_RESOURCE CONNECTION_ID
    

    --project_id パラメータは、デフォルト プロジェクトをオーバーライドします。

    次のように置き換えます。

    接続リソースを作成すると、BigQuery は、一意のシステム サービス アカウントを作成し、それを接続に関連付けます。

    トラブルシューティング: 次の接続エラーが発生した場合は、Google Cloud SDK を更新します。

    Flags parsing error: flag --connection_type=CLOUD_RESOURCE: value should be one of...
    
  2. 後の手順で使用するため、サービス アカウント ID を取得してコピーします。

    bq show --connection PROJECT_ID.REGION.CONNECTION_ID
    

    出力は次のようになります。

    name                          properties
    1234.REGION.CONNECTION_ID     {"serviceAccountId": "connection-1234-9u56h9@gcp-sa-bigquery-condel.iam.gserviceaccount.com"}
    

Terraform

main.tf ファイルに次のセクションを追加します。

 ## This creates a cloud resource connection.
 ## Note: The cloud resource nested object has only one output only field - serviceAccountId.
 resource "google_bigquery_connection" "connection" {
    connection_id = "CONNECTION_ID"
    project = "PROJECT_ID"
    location = "REGION"
    cloud_resource {}
}        
次のように置き換えます。

アクセス権限を設定する

Cloud Run 関数または Cloud Run サービスに対する読み取り専用権限を新しい接続に付与する必要があります。Cloud Run 関数または Cloud Run サービスに対する未認証の呼び出しを許可することはおすすめしません。

ロールを付与するには、次の操作を行います。

  1. [IAM と管理] ページに移動します。

    [IAM と管理] に移動

  2. [追加] をクリックします。

    [プリンシパルを追加] ダイアログが開きます。

  3. [新しいプリンシパル] フィールドに、前の手順でコピーしたサービス アカウント ID を入力します。

  4. [ロールを選択] フィールドで、次のいずれかのオプションを選択します。

    • 第 1 世代の Cloud Run 関数を使用している場合は、[Cloud Function] を選択して、[Cloud Function Invoker role] を選択します。
    • 第 2 世代の Cloud Run 関数を使用している場合は、[Cloud Run] を選択して、[Cloud Run Invoker role] を選択します。
    • Cloud Run サービスを使用している場合は、[Cloud Run] を選択して、[Cloud Run Invoker role] を選択します。
  5. [保存] をクリックします。

リモート関数を作成する

リモート関数を作成するには:

SQL

BigQuery で次の CREATE FUNCTION ステートメントを実行します。

  1. Google Cloud コンソールで [BigQuery] ページに移動します。

    [BigQuery] に移動

  2. クエリエディタで次のステートメントを入力します。

    CREATE FUNCTION PROJECT_ID.DATASET_ID.remote_add(x INT64, y INT64) RETURNS INT64
    REMOTE WITH CONNECTION PROJECT_ID.LOCATION.CONNECTION_NAME
    OPTIONS (
      endpoint = 'ENDPOINT_URL'
    )
    

    次のように置き換えます。

    • DATASET_ID: BigQuery データセットの ID。
    • ENDPOINT_URL: Cloud Run 関数または Cloud Run のリモート関数エンドポイントの URL。

  3. [実行] をクリックします。

クエリの実行方法については、インタラクティブ クエリを実行するをご覧ください。

BigQuery DataFrames

BigQuery DataFrames はプレビュー版です。

  1. 必要な API を有効にし、リモート関数要件セクションに説明されているように必要なロールが付与されていることを確認します。
  2. remote_function デコレータを使用します。

    import bigframes.pandas as bpd
    
    # Set BigQuery DataFrames options
    bpd.options.bigquery.project = your_gcp_project_id
    bpd.options.bigquery.location = "us"
    
    # BigQuery DataFrames gives you the ability to turn your custom scalar
    # functions into a BigQuery remote function. It requires the GCP project to
    # be set up appropriately and the user having sufficient privileges to use
    # them. One can find more details about the usage and the requirements via
    # `help` command.
    help(bpd.remote_function)
    
    # Read a table and inspect the column of interest.
    df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")
    df["body_mass_g"].head(10)
    
    # Define a custom function, and specify the intent to turn it into a remote
    # function. It requires a BigQuery connection. If the connection is not
    # already created, BigQuery DataFrames will attempt to create one assuming
    # the necessary APIs and IAM permissions are setup in the project. In our
    # examples we would be using a pre-created connection named
    # `bigframes-rf-conn`. We will also set `reuse=False` to make sure we don't
    # step over someone else creating remote function in the same project from
    # the exact same source code at the same time. Let's try a `pandas`-like use
    # case in which we want to apply a user defined scalar function to every
    # value in a `Series`, more specifically bucketize the `body_mass_g` value
    # of the penguins, which is a real number, into a category, which is a
    # string.
    @bpd.remote_function(
        [float],
        str,
        bigquery_connection="bigframes-rf-conn",
        reuse=False,
    )
    def get_bucket(num):
        if not num:
            return "NA"
        boundary = 4000
        return "at_or_above_4000" if num >= boundary else "below_4000"
    
    # Then we can apply the remote function on the `Series`` of interest via
    # `apply` API and store the result in a new column in the DataFrame.
    df = df.assign(body_mass_bucket=df["body_mass_g"].apply(get_bucket))
    
    # This will add a new column `body_mass_bucket` in the DataFrame. You can
    # preview the original value and the bucketized value side by side.
    df[["body_mass_g", "body_mass_bucket"]].head(10)
    
    # The above operation was possible by doing all the computation on the
    # cloud. For that, there is a google cloud function deployed by serializing
    # the user code, and a BigQuery remote function created to call the cloud
    # function via the latter's http endpoint on the data in the DataFrame.
    
    # The BigQuery remote function created to support the BigQuery DataFrames
    # remote function can be located via a property `bigframes_remote_function`
    # set in the remote function object.
    print(f"Created BQ remote function: {get_bucket.bigframes_remote_function}")
    
    # The cloud function can be located via another property
    # `bigframes_cloud_function` set in the remote function object.
    print(f"Created cloud function: {get_bucket.bigframes_cloud_function}")
    
    # Warning: The deployed cloud function may be visible to other users with
    # sufficient privilege in the project, so the user should be careful about
    # having any sensitive data in the code that will be deployed as a remote
    # function.
    
    # Let's continue trying other potential use cases of remote functions. Let's
    # say we consider the `species`, `island` and `sex` of the penguins
    # sensitive information and want to redact that by replacing with their hash
    # code instead. Let's define another scalar custom function and decorate it
    # as a remote function
    @bpd.remote_function(
        [str], str, bigquery_connection="bigframes-rf-conn", reuse=False
    )
    def get_hash(input):
        import hashlib
    
        # handle missing value
        if input is None:
            input = ""
        encoded_input = input.encode()
        hash = hashlib.md5(encoded_input)
        return hash.hexdigest()
    
    # We can use this remote function in another `pandas`-like API `map` that
    # can be applied on a DataFrame
    df_redacted = df[["species", "island", "sex"]].map(get_hash)
    df_redacted.head(10)
    
    

リモート関数を作成するデータセットに対する bigquery.routines.create 権限と、リモート関数によって使用される接続に対する bigquery.connections.delegate 権限(BigQuery Connection 管理者ロールに含まれる)が必要です。

ユーザー定義のコンテキストの提供

OPTIONSuser_defined_context を Key-Value ペアの形式として指定できます。これは、エンドポイントに対するすべての HTTP リクエストの一部になります。ユーザー定義のコンテキストでは、複数のリモート関数を作成して、単一のエンドポイントを再利用できます。これにより、そこに渡されたコンテキストに基づいて異なる動作が提供されます。

次の例では、同じエンドポイントを使用して BYTES データを暗号化、復号する 2 つのリモート関数を作成します。

CREATE FUNCTION `PROJECT_ID.DATASET_ID`.encrypt(x BYTES)
RETURNS BYTES
REMOTE WITH CONNECTION `PROJECT_ID.LOCATION.CONNECTION_NAME`
OPTIONS (
  endpoint = 'ENDPOINT_URL',
  user_defined_context = [("mode", "encryption")]
)

CREATE FUNCTION `PROJECT_ID.DATASET_ID`.decrypt(x BYTES)
RETURNS BYTES
REMOTE WITH CONNECTION `PROJECT_ID.LOCATION.CONNECTION_NAME`
OPTIONS (
  endpoint = 'ENDPOINT_URL',
  user_defined_context = [("mode", "decryption")]
)

バッチ リクエストの行数制限

Cloud Run 関数のタイムアウトが発生しないように、各 HTTP リクエストの最大行数として OPTIONSmax_batching_rows を指定できます。指定しない場合は、BigQuery がバッチに含める行数を決定します。

クエリでリモート関数を使用する

リモート関数の接続に関連付けられた BigQuery のサービス アカウントにアクセスできるように、Cloud Run 関数に対する権限が付与されていることを確認します。

また、リモート関数のあるデータセットに対する権限 bigquery.routines.get と、リモート関数で使用する接続に対する bigquery.connections.use 権限も必要です(これらの権限は BigQuery Connection User ロールを介して得られます)。

リモート関数は、ユーザー定義関数と同様、クエリで使用できます。

たとえば、remote_add 関数は、次のサンプルクエリの形で使用できます。

SELECT
  val,
  `PROJECT_ID.DATASET_ID`.remote_add(val, 2)
FROM
  UNNEST([NULL,2,3,5,8]) AS val;

この例では、次の出力が生成されます。

+------+-----+
|  val | f0_ |
+------+-----+
| NULL |   2 |
|    2 |   4 |
|    3 |   5 |
|    5 |   7 |
|    8 |  10 |
+------+-----+

サポートされているリージョン

BigQuery のロケーションには、次の 2 種類があります。

  • リージョンは、ロンドンなどの特定の地理的な場所となります。

  • マルチリージョンは、米国などの、2 つ以上の地理的な場所を含む広い地理的なエリアとなります。

単一リージョン

BigQuery の単一リージョン データセットでは、同じリージョンにデプロイされた Cloud Run 関数を使用するリモート関数のみを作成できます。次に例を示します。

  • BigQuery の単一リージョン us-east4 のリモート関数では、us-east4 の Cloud Run 関数のみを使用できます。

したがって、単一リージョンの場合、リモート関数は Cloud Run 関数と BigQuery の両方をサポートするリージョンでのみサポートされます。

マルチリージョン

BigQuery マルチリージョン(USEU)データセットでは、同じ大規模な地理的領域(米国、EU など)内のリージョンにデプロイされた Cloud Run 関数を使用するリモート関数のみを作成できます。次に例を示します。

  • BigQuery US マルチリージョン内のリモート関数では、米国内の単一リージョン(us-central1us-east4us-west2 など)にデプロイされた Cloud Run 関数のみを使用できます。
  • BigQuery EU マルチリージョン内のリモート関数では、EU の加盟国にある任意の単一リージョン(europe-north1europe-west3 など)にデプロイされている Cloud Run 関数のみを使用できます。

BigQuery のリージョンとマルチリージョンの詳細については、データセットのロケーションのページをご覧ください。Cloud Run 関数のリージョンの詳細については、Cloud Run 関数のロケーションをご覧ください。

接続

単一リージョンのロケーションまたはマルチリージョンのロケーションの場合、使用する接続と同じロケーションにのみリモート関数を作成できます。たとえば、US マルチリージョンにリモート関数を作成するには、US マルチリージョンのロケーションにある接続を使用します。

料金

  • 標準の BigQuery の料金が適用されます。

  • また、Cloud Run 関数と Cloud Run では、この機能の使用により料金が発生することがあります。詳細については、Cloud Run 関数Cloud Run の料金ページをご覧ください。

VPC Service Controls を使用する

VPC Service Controls は、Google Cloud の機能で、データ漏洩を防ぐためのセキュアな境界を設定できます。セキュリティを強化するために、リモート関数で VPC Service Controls を使用するか、internal traffic 上り(内向き)設定でエンドポイントを使用する場合は、VPC Service Controls ガイドに沿って次の操作を行ってください。

  1. サービス境界を作成する。

  2. リモート関数を使用したクエリの BigQuery プロジェクトを境界に追加する。

  3. エンドポイント プロジェクトを境界に追加し、エンドポイント タイプに基づいて制限付きサービスに Cloud Functions API または Cloud Run API を設定します。詳細については、Cloud Run 関数の VPC Service ControlsCloud Run の VPC Service Controls をご覧ください。

リモート関数のベスト プラクティス

  • 入力を事前に絞り込む: 入力をリモート関数に渡す前に、簡単に絞り込むことができれば、クエリをより高速かつ低コストで実行できます。

  • Cloud Run 関数のスケーラビリティを保つ。スケーラビリティは、最小インスタンス最大インスタンス同時実行に基づいています。

    • 可能であれば、Cloud Run 関数の最大インスタンス数にはデフォルト値を使用します。
    • 第 1 世代の HTTP Cloud Run 関数には、デフォルトの上限はありません。テスト中または本番環境において、第 1 世代の HTTP Cloud Run 関数で無制限のスケーリング イベントが発生しないように、上限を設定することをおすすめします(例: 3,000)。
  • パフォーマンスを向上させるために、他の Cloud Run 関数のヒントを採り入れます。レイテンシが大きい Cloud Run 関数とやり取りするリモート関数クエリは、タイムアウトで失敗する場合があります。

  • 失敗したレスポンスに対して適切な HTTP レスポンス コードとペイロードを返すようにエンドポイントを実装します。

    • BigQuery からの再試行を最小限に抑えるには、失敗したレスポンスに 408、429、500、503、504 以外の HTTP レスポンス コードを使用し、関数コード内のすべての例外をキャッチします。それ以外の場合、キャッチされない例外に対して HTTP サービス フレームワークは自動的に 500 を返します。失敗したデータ パーティションまたはクエリが BigQuery によって再試行されると、HTTP リクエストが再試行されることがあります。

    • エンドポイントは、失敗したレスポンスに対して定義された形式の JSON ペイロードを返す必要があります。厳密には必須ではありませんが、これによって、失敗したレスポンスが関数の実装によるものか、Cloud Run 関数 / Cloud Run のインフラストラクチャによるものかを BigQuery が区別しやすくなります。後者の場合、BigQuery は別の内部上限を使用して再試行する可能性があります。

割り当て

リモート関数の割り当てについては、割り当てと上限をご覧ください。