Télécharger des données BigQuery sur pandas à l'aide de l'API BigQuery Storage

L'API BigQuery Storage offre un accès rapide aux données stockées dans BigQuery. Elle vous permet de télécharger les données stockées dans BigQuery afin de les utiliser dans des outils d'analyse tels que la bibliothèque pandas pour Python.

Objectifs

Dans ce tutoriel, vous allez :

  • télécharger les résultats de la requête sur un objet DataFrame pandas à l'aide de l'API BigQuery Storage à partir des commandes magiques IPython pour BigQuery dans un notebook Jupyter ;
  • télécharger les résultats de la requête sur un objet DataFrame pandas à l'aide de la bibliothèque cliente BigQuery pour Python ;
  • télécharger les données d'une table BigQuery sur un objet DataFrame pandas à l'aide de la bibliothèque cliente BigQuery pour Python ;
  • télécharger les données de la table BigQuery sur un objet DataFrame pandas à l'aide de la bibliothèque cliente de l'API BigQuery Storage pour Python.

Coûts

BigQuery est un produit payant. Des coûts d'utilisation vous seront donc facturés pour les requêtes que vous exécutez. Le premier To de données de requêtes traitées chaque mois est gratuit. Pour en savoir plus, consultez la page des tarifs de BigQuery.

L'API BigQuery Storage est un produit payant, dont les coûts d'utilisation vous sont facturés pour les données de table que vous analysez lors du téléchargement d'un DataFrame. Pour en savoir plus, consultez la page des tarifs de BigQuery.

Avant de commencer

Avant de commencer ce tutoriel, utilisez la console Google Cloud Platform pour créer ou sélectionner un projet et activer la facturation.

  1. Connectez-vous à votre compte Google.

    Si vous n'en possédez pas déjà un, vous devez en créer un.

  2. Sélectionnez ou créez un projet Google Cloud Platform.

    Accéder à la page "Gérer les ressources"

  3. Assurez-vous que la facturation est activée pour votre projet Google Cloud Platform.

    Découvrir comment activer la facturation

  4. BigQuery est automatiquement activé dans les nouveaux projets. Pour activer BigQuery dans un projet préexistant, Activez BigQuery, API BigQuery Storageles API requises.

    Activer les API.

  5. Configurez un environnement de développement Python.
    Configurer Python
  6. Configurez l'authentification pour votre environnement de développement.
    Configurer l'authentification

Vous devez également vous être familiarisé avec les commandes magiques IPython pour BigQuery, la bibliothèque cliente BigQuery et l'utilisation de la bibliothèque cliente avec Pandas avant de pouvoir suivre ce tutoriel.

Installer les bibliothèques clientes

Installez la bibliothèque cliente BigQuery Python version 1.9.0 ou ultérieure et la bibliothèque cliente Python de l'API BigQuery Storage.

PIP

Installez les packages google-cloud-bigquery et google-cloud-bigquery-storage.

pip install --upgrade google-cloud-bigquery[bqstorage,pandas]

Conda

Installez les packages Conda BigQuery et BigQuery Storage API à partir du canal conda-forge exécuté par la communauté.

conda install -c conda-forge google-cloud-bigquery \
  google-cloud-bigquery-storage \
  pandas \
  pyarrow

Télécharger les résultats de requêtes à l'aide des commandes magiques IPython pour BigQuery

Démarrez le serveur de notebooks Jupyter et créez un notebook Jupyter. Chargez les commandes magiques IPython pour BigQuery à l'aide de la commande magique %load_ext.

%load_ext google.cloud.bigquery

Téléchargez des résultats de requête volumineux avec l'API BigQuery Storage en ajoutant l'argument --use_bq_storage_api à la commande magique %%bigquery.

%%bigquery nodejs_deps --use_bqstorage_api
SELECT
    dependency_name,
    dependency_platform,
    project_name,
    project_id,
    version_number,
    version_id,
    dependency_kind,
    optional_dependency,
    dependency_requirements,
    dependency_project_id
FROM
    `bigquery-public-data.libraries_io.dependencies`
WHERE
    LOWER(dependency_platform) = 'npm'
LIMIT 2500000

Lorsque cet argument est utilisé avec des résultats de requête peu volumineux, les commandes magiques téléchargent les résultats à l'aide de l'API BigQuery.

