BigQuery Storage API を使用して BigQuery データを pandas にダウンロードする

BigQuery Storage API を使用すると、BigQuery に保存されたデータにすばやくアクセスできます。BigQuery に保存されたデータを Python 用 pandas ライブラリなどの分析ツールで使用するには、BigQuery Storage API を使用してこれらのデータをダウンロードします。

目標

  • Jupyter ノートブック内の BigQuery 用 IPython マジックから BigQuery Storage API を使用して、クエリ結果を pandas DataFrame にダウンロードする。
  • Python 用 BigQuery クライアント ライブラリを使用して、クエリ結果を pandas DataFrame にダウンロードする。
  • Python 用 BigQuery クライアント ライブラリを使用して、BigQuery テーブルのデータを pandas DataFrame にダウンロードする。
  • Python 用 BigQuery Storage API クライアント ライブラリを使用して、BigQuery テーブルのデータを pandas DataFrame にダウンロードする。

料金

BigQuery は有料プロダクトであり、クエリを実行すると BigQuery の使用料金が発生します。処理されるクエリデータは毎月 1 TB まで無料です。詳細については、BigQuery の料金ページをご覧ください。

BigQuery Storage API は有料プロダクトであり、DataFrame をダウンロードするときにスキャンするテーブルデータに対して使用料金が発生します。詳細については、BigQuery の料金ページをご覧ください。

準備

このチュートリアルを始める前に、Google Cloud Console を使用して、プロジェクトを作成または選択し、課金を有効にします。

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  4. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  5. Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  6. 新しいプロジェクトでは、BigQuery が自動的に有効になります。既存のプロジェクトで BigQuery を有効にするには

    BigQuery, BigQuery Storage API API を有効にします。

    API を有効にする

  7. Python 開発環境を設定します。
    Python を設定する
  8. 開発環境に対する認証を設定します。
    認証を設定する

このチュートリアルを始める前に、BigQuery 用 IPython マジック コマンドBigQuery クライアント ライブラリpandas でのクライアント ライブラリの使用方法についても理解しておく必要があります。

クライアント ライブラリのインストール

BigQuery Python クライアント ライブラリ バージョン 1.9.0 以降と BigQuery Storage API Python クライアント ライブラリをインストールします。

PIP

google-cloud-bigquery パッケージと google-cloud-bigquery-storage パッケージをインストールします。

pip install --upgrade 'google-cloud-bigquery[bqstorage,pandas]'

Conda

コミュニティが運営する conda-forge チャンネルから BigQuery パッケージと BigQuery Storage API Conda パッケージをインストールします。

conda install -c conda-forge google-cloud-bigquery \
  google-cloud-bigquery-storage \
  pandas \
  pyarrow

BigQuery 用 IPython マジックを使用してクエリ結果をダウンロードする

Jupyter ノートブック サーバーを起動し、新しい Jupyter ノートブックを作成します。%load_ext マジック コマンドを使用して、BigQuery 用 IPython マジック コマンドを読み込みます。

%load_ext google.cloud.bigquery

google-cloud-bigquery Python パッケージのバージョン 1.26.0 では、%%bigquery マジックから結果をダウンロードする際にデフォルトで BigQuery Storage API が使用されます。

%%bigquery tax_forms
SELECT * FROM `bigquery-public-data.irs_990.irs_990_2012`

Python クライアント ライブラリを使用する

BigQuery クライアント ライブラリを使用してクエリ結果をダウンロードする

query メソッドを使用してクエリを実行します。to_dataframe メソッドを呼び出し、クエリが完了して結果が BigQuery Storage API によってダウンロードされるまで待ちます。

from google.cloud import bigquery

bqclient = bigquery.Client()

# Download query results.
query_string = """
SELECT
CONCAT(
    'https://stackoverflow.com/questions/',
    CAST(id as STRING)) as url,
view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
"""

dataframe = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(
        # Optionally, explicitly request to use the BigQuery Storage API. As of
        # google-cloud-bigquery version 1.26.0 and above, the BigQuery Storage
        # API is used by default.
        create_bqstorage_client=True,
    )
)
print(dataframe.head())

BigQuery クライアント ライブラリを使用してテーブルデータをダウンロードする

list_rows メソッドを使用してテーブル内の行をすべてダウンロードします。このメソッドは RowIterator オブジェクトを返します。bqstorage_client 引数を指定して to_dataframe メソッドを呼び出すことで、BigQuery Storage API を使用して行をダウンロードします。

from google.cloud import bigquery

bqclient = bigquery.Client()

# Download a table.
table = bigquery.TableReference.from_string(
    "bigquery-public-data.utility_us.country_code_iso"
)
rows = bqclient.list_rows(
    table,
    selected_fields=[
        bigquery.SchemaField("country_name", "STRING"),
        bigquery.SchemaField("fips_code", "STRING"),
    ],
)
dataframe = rows.to_dataframe(
    # Optionally, explicitly request to use the BigQuery Storage API. As of
    # google-cloud-bigquery version 1.26.0 and above, the BigQuery Storage
    # API is used by default.
    create_bqstorage_client=True,
)
print(dataframe.head())

BigQuery Storage API クライアント ライブラリを使用してテーブルデータをダウンロードする

フィルタや並列処理のきめ細かな制御を直接行うには、BigQuery Storage API クライアント ライブラリを使用します。シンプルな行フィルタのみが必要な場合は、クエリの代わりに BigQuery Storage API 読み取りセッションを使用できます。

列の選択や行のフィルタ処理を行う TableReadOptions オブジェクトを作成します。create_read_session メソッドを使用して読み取りセッションを作成します。

セッションにストリームが存在する場合は、read_rows メソッドを使用して、そのストリームから行の読み取りを開始します。pages プロパティをループ処理します。メッセージを pandas DataFrame に変換するには、to_dataframe メソッドを呼び出します。パフォーマンスを向上させるには、複数のストリームから並列に読み取りますが、このコード例では単純にするために単一のストリームから読み取ります。

your_project_id = "project-for-read-session"
from google.cloud import bigquery_storage
from google.cloud.bigquery_storage import types
import pandas

bqstorageclient = bigquery_storage.BigQueryReadClient()

project_id = "bigquery-public-data"
dataset_id = "new_york_trees"
table_id = "tree_species"
table = f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"

# Select columns to read with read options. If no read options are
# specified, the whole table is read.
read_options = types.ReadSession.TableReadOptions(
    selected_fields=["species_common_name", "fall_color"]
)

parent = "projects/{}".format(your_project_id)

requested_session = types.ReadSession(
    table=table,
    # Avro is also supported, but the Arrow data format is optimized to
    # work well with column-oriented data structures such as pandas
    # DataFrames.
    data_format=types.DataFormat.ARROW,
    read_options=read_options,
)
read_session = bqstorageclient.create_read_session(
    parent=parent,
    read_session=requested_session,
    max_stream_count=1,
)

# This example reads from only a single stream. Read from multiple streams
# to fetch data faster. Note that the session may not contain any streams
# if there are no rows to read.
stream = read_session.streams[0]
reader = bqstorageclient.read_rows(stream.name)

# Parse all Arrow blocks and create a dataframe.
frames = []
for message in reader.rows().pages:
    frames.append(message.to_dataframe())
dataframe = pandas.concat(frames)
print(dataframe.head())

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

プロジェクトを削除します。このチュートリアルでは BigQuery リソースを作成しませんでしたが、プロジェクトを削除すると、作成した他のすべてのリソースも削除されます。

  1. Cloud Console で [リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

次のステップ