透過 BigQuery Storage API 將 BigQuery 資料下載至 pandas

BigQuery Storage API 可快速存取儲存在 BigQuery 中的資料。透過 BigQuery Storage API 下載 BigQuery 儲存的資料,以在 Python 專用的 pandas 程式庫等分析工具中使用。

目標

在此教學課程中,您將執行下列作業:

  • 在 Jupyter Notebook 中,透過 BigQuery 專用的 IPython magics,使用 BigQuery Storage API 將查詢結果下載至 pandas DataFrame。
  • 透過 Python 專用的 BigQuery 用戶端程式庫,將查詢結果下載至 pandas DataFrame。
  • 透過 Python 專用的 BigQuery 用戶端程式庫,將 BigQuery 資料表資料下載至 pandas DataFrame。
  • 透過 Python 專用的 BigQuery Storage API 用戶端程式庫,將 BigQuery 資料表資料下載至 pandas DataFrame。

費用

BigQuery 是一種付費產品,系統會向您收取執行查詢所產生的 BigQuery 使用費,不過每個月處理的前 1 TB 查詢資料則為免費。詳情請參閱 BigQuery 定價頁面。

BigQuery Storage API 是一種付費產品,當您下載 DataFrame 時,系統會針對您掃描的資料表資料收取使用費。詳情請參閱 BigQuery 定價頁面。

事前準備

開始這個教學課程之前,請使用 Google Cloud Platform 主控台來建立或選取專案並啟用計費功能。

  1. 登入您的 Google 帳戶。

    如果您沒有帳戶,請申請新帳戶

  2. 選取或建立 Google Cloud Platform 專案。

    前往「Manage resources」(管理資源) 頁面

  3. 請確認您已啟用 Google Cloud Platform 專案的計費功能。

    瞭解如何啟用計費功能

  4. 新專案會自動啟用 BigQuery。 如要在預先存在的專案中啟用 BigQuery, 啟用BigQuery, BigQuery Storage API API。

    啟用 API

  5. 設定 Python 開發環境。
    設定 Python
  6. 為您的開發環境設定驗證方法。
    設定驗證方法

完成本教學課程之前,您還應熟悉 BigQuery 專用的 IPython magicsBigQuery 用戶端程式庫,以及如何搭配使用 pandas 和用戶端程式庫

安裝用戶端程式庫

安裝 BigQuery Python 用戶端程式庫 1.9.0 以上版本,以及 BigQuery Storage API Python 用戶端程式庫。

PIP

安裝 google-cloud-bigquerygoogle-cloud-bigquery-storage 套件。

pip install --upgrade google-cloud-bigquery[bqstorage,pandas]

Conda

從社群管理的 conda-forge 管道安裝 BigQueryBigQuery Storage API Conda 套件。

conda install -c conda-forge google-cloud-bigquery \
  google-cloud-bigquery-storage \
  pandas \
  pyarrow

使用 BigQuery 專用的 IPython magics 下載查詢結果

啟動 Jupyter Notebook 伺服器,並建立新的 Jupyter Notebook。使用 %load_ext magic 載入 BigQuery 專用的 IPython magics。

%load_ext google.cloud.bigquery

使用 BigQuery Storage API,透過將 --use_bq_storage_api 引數新增至 %%bigquery magics 來下載大量查詢結果。

%%bigquery nodejs_deps --use_bqstorage_api
SELECT
    dependency_name,
    dependency_platform,
    project_name,
    project_id,
    version_number,
    version_id,
    dependency_kind,
    optional_dependency,
    dependency_requirements,
    dependency_project_id
FROM
    `bigquery-public-data.libraries_io.dependencies`
WHERE
    LOWER(dependency_platform) = 'npm'
LIMIT 2500000

當此引數用於少量的查詢結果時,magics 會使用 BigQuery API 來下載結果。

%%bigquery stackoverflow --use_bqstorage_api
SELECT
  CONCAT(
    'https://stackoverflow.com/questions/',
    CAST(id as STRING)) as url,
  view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