%%bigquery stackoverflow --use_bqstorage_api
SELECT
  CONCAT(
    'https://stackoverflow.com/questions/',
    CAST(id as STRING)) as url,
  view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
LIMIT 10

Définissez la propriété context.use_bqstorage_api sur True pour utiliser l'API BigQuery Storage par défaut.

import google.cloud.bigquery.magics

google.cloud.bigquery.magics.context.use_bqstorage_api = True

Après avoir défini la propriété context.use_bqstorage_api, exécutez la commande magique %%bigquery sans ajouter d'arguments pour pouvoir télécharger des résultats volumineux à l'aide de l'API BigQuery Storage.

%%bigquery java_deps
SELECT
    dependency_name,
    dependency_platform,
    project_name,
    project_id,
    version_number,
    version_id,
    dependency_kind,
    optional_dependency,
    dependency_requirements,
    dependency_project_id
FROM
    `bigquery-public-data.libraries_io.dependencies`
WHERE
    LOWER(dependency_platform) = 'maven'
LIMIT 2500000

Utiliser les bibliothèques clientes Python

Créer des clients Python

Utilisez le code suivant pour créer un objet Client BigQuery et un objet BigQueryStorageClient.

import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

# Explicitly create a credentials object. This allows you to use the same
# credentials for both the BigQuery and BigQuery Storage clients, avoiding
# unnecessary API calls to fetch duplicate authentication tokens.
credentials, your_project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

# Make clients.
bqclient = bigquery.Client(
    credentials=credentials,
    project=your_project_id,
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=credentials
)

Utilisez la bibliothèque Python google-auth pour créer des identifiants pouvant s'appliquer aux deux API. Transmettez un objet identifiants à chaque constructeur pour éviter une double authentification.

Télécharger les résultats de requêtes à l'aide de la bibliothèque cliente BigQuery

Exécutez une requête à l'aide de la méthode query. Appelez la méthode to_dataframe pour attendre la fin de la requête et téléchargez les résultats à l'aide de l'API BigQuery Storage.

# Download query results.
query_string = """
SELECT
CONCAT(
    'https://stackoverflow.com/questions/',
    CAST(id as STRING)) as url,
view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
"""

dataframe = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(bqstorage_client=bqstorageclient)
)
print(dataframe.head())

Télécharger des données de table à l'aide de la bibliothèque cliente BigQuery

Téléchargez toutes les lignes d'une table à l'aide de la méthode list_rows, qui renvoie un objet RowIterator. Téléchargez des lignes à l'aide de l'API BigQuery Storage en appelant la méthode to_dataframe avec l'argument bqstorage_client.

# Download a table.
table = bigquery.TableReference.from_string(
    "bigquery-public-data.utility_us.country_code_iso"
)
rows = bqclient.list_rows(
    table,
    selected_fields=[
        bigquery.SchemaField("country_name", "STRING"),
        bigquery.SchemaField("fips_code", "STRING"),
    ],
)
dataframe = rows.to_dataframe(bqstorage_client=bqstorageclient)
print(dataframe.head())

Télécharger des données de table à l'aide de la bibliothèque cliente de l'API BigQuery Storage

Utilisez directement la bibliothèque cliente de l'API BigQuery Storage pour un contrôle plus précis des filtres et du parallélisme. Lorsque seuls des filtres de lignes simples sont nécessaires, une session de lecture de l'API BigQuery Storage peut être utilisée à la place d'une requête.

Créez un objet TableReference avec la table que vous souhaitez lire. Créez un objet TableReadOptions pour sélectionner des colonnes ou filtrer des lignes. Créez une session de lecture à l'aide de la méthode create_read_session.

S'il existe des flux dans la session, commencez à en lire les lignes à l'aide de la méthode read_rows. Appelez la méthode to_dataframe sur le lecteur pour écrire l'intégralité du flux sur un objet DataFrame pandas. Pour des performances optimales, lisez à partir de plusieurs flux en parallèle. Cet exemple de code ne lit qu'un seul flux par souci de simplicité.

table = bigquery_storage_v1beta1.types.TableReference()
table.project_id = "bigquery-public-data"
table.dataset_id = "new_york_trees"
table.table_id = "tree_species"

