BigQuery Storage API を使用して BigQuery データを pandas にダウンロードする

BigQuery Storage API を使用すれば、BigQuery に保存されたデータにすばやくアクセスできます。BigQuery に保存されたデータを Python 用 pandas ライブラリなどの分析ツールで使用するには、BigQuery Storage API を使用してこれらのデータをダウンロードします。

目標

このチュートリアルでは、次の方法を学びます。

  • Jupyter ノートブック内の BigQuery 用 IPython マジックから BigQuery Storage API を使用して、クエリ結果を pandas DataFrame にダウンロードする。
  • Python 用 BigQuery クライアント ライブラリを使用して、クエリ結果を pandas DataFrame にダウンロードする。
  • Python 用 BigQuery クライアント ライブラリを使用して、BigQuery テーブルのデータを pandas DataFrame にダウンロードする。
  • Python 用 BigQuery Storage API クライアント ライブラリを使用して、BigQuery テーブルのデータを pandas DataFrame にダウンロードする。

料金

BigQuery は有料プロダクトであり、クエリを実行すると BigQuery の使用料金が発生します。処理されるクエリデータは毎月 1 TB まで無料です。詳細については、BigQuery の料金ページをご覧ください。

BigQuery Storage API は有料プロダクトであり、DataFrame をダウンロードするときにスキャンするテーブルデータに対して使用料金が発生します。詳細については、BigQuery の料金ページをご覧ください。

始める前に

このチュートリアルを始める前に、Google Cloud Platform Console を使用して、プロジェクトを作成または選択し、課金を有効にします。

  1. Google アカウントにログインします。

    Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

  2. GCP プロジェクトを選択または作成します。

    プロジェクト セレクタのページに移動

  3. Google Cloud Platform プロジェクトに対して課金が有効になっていることを確認します。 詳しくは、課金を有効にする方法をご覧ください。

  4. 新しいプロジェクトでは、BigQuery が自動的に有効になります。既存のプロジェクトで BigQuery を有効にするには、 BigQuery, BigQuery Storage API API を有効にします。

    APIを有効にする

  5. Python 開発環境を設定します。
    Python の設定
  6. 開発環境に対する認証を設定します。
    認証を設定します。

このチュートリアルを始める前に、BigQuery 用 IPython マジックBigQuery クライアント ライブラリpandas でのクライアント ライブラリの使用方法についても十分に理解しておく必要があります。

クライアント ライブラリのインストール

BigQuery Python クライアント ライブラリ バージョン 1.9.0 以降と BigQuery Storage API Python クライアント ライブラリをインストールします。

PIP

google-cloud-bigquery パッケージと google-cloud-bigquery-storage パッケージをインストールします。

pip install --upgrade google-cloud-bigquery[bqstorage,pandas]

Conda

コミュニティ ベースの conda-forge チャネルから BigQuery パッケージと BigQuery Storage API Conda パッケージをインストールします。

conda install -c conda-forge google-cloud-bigquery \
  google-cloud-bigquery-storage \
  pandas \
  pyarrow

BigQuery 用 IPython マジックを使用してクエリ結果をダウンロードする

Jupyter ノートブック サーバーを起動し、新しい Jupyter ノートブックを作成します。%load_ext マジックを使用して、BigQuery 用 IPython マジックを読み込みます。

%load_ext google.cloud.bigquery

BigQuery Storage API でサイズの大きいクエリ結果をダウンロードするには、%%bigquery マジックに --use_bq_storage_api 引数を追加します。

%%bigquery tax_forms --use_bqstorage_api
SELECT * FROM `bigquery-public-data.irs_990.irs_990_2012`

サイズの小さいクエリ結果でこの引数を使用すると、BigQuery API を使用して結果がダウンロードされます。

%%bigquery stackoverflow --use_bqstorage_api
SELECT
  CONCAT(
    'https://stackoverflow.com/questions/',
    CAST(id as STRING)) as url,
  view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
LIMIT 10

BigQuery Storage API がデフォルトで使用されるようにするには、context.use_bqstorage_api プロパティの値を True に設定します。

import google.cloud.bigquery.magics

google.cloud.bigquery.magics.context.use_bqstorage_api = True

context.use_bqstorage_api プロパティを設定した後、引数を追加せずに %%bigquery マジックを実行すると、BigQuery Storage API を使用してサイズの大きい結果がダウンロードされます。

%%bigquery tax_forms
SELECT * FROM `bigquery-public-data.irs_990.irs_990_2012`

Python クライアント ライブラリを使用する

Python クライアントを作成する

次のコードを使用して、BigQuery Client オブジェクトと BigQueryStorageClient を作成します。

import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

# Explicitly create a credentials object. This allows you to use the same
# credentials for both the BigQuery and BigQuery Storage clients, avoiding
# unnecessary API calls to fetch duplicate authentication tokens.
credentials, your_project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

# Make clients.
bqclient = bigquery.Client(
    credentials=credentials,
    project=your_project_id,
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=credentials
)

google-auth Python ライブラリを使用して、両方の API に十分に対応できる認証情報を作成します。認証が 2 回行われないよう、それぞれのコンストラクタに認証情報オブジェクトを渡します。

BigQuery クライアント ライブラリを使用してクエリ結果をダウンロードする

query メソッドを使用してクエリを実行します。to_dataframe メソッドを呼び出し、クエリが完了して結果が BigQuery Storage API によってダウンロードされるまで待ちます。

# Download query results.
query_string = """
SELECT
CONCAT(
    'https://stackoverflow.com/questions/',
    CAST(id as STRING)) as url,
view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
"""

dataframe = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(bqstorage_client=bqstorageclient)
)
print(dataframe.head())

