dht fixes
This commit is contained in:
@@ -35,4 +35,44 @@ impl StorageManager {
|
||||
.await;
|
||||
return format!("Remote records purged: reclaimed {} bytes", reclaimed);
|
||||
}
|
||||
pub(crate) async fn debug_local_record_subkey_info(
|
||||
&self,
|
||||
key: TypedKey,
|
||||
subkey: ValueSubkey,
|
||||
) -> String {
|
||||
let inner = self.inner.lock().await;
|
||||
let Some(local_record_store) = &inner.local_record_store else {
|
||||
return "not initialized".to_owned();
|
||||
};
|
||||
local_record_store
|
||||
.debug_record_subkey_info(key, subkey)
|
||||
.await
|
||||
}
|
||||
pub(crate) async fn debug_remote_record_subkey_info(
|
||||
&self,
|
||||
key: TypedKey,
|
||||
subkey: ValueSubkey,
|
||||
) -> String {
|
||||
let inner = self.inner.lock().await;
|
||||
let Some(remote_record_store) = &inner.remote_record_store else {
|
||||
return "not initialized".to_owned();
|
||||
};
|
||||
remote_record_store
|
||||
.debug_record_subkey_info(key, subkey)
|
||||
.await
|
||||
}
|
||||
pub(crate) async fn debug_local_record_info(&self, key: TypedKey) -> String {
|
||||
let inner = self.inner.lock().await;
|
||||
let Some(local_record_store) = &inner.local_record_store else {
|
||||
return "not initialized".to_owned();
|
||||
};
|
||||
local_record_store.debug_record_info(key)
|
||||
}
|
||||
pub(crate) async fn debug_remote_record_info(&self, key: TypedKey) -> String {
|
||||
let inner = self.inner.lock().await;
|
||||
let Some(remote_record_store) = &inner.remote_record_store else {
|
||||
return "not initialized".to_owned();
|
||||
};
|
||||
remote_record_store.debug_record_info(key)
|
||||
}
|
||||
}
|
||||
|
@@ -144,6 +144,7 @@ impl StorageManager {
|
||||
// The initial writer is the owner of the record
|
||||
inner
|
||||
.open_existing_record(key, Some(owner), safety_selection)
|
||||
.await
|
||||
.map(|r| r.unwrap())
|
||||
}
|
||||
|
||||
@@ -159,7 +160,10 @@ impl StorageManager {
|
||||
let mut inner = self.lock().await?;
|
||||
|
||||
// See if we have a local record already or not
|
||||
if let Some(res) = inner.open_existing_record(key, writer, safety_selection)? {
|
||||
if let Some(res) = inner
|
||||
.open_existing_record(key, writer, safety_selection)
|
||||
.await?
|
||||
{
|
||||
return Ok(res);
|
||||
}
|
||||
|
||||
|
@@ -9,7 +9,7 @@ use hashlink::LruCache;
|
||||
|
||||
pub struct RecordStore<D>
|
||||
where
|
||||
D: Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
|
||||
D: fmt::Debug + Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
|
||||
for<'t> <D as RkyvArchive>::Archived: CheckBytes<RkyvDefaultValidator<'t>>,
|
||||
<D as RkyvArchive>::Archived: RkyvDeserialize<D, VeilidSharedDeserializeMap>,
|
||||
{
|
||||
@@ -41,7 +41,7 @@ pub struct SubkeyResult {
|
||||
|
||||
impl<D> RecordStore<D>
|
||||
where
|
||||
D: Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
|
||||
D: fmt::Debug + Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
|
||||
for<'t> <D as RkyvArchive>::Archived: CheckBytes<RkyvDefaultValidator<'t>>,
|
||||
<D as RkyvArchive>::Archived: RkyvDeserialize<D, VeilidSharedDeserializeMap>,
|
||||
{
|
||||
@@ -363,6 +363,20 @@ where
|
||||
out
|
||||
}
|
||||
|
||||
pub(super) fn peek_record<R, F>(&self, key: TypedKey, f: F) -> Option<R>
|
||||
where
|
||||
F: FnOnce(&Record<D>) -> R,
|
||||
{
|
||||
// Get record from index
|
||||
let mut out = None;
|
||||
let rtk = RecordTableKey { key };
|
||||
if let Some(record) = self.record_index.peek(&rtk) {
|
||||
// Callback
|
||||
out = Some(f(record));
|
||||
}
|
||||
out
|
||||
}
|
||||
|
||||
pub(super) fn with_record_mut<R, F>(&mut self, key: TypedKey, f: F) -> Option<R>
|
||||
where
|
||||
F: FnOnce(&mut Record<D>) -> R,
|
||||
@@ -454,6 +468,69 @@ where
|
||||
}));
|
||||
}
|
||||
|
||||
pub(crate) async fn peek_subkey(
|
||||
&self,
|
||||
key: TypedKey,
|
||||
subkey: ValueSubkey,
|
||||
want_descriptor: bool,
|
||||
) -> VeilidAPIResult<Option<SubkeyResult>> {
|
||||
// record from index
|
||||
let Some((subkey_count, has_subkey, opt_descriptor)) = self.peek_record(key, |record| {
|
||||
(record.subkey_count(), record.stored_subkeys().contains(subkey), if want_descriptor {
|
||||
Some(record.descriptor().clone())
|
||||
} else {
|
||||
None
|
||||
})
|
||||
}) else {
|
||||
// Record not available
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
// Check if the subkey is in range
|
||||
if subkey as usize >= subkey_count {
|
||||
apibail_invalid_argument!("subkey out of range", "subkey", subkey);
|
||||
}
|
||||
|
||||
// See if we have this subkey stored
|
||||
if !has_subkey {
|
||||
// If not, return no value but maybe with descriptor
|
||||
return Ok(Some(SubkeyResult {
|
||||
value: None,
|
||||
descriptor: opt_descriptor,
|
||||
}));
|
||||
}
|
||||
|
||||
// Get subkey table
|
||||
let Some(subkey_table) = self.subkey_table.clone() else {
|
||||
apibail_internal!("record store not initialized");
|
||||
};
|
||||
|
||||
// If subkey exists in subkey cache, use that
|
||||
let stk = SubkeyTableKey { key, subkey };
|
||||
if let Some(record_data) = self.subkey_cache.peek(&stk) {
|
||||
let out = record_data.signed_value_data().clone();
|
||||
|
||||
return Ok(Some(SubkeyResult {
|
||||
value: Some(out),
|
||||
descriptor: opt_descriptor,
|
||||
}));
|
||||
}
|
||||
// If not in cache, try to pull from table store if it is in our stored subkey set
|
||||
let Some(record_data) = subkey_table
|
||||
.load_rkyv::<RecordData>(0, &stk.bytes())
|
||||
.await
|
||||
.map_err(VeilidAPIError::internal)? else {
|
||||
apibail_internal!("failed to peek subkey that was stored");
|
||||
};
|
||||
|
||||
let out = record_data.signed_value_data().clone();
|
||||
|
||||
return Ok(Some(SubkeyResult {
|
||||
value: Some(out),
|
||||
descriptor: opt_descriptor,
|
||||
}));
|
||||
}
|
||||
|
||||
pub async fn set_subkey(
|
||||
&mut self,
|
||||
key: TypedKey,
|
||||
@@ -599,4 +676,23 @@ where
|
||||
|
||||
out
|
||||
}
|
||||
|
||||
pub(super) fn debug_record_info(&self, key: TypedKey) -> String {
|
||||
self.peek_record(key, |r| format!("{:#?}", r))
|
||||
.unwrap_or("Not found".to_owned())
|
||||
}
|
||||
|
||||
pub(super) async fn debug_record_subkey_info(
|
||||
&self,
|
||||
key: TypedKey,
|
||||
subkey: ValueSubkey,
|
||||
) -> String {
|
||||
match self.peek_subkey(key, subkey, true).await {
|
||||
Ok(Some(v)) => {
|
||||
format!("{:#?}", v)
|
||||
}
|
||||
Ok(None) => "Subkey not available".to_owned(),
|
||||
Err(e) => format!("{}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -166,8 +166,19 @@ impl StorageManager {
|
||||
pub async fn inbound_set_value(&self, key: TypedKey, subkey: ValueSubkey, value: SignedValueData, descriptor: Option<SignedValueDescriptor>) -> VeilidAPIResult<NetworkResult<Option<SignedValueData>>> {
|
||||
let mut inner = self.lock().await?;
|
||||
|
||||
// See if the subkey we are modifying has a last known local value
|
||||
let last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?;
|
||||
// See if this is a remote or local value
|
||||
let (is_local, last_subkey_result) = {
|
||||
// See if the subkey we are modifying has a last known local value
|
||||
let last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?;
|
||||
// If this is local, it must have a descriptor already
|
||||
if last_subkey_result.descriptor.is_some() {
|
||||
(true, last_subkey_result)
|
||||
} else {
|
||||
// See if the subkey we are modifying has a last known remote value
|
||||
let last_subkey_result = inner.handle_get_remote_value(key, subkey, true).await?;
|
||||
(false, last_subkey_result)
|
||||
}
|
||||
};
|
||||
|
||||
// Make sure this value would actually be newer
|
||||
if let Some(last_value) = &last_subkey_result.value {
|
||||
@@ -210,7 +221,12 @@ impl StorageManager {
|
||||
}
|
||||
|
||||
// Do the set and return no new value
|
||||
match inner.handle_set_remote_value(key, subkey, value, actual_descriptor).await {
|
||||
let res = if is_local {
|
||||
inner.handle_set_local_value(key, subkey, value).await
|
||||
} else {
|
||||
inner.handle_set_remote_value(key, subkey, value, actual_descriptor).await
|
||||
};
|
||||
match res {
|
||||
Ok(()) => {},
|
||||
Err(VeilidAPIError::Internal { message }) => {
|
||||
apibail_internal!(message);
|
||||
|
@@ -209,7 +209,57 @@ impl StorageManagerInner {
|
||||
Ok((dht_key, owner))
|
||||
}
|
||||
|
||||
pub fn open_existing_record(
|
||||
async fn move_remote_record_to_local(&mut self, key: TypedKey, safety_selection: SafetySelection) -> VeilidAPIResult<Option<(PublicKey, DHTSchema)>>
|
||||
{
|
||||
// Get local record store
|
||||
let Some(local_record_store) = self.local_record_store.as_mut() else {
|
||||
apibail_not_initialized!();
|
||||
};
|
||||
|
||||
// Get remote record store
|
||||
let Some(remote_record_store) = self.remote_record_store.as_mut() else {
|
||||
apibail_not_initialized!();
|
||||
};
|
||||
|
||||
let rcb = |r: &Record<RemoteRecordDetail>| {
|
||||
// Return record details
|
||||
r.clone()
|
||||
};
|
||||
let Some(remote_record) = remote_record_store.with_record(key, rcb) else {
|
||||
// No local or remote record found, return None
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
// Make local record
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let local_record = Record::new(cur_ts, remote_record.descriptor().clone(), LocalRecordDetail {
|
||||
safety_selection
|
||||
})?;
|
||||
local_record_store.new_record(key, local_record).await?;
|
||||
|
||||
// Move copy subkey data from remote to local store
|
||||
for subkey in remote_record.stored_subkeys().iter() {
|
||||
let Some(subkey_result) = remote_record_store.get_subkey(key, subkey, false).await? else {
|
||||
// Subkey was missing
|
||||
warn!("Subkey was missing: {} #{}",key, subkey);
|
||||
continue;
|
||||
};
|
||||
let Some(subkey_data) = subkey_result.value else {
|
||||
// Subkey was missing
|
||||
warn!("Subkey data was missing: {} #{}",key, subkey);
|
||||
continue;
|
||||
};
|
||||
local_record_store.set_subkey(key, subkey, subkey_data).await?;
|
||||
}
|
||||
|
||||
// Delete remote record from store
|
||||
remote_record_store.delete_record(key).await?;
|
||||
|
||||
// Return record information as transferred to local record
|
||||
Ok(Some((remote_record.owner().clone(), remote_record.schema())))
|
||||
}
|
||||
|
||||
pub async fn open_existing_record(
|
||||
&mut self,
|
||||
key: TypedKey,
|
||||
writer: Option<KeyPair>,
|
||||
@@ -235,8 +285,17 @@ impl StorageManagerInner {
|
||||
// Return record details
|
||||
(r.owner().clone(), r.schema())
|
||||
};
|
||||
let Some((owner, schema)) = local_record_store.with_record_mut(key, cb) else {
|
||||
return Ok(None);
|
||||
let (owner, schema) = match local_record_store.with_record_mut(key, cb){
|
||||
Some(v) => v,
|
||||
None => {
|
||||
// If we don't have a local record yet, check to see if we have a remote record
|
||||
// if so, migrate it to a local record
|
||||
let Some(v) = self.move_remote_record_to_local(key, safety_selection).await? else {
|
||||
// No remote record either
|
||||
return Ok(None);
|
||||
};
|
||||
v
|
||||
}
|
||||
};
|
||||
// Had local record
|
||||
|
||||
@@ -424,7 +483,7 @@ impl StorageManagerInner {
|
||||
/// # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ]
|
||||
fn get_key<D>(vcrypto: CryptoSystemVersion, record: &Record<D>) -> TypedKey
|
||||
where
|
||||
D: Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
|
||||
D: fmt::Debug + Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
|
||||
for<'t> <D as RkyvArchive>::Archived: CheckBytes<RkyvDefaultValidator<'t>>,
|
||||
<D as RkyvArchive>::Archived: RkyvDeserialize<D, VeilidSharedDeserializeMap>,
|
||||
{
|
||||
|
@@ -6,7 +6,7 @@ use super::*;
|
||||
#[archive_attr(repr(C), derive(CheckBytes))]
|
||||
pub struct Record<D>
|
||||
where
|
||||
D: Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
|
||||
D: fmt::Debug + Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
|
||||
for<'t> <D as RkyvArchive>::Archived: CheckBytes<RkyvDefaultValidator<'t>>,
|
||||
<D as RkyvArchive>::Archived: RkyvDeserialize<D, VeilidSharedDeserializeMap>,
|
||||
{
|
||||
@@ -20,7 +20,7 @@ where
|
||||
|
||||
impl<D> Record<D>
|
||||
where
|
||||
D: Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
|
||||
D: fmt::Debug + Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
|
||||
for<'t> <D as RkyvArchive>::Archived: CheckBytes<RkyvDefaultValidator<'t>>,
|
||||
<D as RkyvArchive>::Archived: RkyvDeserialize<D, VeilidSharedDeserializeMap>,
|
||||
{
|
||||
|
@@ -5,7 +5,6 @@ use super::*;
|
||||
|
||||
#[derive(
|
||||
Clone,
|
||||
Debug,
|
||||
PartialOrd,
|
||||
PartialEq,
|
||||
Eq,
|
||||
@@ -79,3 +78,13 @@ impl SignedValueDescriptor {
|
||||
self.schema_data.cmp(&other.schema_data)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for SignedValueDescriptor {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
fmt.debug_struct("SignedValueDescriptor")
|
||||
.field("owner", &self.owner)
|
||||
.field("schema_data", &format!("{:?}", &self.schema_data))
|
||||
.field("signature", &self.signature)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user