Python Client Library v0.27 からの移行

Python v0.28 用の Google BigQuery クライアント ライブラリでは、v0.27 以前に設計された旧バージョンと比較して、クライアント ライブラリの設計にいくつかの大きな変更が加えられています。この変更を、次にまとめます。

  • クエリおよび表示オペレーションのデフォルト言語が標準 SQL になりました。
  • ジョブに関連するクライアント関数(クエリの実行など)で、ジョブが直ちに開始されるようになりました。
  • データセットおよびテーブルを作成、取得、更新、削除する関数がクライアント クラスに移動しました。

このトピックでは、最新バージョンの Python クライアント ライブラリを使用するために BigQuery クライアント ライブラリ用 Python コードのどこを変更しなければならないかについて詳しく説明します。

旧バージョンのクライアント ライブラリの実行

現在使用中の Python クライアント ライブラリを最新バージョンにアップグレードすることは必須ではありません。ただし、BigQuery API の新しい機能は v0.28 以降のバージョンでのみサポートされています。

現在のコードを移行せず、旧バージョンの Python クライアント ライブラリを引き続き使用する場合は、アプリケーションで使用する Python クライアント ライブラリのバージョンを指定します。特定のバージョンのライブラリを指定するには、requirements.txt ファイルを次の例のように編集します。

google-cloud-bigquery==0.27

最新バージョンのクライアント ライブラリの実行

最新バージョンの Python クライアント ライブラリをインストールするには、pip コマンドを使用します。

pip install --upgrade google-cloud-bigquery

詳細な手順については、BigQuery クライアント ライブラリをご覧ください。

ライブラリのインポートとクライアントの作成

Python クライアント ライブラリのインポートとクライアント オブジェクトの作成は、旧バージョンと同じです。

from google.cloud import bigquery

client = bigquery.Client()

クエリに関するコードの変更

標準 SQL 言語によるデータのクエリ

v0.28 以降の変更点は以下のとおりです。

  • デフォルトの SQL 言語は標準 SQL です。
  • QueryJobConfig クラスを使用してクエリジョブを設定します。
  • client.query() は、API リクエストを発行してクエリを直ちに開始します。
  • ジョブ ID は省略可能です。指定されていない場合はクライアント ライブラリによって自動的に生成されます。

次のサンプルは、クエリを実行する方法を示します。

旧バージョンのクライアント ライブラリ:

client = bigquery.Client()
query_job = client.run_async_query(str(uuid.uuid4()), query)

# Use standard SQL syntax.
query_job.use_legacy_sql = False

# Set a destination table.
dest_dataset = client.dataset(dest_dataset_id)
dest_table = dest_dataset.table(dest_table_id)
query_job.destination = dest_table

# Allow the results table to be overwritten.
query_job.write_disposition = 'WRITE_TRUNCATE'

query_job.begin()
query_job.result()  # Wait for query to finish.

rows = query_job.query_results().fetch_data()
for row in rows:
    print(row)

バージョン 0.25.0 以前の google-cloud-bigquery ライブラリでは、job.result() の代わりに次のコードを使用してジョブ オブジェクトの終了を待つ必要がありました。

while True:
    job.reload()  # Refreshes the state via a GET request.
    if job.state == 'DONE':
        if job.error_result:
            raise RuntimeError(job.errors)
        return
    time.sleep(1)

バージョン 0.25.0 以前の google-cloud-bigquery ライブラリでは、job.query_results().fetch_data() の代わりに次のコードを使用して結果の行を取得していました。

rows = query_job.results().fetch_data()

最新バージョンのクライアント ライブラリ:

Python

BigQuery クライアントのインストールと作成の詳細については、BigQuery クライアント ライブラリをご覧ください。

QUERY = (
    'SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` '
    'WHERE state = "TX" '
    'LIMIT 100')
TIMEOUT = 30  # in seconds
query_job = client.query(QUERY)  # API request - starts the query

# Waits for the query to finish
iterator = query_job.result(timeout=TIMEOUT)
rows = list(iterator)

assert query_job.state == 'DONE'
assert len(rows) == 100
row = rows[0]
assert row[0] == row.name == row['name']

最新バージョンの Python クライアント ライブラリを使用してクエリを実行するその他のサンプルについては、以下をご覧ください。

