使用 BigQuery Storage API 将 BigQuery 数据下载到 Pandas

通过 BigQuery Storage API,您可以快速访问在 BigQuery 中存储的数据。使用 BigQuery Storage API 可以下载 BigQuery 中存储的数据,用于 Python 版本 Pandas 库等分析工具。

目标

在本教程中,您将执行以下操作:

  • 在 Jupyter 笔记本中,从适用于 BigQuery 的 IPython 魔法命令使用 BigQuery Storage API,将查询结果下载到 Pandas DataFrame。
  • 通过使用 Python 版 BigQuery 客户端库将查询结果下载到 Pandas DataFrame。
  • 通过使用 Python 版 BigQuery 客户端库将 BigQuery 表数据下载到 Pandas DataFrame。
  • 通过使用 Python 版 BigQuery Storage API 客户端库将 BigQuery 表数据下载到 Pandas DataFrame。

费用

BigQuery 是一款付费产品,您通过它运行的查询会产生 BigQuery 使用费用。每月处理的前 1 TB 查询数据免费。如需了解详情,请参阅 BigQuery 价格页面。

BigQuery Storage API 是一款付费产品,您在下载 DataFrame 时扫描的表数据会产生使用费用。如需了解详情,请参阅 BigQuery 价格页面。

准备工作

在开始本教程之前,请使用 Google Cloud Platform Console 创建或选择项目并启用结算功能。

  1. 登录您的 Google 帐号。

    如果您还没有 Google 帐号,请注册新帐号

  2. 选择或创建 Google Cloud Platform 项目。

    转到“管理资源”页面

  3. 确保您的 Google Cloud Platform 项目已启用结算功能。

    了解如何启用结算功能

  4. 新项目中会自动启用 BigQuery。要在现有项目中激活 BigQuery, 启用BigQuery, BigQuery Storage API API。

    启用 API

  5. 设置 Python 开发环境。
    设置 Python
  6. 为您的开发环境设置身份验证。
    设置身份验证

在完成本教程之前,您还应熟悉适用于 BigQuery 的 IPython 魔法命令BigQuery 客户端库以及如何将该客户端库与 Pandas 搭配使用

安装客户端库

安装 BigQuery Python 客户端库 1.9.0 版或更高版本以及 BigQuery Storage API Python 客户端库。

PIP

安装 google-cloud-bigquerygoogle-cloud-bigquery-storage 软件包。

pip install --upgrade google-cloud-bigquery[bqstorage,pandas]

Conda

从社区运行的 conda-forge 渠道安装 BigQueryBigQuery Storage API Conda 软件包。

conda install -c conda-forge google-cloud-bigquery \
  google-cloud-bigquery-storage \
  pandas \
  pyarrow

使用适用于 BigQuery 的 IPython 魔法命令下载查询结果

启动 Jupyter 笔记本服务器并创建一个新的 Jupyter 笔记本。使用 %load_ext 魔法命令加载适用于 BigQuery 的 IPython 魔法命令。

%load_ext google.cloud.bigquery

通过将 --use_bq_storage_api 参数添加到 %%bigquery 魔法命令,使用 BigQuery Storage API 下载大型查询结果。

%%bigquery nodejs_deps --use_bqstorage_api
SELECT
    dependency_name,
    dependency_platform,
    project_name,
    project_id,
    version_number,
    version_id,
    dependency_kind,
    optional_dependency,
    dependency_requirements,
    dependency_project_id
FROM
    `bigquery-public-data.libraries_io.dependencies`
WHERE
    LOWER(dependency_platform) = 'npm'
LIMIT 2500000

将此参数用于小型查询结果时,魔法命令使用 BigQuery API 下载结果。

%%bigquery stackoverflow --use_bqstorage_api
SELECT
  CONCAT(
    'https://stackoverflow.com/questions/',
    CAST(id as STRING)) as url,
  view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
LIMIT 10

context.use_bqstorage_api 属性设置为 True,以默认使用 BigQuery Storage API。

import google.cloud.bigquery.magics

google.cloud.bigquery.magics.context.use_bqstorage_api = True

设置 context.use_bqstorage_api 属性后,运行 %%bigquery 魔法命令(无需额外参数)以使用 BigQuery Storage API 下载大型结果。

%%bigquery java_deps
SELECT
    dependency_name,
    dependency_platform,
    project_name,
    project_id,
    version_number,
    version_id,
    dependency_kind,
    optional_dependency,
    dependency_requirements,
    dependency_project_id
FROM
    `bigquery-public-data.libraries_io.dependencies`
WHERE
    LOWER(dependency_platform) = 'maven'
LIMIT 2500000

使用 Python 客户端库

创建 Python 客户端

使用以下代码构造一个 BigQuery Client 对象和一个 BigQueryStorageClient

import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

# Explicitly create a credentials object. This allows you to use the same
# credentials for both the BigQuery and BigQuery Storage clients, avoiding
# unnecessary API calls to fetch duplicate authentication tokens.
credentials, your_project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

# Make clients.
bqclient = bigquery.Client(
    credentials=credentials,
    project=your_project_id,
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=credentials
)

使用 google-auth Python 库创建范围足以涵盖这两个 API 的凭据。向每个构造函数传入凭据对象以避免进行两次身份验证。

使用 BigQuery 客户端库下载查询结果

使用 query 方法运行查询。调用 to_dataframe 方法来等待查询完成,并使用 BigQuery Storage API 下载结果。

# Download query results.
query_string = """
SELECT
CONCAT(
    'https://stackoverflow.com/questions/',
    CAST(id as STRING)) as url,
view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
"""