LIMIT 10

context.use_bqstorage_api 屬性設為 True,以預設使用 BigQuery Storage API。

import google.cloud.bigquery.magics

google.cloud.bigquery.magics.context.use_bqstorage_api = True

完成 context.use_bqstorage_api 屬性的設定後,執行 %%bigquery magics (不搭配其他引數),以使用 BigQuery Storage API 來下載大量結果。

%%bigquery java_deps
SELECT
    dependency_name,
    dependency_platform,
    project_name,
    project_id,
    version_number,
    version_id,
    dependency_kind,
    optional_dependency,
    dependency_requirements,
    dependency_project_id
FROM
    `bigquery-public-data.libraries_io.dependencies`
WHERE
    LOWER(dependency_platform) = 'maven'
LIMIT 2500000

使用 Python 用戶端程式庫

建立 Python 用戶端

使用下列程式碼建構 BigQuery Client 物件與 BigQueryStorageClient

import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

# Explicitly create a credentials object. This allows you to use the same
# credentials for both the BigQuery and BigQuery Storage clients, avoiding
# unnecessary API calls to fetch duplicate authentication tokens.
credentials, your_project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

# Make clients.
bqclient = bigquery.Client(
    credentials=credentials,
    project=your_project_id,
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=credentials
)

使用 google-auth Python 程式庫建立範圍同時適用於兩個 API 的憑證。將憑證物件傳送至每個建構函式以避免驗證兩次。

使用 BigQuery 用戶端程式庫下載查詢結果

使用 query 方法執行查詢。呼叫 to_dataframe 方法來等候查詢作業完成,並使用 BigQuery Storage API 下載結果。

# Download query results.
query_string = """
SELECT
CONCAT(
    'https://stackoverflow.com/questions/',
    CAST(id as STRING)) as url,
view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
"""

dataframe = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(bqstorage_client=bqstorageclient)
)
print(dataframe.head())

使用 BigQuery 用戶端程式庫下載資料表資料

使用 list_rows 方法傳回 RowIterator 物件,以下載資料表中的所有資料列。透過 bqstorage_client 引數呼叫 to_dataframe,即可使用 BigQuery Storage API 下載資料列。

# Download a table.
table = bigquery.TableReference.from_string(
    "bigquery-public-data.utility_us.country_code_iso"
)
rows = bqclient.list_rows(
    table,
    selected_fields=[
        bigquery.SchemaField("country_name", "STRING"),
        bigquery.SchemaField("fips_code", "STRING"),
    ],
)
dataframe = rows.to_dataframe(bqstorage_client=bqstorageclient)
print(dataframe.head())

透過 BigQuery Storage API 用戶端程式庫來下載資料表資料

直接使用 BigQuery Storage API 用戶端程式庫,就能讓您以精細的方式控管篩選器和平行處理量。如果您只需要簡易的資料列篩選器,則可使用 BigQuery Storage API 讀取工作階段來代替查詢。

使用要讀取的資料表來建立 TableReference 物件。建立 TableReadOptions 物件以選取資料欄或篩選資料列。使用 create_read_session 方法建立讀取工作階段。

如果工作階段有任何串流,請使用 read_rows 方法從串流開始讀取資料列。呼叫讀取程式的 to_dataframe 方法,以將完整的串流寫入 pandas DataFrame。您可以平行讀取多個串流來獲得更好的效能,不過為求說明簡潔,此程式碼範例只會讀取單一串流。

table = bigquery_storage_v1beta1.types.TableReference()
table.project_id = "bigquery-public-data"
table.dataset_id = "new_york_trees"
table.table_id = "tree_species"

# Select columns to read with read options. If no read options are
# specified, the whole table is read.
read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("species_common_name")
read_options.selected_fields.append("fall_color")