レガシー SQL 言語によるデータのクエリ

次のサンプルは、レガシー SQL 言語を使用してクエリを実行する方法を示します。

旧バージョンのクライアント ライブラリ:

旧バージョンのクライアント ライブラリは、レガシー SQL 構文がデフォルトに設定されていました。クエリを設定して実行する方法については、クエリのサンプルをご覧ください。

最新バージョンのクライアント ライブラリ:

最新バージョンでは、標準 SQL 構文がデフォルトに設定されています。レガシー SQL を使用するには、use_legacy_sql を true に設定します。クエリを設定して実行する方法については、クエリのサンプルをご覧ください。

データの同期クエリ

v0.28 以降では Client.query() メソッドから QueryJob 内のクエリの統計やその他のプロパティにアクセスできるため、このメソッドを使用することが推奨されます。

旧バージョンのクライアント ライブラリ:

query_results = client.run_sync_query(query)
query_results.use_legacy_sql = False

query_results.run()

# The query might not complete in a single request. To account for a
# long-running query, force the query results to reload until the query
# is complete.
while not query_results.complete:
  query_iterator = query_results.fetch_data()
  try:
     six.next(iter(query_iterator))
  except StopIteration:
      pass

rows = query_results.fetch_data()
for row in rows:
    print(row)

最新バージョンのクライアント ライブラリ:

Python

BigQuery クライアントのインストールと作成の詳細については、BigQuery クライアント ライブラリをご覧ください。

QUERY = (
    'SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` '
    'WHERE state = "TX" '
    'LIMIT 100')
query_job = client.query(QUERY)

for row in query_job:  # API request
    # Row values can be accessed by field name or index
    assert row[0] == row.name == row['name']

テーブルに関するコードの変更

テーブル参照

追加のプロパティなしでテーブルを参照するには TableReference オブジェクトを使用し、テーブル リソース全体を参照するには Table を使用します。これまで Table クラスを使用していたいくつかのプロパティが、v0.28 以降では TableReference クラスを使用するようになりました。次に例を示します。

  • QueryJob.destinationTableReference になりました。
  • client.dataset('mydataset').table('mytable')TableReference を返すようになりました。

TableReference クラスと Table クラスの両方を使用する例については、テーブルの作成方法をご覧ください。

ローカル ファイルからのデータの読み込み

次のサンプルは、ローカル ファイルを BigQuery テーブルに読み込む方法を示します。

旧バージョンのクライアント ライブラリ:

dataset = client.dataset(dataset_name)
table = dataset.table(table_name)

# Reload the table to get the schema.
table.reload()

with open(source_file_name, 'rb') as source_file:
    # This example uses CSV, but you can use other formats.
    # See https://cloud.google.com/bigquery/loading-data
    job = table.upload_from_file(
        source_file, source_format='text/csv')

# Wait for the load job to complete.
while True:
    job.reload()
    if job.state == 'DONE':
        if job.error_result:
            raise RuntimeError(job.errors)
        return
    time.sleep(1)

最新バージョンのクライアント ライブラリ:

Python

BigQuery クライアントのインストールと作成の詳細については、BigQuery クライアント ライブラリをご覧ください。

    csv_file = six.BytesIO(b"""full_name,age
Phred Phlyntstone,32
Wylma Phlyntstone,29
""")

    table_ref = dataset.table(TABLE_ID)
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = 'CSV'
    job_config.skip_leading_rows = 1
    job_config.autodetect = True
    job = client.load_table_from_file(
        csv_file, table_ref, job_config=job_config)  # API request
    job.result()  # Waits for table load to complete.

詳細については、ローカル データソースからのデータの読み込みをご覧ください。

Google Cloud Storage からのデータの読み込み

次のサンプルは、Google Cloud Storage から BigQuery テーブルにファイルを読み込む方法を示します。

旧バージョンのクライアント ライブラリ:

dataset = client.dataset(dataset_name)
table = dataset.table(table_name)
job_id = str(uuid.uuid4())

job = client.load_table_from_storage(
    job_id, table, 'gs://bucket_name/object_name')
job.begin()

