add better dht debugging
This commit is contained in:
@@ -15,4 +15,24 @@ impl StorageManager {
|
||||
};
|
||||
remote_record_store.debug_records()
|
||||
}
|
||||
pub(crate) async fn purge_local_records(&self, reclaim: Option<usize>) -> String {
|
||||
let mut inner = self.inner.lock().await;
|
||||
let Some(local_record_store) = &mut inner.local_record_store else {
|
||||
return "not initialized".to_owned();
|
||||
};
|
||||
let reclaimed = local_record_store
|
||||
.reclaim_space(reclaim.unwrap_or(usize::MAX))
|
||||
.await;
|
||||
return format!("Local records purged: reclaimed {} bytes", reclaimed);
|
||||
}
|
||||
pub(crate) async fn purge_remote_records(&self, reclaim: Option<usize>) -> String {
|
||||
let mut inner = self.inner.lock().await;
|
||||
let Some(remote_record_store) = &mut inner.remote_record_store else {
|
||||
return "not initialized".to_owned();
|
||||
};
|
||||
let reclaimed = remote_record_store
|
||||
.reclaim_space(reclaim.unwrap_or(usize::MAX))
|
||||
.await;
|
||||
return format!("Remote records purged: reclaimed {} bytes", reclaimed);
|
||||
}
|
||||
}
|
||||
|
111
veilid-core/src/storage_manager/limited_size.rs
Normal file
111
veilid-core/src/storage_manager/limited_size.rs
Normal file
@@ -0,0 +1,111 @@
|
||||
use super::*;
|
||||
use num_traits::{PrimInt, Unsigned};
|
||||
|
||||
#[derive(ThisError, Debug, Clone, Copy, Eq, PartialEq)]
|
||||
pub enum LimitError {
|
||||
#[error("limit overflow")]
|
||||
OverLimit,
|
||||
}
|
||||
|
||||
#[derive(ThisError, Debug, Clone, Copy, Eq, PartialEq)]
|
||||
pub enum NumericError {
|
||||
#[error("numeric overflow")]
|
||||
Overflow,
|
||||
#[error("numeric underflow")]
|
||||
Underflow,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct LimitedSize<T: PrimInt + Unsigned + fmt::Display + fmt::Debug> {
|
||||
description: String,
|
||||
value: T,
|
||||
limit: Option<T>,
|
||||
uncommitted_value: Option<T>,
|
||||
}
|
||||
|
||||
impl<T: PrimInt + Unsigned + fmt::Display + fmt::Debug> LimitedSize<T> {
|
||||
pub fn new(description: &str, value: T, limit: Option<T>) -> Self {
|
||||
Self {
|
||||
description: description.to_owned(),
|
||||
value,
|
||||
limit,
|
||||
uncommitted_value: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn current_value(&self) -> T {
|
||||
self.uncommitted_value.unwrap_or(self.value)
|
||||
}
|
||||
|
||||
pub fn set(&mut self, new_value: T) {
|
||||
self.uncommitted_value = Some(new_value);
|
||||
}
|
||||
|
||||
pub fn add(&mut self, v: T) -> Result<T, NumericError> {
|
||||
let current_value = self.current_value();
|
||||
let max_v = T::max_value() - current_value;
|
||||
if v > max_v {
|
||||
return Err(NumericError::Overflow);
|
||||
}
|
||||
let new_value = current_value + v;
|
||||
self.uncommitted_value = Some(new_value);
|
||||
Ok(new_value)
|
||||
}
|
||||
pub fn sub(&mut self, v: T) -> Result<T, NumericError> {
|
||||
let current_value = self.current_value();
|
||||
let max_v = current_value - T::min_value();
|
||||
if v > max_v {
|
||||
return Err(NumericError::Underflow);
|
||||
}
|
||||
let new_value = current_value - v;
|
||||
self.uncommitted_value = Some(new_value);
|
||||
Ok(new_value)
|
||||
}
|
||||
pub fn saturating_sub(&mut self, mut v: T) -> T {
|
||||
let current_value = self.current_value();
|
||||
let max_v = current_value - T::min_value();
|
||||
if v > max_v {
|
||||
log_stor!(debug "Numeric underflow ({})", self.description);
|
||||
v = max_v;
|
||||
}
|
||||
let new_value = current_value - v;
|
||||
self.uncommitted_value = Some(new_value);
|
||||
new_value
|
||||
}
|
||||
|
||||
pub fn check_limit(&self) -> bool {
|
||||
if let Some(uncommitted_value) = self.uncommitted_value {
|
||||
if let Some(limit) = self.limit {
|
||||
if uncommitted_value > limit {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
pub fn commit(&mut self) -> Result<T, LimitError> {
|
||||
if let Some(uncommitted_value) = self.uncommitted_value {
|
||||
if let Some(limit) = self.limit {
|
||||
if uncommitted_value > limit {
|
||||
log_stor!(debug "Commit over limit failed ({}): {} > {}", self.description, uncommitted_value, limit);
|
||||
return Err(LimitError::OverLimit);
|
||||
}
|
||||
}
|
||||
log_stor!(debug "Commit ({}): {} => {}", self.description, self.value, uncommitted_value);
|
||||
self.value = uncommitted_value;
|
||||
}
|
||||
Ok(self.value)
|
||||
}
|
||||
|
||||
pub fn rollback(&mut self) -> T {
|
||||
if let Some(uv) = self.uncommitted_value {
|
||||
log_stor!(debug "Rollback ({}): {} (drop {})", self.description, self.value, uv);
|
||||
}
|
||||
return self.value;
|
||||
}
|
||||
|
||||
pub fn get(&self) -> T {
|
||||
return self.value;
|
||||
}
|
||||
}
|
@@ -1,6 +1,7 @@
|
||||
mod debug;
|
||||
mod get_value;
|
||||
mod keys;
|
||||
mod limited_size;
|
||||
mod record_store;
|
||||
mod record_store_limits;
|
||||
mod set_value;
|
||||
@@ -9,6 +10,7 @@ mod tasks;
|
||||
mod types;
|
||||
|
||||
use keys::*;
|
||||
use limited_size::*;
|
||||
use record_store::*;
|
||||
use record_store_limits::*;
|
||||
use storage_manager_inner::*;
|
||||
|
@@ -21,8 +21,8 @@ where
|
||||
subkey_table: Option<TableDB>,
|
||||
record_index: LruCache<RecordTableKey, Record<D>>,
|
||||
subkey_cache: LruCache<SubkeyTableKey, RecordData>,
|
||||
subkey_cache_total_size: usize,
|
||||
total_storage_space: usize,
|
||||
subkey_cache_total_size: LimitedSize<usize>,
|
||||
total_storage_space: LimitedSize<u64>,
|
||||
|
||||
dead_records: Vec<(RecordTableKey, Record<D>)>,
|
||||
changed_records: HashSet<RecordTableKey>,
|
||||
@@ -47,6 +47,13 @@ where
|
||||
{
|
||||
pub fn new(table_store: TableStore, name: &str, limits: RecordStoreLimits) -> Self {
|
||||
let subkey_cache_size = limits.subkey_cache_size as usize;
|
||||
let limit_subkey_cache_total_size = limits
|
||||
.max_subkey_cache_memory_mb
|
||||
.map(|mb| mb * 1_048_576usize);
|
||||
let limit_max_storage_space = limits
|
||||
.max_storage_space_mb
|
||||
.map(|mb| mb as u64 * 1_048_576u64);
|
||||
|
||||
Self {
|
||||
table_store,
|
||||
name: name.to_owned(),
|
||||
@@ -55,8 +62,16 @@ where
|
||||
subkey_table: None,
|
||||
record_index: LruCache::new(limits.max_records.unwrap_or(usize::MAX)),
|
||||
subkey_cache: LruCache::new(subkey_cache_size),
|
||||
subkey_cache_total_size: 0,
|
||||
total_storage_space: 0,
|
||||
subkey_cache_total_size: LimitedSize::new(
|
||||
"subkey_cache_total_size",
|
||||
0,
|
||||
limit_subkey_cache_total_size,
|
||||
),
|
||||
total_storage_space: LimitedSize::new(
|
||||
"total_storage_space",
|
||||
0,
|
||||
limit_max_storage_space,
|
||||
),
|
||||
dead_records: Vec::new(),
|
||||
changed_records: HashSet::new(),
|
||||
purge_dead_records_mutex: Arc::new(AsyncMutex::new(())),
|
||||
@@ -89,8 +104,17 @@ where
|
||||
let mut dead_records = Vec::new();
|
||||
for ri in record_index_saved {
|
||||
// total the storage space
|
||||
self.total_storage_space += mem::size_of::<RecordTableKey>();
|
||||
self.total_storage_space += ri.1.total_size();
|
||||
self.total_storage_space
|
||||
.add(mem::size_of::<RecordTableKey>() as u64)
|
||||
.unwrap();
|
||||
self.total_storage_space
|
||||
.add(ri.1.total_size() as u64)
|
||||
.unwrap();
|
||||
if let Err(_) = self.total_storage_space.commit() {
|
||||
// If we overflow the limit, kill off the record
|
||||
dead_records.push((ri.0, ri.1));
|
||||
continue;
|
||||
}
|
||||
|
||||
// add to index and ensure we deduplicate in the case of an error
|
||||
if let Some(v) = self.record_index.insert(ri.0, ri.1, |k, v| {
|
||||
@@ -130,24 +154,32 @@ where
|
||||
// Old data
|
||||
dead_size += old_record_data.total_size();
|
||||
}
|
||||
self.subkey_cache_total_size -= dead_size;
|
||||
self.subkey_cache_total_size += record_data_total_size;
|
||||
self.subkey_cache_total_size.sub(dead_size).unwrap();
|
||||
self.subkey_cache_total_size
|
||||
.add(record_data_total_size)
|
||||
.unwrap();
|
||||
|
||||
// Purge over size limit
|
||||
if let Some(max_subkey_cache_memory_mb) = self.limits.max_subkey_cache_memory_mb {
|
||||
while self.subkey_cache_total_size > (max_subkey_cache_memory_mb * 1_048_576usize) {
|
||||
if let Some((_, v)) = self.subkey_cache.remove_lru() {
|
||||
self.subkey_cache_total_size -= v.total_size();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
while self.subkey_cache_total_size.commit().is_err() {
|
||||
if let Some((_, v)) = self.subkey_cache.remove_lru() {
|
||||
self.subkey_cache_total_size.saturating_sub(v.total_size());
|
||||
} else {
|
||||
self.subkey_cache_total_size.rollback();
|
||||
|
||||
log_stor!(error "subkey cache should not be empty, has {} bytes unaccounted for",self.subkey_cache_total_size.get());
|
||||
|
||||
self.subkey_cache_total_size.set(0);
|
||||
self.subkey_cache_total_size.commit().unwrap();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_from_subkey_cache(&mut self, key: SubkeyTableKey) {
|
||||
if let Some(dead_record_data) = self.subkey_cache.remove(&key) {
|
||||
self.subkey_cache_total_size -= dead_record_data.total_size();
|
||||
self.subkey_cache_total_size
|
||||
.saturating_sub(dead_record_data.total_size());
|
||||
self.subkey_cache_total_size.commit().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -206,8 +238,11 @@ where
|
||||
}
|
||||
|
||||
// Remove from total size
|
||||
self.total_storage_space -= mem::size_of::<RecordTableKey>();
|
||||
self.total_storage_space -= v.total_size();
|
||||
self.total_storage_space
|
||||
.saturating_sub(mem::size_of::<RecordTableKey>() as u64);
|
||||
self.total_storage_space
|
||||
.saturating_sub(v.total_size() as u64);
|
||||
self.total_storage_space.commit().unwrap();
|
||||
}
|
||||
if let Err(e) = rt_xact.commit().await {
|
||||
log_stor!(error "failed to commit record table transaction: {}", e);
|
||||
@@ -258,12 +293,12 @@ where
|
||||
};
|
||||
|
||||
// If over size limit, dont create record
|
||||
let new_total_storage_space =
|
||||
self.total_storage_space + mem::size_of::<RecordTableKey>() + record.total_size();
|
||||
if let Some(max_storage_space_mb) = &self.limits.max_storage_space_mb {
|
||||
if new_total_storage_space > (max_storage_space_mb * 1_048_576usize) {
|
||||
apibail_try_again!();
|
||||
}
|
||||
self.total_storage_space
|
||||
.add((mem::size_of::<RecordTableKey>() + record.total_size()) as u64)
|
||||
.unwrap();
|
||||
if !self.total_storage_space.check_limit() {
|
||||
self.total_storage_space.rollback();
|
||||
apibail_try_again!();
|
||||
}
|
||||
|
||||
// Save to record table
|
||||
@@ -286,7 +321,7 @@ where
|
||||
}
|
||||
|
||||
// Update storage space
|
||||
self.total_storage_space = new_total_storage_space;
|
||||
self.total_storage_space.commit().unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -482,12 +517,14 @@ where
|
||||
}
|
||||
|
||||
// Check new total storage space
|
||||
let new_total_storage_space =
|
||||
self.total_storage_space + new_record_data_size - prior_record_data_size;
|
||||
if let Some(max_storage_space_mb) = self.limits.max_storage_space_mb {
|
||||
if new_total_storage_space > (max_storage_space_mb * 1_048_576usize) {
|
||||
apibail_try_again!();
|
||||
}
|
||||
self.total_storage_space
|
||||
.sub(prior_record_data_size as u64)
|
||||
.unwrap();
|
||||
self.total_storage_space
|
||||
.add(new_record_data_size as u64)
|
||||
.unwrap();
|
||||
if !self.total_storage_space.check_limit() {
|
||||
apibail_try_again!();
|
||||
}
|
||||
|
||||
// Write subkey
|
||||
@@ -506,6 +543,9 @@ where
|
||||
})
|
||||
.expect("record should still be here");
|
||||
|
||||
// Update storage space
|
||||
self.total_storage_space.commit().unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -513,16 +553,19 @@ where
|
||||
/// This will force a garbage collection of the space immediately
|
||||
/// If zero is passed in here, a garbage collection will be performed of dead records
|
||||
/// without removing any live records
|
||||
pub async fn reclaim_space(&mut self, space: usize) {
|
||||
pub async fn reclaim_space(&mut self, space: usize) -> usize {
|
||||
let mut reclaimed = 0usize;
|
||||
while reclaimed < space {
|
||||
if let Some((k, v)) = self.record_index.remove_lru() {
|
||||
reclaimed += mem::size_of::<RecordTableKey>();
|
||||
reclaimed += v.total_size();
|
||||
self.add_dead_record(k, v);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
self.purge_dead_records(false).await;
|
||||
reclaimed
|
||||
}
|
||||
|
||||
pub(super) fn debug_records(&self) -> String {
|
||||
@@ -532,9 +575,9 @@ where
|
||||
out += "Record Index:\n";
|
||||
for (rik, rec) in &self.record_index {
|
||||
out += &format!(
|
||||
" {} @ {} len={} subkeys={}\n",
|
||||
" {} age={} len={} subkeys={}\n",
|
||||
rik.key.to_string(),
|
||||
rec.last_touched().as_u64(),
|
||||
debug_duration(get_timestamp() - rec.last_touched().as_u64()),
|
||||
rec.record_data_size(),
|
||||
rec.stored_subkeys(),
|
||||
);
|
||||
@@ -542,9 +585,9 @@ where
|
||||
out += &format!("Subkey Cache Count: {}\n", self.subkey_cache.len());
|
||||
out += &format!(
|
||||
"Subkey Cache Total Size: {}\n",
|
||||
self.subkey_cache_total_size
|
||||
self.subkey_cache_total_size.get()
|
||||
);
|
||||
out += &format!("Total Storage Space: {}\n", self.total_storage_space);
|
||||
out += &format!("Total Storage Space: {}\n", self.total_storage_space.get());
|
||||
out += &format!("Dead Records: {}\n", self.dead_records.len());
|
||||
for dr in &self.dead_records {
|
||||
out += &format!(" {}\n", dr.0.key.to_string());
|
||||
|
Reference in New Issue
Block a user