Télécharger des données BigQuery sur pandas à l'aide de l'API BigQuery Storage

L'API BigQuery Storage offre un accès rapide aux données stockées dans BigQuery. L'API BigQuery Storage vous permet de télécharger les données stockées dans BigQuery afin de les utiliser dans des outils d'analyse tels que la bibliothèque pandas pour Python.

Objectifs

Dans ce tutoriel, vous allez :

  • télécharger les données d'une table BigQuery sur un objet DataFrame pandas à l'aide de la bibliothèque cliente BigQuery pour Python ;
  • télécharger les résultats de la requête sur un objet DataFrame pandas à l'aide de la bibliothèque cliente BigQuery pour Python ;
  • télécharger les données de la table BigQuery sur un objet DataFrame pandas à l'aide de la bibliothèque cliente de l'API BigQuery Storage pour Python.

Coûts

BigQuery est un produit payant. Des coûts d'utilisation vous seront donc facturés pour les requêtes que vous exécutez. Le premier To de données de requêtes traitées chaque mois est gratuit. Pour en savoir plus, consultez la page des tarifs de BigQuery.

L'API BigQuery Storage est un produit payant, dont les coûts d'utilisation vous sont facturés pour les données de table que vous analysez lors du téléchargement d'un DataFrame. Pour en savoir plus, consultez la page des tarifs de BigQuery.

Avant de commencer

Avant de commencer ce tutoriel, utilisez la console Google Cloud Platform pour créer ou sélectionner un projet et activer la facturation.

  1. Connectez-vous à votre compte Google.

    Si vous n'en possédez pas déjà un, vous devez en créer un.

  2. Select or create a Google Cloud Platform project.

    Go to the Manage resources page

  3. Assurez-vous que la facturation est activée pour votre projet.

    En savoir plus sur l'activation de la facturation

  4. BigQuery est automatiquement activé dans les nouveaux projets. Pour activer BigQuery dans un projet préexistant, Activez BigQuery, API BigQuery Storageles API requises.

    Activer les API.

  5. Configurez un environnement de développement Python.
    Configurer Python
  6. Configurez l'authentification pour votre environnement de développement.
    Configurer l'authentification

Vous devez également vous familiariser avec la bibliothèque cliente BigQuery et apprendre à l'utiliser avec pandas avant de suivre ce tutoriel.

Installer les bibliothèques clientes

Installez la bibliothèque cliente BigQuery Python version 1.9.0 ou une version ultérieure pour utiliser l'intégration de l'API BigQuery Storage avec la bibliothèque pandas.

pip install --upgrade google-cloud-bigquery[pandas]

Installez la bibliothèque cliente Python de l'API BigQuery Storage compatible avec les bibliothèques fastavro et pandas.

pip install --upgrade google-cloud-bigquery-storage[fastavro,pandas]

Créer des clients Python

Utilisez le code ci-dessous pour construire un objet Client BigQuery et un objet BigQueryStorageClient.

import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

# Explicitly create a credentials object. This allows you to use the same
# credentials for both the BigQuery and BigQuery Storage clients, avoiding
# unnecessary API calls to fetch duplicate authentication tokens.
credentials, your_project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

# Make clients.
bqclient = bigquery.Client(
    credentials=credentials,
    project=your_project_id
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=credentials
)

Utilisez la bibliothèque Python google-auth pour créer des identifiants pouvant s'appliquer aux deux API. Transmettez un objet identifiants à chaque constructeur pour éviter une double authentification.

Télécharger des données de table à l'aide de la bibliothèque cliente BigQuery

Téléchargez toutes les lignes d'une table à l'aide de la méthode list_rows qui affiche un objet RowIterator. Téléchargez des lignes à l'aide de l'API BigQuery Storage en appelant la méthode to_dataframe avec l'argument bqstorage_client.

# Download a table.
table = bigquery.TableReference.from_string(
    "bigquery-public-data.utility_us.country_code_iso"
)
rows = bqclient.list_rows(
    table,
    selected_fields=[
        bigquery.SchemaField("country_name", "STRING"),
        bigquery.SchemaField("fips_code", "STRING"),
    ],
)
dataframe = rows.to_dataframe(bqstorage_client=bqstorageclient)
print(dataframe.head())

Télécharger les résultats de requêtes à l'aide de la bibliothèque cliente BigQuery

Créez un ensemble de données avec une expiration de table par défaut pour stocker temporairement les résultats de requête.

# Set the dataset_id to the dataset used to store temporary results.
dataset_id = "query_results_dataset"
dataset_ref = bqclient.dataset(dataset_id)
dataset = bigquery.Dataset(dataset_ref)

# Remove tables after 24 hours.
dataset.default_table_expiration_ms = 1000 * 60 * 60 * 24

bqclient.create_dataset(dataset)  # API request.

Exécutez une requête à l'aide de la méthode query. Appelez la méthode to_dataframe pour attendre la fin de la requête et téléchargez les résultats à l'aide de l'API BigQuery Storage.

import uuid

# Download query results.
query_string = """
SELECT
CONCAT(
    'https://stackoverflow.com/questions/',
    CAST(id as STRING)) as url,
view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
"""
# Use a random table name to avoid overwriting existing tables.
table_id = "queryresults_" + uuid.uuid4().hex
table = dataset_ref.table(table_id)
query_config = bigquery.QueryJobConfig(
    # Due to a known issue in the BigQuery Storage API, small query result
    # sets cannot be downloaded. To workaround this issue, write results to
    # a destination table.
    destination=table
)

dataframe = (
    bqclient.query(query_string, job_config=query_config)
    .result()
    .to_dataframe(bqstorage_client=bqstorageclient)
)
print(dataframe.head())

