BigQuery Storage API を使用して BigQuery データを pandas にダウンロードする

BigQuery Storage API は、BigQuery に保存されたデータに高速にアクセスする手段を提供します。BigQuery に保存されたデータを Python 用 pandas ライブラリなどの分析ツールで使用するには、BigQuery Storage API を使用してこれらのデータをダウンロードします。

目標

このチュートリアルでは、次のことができます。

  • Jupyter ノートブック内の BigQuery 用 IPython マジックから BigQuery Storage API を使用して、クエリ結果を pandas DataFrame にダウンロードする。
  • Python 用 BigQuery クライアント ライブラリを使用して、クエリ結果を pandas DataFrame にダウンロードする。
  • Python 用 BigQuery クライアント ライブラリを使用して、BigQuery テーブルのデータを pandas DataFrame にダウンロードする。
  • Python 用 BigQuery Storage API クライアント ライブラリを使用して、BigQuery テーブルのデータを pandas DataFrame にダウンロードする。

料金

BigQuery は有料プロダクトであり、クエリを実行すると BigQuery の使用料金が発生します。処理されるクエリデータは毎月 1 TB まで無料です。詳細については、BigQuery の料金ページをご覧ください。

BigQuery Storage API は有料プロダクトであり、DataFrame をダウンロードするときにスキャンするテーブルデータに対して使用料金が発生します。詳細については、BigQuery の料金ページをご覧ください。

始める前に

このチュートリアルを始める前に、Google Cloud Platform Console を使用して、プロジェクトを作成または選択し、課金を有効にします。

  1. Google アカウントにログインします。

    Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

  2. GCP プロジェクトを選択または作成します。

    [リソースの管理] ページに移動

  3. プロジェクトに対して課金が有効になっていることを確認します。

    課金を有効にする方法について

  4. 新しいプロジェクトでは、BigQuery が自動的に有効になります。既存のプロジェクトで BigQuery を有効にする手順は以下のとおりです。 BigQuery、BigQuery Storage API を有効にします。

    APIを有効にする

  5. Python 開発環境を設定します。
    Python の設定
  6. 開発環境に対する認証を設定します。
    認証の設定

このチュートリアルを始める前に、BigQuery 用 IPython マジックBigQuery クライアント ライブラリpandas でのクライアント ライブラリの使用方法についても十分に理解しておく必要があります。

クライアント ライブラリのインストール

BigQuery Storage API と pandas ライブラリの統合機能を使用するには、BigQuery Python クライアント ライブラリ バージョン 1.9.0 以降をインストールします。

pip install --upgrade google-cloud-bigquery[pandas]

BigQuery Storage API Python クライアント ライブラリ、および fastavro ライブラリと pandas ライブラリのサポートをインストールします。

pip install --upgrade google-cloud-bigquery-storage[fastavro,pandas]

BigQuery 用 IPython マジックを使用してクエリ結果をダウンロードする

Jupyter ノートブック サーバーを起動し、新しい Jupyter ノートブックを作成します。%load_ext マジックを使用して、BigQuery 用 IPython マジックを読み込みます。

%load_ext google.cloud.bigquery

BigQuery Storage API でサイズの大きいクエリ結果をダウンロードするには、%%bigquery マジックに --use_bq_storage_api 引数を追加します。

%%bigquery nodejs_deps --use_bqstorage_api
SELECT
    dependency_name,
    dependency_platform,
    project_name,
    project_id,
    version_number,
    version_id,
    dependency_kind,
    optional_dependency,
    dependency_requirements,
    dependency_project_id
FROM
    `bigquery-public-data.libraries_io.dependencies`
WHERE
    LOWER(dependency_platform) = 'npm'
LIMIT 2500000

サイズの小さいクエリ結果でこの引数を使用すると、マジックが BigQuery API を使用して結果をダウンロードします。

