Como fazer download de dados do BigQuery para o pandas usando a API BigQuery Storage

A API BigQuery Storage fornece acesso rápido aos dados armazenados no BigQuery. Use essa API para fazer o download de dados armazenados no BigQuery para utilização em ferramentas de análise, como a biblioteca pandas para Python.

Objetivos

Neste tutorial, você:

  • fará o download dos resultados das consultas para um DataFrame do pandas usando a API BigQuery Storage a partir dos comandos mágicos do IPython para BigQuery em um notebook Jupyter;
  • fará o download dos resultados da consulta para um DataFrame do pandas usando a biblioteca de cliente do BigQuery para Python;
  • fará o download dos dados da tabela do BigQuery para um DataFrame do pandas usando a biblioteca de cliente do BigQuery para Python;
  • fará o download dos dados da tabela do BigQuery para um DataFrame do pandas usando a biblioteca de cliente da API BigQuery Storage para Python.

Custos

O BigQuery é um produto pago. Você paga pelas consultas que executa nele. O primeiro 1 TB de dados de consulta processados por mês é gratuito. Para mais informações, consulte a página Preços do BigQuery.

A API BigQuery Storage é um produto pago. Você paga pelos dados da tabela que verifica durante o download de um DataFrame. Para mais informações, consulte a página Preços do BigQuery.page.

Antes de começar

Antes de começar este tutorial, use o Console do Google Cloud para criar ou selecionar um projeto e ativar o faturamento.

  1. Faça login na sua conta do Google.

    Se você ainda não tiver uma, inscreva-se.

  2. No Console do Cloud, na página do seletor de projetos, selecione ou crie um projeto do Cloud.

    Acessar a página do seletor de projetos

  3. Verifique se a cobrança está ativada para o seu projeto do Google Cloud. Saiba como confirmar se a cobrança está ativada para o seu projeto.

  4. O BigQuery é ativado automaticamente em novos projetos. Para ativar o BigQuery em um projeto preexistente, Ative as APIs BigQuery, BigQuery Storage API.

    Ative as APIs

  5. Configure um ambiente de desenvolvimento para Python.
    Configurar Python
  6. Configure a autenticação para seu ambiente de desenvolvimento.
    Configurar a autenticação

Antes de terminar este tutorial, também é necessário se familiarizar com os comandos mágicos do IPython para BigQuery (em inglês), a biblioteca de cliente do BigQuery e como usar a biblioteca de cliente com pandas (em inglês).

Instalar as bibliotecas de cliente

Instale a biblioteca de cliente Python para BigQuery versão 1.9.0 (em inglês) ou superior e a biblioteca de cliente Python da API BigQuery Storage.

PIP

Instale os pacotes google-cloud-bigquery e google-cloud-bigquery-storage.

pip install --upgrade google-cloud-bigquery[bqstorage,pandas]

Conda

Instale os pacotes Conda do BigQuery e da API BigQuery Storage (páginas em inglês) usando o canal conda-forge gerenciado pela comunidade.

conda install -c conda-forge google-cloud-bigquery \
  google-cloud-bigquery-storage \
  pandas \
  pyarrow

Fazer download dos resultados da consulta usando os comandos mágicos do IPython para BigQuery

Inicie o servidor do Jupyter Notebook e crie um notebook. Carregue os comandos mágicos do IPython para BigQuery usando o comando mágico %load_ext.

%load_ext google.cloud.bigquery

Faça o download de resultados de consulta extensos com a API BigQuery Storage adicionando o argumento --use_bq_storage_api aos comandos mágicos %%bigquery.

%%bigquery tax_forms --use_bqstorage_api
SELECT * FROM `bigquery-public-data.irs_990.irs_990_2012`

Quando esse argumento é usado com pequenos resultados de consulta, os comandos mágicos utilizam a API BigQuery para fazer o download dos resultados.

