more setvalue fixes and concurrency cleanup

This commit is contained in:
Christien Rioux 2023-09-09 18:35:25 -04:00
parent 853976789f
commit 07f92b6e3f
12 changed files with 168 additions and 90 deletions

View File

@ -54,6 +54,9 @@ const ROUTING_TABLE: &str = "routing_table";
const SERIALIZED_BUCKET_MAP: &[u8] = b"serialized_bucket_map"; const SERIALIZED_BUCKET_MAP: &[u8] = b"serialized_bucket_map";
const CACHE_VALIDITY_KEY: &[u8] = b"cache_validity_key"; const CACHE_VALIDITY_KEY: &[u8] = b"cache_validity_key";
// Critical sections
const LOCK_TAG_TICK: &str = "TICK";
pub type LowLevelProtocolPorts = BTreeSet<(LowLevelProtocolType, AddressType, u16)>; pub type LowLevelProtocolPorts = BTreeSet<(LowLevelProtocolType, AddressType, u16)>;
pub type ProtocolToPortMapping = BTreeMap<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>; pub type ProtocolToPortMapping = BTreeMap<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]

View File

@ -129,9 +129,11 @@ impl RoutingDomainEditor {
} }
// Briefly pause routing table ticker while changes are made // Briefly pause routing table ticker while changes are made
if pause_tasks { let _tick_guard = if pause_tasks {
self.routing_table.pause_tasks(true).await; Some(self.routing_table.pause_tasks().await)
} } else {
None
};
// Apply changes // Apply changes
let mut changed = false; let mut changed = false;
@ -262,8 +264,5 @@ impl RoutingDomainEditor {
rss.reset(); rss.reset();
} }
} }
// Unpause routing table ticker
self.routing_table.pause_tasks(false).await;
} }
} }

View File

@ -2,6 +2,7 @@ use super::*;
use weak_table::PtrWeakHashSet; use weak_table::PtrWeakHashSet;
const RECENT_PEERS_TABLE_SIZE: usize = 64; const RECENT_PEERS_TABLE_SIZE: usize = 64;
pub type EntryCounts = BTreeMap<(RoutingDomain, CryptoKind), usize>; pub type EntryCounts = BTreeMap<(RoutingDomain, CryptoKind), usize>;
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
@ -34,8 +35,9 @@ pub struct RoutingTableInner {
pub(super) recent_peers: LruCache<TypedKey, RecentPeersEntry>, pub(super) recent_peers: LruCache<TypedKey, RecentPeersEntry>,
/// Storage for private/safety RouteSpecs /// Storage for private/safety RouteSpecs
pub(super) route_spec_store: Option<RouteSpecStore>, pub(super) route_spec_store: Option<RouteSpecStore>,
/// Tick paused or not /// Async tagged critical sections table
pub(super) tick_paused: bool, /// Tag: "tick" -> in ticker
pub(super) critical_sections: AsyncTagLockTable<&'static str>,
} }
impl RoutingTableInner { impl RoutingTableInner {
@ -52,7 +54,7 @@ impl RoutingTableInner {
self_transfer_stats: TransferStatsDownUp::default(), self_transfer_stats: TransferStatsDownUp::default(),
recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE), recent_peers: LruCache::new(RECENT_PEERS_TABLE_SIZE),
route_spec_store: None, route_spec_store: None,
tick_paused: false, critical_sections: AsyncTagLockTable::new(),
} }
} }

View File