# Select columns to read with read options. If no read options are
# specified, the whole table is read.
read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("species_common_name")
read_options.selected_fields.append("fall_color")

parent = "projects/{}".format(your_project_id)
session = bqstorageclient.create_read_session(
    table,
    parent,
    read_options=read_options,
    # This API can also deliver data serialized in Apache Avro format.
    # This example leverages Apache Arrow.
    format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
    # We use a LIQUID strategy in this example because we only read from a
    # single stream. Consider BALANCED if you're consuming multiple streams
    # concurrently and want more consistent stream sizes.
    sharding_strategy=(
        bigquery_storage_v1beta1.enums.ShardingStrategy.LIQUID
    ),
)

# This example reads from only a single stream. Read from multiple streams
# to fetch data faster. Note that the session may not contain any streams
# if there are no rows to read.
stream = session.streams[0]
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
reader = bqstorageclient.read_rows(position)

# Parse all Avro blocks and create a dataframe. This call requires a
# session, because the session contains the schema for the row blocks.
dataframe = reader.to_dataframe(session)
print(dataframe.head())

Code source pour tous les exemples

Affichez le code source complet pour tous les exemples de bibliothèques clientes.

import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

# Explicitly create a credentials object. This allows you to use the same
# credentials for both the BigQuery and BigQuery Storage clients, avoiding
# unnecessary API calls to fetch duplicate authentication tokens.
credentials, your_project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

# Make clients.
bqclient = bigquery.Client(
    credentials=credentials,
    project=your_project_id,
)
bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=credentials
)
# Download a table.
table = bigquery.TableReference.from_string(
    "bigquery-public-data.utility_us.country_code_iso"
)
rows = bqclient.list_rows(
    table,
    selected_fields=[
        bigquery.SchemaField("country_name", "STRING"),
        bigquery.SchemaField("fips_code", "STRING"),
    ],
)
dataframe = rows.to_dataframe(bqstorage_client=bqstorageclient)
print(dataframe.head())
# Download query results.
query_string = """
SELECT
CONCAT(
    'https://stackoverflow.com/questions/',
    CAST(id as STRING)) as url,
view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
"""

dataframe = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(bqstorage_client=bqstorageclient)
)
print(dataframe.head())
table = bigquery_storage_v1beta1.types.TableReference()
table.project_id = "bigquery-public-data"
table.dataset_id = "new_york_trees"
table.table_id = "tree_species"

# Select columns to read with read options. If no read options are
# specified, the whole table is read.
read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("species_common_name")
read_options.selected_fields.append("fall_color")

parent = "projects/{}".format(your_project_id)
session = bqstorageclient.create_read_session(
    table,
    parent,
    read_options=read_options,
    # This API can also deliver data serialized in Apache Avro format.
    # This example leverages Apache Arrow.
    format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
    # We use a LIQUID strategy in this example because we only read from a
    # single stream. Consider BALANCED if you're consuming multiple streams
    # concurrently and want more consistent stream sizes.
    sharding_strategy=(
        bigquery_storage_v1beta1.enums.ShardingStrategy.LIQUID
    ),
)

# This example reads from only a single stream. Read from multiple streams
# to fetch data faster. Note that the session may not contain any streams
# if there are no rows to read.
stream = session.streams[0]
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
reader = bqstorageclient.read_rows(position)

# Parse all Avro blocks and create a dataframe. This call requires a
# session, because the session contains the schema for the row blocks.
dataframe = reader.to_dataframe(session)
print(dataframe.head())

Effectuer un nettoyage

Pour éviter que les ressources utilisées dans ce tutoriel soient facturées sur votre compte Google Cloud Platform :

Supprimez votre projet. Vous n'avez pas créé de ressources BigQuery dans ce tutoriel, mais la suppression de votre projet supprime aussi toutes les autres ressources que vous avez créées.

  1. Dans la console GCP, accédez à la page "Projets".

    Accéder à la page Projets

  2. Dans la liste des projets, sélectionnez celui que vous souhaitez supprimer, puis cliquez sur Supprimer.
  3. Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.

Étape suivante

Cette page vous a-t-elle été utile ? Évaluez-la :

Envoyer des commentaires concernant…