# Wait for the load job to complete.
while True:
    job.reload()
    if job.state == 'DONE':
        if job.error_result:
            raise RuntimeError(job.errors)
        return
    time.sleep(1)

最新バージョンのクライアント ライブラリ:

Python

BigQuery クライアントのインストールと作成の詳細については、BigQuery クライアント ライブラリをご覧ください。

table_ref = dataset.table('my_table')
GS_URL = 'gs://{}/{}'.format(bucket_name, blob_name)
job_id_prefix = "my_job"
job_config = bigquery.LoadJobConfig()
job_config.create_disposition = 'NEVER'
job_config.skip_leading_rows = 1
job_config.source_format = 'CSV'
job_config.write_disposition = 'WRITE_EMPTY'
load_job = client.load_table_from_uri(
    GS_URL, table_ref, job_config=job_config,
    job_id_prefix=job_id_prefix)  # API request

assert load_job.state == 'RUNNING'
assert load_job.job_type == 'load'

load_job.result()  # Waits for table load to complete.

assert load_job.state == 'DONE'
assert load_job.job_id.startswith(job_id_prefix)

詳細については、Google Cloud Storage からのデータの読み込みをご覧ください。

Google Cloud Storage へのテーブルの抽出

次のサンプルは、Google Cloud Storage にテーブルを抽出する方法を示します。

旧バージョンのクライアント ライブラリ:

dataset = client.dataset(dataset_name)
table = dataset.table(table_name)
job_id = str(uuid.uuid4())

job = client.extract_table_to_storage(
    job_id, table, 'gs://bucket_name/object_name')
job.begin()

# Wait for the job to complete.
while True:
    job.reload()
    if job.state == 'DONE':
        if job.error_result:
            raise RuntimeError(job.errors)
        return
    time.sleep(1)

最新バージョンのクライアント ライブラリ:

Python

BigQuery クライアントのインストールと作成の詳細については、BigQuery クライアント ライブラリをご覧ください。

# client = bigquery.Client()
# bucket_name = 'my-bucket'
destination_uri = 'gs://{}/{}'.format(bucket_name, 'shakespeare.csv')
dataset_ref = client.dataset('samples', project='bigquery-public-data')
table_ref = dataset_ref.table('shakespeare')

extract_job = client.extract_table(
    table_ref, destination_uri)  # API request
extract_job.result()  # Waits for job to complete.

詳細については、データのエクスポートをご覧ください。

テーブルのコピー

次のサンプルは、テーブルを別のテーブルにコピーする方法を示します。

旧バージョンのクライアント ライブラリ:

dataset = client.dataset(dataset_name)
table = dataset.table(table_name)
destination_table = dataset.table(new_table_name)

job_id = str(uuid.uuid4())
job = client.copy_table(job_id, destination_table, table)

job.create_disposition = (
        google.cloud.bigquery.job.CreateDisposition.CREATE_IF_NEEDED)
job.begin()

# Wait for the job to complete.
while True:
    job.reload()
    if job.state == 'DONE':
        if job.error_result:
            raise RuntimeError(job.errors)
        return
    time.sleep(1)

最新バージョンのクライアント ライブラリ:

Python

BigQuery クライアントのインストールと作成の詳細については、BigQuery クライアント ライブラリをご覧ください。

# client = bigquery.Client()
source_dataset = client.dataset('samples', project='bigquery-public-data')
source_table_ref = source_dataset.table('shakespeare')

# dataset_id = 'my_dataset'
dest_table_ref = client.dataset(dataset_id).table('destination_table')

job = client.copy_table(source_table_ref, dest_table_ref)  # API request
job.result()  # Waits for job to complete.

assert job.state == 'DONE'
dest_table = client.get_table(dest_table_ref)  # API request
assert dest_table.num_rows > 0

詳細については、テーブルのコピーをご覧ください。

テーブルへのデータのストリーミング

次のサンプルは、テーブルのストリーミン バッファに行を書き込む方法を示します。

旧バージョンのクライアント ライブラリ:

dataset = client.dataset(dataset_name)
table = dataset.table(table_name)

# Reload the table to get the schema.
table.reload()

rows = [('values', 'in', 'same', 'order', 'as', 'schema')]
errors = table.insert_data(rows)

