使用 BigQuery Storage API 将 BigQuery 数据下载到 Pandas

通过 BigQuery Storage API,您可以快速访问在 BigQuery 中存储的数据。使用 BigQuery Storage API 可以下载在 BigQuery 中存储的数据,用于 Python 版 Pandas 库等分析工具。

目标

  • 在 Jupyter 笔记本中,从适用于 BigQuery 的 IPython 魔法命令使用 BigQuery Storage API,将查询结果下载到 Pandas DataFrame。
  • 通过使用 Python 版 BigQuery 客户端库将查询结果下载到 Pandas DataFrame。
  • 通过使用 Python 版 BigQuery 客户端库将 BigQuery 表数据下载到 Pandas DataFrame。
  • 通过使用 Python 版 BigQuery Storage API 客户端库将 BigQuery 表数据下载到 Pandas DataFrame。

费用

BigQuery 是一款付费产品,您通过它运行的查询会产生 BigQuery 使用费用。每月处理的前 1 TB 查询数据免费。如需了解详情,请参阅 BigQuery 价格页面。

BigQuery Storage API 是一款付费产品,您在下载 DataFrame 时扫描的表数据会产生使用费用。如需了解详情,请参阅 BigQuery 价格页面。

准备工作

在开始学习本教程之前,请使用 Google Cloud Console 创建或选择一个项目并启用结算功能。

  1. 登录您的 Google Cloud 帐号。如果您是 Google Cloud 新手,请创建一个帐号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  3. 确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能

  4. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  5. 确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能

  6. 新项目会自动启用 BigQuery。如需在现有项目中启用 BigQuery,请转到

    启用 BigQuery, BigQuery Storage API API。

    启用 API

  7. 设置 Python 开发环境。
    设置 Python
  8. 为您的开发环境设置身份验证。
    设置身份验证

在完成本教程之前,您还应熟悉适用于 BigQuery 的 IPython 魔法命令BigQuery 客户端库以及如何将该客户端库与 Pandas 搭配使用

安装客户端库

安装 BigQuery Python 客户端库 1.9.0 版或更高版本以及 BigQuery Storage API Python 客户端库。

PIP

安装 google-cloud-bigquerygoogle-cloud-bigquery-storage 软件包。

pip install --upgrade 'google-cloud-bigquery[bqstorage,pandas]'

Conda

从社区运行的 conda-forge 渠道安装 BigQueryBigQuery Storage API Conda 软件包。

conda install -c conda-forge google-cloud-bigquery \
  google-cloud-bigquery-storage \
  pandas \
  pyarrow

使用适用于 BigQuery 的 IPython 魔法命令下载查询结果

启动 Jupyter 笔记本服务器并创建一个新的 Jupyter 笔记本。使用 %load_ext 魔法命令加载适用于 BigQuery 的 IPython 魔法命令。

%load_ext google.cloud.bigquery

google-cloud-bigquery Python 软件包 1.26.0 版开始,默认使用 BigQuery Storage API 从 %%bigquery 魔法命令下载结果。

%%bigquery tax_forms
SELECT * FROM `bigquery-public-data.irs_990.irs_990_2012`

使用 Python 客户端库

使用 BigQuery 客户端库下载查询结果

使用 query 方法运行查询。调用 to_dataframe 方法来等待查询完成,并使用 BigQuery Storage API 下载结果。

from google.cloud import bigquery

bqclient = bigquery.Client()

# Download query results.
query_string = """
SELECT
CONCAT(
    'https://stackoverflow.com/questions/',
    CAST(id as STRING)) as url,
view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
"""

dataframe = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(
        # Optionally, explicitly request to use the BigQuery Storage API. As of
        # google-cloud-bigquery version 1.26.0 and above, the BigQuery Storage
        # API is used by default.
        create_bqstorage_client=True,
    )
)
print(dataframe.head())

使用 BigQuery 客户端库下载表数据

使用 list_rows 方法下载表中的所有行,该方法会返回一个 RowIterator 对象。如需使用 BigQuery Storage API 下载各行,将 to_dataframe 方法与 bqstorage_client 参数一起调用。

from google.cloud import bigquery

bqclient = bigquery.Client()

# Download a table.
table = bigquery.TableReference.from_string(
    "bigquery-public-data.utility_us.country_code_iso"
)
rows = bqclient.list_rows(
    table,
    selected_fields=[
        bigquery.SchemaField("country_name", "STRING"),
        bigquery.SchemaField("fips_code", "STRING"),
    ],
)
dataframe = rows.to_dataframe(
    # Optionally, explicitly request to use the BigQuery Storage API. As of
    # google-cloud-bigquery version 1.26.0 and above, the BigQuery Storage
    # API is used by default.
    create_bqstorage_client=True,
)
print(dataframe.head())

使用 BigQuery Storage API 客户端库下载表数据

直接使用 BigQuery Storage API 客户端库以精确控制过滤器和并行性。如果仅需要简单的行过滤器,可以使用 BigQuery Storage API 读取会话来代替查询。

创建 TableReadOptions 对象以选择列或过滤行。使用 create_read_session 方法创建读取会话。

如果会话中有任何流,则使用 read_rows 方法从中开始读取行。循环遍历 pages 属性。调用 to_dataframe 方法以将消息转换为 Pandas DataFrame。为了获得更好的性能,请从多个流中并行读取,但为简单起见,此代码示例仅从单个流中读取。

your_project_id = "project-for-read-session"
from google.cloud import bigquery_storage
from google.cloud.bigquery_storage import types
import pandas

bqstorageclient = bigquery_storage.BigQueryReadClient()

project_id = "bigquery-public-data"
dataset_id = "new_york_trees"
table_id = "tree_species"
table = f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"

# Select columns to read with read options. If no read options are
# specified, the whole table is read.
read_options = types.ReadSession.TableReadOptions(
    selected_fields=["species_common_name", "fall_color"]
)

parent = "projects/{}".format(your_project_id)

requested_session = types.ReadSession(
    table=table,
    # Avro is also supported, but the Arrow data format is optimized to
    # work well with column-oriented data structures such as pandas
    # DataFrames.
    data_format=types.DataFormat.ARROW,
    read_options=read_options,
)
read_session = bqstorageclient.create_read_session(
    parent=parent,
    read_session=requested_session,
    max_stream_count=1,
)

# This example reads from only a single stream. Read from multiple streams
# to fetch data faster. Note that the session may not contain any streams
# if there are no rows to read.
stream = read_session.streams[0]
reader = bqstorageclient.read_rows(stream.name)

# Parse all Arrow blocks and create a dataframe.
frames = []
for message in reader.rows().pages:
    frames.append(message.to_dataframe())
dataframe = pandas.concat(frames)
print(dataframe.head())

清理

为避免因本教程中使用的资源导致您的 Google Cloud 帐号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除您的项目。在本教程中,您没有创建任何 BigQuery 资源,删除您的项目将移除您创建的所有其他资源。

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

后续步骤