%%bigquery stackoverflow --use_bqstorage_api
SELECT
  CONCAT(
    'https://stackoverflow.com/questions/',
    CAST(id as STRING)) as url,
  view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
LIMIT 10

デフォルトで BigQuery Storage API が使用されるようにするには、context.use_bqstorage_api プロパティの値を True に設定します。

import google.cloud.bigquery.magics

google.cloud.bigquery.magics.context.use_bqstorage_api = True

context.use_bqstorage_api プロパティを設定した後、引数を追加せずに %%bigquery マジックを実行すると、BigQuery Storage API を使用してサイズの大きい結果がダウンロードされます。

%%bigquery java_deps
SELECT
    dependency_name,
    dependency_platform,
    project_name,
    project_id,
    version_number,
    version_id,
    dependency_kind,
    optional_dependency,
    dependency_requirements,
    dependency_project_id
FROM
    `bigquery-public-data.libraries_io.dependencies`
WHERE
    LOWER(dependency_platform) = 'maven'
LIMIT 2500000

Python クライアント ライブラリを使用する

Python クライアントを作成する

次のコードを使用して、BigQuery Client オブジェクトと BigQueryStorageClient を作成します。

import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

# Explicitly create a credentials object. This allows you to use the same
# credentials for both the BigQuery and BigQuery Storage clients, avoiding
# unnecessary API calls to fetch duplicate authentication tokens.
credentials, your_project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

# Make clients.
bqclient = bigquery.Client(
    credentials=credentials,
    project=your_project_id
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=credentials
)

google-auth Python ライブラリを使用して、両方の API に十分に対応できる認証情報を作成します。認証が 2 回行われないよう、それぞれのコンストラクタに認証情報オブジェクトを渡します。

BigQuery クライアント ライブラリを使用してクエリ結果をダウンロードする

query メソッドを使用してクエリを実行します。to_dataframe メソッドを呼び出してクエリが完了するまで待機し、BigQuery Storage API を使用して結果をダウンロードします。

# Download query results.
query_string = """
SELECT
CONCAT(
    'https://stackoverflow.com/questions/',
    CAST(id as STRING)) as url,
view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
"""

dataframe = (
    bqclient.query(query_string)
    .result()

    # Note: The BigQuery Storage API cannot be used to download small query
    # results, but as of google-cloud-bigquery version 1.11.1, the
    # to_dataframe method will fallback to the tabledata.list API when the
    # BigQuery Storage API fails to read the query results.
    .to_dataframe(bqstorage_client=bqstorageclient)
)
print(dataframe.head())

BigQuery クライアント ライブラリを使用してテーブルデータをダウンロードする

list_rows メソッドを使用してテーブル内のすべての行をダウンロードします。このメソッドからは RowIterator オブジェクトが返されます。BigQuery Storage API を使用して行をダウンロードするには、bqstorage_client 引数を指定した to_dataframe メソッドを呼び出します。

# Download a table.
table = bigquery.TableReference.from_string(
    "bigquery-public-data.utility_us.country_code_iso"
)
rows = bqclient.list_rows(
    table,
    selected_fields=[
        bigquery.SchemaField("country_name", "STRING"),
        bigquery.SchemaField("fips_code", "STRING"),
    ],
)
dataframe = rows.to_dataframe(bqstorage_client=bqstorageclient)
print(dataframe.head())

BigQuery Storage API クライアント ライブラリを使用してテーブルデータをダウンロードする

フィルタや並列処理のきめ細かな制御を直接行うには、BigQuery Storage API クライアント ライブラリを使用します。シンプルな行フィルタのみが必要な場合は、クエリの代わりに BigQuery Storage API 読み取りセッションを使用できます。

読み取り対象のテーブルに関する TableReference オブジェクトを作成します。列の選択や行のフィルタ処理を行う TableReadOptions オブジェクトを作成します。create_read_session メソッドを使用して読み取りセッションを作成します。

