offline subkey writes

Christien Rioux 2023-08-06 12:31:20 -04:00
parent 684f93e6a0
commit 435469ce94
4 changed files with 140 additions and 3 deletions


@@ -18,6 +18,7 @@ use storage_manager_inner::*;
pub use types::*;
use super::*;
use network_manager::*;
use routing_table::*;
use rpc_processor::*;
@@ -27,6 +28,8 @@ const MAX_SUBKEY_SIZE: usize = ValueData::MAX_LEN;
const MAX_RECORD_DATA_SIZE: usize = 1_048_576;
/// Frequency to flush record stores to disk
const FLUSH_RECORD_STORES_INTERVAL_SECS: u32 = 1;
/// Frequency to check for offline subkey writes to send to the network
const OFFLINE_SUBKEY_WRITES_INTERVAL_SECS: u32 = 1;
struct StorageManagerUnlockedInner {
    config: VeilidConfig,
@@ -37,6 +40,7 @@ struct StorageManagerUnlockedInner {
    // Background processes
    flush_record_stores_task: TickTask<EyreReport>,
    offline_subkey_writes_task: TickTask<EyreReport>,
}

#[derive(Clone)]
@@ -59,6 +63,7 @@ impl StorageManager {
            #[cfg(feature = "unstable-blockstore")]
            block_store,
            flush_record_stores_task: TickTask::new(FLUSH_RECORD_STORES_INTERVAL_SECS),
            offline_subkey_writes_task: TickTask::new(OFFLINE_SUBKEY_WRITES_INTERVAL_SECS),
        }
    }

    fn new_inner(unlocked_inner: Arc<StorageManagerUnlockedInner>) -> StorageManagerInner {
@@ -127,6 +132,32 @@ impl StorageManager {
        Ok(inner)
    }
    async fn network_is_ready(&self) -> EyreResult<bool> {
        if let Some(rpc_processor) = {
            let inner = self.lock().await?;
            inner.rpc_processor.clone()
        } {
            if let Some(network_class) = rpc_processor
                .routing_table()
                .get_network_class(RoutingDomain::PublicInternet)
            {
                // If our PublicInternet network class is valid we're ready to talk
                Ok(network_class != NetworkClass::Invalid)
            } else {
                // If we haven't gotten a network class yet we shouldn't try to use the DHT
                Ok(false)
            }
        } else {
            // If we aren't attached, we won't have an rpc processor
            Ok(false)
        }
    }

    async fn has_offline_subkey_writes(&self) -> EyreResult<bool> {
        let inner = self.lock().await?;
        Ok(!inner.offline_subkey_writes.is_empty())
    }
    /// Create a local record from scratch with a new owner key, open it, and return the opened descriptor
    pub async fn create_record(
        &self,
@@ -391,7 +422,12 @@ impl StorageManager {
                .await?;

            // Add to offline writes to flush
            inner.offline_subkey_writes.entry(key)
                .and_modify(|x| { x.subkeys.insert(subkey); })
                .or_insert(OfflineSubkeyWrite {
                    safety_selection,
                    subkeys: ValueSubkeyRangeSet::single(subkey),
                });
            return Ok(None);
        };
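For context, the queue written to above is just a map from record key to the subkeys (and the safety selection) still awaiting a network write. Below is a minimal standalone sketch of how entries accumulate, mirroring the entry().and_modify().or_insert() pattern in the diff; TypedKey, ValueSubkeyRangeSet and SafetySelection are simplified stand-ins for the real veilid-core types, and the record key string is invented for illustration:

use std::collections::{BTreeSet, HashMap};

// Simplified stand-ins for the veilid-core types; illustration only.
type TypedKey = String;
type ValueSubkey = u32;

#[allow(dead_code)]
#[derive(Clone, Debug, PartialEq, Eq)]
enum SafetySelection {
    Unsafe,
    Safe,
}

#[derive(Clone, Debug, PartialEq, Eq)]
struct ValueSubkeyRangeSet(BTreeSet<ValueSubkey>);

impl ValueSubkeyRangeSet {
    fn single(subkey: ValueSubkey) -> Self {
        Self(BTreeSet::from([subkey]))
    }
    fn insert(&mut self, subkey: ValueSubkey) {
        self.0.insert(subkey);
    }
}

#[derive(Clone, Debug)]
struct OfflineSubkeyWrite {
    safety_selection: SafetySelection,
    subkeys: ValueSubkeyRangeSet,
}

fn main() {
    let mut offline_subkey_writes: HashMap<TypedKey, OfflineSubkeyWrite> = HashMap::new();
    let safety_selection = SafetySelection::Unsafe;

    // Queue subkeys 0 and 3 of the same record while offline, mirroring the
    // entry().and_modify().or_insert() pattern added to set_value() above.
    for subkey in [0u32, 3] {
        offline_subkey_writes
            .entry("VLD0:example-record-key".to_string())
            .and_modify(|x| {
                x.subkeys.insert(subkey);
            })
            .or_insert(OfflineSubkeyWrite {
                safety_selection: safety_selection.clone(),
                subkeys: ValueSubkeyRangeSet::single(subkey),
            });
    }

    // Both subkeys end up queued under the same record key.
    assert_eq!(offline_subkey_writes["VLD0:example-record-key"].subkeys.0.len(), 2);
}

When the same record is written again while offline, and_modify() extends the existing subkey set instead of replacing it, so a single network pass can later flush all pending subkeys for that record.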


@@ -3,6 +3,12 @@ use super::*;
const STORAGE_MANAGER_METADATA: &str = "storage_manager_metadata";
const OFFLINE_SUBKEY_WRITES: &[u8] = b"offline_subkey_writes";
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(super) struct OfflineSubkeyWrite {
    pub safety_selection: SafetySelection,
    pub subkeys: ValueSubkeyRangeSet,
}
/// Locked structure for storage manager
pub(super) struct StorageManagerInner {
    unlocked_inner: Arc<StorageManagerUnlockedInner>,
@@ -15,7 +21,7 @@ pub(super) struct StorageManagerInner {
    /// Records that have been pushed to this node for distribution by other nodes, that we make an effort to republish
    pub remote_record_store: Option<RecordStore<RemoteRecordDetail>>,
    /// Record subkeys that have not been pushed to the network because they were written while offline
    pub offline_subkey_writes: HashMap<TypedKey, OfflineSubkeyWrite>,
    /// Storage manager metadata that is persistent, including a copy of the offline subkey writes
    pub metadata_db: Option<TableDB>,
    /// RPC processor if it is available
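Because OfflineSubkeyWrite derives Serialize and Deserialize, the whole offline_subkey_writes map can be copied into the persistent metadata table under the OFFLINE_SUBKEY_WRITES key, which is what the metadata_db comment above refers to. A rough sketch of that round trip, with simplified stand-in types and serde_json (an external crate) standing in for whatever encoding the metadata table actually uses:

use serde::{Deserialize, Serialize};

// Simplified stand-ins: unit variants and a plain Vec instead of the real
// SafetySelection / ValueSubkeyRangeSet. Illustration only.
#[allow(dead_code)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
enum SafetySelection {
    Unsafe,
    Safe,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct OfflineSubkeyWrite {
    safety_selection: SafetySelection,
    subkeys: Vec<u32>,
}

fn main() -> Result<(), serde_json::Error> {
    let osw = OfflineSubkeyWrite {
        safety_selection: SafetySelection::Unsafe,
        subkeys: vec![0, 3],
    };

    // Round-trip the entry, standing in for the persistent copy kept in the
    // storage manager's metadata table under the OFFLINE_SUBKEY_WRITES key.
    let bytes = serde_json::to_vec(&osw)?;
    let restored: OfflineSubkeyWrite = serde_json::from_slice(&bytes)?;
    assert_eq!(osw, restored);
    Ok(())
}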


@@ -1,10 +1,11 @@
pub mod flush_record_stores;
pub mod offline_subkey_writes;

use super::*;

impl StorageManager {
    pub(crate) fn setup_tasks(&self) {
        // Set flush records tick task
        debug!("starting flush record stores task");
        {
            let this = self.clone();
@@ -25,12 +26,40 @@
                    )
                });
        }
        // Set offline subkey writes tick task
        debug!("starting offline subkey writes task");
        {
            let this = self.clone();
            self.unlocked_inner
                .offline_subkey_writes_task
                .set_routine(move |s, l, t| {
                    Box::pin(
                        this.clone()
                            .offline_subkey_writes_task_routine(
                                s,
                                Timestamp::new(l),
                                Timestamp::new(t),
                            )
                            .instrument(trace_span!(
                                parent: None,
                                "StorageManager offline subkey writes task routine"
                            )),
                    )
                });
        }
    }

    pub async fn tick(&self) -> EyreResult<()> {
        // Run the flush record stores task
        self.unlocked_inner.flush_record_stores_task.tick().await?;
        // Run offline subkey writes task if there's work to be done
        if self.network_is_ready().await? && self.has_offline_subkey_writes().await? {
            self.unlocked_inner
                .offline_subkey_writes_task
                .tick()
                .await?;
        }
        Ok(())
    }
@@ -39,5 +68,9 @@ impl StorageManager {
        if let Err(e) = self.unlocked_inner.flush_record_stores_task.stop().await {
            warn!("flush_record_stores_task not stopped: {}", e);
        }
debug!("stopping offline subkey writes task");
if let Err(e) = self.unlocked_inner.offline_subkey_writes_task.stop().await {
warn!("offline_subkey_writes_task not stopped: {}", e);
}
    }
}
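Both background tasks here follow the same interval-gated pattern: tick() is called frequently from the storage manager's own tick(), but the routine only fires when its interval has elapsed. A rough synchronous sketch of that idea; this IntervalTask is purely illustrative and is not Veilid's actual TickTask:

use std::time::{Duration, Instant};

// Illustrative stand-in for a TickTask-like helper: a routine that runs at most
// once per interval each time tick() is polled by the owner's tick loop.
struct IntervalTask<F: FnMut()> {
    interval: Duration,
    last_run: Option<Instant>,
    routine: F,
}

impl<F: FnMut()> IntervalTask<F> {
    fn new(interval_secs: u64, routine: F) -> Self {
        Self {
            interval: Duration::from_secs(interval_secs),
            last_run: None,
            routine,
        }
    }

    // Called often; only fires the routine when the interval has elapsed.
    fn tick(&mut self) {
        let due = match self.last_run {
            None => true,
            Some(t) => t.elapsed() >= self.interval,
        };
        if due {
            (self.routine)();
            self.last_run = Some(Instant::now());
        }
    }
}

fn main() {
    let mut offline_writes_task =
        IntervalTask::new(1, || println!("flushing offline subkey writes"));
    // The storage manager's tick() would call this only when the network is
    // ready and there are queued offline writes, as in the diff above.
    offline_writes_task.tick();
    offline_writes_task.tick(); // within the same second: routine does not fire again
}

In this commit OFFLINE_SUBKEY_WRITES_INTERVAL_SECS is 1, and tick() above additionally skips the offline subkey writes task unless the network is ready and there is queued work, so the routine runs at most once per second and only when it can make progress.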


@@ -0,0 +1,62 @@
use super::*;
use futures_util::*;

impl StorageManager {
    // Best-effort write subkeys to the network that were written offline
    #[instrument(level = "trace", skip(self), err)]
    pub(crate) async fn offline_subkey_writes_task_routine(
        self,
        stop_token: StopToken,
        _last_ts: Timestamp,
        _cur_ts: Timestamp,
    ) -> EyreResult<()> {
        let (rpc_processor, offline_subkey_writes) = {
            let inner = self.lock().await?;
            let Some(rpc_processor) = inner.rpc_processor.clone() else {
                return Ok(());
            };
            (rpc_processor, inner.offline_subkey_writes.clone())
        };

        // Send each queued subkey write using the safety selection it was recorded with
        for (key, osw) in offline_subkey_writes {
            if poll!(stop_token.clone()).is_ready() {
                break;
            }
            for subkey in osw.subkeys.iter() {
                let subkey_result = {
                    let mut inner = self.lock().await?;
                    inner.handle_get_local_value(key, subkey, true).await
                };
                let Ok(subkey_result) = subkey_result else {
                    continue;
                };
                let Some(value) = subkey_result.value else {
                    continue;
                };
                let Some(descriptor) = subkey_result.descriptor else {
                    continue;
                };
                if let Err(e) = self
                    .outbound_set_value(
                        rpc_processor.clone(),
                        key,
                        subkey,
                        osw.safety_selection,
                        value,
                        descriptor,
                    )
                    .await
                {
                    log_stor!(debug "failed to write offline subkey: {}", e);
                }
            }
            let mut inner = self.lock().await?;
            inner.offline_subkey_writes.remove(&key);
        }
        Ok(())
    }
}
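The routine follows a snapshot-then-drain pattern: it clones the queue, performs the slow network writes without holding the storage manager lock, and then removes each processed record from the live queue. A simplified synchronous sketch of that shape, with plain strings and integers standing in for the real key and subkey types:

use std::collections::HashMap;

// Sketch of the drain pattern used by the routine above (simplified, synchronous):
// snapshot the queue, do the best-effort work without holding the lock, then
// remove each processed record's entry from the live queue.
fn drain_offline_writes(
    queue: &mut HashMap<String, Vec<u32>>,
    mut try_write: impl FnMut(&str, u32) -> Result<(), String>,
) {
    let snapshot = queue.clone();
    for (key, subkeys) in snapshot {
        for subkey in subkeys {
            if let Err(e) = try_write(&key, subkey) {
                // Best effort: log and keep going, as the real routine does.
                eprintln!("failed to write offline subkey: {e}");
            }
        }
        queue.remove(&key);
    }
}

fn main() {
    let mut queue = HashMap::from([("VLD0:example".to_string(), vec![0, 3])]);
    drain_offline_writes(&mut queue, |key, subkey| {
        println!("writing {key} subkey {subkey}");
        Ok(())
    });
    assert!(queue.is_empty());
}

Note that, as in the real routine, a record's entry is removed even when some of its subkey writes failed; the writes are best effort and failures are only logged.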