checkpoint

John Smith
2023-04-07 19:58:11 -04:00
parent e46d64f648
commit 777efaff24
10 changed files with 283 additions and 138 deletions

View File

@@ -0,0 +1,63 @@
use super::*;
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct RecordTableKey {
pub key: TypedKey,
}
impl RecordTableKey {
pub fn bytes(&self) -> [u8; PUBLIC_KEY_LENGTH + 4] {
let mut bytes = [0u8; PUBLIC_KEY_LENGTH + 4];
bytes[0..4].copy_from_slice(&self.key.kind.0);
bytes[4..PUBLIC_KEY_LENGTH + 4].copy_from_slice(&self.key.value.bytes);
bytes
}
}
impl TryFrom<&[u8]> for RecordTableKey {
type Error = EyreReport;
fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
if bytes.len() != PUBLIC_KEY_LENGTH + 4 {
bail!("invalid bytes length");
}
let kind = FourCC::try_from(&bytes[0..4]).wrap_err("invalid kind")?;
let value =
PublicKey::try_from(&bytes[4..PUBLIC_KEY_LENGTH + 4]).wrap_err("invalid value")?;
let key = TypedKey::new(kind, value);
Ok(RecordTableKey { key })
}
}
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct SubkeyTableKey {
pub key: TypedKey,
pub subkey: ValueSubkey,
}
impl SubkeyTableKey {
pub fn bytes(&self) -> [u8; PUBLIC_KEY_LENGTH + 4 + 4] {
let mut bytes = [0u8; PUBLIC_KEY_LENGTH + 4 + 4];
bytes[0..4].copy_from_slice(&self.key.kind.0);
bytes[4..PUBLIC_KEY_LENGTH + 4].copy_from_slice(&self.key.value.bytes);
bytes[PUBLIC_KEY_LENGTH + 4..PUBLIC_KEY_LENGTH + 4 + 4]
.copy_from_slice(&self.subkey.to_le_bytes());
bytes
}
}
impl TryFrom<&[u8]> for SubkeyTableKey {
type Error = EyreReport;
fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
if bytes.len() != PUBLIC_KEY_LENGTH + 4 + 4 {
bail!("invalid bytes length");
}
let kind = FourCC::try_from(&bytes[0..4]).wrap_err("invalid kind")?;
let value =
PublicKey::try_from(&bytes[4..PUBLIC_KEY_LENGTH + 4]).wrap_err("invalid value")?;
let subkey = ValueSubkey::from_le_bytes(
bytes[PUBLIC_KEY_LENGTH + 4..PUBLIC_KEY_LENGTH + 4 + 4]
.try_into()
.wrap_err("invalid subkey")?,
);
let key = TypedKey::new(kind, value);
Ok(SubkeyTableKey { key, subkey })
}
}
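
Not part of the commit, but for illustration: a minimal round-trip sketch for these table keys, assuming the surrounding crate items (TypedKey, FourCC, PublicKey, PUBLIC_KEY_LENGTH, EyreResult, eyre's WrapErr) are in scope. The "VLD0" kind and the zeroed public key are placeholder values.

// Illustrative sketch only; the kind and key bytes below are placeholders.
fn table_key_roundtrip_example() -> EyreResult<()> {
    // Build a TypedKey using only the constructors shown above
    let kind = FourCC::try_from(&b"VLD0"[..]).wrap_err("invalid kind")?;
    let value = PublicKey::try_from(&[0u8; PUBLIC_KEY_LENGTH][..]).wrap_err("invalid value")?;
    let rtk = RecordTableKey {
        key: TypedKey::new(kind, value),
    };

    // bytes() lays out the 4-byte kind followed by the public key;
    // TryFrom<&[u8]> reverses that layout
    let bytes = rtk.bytes();
    assert_eq!(RecordTableKey::try_from(&bytes[..])?, rtk);

    // SubkeyTableKey appends a 4-byte little-endian subkey index
    let stk = SubkeyTableKey {
        key: rtk.key,
        subkey: 3,
    };
    assert_eq!(stk.bytes().len(), PUBLIC_KEY_LENGTH + 4 + 4);
    Ok(())
}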

View File

@@ -1,6 +1,9 @@
mod keys;
mod record_store;
mod record_store_limits;
mod value_record;
use keys::*;
use record_store::*;
use record_store_limits::*;
use value_record::*;
@@ -8,6 +11,11 @@ use value_record::*;
use super::*;
use crate::rpc_processor::*;
/// The maximum size of a single subkey
const MAX_SUBKEY_SIZE: usize = ValueData::MAX_LEN;
/// The maximum total size of all subkeys of a record
const MAX_RECORD_DATA_SIZE: usize = 1_048_576;
/// Locked structure for storage manager
struct StorageManagerInner {
/// Records that have been 'created' or 'opened' by this node
@@ -62,6 +70,8 @@ impl StorageManager {
max_records: None,
max_subkey_cache_memory_mb: Some(xxx),
max_disk_space_mb: None,
max_subkey_size: MAX_SUBKEY_SIZE,
max_record_data_size: MAX_RECORD_DATA_SIZE,
}
}
@@ -71,6 +81,8 @@ impl StorageManager {
max_records: Some(xxx),
max_subkey_cache_memory_mb: Some(xxx),
max_disk_space_mb: Some(xxx),
max_subkey_size: MAX_SUBKEY_SIZE,
max_record_data_size: MAX_RECORD_DATA_SIZE,
}
}
@@ -116,7 +128,7 @@ impl StorageManager {
Ok(())
}
pub fn terminate(&self) {
pub async fn terminate(&self) {
debug!("starting storage manager shutdown");
// Release the storage manager
@@ -125,10 +137,14 @@ impl StorageManager {
debug!("finished storage manager shutdown");
}
async fn new_local_record(&self, key: TypedKey, record: ValueRecord) -> EyreResult<()> {
async fn new_local_record(
&self,
key: TypedKey,
record: ValueRecord,
) -> Result<(), VeilidAPIError> {
// add value record to record store
let mut inner = self.inner.lock();
let Some(local_record_store) = inner.local_record_store else {
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_generic!("not initialized");
};
@@ -138,7 +154,7 @@ impl StorageManager {
pub async fn create_record(
&self,
kind: CryptoKind,
schema: &DHTSchema,
schema: DHTSchema,
safety_selection: SafetySelection,
) -> Result<TypedKey, VeilidAPIError> {
// Get cryptosystem
@@ -162,6 +178,7 @@ impl StorageManager {
}
pub async fn open_record(
&self,
key: TypedKey,
secret: Option<SecretKey>,
safety_selection: SafetySelection,
@@ -169,11 +186,11 @@ impl StorageManager {
unimplemented!();
}
pub async fn close_record(key: TypedKey) -> Result<(), VeilidAPIError> {
pub async fn close_record(&self, key: TypedKey) -> Result<(), VeilidAPIError> {
unimplemented!();
}
pub async fn delete_value(key: TypedKey) -> Result<(), VeilidAPIError> {
pub async fn delete_record(&self, key: TypedKey) -> Result<(), VeilidAPIError> {
unimplemented!();
}
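
As an orientation aid only (not part of the diff): the reshaped method signatures above could be exercised roughly as follows once the unimplemented!() bodies are filled in. The storage_manager, my_kind, my_schema, and my_safety parameters are hypothetical placeholders, not values defined in this commit.

// Hypothetical usage sketch; every value below is a placeholder.
async fn record_lifecycle_sketch(
    storage_manager: &StorageManager,
    my_kind: CryptoKind,
    my_schema: DHTSchema,
    my_safety: SafetySelection,
) -> Result<(), VeilidAPIError> {
    // create_record now takes the schema by value and returns the new record's TypedKey
    let key = storage_manager
        .create_record(my_kind, my_schema, my_safety)
        .await?;

    // close_record and delete_record are now &self methods keyed by TypedKey
    storage_manager.close_record(key).await?;
    storage_manager.delete_record(key).await?;
    Ok(())
}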
@@ -195,7 +212,7 @@ impl StorageManager {
unimplemented!();
}
pub async fn watch_value(
pub async fn watch_values(
&self,
key: TypedKey,
subkeys: &[ValueSubkeyRange],
@@ -205,7 +222,7 @@ impl StorageManager {
unimplemented!();
}
pub async fn cancel_watch_value(
pub async fn cancel_watch_values(
&self,
key: TypedKey,
subkeys: &[ValueSubkeyRange],

View File

@@ -7,67 +7,6 @@
use super::*;
use hashlink::LruCache;
pub type RecordIndex = u32;
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct RecordIndexKey {
pub key: TypedKey,
}
impl RecordIndexKey {
pub fn bytes(&self) -> [u8; PUBLIC_KEY_LENGTH + 4] {
let mut bytes = [0u8; PUBLIC_KEY_LENGTH + 4];
bytes[0..4] = self.key.kind.0;
bytes[4..PUBLIC_KEY_LENGTH + 4] = self.key.value.bytes;
bytes
}
}
impl TryFrom<&[u8]> for RecordIndexKey {
type Error = EyreReport;
fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
if bytes.len() != PUBLIC_KEY_LENGTH + 4 {
bail!("invalid bytes length");
}
let kind = FourCC::try_from(&bytes[0..4]).wrap_err("invalid kind")?;
let value =
PublicKey::try_from(&bytes[4..PUBLIC_KEY_LENGTH + 4]).wrap_err("invalid value")?;
let key = TypedKey::new(kind, value);
Ok(RecordIndexKey { key })
}
}
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct SubkeyCacheKey {
pub key: TypedKey,
pub subkey: ValueSubkey,
}
impl SubkeyCacheKey {
pub fn bytes(&self) -> [u8; PUBLIC_KEY_LENGTH + 4 + 4] {
let mut bytes = [0u8; PUBLIC_KEY_LENGTH + 4 + 4];
bytes[0..4] = self.key.kind.0;
bytes[4..PUBLIC_KEY_LENGTH + 4] = self.key.value.bytes;
bytes[PUBLIC_KEY_LENGTH + 4..PUBLIC_KEY_LENGTH + 4 + 4] = self.subkey.to_le_bytes();
bytes
}
}
impl TryFrom<&[u8]> for SubkeyCacheKey {
type Error = EyreReport;
fn try_from(bytes: &[u8]) -> Result<Self, Self::Error> {
if bytes.len() != PUBLIC_KEY_LENGTH + 4 {
bail!("invalid bytes length");
}
let kind = FourCC::try_from(&bytes[0..4]).wrap_err("invalid kind")?;
let value =
PublicKey::try_from(&bytes[4..PUBLIC_KEY_LENGTH + 4]).wrap_err("invalid value")?;
let subkey =
ValueSubkey::try_from(&bytes[PUBLIC_KEY_LENGTH + 4..PUBLIC_KEY_LENGTH + 4 + 4])
.wrap_err("invalid subkey")?;
let key = TypedKey::new(kind, value);
Ok(SubkeyCacheKey { key, subkey })
}
}
pub struct RecordStore {
table_store: TableStore,
name: String,
@@ -75,11 +14,11 @@ pub struct RecordStore {
record_table: Option<TableDB>,
subkey_table: Option<TableDB>,
record_index: LruCache<RecordIndexKey, ValueRecord>,
subkey_cache: LruCache<SubkeyCacheKey, ValueRecordData>,
record_index: LruCache<RecordTableKey, ValueRecord>,
subkey_cache: LruCache<SubkeyTableKey, ValueRecordData>,
dead_records: Vec<(RecordIndexKey, ValueRecord)>,
changed_records: HashSet<(RecordIndexKey, Timestamp)>,
dead_records: Vec<(RecordTableKey, ValueRecord)>,
changed_records: HashSet<(RecordTableKey, Timestamp)>,
}
impl RecordStore {
@@ -110,11 +49,12 @@ impl RecordStore {
// Pull the record index from the table into a vector so we can sort it
let record_table_keys = record_table.get_keys(0)?;
let mut record_index_saved: Vec<(RecordIndexKey, ValueRecord)> =
let mut record_index_saved: Vec<(RecordTableKey, ValueRecord)> =
Vec::with_capacity(record_table_keys.len());
for rtk in record_table_keys {
if let Some(vr) = record_table.load_rkyv::<ValueRecord>(0, &rtk)? {
record_index_saved.push((rtk, vr));
let rik = RecordTableKey::try_from(rtk.as_ref())?;
record_index_saved.push((rik, vr));
}
}
@@ -122,49 +62,51 @@ impl RecordStore {
record_index_saved.sort_by(|a, b| a.1.last_touched().cmp(&b.1.last_touched()));
let mut dead_records = Vec::new();
for ri in record_index_saved {
let rik = RecordIndexKey::try_from(&ri.0)?;
self.record_index.insert(rik, ri.1, |k, v| {
self.record_index.insert(ri.0, ri.1, |k, v| {
// If the configuration changed, we only want to keep the 'limits.max_records' records
dead_records.push((k, v));
})
});
}
self.record_table = Some(record_table);
self.subkey_table = Some(record_table);
self.subkey_table = Some(subkey_table);
Ok(())
}
fn add_dead_record(&mut self, key: RecordIndexKey, record: ValueRecord) {
fn add_dead_record(&mut self, key: RecordTableKey, record: ValueRecord) {
self.dead_records.push((key, record));
}
fn mark_record_changed(&mut self, key: RecordIndexKey) {
fn mark_record_changed(&mut self, key: RecordTableKey) {
let cur_ts = get_aligned_timestamp();
self.changed_records.insert((key, cur_ts));
}
async fn purge_dead_records(&mut self) {
// Delete dead keys
if self.dead_records.empty() {
if self.dead_records.is_empty() {
return;
}
let record_table = self.record_table.clone().unwrap();
let subkey_table = self.subkey_table.clone().unwrap();
let rt_xact = record_table.transact();
let st_xact = subkey_table.transact();
let mut dead_records = mem::take(&mut self.dead_records);
let dead_records = mem::take(&mut self.dead_records);
for (k, v) in dead_records {
// Delete record
rt_xact.delete(0, &k.bytes());
// Delete subkeys
let subkey_count = v.subkey_count();
let subkey_count = v.subkey_count() as u32;
for sk in 0..subkey_count {
// From table
let sck = SubkeyCacheKey {
let sck = SubkeyTableKey {
key: k.key,
subkey: sk,
};
st_xact.delete(0, &sck.bytes())?;
st_xact.delete(0, &sck.bytes());
// From cache
self.subkey_cache.remove(&sck);
@@ -184,18 +126,21 @@ impl RecordStore {
return;
}
let record_table = self.record_table.clone().unwrap();
let subkey_table = self.subkey_table.clone().unwrap();
let rt_xact = record_table.transact();
let st_xact = subkey_table.transact();
let mut changed_records = mem::take(&mut self.changed_records);
for (rik, ts) in changed_records {
// Flush record and changed subkeys
// Flush changed records
if let Some(r) = self.record_index.peek(&rik) {
// TODO(xxx): also flush changed subkeys?
if let Err(e) = record_table.store_rkyv(0, &rik, r).await {
log_stor!(error "failed to store changed record: {}", e);
}
}
}
if let Err(e) = rt_xact.commit().await {
log_stor!(error "failed to commit record table transaction: {}", e);
}
if let Err(e) = st_xact.commit().await {
log_stor!(error "failed to commit subkey table transaction: {}", e);
}
}
pub async fn tick(&mut self, last_ts: Timestamp, cur_ts: Timestamp) {
@@ -204,7 +149,8 @@ impl RecordStore {
}
pub fn new_record(&mut self, key: TypedKey, record: ValueRecord) -> Result<(), VeilidAPIError> {
if self.with_record(key, |_| {})?.is_some() {
let rik = RecordTableKey { key };
if self.record_index.contains_key(&rik) {
apibail_generic!("record already exists");
}
@@ -215,7 +161,7 @@ impl RecordStore {
// Save to record table
record_table
.store_rkyv(0, &key, &r)
.store_rkyv(0, &rik, &r)
.await
.map_err(VeilidAPIError::internal)?;
@@ -232,7 +178,7 @@ impl RecordStore {
F: FnOnce(&ValueRecord) -> R,
{
// Get record from index
let rck = RecordIndexKey { key };
let rck = RecordTableKey { key };
if let Some(r) = self.record_index.get_mut(&rck) {
// Touch
r.touch(get_aligned_timestamp());
@@ -250,7 +196,7 @@ impl RecordStore {
subkey: ValueSubkey,
) -> Result<Option<ValueRecordData>, VeilidAPIError> {
// record from index
let rck = RecordIndexKey { key };
let rck = RecordTableKey { key };
let Some(r) = self.record_index.get_mut(&rck) else {
apibail_invalid_argument!("no record at this key", "key", key);
};
@@ -270,7 +216,7 @@ impl RecordStore {
};
// If subkey exists in subkey cache, use that
let skck = SubkeyCacheKey { key, subkey };
let skck = SubkeyTableKey { key, subkey };
if let Some(rd) = self.subkey_cache.get_mut(&skck) {
let out = rd.clone();
@@ -299,8 +245,13 @@ impl RecordStore {
subkey: ValueSubkey,
data: ValueRecordData,
) -> Result<(), VeilidAPIError> {
// Check size limit for data
if data.data.len() > self.limits.max_subkey_size {
return Err(VeilidAPIError::generic("record subkey too large"));
}
// Get record from index
let rck = RecordIndexKey { key };
let rck = RecordTableKey { key };
let Some(r) = self.record_index.get_mut(&rck) else {
apibail_invalid_argument!("no record at this key", "key", key);
};
@@ -319,26 +270,41 @@ impl RecordStore {
apibail_internal!("record store not initialized");
};
// Write to subkey cache
let skck = SubkeyCacheKey { key, subkey };
if let Some(rd) = self.subkey_cache.insert(skck, data, |_, _| {}) {
return Ok(Some(out));
// Get the previous subkey and ensure we aren't going over the record size limit
let mut prior_subkey_size = 0usize;
// If subkey exists in subkey cache, use that
let skck = SubkeyTableKey { key, subkey };
if let Some(rd) = self.subkey_cache.peek(&skck) {
prior_subkey_size = rd.data.data().len();
} else {
// If not in cache, try to pull from table store
let k = skck.bytes();
if let Some(rd) = subkey_table
.load_rkyv::<ValueRecordData>(0, &k)
.map_err(VeilidAPIError::internal)?
{
prior_subkey_size = rd.data.data().len();
}
}
// TODO(xxx): do we flush this now or queue it?
// Check new data size
let new_data_size = r.data_size() + data.data().len() - prior_subkey_size;
if new_data_size > self.limits.max_record_data_size {
return Err(VeilidAPIError::generic("dht record too large"));
}
// Write subkey
// let k = skck.bytes();
// if let Some(rd) = subkey_table.load_rkyv::<ValueRecordData>(0, &k)? {
// let out = rd.data.clone();
let k = skck.bytes();
subkey_table
.store_rkyv(0, &k, &data)
.await
.map_err(VeilidAPIError::internal)?;
// // Add to cache, do nothing with lru out
// self.subkey_cache.insert(skck, rd, |_| {});
// Write to subkey cache
let skck = SubkeyTableKey { key, subkey };
self.subkey_cache.insert(skck, data, |_, _| {});
// return Ok(Some(out));
// };
// Update record
r.set_data_size(new_data_size);
return Ok(None);
Ok(())
}
}
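
Not part of the commit: a standalone sketch of the record-size accounting that set_subkey performs above, written with plain integers so the bookkeeping is easy to follow. The names are illustrative.

// Illustrative sketch of the size accounting in set_subkey above (plain usize values).
fn would_exceed_record_limit(
    current_record_data_size: usize, // total size of all subkeys already stored for the record
    prior_subkey_size: usize,        // size of the existing value at this subkey, or 0 if none
    new_subkey_size: usize,          // size of the value being written
    max_record_data_size: usize,     // limit from RecordStoreLimits
) -> bool {
    // Replacing a subkey frees its prior bytes before the new bytes are counted
    let new_total = current_record_data_size - prior_subkey_size + new_subkey_size;
    new_total > max_record_data_size
}

For example, with max_record_data_size = 1_048_576, replacing a 100-byte subkey with a 500-byte value only grows the record total by 400 bytes, so the write is accepted unless the record was already within 400 bytes of the cap.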

View File

@@ -1,14 +1,16 @@
use super::*;
/// Configuration for the record store
#[derive(Debug, Default, Copy, Clone)]
pub struct RecordStoreLimits {
/// Number of subkeys to keep in the memory cache
pub subkey_cache_size: u32,
pub subkey_cache_size: usize,
/// Maximum size of a subkey
pub max_subkey_size: usize,
/// Maximum total record data size
pub max_record_data_size: usize,
/// Limit on the total number of records in the table store
pub max_records: Option<u32>,
pub max_records: Option<usize>,
/// Limit on the amount of subkey cache memory to use before evicting cache items
pub max_subkey_cache_memory_mb: Option<u32>,
pub max_subkey_cache_memory_mb: Option<usize>,
/// Limit on the amount of disk space to use for subkey data
pub max_disk_space_mb: Option<u32>,
pub max_disk_space_mb: Option<usize>,
}
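
Not from the commit (it still uses xxx placeholders for the concrete numbers): one way a local limits struct might be filled in, with purely illustrative values and the field layout matching the local defaults in the storage manager hunk above.

// Purely illustrative values; the commit leaves the real numbers as xxx placeholders.
fn example_local_limits() -> RecordStoreLimits {
    RecordStoreLimits {
        subkey_cache_size: 1024,
        max_subkey_size: MAX_SUBKEY_SIZE,
        max_record_data_size: MAX_RECORD_DATA_SIZE,
        max_records: None,
        max_subkey_cache_memory_mb: Some(256),
        max_disk_space_mb: None,
    }
}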

View File

@@ -1,4 +1,6 @@
use super::*;
use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
use serde::*;
#[derive(
Clone,
@@ -66,4 +68,12 @@ impl ValueRecord {
pub fn last_touched(&self) -> Timestamp {
self.last_touched_ts
}
pub fn set_data_size(&mut self, size: usize) {
self.data_size = size;
}
pub fn data_size(&self) -> usize {
self.data_size
}
}