if not errors:
    print('Loaded 1 row into {}:{}'.format(dataset_name, table_name))
else:
    do_something_with(errors)

最新バージョンのクライアント ライブラリ:

Python

BigQuery クライアントのインストールと作成の詳細については、BigQuery クライアント ライブラリをご覧ください。

rows_to_insert = [
    (u'Phred Phlyntstone', 32),
    (u'Wylma Phlyntstone', 29),
]

errors = client.insert_rows(table, rows_to_insert)  # API request

assert errors == []

詳細については、BigQuery へのデータのストリーミングをご覧ください。

テーブルの一覧表示

次のサンプルは、データセット内のテーブルを一覧表示する方法を示します。

旧バージョンのクライアント ライブラリ:

dataset = client.dataset(dataset_name)
for table in dataset.list_tables():
    print(table.name)

最新バージョンのクライアント ライブラリ:

Python

BigQuery クライアントのインストールと作成の詳細については、BigQuery クライアント ライブラリをご覧ください。

# client = bigquery.Client()
# dataset_ref = client.dataset('my_dataset')

tables = list(client.list_tables(dataset_ref))  # API request(s)
assert len(tables) == 0

table_ref = dataset.table('my_table')
table = bigquery.Table(table_ref)
client.create_table(table)                  # API request
tables = list(client.list_tables(dataset))  # API request(s)

assert len(tables) == 1
assert tables[0].table_id == 'my_table'

詳細については、データセット内のテーブルの一覧表示をご覧ください。

テーブルの取得

次のサンプルは、テーブルを取得する方法を示します。

旧バージョンのクライアント ライブラリ:

dataset = client.dataset(dataset_name)
table = dataset.table(table_name)
table.reload()

最新バージョンのクライアント ライブラリ:

Python

BigQuery クライアントのインストールと作成の詳細については、BigQuery クライアント ライブラリをご覧ください。

# client = bigquery.Client()
# dataset_id = 'my_dataset'
# table_id = 'my_table'

dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)
table = client.get_table(table_ref)  # API Request

# View table properties
print(table.schema)
print(table.description)
print(table.num_rows)

詳細については、テーブルに関する情報の取得をご覧ください。

テーブルの存在チェック

BigQuery API ではネイティブの exists メソッドは提供されていません。その代わりに、テーブル リソースを取得するリクエストを発行して 404 エラーが返されるかどうかを確認します。旧バージョンのクライアント ライブラリには、この確認を行う exists() ヘルパーが用意されていました。exists() ヘルパーでは、リソース全体を取得する前に exists() を呼び出すなど、いくつかの非効率的な使い方が許容されていました。そのため、exists() ヘルパーはクライアント ライブラリから削除されました。

次のサンプルは、テーブルが存在するかどうかを確認する方法を示します。

旧バージョンのクライアント ライブラリ:

dataset = client.dataset(dataset_name)
table = dataset.table(table_name)
if table.exists():
    # do something
else:
    # do something else

最新バージョンのクライアント ライブラリ:

Python

BigQuery クライアントのインストールと作成の詳細については、BigQuery クライアント ライブラリをご覧ください。

def table_exists(client, table_reference):
    """Return if a table exists.

    Args:
        client (google.cloud.bigquery.client.Client):
            A client to connect to the BigQuery API.
        table_reference (google.cloud.bigquery.table.TableReference):
            A reference to the table to look for.

    Returns:
        bool: ``True`` if the table exists, ``False`` otherwise.
    """
    from google.cloud.exceptions import NotFound

    try:
        client.get_table(table_reference)
        return True
    except NotFound:
        return False

テーブルの作成

次のサンプルは、テーブルを作成する方法を示します。

旧バージョンのクライアント ライブラリ:

dataset = client.dataset(dataset_name)
table = dataset.table(table_name)
table.create()

最新バージョンのクライアント ライブラリ:

Python

BigQuery クライアントのインストールと作成の詳細については、BigQuery クライアント ライブラリをご覧ください。

# client = bigquery.Client()
# dataset_ref = client.dataset('my_dataset')

schema = [
    bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'),
    bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'),
]
table_ref = dataset_ref.table('my_table')
table = bigquery.Table(table_ref, schema=schema)
table = client.create_table(table)  # API request