Télécharger des données de table à l'aide de la bibliothèque cliente de l'API BigQuery Storage

Utilisez directement la bibliothèque cliente de l'API BigQuery Storage pour un contrôle plus précis des filtres et du parallélisme. Lorsque seuls des filtres de lignes simples sont nécessaires, une session de lecture de l'API BigQuery Storage peut être utilisée à la place d'une requête.

Créez un objet TableReference avec la table que vous souhaitez lire. Créez un objet TableReadOptions pour sélectionner des colonnes ou filtrer des lignes. Créez une session de lecture à l'aide de la méthode create_read_session.

S'il existe des flux dans la session, commencez à en lire les lignes à l'aide de la méthode read_rows. Appelez la méthode to_dataframe sur le lecteur pour écrire l'intégralité du flux sur un objet DataFrame pandas. Pour des performances optimales, lisez à partir de plusieurs flux en parallèle. Cet exemple de code ne lit qu'un seul flux par souci de simplicité.

table = bigquery_storage_v1beta1.types.TableReference()
table.project_id = "bigquery-public-data"
table.dataset_id = "new_york_trees"
table.table_id = "tree_species"

# Select columns to read with read options. If no read options are
# specified, the whole table is read.
read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("species_common_name")
read_options.selected_fields.append("fall_color")

parent = "projects/{}".format(your_project_id)
session = bqstorageclient.create_read_session(
    table, parent, read_options=read_options
)

# This example reads from only a single stream. Read from multiple streams
# to fetch data faster. Note that the session may not contain any streams
# if there are no rows to read.
stream = session.streams[0]
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
reader = bqstorageclient.read_rows(position)

# Parse all Avro blocks and create a dataframe. This call requires a
# session, because the session contains the schema for the row blocks.
dataframe = reader.to_dataframe(session)
print(dataframe.head())

Code source pour tous les exemples

Affichez le code source complet pour tous les exemples.

import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

# Explicitly create a credentials object. This allows you to use the same
# credentials for both the BigQuery and BigQuery Storage clients, avoiding
# unnecessary API calls to fetch duplicate authentication tokens.
credentials, your_project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

# Make clients.
bqclient = bigquery.Client(
    credentials=credentials,
    project=your_project_id
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=credentials
)
# Download a table.
table = bigquery.TableReference.from_string(
    "bigquery-public-data.utility_us.country_code_iso"
)
rows = bqclient.list_rows(
    table,
    selected_fields=[
        bigquery.SchemaField("country_name", "STRING"),
        bigquery.SchemaField("fips_code", "STRING"),
    ],
)
dataframe = rows.to_dataframe(bqstorage_client=bqstorageclient)
print(dataframe.head())
# Set the dataset_id to the dataset used to store temporary results.
dataset_id = "query_results_dataset"
dataset_ref = bqclient.dataset(dataset_id)
dataset = bigquery.Dataset(dataset_ref)

# Remove tables after 24 hours.
dataset.default_table_expiration_ms = 1000 * 60 * 60 * 24

bqclient.create_dataset(dataset)  # API request.
import uuid

# Download query results.
query_string = """
SELECT
CONCAT(
    'https://stackoverflow.com/questions/',
    CAST(id as STRING)) as url,
view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
"""
# Use a random table name to avoid overwriting existing tables.
table_id = "queryresults_" + uuid.uuid4().hex
table = dataset_ref.table(table_id)
query_config = bigquery.QueryJobConfig(
    # Due to a known issue in the BigQuery Storage API, small query result
    # sets cannot be downloaded. To workaround this issue, write results to
    # a destination table.
    destination=table
)

dataframe = (
    bqclient.query(query_string, job_config=query_config)
    .result()
    .to_dataframe(bqstorage_client=bqstorageclient)
)
print(dataframe.head())
table = bigquery_storage_v1beta1.types.TableReference()
table.project_id = "bigquery-public-data"
table.dataset_id = "new_york_trees"
table.table_id = "tree_species"

# Select columns to read with read options. If no read options are
# specified, the whole table is read.
read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("species_common_name")
read_options.selected_fields.append("fall_color")

parent = "projects/{}".format(your_project_id)
session = bqstorageclient.create_read_session(
    table, parent, read_options=read_options
)

# This example reads from only a single stream. Read from multiple streams
# to fetch data faster. Note that the session may not contain any streams
# if there are no rows to read.
stream = session.streams[0]
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
reader = bqstorageclient.read_rows(position)

# Parse all Avro blocks and create a dataframe. This call requires a
# session, because the session contains the schema for the row blocks.
dataframe = reader.to_dataframe(session)
print(dataframe.head())

Effectuer un nettoyage

Pour éviter que les ressources utilisées lors de ce tutoriel soient facturées sur votre compte Google Cloud Platform :

  1. Dans la console GCP, accédez à la page "Projets".

    Accéder à la page Projets

  2. Dans la liste des projets, sélectionnez celui que vous souhaitez supprimer, puis cliquez sur Delete project (Supprimer le projet). Après avoir coché la case à côté du nom du projet, cliquez sur "Supprimer le projet".
  3. Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.

La suppression de votre projet entraîne la suppression de toutes les ressources BigQuery. Si vous ne souhaitez pas supprimer votre projet Cloud Platform, vous pouvez supprimer l'ensemble de données utilisé pour stocker les résultats de la requête.

bqclient.delete_dataset(dataset_ref, delete_contents=True)

Étape suivante

Cette page vous a-t-elle été utile ? Évaluez-la :

Envoyer des commentaires concernant…