checkpoint

This commit is contained in:
John Smith 2023-04-26 21:07:24 -04:00
parent 36cb0687cb
commit a03c00ac76
21 changed files with 123 additions and 69 deletions

View File

@ -7,8 +7,6 @@ use core::hash::Hash;
use data_encoding::BASE64URL_NOPAD; use data_encoding::BASE64URL_NOPAD;
use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////
/// Length of a public key in bytes /// Length of a public key in bytes

View File

@ -5,8 +5,6 @@ use core::convert::TryInto;
use core::fmt; use core::fmt;
use core::hash::Hash; use core::hash::Hash;
use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
/// Cryptography version fourcc code /// Cryptography version fourcc code
pub type CryptoKind = FourCC; pub type CryptoKind = FourCC;

View File

@ -1,7 +1,6 @@
use crate::*; use crate::*;
use data_encoding::BASE64URL_NOPAD; use data_encoding::BASE64URL_NOPAD;
use keyring_manager::*; use keyring_manager::*;
use rkyv::{bytecheck::CheckBytes, Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
use std::path::Path; use std::path::Path;
pub struct ProtectedStoreInner { pub struct ProtectedStoreInner {

View File

@ -1,8 +1,4 @@
use crate::*; use crate::*;
use rkyv::{
bytecheck::CheckBytes, Archive as RkyvArchive, Deserialize as RkyvDeserialize,
Serialize as RkyvSerialize,
};
cfg_if! { cfg_if! {
if #[cfg(target_arch = "wasm32")] { if #[cfg(target_arch = "wasm32")] {

View File

@ -42,9 +42,18 @@ pub use veilid_tools as tools;
use enumset::*; use enumset::*;
use rkyv::{ use rkyv::{
bytecheck, bytecheck::CheckBytes, Archive as RkyvArchive, Deserialize as RkyvDeserialize, bytecheck, bytecheck::CheckBytes, de::deserializers::SharedDeserializeMap, with::Skip,
Serialize as RkyvSerialize, Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
}; };
type RkyvSerializer = rkyv::ser::serializers::CompositeSerializer<
rkyv::ser::serializers::AlignedSerializer<rkyv::AlignedVec>,
rkyv::ser::serializers::FallbackScratch<
rkyv::ser::serializers::HeapScratch<1024>,
rkyv::ser::serializers::AllocScratch,
>,
rkyv::ser::serializers::SharedSerializeMap,
>;
type RkyvDefaultValidator<'t> = rkyv::validation::validators::DefaultValidator<'t>;
use serde::*; use serde::*;
pub mod veilid_capnp { pub mod veilid_capnp {

View File

@ -25,7 +25,3 @@ pub use peer_address::*;
pub use protocol_type::*; pub use protocol_type::*;
pub use signal_info::*; pub use signal_info::*;
pub use socket_address::*; pub use socket_address::*;
use enumset::*;
use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
use serde::*;

View File

@ -1,8 +1,6 @@
use super::*; use super::*;
use core::sync::atomic::{AtomicU32, Ordering}; use core::sync::atomic::{AtomicU32, Ordering};
use rkyv::{
with::Skip, Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
};
/// Reliable pings are done with increased spacing between pings /// Reliable pings are done with increased spacing between pings

View File

@ -16,9 +16,6 @@ pub use route_spec_store_content::*;
pub use route_stats::*; pub use route_stats::*;
use crate::veilid_api::*; use crate::veilid_api::*;
use rkyv::{
with::Skip, Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
};
/// The size of the remote private route cache /// The size of the remote private route cache
const REMOTE_PRIVATE_ROUTE_CACHE_SIZE: usize = 1024; const REMOTE_PRIVATE_ROUTE_CACHE_SIZE: usize = 1024;

View File

@ -19,7 +19,3 @@ pub use routing_domain::*;
pub use signed_direct_node_info::*; pub use signed_direct_node_info::*;
pub use signed_node_info::*; pub use signed_node_info::*;
pub use signed_relayed_node_info::*; pub use signed_relayed_node_info::*;
use enumset::*;
use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
use serde::*;

View File

@ -425,6 +425,9 @@ impl RPCProcessor {
Err(RPCError::unimplemented("search_dht_multi_key")).map_err(logthru_rpc!(error)) Err(RPCError::unimplemented("search_dht_multi_key")).map_err(logthru_rpc!(error))
} }
get rid of multi key, finish resolve node with find_node_rpc, then do putvalue/getvalue, probably in storagemanager.
/// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference /// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference
/// Note: This routine can possible be recursive, hence the SendPinBoxFuture async form /// Note: This routine can possible be recursive, hence the SendPinBoxFuture async form
pub fn resolve_node( pub fn resolve_node(

View File

@ -200,7 +200,12 @@ impl StorageManager {
} }
/// # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ] /// # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ]
fn get_key<D>(vcrypto: CryptoSystemVersion, record: &Record<D>) -> TypedKey { fn get_key<D>(vcrypto: CryptoSystemVersion, record: &Record<D>) -> TypedKey
where
D: RkyvArchive + RkyvSerialize<RkyvSerializer>,
for<'t> <D as RkyvArchive>::Archived: CheckBytes<RkyvDefaultValidator<'t>>,
<D as RkyvArchive>::Archived: RkyvDeserialize<D, SharedDeserializeMap>,
{
let compiled = record.descriptor().schema_data(); let compiled = record.descriptor().schema_data();
let mut hash_data = Vec::<u8>::with_capacity(PUBLIC_KEY_LENGTH + 4 + compiled.len()); let mut hash_data = Vec::<u8>::with_capacity(PUBLIC_KEY_LENGTH + 4 + compiled.len());
hash_data.extend_from_slice(&vcrypto.kind().0); hash_data.extend_from_slice(&vcrypto.kind().0);
@ -213,7 +218,7 @@ impl StorageManager {
async fn lock(&self) -> Result<AsyncMutexGuardArc<StorageManagerInner>, VeilidAPIError> { async fn lock(&self) -> Result<AsyncMutexGuardArc<StorageManagerInner>, VeilidAPIError> {
let inner = asyncmutex_lock_arc!(&self.inner); let inner = asyncmutex_lock_arc!(&self.inner);
if !inner.initialized { if !inner.initialized {
apibail_generic!("not initialized"); apibail_not_initialized!();
} }
Ok(inner) Ok(inner)
} }
@ -260,6 +265,18 @@ impl StorageManager {
.await .await
} }
async fn do_get_value(
&self,
mut inner: AsyncMutexGuardArc<StorageManagerInner>,
key: TypedKey,
subkey: ValueSubkey,
) -> Result<Option<GetValueAnswer>, VeilidAPIError> {
let Some(rpc_processor) = inner.rpc_processor.clone() else {
apibail_not_initialized!();
};
//
}
async fn open_record_inner( async fn open_record_inner(
&self, &self,
mut inner: AsyncMutexGuardArc<StorageManagerInner>, mut inner: AsyncMutexGuardArc<StorageManagerInner>,
@ -267,17 +284,34 @@ impl StorageManager {
writer: Option<KeyPair>, writer: Option<KeyPair>,
safety_selection: SafetySelection, safety_selection: SafetySelection,
) -> Result<DHTRecordDescriptor, VeilidAPIError> { ) -> Result<DHTRecordDescriptor, VeilidAPIError> {
// Ensure the record is closed
if inner.opened_records.contains_key(&key) {
return Err(VeilidAPIError::generic(
"record is already open and should be closed first",
));
}
// Get cryptosystem // Get cryptosystem
let Some(vcrypto) = self.unlocked_inner.crypto.get(key.kind) else { let Some(vcrypto) = self.unlocked_inner.crypto.get(key.kind) else {
apibail_generic!("unsupported cryptosystem"); apibail_generic!("unsupported cryptosystem");
}; };
// See if we have a local record already or not // See if we have a local record already or not
let cb = |r: &Record<LocalRecordDetail>| { let cb = |r: &mut Record<LocalRecordDetail>| {
// Process local record // Process local record
// Keep the safety selection we opened the record with
r.detail_mut().safety_selection = safety_selection;
// Return record details
(r.owner().clone(), r.schema()) (r.owner().clone(), r.schema())
}; };
if let Some((owner, schema)) = inner.local_record_store.unwrap().with_record(key, cb) { if let Some((owner, schema)) = inner
.local_record_store
.as_mut()
.unwrap()
.with_record_mut(key, cb)
{
// Had local record // Had local record
// If the writer we chose is also the owner, we have the owner secret // If the writer we chose is also the owner, we have the owner secret
@ -293,27 +327,23 @@ impl StorageManager {
}; };
// Write open record // Write open record
inner.opened_records.insert(key, OpenedRecord { writer }); inner.opened_records.insert(key, OpenedRecord::new(writer));
// Make DHT Record Descriptor to return // Make DHT Record Descriptor to return
let descriptor = DHTRecordDescriptor { let descriptor = DHTRecordDescriptor::new(key, owner, owner_secret, schema);
key,
owner,
owner_secret,
schema,
};
Ok(descriptor) Ok(descriptor)
} else { } else {
// No record yet // No record yet, try to get it from the network
self.do_get_value(inner, key, 0).await
// Make DHT Record Descriptor to return // Make DHT Record Descriptor to return
let descriptor = DHTRecordDescriptor { // let descriptor = DHTRecordDescriptor {
key, // key,
owner, // owner,
owner_secret, // owner_secret,
schema, // schema,
}; // };
Ok(descriptor) // Ok(descriptor)
} }
} }
@ -352,7 +382,8 @@ impl StorageManager {
self.close_record_inner(inner, key).await?; self.close_record_inner(inner, key).await?;
} }
// Remove // Remove the record from the local store
//inner.local_record_store.unwrap().de
unimplemented!(); unimplemented!();
} }

View File

@ -7,7 +7,12 @@
use super::*; use super::*;
use hashlink::LruCache; use hashlink::LruCache;
pub struct RecordStore<D> { pub struct RecordStore<D>
where
D: RkyvArchive + RkyvSerialize<RkyvSerializer>,
for<'t> <D as RkyvArchive>::Archived: CheckBytes<RkyvDefaultValidator<'t>>,
<D as RkyvArchive>::Archived: RkyvDeserialize<D, SharedDeserializeMap>,
{
table_store: TableStore, table_store: TableStore,
name: String, name: String,
limits: RecordStoreLimits, limits: RecordStoreLimits,
@ -25,7 +30,12 @@ pub struct RecordStore<D> {
purge_dead_records_mutex: Arc<AsyncMutex<()>>, purge_dead_records_mutex: Arc<AsyncMutex<()>>,
} }
impl<D> RecordStore<D> { impl<D> RecordStore<D>
where
D: RkyvArchive + RkyvSerialize<RkyvSerializer>,
for<'t> <D as RkyvArchive>::Archived: CheckBytes<RkyvDefaultValidator<'t>>,
<D as RkyvArchive>::Archived: RkyvDeserialize<D, SharedDeserializeMap>,
{
pub fn new(table_store: TableStore, name: &str, limits: RecordStoreLimits) -> Self { pub fn new(table_store: TableStore, name: &str, limits: RecordStoreLimits) -> Self {
let subkey_cache_size = limits.subkey_cache_size as usize; let subkey_cache_size = limits.subkey_cache_size as usize;
Self { Self {
@ -56,10 +66,10 @@ impl<D> RecordStore<D> {
// Pull record index from table into a vector to ensure we sort them // Pull record index from table into a vector to ensure we sort them
let record_table_keys = record_table.get_keys(0)?; let record_table_keys = record_table.get_keys(0)?;
let mut record_index_saved: Vec<(RecordTableKey, Record)> = let mut record_index_saved: Vec<(RecordTableKey, Record<D>)> =
Vec::with_capacity(record_table_keys.len()); Vec::with_capacity(record_table_keys.len());
for rtk in record_table_keys { for rtk in record_table_keys {
if let Some(vr) = record_table.load_rkyv::<Record>(0, &rtk)? { if let Some(vr) = record_table.load_rkyv::<Record<D>>(0, &rtk)? {
let rik = RecordTableKey::try_from(rtk.as_ref())?; let rik = RecordTableKey::try_from(rtk.as_ref())?;
record_index_saved.push((rik, vr)); record_index_saved.push((rik, vr));
} }
@ -288,6 +298,27 @@ impl<D> RecordStore<D> {
out out
} }
pub fn with_record_mut<R, F>(&mut self, key: TypedKey, f: F) -> Option<R>
where
F: FnOnce(&mut Record<D>) -> R,
{
// Get record from index
let mut out = None;
let rtk = RecordTableKey { key };
if let Some(record) = self.record_index.get_mut(&rtk) {
// Callback
out = Some(f(record));
// Touch
record.touch(get_aligned_timestamp());
}
if out.is_some() {
self.mark_record_changed(rtk);
}
out
}
pub async fn get_subkey<R, F>( pub async fn get_subkey<R, F>(
&mut self, &mut self,
key: TypedKey, key: TypedKey,

View File

@ -1,8 +1,5 @@
use super::*; use super::*;
use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
use serde::*;
/// Information required to handle locally opened records /// Information required to handle locally opened records
#[derive( #[derive(
Clone, Debug, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize, Clone, Debug, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize,
@ -11,5 +8,5 @@ use serde::*;
pub struct LocalRecordDetail { pub struct LocalRecordDetail {
/// The last 'safety selection' used when creating/opening this record. /// The last 'safety selection' used when creating/opening this record.
/// Even when closed, this safety selection applies to republication attempts by the system. /// Even when closed, this safety selection applies to republication attempts by the system.
safety_selection: SafetySelection, pub safety_selection: SafetySelection,
} }

View File

@ -1,12 +1,15 @@
use super::*; use super::*;
use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
use serde::*;
#[derive( #[derive(
Clone, Debug, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize, Clone, Debug, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize,
)] )]
#[archive_attr(repr(C), derive(CheckBytes))] #[archive_attr(repr(C), derive(CheckBytes))]
pub struct Record<D> { pub struct Record<D>
where
D: RkyvArchive + RkyvSerialize<RkyvSerializer>,
for<'t> <D as RkyvArchive>::Archived: CheckBytes<RkyvDefaultValidator<'t>>,
<D as RkyvArchive>::Archived: RkyvDeserialize<D, SharedDeserializeMap>,
{
descriptor: SignedValueDescriptor, descriptor: SignedValueDescriptor,
subkey_count: usize, subkey_count: usize,
last_touched_ts: Timestamp, last_touched_ts: Timestamp,
@ -14,7 +17,12 @@ pub struct Record<D> {
detail: D, detail: D,
} }
impl<D> Record<D> { impl<D> Record<D>
where
D: RkyvArchive + RkyvSerialize<RkyvSerializer>,
for<'t> <D as RkyvArchive>::Archived: CheckBytes<RkyvDefaultValidator<'t>>,
<D as RkyvArchive>::Archived: RkyvDeserialize<D, SharedDeserializeMap>,
{
pub fn new( pub fn new(
cur_ts: Timestamp, cur_ts: Timestamp,
descriptor: SignedValueDescriptor, descriptor: SignedValueDescriptor,

View File

@ -1,6 +1,4 @@
use super::*; use super::*;
use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
use serde::*;
#[derive( #[derive(
Clone, Clone,

View File

@ -1,8 +1,5 @@
use super::*; use super::*;
use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
use serde::*;
#[derive( #[derive(
Clone, Debug, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize, Clone, Debug, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize,
)] )]

View File

@ -1,6 +1,4 @@
use super::*; use super::*;
use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
use serde::*;
///////////////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////////////
/// ///

View File

@ -1,6 +1,4 @@
use super::*; use super::*;
use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
use serde::*;
///////////////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////////////
/// ///

View File

@ -1,5 +1,13 @@
use super::*; use super::*;
#[allow(unused_macros)]
#[macro_export]
macro_rules! apibail_not_initialized {
() => {
return Err(VeilidAPIError::not_initialized())
};
}
#[allow(unused_macros)] #[allow(unused_macros)]
#[macro_export] #[macro_export]
macro_rules! apibail_timeout { macro_rules! apibail_timeout {

View File

@ -220,7 +220,7 @@ impl RoutingContext {
) -> Result<DHTRecordDescriptor, VeilidAPIError> { ) -> Result<DHTRecordDescriptor, VeilidAPIError> {
let storage_manager = self.api.storage_manager()?; let storage_manager = self.api.storage_manager()?;
storage_manager storage_manager
.open_record(key, secret, self.unlocked_inner.safety_selection) .open_record(key, writer, self.unlocked_inner.safety_selection)
.await .await
} }

View File

@ -1,6 +1,4 @@
use crate::*; use crate::*;
use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
use serde::*;
//////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////
pub type ConfigCallbackReturn = Result<Box<dyn core::any::Any + Send>, VeilidAPIError>; pub type ConfigCallbackReturn = Result<Box<dyn core::any::Any + Send>, VeilidAPIError>;