more clippy
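
The hunks below are mechanical clippy/rustfmt cleanups in the Veilid storage code. As a rough orientation, here is a minimal sketch of the most common lint patterns being fixed (illustrative only; Store and its methods are made-up stand-ins, not Veilid types):

    struct Store {
        items: Vec<u8>,
    }

    impl Store {
        // clippy::needless_return - return the trailing expression implicitly
        fn item_count(&self) -> usize {
            self.items.len() // instead of `return self.items.len();`
        }

        // clippy::len_zero - prefer is_empty() over comparing len() with zero
        fn has_items(&self) -> bool {
            !self.items.is_empty() // instead of `self.items.len() != 0`
        }

        fn commit(&self) -> Result<(), String> {
            Ok(())
        }

        // clippy::redundant_pattern_matching - prefer is_err()/is_ok()
        fn commit_or_rollback(&self) {
            if self.commit().is_err() {
                // rollback would go here
            }
        }
    }

    fn main() {
        let store = Store { items: vec![1, 2, 3] };
        assert_eq!(store.item_count(), 3);
        assert!(store.has_items());
        store.commit_or_rollback();
    }
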
@@ -23,7 +23,7 @@ impl StorageManager {
         let reclaimed = local_record_store
             .reclaim_space(reclaim.unwrap_or(usize::MAX))
             .await;
-        return format!("Local records purged: reclaimed {} bytes", reclaimed);
+        format!("Local records purged: reclaimed {} bytes", reclaimed)
     }

     pub(crate) async fn purge_remote_records(&self, reclaim: Option<usize>) -> String {
         let mut inner = self.inner.lock().await;
@@ -33,7 +33,7 @@ impl StorageManager {
         let reclaimed = remote_record_store
             .reclaim_space(reclaim.unwrap_or(usize::MAX))
             .await;
-        return format!("Remote records purged: reclaimed {} bytes", reclaimed);
+        format!("Remote records purged: reclaimed {} bytes", reclaimed)
     }

     pub(crate) async fn debug_local_record_subkey_info(
         &self,
@@ -103,10 +103,10 @@ impl<T: PrimInt + Unsigned + fmt::Display + fmt::Debug> LimitedSize<T> {
         if let Some(uv) = self.uncommitted_value.take() {
             log_stor!(debug "Rollback ({}): {} (drop {})", self.description, self.value, uv);
         }
-        return self.value;
+        self.value
     }

     pub fn get(&self) -> T {
-        return self.value;
+        self.value
     }
 }
@@ -133,9 +133,7 @@ impl StorageManager {
     }

     fn online_writes_ready_inner(inner: &StorageManagerInner) -> Option<RPCProcessor> {
-        if let Some(rpc_processor) = {
-            inner.rpc_processor.clone()
-        } {
+        if let Some(rpc_processor) = { inner.rpc_processor.clone() } {
             if let Some(network_class) = rpc_processor
                 .routing_table()
                 .get_network_class(RoutingDomain::PublicInternet)
@@ -158,12 +156,12 @@ impl StorageManager {

     async fn online_writes_ready(&self) -> EyreResult<Option<RPCProcessor>> {
         let inner = self.lock().await?;
-        return Ok(Self::online_writes_ready_inner(&*inner));
+        Ok(Self::online_writes_ready_inner(&inner))
     }

     async fn has_offline_subkey_writes(&self) -> EyreResult<bool> {
         let inner = self.lock().await?;
-        Ok(inner.offline_subkey_writes.len() != 0)
+        Ok(!inner.offline_subkey_writes.is_empty())
     }

     /// Create a local record from scratch with a new owner key, open it, and return the opened descriptor
@@ -394,7 +392,7 @@ impl StorageManager {

         // Make new subkey data
         let value_data = if let Some(last_signed_value_data) = last_subkey_result.value {
-            if last_signed_value_data.value_data().data() == &data
+            if last_signed_value_data.value_data().data() == data
                 && last_signed_value_data.value_data().writer() == &writer.key
             {
                 // Data and writer is the same, nothing is changing,
@@ -433,13 +431,17 @@ impl StorageManager {

             log_stor!(debug "Writing subkey offline: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() );
             // Add to offline writes to flush
-            inner.offline_subkey_writes.entry(key)
-                .and_modify(|x| { x.subkeys.insert(subkey); } )
-                .or_insert(OfflineSubkeyWrite{
-                    safety_selection,
-                    subkeys: ValueSubkeyRangeSet::single(subkey)
+            inner
+                .offline_subkey_writes
+                .entry(key)
+                .and_modify(|x| {
+                    x.subkeys.insert(subkey);
+                })
+                .or_insert(OfflineSubkeyWrite {
+                    safety_selection,
+                    subkeys: ValueSubkeyRangeSet::single(subkey),
                 });
-            return Ok(None)
+            return Ok(None);
         };

         // Drop the lock for network access
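
The `&*inner` -> `&inner` and `== &data` -> `== data` edits above are the usual needless-borrow cleanups: deref coercion already produces the reference the callee or comparison needs, so the explicit reborrow adds nothing. A small sketch under assumed types (Inner and read_counter are placeholders, not Veilid code):

    use std::sync::Mutex;

    struct Inner {
        counter: u32,
    }

    fn read_counter(inner: &Inner) -> u32 {
        inner.counter
    }

    fn main() {
        let m = Mutex::new(Inner { counter: 7 });
        let guard = m.lock().unwrap();
        // Both compile: deref coercion turns `&MutexGuard<Inner>` into `&Inner`,
        // so the explicit reborrow `&*guard` is unnecessary.
        let a = read_counter(&*guard);
        let b = read_counter(&guard);
        assert_eq!(a, b);
    }
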
@@ -65,7 +65,7 @@ where
     D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
 {
     pub fn new(table_store: TableStore, name: &str, limits: RecordStoreLimits) -> Self {
-        let subkey_cache_size = limits.subkey_cache_size as usize;
+        let subkey_cache_size = limits.subkey_cache_size;
         let limit_subkey_cache_total_size = limits
             .max_subkey_cache_memory_mb
             .map(|mb| mb * 1_048_576usize);
@@ -104,7 +104,7 @@ where
             .await?;
         let subkey_table = self
             .table_store
-            .open(&&format!("{}_subkeys", self.name), 1)
+            .open(&format!("{}_subkeys", self.name), 1)
             .await?;

         // Pull record index from table into a vector to ensure we sort them
@@ -126,7 +126,7 @@ where
             self.total_storage_space
                 .add((mem::size_of::<RecordTableKey>() + ri.1.total_size()) as u64)
                 .unwrap();
-            if let Err(_) = self.total_storage_space.commit() {
+            if self.total_storage_space.commit().is_err() {
                 // Revert the total storage space because the commit failed
                 self.total_storage_space.rollback();
@@ -449,11 +449,15 @@ where
     ) -> VeilidAPIResult<Option<SubkeyResult>> {
         // Get record from index
         let Some((subkey_count, has_subkey, opt_descriptor)) = self.with_record(key, |record| {
-            (record.subkey_count(), record.stored_subkeys().contains(subkey), if want_descriptor {
-                Some(record.descriptor().clone())
-            } else {
-                None
-            })
+            (
+                record.subkey_count(),
+                record.stored_subkeys().contains(subkey),
+                if want_descriptor {
+                    Some(record.descriptor().clone())
+                } else {
+                    None
+                },
+            )
         }) else {
             // Record not available
             return Ok(None);
@@ -492,19 +496,20 @@ where
         let Some(record_data) = subkey_table
             .load_json::<RecordData>(0, &stk.bytes())
             .await
-            .map_err(VeilidAPIError::internal)? else {
-                apibail_internal!("failed to get subkey that was stored");
-        };
+            .map_err(VeilidAPIError::internal)?
+        else {
+            apibail_internal!("failed to get subkey that was stored");
+        };

         let out = record_data.signed_value_data().clone();

         // Add to cache, do nothing with lru out
         self.add_to_subkey_cache(stk, record_data);

-        return Ok(Some(SubkeyResult {
+        Ok(Some(SubkeyResult {
             value: Some(out),
             descriptor: opt_descriptor,
-        }));
+        }))
     }

     pub(crate) async fn peek_subkey(
@@ -515,11 +520,15 @@ where
     ) -> VeilidAPIResult<Option<SubkeyResult>> {
         // record from index
         let Some((subkey_count, has_subkey, opt_descriptor)) = self.peek_record(key, |record| {
-            (record.subkey_count(), record.stored_subkeys().contains(subkey), if want_descriptor {
-                Some(record.descriptor().clone())
-            } else {
-                None
-            })
+            (
+                record.subkey_count(),
+                record.stored_subkeys().contains(subkey),
+                if want_descriptor {
+                    Some(record.descriptor().clone())
+                } else {
+                    None
+                },
+            )
         }) else {
             // Record not available
             return Ok(None);
@@ -558,16 +567,17 @@ where
         let Some(record_data) = subkey_table
             .load_json::<RecordData>(0, &stk.bytes())
             .await
-            .map_err(VeilidAPIError::internal)? else {
-                apibail_internal!("failed to peek subkey that was stored");
-        };
+            .map_err(VeilidAPIError::internal)?
+        else {
+            apibail_internal!("failed to peek subkey that was stored");
+        };

         let out = record_data.signed_value_data().clone();

-        return Ok(Some(SubkeyResult {
+        Ok(Some(SubkeyResult {
             value: Some(out),
             descriptor: opt_descriptor,
-        }));
+        }))
     }

     pub async fn set_subkey(
@@ -692,7 +702,7 @@ where
         for (rik, rec) in &self.record_index {
             out += &format!(
                 " {} age={} len={} subkeys={}\n",
-                rik.key.to_string(),
+                rik.key,
                 debug_duration(get_timestamp() - rec.last_touched().as_u64()),
                 rec.record_data_size(),
                 rec.stored_subkeys(),
@@ -706,11 +716,11 @@ where
         out += &format!("Total Storage Space: {}\n", self.total_storage_space.get());
         out += &format!("Dead Records: {}\n", self.dead_records.len());
         for dr in &self.dead_records {
-            out += &format!(" {}\n", dr.key.key.to_string());
+            out += &format!(" {}\n", dr.key.key);
         }
         out += &format!("Changed Records: {}\n", self.changed_records.len());
         for cr in &self.changed_records {
-            out += &format!(" {}\n", cr.key.to_string());
+            out += &format!(" {}\n", cr.key);
         }

         out
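
The `rik.key.to_string()` -> `rik.key` style edits above follow clippy's to_string_in_format_args lint: `format!` already invokes Display on its arguments, so converting to a String first only allocates a temporary. A tiny illustration with a hypothetical Key type:

    use std::fmt;

    struct Key(u64);

    impl fmt::Display for Key {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "key:{:016x}", self.0)
        }
    }

    fn main() {
        let key = Key(0xdead_beef);
        // The formatter uses Display directly; no intermediate String is needed.
        let with_to_string = format!(" {}\n", key.to_string()); // clippy: to_string_in_format_args
        let without = format!(" {}\n", key);
        assert_eq!(with_to_string, without);
    }
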
@@ -37,9 +37,7 @@ fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
         max_subkey_size: MAX_SUBKEY_SIZE,
         max_record_total_size: MAX_RECORD_DATA_SIZE,
         max_records: None,
-        max_subkey_cache_memory_mb: Some(
-            c.network.dht.local_max_subkey_cache_memory_mb as usize,
-        ),
+        max_subkey_cache_memory_mb: Some(c.network.dht.local_max_subkey_cache_memory_mb as usize),
         max_storage_space_mb: None,
     }
 }
@@ -51,9 +49,7 @@ fn remote_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
         max_subkey_size: MAX_SUBKEY_SIZE,
         max_record_total_size: MAX_RECORD_DATA_SIZE,
         max_records: Some(c.network.dht.remote_max_records as usize),
-        max_subkey_cache_memory_mb: Some(
-            c.network.dht.remote_max_subkey_cache_memory_mb as usize,
-        ),
+        max_subkey_cache_memory_mb: Some(c.network.dht.remote_max_subkey_cache_memory_mb as usize),
         max_storage_space_mb: Some(c.network.dht.remote_max_storage_space_mb as usize),
     }
 }
@@ -74,8 +70,8 @@ impl StorageManagerInner {
     }

     pub async fn init(&mut self, outer_self: StorageManager) -> EyreResult<()> {
-
-        let metadata_db = self.unlocked_inner
+        let metadata_db = self
+            .unlocked_inner
             .table_store
             .open(STORAGE_MANAGER_METADATA, 1)
             .await?;
@@ -120,7 +116,6 @@ impl StorageManagerInner {
     }

     pub async fn terminate(&mut self) {
-
         // Stop ticker
         let tick_future = self.tick_future.take();
         if let Some(f) = tick_future {
@@ -130,19 +125,19 @@ impl StorageManagerInner {
         // Final flush on record stores
         if let Some(mut local_record_store) = self.local_record_store.take() {
             if let Err(e) = local_record_store.tick().await {
-                log_stor!(error "termination local record store tick failed: {}", e);
+                log_stor!(error "termination local record store tick failed: {}", e);
             }
         }
         if let Some(mut remote_record_store) = self.remote_record_store.take() {
             if let Err(e) = remote_record_store.tick().await {
-                log_stor!(error "termination remote record store tick failed: {}", e);
+                log_stor!(error "termination remote record store tick failed: {}", e);
             }
         }

         // Save metadata
         if self.metadata_db.is_some() {
             if let Err(e) = self.save_metadata().await {
-                log_stor!(error "termination metadata save failed: {}", e);
+                log_stor!(error "termination metadata save failed: {}", e);
             }
             self.metadata_db = None;
         }
@@ -152,7 +147,7 @@ impl StorageManagerInner {
         self.initialized = false;
     }

-    async fn save_metadata(&mut self) -> EyreResult<()>{
+    async fn save_metadata(&mut self) -> EyreResult<()> {
         if let Some(metadata_db) = &self.metadata_db {
             let tx = metadata_db.transact();
             tx.store_json(0, OFFLINE_SUBKEY_WRITES, &self.offline_subkey_writes)?;
@@ -163,7 +158,8 @@ impl StorageManagerInner {

     async fn load_metadata(&mut self) -> EyreResult<()> {
         if let Some(metadata_db) = &self.metadata_db {
-            self.offline_subkey_writes = match metadata_db.load_json(0, OFFLINE_SUBKEY_WRITES).await {
+            self.offline_subkey_writes = match metadata_db.load_json(0, OFFLINE_SUBKEY_WRITES).await
+            {
                 Ok(v) => v.unwrap_or_default(),
                 Err(_) => {
                     if let Err(e) = metadata_db.delete(0, OFFLINE_SUBKEY_WRITES).await {
@@ -218,13 +214,16 @@ impl StorageManagerInner {
         Ok((dht_key, owner))
     }

-    async fn move_remote_record_to_local(&mut self, key: TypedKey, safety_selection: SafetySelection) -> VeilidAPIResult<Option<(PublicKey, DHTSchema)>>
-    {
+    async fn move_remote_record_to_local(
+        &mut self,
+        key: TypedKey,
+        safety_selection: SafetySelection,
+    ) -> VeilidAPIResult<Option<(PublicKey, DHTSchema)>> {
         // Get local record store
         let Some(local_record_store) = self.local_record_store.as_mut() else {
             apibail_not_initialized!();
         };

         // Get remote record store
         let Some(remote_record_store) = self.remote_record_store.as_mut() else {
             apibail_not_initialized!();
@@ -241,31 +240,36 @@ impl StorageManagerInner {

         // Make local record
         let cur_ts = get_aligned_timestamp();
-        let local_record = Record::new(cur_ts, remote_record.descriptor().clone(), LocalRecordDetail {
-            safety_selection
-        })?;
+        let local_record = Record::new(
+            cur_ts,
+            remote_record.descriptor().clone(),
+            LocalRecordDetail { safety_selection },
+        )?;
         local_record_store.new_record(key, local_record).await?;

         // Move copy subkey data from remote to local store
         for subkey in remote_record.stored_subkeys().iter() {
-            let Some(subkey_result) = remote_record_store.get_subkey(key, subkey, false).await? else {
+            let Some(subkey_result) = remote_record_store.get_subkey(key, subkey, false).await?
+            else {
                 // Subkey was missing
-                warn!("Subkey was missing: {} #{}",key, subkey);
+                warn!("Subkey was missing: {} #{}", key, subkey);
                 continue;
             };
             let Some(subkey_data) = subkey_result.value else {
                 // Subkey was missing
-                warn!("Subkey data was missing: {} #{}",key, subkey);
+                warn!("Subkey data was missing: {} #{}", key, subkey);
                 continue;
             };
-            local_record_store.set_subkey(key, subkey, subkey_data).await?;
+            local_record_store
+                .set_subkey(key, subkey, subkey_data)
+                .await?;
         }

         // Delete remote record from store
         remote_record_store.delete_record(key).await?;

         // Return record information as transferred to local record
-        Ok(Some((remote_record.owner().clone(), remote_record.schema())))
+        Ok(Some((*remote_record.owner(), remote_record.schema())))
     }

     pub async fn open_existing_record(
@@ -292,14 +296,17 @@ impl StorageManagerInner {
             r.detail_mut().safety_selection = safety_selection;

             // Return record details
-            (r.owner().clone(), r.schema())
+            (*r.owner(), r.schema())
         };
-        let (owner, schema) = match local_record_store.with_record_mut(key, cb){
+        let (owner, schema) = match local_record_store.with_record_mut(key, cb) {
             Some(v) => v,
             None => {
                 // If we don't have a local record yet, check to see if we have a remote record
                 // if so, migrate it to a local record
-                let Some(v) = self.move_remote_record_to_local(key, safety_selection).await? else {
+                let Some(v) = self
+                    .move_remote_record_to_local(key, safety_selection)
+                    .await?
+                else {
                     // No remote record either
                     return Ok(None);
                 };
@@ -348,7 +355,7 @@ impl StorageManagerInner {
             apibail_generic!("no descriptor");
         };
         // Get owner
-        let owner = signed_value_descriptor.owner().clone();
+        let owner = *signed_value_descriptor.owner();

         // If the writer we chose is also the owner, we have the owner secret
         // Otherwise this is just another subkey writer
@@ -410,7 +417,10 @@ impl StorageManagerInner {
         let Some(local_record_store) = self.local_record_store.as_mut() else {
             apibail_not_initialized!();
         };
-        if let Some(subkey_result) = local_record_store.get_subkey(key, subkey, want_descriptor).await? {
+        if let Some(subkey_result) = local_record_store
+            .get_subkey(key, subkey, want_descriptor)
+            .await?
+        {
             return Ok(subkey_result);
         }
@@ -428,7 +438,7 @@ impl StorageManagerInner {
     ) -> VeilidAPIResult<()> {
         // See if it's in the local record store
         let Some(local_record_store) = self.local_record_store.as_mut() else {
-            apibail_not_initialized!();
+            apibail_not_initialized!();
         };

         // Write subkey to local store
@@ -449,7 +459,10 @@ impl StorageManagerInner {
         let Some(remote_record_store) = self.remote_record_store.as_mut() else {
             apibail_not_initialized!();
         };
-        if let Some(subkey_result) = remote_record_store.get_subkey(key, subkey, want_descriptor).await? {
+        if let Some(subkey_result) = remote_record_store
+            .get_subkey(key, subkey, want_descriptor)
+            .await?
+        {
             return Ok(subkey_result);
         }
@@ -472,12 +485,15 @@ impl StorageManagerInner {
         };

         // See if we have a remote record already or not
-        if remote_record_store.with_record(key, |_|{}).is_none() {
+        if remote_record_store.with_record(key, |_| {}).is_none() {
             // record didn't exist, make it
             let cur_ts = get_aligned_timestamp();
-            let remote_record_detail = RemoteRecordDetail { };
-            let record =
-                Record::<RemoteRecordDetail>::new(cur_ts, signed_value_descriptor, remote_record_detail)?;
+            let remote_record_detail = RemoteRecordDetail {};
+            let record = Record::<RemoteRecordDetail>::new(
+                cur_ts,
+                signed_value_descriptor,
+                remote_record_detail,
+            )?;
             remote_record_store.new_record(key, record).await?
         };
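
The `owner().clone()` -> `*owner()` changes above are clippy's clone_on_copy lint: when a getter returns a reference to a Copy type, a plain dereference copies the value, so `.clone()` is redundant. A minimal sketch (PublicKey and Record here are stand-ins, not the Veilid types):

    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    struct PublicKey([u8; 32]);

    struct Record {
        owner: PublicKey,
    }

    impl Record {
        fn owner(&self) -> &PublicKey {
            &self.owner
        }
    }

    fn main() {
        let rec = Record { owner: PublicKey([0u8; 32]) };
        // For Copy types, dereferencing copies the value out of the reference.
        let a = *rec.owner();          // preferred
        let b = rec.owner().clone();   // clippy: clone_on_copy
        assert_eq!(a, b);
    }
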
@@ -24,7 +24,7 @@ impl SignedValueData {
     ) -> VeilidAPIResult<()> {
         let node_info_bytes = Self::make_signature_bytes(&self.value_data, owner, subkey)?;
         // validate signature
-        vcrypto.verify(&self.value_data.writer(), &node_info_bytes, &self.signature)
+        vcrypto.verify(self.value_data.writer(), &node_info_bytes, &self.signature)
     }

     pub fn make_signature(
@@ -37,7 +37,7 @@ impl SignedValueData {
         let node_info_bytes = Self::make_signature_bytes(&value_data, owner, subkey)?;

         // create signature
-        let signature = vcrypto.sign(&value_data.writer(), &writer_secret, &node_info_bytes)?;
+        let signature = vcrypto.sign(value_data.writer(), &writer_secret, &node_info_bytes)?;
         Ok(Self {
             value_data,
             signature,
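
The two hunks above drop a needless borrow: `writer()` already returns a reference, so taking `&` of it produces a double reference that the compiler immediately coerces back down (clippy::needless_borrow). A small illustration with assumed placeholder types (ValueData, Key, and verify are not the Veilid crypto API):

    struct Key(u8);

    struct ValueData {
        writer: Key,
    }

    impl ValueData {
        fn writer(&self) -> &Key {
            &self.writer
        }
    }

    fn verify(key: &Key) -> bool {
        key.0 != 0
    }

    fn main() {
        let vd = ValueData { writer: Key(1) };
        // `&vd.writer()` is a `&&Key`; deref coercion makes it work, but the
        // extra borrow is exactly what clippy::needless_borrow points at.
        assert_eq!(verify(vd.writer()), verify(&vd.writer()));
    }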