@ -126,9 +126,13 @@ impl RoutingTable {
/// to run tick tasks which may run at slower tick rates as configured /// to run tick tasks which may run at slower tick rates as configured
pub async fn tick(&self) -> EyreResult<()> { pub async fn tick(&self) -> EyreResult<()> {
// Don't tick if paused // Don't tick if paused
if self.inner.read().tick_paused { let opt_tick_guard = {
let inner = self.inner.read();
inner.critical_sections.try_lock_tag(LOCK_TAG_TICK)
};
let Some(_tick_guard) = opt_tick_guard else {
return Ok(()); return Ok(());
} };
// Do rolling transfers every ROLLING_TRANSFERS_INTERVAL_SECS secs // Do rolling transfers every ROLLING_TRANSFERS_INTERVAL_SECS secs
self.unlocked_inner.rolling_transfers_task.tick().await?; self.unlocked_inner.rolling_transfers_task.tick().await?;
@ -183,22 +187,9 @@ impl RoutingTable {
Ok(()) Ok(())
} }
pub(crate) async fn pause_tasks(&self, paused: bool) { pub(crate) async fn pause_tasks(&self) -> AsyncTagLockGuard<&'static str> {
let cancel = { let critical_sections = self.inner.read().critical_sections.clone();
let mut inner = self.inner.write(); critical_sections.lock_tag(LOCK_TAG_TICK).await
if !inner.tick_paused && paused {
inner.tick_paused = true;
true
} else if inner.tick_paused && !paused {
inner.tick_paused = false;
false
} else {
false
}
};
if cancel {
self.cancel_tasks().await;
}
} }
pub(crate) async fn cancel_tasks(&self) { pub(crate) async fn cancel_tasks(&self) {

View File

@ -78,7 +78,7 @@ impl RPCProcessor {
log_rpc!(debug "{}", debug_string); log_rpc!(debug "{}", debug_string);
let waitable_reply = network_result_try!( let waitable_reply = network_result_try!(
self.question(dest, question, Some(question_context)) self.question(dest.clone(), question, Some(question_context))
.await? .await?
); );
@ -99,7 +99,8 @@ impl RPCProcessor {
}; };
let (value, peers, descriptor) = get_value_a.destructure(); let (value, peers, descriptor) = get_value_a.destructure();
#[cfg(feature="debug-dht")]
{
let debug_string_value = value.as_ref().map(|v| { let debug_string_value = value.as_ref().map(|v| {
format!(" len={} seq={} writer={}", format!(" len={} seq={} writer={}",
v.value_data().data().len(), v.value_data().data().len(),
@ -109,7 +110,7 @@ impl RPCProcessor {
}).unwrap_or_default(); }).unwrap_or_default();
let debug_string_answer = format!( let debug_string_answer = format!(
"OUT <== GetValueA({} #{}{}{} peers={})", "OUT <== GetValueA({} #{}{}{} peers={}) <= {}",
key, key,
subkey, subkey,
debug_string_value, debug_string_value,
@ -119,10 +120,15 @@ impl RPCProcessor {
"" ""
}, },
peers.len(), peers.len(),
dest
); );
log_rpc!(debug "{}", debug_string_answer); log_rpc!(debug "{}", debug_string_answer);
let peer_ids:Vec<String> = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect();
log_rpc!(debug "Peers: {:#?}", peers);
}
// Validate peers returned are, in fact, closer to the key than the node we sent this to // Validate peers returned are, in fact, closer to the key than the node we sent this to
let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) { let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) {
Ok(v) => v, Ok(v) => v,
@ -203,6 +209,8 @@ impl RPCProcessor {
let routing_table = self.routing_table(); let routing_table = self.routing_table();
let closer_to_key_peers = network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT])); let closer_to_key_peers = network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT]));
#[cfg(feature="debug-dht")]
{
let debug_string = format!( let debug_string = format!(
"IN <=== GetValueQ({} #{}{}) <== {}", "IN <=== GetValueQ({} #{}{}) <== {}",
key, key,
@ -216,6 +224,7 @@ impl RPCProcessor {
); );
log_rpc!(debug "{}", debug_string); log_rpc!(debug "{}", debug_string);
}
// See if we have this record ourselves // See if we have this record ourselves
let storage_manager = self.storage_manager(); let storage_manager = self.storage_manager();

View File

@ -92,7 +92,7 @@ impl RPCProcessor {
log_rpc!(debug "{}", debug_string); log_rpc!(debug "{}", debug_string);
let waitable_reply = network_result_try!( let waitable_reply = network_result_try!(
self.question(dest, question, Some(question_context)) self.question(dest.clone(), question, Some(question_context))
.await? .await?
); );
@ -115,6 +115,8 @@ impl RPCProcessor {
let (set, value, peers) = set_value_a.destructure(); let (set, value, peers) = set_value_a.destructure();
#[cfg(feature="debug-dht")]
{
let debug_string_value = value.as_ref().map(|v| { let debug_string_value = value.as_ref().map(|v| {
format!(" len={} writer={}", format!(" len={} writer={}",
v.value_data().data().len(), v.value_data().data().len(),
@ -122,8 +124,9 @@ impl RPCProcessor {
) )
}).unwrap_or_default(); }).unwrap_or_default();
let debug_string_answer = format!( let debug_string_answer = format!(
"OUT <== SetValueA({} #{}{}{} peers={})", "OUT <== SetValueA({} #{}{}{} peers={}) <= {}",
key, key,
subkey, subkey,
if set { if set {
@ -133,9 +136,13 @@ impl RPCProcessor {
}, },
debug_string_value, debug_string_value,
peers.len(), peers.len(),
dest,
); );
log_rpc!(debug "{}", debug_string_answer); log_rpc!(debug "{}", debug_string_answer);
let peer_ids:Vec<String> = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect();
log_rpc!(debug "Peers: {:#?}", peer_ids);
}
// Validate peers returned are, in fact, closer to the key than the node we sent this to // Validate peers returned are, in fact, closer to the key than the node we sent this to
let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) { let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) {
@ -235,7 +242,7 @@ impl RPCProcessor {
// If there are less than 'set_value_count' peers that are closer, then store here too // If there are less than 'set_value_count' peers that are closer, then store here too
let set_value_count = { let set_value_count = {
let c = self.config.get(); let c = self.config.get();
c.network.dht.set_value_fanout as usize c.network.dht.set_value_count as usize
}; };
let (set, new_value) = if closer_to_key_peers.len() >= set_value_count { let (set, new_value) = if closer_to_key_peers.len() >= set_value_count {
// Not close enough // Not close enough

View File

@ -120,16 +120,16 @@ Future<VeilidConfig> getDefaultVeilidConfig(String programName) async {
defaultRouteHopCount: 1, defaultRouteHopCount: 1,
), ),
dht: VeilidConfigDHT( dht: VeilidConfigDHT(
maxFindNodeCount: 20,
resolveNodeTimeoutMs: 10000, resolveNodeTimeoutMs: 10000,
resolveNodeCount: 1, resolveNodeCount: 1,
resolveNodeFanout: 4, resolveNodeFanout: 4,
maxFindNodeCount: 20,
getValueTimeoutMs: 10000, getValueTimeoutMs: 10000,
getValueCount: 3, getValueCount: 3,
getValueFanout: 4, getValueFanout: 4,
setValueTimeoutMs: 10000, setValueTimeoutMs: 10000,
setValueCount: 4, setValueCount: 5,
setValueFanout: 6, setValueFanout: 4,
minPeerCount: 20, minPeerCount: 20,
minPeerRefreshTimeMs: 60000, minPeerRefreshTimeMs: 60000,
validateDialInfoReceiptTimeMs: 2000, validateDialInfoReceiptTimeMs: 2000,

View File

@ -3,7 +3,7 @@ SCRIPTDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
pushd $SCRIPTDIR 2>/dev/null pushd $SCRIPTDIR 2>/dev/null
if [[ "$1" == "wasm" ]]; then if [[ "$1" == "wasm" ]]; then
WASM_BINDGEN_TEST_TIMEOUT=120 wasm-pack test --firefox --headless --features=rt-wasm-bindgen WASM_BINDGEN_TEST_TIMEOUT=120 wasm-pack test --firefox --headless --no-default-features --features=rt-wasm-bindgen
elif [[ "$1" == "ios" ]]; then elif [[ "$1" == "ios" ]]; then
SYMROOT=/tmp/testout SYMROOT=/tmp/testout
APPNAME=veilidtools-tests APPNAME=veilidtools-tests

View File

@ -135,4 +135,35 @@ where
// Return the locked guard // Return the locked guard
AsyncTagLockGuard::new(self.clone(), tag, guard) AsyncTagLockGuard::new(self.clone(), tag, guard)
} }
pub fn try_lock_tag(&self, tag: T) -> Option<AsyncTagLockGuard<T>> {
// Get or create a tag lock entry
let mutex = {
let mut inner = self.inner.lock();
// See if this tag is in the table
// and if not, add a new mutex for this tag
let entry = inner
.table
.entry(tag.clone())
.or_insert_with(|| AsyncTagLockTableEntry {
mutex: Arc::new(AsyncMutex::new(())),
waiters: 0,
});
// Increment the number of waiters
entry.waiters += 1;
// Return the mutex associated with the tag
entry.mutex.clone()
// Drop the table guard
};
// Lock the tag lock
let opt_guard = asyncmutex_try_lock_arc!(mutex);
// Return the locked guard
opt_guard.map(|guard| AsyncTagLockGuard::new(self.clone(), tag, guard))
}
} }

View File

@ -55,6 +55,29 @@ pub async fn test_simple_single_contention() {
assert_eq!(table.len(), 1); assert_eq!(table.len(), 1);
} }
pub async fn test_simple_try() {
info!("test_simple_try");
let table = AsyncTagLockTable::new();
let a1 = SocketAddr::new("1.2.3.4".parse().unwrap(), 1234);
let a2 = SocketAddr::new("1.2.3.5".parse().unwrap(), 1235);
{
let _g1 = table.lock_tag(a1).await;
let opt_g2 = table.try_lock_tag(a1);
let opt_g3 = table.try_lock_tag(a2);
assert!(opt_g2.is_none());
assert!(opt_g3.is_some());
}
let opt_g4 = table.try_lock_tag(a1);
assert!(opt_g4.is_some());
assert_eq!(table.len(), 1);
}
pub async fn test_simple_double_contention() { pub async fn test_simple_double_contention() {
info!("test_simple_double_contention"); info!("test_simple_double_contention");
@ -153,6 +176,7 @@ pub async fn test_parallel_single_contention() {
pub async fn test_all() { pub async fn test_all() {
test_simple_no_contention().await; test_simple_no_contention().await;
test_simple_try().await;
test_simple_single_contention().await; test_simple_single_contention().await;
test_parallel_single_contention().await; test_parallel_single_contention().await;
} }

View File

@ -47,6 +47,13 @@ cfg_if::cfg_if! {
$x.clone().lock_owned().await $x.clone().lock_owned().await
}; };
} }
#[macro_export]
macro_rules! asyncmutex_try_lock_arc {
($x:expr) => {
$x.try_lock_owned().ok()
};
}
} else { } else {
#[macro_export] #[macro_export]
macro_rules! asyncmutex_try_lock { macro_rules! asyncmutex_try_lock {
@ -60,6 +67,12 @@ cfg_if::cfg_if! {
$x.lock_arc().await $x.lock_arc().await
}; };
} }
#[macro_export]
macro_rules! asyncmutex_try_lock_arc {
($x:expr) => {
$x.try_lock_arc()
};
}
} }
} }

View File

@ -4,7 +4,6 @@
use cfg_if::*; use cfg_if::*;
use parking_lot::Once; use parking_lot::Once;
use veilid_tools::tests::*; use veilid_tools::tests::*;
use veilid_tools::*;
use wasm_bindgen_test::*; use wasm_bindgen_test::*;