diff --git a/veilid-core/src/storage_manager/limited_size.rs b/veilid-core/src/storage_manager/limited_size.rs index dbb82bec..22a4a93b 100644 --- a/veilid-core/src/storage_manager/limited_size.rs +++ b/veilid-core/src/storage_manager/limited_size.rs @@ -93,13 +93,14 @@ impl LimitedSize { } } log_stor!(debug "Commit ({}): {} => {}", self.description, self.value, uncommitted_value); + self.uncommitted_value = None; self.value = uncommitted_value; } Ok(self.value) } pub fn rollback(&mut self) -> T { - if let Some(uv) = self.uncommitted_value { + if let Some(uv) = self.uncommitted_value.take() { log_stor!(debug "Rollback ({}): {} (drop {})", self.description, self.value, uv); } return self.value; diff --git a/veilid-core/src/storage_manager/record_store.rs b/veilid-core/src/storage_manager/record_store.rs index 1de083f2..8a52d93a 100644 --- a/veilid-core/src/storage_manager/record_store.rs +++ b/veilid-core/src/storage_manager/record_store.rs @@ -7,6 +7,21 @@ use super::*; use hashlink::LruCache; +#[derive(Debug, Clone)] +/// A dead record that is yet to be purged from disk and statistics +struct DeadRecord +where + D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>, +{ + // The key used in the record_index + key: RecordTableKey, + // The actual record + record: Record, + // True if this record is accounted for in the total storage + // and needs to have the statistics updated or not when purged + in_total_storage: bool, +} + pub struct RecordStore where D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>, @@ -15,16 +30,24 @@ where name: String, limits: RecordStoreLimits, + /// The tabledb used for record data record_table: Option, + /// The tabledb used for subkey data subkey_table: Option, + /// The in-memory index that keeps track of what records are in the tabledb record_index: LruCache>, + /// The in-memory cache of commonly accessed subkey data so we don't have to keep hitting the db subkey_cache: LruCache, + /// Total storage space or subkey data inclusive of structures in memory subkey_cache_total_size: LimitedSize, + /// Total storage space of records in the tabledb inclusive of subkey data and structures total_storage_space: LimitedSize, - - dead_records: Vec<(RecordTableKey, Record)>, + /// Records to be removed from the tabledb upon next purge + dead_records: Vec>, + /// The list of records that have changed since last flush to disk (optimization for batched writes) changed_records: HashSet, + /// A mutex to ensure we handle this concurrently purge_dead_records_mutex: Arc>, } @@ -97,33 +120,45 @@ where // Sort the record index by last touched time and insert in sorted order record_index_saved.sort_by(|a, b| a.1.last_touched().cmp(&b.1.last_touched())); - let mut dead_records = Vec::new(); + let mut dead_records = Vec::>::new(); for ri in record_index_saved { // total the storage space self.total_storage_space - .add(mem::size_of::() as u64) - .unwrap(); - self.total_storage_space - .add(ri.1.total_size() as u64) + .add((mem::size_of::() + ri.1.total_size()) as u64) .unwrap(); if let Err(_) = self.total_storage_space.commit() { - // If we overflow the limit, kill off the record - dead_records.push((ri.0, ri.1)); + // Revert the total storage space because the commit failed + self.total_storage_space.rollback(); + + // If we overflow the limit, kill off the record, noting that it has not yet been added to the total storage space + dead_records.push(DeadRecord { + key: ri.0, + record: ri.1, + in_total_storage: false, + }); continue; } // add to index and ensure we deduplicate in the case of an error if let Some(v) = self.record_index.insert_with_callback(ri.0, ri.1, |k, v| { // If the configuration change, we only want to keep the 'limits.max_records' records - dead_records.push((k, v)); + dead_records.push(DeadRecord { + key: k, + record: v, + in_total_storage: true, + }); }) { // This shouldn't happen, but deduplicate anyway log_stor!(warn "duplicate record in table: {:?}", ri.0); - dead_records.push((ri.0, v)); + dead_records.push(DeadRecord { + key: ri.0, + record: v, + in_total_storage: true, + }); } } - for (k, v) in dead_records { - self.add_dead_record(k, v); + for dr in dead_records { + self.dead_records.push(dr); } self.record_table = Some(record_table); @@ -132,7 +167,11 @@ where } fn add_dead_record(&mut self, key: RecordTableKey, record: Record) { - self.dead_records.push((key, record)); + self.dead_records.push(DeadRecord { + key, + record, + in_total_storage: true, + }); } fn mark_record_changed(&mut self, key: RecordTableKey) { @@ -208,23 +247,23 @@ where let rt_xact = record_table.transact(); let st_xact = subkey_table.transact(); let dead_records = mem::take(&mut self.dead_records); - for (k, v) in dead_records { + for dr in dead_records { // Record should already be gone from index - if self.record_index.contains_key(&k) { - log_stor!(error "dead record found in index: {:?}", k); + if self.record_index.contains_key(&dr.key) { + log_stor!(error "dead record found in index: {:?}", dr.key); } // Delete record - if let Err(e) = rt_xact.delete(0, &k.bytes()) { + if let Err(e) = rt_xact.delete(0, &dr.key.bytes()) { log_stor!(error "record could not be deleted: {}", e); } // Delete subkeys - let stored_subkeys = v.stored_subkeys(); + let stored_subkeys = dr.record.stored_subkeys(); for sk in stored_subkeys.iter() { // From table let stk = SubkeyTableKey { - key: k.key, + key: dr.key.key, subkey: sk, }; let stkb = stk.bytes(); @@ -237,11 +276,12 @@ where } // Remove from total size - self.total_storage_space - .saturating_sub(mem::size_of::() as u64); - self.total_storage_space - .saturating_sub(v.total_size() as u64); - self.total_storage_space.commit().unwrap(); + if dr.in_total_storage { + self.total_storage_space.saturating_sub( + (mem::size_of::() + dr.record.total_size()) as u64, + ); + self.total_storage_space.commit().unwrap(); + } } if let Err(e) = rt_xact.commit().await { log_stor!(error "failed to commit record table transaction: {}", e); @@ -306,6 +346,9 @@ where .await .map_err(VeilidAPIError::internal)?; + // Update storage space (won't fail due to check_limit above) + self.total_storage_space.commit().unwrap(); + // Save to record index let mut dead_records = Vec::new(); if let Some(v) = self.record_index.insert_with_callback(rtk, record, |k, v| { @@ -319,9 +362,6 @@ where self.add_dead_record(dr.0, dr.1); } - // Update storage space - self.total_storage_space.commit().unwrap(); - Ok(()) } @@ -407,7 +447,7 @@ where subkey: ValueSubkey, want_descriptor: bool, ) -> VeilidAPIResult> { - // record from index + // Get record from index let Some((subkey_count, has_subkey, opt_descriptor)) = self.with_record(key, |record| { (record.subkey_count(), record.stored_subkeys().contains(subkey), if want_descriptor { Some(record.descriptor().clone()) @@ -545,9 +585,9 @@ where ); } - // Get record from index - let Some((subkey_count, total_size)) = self.with_record(key, |record| { - (record.subkey_count(), record.total_size()) + // Get record subkey count and total size of all record subkey data exclusive of structures + let Some((subkey_count, prior_record_data_size)) = self.with_record(key, |record| { + (record.subkey_count(), record.record_data_size()) }) else { apibail_invalid_argument!("no record at this key", "key", key); }; @@ -563,14 +603,14 @@ where }; // Get the previous subkey and ensure we aren't going over the record size limit - let mut prior_record_data_size = 0usize; + let mut prior_subkey_size = 0usize; // If subkey exists in subkey cache, use that let stk = SubkeyTableKey { key, subkey }; let stk_bytes = stk.bytes(); if let Some(record_data) = self.subkey_cache.peek(&stk) { - prior_record_data_size = record_data.total_size(); + prior_subkey_size = record_data.data_size(); } else { // If not in cache, try to pull from table store if let Some(record_data) = subkey_table @@ -578,26 +618,26 @@ where .await .map_err(VeilidAPIError::internal)? { - prior_record_data_size = record_data.total_size(); + prior_subkey_size = record_data.data_size(); } } // Make new record data - let record_data = RecordData::new(signed_value_data); + let subkey_record_data = RecordData::new(signed_value_data); // Check new total record size - let new_record_data_size = record_data.total_size(); - let new_total_size = total_size + new_record_data_size - prior_record_data_size; - if new_total_size > self.limits.max_record_total_size { + let new_subkey_size = subkey_record_data.data_size(); + let new_record_data_size = prior_record_data_size - prior_subkey_size + new_subkey_size; + if new_record_data_size > self.limits.max_record_total_size { apibail_generic!("dht record too large"); } // Check new total storage space self.total_storage_space - .sub(prior_record_data_size as u64) + .sub(prior_subkey_size as u64) .unwrap(); self.total_storage_space - .add(new_record_data_size as u64) + .add(new_subkey_size as u64) .unwrap(); if !self.total_storage_space.check_limit() { apibail_try_again!(); @@ -605,17 +645,17 @@ where // Write subkey subkey_table - .store_json(0, &stk_bytes, &record_data) + .store_json(0, &stk_bytes, &subkey_record_data) .await .map_err(VeilidAPIError::internal)?; // Write to subkey cache - self.add_to_subkey_cache(stk, record_data); + self.add_to_subkey_cache(stk, subkey_record_data); // Update record self.with_record_mut(key, |record| { record.store_subkey(subkey); - record.set_record_data_size(new_total_size); + record.set_record_data_size(new_record_data_size); }) .expect("record should still be here"); @@ -666,7 +706,7 @@ where out += &format!("Total Storage Space: {}\n", self.total_storage_space.get()); out += &format!("Dead Records: {}\n", self.dead_records.len()); for dr in &self.dead_records { - out += &format!(" {}\n", dr.0.key.to_string()); + out += &format!(" {}\n", dr.key.key.to_string()); } out += &format!("Changed Records: {}\n", self.changed_records.len()); for cr in &self.changed_records { diff --git a/veilid-core/src/storage_manager/types/record.rs b/veilid-core/src/storage_manager/types/record.rs index 76f61bc0..d8ca49ee 100644 --- a/veilid-core/src/storage_manager/types/record.rs +++ b/veilid-core/src/storage_manager/types/record.rs @@ -74,7 +74,9 @@ where } pub fn total_size(&self) -> usize { - mem::size_of::>() + self.descriptor.total_size() + self.record_data_size + (mem::size_of::() - mem::size_of::()) + + self.descriptor.total_size() + + self.record_data_size } // pub fn detail(&self) -> &D { diff --git a/veilid-core/src/storage_manager/types/record_data.rs b/veilid-core/src/storage_manager/types/record_data.rs index 532f4e0b..8b646c7a 100644 --- a/veilid-core/src/storage_manager/types/record_data.rs +++ b/veilid-core/src/storage_manager/types/record_data.rs @@ -12,7 +12,11 @@ impl RecordData { pub fn signed_value_data(&self) -> &SignedValueData { &self.signed_value_data } + pub fn data_size(&self) -> usize { + self.signed_value_data.data_size() + } pub fn total_size(&self) -> usize { - mem::size_of::() + self.signed_value_data.value_data().data().len() + (mem::size_of::() - mem::size_of::()) + + self.signed_value_data.total_size() } } diff --git a/veilid-core/src/storage_manager/types/signed_value_data.rs b/veilid-core/src/storage_manager/types/signed_value_data.rs index 1a2a3cf8..5724f0c3 100644 --- a/veilid-core/src/storage_manager/types/signed_value_data.rs +++ b/veilid-core/src/storage_manager/types/signed_value_data.rs @@ -56,6 +56,10 @@ impl SignedValueData { &self.signature } + pub fn data_size(&self) -> usize { + self.value_data.data_size() + } + pub fn total_size(&self) -> usize { (mem::size_of::() - mem::size_of::()) + self.value_data.total_size() } diff --git a/veilid-core/src/veilid_api/types/dht/value_data.rs b/veilid-core/src/veilid_api/types/dht/value_data.rs index 34d6d8ac..98f73910 100644 --- a/veilid-core/src/veilid_api/types/dht/value_data.rs +++ b/veilid-core/src/veilid_api/types/dht/value_data.rs @@ -56,6 +56,10 @@ impl ValueData { &self.data } + pub fn data_size(&self) -> usize { + self.data.len() + } + pub fn total_size(&self) -> usize { mem::size_of::() + self.data.len() }