storage work

John Smith 2023-04-06 20:21:45 -04:00
parent 7eded89b11
commit e46d64f648
7 changed files with 326 additions and 211 deletions

View File

@@ -338,7 +338,7 @@ struct ValueData @0xb4b7416f169f2a3d {
 struct OperationGetValueQ @0xf88a5b6da5eda5d0 {
     key @0 :TypedKey;        # the location of the value
-    subkey @1 :Subkey;       # the index of the subkey (0 for the default subkey)
+    subkey @1 :Subkey;       # the index of the subkey
     wantSchema @2 :bool;     # whether or not to include the schema for the key
 }

View File

@@ -1,6 +1,6 @@
 mod record_store;
-mod value_record;
 mod record_store_limits;
+mod value_record;
 
 use record_store::*;
 use record_store_limits::*;
 use value_record::*;
@@ -58,7 +58,6 @@ impl StorageManager {
     fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
         RecordStoreLimits {
-            record_cache_size: todo!(),
             subkey_cache_size: todo!(),
             max_records: None,
             max_subkey_cache_memory_mb: Some(xxx),
@@ -68,11 +67,10 @@ impl StorageManager {
     fn remote_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
         RecordStoreLimits {
-            record_cache_size: todo!(),
             subkey_cache_size: todo!(),
             max_records: Some(xxx),
             max_subkey_cache_memory_mb: Some(xxx),
-            max_disk_space_mb: Some(xxx)
+            max_disk_space_mb: Some(xxx),
         }
     }
@@ -84,7 +82,6 @@ impl StorageManager {
         block_store: BlockStore,
         rpc_processor: RPCProcessor,
     ) -> StorageManager {
-
         StorageManager {
             unlocked_inner: Arc::new(Self::new_unlocked_inner(
                 config,
@@ -94,7 +91,7 @@ impl StorageManager {
                 block_store,
                 rpc_processor,
             )),
-            inner: Arc::new(Mutex::new(Self::new_inner()))
+            inner: Arc::new(Mutex::new(Self::new_inner())),
         }
     }
@@ -105,8 +102,16 @@ impl StorageManager {
         let local_limits = Self::local_limits_from_config(config.clone());
         let remote_limits = Self::remote_limits_from_config(config.clone());
-        inner.local_record_store = Some(RecordStore::new(self.unlocked_inner.table_store.clone(), "local", local_limits));
-        inner.remote_record_store = Some(RecordStore::new(self.unlocked_inner.table_store.clone(), "remote", remote_limits));
+        inner.local_record_store = Some(RecordStore::new(
+            self.unlocked_inner.table_store.clone(),
+            "local",
+            local_limits,
+        ));
+        inner.remote_record_store = Some(RecordStore::new(
+            self.unlocked_inner.table_store.clone(),
+            "remote",
+            remote_limits,
+        ));
 
         Ok(())
     }
@@ -120,15 +125,17 @@ impl StorageManager {
         debug!("finished storage manager shutdown");
     }
 
-    async fn add_value_record(&self, key: TypedKey, record: ValueRecord) -> EyreResult<()> {
+    async fn new_local_record(&self, key: TypedKey, record: ValueRecord) -> EyreResult<()> {
         // add value record to record store
         let mut inner = self.inner.lock();
-        inner.record_store.
+        let Some(local_record_store) = inner.local_record_store else {
+            apibail_generic!("not initialized");
+        };
+        local_record_store.new_record(key, record)
     }
-    /// Creates a new DHT value with a specified crypto kind and schema
-    /// Returns the newly allocated DHT Key if successful.
-    pub async fn create_value(
+    pub async fn create_record(
         &self,
         kind: CryptoKind,
         schema: &DHTSchema,
@@ -144,60 +151,50 @@ impl StorageManager {
         let key = TypedKey::new(kind, keypair.key);
         let secret = keypair.secret;
 
-        // Add value record
-        let record = ValueRecord::new(Some(secret), schema, safety_selection);
-        self.add_value_record(key, record)
+        // Add new local value record
+        let cur_ts = get_aligned_timestamp();
+        let record = ValueRecord::new(cur_ts, Some(secret), schema, safety_selection);
+        self.new_local_record(key, record)
             .await
             .map_err(VeilidAPIError::internal)?;
 
         Ok(key)
     }
-    /// Opens a DHT value at a specific key. Associates an owner secret if one is provided.
-    /// Returns the DHT key descriptor for the opened key if successful
-    /// Value may only be opened or created once. To re-open with a different routing context, first close the value.
-    pub async fn open_value(
+    pub async fn open_record(
         key: TypedKey,
         secret: Option<SecretKey>,
         safety_selection: SafetySelection,
-    ) -> Result<DHTDescriptor, VeilidAPIError> {
+    ) -> Result<DHTRecordDescriptor, VeilidAPIError> {
         unimplemented!();
     }
 
-    /// Closes a DHT value at a specific key that was opened with create_value or open_value.
-    /// Closing a value allows you to re-open it with a different routing context
-    pub async fn close_value(key: TypedKey) -> Result<(), VeilidAPIError> {
+    pub async fn close_record(key: TypedKey) -> Result<(), VeilidAPIError> {
+        unimplemented!();
+    }
+
+    pub async fn delete_value(key: TypedKey) -> Result<(), VeilidAPIError> {
         unimplemented!();
     }
-    /// Gets the latest value of a subkey from the network
-    /// Returns the possibly-updated value data of the subkey
     pub async fn get_value(
         &self,
         key: TypedKey,
         subkey: ValueSubkey,
         force_refresh: bool,
-    ) -> Result<ValueData, VeilidAPIError> {
-        unimplemented!();
-    }
-
-    /// Pushes a changed subkey value to the network
-    /// Returns None if the value was successfully put
-    /// Returns Some(newer_value) if the value put was older than the one available on the network
-    pub async fn set_value(
-        &self,
-        key: TypedKey,
-        subkey: ValueSubkey,
-        value_data: ValueData,
     ) -> Result<Option<ValueData>, VeilidAPIError> {
         unimplemented!();
     }
 
-    /// Watches changes to an opened or created value
-    /// Changes to subkeys within the subkey range are returned via a ValueChanged callback
-    /// If the subkey range is empty, all subkey changes are considered
-    /// Expiration can be infinite to keep the watch for the maximum amount of time
-    /// Return value upon success is the amount of time allowed for the watch
+    pub async fn set_value(
+        &self,
+        key: TypedKey,
+        subkey: ValueSubkey,
+        data: Vec<u8>,
+    ) -> Result<Option<ValueData>, VeilidAPIError> {
+        unimplemented!();
+    }
     pub async fn watch_value(
         &self,
         key: TypedKey,
@@ -208,8 +205,6 @@ impl StorageManager {
         unimplemented!();
     }
 
-    /// Cancels a watch early
-    /// This is a convenience function that cancels watching all subkeys in a range
     pub async fn cancel_watch_value(
         &self,
         key: TypedKey,
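
Both local_limits_from_config and remote_limits_from_config above are still stubbed with todo!() and xxx placeholders. A minimal sketch of the intended shape, assuming hypothetical config fields (every c.network.dht.* name below is illustrative, not the actual VeilidConfig layout):

    // Illustrative only: the field names under c.network.dht are assumptions.
    fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
        let c = config.get();
        RecordStoreLimits {
            subkey_cache_size: c.network.dht.local_subkey_cache_size as usize,
            max_records: None, // local records persist; no count cap
            max_subkey_cache_memory_mb: Some(c.network.dht.local_max_subkey_cache_memory_mb as usize),
            max_disk_space_mb: None, // local store is not disk-quota bounded
        }
    }

    fn remote_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
        let c = config.get();
        RecordStoreLimits {
            subkey_cache_size: c.network.dht.remote_subkey_cache_size as usize,
            max_records: Some(c.network.dht.remote_max_records as usize),
            max_subkey_cache_memory_mb: Some(c.network.dht.remote_max_subkey_cache_memory_mb as usize),
            max_disk_space_mb: Some(c.network.dht.remote_max_storage_space_mb as usize),
        }
    }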

View File

@@ -1,9 +1,15 @@
+/// RecordStore
+/// Keeps an LRU cache of DHT keys and their associated subkey value data.
+/// Instances of this store are used for 'local' (persistent) and 'remote' (ephemeral) DHT key storage.
+/// This store does not perform any validation on the schema; all ValueRecordData passed in must have been previously validated.
+/// Uses an in-memory store for the records, backed by the TableStore. Subkey data is LRU cached and rotated out by a limits policy,
+/// and backed to the TableStore for persistence.
 use super::*;
 use hashlink::LruCache;
 
 pub type RecordIndex = u32;
 
-#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 struct RecordIndexKey {
     pub key: TypedKey,
 }
@@ -30,7 +36,7 @@ impl TryFrom<&[u8]> for RecordIndexKey {
     }
 }
 
-#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 struct SubkeyCacheKey {
     pub key: TypedKey,
     pub subkey: ValueSubkey,
@@ -71,6 +77,9 @@ pub struct RecordStore {
     subkey_table: Option<TableDB>,
     record_index: LruCache<RecordIndexKey, ValueRecord>,
     subkey_cache: LruCache<SubkeyCacheKey, ValueRecordData>,
+    dead_records: Vec<(RecordIndexKey, ValueRecord)>,
+    changed_records: HashSet<(RecordIndexKey, Timestamp)>,
 }
 
 impl RecordStore {
@@ -84,6 +93,8 @@ impl RecordStore {
             subkey_table: None,
             record_index: LruCache::new(limits.max_records.unwrap_or(usize::MAX)),
             subkey_cache: LruCache::new(subkey_cache_size),
+            dead_records: Vec::new(),
+            changed_records: HashSet::new(),
         }
     }
@@ -118,145 +129,163 @@ impl RecordStore {
             })
         }
 
-        // Delete dead keys
-        if !dead_records.empty() {
-            let rt_xact = record_table.transact();
-            let st_xact = subkey_table.transact();
-            for (k, v) in dead_records {
-                // Delete record
-                rt_xact.delete(0, &k.bytes());
-                // Delete subkeys
-                let subkey_count = v.subkey_count();
-                for sk in 0..subkey_count {
-                    let sck = SubkeyCacheKey {
-                        key: k.key,
-                        subkey: sk,
-                    };
-                    st_xact.delete(0, &sck.bytes())?;
-                }
-            }
-            rt_xact.commit().await?;
-            st_xact.commit().await?;
-        }
-
         self.record_table = Some(record_table);
         self.subkey_table = Some(record_table);
 
         Ok(())
     }
-    fix up new record
+    fn add_dead_record(&mut self, key: RecordIndexKey, record: ValueRecord) {
+        self.dead_records.push((key, record));
+    }
 
-    pub fn new_record(&mut self, key: TypedKey, record: ValueRecord) -> EyreResult<()> {
+    fn mark_record_changed(&mut self, key: RecordIndexKey) {
+        let cur_ts = get_aligned_timestamp();
+        self.changed_records.insert((key, cur_ts));
+    }
+    async fn purge_dead_records(&mut self) {
+        // Delete dead keys
+        if self.dead_records.is_empty() {
+            return;
+        }
+        let rt_xact = record_table.transact();
+        let st_xact = subkey_table.transact();
+        let mut dead_records = mem::take(&mut self.dead_records);
+        for (k, v) in dead_records {
+            // Delete record
+            rt_xact.delete(0, &k.bytes());
+            // Delete subkeys
+            let subkey_count = v.subkey_count();
+            for sk in 0..subkey_count {
+                // From table
+                let sck = SubkeyCacheKey {
+                    key: k.key,
+                    subkey: sk,
+                };
+                st_xact.delete(0, &sck.bytes())?;
+                // From cache
+                self.subkey_cache.remove(&sck);
+            }
+        }
+        if let Err(e) = rt_xact.commit().await {
+            log_stor!(error "failed to commit record table transaction: {}", e);
+        }
+        if let Err(e) = st_xact.commit().await {
+            log_stor!(error "failed to commit subkey table transaction: {}", e);
+        }
+    }
+
+    async fn flush_records(&mut self) {
+        // touch records
+        if self.changed_records.is_empty() {
+            return;
+        }
+        let rt_xact = record_table.transact();
+        let st_xact = subkey_table.transact();
+        let mut changed_records = mem::take(&mut self.changed_records);
+        for (rik, ts) in changed_records {
+            // Flush record and changed subkeys
+        }
+        if let Err(e) = rt_xact.commit().await {
+            log_stor!(error "failed to commit record table transaction: {}", e);
+        }
+        if let Err(e) = st_xact.commit().await {
+            log_stor!(error "failed to commit subkey table transaction: {}", e);
+        }
+    }
+
+    pub async fn tick(&mut self, last_ts: Timestamp, cur_ts: Timestamp) {
+        self.flush_records().await;
+        self.purge_dead_records().await;
+    }
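
The new tick() entry point makes persistence deferred and batched: mutators only touch in-memory state plus the dead_records / changed_records queues, and a periodic tick flushes and purges. A sketch of a driver loop under assumed plumbing (tokio runtime and async Mutex; the real StorageManager wiring may differ):

    use std::sync::Arc;
    use std::time::Duration;
    use tokio::sync::Mutex;

    // Sketch: drive flush/purge for both stores from a background task.
    // The one-second cadence and tokio primitives are assumptions.
    async fn record_store_tick_loop(local: Arc<Mutex<RecordStore>>, remote: Arc<Mutex<RecordStore>>) {
        let mut last_ts = get_aligned_timestamp();
        loop {
            tokio::time::sleep(Duration::from_secs(1)).await;
            let cur_ts = get_aligned_timestamp();
            local.lock().await.tick(last_ts, cur_ts).await;
            remote.lock().await.tick(last_ts, cur_ts).await;
            last_ts = cur_ts;
        }
    }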
+    pub fn new_record(&mut self, key: TypedKey, record: ValueRecord) -> Result<(), VeilidAPIError> {
         if self.with_record(key, |_| {})?.is_some() {
-            bail!("record already exists");
+            apibail_generic!("record already exists");
         }
 
         // Get record table
         let Some(record_table) = self.record_table.clone() else {
-            bail!("record store not initialized");
+            apibail_internal!("record store not initialized");
         };
 
         // Save to record table
-        record_table.store_rkyv(0, &key, &r).await?;
+        record_table
+            .store_rkyv(0, &key, &r)
+            .await
+            .map_err(VeilidAPIError::internal)?;
 
         // Cache it
-        self.record_cache.insert(key, value, |_| {});
+        self.record_cache.insert(key, value, |k, v| {
+            self.add_dead_record(k, v);
+        });
 
         Ok(())
     }
-    pub fn with_record<R, F>(&mut self, key: TypedKey, f: F) -> EyreResult<Option<R>>
+    pub fn with_record<R, F>(&mut self, key: TypedKey, f: F) -> Option<R>
     where
-        F: FnOnce(&mut RecordStore, TypedKey, &ValueRecord) -> R,
+        F: FnOnce(&ValueRecord) -> R,
     {
-        // Get record table
-        let Some(record_table) = self.record_table.clone() else {
-            bail!("record store not initialized");
-        };
-
-        // If record exists in cache, use that
+        // Get record from index
         let rck = RecordIndexKey { key };
-        if let Some(r) = self.record_cache.get(&rck) {
+        if let Some(r) = self.record_index.get_mut(&rck) {
+            // Touch
+            r.touch(get_aligned_timestamp());
+            self.mark_record_changed(&rck);
+
             // Callback
-            return Ok(Some(f(self, key, r)));
+            return Some(f(r));
         }
-
-        // If not in cache, try to pull from table store
-        let k = rck.bytes();
-        if let Some(r) = record_table.load_rkyv(0, &k)? {
-            // Callback
-            let out = f(self, key, &r);
-            // Add to cache, do nothing with lru out
-            self.record_cache.insert(rck, r, |_| {});
-            return Ok(Some(out));
-        };
-        return Ok(None);
+        None
     }
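
with_record now hands the caller a borrowed &ValueRecord inside a closure, so no reference into the LRU index can outlive the call. A hypothetical call site, following the declared FnOnce(&ValueRecord) -> R bound:

    // Sketch: read a property of a record without holding a borrow afterwards.
    let subkeys = store.with_record(key, |record| record.subkey_count());
    match subkeys {
        Some(count) => log_stor!(debug "record has {} subkeys", count),
        None => log_stor!(debug "no record at this key"),
    }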
-    pub fn with_record_mut<R, F>(&mut self, key: TypedKey, f: F) -> EyreResult<Option<R>>
-    where
-        F: FnOnce(&mut RecordStore, TypedKey, &mut ValueRecord) -> R,
-    {
-        // Get record table
-        let Some(record_table) = self.record_table.clone() else {
-            bail!("record store not initialized");
-        };
-
-        // If record exists in cache, use that
-        let rck = RecordIndexKey { key };
-        if let Some(r) = self.record_cache.get_mut(&rck) {
-            // Callback
-            return Ok(Some(f(self, key, r)));
-        }
-
-        // If not in cache, try to pull from table store
-        let k = rck.bytes();
-        if let Some(r) = record_table.load_rkyv(0, &k)? {
-            // Callback
-            let out = f(self, key, &mut r);
-            // Save changes back to record table
-            record_table.store_rkyv(0, &k, &r).await?;
-            // Add to cache, do nothing with lru out
-            self.record_cache.insert(rck, r, |_| {});
-            return Ok(Some(out));
-        };
-        Ok(None)
-    }
-    pub fn with_subkey<R, F>(
+    pub fn get_subkey(
         &mut self,
         key: TypedKey,
         subkey: ValueSubkey,
-        f: F,
-    ) -> EyreResult<Option<R>>
-    where
-        F: FnOnce(&mut RecordStore, TypedKey, ValueSubkey, &ValueRecordData) -> R,
-    {
+    ) -> Result<Option<ValueRecordData>, VeilidAPIError> {
+        // Get record from index
+        let rck = RecordIndexKey { key };
+        let Some(r) = self.record_index.get_mut(&rck) else {
+            apibail_invalid_argument!("no record at this key", "key", key);
+        };
+
+        // Touch
+        r.touch(get_aligned_timestamp());
+        self.mark_record_changed(&rck);
+
+        // Check if the subkey is in range
+        if subkey >= r.subkey_count() {
+            apibail_invalid_argument!("subkey out of range", "subkey", subkey);
+        }
+
         // Get subkey table
         let Some(subkey_table) = self.subkey_table.clone() else {
-            bail!("record store not initialized");
+            apibail_internal!("record store not initialized");
         };
 
         // If subkey exists in subkey cache, use that
         let skck = SubkeyCacheKey { key, subkey };
-        if let Some(rd) = self.subkey_cache.get(&skck) {
-            // Callback
-            return Ok(Some(f(self, key, subkey, rd)));
+        if let Some(rd) = self.subkey_cache.get_mut(&skck) {
+            let out = rd.clone();
+            return Ok(Some(out));
         }
 
         // If not in cache, try to pull from table store
         let k = skck.bytes();
-        if let Some(rd) = subkey_table.load_rkyv(0, &k)? {
-            // Callback
-            let out = f(self, key, subkey, &rd);
+        if let Some(rd) = subkey_table
+            .load_rkyv::<ValueRecordData>(0, &k)
+            .map_err(VeilidAPIError::internal)?
+        {
+            let out = rd.clone();
             // Add to cache, do nothing with lru out
-            self.subkey_cache.insert(skck, r, |_| {});
+            self.subkey_cache.insert(skck, rd, |_| {});
             return Ok(Some(out));
         };
@@ -264,41 +293,52 @@ impl RecordStore {
         return Ok(None);
     }
-    pub fn with_subkey_mut<R, F>(
+    pub fn set_subkey(
         &mut self,
         key: TypedKey,
         subkey: ValueSubkey,
-        f: F,
-    ) -> EyreResult<Option<R>>
-    where
-        F: FnOnce(&mut RecordStore, TypedKey, ValueSubkey, &mut ValueRecord) -> R,
-    {
-        // Get record table
-        let Some(subkey_table) = self.subkey_table.clone() else {
-            bail!("record store not initialized");
-        };
-
-        // If subkey exists in cache, use that
-        let skck = SubkeyCacheKey { key, subkey };
-        if let Some(rd) = self.subkey_cache.get_mut(&skck) {
-            // Callback
-            return Ok(Some(f(self, key, subkey, rd)));
-        }
-
-        // If not in cache, try to pull from table store
-        let k = skck.bytes();
-        if let Some(rd) = subkey_table.load_rkyv(0, &k)? {
-            // Callback
-            let out = f(self, key, subkey, &mut rd);
-            // Save changes back to record table
-            subkey_table.store_rkyv(0, &k, &rd).await?;
-            // Add to cache, do nothing with lru out
-            self.subkey_cache.insert(key, r, |_| {});
-            return Ok(Some(out));
-        };
-        Ok(None)
-    }
+        data: ValueRecordData,
+    ) -> Result<(), VeilidAPIError> {
+        // Get record from index
+        let rck = RecordIndexKey { key };
+        let Some(r) = self.record_index.get_mut(&rck) else {
+            apibail_invalid_argument!("no record at this key", "key", key);
+        };
+
+        // Touch
+        r.touch(get_aligned_timestamp());
+        self.mark_record_changed(&rck);
+
+        // Check if the subkey is in range
+        if subkey >= r.subkey_count() {
+            apibail_invalid_argument!("subkey out of range", "subkey", subkey);
+        }
+
+        // Get subkey table
+        let Some(subkey_table) = self.subkey_table.clone() else {
+            apibail_internal!("record store not initialized");
+        };
+
+        // Write to subkey cache
+        let skck = SubkeyCacheKey { key, subkey };
+        if let Some(rd) = self.subkey_cache.insert(skck, data, |_, _| {}) {
+            return Ok(Some(out));
+        }
+
+        xxx do we flush this now or queue it?
+
+        // Write subkey
+        // let k = skck.bytes();
+        // if let Some(rd) = subkey_table.load_rkyv::<ValueRecordData>(0, &k)? {
+        //     let out = rd.data.clone();
+        //     // Add to cache, do nothing with lru out
+        //     self.subkey_cache.insert(skck, rd, |_| {});
+        //     return Ok(Some(out));
+        // };
+        return Ok(None);
     }
 }
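
The eviction path above depends on the LRU insert callback routing displaced records into add_dead_record rather than dropping them, so purge_dead_records can later delete them from the TableStore in one transaction. A toy model of that bookkeeping in plain std containers (the callback-taking insert in the diff is Veilid's own cache wrapper, not the stock hashlink API; this sketch only mirrors the idea):

    use std::collections::{HashMap, VecDeque};

    // Toy model: a capacity-bounded map that pushes evicted entries onto a
    // `dead` list instead of dropping them, so a later purge pass can delete
    // them from persistent storage in batch.
    struct BoundedStore<K: std::hash::Hash + Eq + Clone, V> {
        cap: usize,
        order: VecDeque<K>, // least recently inserted key at the front
        map: HashMap<K, V>,
        dead: Vec<(K, V)>,  // evicted, awaiting purge
    }

    impl<K: std::hash::Hash + Eq + Clone, V> BoundedStore<K, V> {
        fn new(cap: usize) -> Self {
            Self { cap, order: VecDeque::new(), map: HashMap::new(), dead: Vec::new() }
        }

        fn insert(&mut self, k: K, v: V) {
            if self.map.len() >= self.cap && !self.map.contains_key(&k) {
                // Evict the oldest entry into the dead list
                if let Some(old_k) = self.order.pop_front() {
                    if let Some(old_v) = self.map.remove(&old_k) {
                        self.dead.push((old_k, old_v));
                    }
                }
            }
            if self.map.insert(k.clone(), v).is_none() {
                self.order.push_back(k);
            }
        }

        fn take_dead(&mut self) -> Vec<(K, V)> {
            // Analogous to mem::take(&mut self.dead_records) in purge_dead_records
            std::mem::take(&mut self.dead)
        }
    }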

View File

@ -14,8 +14,8 @@ use super::*;
)] )]
#[archive_attr(repr(C), derive(CheckBytes))] #[archive_attr(repr(C), derive(CheckBytes))]
pub struct ValueRecordData { pub struct ValueRecordData {
data: ValueData, pub data: ValueData,
signature: Signature, pub signature: Signature,
} }
#[derive( #[derive(

View File

@@ -195,64 +195,74 @@ impl RoutingContext {
     }
 
     ///////////////////////////////////
-    /// DHT Values
+    /// DHT Records
 
-    /// Creates a new DHT value with a specified crypto kind and schema
-    /// Returns the newly allocated DHT Key if successful.
-    pub async fn create_value(
+    /// Creates a new DHT record with a specified crypto kind and schema
+    /// Returns the newly allocated DHT record's key if successful. The record is considered 'open' after the create operation succeeds.
+    pub async fn create_dht_record(
         &self,
         kind: CryptoKind,
         schema: &DHTSchema,
     ) -> Result<TypedKey, VeilidAPIError> {
         let storage_manager = self.api.storage_manager()?;
         storage_manager
-            .create_value(kind, schema, self.unlocked_inner.safety_selection)
+            .create_record(kind, schema, self.unlocked_inner.safety_selection)
             .await
     }
-    /// Opens a DHT value at a specific key. Associates a secret if one is provided to provide writer capability.
-    /// Returns the DHT key descriptor for the opened key if successful
-    /// Value may only be opened or created once. To re-open with a different routing context, first close the value.
-    pub async fn open_value(
+    /// Opens a DHT record at a specific key. Associates a secret if one is provided to provide writer capability.
+    /// Returns the DHT record descriptor for the opened record if successful
+    /// Records may only be opened or created once. To re-open with a different routing context, first close the record.
+    pub async fn open_dht_record(
         key: TypedKey,
         secret: Option<SecretKey>,
-    ) -> Result<DHTDescriptor, VeilidAPIError> {
+    ) -> Result<DHTRecordDescriptor, VeilidAPIError> {
         let storage_manager = self.api.storage_manager()?;
         storage_manager
-            .open_value(key, secret, self.unlocked_inner.safety_selection)
+            .open_record(key, secret, self.unlocked_inner.safety_selection)
             .await
     }
 
-    /// Closes a DHT value at a specific key that was opened with create_value or open_value.
-    /// Closing a value allows you to re-open it with a different routing context
-    pub async fn close_value(key: TypedKey) -> Result<(), VeilidAPIError> {
+    /// Closes a DHT record at a specific key that was opened with create_dht_record or open_dht_record.
+    /// Closing a record allows you to re-open it with a different routing context
+    pub async fn close_dht_record(key: TypedKey) -> Result<(), VeilidAPIError> {
         let storage_manager = self.api.storage_manager()?;
-        storage_manager.close_value(key).await
+        storage_manager.close_record(key).await
     }
+    /// Deletes a DHT record at a specific key. If the record is opened, it must be closed before it is deleted.
+    /// Deleting a record does not delete it from the network immediately, but will remove the storage of the record
+    /// locally, and will prevent its value from being refreshed on the network by this node.
+    pub async fn delete_dht_record(key: TypedKey) -> Result<(), VeilidAPIError> {
+        let storage_manager = self.api.storage_manager()?;
+        storage_manager.delete_record(key).await
+    }
+
-    /// Gets the latest value of a subkey from the network
-    /// Returns the possibly-updated value data of the subkey
-    pub async fn get_value(
+    /// Gets the latest value of a subkey
+    /// May pull the latest value from the network, but by setting 'force_refresh' you can force a network data refresh
+    /// Returns None if the value subkey has not yet been set
+    /// Returns Some(data) if the value subkey has valid data
+    pub async fn get_dht_value(
         &self,
         key: TypedKey,
         subkey: ValueSubkey,
         force_refresh: bool,
-    ) -> Result<ValueData, VeilidAPIError> {
+    ) -> Result<Option<ValueData>, VeilidAPIError> {
         let storage_manager = self.api.storage_manager()?;
         storage_manager.get_value(key, subkey, force_refresh).await
     }
     /// Pushes a changed subkey value to the network
     /// Returns None if the value was successfully put
-    /// Returns Some(newer_value) if the value put was older than the one available on the network
-    pub async fn set_value(
+    /// Returns Some(data) if the value put was older than the one available on the network
+    pub async fn set_dht_value(
         &self,
         key: TypedKey,
         subkey: ValueSubkey,
-        value_data: ValueData,
+        data: Vec<u8>,
     ) -> Result<Option<ValueData>, VeilidAPIError> {
         let storage_manager = self.api.storage_manager()?;
-        storage_manager.set_value(key, subkey, value_data).await
+        storage_manager.set_value(key, subkey, data).await
     }
     /// Watches changes to an opened or created value
@@ -260,7 +270,7 @@ impl RoutingContext {
     /// If the subkey range is empty, all subkey changes are considered
     /// Expiration can be infinite to keep the watch for the maximum amount of time
     /// Return value upon success is the amount of time allowed for the watch
-    pub async fn watch_value(
+    pub async fn watch_dht_values(
         &self,
         key: TypedKey,
         subkeys: &[ValueSubkeyRange],
@@ -275,7 +285,7 @@ impl RoutingContext {
 
     /// Cancels a watch early
     /// This is a convenience function that cancels watching all subkeys in a range
-    pub async fn cancel_watch_value(
+    pub async fn cancel_dht_watch(
         &self,
         key: TypedKey,
         subkeys: &[ValueSubkeyRange],
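
Put together, the renamed record API reads as below. This is a hypothetical round trip: CRYPTO_KIND_VLD0 and the DHTSchemaDFLT construction are assumptions about the surrounding codebase, and error handling is collapsed into '?':

    // Hypothetical round trip over the renamed API; method names and
    // signatures are as introduced in this commit.
    async fn dht_round_trip(rc: &RoutingContext) -> Result<(), VeilidAPIError> {
        // Create a record with a single subkey; creating also opens it
        let schema = DHTSchema::DFLT(DHTSchemaDFLT { o_cnt: 1 });
        let key = rc.create_dht_record(CRYPTO_KIND_VLD0, &schema).await?;

        // Write subkey 0, then read it back without forcing a network refresh
        rc.set_dht_value(key, 0, b"hello".to_vec()).await?;
        let val = rc.get_dht_value(key, 0, false).await?;
        assert_eq!(val.map(|v| v.data().to_vec()), Some(b"hello".to_vec()));

        // Close when done; delete removes the record's local storage
        rc.close_dht_record(key).await?;
        rc.delete_dht_record(key).await?;
        Ok(())
    }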

View File

@@ -353,12 +353,15 @@ pub struct VeilidState {
 )]
 #[archive_attr(repr(C), derive(CheckBytes))]
 pub struct ValueData {
-    pub seq: ValueSeqNum,
-    pub data: Vec<u8>,
-    pub writer: PublicKey,
+    seq: ValueSeqNum,
+    data: Vec<u8>,
+    writer: PublicKey,
 }
 
 impl ValueData {
+    pub const MAX_LEN: usize = 32768;
+
     pub fn new(data: Vec<u8>, writer: PublicKey) -> Self {
+        assert!(data.len() <= Self::MAX_LEN);
         Self {
             seq: 0,
             data,
@@ -366,11 +369,30 @@ impl ValueData {
         }
     }
 
     pub fn new_with_seq(seq: ValueSeqNum, data: Vec<u8>, writer: PublicKey) -> Self {
+        assert!(data.len() <= Self::MAX_LEN);
         Self { seq, data, writer }
     }
 
-    pub fn change(&mut self, data: Vec<u8>) {
-        self.data = data;
+    pub fn seq(&self) -> ValueSeqNum {
+        self.seq
+    }
+
+    pub fn writer(&self) -> PublicKey {
+        self.writer
+    }
+
+    pub fn data(&self) -> &[u8] {
+        &self.data
+    }
+
+    pub fn with_data_mut<F, R>(&mut self, f: F) -> R
+    where
+        F: FnOnce(&mut Vec<u8>) -> R,
+    {
+        let out = f(&mut self.data);
+        assert!(self.data.len() <= Self::MAX_LEN);
         self.seq += 1;
+        out
     }
 }
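
With the fields now private, every mutation goes through with_data_mut, which keeps seq in lockstep with the data it describes. A quick illustration of the intended invariant (writer_key is an assumed PublicKey value):

    // Sketch: every mutation through with_data_mut bumps seq exactly once.
    let mut vd = ValueData::new(b"v1".to_vec(), writer_key);
    assert_eq!(vd.seq(), 0);

    let new_len = vd.with_data_mut(|data| {
        data.clear();
        data.extend_from_slice(b"version 2");
        data.len()
    });
    assert_eq!(new_len, 9);
    assert_eq!(vd.seq(), 1); // seq advanced with the change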
@@ -2444,7 +2466,7 @@ impl DHTSchemaDFLT {
         out
     }
 
     /// Get the number of subkeys this schema allocates
     pub fn subkey_count(&self) -> usize {
         self.o_cnt as usize
     }
@@ -2492,7 +2514,7 @@ impl DHTSchemaSMPL {
         out
     }
 
     /// Get the number of subkeys this schema allocates
     pub fn subkey_count(&self) -> usize {
         self.members
             .iter()
@@ -2527,7 +2549,7 @@ impl DHTSchema {
         }
     }
 
     /// Get the number of subkeys this schema allocates
     pub fn subkey_count(&self) -> usize {
         match self {
             DHTSchema::DFLT(d) => d.subkey_count(),
@@ -2536,12 +2558,12 @@ impl DHTSchema {
         }
     }
 
-/// DHT Key Descriptor
+/// DHT Record Descriptor
 #[derive(
     Debug, Clone, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize,
 )]
 #[archive_attr(repr(C), derive(CheckBytes))]
-pub struct DHTDescriptor {
+pub struct DHTRecordDescriptor {
     pub owner: PublicKey,
     pub schema: DHTSchema,
 }

View File

@ -1,6 +1,6 @@
// LogThru // LogThru
// Pass errors through and log them simultaneously via map_err() // Pass errors through and log them simultaneously via map_err()
// Also contains common log facilities (net, rpc, rtab, pstore, crypto, etc ) // Also contains common log facilities (net, rpc, rtab, stor, pstore, crypto, etc )
use super::*; use super::*;
@ -123,6 +123,42 @@ macro_rules! log_rtab {
} }
} }
+#[macro_export]
+macro_rules! log_stor {
+    (error $text:expr) => { error!(
+        target: "stor",
+        "{}",
+        $text,
+    )};
+    (error $fmt:literal, $($arg:expr),+) => {
+        error!(target:"stor", $fmt, $($arg),+);
+    };
+    (warn $text:expr) => { warn!(
+        target: "stor",
+        "{}",
+        $text,
+    )};
+    (warn $fmt:literal, $($arg:expr),+) => {
+        warn!(target:"stor", $fmt, $($arg),+);
+    };
+    (debug $text:expr) => { debug!(
+        target: "stor",
+        "{}",
+        $text,
+    )};
+    (debug $fmt:literal, $($arg:expr),+) => {
+        debug!(target:"stor", $fmt, $($arg),+);
+    };
+    ($text:expr) => {trace!(
+        target: "stor",
+        "{}",
+        $text,
+    )};
+    ($fmt:literal, $($arg:expr),+) => {
+        trace!(target:"stor", $fmt, $($arg),+);
+    }
+}
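
The leading level token picks the macro arm, so call sites look like the following (the first line appears in the record store code above; the others are hypothetical):

    // Error level with a format string and arguments
    log_stor!(error "failed to commit record table transaction: {}", e);
    // Warn level with a single expression
    log_stor!(warn "subkey cache approaching memory limit");
    // No level token falls through to trace!
    log_stor!("ticked record store at {}", cur_ts);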
 #[macro_export]
 macro_rules! log_pstore {
     (error $text:expr) => { error!(
@@ -216,6 +252,18 @@ macro_rules! logthru_rtab {
     }
 }
 
 #[macro_export]
+macro_rules! logthru_stor {
+    ($($level:ident)?) => {
+        logthru!($($level)? "stor")
+    };
+    ($($level:ident)? $text:literal) => {
+        logthru!($($level)? "stor", $text)
+    };
+    ($($level:ident)? $fmt:literal, $($arg:expr),+) => {
+        logthru!($($level)? "stor", $fmt, $($arg),+)
+    }
+}
+#[macro_export]
 macro_rules! logthru_pstore {
     ($($level:ident)?) => {
         logthru!($($level)? "pstore")