offline work

This commit is contained in:
Christien Rioux 2023-08-06 13:27:57 -04:00
parent 435469ce94
commit b2503ae789
3 changed files with 30 additions and 15 deletions

View File

@ -132,9 +132,8 @@ impl StorageManager {
Ok(inner) Ok(inner)
} }
async fn network_is_ready(&self) -> EyreResult<bool> { fn online_writes_ready_inner(inner: &StorageManagerInner) -> Option<RPCProcessor> {
if let Some(rpc_processor) = { if let Some(rpc_processor) = {
let inner = self.lock().await?;
inner.rpc_processor.clone() inner.rpc_processor.clone()
} { } {
if let Some(network_class) = rpc_processor if let Some(network_class) = rpc_processor
@ -142,17 +141,26 @@ impl StorageManager {
.get_network_class(RoutingDomain::PublicInternet) .get_network_class(RoutingDomain::PublicInternet)
{ {
// If our PublicInternet network class is valid we're ready to talk // If our PublicInternet network class is valid we're ready to talk
Ok(network_class != NetworkClass::Invalid) if network_class != NetworkClass::Invalid {
Some(rpc_processor)
} else {
None
}
} else { } else {
// If we haven't gotten a network class yet we shouldnt try to use the DHT // If we haven't gotten a network class yet we shouldnt try to use the DHT
Ok(false) None
} }
} else { } else {
// If we aren't attached, we won't have an rpc processor // If we aren't attached, we won't have an rpc processor
Ok(false) None
} }
} }
async fn online_writes_ready(&self) -> EyreResult<Option<RPCProcessor>> {
let inner = self.lock().await?;
return Ok(Self::online_writes_ready_inner(&*inner));
}
async fn has_offline_subkey_writes(&self) -> EyreResult<bool> { async fn has_offline_subkey_writes(&self) -> EyreResult<bool> {
let inner = self.lock().await?; let inner = self.lock().await?;
Ok(inner.offline_subkey_writes.len() != 0) Ok(inner.offline_subkey_writes.len() != 0)
@ -415,12 +423,15 @@ impl StorageManager {
)?; )?;
// Get rpc processor and drop mutex so we don't block while getting the value from the network // Get rpc processor and drop mutex so we don't block while getting the value from the network
let Some(rpc_processor) = inner.rpc_processor.clone() else { let Some(rpc_processor) = Self::online_writes_ready_inner(&inner) else {
log_stor!(debug "Writing subkey locally: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() );
// Offline, just write it locally and return immediately // Offline, just write it locally and return immediately
inner inner
.handle_set_local_value(key, subkey, signed_value_data.clone()) .handle_set_local_value(key, subkey, signed_value_data.clone())
.await?; .await?;
log_stor!(debug "Writing subkey offline: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() );
// Add to offline writes to flush // Add to offline writes to flush
inner.offline_subkey_writes.entry(key) inner.offline_subkey_writes.entry(key)
.and_modify(|x| { x.subkeys.insert(subkey); } ) .and_modify(|x| { x.subkeys.insert(subkey); } )

View File

@ -54,7 +54,7 @@ impl StorageManager {
self.unlocked_inner.flush_record_stores_task.tick().await?; self.unlocked_inner.flush_record_stores_task.tick().await?;
// Run offline subkey writes task if there's work to be done // Run offline subkey writes task if there's work to be done
if self.network_is_ready().await? && self.has_offline_subkey_writes().await? { if self.online_writes_ready().await?.is_some() && self.has_offline_subkey_writes().await? {
self.unlocked_inner self.unlocked_inner
.offline_subkey_writes_task .offline_subkey_writes_task
.tick() .tick()

View File

@ -10,35 +10,39 @@ impl StorageManager {
_last_ts: Timestamp, _last_ts: Timestamp,
_cur_ts: Timestamp, _cur_ts: Timestamp,
) -> EyreResult<()> { ) -> EyreResult<()> {
let (rpc_processor, offline_subkey_writes) = { let offline_subkey_writes = {
let inner = self.lock().await?; let inner = self.lock().await?;
inner.offline_subkey_writes.clone()
let Some(rpc_processor) = inner.rpc_processor.clone() else {
return Ok(());
};
(rpc_processor, inner.offline_subkey_writes.clone())
}; };
// make a safety selection that is conservative // make a safety selection that is conservative
for (key, osw) in offline_subkey_writes { for (key, osw) in offline_subkey_writes {
if poll!(stop_token.clone()).is_ready() { if poll!(stop_token.clone()).is_ready() {
log_stor!(debug "Offline subkey writes cancelled.");
break; break;
} }
let Some(rpc_processor) = self.online_writes_ready().await? else {
log_stor!(debug "Offline subkey writes stopped for network.");
break;
};
for subkey in osw.subkeys.iter() { for subkey in osw.subkeys.iter() {
let subkey_result = { let subkey_result = {
let mut inner = self.lock().await?; let mut inner = self.lock().await?;
inner.handle_get_local_value(key, subkey, true).await inner.handle_get_local_value(key, subkey, true).await
}; };
let Ok(subkey_result) = subkey_result else { let Ok(subkey_result) = subkey_result else {
log_stor!(debug "Offline subkey write had no subkey result: {}:{}", key, subkey);
continue; continue;
}; };
let Some(value) = subkey_result.value else { let Some(value) = subkey_result.value else {
log_stor!(debug "Offline subkey write had no subkey value: {}:{}", key, subkey);
continue; continue;
}; };
let Some(descriptor) = subkey_result.descriptor else { let Some(descriptor) = subkey_result.descriptor else {
log_stor!(debug "Offline subkey write had no descriptor: {}:{}", key, subkey);
continue; continue;
}; };
log_stor!(debug "Offline subkey write: {}:{} len={}", key, subkey, value.value_data().data().len());
if let Err(e) = self if let Err(e) = self
.outbound_set_value( .outbound_set_value(
rpc_processor.clone(), rpc_processor.clone(),