BigQuery クライアント ライブラリを使用してテーブルデータをダウンロードする

list_rows メソッドを使用してテーブル内のすべての行をダウンロードします。このメソッドは RowIterator オブジェクトを返します。bqstorage_client 引数を指定して to_dataframe メソッドを呼び出すことで、BigQuery Storage API によって行をダウンロードします。

# Download a table.
table = bigquery.TableReference.from_string(
    "bigquery-public-data.utility_us.country_code_iso"
)
rows = bqclient.list_rows(
    table,
    selected_fields=[
        bigquery.SchemaField("country_name", "STRING"),
        bigquery.SchemaField("fips_code", "STRING"),
    ],
)
dataframe = rows.to_dataframe(bqstorage_client=bqstorageclient)
print(dataframe.head())

BigQuery Storage API クライアント ライブラリを使用してテーブルデータをダウンロードする

フィルタや並列処理のきめ細かな制御を直接行うには、BigQuery Storage API クライアント ライブラリを使用します。シンプルな行フィルタのみが必要な場合は、クエリの代わりに BigQuery Storage API 読み取りセッションを使用できます。

読み取り対象のテーブルに関する TableReference オブジェクトを作成します。列の選択や行のフィルタ処理を行う TableReadOptions オブジェクトを作成します。create_read_session メソッドを使用して読み取りセッションを作成します。

セッションにストリームが存在する場合は、read_rows メソッドを使用して、そのストリームから行の読み取りを開始します。その読み取りに対して to_dataframe メソッドを呼び出し、ストリーム全体を pandas DataFrame に書き込みます。パフォーマンスを向上させるには、複数のストリームから並列に読み取りますが、このコード例では単純にするために単一のストリームから読み取ります。

table = bigquery_storage_v1beta1.types.TableReference()
table.project_id = "bigquery-public-data"
table.dataset_id = "new_york_trees"
table.table_id = "tree_species"

# Select columns to read with read options. If no read options are
# specified, the whole table is read.
read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("species_common_name")
read_options.selected_fields.append("fall_color")

parent = "projects/{}".format(your_project_id)
session = bqstorageclient.create_read_session(
    table,
    parent,
    read_options=read_options,
    # This API can also deliver data serialized in Apache Avro format.
    # This example leverages Apache Arrow.
    format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
    # We use a LIQUID strategy in this example because we only read from a
    # single stream. Consider BALANCED if you're consuming multiple streams
    # concurrently and want more consistent stream sizes.
    sharding_strategy=(
        bigquery_storage_v1beta1.enums.ShardingStrategy.LIQUID
    ),
)

# This example reads from only a single stream. Read from multiple streams
# to fetch data faster. Note that the session may not contain any streams
# if there are no rows to read.
stream = session.streams[0]
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
reader = bqstorageclient.read_rows(position)

# Parse all Avro blocks and create a dataframe. This call requires a
# session, because the session contains the schema for the row blocks.
dataframe = reader.to_dataframe(session)
print(dataframe.head())

すべての例のソースコード

クライアント ライブラリのすべての例の完全なソースコードをご覧ください。

import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

# Explicitly create a credentials object. This allows you to use the same
# credentials for both the BigQuery and BigQuery Storage clients, avoiding
# unnecessary API calls to fetch duplicate authentication tokens.
credentials, your_project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

# Make clients.
bqclient = bigquery.Client(
    credentials=credentials,
    project=your_project_id,
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=credentials
)
# Download a table.
table = bigquery.TableReference.from_string(
    "bigquery-public-data.utility_us.country_code_iso"
)
rows = bqclient.list_rows(
    table,
    selected_fields=[
        bigquery.SchemaField("country_name", "STRING"),
        bigquery.SchemaField("fips_code", "STRING"),
    ],
)
dataframe = rows.to_dataframe(bqstorage_client=bqstorageclient)
print(dataframe.head())
# Download query results.
query_string = """
SELECT
CONCAT(
    'https://stackoverflow.com/questions/',
    CAST(id as STRING)) as url,
view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
"""

dataframe = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(bqstorage_client=bqstorageclient)
)
print(dataframe.head())
table = bigquery_storage_v1beta1.types.TableReference()
table.project_id = "bigquery-public-data"
table.dataset_id = "new_york_trees"
table.table_id = "tree_species"

# Select columns to read with read options. If no read options are
# specified, the whole table is read.
read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("species_common_name")
read_options.selected_fields.append("fall_color")

parent = "projects/{}".format(your_project_id)
session = bqstorageclient.create_read_session(
    table,
    parent,
    read_options=read_options,
    # This API can also deliver data serialized in Apache Avro format.
    # This example leverages Apache Arrow.
    format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
    # We use a LIQUID strategy in this example because we only read from a
    # single stream. Consider BALANCED if you're consuming multiple streams
    # concurrently and want more consistent stream sizes.
    sharding_strategy=(
        bigquery_storage_v1beta1.enums.ShardingStrategy.LIQUID
    ),
)

# This example reads from only a single stream. Read from multiple streams
# to fetch data faster. Note that the session may not contain any streams
# if there are no rows to read.
stream = session.streams[0]
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
reader = bqstorageclient.read_rows(position)

# Parse all Avro blocks and create a dataframe. This call requires a
# session, because the session contains the schema for the row blocks.
dataframe = reader.to_dataframe(session)
print(dataframe.head())

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud Platform アカウントに課金されないようにする手順は次のとおりです。

プロジェクトを削除します。このチュートリアルでは BigQuery リソースを作成しませんでしたが、プロジェクトを削除すると、作成した他のすべてのリソースも削除されます。

  1. GCP Console で [プロジェクト] ページに移動します。

    プロジェクト ページに移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

次のステップ

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。