storage manager work

This commit is contained in:
John Smith 2023-04-02 20:40:46 -04:00
parent c78035a5d9
commit 39ade462c2
5 changed files with 283 additions and 4 deletions

View File

@ -1,7 +1,16 @@
mod record_store;
mod value_record;
mod record_store_limits;
use record_store::*;
use record_store_limits::*;
use value_record::*;
use super::*;
use crate::rpc_processor::*;
struct StorageManagerInner {}
struct StorageManagerInner {
record_store: RecordStore,
}
struct StorageManagerUnlockedInner {
config: VeilidConfig,
@ -36,7 +45,11 @@ impl StorageManager {
rpc_processor,
}
}
fn new_inner() -> StorageManagerInner {}
fn new_inner() -> StorageManagerInner {
StorageManagerInner {
record_store: RecordStore::new(table_store),
}
}
pub fn new(
config: VeilidConfig,
@ -76,6 +89,12 @@ impl StorageManager {
debug!("finished storage manager shutdown");
}
async fn add_value_record(&self, key: TypedKey, record: ValueRecord) -> EyreResult<()> {
// add value record to record store
let mut inner = self.inner.lock();
inner.record_store.
}
/// Creates a new DHT value with a specified crypto kind and schema
/// Returns the newly allocated DHT Key if successful.
pub async fn create_value(
@ -84,10 +103,26 @@ impl StorageManager {
schema: &DHTSchema,
safety_selection: SafetySelection,
) -> Result<TypedKey, VeilidAPIError> {
unimplemented!();
// Get cryptosystem
let Some(vcrypto) = self.unlocked_inner.crypto.get(kind) else {
apibail_generic!("unsupported cryptosystem");
};
// New values require a new owner key
let keypair = vcrypto.generate_keypair();
let key = TypedKey::new(kind, keypair.key);
let secret = keypair.secret;
// Add value record
let record = ValueRecord::new(Some(secret), schema, safety_selection);
self.add_value_record(key, record)
.await
.map_err(VeilidAPIError::internal)?;
Ok(key)
}
/// Opens a DHT value at a specific key. Associates a secret if one is provided to provide writer capability.
/// Opens a DHT value at a specific key. Associates an owner secret if one is provided.
/// Returns the DHT key descriptor for the opened key if successful
/// Value may only be opened or created once. To re-open with a different routing context, first close the value.
pub async fn open_value(

View File

@ -0,0 +1,152 @@
use super::*;
use hashlink::LruCache;
pub type RecordIndex = u32;
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct RecordCacheKey {
record_idx: RecordIndex,
}
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct SubkeyCacheKey {
record_idx: RecordIndex,
subkey: ValueSubkey,
}
pub struct RecordStore {
table_store: TableStore,
name: String,
limits: RecordStoreLimits,
record_table: Option<TableDB>,
subkey_table: Option<TableDB>,
record_index: HashMap<TypedKey, RecordIndex>,
free_record_index_list: Vec<RecordIndex>,
record_cache: LruCache<RecordIndex, ValueRecord>,
subkey_cache: LruCache<SubkeyCacheKey, ValueRecordData>,
}
impl RecordStore {
pub fn new(table_store: TableStore, name: &str, limits: RecordStoreLimits) -> Self {
let record_cache_size = limits.record_cache_size as usize;
let subkey_cache_size = limits.subkey_cache_size as usize;
Self {
table_store,
name: name.to_owned(),
limits,
record_table: None,
subkey_table: None,
record_index: HashMap::new(),
free_record_index_list: Vec::new(), // xxx can this be auto-recovered? should we ever compact the allocated indexes?
record_cache: LruCache::new(record_cache_size),
subkey_cache: LruCache::new(subkey_cache_size),
}
}
pub async fn init(&mut self) -> EyreResult<()> {
let record_table = self
.table_store
.open(&format!("{}_records", self.name), 1)
.await?;
let subkey_table = self
.table_store
.open(&&format!("{}_subkeys", self.name), 1)
.await?;
// xxx get record index and free record index list
self.record_table = Some(record_table);
self.subkey_table = Some(record_table);
Ok(())
}
fn key_bytes(key: TypedKey) -> [u8; PUBLIC_KEY_LENGTH + 4] {
let mut bytes = [0u8; PUBLIC_KEY_LENGTH + 4];
bytes[0..4] = key.kind.0;
bytes[4..PUBLIC_KEY_LENGTH + 4] = key.value.bytes;
bytes
}
pub fn with_record<R, F: FnOnce(TypedKey, &ValueRecord) -> R>(
&mut self,
key: TypedKey,
f: F,
) -> EyreResult<Option<R>> {
// Get record table
let Some(record_table) = self.record_table.clone() else {
bail!("record store not initialized");
};
// If record exists in cache, use that
if let Some(r) = self.record_cache.get(&key) {
// Callback
return Ok(Some(f(key, r)));
}
// If not in cache, try to pull from table store
let k = Self::key_bytes(key);
if let Some(r) = record_table.load_rkyv(0, &k)? {
// Callback
let out = f(key, &r);
// Add to cache, do nothing with lru out
self.record_cache.insert(key, r, |_| {});
return Ok(Some(out));
};
return Ok(None);
}
pub fn with_record_mut<R, F: FnOnce(TypedKey, &mut ValueRecord) -> R>(
&mut self,
key: TypedKey,
f: F,
) -> EyreResult<Option<R>> {
// Get record table
let Some(record_table) = self.record_table.clone() else {
bail!("record store not initialized");
};
// If record exists in cache, use that
if let Some(r) = self.record_cache.get_mut(&key) {
// Callback
return Ok(Some(f(key, r)));
}
// If not in cache, try to pull from table store
let k = Self::key_bytes(key);
if let Some(r) = record_table.load_rkyv(0, &k)? {
// Callback
let out = f(key, &mut r);
// Save changes back to record table
record_table.store_rkyv(0, &k, &r).await?;
// Add to cache, do nothing with lru out
self.record_cache.insert(key, r, |_| {});
return Ok(Some(out));
};
Ok(None)
}
pub fn new_record(&mut self, key: TypedKey, record: ValueRecord) -> EyreResult<()> {
if self.with_record(key, |_| {})?.is_some() {
bail!("record already exists");
}
// Get record table
let Some(record_table) = self.record_table.clone() else {
bail!("record store not initialized");
};
// Save to record table
record_table.store_rkyv(0, &key, &r).await?;
// Cache it
self.record_cache.insert(key, value, |_| {});
Ok(())
}
}

View File

@ -0,0 +1,11 @@
use super::*;
/// Configuration for the record store
#[derive(Debug, Default, Copy, Clone)]
pub struct RecordStoreLimits {
pub record_cache_size: u32,
pub subkey_cache_size: u32,
pub max_records: Option<u32>,
pub max_cache_memory_mb: Option<u32>,
pub max_disk_space_mb: Option<u32>,
}

View File

@ -0,0 +1,58 @@
use super::*;
#[derive(
Clone,
Debug,
Default,
PartialEq,
Eq,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct ValueRecordData {
data: ValueData,
signature: Signature,
}
#[derive(
Clone,
Debug,
Default,
PartialEq,
Eq,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct ValueRecord {
secret: Option<SecretKey>,
schema: DHTSchema,
safety_selection: SafetySelection,
total_size: usize,
subkeys: Vec<ValueRecordData>,
}
impl ValueRecord {
pub fn new(
secret: Option<SecretKey>,
schema: DHTSchema,
safety_selection: SafetySelection,
) -> Self {
// Get number of subkeys
let subkey_count = schema.subkey_count();
Self {
secret,
schema,
safety_selection,
subkeys: vec![Vec::new(); subkey_count],
}
}
}

View File

@ -2434,6 +2434,7 @@ pub struct DHTSchemaDFLT {
}
impl DHTSchemaDFLT {
/// Build the data representation of the schema
pub fn compile(&self) -> Vec<u8> {
let mut out = Vec::<u8>::with_capacity(6);
// kind
@ -2442,6 +2443,11 @@ impl DHTSchemaDFLT {
out.extend_from_slice(&self.o_cnt.to_le_bytes());
out
}
/// Get the number of subkeys this schema allocates
pub fn subkey_count(&self) -> usize {
self.o_cnt as usize
}
}
/// Simple DHT Schema (SMPL) Member
@ -2469,6 +2475,7 @@ pub struct DHTSchemaSMPL {
}
impl DHTSchemaSMPL {
/// Build the data representation of the schema
pub fn compile(&self) -> Vec<u8> {
let mut out = Vec::<u8>::with_capacity(6 + (self.members.len() * (PUBLIC_KEY_LENGTH + 2)));
// kind
@ -2484,6 +2491,13 @@ impl DHTSchemaSMPL {
}
out
}
/// Get the number of subkeys this schema allocates
pub fn subkey_count(&self) -> usize {
self.members
.iter()
.fold(o_cnt as usize, |acc, x| acc + (x.m_cnt as usize))
}
}
/// Enum over all the supported DHT Schemas
@ -2505,12 +2519,21 @@ impl DHTSchema {
DHTSchema::SMPL(DHTSchemaSMPL { o_cnt, members })
}
/// Build the data representation of the schema
pub fn compile(&self) -> Vec<u8> {
match self {
DHTSchema::DFLT(d) => d.compile(),
DHTSchema::SMPL(s) => s.compile(),
}
}
/// Get the number of subkeys this schema allocates
pub fn subkey_count(&self) -> usize {
match self {
DHTSchema::DFLT(d) => d.subkey_count(),
DHTSchema::SMPL(s) => s.subkey_count(),
}
}
}
/// DHT Key Descriptor