parent = "projects/{}".format(your_project_id)
session = bqstorageclient.create_read_session(
    table,
    parent,
    read_options=read_options,
    # This API can also deliver data serialized in Apache Avro format.
    # This example leverages Apache Arrow.
    format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
    # We use a LIQUID strategy in this example because we only read from a
    # single stream. Consider BALANCED if you're consuming multiple streams
    # concurrently and want more consistent stream sizes.
    sharding_strategy=(
        bigquery_storage_v1beta1.enums.ShardingStrategy.LIQUID
    ),
)

# This example reads from only a single stream. Read from multiple streams
# to fetch data faster. Note that the session may not contain any streams
# if there are no rows to read.
stream = session.streams[0]
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
reader = bqstorageclient.read_rows(position)

# Parse all Avro blocks and create a dataframe. This call requires a
# session, because the session contains the schema for the row blocks.
dataframe = reader.to_dataframe(session)
print(dataframe.head())

所有範例的原始碼

查看所有範例的完整原始碼。

import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

# Explicitly create a credentials object. This allows you to use the same
# credentials for both the BigQuery and BigQuery Storage clients, avoiding
# unnecessary API calls to fetch duplicate authentication tokens.
credentials, your_project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

# Make clients.
bqclient = bigquery.Client(
    credentials=credentials,
    project=your_project_id,
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=credentials
)
# Download a table.
table = bigquery.TableReference.from_string(
    "bigquery-public-data.utility_us.country_code_iso"
)
rows = bqclient.list_rows(
    table,
    selected_fields=[
        bigquery.SchemaField("country_name", "STRING"),
        bigquery.SchemaField("fips_code", "STRING"),
    ],
)
dataframe = rows.to_dataframe(bqstorage_client=bqstorageclient)
print(dataframe.head())
# Download query results.
query_string = """
SELECT
CONCAT(
    'https://stackoverflow.com/questions/',
    CAST(id as STRING)) as url,
view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
"""

dataframe = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(bqstorage_client=bqstorageclient)
)
print(dataframe.head())
table = bigquery_storage_v1beta1.types.TableReference()
table.project_id = "bigquery-public-data"
table.dataset_id = "new_york_trees"
table.table_id = "tree_species"

# Select columns to read with read options. If no read options are
# specified, the whole table is read.
read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("species_common_name")
read_options.selected_fields.append("fall_color")

parent = "projects/{}".format(your_project_id)
session = bqstorageclient.create_read_session(
    table,
    parent,
    read_options=read_options,
    # This API can also deliver data serialized in Apache Avro format.
    # This example leverages Apache Arrow.
    format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
    # We use a LIQUID strategy in this example because we only read from a
    # single stream. Consider BALANCED if you're consuming multiple streams
    # concurrently and want more consistent stream sizes.
    sharding_strategy=(
        bigquery_storage_v1beta1.enums.ShardingStrategy.LIQUID
    ),
)

# This example reads from only a single stream. Read from multiple streams
# to fetch data faster. Note that the session may not contain any streams
# if there are no rows to read.
stream = session.streams[0]
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
reader = bqstorageclient.read_rows(position)

# Parse all Avro blocks and create a dataframe. This call requires a
# session, because the session contains the schema for the row blocks.
dataframe = reader.to_dataframe(session)
print(dataframe.head())

清除所用資源

如何避免系統向您的 Google Cloud Platform 帳戶收取在本教學課程中使用資源的相關費用:

刪除專案。您在本教學課程中並未建立任何 BigQuery 資源,但刪除專案會移除您建立的其他任何資源。

  1. 前往 GCP 主控台的「Projects」(專案) 頁面。

    前往專案頁面

  2. 在專案清單中選取要刪除的專案,然後按一下 [Delete] (刪除)
  3. 在對話方塊中輸入專案 ID,按一下 [Shut down] (關閉) 即可刪除專案。

後續步驟

本頁內容對您是否有任何幫助?請提供意見:

傳送您對下列選項的寶貴意見...

這個網頁