fix crash and clean up record data size housekeeping
This commit is contained in:
parent
36957d84f1
commit
853976789f
@ -93,13 +93,14 @@ impl<T: PrimInt + Unsigned + fmt::Display + fmt::Debug> LimitedSize<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
log_stor!(debug "Commit ({}): {} => {}", self.description, self.value, uncommitted_value);
|
log_stor!(debug "Commit ({}): {} => {}", self.description, self.value, uncommitted_value);
|
||||||
|
self.uncommitted_value = None;
|
||||||
self.value = uncommitted_value;
|
self.value = uncommitted_value;
|
||||||
}
|
}
|
||||||
Ok(self.value)
|
Ok(self.value)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn rollback(&mut self) -> T {
|
pub fn rollback(&mut self) -> T {
|
||||||
if let Some(uv) = self.uncommitted_value {
|
if let Some(uv) = self.uncommitted_value.take() {
|
||||||
log_stor!(debug "Rollback ({}): {} (drop {})", self.description, self.value, uv);
|
log_stor!(debug "Rollback ({}): {} (drop {})", self.description, self.value, uv);
|
||||||
}
|
}
|
||||||
return self.value;
|
return self.value;
|
||||||
|
@ -7,6 +7,21 @@
|
|||||||
use super::*;
|
use super::*;
|
||||||
use hashlink::LruCache;
|
use hashlink::LruCache;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
/// A dead record that is yet to be purged from disk and statistics
|
||||||
|
struct DeadRecord<D>
|
||||||
|
where
|
||||||
|
D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
|
||||||
|
{
|
||||||
|
// The key used in the record_index
|
||||||
|
key: RecordTableKey,
|
||||||
|
// The actual record
|
||||||
|
record: Record<D>,
|
||||||
|
// True if this record is accounted for in the total storage
|
||||||
|
// and needs to have the statistics updated or not when purged
|
||||||
|
in_total_storage: bool,
|
||||||
|
}
|
||||||
|
|
||||||
pub struct RecordStore<D>
|
pub struct RecordStore<D>
|
||||||
where
|
where
|
||||||
D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
|
D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
|
||||||
@ -15,16 +30,24 @@ where
|
|||||||
name: String,
|
name: String,
|
||||||
limits: RecordStoreLimits,
|
limits: RecordStoreLimits,
|
||||||
|
|
||||||
|
/// The tabledb used for record data
|
||||||
record_table: Option<TableDB>,
|
record_table: Option<TableDB>,
|
||||||
|
/// The tabledb used for subkey data
|
||||||
subkey_table: Option<TableDB>,
|
subkey_table: Option<TableDB>,
|
||||||
|
/// The in-memory index that keeps track of what records are in the tabledb
|
||||||
record_index: LruCache<RecordTableKey, Record<D>>,
|
record_index: LruCache<RecordTableKey, Record<D>>,
|
||||||
|
/// The in-memory cache of commonly accessed subkey data so we don't have to keep hitting the db
|
||||||
subkey_cache: LruCache<SubkeyTableKey, RecordData>,
|
subkey_cache: LruCache<SubkeyTableKey, RecordData>,
|
||||||
|
/// Total storage space or subkey data inclusive of structures in memory
|
||||||
subkey_cache_total_size: LimitedSize<usize>,
|
subkey_cache_total_size: LimitedSize<usize>,
|
||||||
|
/// Total storage space of records in the tabledb inclusive of subkey data and structures
|
||||||
total_storage_space: LimitedSize<u64>,
|
total_storage_space: LimitedSize<u64>,
|
||||||
|
/// Records to be removed from the tabledb upon next purge
|
||||||
dead_records: Vec<(RecordTableKey, Record<D>)>,
|
dead_records: Vec<DeadRecord<D>>,
|
||||||
|
/// The list of records that have changed since last flush to disk (optimization for batched writes)
|
||||||
changed_records: HashSet<RecordTableKey>,
|
changed_records: HashSet<RecordTableKey>,
|
||||||
|
|
||||||
|
/// A mutex to ensure we handle this concurrently
|
||||||
purge_dead_records_mutex: Arc<AsyncMutex<()>>,
|
purge_dead_records_mutex: Arc<AsyncMutex<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -97,33 +120,45 @@ where
|
|||||||
|
|
||||||
// Sort the record index by last touched time and insert in sorted order
|
// Sort the record index by last touched time and insert in sorted order
|
||||||
record_index_saved.sort_by(|a, b| a.1.last_touched().cmp(&b.1.last_touched()));
|
record_index_saved.sort_by(|a, b| a.1.last_touched().cmp(&b.1.last_touched()));
|
||||||
let mut dead_records = Vec::new();
|
let mut dead_records = Vec::<DeadRecord<D>>::new();
|
||||||
for ri in record_index_saved {
|
for ri in record_index_saved {
|
||||||
// total the storage space
|
// total the storage space
|
||||||
self.total_storage_space
|
self.total_storage_space
|
||||||
.add(mem::size_of::<RecordTableKey>() as u64)
|
.add((mem::size_of::<RecordTableKey>() + ri.1.total_size()) as u64)
|
||||||
.unwrap();
|
|
||||||
self.total_storage_space
|
|
||||||
.add(ri.1.total_size() as u64)
|
|
||||||
.unwrap();
|
.unwrap();
|
||||||
if let Err(_) = self.total_storage_space.commit() {
|
if let Err(_) = self.total_storage_space.commit() {
|
||||||
// If we overflow the limit, kill off the record
|
// Revert the total storage space because the commit failed
|
||||||
dead_records.push((ri.0, ri.1));
|
self.total_storage_space.rollback();
|
||||||
|
|
||||||
|
// If we overflow the limit, kill off the record, noting that it has not yet been added to the total storage space
|
||||||
|
dead_records.push(DeadRecord {
|
||||||
|
key: ri.0,
|
||||||
|
record: ri.1,
|
||||||
|
in_total_storage: false,
|
||||||
|
});
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// add to index and ensure we deduplicate in the case of an error
|
// add to index and ensure we deduplicate in the case of an error
|
||||||
if let Some(v) = self.record_index.insert_with_callback(ri.0, ri.1, |k, v| {
|
if let Some(v) = self.record_index.insert_with_callback(ri.0, ri.1, |k, v| {
|
||||||
// If the configuration change, we only want to keep the 'limits.max_records' records
|
// If the configuration change, we only want to keep the 'limits.max_records' records
|
||||||
dead_records.push((k, v));
|
dead_records.push(DeadRecord {
|
||||||
|
key: k,
|
||||||
|
record: v,
|
||||||
|
in_total_storage: true,
|
||||||
|
});
|
||||||
}) {
|
}) {
|
||||||
// This shouldn't happen, but deduplicate anyway
|
// This shouldn't happen, but deduplicate anyway
|
||||||
log_stor!(warn "duplicate record in table: {:?}", ri.0);
|
log_stor!(warn "duplicate record in table: {:?}", ri.0);
|
||||||
dead_records.push((ri.0, v));
|
dead_records.push(DeadRecord {
|
||||||
|
key: ri.0,
|
||||||
|
record: v,
|
||||||
|
in_total_storage: true,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (k, v) in dead_records {
|
for dr in dead_records {
|
||||||
self.add_dead_record(k, v);
|
self.dead_records.push(dr);
|
||||||
}
|
}
|
||||||
|
|
||||||
self.record_table = Some(record_table);
|
self.record_table = Some(record_table);
|
||||||
@ -132,7 +167,11 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn add_dead_record(&mut self, key: RecordTableKey, record: Record<D>) {
|
fn add_dead_record(&mut self, key: RecordTableKey, record: Record<D>) {
|
||||||
self.dead_records.push((key, record));
|
self.dead_records.push(DeadRecord {
|
||||||
|
key,
|
||||||
|
record,
|
||||||
|
in_total_storage: true,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
fn mark_record_changed(&mut self, key: RecordTableKey) {
|
fn mark_record_changed(&mut self, key: RecordTableKey) {
|
||||||
@ -208,23 +247,23 @@ where
|
|||||||
let rt_xact = record_table.transact();
|
let rt_xact = record_table.transact();
|
||||||
let st_xact = subkey_table.transact();
|
let st_xact = subkey_table.transact();
|
||||||
let dead_records = mem::take(&mut self.dead_records);
|
let dead_records = mem::take(&mut self.dead_records);
|
||||||
for (k, v) in dead_records {
|
for dr in dead_records {
|
||||||
// Record should already be gone from index
|
// Record should already be gone from index
|
||||||
if self.record_index.contains_key(&k) {
|
if self.record_index.contains_key(&dr.key) {
|
||||||
log_stor!(error "dead record found in index: {:?}", k);
|
log_stor!(error "dead record found in index: {:?}", dr.key);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete record
|
// Delete record
|
||||||
if let Err(e) = rt_xact.delete(0, &k.bytes()) {
|
if let Err(e) = rt_xact.delete(0, &dr.key.bytes()) {
|
||||||
log_stor!(error "record could not be deleted: {}", e);
|
log_stor!(error "record could not be deleted: {}", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete subkeys
|
// Delete subkeys
|
||||||
let stored_subkeys = v.stored_subkeys();
|
let stored_subkeys = dr.record.stored_subkeys();
|
||||||
for sk in stored_subkeys.iter() {
|
for sk in stored_subkeys.iter() {
|
||||||
// From table
|
// From table
|
||||||
let stk = SubkeyTableKey {
|
let stk = SubkeyTableKey {
|
||||||
key: k.key,
|
key: dr.key.key,
|
||||||
subkey: sk,
|
subkey: sk,
|
||||||
};
|
};
|
||||||
let stkb = stk.bytes();
|
let stkb = stk.bytes();
|
||||||
@ -237,11 +276,12 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Remove from total size
|
// Remove from total size
|
||||||
self.total_storage_space
|
if dr.in_total_storage {
|
||||||
.saturating_sub(mem::size_of::<RecordTableKey>() as u64);
|
self.total_storage_space.saturating_sub(
|
||||||
self.total_storage_space
|
(mem::size_of::<RecordTableKey>() + dr.record.total_size()) as u64,
|
||||||
.saturating_sub(v.total_size() as u64);
|
);
|
||||||
self.total_storage_space.commit().unwrap();
|
self.total_storage_space.commit().unwrap();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if let Err(e) = rt_xact.commit().await {
|
if let Err(e) = rt_xact.commit().await {
|
||||||
log_stor!(error "failed to commit record table transaction: {}", e);
|
log_stor!(error "failed to commit record table transaction: {}", e);
|
||||||
@ -306,6 +346,9 @@ where
|
|||||||
.await
|
.await
|
||||||
.map_err(VeilidAPIError::internal)?;
|
.map_err(VeilidAPIError::internal)?;
|
||||||
|
|
||||||
|
// Update storage space (won't fail due to check_limit above)
|
||||||
|
self.total_storage_space.commit().unwrap();
|
||||||
|
|
||||||
// Save to record index
|
// Save to record index
|
||||||
let mut dead_records = Vec::new();
|
let mut dead_records = Vec::new();
|
||||||
if let Some(v) = self.record_index.insert_with_callback(rtk, record, |k, v| {
|
if let Some(v) = self.record_index.insert_with_callback(rtk, record, |k, v| {
|
||||||
@ -319,9 +362,6 @@ where
|
|||||||
self.add_dead_record(dr.0, dr.1);
|
self.add_dead_record(dr.0, dr.1);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update storage space
|
|
||||||
self.total_storage_space.commit().unwrap();
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -407,7 +447,7 @@ where
|
|||||||
subkey: ValueSubkey,
|
subkey: ValueSubkey,
|
||||||
want_descriptor: bool,
|
want_descriptor: bool,
|
||||||
) -> VeilidAPIResult<Option<SubkeyResult>> {
|
) -> VeilidAPIResult<Option<SubkeyResult>> {
|
||||||
// record from index
|
// Get record from index
|
||||||
let Some((subkey_count, has_subkey, opt_descriptor)) = self.with_record(key, |record| {
|
let Some((subkey_count, has_subkey, opt_descriptor)) = self.with_record(key, |record| {
|
||||||
(record.subkey_count(), record.stored_subkeys().contains(subkey), if want_descriptor {
|
(record.subkey_count(), record.stored_subkeys().contains(subkey), if want_descriptor {
|
||||||
Some(record.descriptor().clone())
|
Some(record.descriptor().clone())
|
||||||
@ -545,9 +585,9 @@ where
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get record from index
|
// Get record subkey count and total size of all record subkey data exclusive of structures
|
||||||
let Some((subkey_count, total_size)) = self.with_record(key, |record| {
|
let Some((subkey_count, prior_record_data_size)) = self.with_record(key, |record| {
|
||||||
(record.subkey_count(), record.total_size())
|
(record.subkey_count(), record.record_data_size())
|
||||||
}) else {
|
}) else {
|
||||||
apibail_invalid_argument!("no record at this key", "key", key);
|
apibail_invalid_argument!("no record at this key", "key", key);
|
||||||
};
|
};
|
||||||
@ -563,14 +603,14 @@ where
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Get the previous subkey and ensure we aren't going over the record size limit
|
// Get the previous subkey and ensure we aren't going over the record size limit
|
||||||
let mut prior_record_data_size = 0usize;
|
let mut prior_subkey_size = 0usize;
|
||||||
|
|
||||||
// If subkey exists in subkey cache, use that
|
// If subkey exists in subkey cache, use that
|
||||||
let stk = SubkeyTableKey { key, subkey };
|
let stk = SubkeyTableKey { key, subkey };
|
||||||
let stk_bytes = stk.bytes();
|
let stk_bytes = stk.bytes();
|
||||||
|
|
||||||
if let Some(record_data) = self.subkey_cache.peek(&stk) {
|
if let Some(record_data) = self.subkey_cache.peek(&stk) {
|
||||||
prior_record_data_size = record_data.total_size();
|
prior_subkey_size = record_data.data_size();
|
||||||
} else {
|
} else {
|
||||||
// If not in cache, try to pull from table store
|
// If not in cache, try to pull from table store
|
||||||
if let Some(record_data) = subkey_table
|
if let Some(record_data) = subkey_table
|
||||||
@ -578,26 +618,26 @@ where
|
|||||||
.await
|
.await
|
||||||
.map_err(VeilidAPIError::internal)?
|
.map_err(VeilidAPIError::internal)?
|
||||||
{
|
{
|
||||||
prior_record_data_size = record_data.total_size();
|
prior_subkey_size = record_data.data_size();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make new record data
|
// Make new record data
|
||||||
let record_data = RecordData::new(signed_value_data);
|
let subkey_record_data = RecordData::new(signed_value_data);
|
||||||
|
|
||||||
// Check new total record size
|
// Check new total record size
|
||||||
let new_record_data_size = record_data.total_size();
|
let new_subkey_size = subkey_record_data.data_size();
|
||||||
let new_total_size = total_size + new_record_data_size - prior_record_data_size;
|
let new_record_data_size = prior_record_data_size - prior_subkey_size + new_subkey_size;
|
||||||
if new_total_size > self.limits.max_record_total_size {
|
if new_record_data_size > self.limits.max_record_total_size {
|
||||||
apibail_generic!("dht record too large");
|
apibail_generic!("dht record too large");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check new total storage space
|
// Check new total storage space
|
||||||
self.total_storage_space
|
self.total_storage_space
|
||||||
.sub(prior_record_data_size as u64)
|
.sub(prior_subkey_size as u64)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
self.total_storage_space
|
self.total_storage_space
|
||||||
.add(new_record_data_size as u64)
|
.add(new_subkey_size as u64)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
if !self.total_storage_space.check_limit() {
|
if !self.total_storage_space.check_limit() {
|
||||||
apibail_try_again!();
|
apibail_try_again!();
|
||||||
@ -605,17 +645,17 @@ where
|
|||||||
|
|
||||||
// Write subkey
|
// Write subkey
|
||||||
subkey_table
|
subkey_table
|
||||||
.store_json(0, &stk_bytes, &record_data)
|
.store_json(0, &stk_bytes, &subkey_record_data)
|
||||||
.await
|
.await
|
||||||
.map_err(VeilidAPIError::internal)?;
|
.map_err(VeilidAPIError::internal)?;
|
||||||
|
|
||||||
// Write to subkey cache
|
// Write to subkey cache
|
||||||
self.add_to_subkey_cache(stk, record_data);
|
self.add_to_subkey_cache(stk, subkey_record_data);
|
||||||
|
|
||||||
// Update record
|
// Update record
|
||||||
self.with_record_mut(key, |record| {
|
self.with_record_mut(key, |record| {
|
||||||
record.store_subkey(subkey);
|
record.store_subkey(subkey);
|
||||||
record.set_record_data_size(new_total_size);
|
record.set_record_data_size(new_record_data_size);
|
||||||
})
|
})
|
||||||
.expect("record should still be here");
|
.expect("record should still be here");
|
||||||
|
|
||||||
@ -666,7 +706,7 @@ where
|
|||||||
out += &format!("Total Storage Space: {}\n", self.total_storage_space.get());
|
out += &format!("Total Storage Space: {}\n", self.total_storage_space.get());
|
||||||
out += &format!("Dead Records: {}\n", self.dead_records.len());
|
out += &format!("Dead Records: {}\n", self.dead_records.len());
|
||||||
for dr in &self.dead_records {
|
for dr in &self.dead_records {
|
||||||
out += &format!(" {}\n", dr.0.key.to_string());
|
out += &format!(" {}\n", dr.key.key.to_string());
|
||||||
}
|
}
|
||||||
out += &format!("Changed Records: {}\n", self.changed_records.len());
|
out += &format!("Changed Records: {}\n", self.changed_records.len());
|
||||||
for cr in &self.changed_records {
|
for cr in &self.changed_records {
|
||||||
|
@ -74,7 +74,9 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn total_size(&self) -> usize {
|
pub fn total_size(&self) -> usize {
|
||||||
mem::size_of::<Record<D>>() + self.descriptor.total_size() + self.record_data_size
|
(mem::size_of::<Self>() - mem::size_of::<SignedValueDescriptor>())
|
||||||
|
+ self.descriptor.total_size()
|
||||||
|
+ self.record_data_size
|
||||||
}
|
}
|
||||||
|
|
||||||
// pub fn detail(&self) -> &D {
|
// pub fn detail(&self) -> &D {
|
||||||
|
@ -12,7 +12,11 @@ impl RecordData {
|
|||||||
pub fn signed_value_data(&self) -> &SignedValueData {
|
pub fn signed_value_data(&self) -> &SignedValueData {
|
||||||
&self.signed_value_data
|
&self.signed_value_data
|
||||||
}
|
}
|
||||||
|
pub fn data_size(&self) -> usize {
|
||||||
|
self.signed_value_data.data_size()
|
||||||
|
}
|
||||||
pub fn total_size(&self) -> usize {
|
pub fn total_size(&self) -> usize {
|
||||||
mem::size_of::<RecordData>() + self.signed_value_data.value_data().data().len()
|
(mem::size_of::<Self>() - mem::size_of::<SignedValueData>())
|
||||||
|
+ self.signed_value_data.total_size()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -56,6 +56,10 @@ impl SignedValueData {
|
|||||||
&self.signature
|
&self.signature
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn data_size(&self) -> usize {
|
||||||
|
self.value_data.data_size()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn total_size(&self) -> usize {
|
pub fn total_size(&self) -> usize {
|
||||||
(mem::size_of::<Self>() - mem::size_of::<ValueData>()) + self.value_data.total_size()
|
(mem::size_of::<Self>() - mem::size_of::<ValueData>()) + self.value_data.total_size()
|
||||||
}
|
}
|
||||||
|
@ -56,6 +56,10 @@ impl ValueData {
|
|||||||
&self.data
|
&self.data
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn data_size(&self) -> usize {
|
||||||
|
self.data.len()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn total_size(&self) -> usize {
|
pub fn total_size(&self) -> usize {
|
||||||
mem::size_of::<Self>() + self.data.len()
|
mem::size_of::<Self>() + self.data.len()
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user