セッションにストリームが存在する場合は、read_rows メソッドを使用して、そのストリームから行の読み取りを開始します。その読み取りに対して to_dataframe メソッドを呼び出し、ストリーム全体を pandas DataFrame に書き込みます。パフォーマンスを向上させるには、複数のストリームから並列に読み取りますが、このコード例では単純にするために単一のストリームから読み取ります。

table = bigquery_storage_v1beta1.types.TableReference()
table.project_id = "bigquery-public-data"
table.dataset_id = "new_york_trees"
table.table_id = "tree_species"

# Select columns to read with read options. If no read options are
# specified, the whole table is read.
read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("species_common_name")
read_options.selected_fields.append("fall_color")

parent = "projects/{}".format(your_project_id)
session = bqstorageclient.create_read_session(
    table, parent, read_options=read_options
)

# This example reads from only a single stream. Read from multiple streams
# to fetch data faster. Note that the session may not contain any streams
# if there are no rows to read.
stream = session.streams[0]
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
reader = bqstorageclient.read_rows(position)

# Parse all Avro blocks and create a dataframe. This call requires a
# session, because the session contains the schema for the row blocks.
dataframe = reader.to_dataframe(session)
print(dataframe.head())

すべての例のソースコード

すべての例の完全なソースコードをご覧ください。

import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

# Explicitly create a credentials object. This allows you to use the same
# credentials for both the BigQuery and BigQuery Storage clients, avoiding
# unnecessary API calls to fetch duplicate authentication tokens.
credentials, your_project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

# Make clients.
bqclient = bigquery.Client(
    credentials=credentials,
    project=your_project_id
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=credentials
)
# Download a table.
table = bigquery.TableReference.from_string(
    "bigquery-public-data.utility_us.country_code_iso"
)
rows = bqclient.list_rows(
    table,
    selected_fields=[
        bigquery.SchemaField("country_name", "STRING"),
        bigquery.SchemaField("fips_code", "STRING"),
    ],
)
dataframe = rows.to_dataframe(bqstorage_client=bqstorageclient)
print(dataframe.head())
# Download query results.
query_string = """
SELECT
CONCAT(
    'https://stackoverflow.com/questions/',
    CAST(id as STRING)) as url,
view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
"""

dataframe = (
    bqclient.query(query_string)
    .result()

    # Note: The BigQuery Storage API cannot be used to download small query
    # results, but as of google-cloud-bigquery version 1.11.1, the
    # to_dataframe method will fallback to the tabledata.list API when the
    # BigQuery Storage API fails to read the query results.
    .to_dataframe(bqstorage_client=bqstorageclient)
)
print(dataframe.head())
table = bigquery_storage_v1beta1.types.TableReference()
table.project_id = "bigquery-public-data"
table.dataset_id = "new_york_trees"
table.table_id = "tree_species"

# Select columns to read with read options. If no read options are
# specified, the whole table is read.
read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("species_common_name")
read_options.selected_fields.append("fall_color")

parent = "projects/{}".format(your_project_id)
session = bqstorageclient.create_read_session(
    table, parent, read_options=read_options
)

# This example reads from only a single stream. Read from multiple streams
# to fetch data faster. Note that the session may not contain any streams
# if there are no rows to read.
stream = session.streams[0]
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
reader = bqstorageclient.read_rows(position)

# Parse all Avro blocks and create a dataframe. This call requires a
# session, because the session contains the schema for the row blocks.
dataframe = reader.to_dataframe(session)
print(dataframe.head())

クリーンアップ

このチュートリアルで使用するリソースについて、Google Cloud Platform アカウントに課金されないようにする手順は次のとおりです。

プロジェクトを削除します。このチュートリアルでは BigQuery リソースを作成しませんでしたが、プロジェクトを削除すると、作成した他のすべてのリソースも削除されます。

  1. GCP Console で [プロジェクト] ページに移動します。

    プロジェクト ページに移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

次のステップ

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...