assert table.table_id == 'my_table'

詳細については、テーブルの作成をご覧ください。

テーブルの更新

次のサンプルは、テーブルを更新する方法を示します。

旧バージョンのクライアント ライブラリ:

dataset = client.dataset(dataset_name)
table = dataset.table(table_name)
table.patch(description='new description')

旧バージョンのライブラリでは、etag プロパティによってテーブル リソースのバージョンがチェックされていなかったため、「読み取り - 変更 - 書き込み」の手順は安全ではありません。

最新バージョンのクライアント ライブラリ:

Python

BigQuery クライアントのインストールと作成の詳細については、BigQuery クライアント ライブラリをご覧ください。

# client = bigquery.Client()
# table_ref = client.dataset('my_dataset').table('my_table')
# table = client.get_table(table_ref)  # API request

assert table.description == 'Original description.'
table.description = 'Updated description.'

table = client.update_table(table, ['description'])  # API request

assert table.description == 'Updated description.'

詳細については、テーブル プロパティの更新をご覧ください。

テーブルの削除

次のサンプルは、テーブルを削除する方法を示します。

旧バージョンのクライアント ライブラリ:

dataset = client.dataset(dataset_name)
table = dataset.table(table_name)
table.delete()

最新バージョンのクライアント ライブラリ:

Python

BigQuery クライアントのインストールと作成の詳細については、BigQuery クライアント ライブラリをご覧ください。

# client = bigquery.Client()
# dataset_id = 'my_dataset'
# table_id = 'my_table'

table_ref = client.dataset(dataset_id).table(table_id)
client.delete_table(table_ref)  # API request

print('Table {}:{} deleted.'.format(dataset_id, table_id))

詳細については、テーブルの削除をご覧ください。

データセットに関するコードの変更

データセット参照

追加のプロパティなしでデータセットを参照するには DatasetReference オブジェクトを使用し、データセット リソース全体を参照するには Dataset を使用します。これまで Dataset クラスを使用していたいくつかのメソッドが、v0.28 以降では DatasetReference クラスを使用するようになりました。次に例を示します。

  • client.dataset('mydataset')DatasetReference を返すようになりました。

DatasetReference クラスと Dataset クラスの両方を使用する例については、データセットの作成方法をご覧ください。

データセットの一覧表示

次のサンプルは、プロジェクト内のデータセットを一覧表示する方法を示します。

旧バージョンのクライアント ライブラリ:

for dataset in client.list_datasets():
    print(dataset.name)

最新バージョンのクライアント ライブラリ:

Python

BigQuery クライアントのインストールと作成の詳細については、BigQuery クライアント ライブラリをご覧ください。

# client = bigquery.Client()
datasets = list(client.list_datasets())
project = client.project

if datasets:
    print('Datasets in project {}:'.format(project))
    for dataset in datasets:  # API request(s)
        print('\t{}'.format(dataset.dataset_id))
else:
    print('{} project does not contain any datasets.'.format(project))

詳細については、データセットの一覧表示をご覧ください。

データセットの取得

次のサンプルは、データセットを取得する方法を示します。

旧バージョンのクライアント ライブラリ:

dataset = client.dataset(dataset_name)
dataset.reload()

最新バージョンのクライアント ライブラリ:

Python

BigQuery クライアントのインストールと作成の詳細については、BigQuery クライアント ライブラリをご覧ください。

# client = bigquery.Client()
# dataset_id = 'my_dataset'

dataset_ref = client.dataset(dataset_id)
dataset = client.get_dataset(dataset_ref)  # API request

# View dataset properties
print('Dataset ID: '.format(dataset_id))
print('Description: '.format(dataset.description))
print('Labels:')
for label, value in dataset.labels.items():
    print('\t{}: {}'.format(label, value))
# View tables in dataset
print('Tables:')
tables = list(client.list_tables(dataset_ref))  # API request(s)
if tables:
    for table in tables:
        print('\t{}'.format(table.table_id))
else:
    print('\tThis dataset does not contain any tables.')

詳細については、データセットに関する情報の取得をご覧ください。

データセットの存在チェック

