シャーディングされたタイムスタンプ
コレクションにシーケンシャル インデックス値を持つドキュメントが含まれている場合、Firestore は書き込みレートを 500 回/秒の書き込みに制限します。このページでは、この制限に対処するためにドキュメント フィールドをシャーディングする方法を説明します。まず、「シーケンシャル インデックス フィールド」の意味を定義し、この制限がどのような場合に適用されるのかを明確にしましょう。
シーケンシャル インデックス フィールド
「シーケンシャル インデックス フィールド」とは、値が単調に増加または減少するインデックス フィールドを含むドキュメントのコレクションを意味します。多くの場合、これは timestamp
フィールドを意味しますが、単調に増加または減少するフィールド値により、500 回/秒の書き込み制限がトリガーされることがあります。
この制限がドキュメントのコレクションに適用されるのは、たとえばコレクション内の user
ドキュメントにインデックス フィールド userid
が含まれていて、アプリが次のように userid
値を割り当てる場合です。
1281, 1282, 1283, 1284, 1285, ...
一方、すべての timestamp
フィールドにより、この制限がトリガーされるわけではありません。timestamp
フィールドでランダムに分散された値が追跡される場合、書き込み制限は適用されません。また、フィールドの実際の値も重要ではなく、フィールド値は単調に増加または減少します。たとえば、単調に増加する以下の一連のフィールド値は、どちらも書き込み制限をトリガーします。
100000, 100001, 100002, 100003, ...
0, 1, 2, 3, ...
タイムスタンプ フィールドのシャーディング
たとえば、値が単調に増加する timestamp
フィールドをアプリで使用しているとします。アプリがこの timestamp
フィールドをクエリで使用しない場合、タイムスタンプ フィールドのインデックスを作成しなければ、500 回/秒の書き込み制限を取り除くことができます。クエリに timestamp
フィールドが必要な場合は、次のようにシャーディングされたタイムスタンプを使用することで制限を回避できます。
timestamp
フィールドとともにshard
フィールドを追加します。shard
フィールドには、1..n
の一意の値を使用します。これにより、このコレクションに対する書き込み制限が500*n
に引き上げられますが、n
クエリの集約が必要になります。- 書き込みロジックを更新して、各ドキュメントにランダムに
shard
値が割り当てられるようにします。 - シャーディングされた結果セットを集約するようにクエリを更新します。
shard
フィールドとtimestamp
フィールドの両方の単一フィールド インデックスを無効にします。timestamp
フィールドを含む既存の複合インデックスを削除します。- 更新後のクエリをサポートする新しい複合インデックスを作成します。インデックス内のフィールドの順序は重要です。
timestamp
フィールドの前にshard
フィールドを配置する必要があります。timestamp
フィールドを含むインデックスには、shard
フィールドも含める必要があります。
シャーディングされたタイムスタンプは、書き込みレートが常に 1 秒あたり 500 回を超えるユースケースに限って実装するようにしてください。それ以外の場合では、早計な最適化になってしまいます。timestamp
フィールドをシャーディングすると、500 回/秒の書き込み制限が取り除かれますが、クライアントサイドでクエリの集約が必要になるというトレードオフがあります。
以下の例で、timestamp
フィールドをシャーディングする方法と、シャーディングされた結果セットをクエリする方法を説明します。
データモデルとクエリの例
一例として、通貨、普通株、ETF などの金融商品をほぼリアルタイムで分析するアプリがあるとします。このアプリは、次のようにドキュメントを instruments
コレクションに書き込みます。
Node.js
async function insertData() { const instruments = [ { symbol: 'AAA', price: { currency: 'USD', micros: 34790000 }, exchange: 'EXCHG1', instrumentType: 'commonstock', timestamp: Timestamp.fromMillis( Date.parse('2019-01-01T13:45:23.010Z')) }, { symbol: 'BBB', price: { currency: 'JPY', micros: 64272000000 }, exchange: 'EXCHG2', instrumentType: 'commonstock', timestamp: Timestamp.fromMillis( Date.parse('2019-01-01T13:45:23.101Z')) }, { symbol: 'Index1 ETF', price: { currency: 'USD', micros: 473000000 }, exchange: 'EXCHG1', instrumentType: 'etf', timestamp: Timestamp.fromMillis( Date.parse('2019-01-01T13:45:23.001Z')) } ]; const batch = fs.batch(); for (const inst of instruments) { const ref = fs.collection('instruments').doc(); batch.set(ref, inst); } await batch.commit(); }
このアプリは次のクエリを実行し、timestamp
フィールドを基準に並べ替えます。
Node.js
function createQuery(fieldName, fieldOperator, fieldValue, limit = 5) { return fs.collection('instruments') .where(fieldName, fieldOperator, fieldValue) .orderBy('timestamp', 'desc') .limit(limit) .get(); } function queryCommonStock() { return createQuery('instrumentType', '==', 'commonstock'); } function queryExchange1Instruments() { return createQuery('exchange', '==', 'EXCHG1'); } function queryUSDInstruments() { return createQuery('price.currency', '==', 'USD'); }
insertData() .then(() => { const commonStock = queryCommonStock() .then( (docs) => { console.log('--- queryCommonStock: '); docs.forEach((doc) => { console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`); }); } ); const exchange1Instruments = queryExchange1Instruments() .then( (docs) => { console.log('--- queryExchange1Instruments: '); docs.forEach((doc) => { console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`); }); } ); const usdInstruments = queryUSDInstruments() .then( (docs) => { console.log('--- queryUSDInstruments: '); docs.forEach((doc) => { console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`); }); } ); return Promise.all([commonStock, exchange1Instruments, usdInstruments]); });
調査の結果、このアプリが 1 秒あたりに受信する金融商品の更新は 1,000~1,500 件であると判断しました。このレートは、インデックス付きタイムスタンプ フィールドを持つドキュメントのコレクションに適用される 500 回/秒の書き込み制限を超えています。書き込みスループットを向上させるには、3 つの shard 値が必要です(MAX_INSTRUMENT_UPDATES/500 = 3
)。この例では、shard 値 x
、y
、z
を使用します。shard 値には数字または他の文字を使用することもできます。
shard フィールドを追加する
shard
フィールドをドキュメントに追加します。shard
フィールドに値 x
、y
、または z
を設定すると、コレクションに対する書き込み制限が 1,500 回/秒の書き込みに引き上げられます。
Node.js
// Define our 'K' shard values const shards = ['x', 'y', 'z']; // Define a function to help 'chunk' our shards for use in queries. // When using the 'in' query filter there is a max number of values that can be // included in the value. If our number of shards is higher than that limit // break down the shards into the fewest possible number of chunks. function shardChunks() { const chunks = []; let start = 0; while (start < shards.length) { const elements = Math.min(MAX_IN_VALUES, shards.length - start); const end = start + elements; chunks.push(shards.slice(start, end)); start = end; } return chunks; } // Add a convenience function to select a random shard function randomShard() { return shards[Math.floor(Math.random() * Math.floor(shards.length))]; }
async function insertData() { const instruments = [ { shard: randomShard(), // add the new shard field to the document symbol: 'AAA', price: { currency: 'USD', micros: 34790000 }, exchange: 'EXCHG1', instrumentType: 'commonstock', timestamp: Timestamp.fromMillis( Date.parse('2019-01-01T13:45:23.010Z')) }, { shard: randomShard(), // add the new shard field to the document symbol: 'BBB', price: { currency: 'JPY', micros: 64272000000 }, exchange: 'EXCHG2', instrumentType: 'commonstock', timestamp: Timestamp.fromMillis( Date.parse('2019-01-01T13:45:23.101Z')) }, { shard: randomShard(), // add the new shard field to the document symbol: 'Index1 ETF', price: { currency: 'USD', micros: 473000000 }, exchange: 'EXCHG1', instrumentType: 'etf', timestamp: Timestamp.fromMillis( Date.parse('2019-01-01T13:45:23.001Z')) } ]; const batch = fs.batch(); for (const inst of instruments) { const ref = fs.collection('instruments').doc(); batch.set(ref, inst); } await batch.commit(); }
シャーディングされたタイムスタンプをクエリする
shard
フィールドを追加した後は、シャーディングされた結果を集約するようにクエリを更新する必要があります。
Node.js
function createQuery(fieldName, fieldOperator, fieldValue, limit = 5) { // For each shard value, map it to a new query which adds an additional // where clause specifying the shard value. return Promise.all(shardChunks().map(shardChunk => { return fs.collection('instruments') .where('shard', 'in', shardChunk) // new shard condition .where(fieldName, fieldOperator, fieldValue) .orderBy('timestamp', 'desc') .limit(limit) .get(); })) // Now that we have a promise of multiple possible query results, we need // to merge the results from all of the queries into a single result set. .then((snapshots) => { // Create a new container for 'all' results const docs = []; snapshots.forEach((querySnapshot) => { querySnapshot.forEach((doc) => { // append each document to the new all container docs.push(doc); }); }); if (snapshots.length === 1) { // if only a single query was returned skip manual sorting as it is // taken care of by the backend. return docs; } else { // When multiple query results are returned we need to sort the // results after they have been concatenated. // // since we're wanting the `limit` newest values, sort the array // descending and take the first `limit` values. By returning negated // values we can easily get a descending value. docs.sort((a, b) => { const aT = a.data().timestamp; const bT = b.data().timestamp; const secondsDiff = aT.seconds - bT.seconds; if (secondsDiff === 0) { return -(aT.nanoseconds - bT.nanoseconds); } else { return -secondsDiff; } }); return docs.slice(0, limit); } }); } function queryCommonStock() { return createQuery('instrumentType', '==', 'commonstock'); } function queryExchange1Instruments() { return createQuery('exchange', '==', 'EXCHG1'); } function queryUSDInstruments() { return createQuery('price.currency', '==', 'USD'); }
insertData() .then(() => { const commonStock = queryCommonStock() .then( (docs) => { console.log('--- queryCommonStock: '); docs.forEach((doc) => { console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`); }); } ); const exchange1Instruments = queryExchange1Instruments() .then( (docs) => { console.log('--- queryExchange1Instruments: '); docs.forEach((doc) => { console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`); }); } ); const usdInstruments = queryUSDInstruments() .then( (docs) => { console.log('--- queryUSDInstruments: '); docs.forEach((doc) => { console.log(`doc = ${util.inspect(doc.data(), {depth: 4})}`); }); } ); return Promise.all([commonStock, exchange1Instruments, usdInstruments]); });
インデックス定義を更新する
500 回/秒の書き込みという制約を取り除くには、既存の単一フィールド インデックスと、timestamp
フィールドを使用する複合インデックスを削除します。
複合インデックス定義を削除する
Firebase コンソール
Firebase コンソールで Firestore の [インデックス] ページを開き、[複合] タブを表示します。
timestamp
フィールドを含む各インデックスについて、 ボタンをクリックし、[削除] をクリックします。
GCP Console
Google Cloud コンソールで [Database] ページに移動します。
データベースのリストから、必要なデータベースを選択します。
ナビゲーション メニューで、[インデックス] をクリックし、[複合] タブをクリックします。
[フィルタ] フィールドを使用して、
timestamp
フィールドを含むインデックス定義を検索します。該当するインデックスのそれぞれについて、
ボタンをクリックし、[削除] をクリックします。
Firebase CLI
- Firebase CLI を設定していない場合は、この指示に従って CLI をインストールし、
firebase init
コマンドを実行します。init
コマンドの実行中には、必ずFirestore: Deploy rules and create indexes for Firestore
を選択してください。 - セットアップ中に、Firebase CLI は既存のインデックス定義を
firestore.indexes.json
という名前のファイル(デフォルト)にダウンロードします。 timestamp
フィールドを含むインデックス定義を削除します。次に例を示します。{ "indexes": [ // Delete composite index definition that contain the timestamp field { "collectionGroup": "instruments", "queryScope": "COLLECTION", "fields": [ { "fieldPath": "exchange", "order": "ASCENDING" }, { "fieldPath": "timestamp", "order": "DESCENDING" } ] }, { "collectionGroup": "instruments", "queryScope": "COLLECTION", "fields": [ { "fieldPath": "instrumentType", "order": "ASCENDING" }, { "fieldPath": "timestamp", "order": "DESCENDING" } ] }, { "collectionGroup": "instruments", "queryScope": "COLLECTION", "fields": [ { "fieldPath": "price.currency", "order": "ASCENDING" }, { "fieldPath": "timestamp", "order": "DESCENDING" } ] }, ] }
更新済みのインデックス定義をデプロイします。
firebase deploy --only firestore:indexes
単一フィールド インデックス定義を更新する
Firebase コンソール
Firebase コンソールで Firestore の [インデックス] ページを開き、[単一フィールド] タブを表示します。
[除外を追加] をクリックします。
[コレクション ID] に、「
instruments
」と入力します。[フィールドのパス] に、「timestamp
」と入力します。[クエリの範囲] で、[コレクション] と [コレクション グループ] の両方をオンにします。
[次へ] をクリックします。
すべてのインデックス設定を [無効] に切り替えます。[保存] をクリックします。
shard
フィールドについて、同じ手順を繰り返します。
GCP Console
Google Cloud コンソールで [Database] ページに移動します。
データベースのリストから、必要なデータベースを選択します。
ナビゲーション メニューで [インデックス] をクリックし、[単一フィールド] タブをクリックします。
[単一フィールド] タブをクリックします。
[除外を追加] をクリックします。
[コレクション ID] に、「
instruments
」と入力します。[フィールドのパス] に、「timestamp
」と入力します。[クエリの範囲] で、[コレクション] と [コレクション グループ] の両方をオンにします。
[次へ] をクリックします。
すべてのインデックス設定を [無効] に切り替えます。[保存] をクリックします。
shard
フィールドについて、同じ手順を繰り返します。
Firebase CLI
インデックス定義ファイルの
fieldOverrides
セクションに、次の行を追加します。{ "fieldOverrides": [ // Disable single-field indexing for the timestamp field { "collectionGroup": "instruments", "fieldPath": "timestamp", "indexes": [] }, ] }
更新済みのインデックス定義をデプロイします。
firebase deploy --only firestore:indexes
新しい複合インデックスを作成する
以前の timestamp
が含まれるインデックスをすべて削除した後、アプリに必要な新しいインデックスを定義します。timestamp
フィールドを含むインデックスには、shard
フィールドも含める必要があります。たとえば、上記のクエリをサポートするには、次のインデックスを追加します。
コレクション | インデックス登録されるフィールド | クエリの範囲 |
---|---|---|
instruments | shard、 price.currency、 timestamp | コレクション |
instruments | shard、 exchange、 timestamp | コレクション |
instruments | shard、 instrumentType、 timestamp | コレクション |
エラー メッセージ
更新後のクエリを実行すると、上記のインデックスを作成できます。
クエリごとに、Firebase コンソールで必要なインデックスを作成するためのリンクが示されたエラー メッセージが返されます。
Firebase CLI
インデックス定義ファイルに、次のインデックスを追加します。
{ "indexes": [ // New indexes for sharded timestamps { "collectionGroup": "instruments", "queryScope": "COLLECTION", "fields": [ { "fieldPath": "shard", "order": "DESCENDING" }, { "fieldPath": "exchange", "order": "ASCENDING" }, { "fieldPath": "timestamp", "order": "DESCENDING" } ] }, { "collectionGroup": "instruments", "queryScope": "COLLECTION", "fields": [ { "fieldPath": "shard", "order": "DESCENDING" }, { "fieldPath": "instrumentType", "order": "ASCENDING" }, { "fieldPath": "timestamp", "order": "DESCENDING" } ] }, { "collectionGroup": "instruments", "queryScope": "COLLECTION", "fields": [ { "fieldPath": "shard", "order": "DESCENDING" }, { "fieldPath": "price.currency", "order": "ASCENDING" }, { "fieldPath": "timestamp", "order": "DESCENDING" } ] }, ] }
更新済みのインデックス定義をデプロイします。
firebase deploy --only firestore:indexes
シーケンシャル インデックス フィールドの書き込み制限について
シーケンシャル インデックス フィールドの書き込みレートに対する制限は、Firestore がインデックス値を保存してインデックス書き込みをスケーリングする方法に起因します。Firestore はインデックスの書き込みごとに、ドキュメント名と各インデックス フィールドの値を連結した Key-Value エントリを定義します。Firestore は、これらのインデックス エントリを「タブレット」と呼ばれるデータのグループに整理します。各 Firestore サーバーは 1 つ以上のタブレットを保持します。特定のタブレットへの書き込み負荷が高くなりすぎると、Firestore は水平スケーリングを行うために、タブレットを小さな複数のタブレットに分割し、それらの新しいタブレットを複数の Firestore サーバーに分散します。
Firestore は、辞書順が近い一連のインデックス エントリを同じタブレットに配置します。タイムスタンプ フィールドの場合のように、タブレット内のインデックス値が近すぎると、Firestore はタブレットを小さな複数のタブレットに効率的に分割できません。このことから、1 つのタブレットで大量のトラフィックを受信する、ホットスポットという状況が生じます。ホットスポットへの読み取り / 書き込みオペレーションには時間がかかります。
タイムスタンプ フィールドをシャーディングすると、Firestore がワークロードを複数のタブレットに効率的に分割できるようになります。それでもタイムスタンプ フィールドの値は近いままかもしれませんが、連結されたシャードとインデックスの値により、エントリ間には Firestore がインデックス エントリを複数のタブレットに分割するのに十分な間隔が与えられます。
次のステップ
- スケールを考慮して設計する際のベスト プラクティスを確認する。
- 単一ドキュメントへの書き込みレートが高い場合の分散カウンタについて確認する。
- Firestore の標準制限をご覧ください。