%%bigquery stackoverflow --use_bqstorage_api
SELECT
  CONCAT(
    'https://stackoverflow.com/questions/',
    CAST(id as STRING)) as url,
  view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
LIMIT 10

Defina a propriedade context.use_bqstorage_api como True para usar a API BigQuery Storage por padrão.

import google.cloud.bigquery.magics

google.cloud.bigquery.magics.context.use_bqstorage_api = True

Depois de definir a propriedade context.use_bqstorage_api, execute o comando mágico %%bigquery sem outros argumentos para usar a API BigQuery Storage para fazer o download de grandes resultados.

%%bigquery tax_forms
SELECT * FROM `bigquery-public-data.irs_990.irs_990_2012`

Usar as bibliotecas de cliente do Python

Criar clientes do Python

Use o código a seguir para construir um objeto BigQuery Client e um BigQueryStorageClient.

import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

# Explicitly create a credentials object. This allows you to use the same
# credentials for both the BigQuery and BigQuery Storage clients, avoiding
# unnecessary API calls to fetch duplicate authentication tokens.
credentials, your_project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

# Make clients.
bqclient = bigquery.Client(
    credentials=credentials,
    project=your_project_id,
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=credentials
)

Use a biblioteca para Python google-auth para criar credenciais com escopo suficiente para as duas APIs. Transmita um objeto de credenciais para cada construtor para não precisar fazer a autenticação duas vezes.

Fazer download dos resultados da consulta usando a biblioteca de cliente do BigQuery

Execute uma consulta usando o método query. Chame o método to_dataframe para aguardar a conclusão da consulta e fazer o download dos resultados usando a API BigQuery Storage.

# Download query results.
query_string = """
SELECT
CONCAT(
    'https://stackoverflow.com/questions/',
    CAST(id as STRING)) as url,
view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
"""

dataframe = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(bqstorage_client=bqstorageclient)
)
print(dataframe.head())

Fazer o download dos dados da tabela usando a biblioteca de cliente do BigQuery

Faça o download de todas as linhas em uma tabela usando o método list_rows, que retorna um objeto RowIterator. Faça o download das linhas usando a API BigQuery Storage chamando o método to_dataframe com o argumento bqstorage_client.

# Download a table.
table = bigquery.TableReference.from_string(
    "bigquery-public-data.utility_us.country_code_iso"
)
rows = bqclient.list_rows(
    table,
    selected_fields=[
        bigquery.SchemaField("country_name", "STRING"),
        bigquery.SchemaField("fips_code", "STRING"),
    ],
)
dataframe = rows.to_dataframe(bqstorage_client=bqstorageclient)
print(dataframe.head())

Fazer o download dos dados da tabela usando a biblioteca de cliente da API BigQuery Storage

Use a biblioteca de cliente da API BigQuery Storage diretamente para ter um controle refinado sobre filtros e paralelismo. Quando apenas filtros de linha simples são necessários, uma sessão de leitura da API BigQuery Storage pode ser usada no lugar de uma consulta.

Crie um objeto TableReference com a tabela que quiser ler. Crie um objeto TableReadOptions para selecionar colunas ou filtrar linhas. Crie uma sessão de leitura usando o método create_read_session.

Se houver algum stream na sessão, comece a ler as linhas a partir dele usando o método read_rows. Chame o método to_dataframe no leitor para gravar todo o stream em um DataFrame do pandas. Para um melhor desempenho, leia a partir de vários streams em paralelo. Para simplificar, este exemplo de código lê apenas a partir de um único stream.

table = bigquery_storage_v1beta1.types.TableReference()
table.project_id = "bigquery-public-data"
table.dataset_id = "new_york_trees"
table.table_id = "tree_species"

# Select columns to read with read options. If no read options are
# specified, the whole table is read.
read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("species_common_name")
read_options.selected_fields.append("fall_color")