BigQuery API ではネイティブの exists メソッドは提供されていません。その代わりに、データセット リソースを取得するリクエストを発行して 404 エラーが返されるかどうかを確認します。旧バージョンのクライアント ライブラリには、この確認を行う exists() ヘルパーが用意されていました。exists() ヘルパーでは、リソース全体を取得する前に exists() を呼び出すなど、いくつかの非効率的な使い方が許容されていました。そのため、exists() ヘルパーはクライアント ライブラリから削除されました。

次のサンプルは、データセットが存在するかどうかを確認する方法を示します。

旧バージョンのクライアント ライブラリ:

dataset = client.dataset(dataset_name)
if dataset.exists():
    # do something
else:
    # do something else

最新バージョンのクライアント ライブラリ:

Python

BigQuery クライアントのインストールと作成の詳細については、BigQuery クライアント ライブラリをご覧ください。

def dataset_exists(client, dataset_reference):
    """Return if a dataset exists.

    Args:
        client (google.cloud.bigquery.client.Client):
            A client to connect to the BigQuery API.
        dataset_reference (google.cloud.bigquery.dataset.DatasetReference):
            A reference to the dataset to look for.

    Returns:
        bool: ``True`` if the dataset exists, ``False`` otherwise.
    """
    from google.cloud.exceptions import NotFound

    try:
        client.get_dataset(dataset_reference)
        return True
    except NotFound:
        return False

データセットの作成

次のサンプルは、データセットを作成する方法を示します。

旧バージョンのクライアント ライブラリ:

dataset = client.dataset(dataset_name)
dataset.create()

最新バージョンのクライアント ライブラリ:

Python

BigQuery クライアントのインストールと作成の詳細については、BigQuery クライアント ライブラリをご覧ください。

# client = bigquery.Client()
# dataset_id = 'my_dataset'

# Create a DatasetReference using a chosen dataset ID.
# The project defaults to the Client's project if not specified.
dataset_ref = client.dataset(dataset_id)

# Construct a full Dataset object to send to the API.
dataset = bigquery.Dataset(dataset_ref)
# Specify the geographic location where the dataset should reside.
dataset.location = 'US'

# Send the dataset to the API for creation.
# Raises google.api_core.exceptions.AlreadyExists if the Dataset already
# exists within the project.
dataset = client.create_dataset(dataset)  # API request

詳細については、データセットの作成をご覧ください。

データセットの更新

次のサンプルは、データセットを更新する方法を示します。

旧バージョンのクライアント ライブラリ:

dataset = client.dataset(dataset_name)
dataset.patch(description='new description')

旧バージョンのライブラリでは、etag プロパティによってデータセット リソースのバージョンがチェックされていなかったため、「読み取り - 変更 - 書き込み」の手順は安全ではありません。

最新バージョンのクライアント ライブラリ:

Python

BigQuery クライアントのインストールと作成の詳細については、BigQuery クライアント ライブラリをご覧ください。

# client = bigquery.Client()
# dataset_ref = client.dataset('my_dataset')
# dataset = client.get_dataset(dataset_ref)  # API request

assert dataset.description == 'Original description.'
dataset.description = 'Updated description.'

dataset = client.update_dataset(dataset, ['description'])  # API request

assert dataset.description == 'Updated description.'

詳細については、データセット プロパティの更新をご覧ください。

データセットの削除

次のサンプルは、データセットを削除する方法を示します。

旧バージョンのクライアント ライブラリ:

dataset = client.dataset(dataset_name)
dataset.delete()

最新バージョンのクライアント ライブラリ:

Python

BigQuery クライアントのインストールと作成の詳細については、BigQuery クライアント ライブラリをご覧ください。

# client = bigquery.Client()

# Delete a dataset that does not contain any tables
# dataset1_id = 'my_empty_dataset'
dataset1_ref = client.dataset(dataset1_id)
client.delete_dataset(dataset1_ref)  # API request

print('Dataset {} deleted.'.format(dataset1_id))

# Use the delete_contents parameter to delete a dataset and its contents
# dataset2_id = 'my_dataset_with_tables'
dataset2_ref = client.dataset(dataset2_id)
client.delete_dataset(dataset2_ref, delete_contents=True)  # API request

print('Dataset {} deleted.'.format(dataset2_id))

詳細については、データセットの削除をご覧ください。

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...