fix tests

This commit is contained in:
John Smith
2023-04-23 14:10:17 -04:00
parent cb4abaefd7
commit 75c16b8c51
16 changed files with 160 additions and 115 deletions

View File

@@ -34,7 +34,7 @@ use network_manager::*;
use receipt_manager::*;
use routing_table::*;
use stop_token::future::FutureExt;
use storage_manager::StorageManager;
use storage_manager::*;
/////////////////////////////////////////////////////////////////////
@@ -227,7 +227,6 @@ pub struct RPCProcessorInner {
send_channel: Option<flume::Sender<(Option<Id>, RPCMessageEncoded)>>,
stop_source: Option<StopSource>,
worker_join_handles: Vec<MustJoinHandle<()>>,
opt_storage_manager: Option<StorageManager>,
}
pub struct RPCProcessorUnlockedInner {
@@ -246,6 +245,7 @@ pub struct RPCProcessor {
crypto: Crypto,
config: VeilidConfig,
network_manager: NetworkManager,
storage_manager: StorageManager,
routing_table: RoutingTable,
inner: Arc<Mutex<RPCProcessorInner>>,
unlocked_inner: Arc<RPCProcessorUnlockedInner>,
@@ -257,7 +257,6 @@ impl RPCProcessor {
send_channel: None,
stop_source: None,
worker_join_handles: Vec::new(),
opt_storage_manager: None,
}
}
fn new_unlocked_inner(
@@ -298,6 +297,7 @@ impl RPCProcessor {
config: config.clone(),
network_manager: network_manager.clone(),
routing_table: network_manager.routing_table(),
storage_manager: network_manager.storage_manager(),
inner: Arc::new(Mutex::new(Self::new_inner())),
unlocked_inner: Arc::new(Self::new_unlocked_inner(config, update_callback)),
}
@@ -311,14 +311,8 @@ impl RPCProcessor {
self.routing_table.clone()
}
pub fn set_storage_manager(&self, opt_storage_manager: Option<StorageManager>) {
let mut inner = self.inner.lock();
inner.opt_storage_manager = opt_storage_manager
}
pub fn storage_manager(&self) -> Option<StorageManager> {
let inner = self.inner.lock();
inner.opt_storage_manager.clone()
pub fn storage_manager(&self) -> StorageManager {
self.storage_manager.clone()
}
//////////////////////////////////////////////////////////////////////
@@ -326,28 +320,35 @@ impl RPCProcessor {
#[instrument(level = "debug", skip_all, err)]
pub async fn startup(&self) -> EyreResult<()> {
debug!("startup rpc processor");
let mut inner = self.inner.lock();
{
let mut inner = self.inner.lock();
let channel = flume::bounded(self.unlocked_inner.queue_size as usize);
inner.send_channel = Some(channel.0.clone());
inner.stop_source = Some(StopSource::new());
let channel = flume::bounded(self.unlocked_inner.queue_size as usize);
inner.send_channel = Some(channel.0.clone());
inner.stop_source = Some(StopSource::new());
// spin up N workers
trace!(
"Spinning up {} RPC workers",
self.unlocked_inner.concurrency
);
for _ in 0..self.unlocked_inner.concurrency {
let this = self.clone();
let receiver = channel.1.clone();
let jh = spawn(Self::rpc_worker(
this,
inner.stop_source.as_ref().unwrap().token(),
receiver,
));
inner.worker_join_handles.push(jh);
// spin up N workers
trace!(
"Spinning up {} RPC workers",
self.unlocked_inner.concurrency
);
for _ in 0..self.unlocked_inner.concurrency {
let this = self.clone();
let receiver = channel.1.clone();
let jh = spawn(Self::rpc_worker(
this,
inner.stop_source.as_ref().unwrap().token(),
receiver,
));
inner.worker_join_handles.push(jh);
}
}
// Inform storage manager we are up
self.storage_manager
.set_rpc_processor(Some(self.clone()))
.await;
Ok(())
}
@@ -355,6 +356,9 @@ impl RPCProcessor {
pub async fn shutdown(&self) {
debug!("starting rpc processor shutdown");
// Stop storage manager from using us
self.storage_manager.set_rpc_processor(None).await;
// Stop the rpc workers
let mut unord = FuturesUnordered::new();
{