parent = "projects/{}".format(your_project_id)
session = bqstorageclient.create_read_session(
    table,
    parent,
    read_options=read_options,
    # This API can also deliver data serialized in Apache Avro format.
    # This example leverages Apache Arrow.
    format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
    # We use a LIQUID strategy in this example because we only read from a
    # single stream. Consider BALANCED if you're consuming multiple streams
    # concurrently and want more consistent stream sizes.
    sharding_strategy=(
        bigquery_storage_v1beta1.enums.ShardingStrategy.LIQUID
    ),
)

# This example reads from only a single stream. Read from multiple streams
# to fetch data faster. Note that the session may not contain any streams
# if there are no rows to read.
stream = session.streams[0]
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
reader = bqstorageclient.read_rows(position)

# Parse all Avro blocks and create a dataframe. This call requires a
# session, because the session contains the schema for the row blocks.
dataframe = reader.to_dataframe(session)
print(dataframe.head())

Código-fonte de todos os exemplos

Veja o código fonte completo para todos os exemplos de biblioteca de cliente.

import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

# Explicitly create a credentials object. This allows you to use the same
# credentials for both the BigQuery and BigQuery Storage clients, avoiding
# unnecessary API calls to fetch duplicate authentication tokens.
credentials, your_project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

# Make clients.
bqclient = bigquery.Client(
    credentials=credentials,
    project=your_project_id,
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=credentials
)
# Download a table.
table = bigquery.TableReference.from_string(
    "bigquery-public-data.utility_us.country_code_iso"
)
rows = bqclient.list_rows(
    table,
    selected_fields=[
        bigquery.SchemaField("country_name", "STRING"),
        bigquery.SchemaField("fips_code", "STRING"),
    ],
)
dataframe = rows.to_dataframe(bqstorage_client=bqstorageclient)
print(dataframe.head())
# Download query results.
query_string = """
SELECT
CONCAT(
    'https://stackoverflow.com/questions/',
    CAST(id as STRING)) as url,
view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
"""

dataframe = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(bqstorage_client=bqstorageclient)
)
print(dataframe.head())
table = bigquery_storage_v1beta1.types.TableReference()
table.project_id = "bigquery-public-data"
table.dataset_id = "new_york_trees"
table.table_id = "tree_species"

# Select columns to read with read options. If no read options are
# specified, the whole table is read.
read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("species_common_name")
read_options.selected_fields.append("fall_color")

parent = "projects/{}".format(your_project_id)
session = bqstorageclient.create_read_session(
    table,
    parent,
    read_options=read_options,
    # This API can also deliver data serialized in Apache Avro format.
    # This example leverages Apache Arrow.
    format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
    # We use a LIQUID strategy in this example because we only read from a
    # single stream. Consider BALANCED if you're consuming multiple streams
    # concurrently and want more consistent stream sizes.
    sharding_strategy=(
        bigquery_storage_v1beta1.enums.ShardingStrategy.LIQUID
    ),
)

# This example reads from only a single stream. Read from multiple streams
# to fetch data faster. Note that the session may not contain any streams
# if there are no rows to read.
stream = session.streams[0]
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
reader = bqstorageclient.read_rows(position)

# Parse all Avro blocks and create a dataframe. This call requires a
# session, because the session contains the schema for the row blocks.
dataframe = reader.to_dataframe(session)
print(dataframe.head())

Limpar

Para evitar que os recursos usados neste tutorial sejam cobrados na conta do Google Cloud Platform, faça o seguinte:

Exclua o projeto. Você não criou nenhum recurso do BigQuery neste tutorial, mas a exclusão do projeto removerá todos os outros recursos que você criou.

  1. No Console do Cloud, acesse a página Gerenciar recursos:

    Acessar a página "Gerenciar recursos"

  2. Na lista de projetos, selecione o projeto que você quer excluir e clique em Excluir .
  3. Na caixa de diálogo, digite o ID do projeto e clique em Encerrar para excluí-lo.

A seguir