このドキュメントでは、以前の tabledata.insertAll
メソッドを使用して BigQuery にデータをストリーミングする方法について説明します。
新しいプロジェクトでは、tabledata.insertAll
メソッドの代わりに BigQuery Storage Write API を使用することをおすすめします。Storage Write API は、1 回限りの配信セマンティクスなど、低価格でより堅牢な機能を備えています。tabledata.insertAll
メソッドは引き続き完全にサポートされます。
始める前に
宛先テーブルを含んだデータセットへの書き込みアクセス権があることを確認します。テーブルにデータを書き込む前に、まずテーブルが存在している必要があります。ただし、テンプレート テーブルを使用する場合は異なります。テンプレート テーブルについて詳しくは、テンプレート テーブルを使用した自動的なテーブル作成をご覧ください。
データ ストリーミングの割り当てポリシーをチェックします。
-
Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
このドキュメントの各タスクを実行するために必要な権限をユーザーに与える Identity and Access Management(IAM)のロールを付与します。
無料枠でストリーミングは利用できません。課金を有効にせずストリーミングの使用を試みると、次のエラーが表示されます。BigQuery: Streaming insert is not allowed in the free tier.
必要な権限
BigQuery にデータをストリーミングするには、次の IAM 権限が必要です。
bigquery.tables.updateData
(テーブルにデータを挿入)bigquery.tables.get
(テーブルのメタデータを取得)bigquery.datasets.get
(データセットのメタデータを取得)bigquery.tables.create
(テンプレート テーブルを使用してテーブルを自動的に作成する場合は必須)
次の各 IAM 事前定義ロールには、BigQuery にデータをストリーミングするために必要な権限が含まれています。
roles/bigquery.dataEditor
roles/bigquery.dataOwner
roles/bigquery.admin
BigQuery での IAM のロールと権限について詳しくは、事前定義ロールと権限をご覧ください。
BigQuery へのデータのストリーミング
C#
このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用の C# の設定手順を行ってください。詳細については、BigQuery C# API のリファレンス ドキュメントをご覧ください。
Go
このサンプルを試す前に、クライアント ライブラリを使用した BigQuery クイックスタートの Go の手順に沿って設定を行ってください。詳細については、BigQuery Go API のリファレンス ドキュメントをご覧ください。
Java
このサンプルを試す前に、クライアント ライブラリを使用した BigQuery クイックスタートの Java の手順に沿って設定を行ってください。詳細については、BigQuery Java API のリファレンス ドキュメントをご覧ください。
Node.js
このサンプルを試す前に、クライアント ライブラリを使用した BigQuery クイックスタートの Node.js の手順に沿って設定を行ってください。詳細については、BigQuery Node.js API のリファレンス ドキュメントをご覧ください。
PHP
このサンプルを試す前に、クライアント ライブラリを使用した BigQuery クイックスタートの PHP の手順に沿って設定を行ってください。詳細については、BigQuery PHP API のリファレンス ドキュメントをご覧ください。
Python
このサンプルを試す前に、クライアント ライブラリを使用した BigQuery クイックスタートの Python の手順に沿って設定を行ってください。詳細については、BigQuery Python API のリファレンス ドキュメントをご覧ください。
Ruby
このサンプルを試す前に、クライアント ライブラリを使用した BigQuery クイックスタートの Ruby の手順に沿って設定を行ってください。詳細については、BigQuery Ruby API のリファレンス ドキュメントをご覧ください。
行の挿入時に insertID
フィールドに値を入力する必要はありません。次の例は、ストリーミング時に各行の insertID
の送信を防ぐ方法を示しています。
Java
このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用の Java の設定手順を行ってください。詳細については、BigQuery Java API のリファレンス ドキュメントをご覧ください。
Python
このサンプルを試す前に、クライアント ライブラリを使用した BigQuery クイックスタートの Python の手順に沿って設定を行ってください。詳細については、BigQuery Python API のリファレンス ドキュメントをご覧ください。
日時データの送信
日時フィールドの場合、tabledata.insertAll
メソッドで次のようにデータをフォーマットします。
種類 | 形式 |
---|---|
DATE |
"YYYY-MM-DD" 形式の文字列 |
DATETIME |
"YYYY-MM-DD [HH:MM:SS]" 形式の文字列 |
TIME |
"HH:MM:SS" 形式の文字列 |
TIMESTAMP |
1970-01-01(Unix エポック)からの経過秒数、または "YYYY-MM-DD HH:MM[:SS]" 形式の文字列 |
ストリーミング データの可用性
データは、BigQuery が tabledata.insertAll
リクエストを正常に確認すれば、すぐにGoogleSQL クエリによるリアルタイム分析に使用できるようになります。
最近取り込み時間パーティション分割テーブルにストリーミングした行は、_PARTITIONTIME
疑似列の値が一時的に NULL になります。このような行に対して、BigQuery はバックグラウンドで PARTITIONTIME
列の最終的な NULL 以外の値を割り当てます。これは通常、数分以内に行われます。まれに最大 90 分かかることがあります。
最近ストリーミングされた行の一部は、通常、数分間はテーブルのコピーに使用できない可能性があります。まれに最大 90 分かかることがあります。データがコピーに使用できるかどうかを確認するには、tables.get
に対するレスポンスに streamingBuffer
というセクションがあるかどうかを確認します。streamingBuffer
セクションがなければ、データはコピーに利用できます。streamingBuffer.oldestEntryTime
フィールドを使用してストリーミング バッファ内のレコードの滞在時間を確認することもできます。
ベスト エフォート型の重複排除
挿入された行に対して insertId
を指定した場合、BigQuery はこの ID を使用して、ベスト エフォート型の重複排除を最大 1 分間サポートします。つまり、その期間内に同じ insertId
の同じ行を同じテーブルに複数回ストリーミングしようとすると、BigQuery はその行の複数のオカレンスを重複排除して、それらのオカレンスの一つだけを保持する可能性があります。
このシステムでは、同じ insertId
が指定された行も同一のものであると想定されます。2 つの行が同じ insertId
を持つ場合、どちらの行を BigQuery が保持するかは非決定的になります。
重複排除は一般に、システムと BigQuery 間のネットワーク エラーや BigQuery の内部エラーといった特定のエラー状態でストリーミング挿入の状態を判断する方法がない分散システムでの再試行シナリオ向けです。挿入を再試行する場合は、BigQuery がデータの重複排除を試行できるよう、同じ行セットに同じ insertId
を使用するようにしてください。詳細については、ストリーミング挿入に関するトラブルシューティングをご覧ください。
BigQuery の重複排除はベスト エフォート型であり、データの重複がないことを保証するメカニズムとしての使用には適していません。さらに、データの高い信頼性と可用性を保証するために、BigQuery はベスト エフォート型の重複排除の品質を低下させる可能性があります。
データの重複排除に関して厳密な要件がある場合は、Google Cloud Datastore がトランザクションをサポートする代替サービスとなります。
ベスト エフォート型の重複排除の無効化
ベスト エフォート型の重複排除を無効にするには、挿入された各行の insertId
フィールドに値を設定しないようにします。これは、データの挿入を行う場合に推奨される方法です。
Apache Beam と Dataflow
Apache Beam の Java 用 BigQuery I/O コネクタを使用しているときにベスト エフォート型の重複排除を無効にするには、ignoreInsertIds()
メソッドを使用します。
手動の重複排除
ストリーミングの実行後に重複行が残らないようにするには、次の手動プロセスを使用します。
- テーブル スキーマ内の列として
insertId
を追加し、各行のデータにinsertId
値を含めます。 - ストリーミングが停止した後に、次のクエリを実行して重複をチェックします。
#standardSQL SELECT MAX(count) FROM( SELECT ID_COLUMN, count(*) as count FROM `TABLE_NAME` GROUP BY ID_COLUMN)
結果が 1 より大きい場合は、重複が存在します。 - 重複を排除するには、次のクエリを実行します。宛先テーブルを指定してサイズの大きい結果を許容し、結果のフラット化を無効にします。
#standardSQL SELECT * EXCEPT(row_number) FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY ID_COLUMN) row_number FROM `TABLE_NAME`) WHERE row_number = 1
重複排除クエリに関する注:
- 重複排除クエリの戦略として、新規テーブルをターゲットにした方が安全です。なお、書き込み処理
WRITE_TRUNCATE
を使用して、ソーステーブルをターゲットにすることもできます。 - 重複排除クエリは、テーブル スキーマの末尾に値を
1
に設定したrow_number
列を追加します。クエリでは、GoogleSQL のSELECT * EXCEPT
ステートメントを使用して、このrow_number
列を宛先テーブルから除外します。#standardSQL
接頭辞を使用すると、このクエリで Google SQL が有効になります。また、特定の列名を指定して、この列を省略することもできます。 - ライブデータのクエリで重複を排除するには、重複排除クエリを使用してテーブルのビューを作成するという方法もあります。なお、ビューに対するクエリコストはビュー内の選択列に基づいて計算されるため、スキャンされるバイトサイズが大きくなる可能性があります。
時間パーティション分割テーブルへのストリーミング
時間パーティション分割テーブルにデータをストリーミングする場合は、各パーティションにストリーミング バッファが存在します。writeDisposition
プロパティを WRITE_TRUNCATE
に設定すると、パーティションを上書きする読み込み、クエリ、またはコピーのジョブを実行したときにストリーミング バッファが保持されます。ストリーミング バッファを削除するには、そのパーティションに対して tables.get
を呼び出し、ストリーミング バッファが空であることを確認します。
取り込み時間パーティショニング
取り込み時間パーティション分割テーブルにストリーミングすると、BigQuery は現在の UTC 時間から宛先パーティションを推測します。
新しく到着したデータは、ストリーミング バッファ内にある間、一時的に __UNPARTITIONED__
パーティションに配置されます。パーティション分割されていないデータが十分蓄積されると、BigQuery はデータを正しいパーティションに分割します。ただし、データが __UNPARTITIONED__
パーティションから外れるまでの時間に関する SLA はありません。いずれかの疑似列(優先するデータ型に応じて _PARTITIONTIME
または _PARTITIONDATE
)を使用して、__UNPARTITIONED__
パーティションからの NULL
値を除外することにより、ストリーミング バッファ内のデータをクエリの対象外にすることができます。
日次パーティション分割テーブルにデータをストリーミングする場合は、insertAll
リクエストの一部としてパーティション デコレータを指定することで、日付の推定をオーバーライドできます。tableId
パラメータにデコレータを含めます。たとえば、次のようにパーティション デコレータを使用して、table1
テーブルの 2021-03-01 に対応するパーティションにストリーミングできます。
table1$20210301
パーティション デコレータを使用してストリーミングを行う際は、現在の UTC 時間に基づき、過去 31 日以内のパーティションと現在の日付から 16 日後までのパーティションにストリーミングできます。この範囲に含まれない日付のパーティションに書き込むには、パーティション分割テーブルデータの追加と上書きで説明するように、読み込みジョブまたはクエリジョブを使用します。
パーティション デコレータを使用したストリーミングは、日単位のパーティション分割テーブルでのみサポートされています。時間単位、月単位、年単位のパーティション分割テーブルではサポートされていません。
テストには、bq
コマンドライン ツールの bq insert
CLI コマンドを使用できます。たとえば、次のコマンドを実行すると、2017 年 1 月 1 日($20170101
)のパーティション全体のデータを、mydataset.mytable
という名前のパーティション分割テーブルに 1 行がストリーミングされます。
echo '{"a":1, "b":2}' | bq insert 'mydataset.mytable$20170101'
時間単位列パーティショニング
DATE
列、DATETIME
列、または TIMESTAMP
列で分割されたテーブルに、過去 5 年間、向こう 1 年間のデータをストリーミングできます。この範囲外のデータは拒否されます。
データがストリーミングされると、最初に __UNPARTITIONED__
パーティションに配置されます。パーティション分割されていないデータが十分蓄積されると、BigQuery はデータのパーティション再設定を自動的に行い、適切なパーティションに配置します。
ただし、データが __UNPARTITIONED__
パーティションから外れるまでの時間に関する SLA はありません。
- 注: 日単位のパーティションは、時間、月、年単位のパーティションとは異なる方法で処理されます。期間(過去 7 日間から将来の 3 日間まで)外のデータのみがパーティション分割されていないパーティションに抽出され、パーティションの再分割を待ちます。一方、時間単位のパーティション分割テーブルの場合、データは常にパーティション分割されていないパーティションに抽出され、後でパーティションの再分割が行われます。
テンプレート テーブルを使用したテーブルの自動作成
テンプレート テーブルは、論理テーブルを多数の小さなテーブルに分割して、より小さなデータの集合(たとえば、ユーザー ID ごと)を作成するメカニズムを提供します。テンプレート テーブルには、以下に説明するいくつかの制限があります。代わりにパーティション分割テーブルとクラスタ化テーブルを使用してこの動作を実現することをおすすめします。
BigQuery API を介してテンプレート テーブルを使用するには、insertAll
リクエストに templateSuffix
パラメータを追加します。bq
コマンドライン ツールの場合は、insert
コマンドに template_suffix
フラグを追加します。BigQuery は、templateSuffix
パラメータまたは template_suffix
フラグを検出した場合、ターゲット テーブルをベース テンプレートとして扱います。さらに、ターゲット テーブルと同じスキーマを共有する新しいテーブルを、指定されたサフィックスを含む名前で作成します。
<targeted_table_name> + <templateSuffix>
テンプレート テーブルを使用すると、各テーブルを個別に作成し、各テーブルのスキーマを指定するためのオーバーヘッドを回避できます。テンプレートを 1 つ作成し、複数のサフィックスを指定するだけで、新規テーブルを BigQuery に自動作成させることができます。BigQuery は、各テーブルを同じプロジェクトとデータセット内に配置します。
テンプレート テーブルを通じて作成されたテーブルは、通常数秒以内に利用可能になります。ただし、それより長い時間を要する場合もまれにあります。
テンプレート テーブル スキーマの変更
テンプレート テーブル スキーマを変更した場合は、それ以降に生成されるすべてのテーブルで、更新後のスキーマが使用されます。以前に生成されたテーブルには影響はありません(既存のテーブルにストリーミング バッファが残っている場合を除く)。
ストリーミング バッファが残っている既存のテーブルについては、テンプレート テーブル スキーマに対する変更が後方互換性のあるものであれば、現にストリーミングが行われているそれらの生成済テーブルのスキーマも更新されます。ただし、テンプレート テーブル スキーマに対する変更が後方互換性のないものである場合は、古いスキーマを使用するバッファデータがすべて失われます。また、互換性がなくなった古いスキーマを使用する既存の生成済テーブルに、新しいデータをストリーミングすることはできません。
テンプレート テーブル スキーマを変更した後は、その変更が伝播されるまで、新しいデータの挿入や、生成されたテーブルに対するクエリを行わないでください。新しいフィールドを挿入するリクエストは、数分以内に処理されます。新規フィールドに対するクエリ実行には、最大 90 分の待機時間を要する場合があります。
生成されたテーブルのスキーマを変更する場合は、テンプレート テーブルを経由したストリーミングが停止し、生成済みテーブルのストリーミング統計セクションが tables.get()
レスポンスからなくなる(テーブルでバッファされているデータがなくなる)まで、スキーマを変更しないようにしてください。
パーティション分割テーブルとクラスタ化テーブルには前述の制限がないため、おすすめのメカニズムです。
テンプレート テーブルの詳細
- テンプレート サフィックス値
templateSuffix
(または--template_suffix
)値には、英字(a~z、A~Z)、数字(0~9)、アンダースコア(_)のみを含める必要があります。テーブル名とテーブル サフィックスの最大連結文字数は 1,024 文字です。- 割り当て
テンプレート テーブルには、ストリーミング割り当ての制限があります。
tables.insert
API と同様、プロジェクトでは 1 秒あたり最大 10 個のテーブルを作成できます。この割り当ては、作成されるテーブルにのみ適用されます。変更するテーブルには適用されません。アプリケーションが 1 秒あたり 10 個を超えるテーブルを作成する必要がある場合は、クラスタ化テーブルを使用することをおすすめします。たとえば、単一のクラスタリング テーブルのキー列にカーディナリティの高いテーブル ID を配置できます。
- 有効期間
生成されたテーブルの有効期間はデータセットから継承されます。通常のストリーミング データと同様、生成されたテーブルをすぐにコピーすることはできません。
- 重複排除
重複排除は、宛先テーブルへの同質な参照間でのみ実行されます。 たとえば、テンプレート テーブルと通常の
insertAll
コマンドの両方を使用して生成済テーブルへのストリーミングを同時に実行した場合、テンプレート テーブルと通常のinsertAll
コマンドによって挿入された各行の間で重複排除は実行されません。- ビュー
ビューをテンプレート テーブルとして使用することはできません。また、テンプレート テーブルを元にビューを自動生成させることもできません。
ストリーミング挿入に関するトラブルシューティング
ストリーミング挿入時に発生したエラーの解決方法については、エラー メッセージのページのストリーミング挿入に関するトラブルシューティングをご覧ください。