dataframe = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(bqstorage_client=bqstorageclient)
)
print(dataframe.head())

使用 BigQuery 客户端库下载表数据

使用 list_rows 方法下载表中的所有行,该方法会返回一个 RowIterator 对象。如需使用 BigQuery Storage API 下载各行,调用带 bqstorage_client 参数的 to_dataframe 方法即可。

# Download a table.
table = bigquery.TableReference.from_string(
    "bigquery-public-data.utility_us.country_code_iso"
)
rows = bqclient.list_rows(
    table,
    selected_fields=[
        bigquery.SchemaField("country_name", "STRING"),
        bigquery.SchemaField("fips_code", "STRING"),
    ],
)
dataframe = rows.to_dataframe(bqstorage_client=bqstorageclient)
print(dataframe.head())

使用 BigQuery Storage API 客户端库下载表数据

直接使用 BigQuery Storage API 客户端库以精确控制过滤器和并行性。如果仅需要简单的行过滤器,可以使用 BigQuery Storage API 读取会话来代替查询。

使用要读取的表创建 TableReference 对象。创建 TableReadOptions 对象以选择列或过滤行。使用 create_read_session 方法创建读取会话。

如果会话中有任何流,则使用 read_rows 方法从中开始读取行。对读取器调用 to_dataframe 方法可将整个流写入 Pandas DataFrame。为了获得更好的性能,请从多个流中并行读取,但为简单起见,此代码示例仅从单个流中读取。

table = bigquery_storage_v1beta1.types.TableReference()
table.project_id = "bigquery-public-data"
table.dataset_id = "new_york_trees"
table.table_id = "tree_species"

# Select columns to read with read options. If no read options are
# specified, the whole table is read.
read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("species_common_name")
read_options.selected_fields.append("fall_color")

parent = "projects/{}".format(your_project_id)
session = bqstorageclient.create_read_session(
    table,
    parent,
    read_options=read_options,
    # This API can also deliver data serialized in Apache Avro format.
    # This example leverages Apache Arrow.
    format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
    # We use a LIQUID strategy in this example because we only read from a
    # single stream. Consider BALANCED if you're consuming multiple streams
    # concurrently and want more consistent stream sizes.
    sharding_strategy=(
        bigquery_storage_v1beta1.enums.ShardingStrategy.LIQUID
    ),
)

# This example reads from only a single stream. Read from multiple streams
# to fetch data faster. Note that the session may not contain any streams
# if there are no rows to read.
stream = session.streams[0]
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
reader = bqstorageclient.read_rows(position)

# Parse all Avro blocks and create a dataframe. This call requires a
# session, because the session contains the schema for the row blocks.
dataframe = reader.to_dataframe(session)
print(dataframe.head())

所有示例的源代码

查看所有客户端库示例的完整源代码。

import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

# Explicitly create a credentials object. This allows you to use the same
# credentials for both the BigQuery and BigQuery Storage clients, avoiding
# unnecessary API calls to fetch duplicate authentication tokens.
credentials, your_project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

# Make clients.
bqclient = bigquery.Client(
    credentials=credentials,
    project=your_project_id,
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=credentials
)
# Download a table.
table = bigquery.TableReference.from_string(
    "bigquery-public-data.utility_us.country_code_iso"
)
rows = bqclient.list_rows(
    table,
    selected_fields=[
        bigquery.SchemaField("country_name", "STRING"),
        bigquery.SchemaField("fips_code", "STRING"),
    ],
)
dataframe = rows.to_dataframe(bqstorage_client=bqstorageclient)
print(dataframe.head())
# Download query results.
query_string = """
SELECT
CONCAT(
    'https://stackoverflow.com/questions/',
    CAST(id as STRING)) as url,
view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
"""

dataframe = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(bqstorage_client=bqstorageclient)
)
print(dataframe.head())
table = bigquery_storage_v1beta1.types.TableReference()
table.project_id = "bigquery-public-data"
table.dataset_id = "new_york_trees"
table.table_id = "tree_species"

# Select columns to read with read options. If no read options are
# specified, the whole table is read.
read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("species_common_name")
read_options.selected_fields.append("fall_color")

parent = "projects/{}".format(your_project_id)
session = bqstorageclient.create_read_session(
    table,
    parent,
    read_options=read_options,
    # This API can also deliver data serialized in Apache Avro format.
    # This example leverages Apache Arrow.
    format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
    # We use a LIQUID strategy in this example because we only read from a
    # single stream. Consider BALANCED if you're consuming multiple streams
    # concurrently and want more consistent stream sizes.
    sharding_strategy=(
        bigquery_storage_v1beta1.enums.ShardingStrategy.LIQUID
    ),
)

# This example reads from only a single stream. Read from multiple streams
# to fetch data faster. Note that the session may not contain any streams
# if there are no rows to read.
stream = session.streams[0]
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
reader = bqstorageclient.read_rows(position)

# Parse all Avro blocks and create a dataframe. This call requires a
# session, because the session contains the schema for the row blocks.
dataframe = reader.to_dataframe(session)
print(dataframe.head())

清理

为避免系统因本教程中使用的资源向您的 Google Cloud Platform 帐号收取费用,请执行以下操作:

删除您的项目。在本教程中,您没有创建任何 BigQuery 资源,删除您的项目将移除您创建的所有其他资源。

  1. 在 GCP Console 中,转到项目页面。

    转到“项目”页面

  2. 在项目列表中,选择您要删除的项目,然后点击删除 delete
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

后续步骤

此页内容是否有用?请给出您的反馈和评价:

发送以下问题的反馈:

此网页
需要帮助?请访问我们的支持页面