From b2503ae7894cae04355fbd2ae39d441f37c79270 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sun, 6 Aug 2023 13:27:57 -0400 Subject: [PATCH] offline work --- veilid-core/src/storage_manager/mod.rs | 23 ++++++++++++++----- veilid-core/src/storage_manager/tasks/mod.rs | 2 +- .../tasks/offline_subkey_writes.rs | 20 +++++++++------- 3 files changed, 30 insertions(+), 15 deletions(-) diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 7d087436..c1b30978 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -132,9 +132,8 @@ impl StorageManager { Ok(inner) } - async fn network_is_ready(&self) -> EyreResult { + fn online_writes_ready_inner(inner: &StorageManagerInner) -> Option { if let Some(rpc_processor) = { - let inner = self.lock().await?; inner.rpc_processor.clone() } { if let Some(network_class) = rpc_processor @@ -142,17 +141,26 @@ impl StorageManager { .get_network_class(RoutingDomain::PublicInternet) { // If our PublicInternet network class is valid we're ready to talk - Ok(network_class != NetworkClass::Invalid) + if network_class != NetworkClass::Invalid { + Some(rpc_processor) + } else { + None + } } else { // If we haven't gotten a network class yet we shouldnt try to use the DHT - Ok(false) + None } } else { // If we aren't attached, we won't have an rpc processor - Ok(false) + None } } + async fn online_writes_ready(&self) -> EyreResult> { + let inner = self.lock().await?; + return Ok(Self::online_writes_ready_inner(&*inner)); + } + async fn has_offline_subkey_writes(&self) -> EyreResult { let inner = self.lock().await?; Ok(inner.offline_subkey_writes.len() != 0) @@ -415,12 +423,15 @@ impl StorageManager { )?; // Get rpc processor and drop mutex so we don't block while getting the value from the network - let Some(rpc_processor) = inner.rpc_processor.clone() else { + let Some(rpc_processor) = Self::online_writes_ready_inner(&inner) else { + log_stor!(debug "Writing subkey locally: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() ); + // Offline, just write it locally and return immediately inner .handle_set_local_value(key, subkey, signed_value_data.clone()) .await?; + log_stor!(debug "Writing subkey offline: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() ); // Add to offline writes to flush inner.offline_subkey_writes.entry(key) .and_modify(|x| { x.subkeys.insert(subkey); } ) diff --git a/veilid-core/src/storage_manager/tasks/mod.rs b/veilid-core/src/storage_manager/tasks/mod.rs index d9f49d7b..36171b80 100644 --- a/veilid-core/src/storage_manager/tasks/mod.rs +++ b/veilid-core/src/storage_manager/tasks/mod.rs @@ -54,7 +54,7 @@ impl StorageManager { self.unlocked_inner.flush_record_stores_task.tick().await?; // Run offline subkey writes task if there's work to be done - if self.network_is_ready().await? && self.has_offline_subkey_writes().await? { + if self.online_writes_ready().await?.is_some() && self.has_offline_subkey_writes().await? { self.unlocked_inner .offline_subkey_writes_task .tick() diff --git a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs index 81b1613a..93a4d55e 100644 --- a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs +++ b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs @@ -10,35 +10,39 @@ impl StorageManager { _last_ts: Timestamp, _cur_ts: Timestamp, ) -> EyreResult<()> { - let (rpc_processor, offline_subkey_writes) = { + let offline_subkey_writes = { let inner = self.lock().await?; - - let Some(rpc_processor) = inner.rpc_processor.clone() else { - return Ok(()); - }; - - (rpc_processor, inner.offline_subkey_writes.clone()) + inner.offline_subkey_writes.clone() }; // make a safety selection that is conservative for (key, osw) in offline_subkey_writes { if poll!(stop_token.clone()).is_ready() { + log_stor!(debug "Offline subkey writes cancelled."); break; } + let Some(rpc_processor) = self.online_writes_ready().await? else { + log_stor!(debug "Offline subkey writes stopped for network."); + break; + }; for subkey in osw.subkeys.iter() { let subkey_result = { let mut inner = self.lock().await?; inner.handle_get_local_value(key, subkey, true).await }; let Ok(subkey_result) = subkey_result else { - continue; + log_stor!(debug "Offline subkey write had no subkey result: {}:{}", key, subkey); + continue; }; let Some(value) = subkey_result.value else { + log_stor!(debug "Offline subkey write had no subkey value: {}:{}", key, subkey); continue; }; let Some(descriptor) = subkey_result.descriptor else { + log_stor!(debug "Offline subkey write had no descriptor: {}:{}", key, subkey); continue; }; + log_stor!(debug "Offline subkey write: {}:{} len={}", key, subkey, value.value_data().data().len()); if let Err(e) = self .outbound_set_value( rpc_processor.clone(),