From 291e3ef2fee55b14379fc302784fe2bfb70afcdc Mon Sep 17 00:00:00 2001 From: John Smith Date: Mon, 26 Jun 2023 21:29:02 -0400 Subject: [PATCH] add better dht debugging --- Cargo.lock | 2 + scripts/tools/keytool.py | 99 +++++++++++++++ veilid-cli/src/command_processor.rs | 62 ++++++++++ veilid-core/Cargo.toml | 1 + veilid-core/src/crypto/byte_array_types.rs | 39 ++++-- veilid-core/src/crypto/vld0/mod.rs | 6 +- veilid-core/src/routing_table/find_peers.rs | 27 +++- .../tasks/private_route_management.rs | 4 +- veilid-core/src/rpc_processor/mod.rs | 22 ++++ .../src/rpc_processor/rpc_get_value.rs | 67 +++++++++- .../src/rpc_processor/rpc_set_value.rs | 74 ++++++++++- veilid-core/src/storage_manager/debug.rs | 20 +++ .../src/storage_manager/limited_size.rs | 111 +++++++++++++++++ veilid-core/src/storage_manager/mod.rs | 2 + .../src/storage_manager/record_store.rs | 115 ++++++++++++------ veilid-core/src/veilid_api/debug.rs | 19 ++- .../src/veilid_api/json_api/process.rs | 4 +- .../veilid_api/json_api/routing_context.rs | 2 +- veilid-core/src/veilid_api/routing_context.rs | 4 +- veilid-python/tests/conftest.py | 2 +- veilid-python/tests/test_dht.py | 5 + veilid-python/tests/test_routing_context.py | 85 +++++++++++-- veilid-python/veilid/api.py | 2 +- veilid-python/veilid/json_api.py | 16 +-- veilid-python/veilid/schema/Request.json | 8 +- veilid-tools/Cargo.toml | 1 + veilid-tools/src/timestamp.rs | 113 +++++++++++++++++ 27 files changed, 817 insertions(+), 95 deletions(-) create mode 100644 scripts/tools/keytool.py create mode 100644 veilid-core/src/storage_manager/limited_size.rs diff --git a/Cargo.lock b/Cargo.lock index 5aeab7b8..cb542298 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6467,6 +6467,7 @@ dependencies = [ "netlink-packet-route", "netlink-sys", "nix 0.26.2", + "num-traits", "once_cell", "owning_ref", "paranoid-android", @@ -6605,6 +6606,7 @@ dependencies = [ "async_executors", "backtrace", "cfg-if 1.0.0", + "chrono", "console_error_panic_hook", "eyre", "flume", diff --git a/scripts/tools/keytool.py b/scripts/tools/keytool.py new file mode 100644 index 00000000..90323314 --- /dev/null +++ b/scripts/tools/keytool.py @@ -0,0 +1,99 @@ +import base64 +import sys +import argparse + +def urlsafe_b64encode_no_pad(b: bytes) -> str: + """ + Removes any `=` used as padding from the encoded string. + """ + return base64.urlsafe_b64encode(b).decode().rstrip("=") + + +def urlsafe_b64decode_no_pad(s: str) -> bytes: + """ + Adds back in the required padding before decoding. + """ + padding = 4 - (len(s) % 4) + s = s + ("=" * padding) + return base64.urlsafe_b64decode(s) + + +def do_value(args): + + key = urlsafe_b64decode_no_pad(args.key) + + print("key:", key.hex()) + + +def dist(key1: bytes, key2: bytes) -> bytes: + distance = bytearray(len(key1)) + for n in range(len(key1)): + distance[n] = key1[n] ^ key2[n] + + return bytes(distance) + + +def do_distance(args): + + key1 = urlsafe_b64decode_no_pad(args.key1) + key2 = urlsafe_b64decode_no_pad(args.key2) + + print("key1:", key1.hex()) + print("key2:", key2.hex()) + + distance = dist(key1, key2) + print("dist:", distance.hex()) + +def keycmp(key1: bytes, key2: bytes) -> int: + for n in range(len(key1)): + if key1[n] < key2[n]: + return -1 + if key1[n] > key2[n]: + return 1 + return 0 + +def do_closer(args): + + key = urlsafe_b64decode_no_pad(args.key) + near = urlsafe_b64decode_no_pad(args.near) + far = urlsafe_b64decode_no_pad(args.far) + + print(" key:", key.hex()) + print("near:", near.hex()) + print(" far:", far.hex()) + + distance_near = dist(key, near) + distance_far = dist(key, far) + + print(" dn:", distance_near.hex()) + print(" df:", distance_far.hex()) + + c = keycmp(distance_near, distance_far) + print(" cmp:", c) + +def main(): + parser = argparse.ArgumentParser() + subparsers = parser.add_subparsers(required=True) + + parser_value = subparsers.add_parser('value') + parser_value.add_argument('key', type=str) + parser_value.set_defaults(func=do_value) + + parser_value = subparsers.add_parser('distance') + parser_value.add_argument('key1', type=str) + parser_value.add_argument('key2', type=str) + parser_value.set_defaults(func=do_distance) + + parser_value = subparsers.add_parser('closer') + parser_value.add_argument('key', type=str) + parser_value.add_argument('near', type=str) + parser_value.add_argument('far', type=str) + parser_value.set_defaults(func=do_closer) + + args = parser.parse_args() + args.func(args) + + sys.exit(0) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/veilid-cli/src/command_processor.rs b/veilid-cli/src/command_processor.rs index 6225d5d2..72311f8c 100644 --- a/veilid-cli/src/command_processor.rs +++ b/veilid-cli/src/command_processor.rs @@ -46,6 +46,7 @@ struct CommandProcessorInner { server_addr: Option, connection_waker: Eventual, last_call_id: Option, + enable_app_messages: bool, } #[derive(Clone)] @@ -66,6 +67,7 @@ impl CommandProcessor { server_addr: None, connection_waker: Eventual::new(), last_call_id: None, + enable_app_messages: false, })), } } @@ -122,6 +124,10 @@ reply reply to an AppCall not handled directly by must be exact call id reported in VeilidUpdate can be a string (left trimmed) or it can start with a '#' followed by a string of undelimited hex bytes +enable [flag] set a flag +disable [flag] unset a flag + valid flags in include: + app_messages "# .to_owned(), ); @@ -305,6 +311,52 @@ reply reply to an AppCall not handled directly by Ok(()) } + pub fn cmd_enable(&self, rest: Option, callback: UICallback) -> Result<(), String> { + trace!("CommandProcessor::cmd_enable"); + + let ui = self.ui_sender(); + let this = self.clone(); + spawn_detached_local(async move { + let flag = rest.clone().unwrap_or_default(); + match flag.as_str() { + "app_messages" => { + this.inner.lock().enable_app_messages = true; + ui.add_node_event(Level::Info, format!("flag enabled: {}", flag)); + ui.send_callback(callback); + } + _ => { + ui.add_node_event(Level::Error, format!("unknown flag: {}", flag)); + ui.send_callback(callback); + return; + } + } + }); + Ok(()) + } + + pub fn cmd_disable(&self, rest: Option, callback: UICallback) -> Result<(), String> { + trace!("CommandProcessor::cmd_disable"); + + let ui = self.ui_sender(); + let this = self.clone(); + spawn_detached_local(async move { + let flag = rest.clone().unwrap_or_default(); + match flag.as_str() { + "app_messages" => { + this.inner.lock().enable_app_messages = false; + ui.add_node_event(Level::Info, format!("flag disabled: {}", flag)); + ui.send_callback(callback); + } + _ => { + ui.add_node_event(Level::Error, format!("unknown flag: {}", flag)); + ui.send_callback(callback); + return; + } + } + }); + Ok(()) + } + pub fn run_command(&self, command_line: &str, callback: UICallback) -> Result<(), String> { // let (cmd, rest) = Self::word_split(command_line); @@ -319,6 +371,8 @@ reply reply to an AppCall not handled directly by "debug" => self.cmd_debug(rest, callback), "change_log_level" => self.cmd_change_log_level(rest, callback), "reply" => self.cmd_reply(rest, callback), + "enable" => self.cmd_enable(rest, callback), + "disable" => self.cmd_disable(rest, callback), _ => { let ui = self.ui_sender(); ui.send_callback(callback); @@ -472,6 +526,10 @@ reply reply to an AppCall not handled directly by } pub fn update_app_message(&self, msg: &json::JsonValue) { + if !self.inner.lock().enable_app_messages { + return; + } + let message = json_str_vec_u8(&msg["message"]); // check is message body is ascii printable @@ -506,6 +564,10 @@ reply reply to an AppCall not handled directly by } pub fn update_app_call(&self, call: &json::JsonValue) { + if !self.inner.lock().enable_app_messages { + return; + } + let message = json_str_vec_u8(&call["message"]); // check is message body is ascii printable diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index 338c8555..783b966d 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -55,6 +55,7 @@ flume = { version = "^0", features = ["async"] } enumset = { version= "^1", features = ["serde"] } backtrace = { version = "^0" } stop-token = { version = "^0", default-features = false } +num-traits = "0.2.15" ed25519-dalek = { version = "^1", default_features = false, features = ["alloc", "u64_backend"] } x25519-dalek = { version = "^1", default_features = false, features = ["u64_backend"] } diff --git a/veilid-core/src/crypto/byte_array_types.rs b/veilid-core/src/crypto/byte_array_types.rs index 5590e467..9d60d4f4 100644 --- a/veilid-core/src/crypto/byte_array_types.rs +++ b/veilid-core/src/crypto/byte_array_types.rs @@ -77,18 +77,7 @@ where macro_rules! byte_array_type { ($name:ident, $size:expr, $encoded_size:expr) => { - #[derive( - Clone, - Copy, - Hash, - Eq, - PartialEq, - PartialOrd, - Ord, - RkyvArchive, - RkyvSerialize, - RkyvDeserialize, - )] + #[derive(Clone, Copy, Hash, RkyvArchive, RkyvSerialize, RkyvDeserialize)] #[archive_attr(repr(C), derive(CheckBytes, Hash, Eq, PartialEq, PartialOrd, Ord))] pub struct $name { pub bytes: [u8; $size], @@ -125,6 +114,32 @@ macro_rules! byte_array_type { } } + impl PartialOrd for $name { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + + impl Ord for $name { + fn cmp(&self, other: &Self) -> core::cmp::Ordering { + for n in 0..$size { + let c = self.bytes[n].cmp(&other.bytes[n]); + if c != core::cmp::Ordering::Equal { + return c; + } + } + core::cmp::Ordering::Equal + } + } + + impl PartialEq for $name { + fn eq(&self, other: &Self) -> bool { + self.bytes == other.bytes + } + } + + impl Eq for $name {} + impl $name { pub fn new(bytes: [u8; $size]) -> Self { Self { bytes } diff --git a/veilid-core/src/crypto/vld0/mod.rs b/veilid-core/src/crypto/vld0/mod.rs index 3d77c9c2..680e7468 100644 --- a/veilid-core/src/crypto/vld0/mod.rs +++ b/veilid-core/src/crypto/vld0/mod.rs @@ -176,10 +176,10 @@ impl CryptoSystem for CryptoSystemVLD0 { } // Distance Metric fn distance(&self, key1: &PublicKey, key2: &PublicKey) -> CryptoKeyDistance { - let mut bytes = [0u8; PUBLIC_KEY_LENGTH]; + let mut bytes = [0u8; CRYPTO_KEY_LENGTH]; - for (n, byte) in bytes.iter_mut().enumerate() { - *byte = key1.bytes[n] ^ key2.bytes[n]; + for n in 0..CRYPTO_KEY_LENGTH { + bytes[n] = key1.bytes[n] ^ key2.bytes[n]; } CryptoKeyDistance::new(bytes) diff --git a/veilid-core/src/routing_table/find_peers.rs b/veilid-core/src/routing_table/find_peers.rs index 2a240e26..e7f289ea 100644 --- a/veilid-core/src/routing_table/find_peers.rs +++ b/veilid-core/src/routing_table/find_peers.rs @@ -99,8 +99,8 @@ impl RoutingTable { }, ); - // xxx test // Validate peers returned are, in fact, closer to the key than the node we sent this to + // This same test is used on the other side so we vet things here let valid = match Self::verify_peers_closer(vcrypto2, own_node_id, key, &closest_nodes) { Ok(v) => v, Err(e) => { @@ -108,13 +108,16 @@ impl RoutingTable { } }; if !valid { - panic!("non-closer peers returned"); + error!( + "non-closer peers returned: own_node_id={:#?} key={:#?} closest_nodes={:#?}", + own_node_id, key, closest_nodes + ); } NetworkResult::value(closest_nodes) } - /// Determine if set of peers is closer to key_near than key_far + /// Determine if set of peers is closer to key_near than key_far is to key_near pub(crate) fn verify_peers_closer( vcrypto: CryptoSystemVersion, key_far: TypedKey, @@ -128,14 +131,30 @@ impl RoutingTable { } let mut closer = true; + let d_far = vcrypto.distance(&key_far.value, &key_near.value); for peer in peers { let Some(key_peer) = peer.node_ids().get(kind) else { bail!("peers need to have a key with the same cryptosystem"); }; let d_near = vcrypto.distance(&key_near.value, &key_peer.value); - let d_far = vcrypto.distance(&key_far.value, &key_peer.value); if d_far < d_near { + let warning = format!( + r#"peer: {} +near (key): {} +far (self): {} + d_near: {} + d_far: {} + cmp: {:?}"#, + key_peer.value, + key_near.value, + key_far.value, + d_near, + d_far, + d_near.cmp(&d_far) + ); + warn!("{}", warning); closer = false; + break; } } diff --git a/veilid-core/src/routing_table/tasks/private_route_management.rs b/veilid-core/src/routing_table/tasks/private_route_management.rs index 3f44a529..a5fe37d8 100644 --- a/veilid-core/src/routing_table/tasks/private_route_management.rs +++ b/veilid-core/src/routing_table/tasks/private_route_management.rs @@ -198,12 +198,12 @@ impl RoutingTable { // Newly allocated routes let mut newly_allocated_routes = Vec::new(); for _n in 0..routes_to_allocate { - // Parameters here must be the default safety route spec + // Parameters here must be the most inclusive safety route spec // These will be used by test_remote_route as well if let Some(k) = rss.allocate_route( &VALID_CRYPTO_KINDS, Stability::default(), - Sequencing::default(), + Sequencing::EnsureOrdered, default_route_hop_count, DirectionSet::all(), &[], diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index eea3f982..1e41497c 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -110,6 +110,28 @@ impl RPCMessageHeader { RPCMessageHeaderDetail::PrivateRouted(p) => p.direct.envelope.get_crypto_kind(), } } + // pub fn direct_peer_noderef(&self) -> NodeRef { + // match &self.detail { + // RPCMessageHeaderDetail::Direct(d) => d.peer_noderef.clone(), + // RPCMessageHeaderDetail::SafetyRouted(s) => s.direct.peer_noderef.clone(), + // RPCMessageHeaderDetail::PrivateRouted(p) => p.direct.peer_noderef.clone(), + // } + // } + pub fn direct_sender_node_id(&self) -> TypedKey { + match &self.detail { + RPCMessageHeaderDetail::Direct(d) => { + TypedKey::new(d.envelope.get_crypto_kind(), d.envelope.get_sender_id()) + } + RPCMessageHeaderDetail::SafetyRouted(s) => TypedKey::new( + s.direct.envelope.get_crypto_kind(), + s.direct.envelope.get_sender_id(), + ), + RPCMessageHeaderDetail::PrivateRouted(p) => TypedKey::new( + p.direct.envelope.get_crypto_kind(), + p.direct.envelope.get_sender_id(), + ), + } + } } #[derive(Debug)] diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index 6151ee01..324279f3 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -50,13 +50,13 @@ impl RPCProcessor { }; let debug_string = format!( - "GetValue(key={} subkey={} last_descriptor={}) => {}", + "OUT ==> GetValueQ({} #{}{}) => {}", key, subkey, if last_descriptor.is_some() { - "Some" + " +lastdesc" } else { - "None" + "" }, dest ); @@ -74,6 +74,8 @@ impl RPCProcessor { vcrypto: vcrypto.clone(), }); + log_rpc!(debug "{}", debug_string); + let waitable_reply = network_result_try!( self.question(dest, question, Some(question_context)) .await? @@ -97,6 +99,28 @@ impl RPCProcessor { let (value, peers, descriptor) = get_value_a.destructure(); + let debug_string_value = value.as_ref().map(|v| { + format!(" len={} writer={}", + v.value_data().data().len(), + v.value_data().writer(), + ) + }).unwrap_or_default(); + + let debug_string_answer = format!( + "OUT <== GetValueA({} #{}{}{} peers={})", + key, + subkey, + debug_string_value, + if descriptor.is_some() { + " +desc" + } else { + "" + }, + peers.len(), + ); + + log_rpc!(debug "{}", debug_string_answer); + // Validate peers returned are, in fact, closer to the key than the node we sent this to let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) { Ok(v) => v, @@ -164,13 +188,50 @@ impl RPCProcessor { let routing_table = self.routing_table(); let closer_to_key_peers = network_result_try!(routing_table.find_peers_closer_to_key(key)); + let debug_string = format!( + "IN <=== GetValueQ({} #{}{}) <== {}", + key, + subkey, + if want_descriptor { + " +wantdesc" + } else { + "" + }, + msg.header.direct_sender_node_id() + ); + + log_rpc!(debug "{}", debug_string); + // See if we have this record ourselves let storage_manager = self.storage_manager(); let subkey_result = network_result_try!(storage_manager .inbound_get_value(key, subkey, want_descriptor) .await .map_err(RPCError::internal)?); + + let debug_string_value = subkey_result.value.as_ref().map(|v| { + format!(" len={} writer={}", + v.value_data().data().len(), + v.value_data().writer(), + ) + }).unwrap_or_default(); + let debug_string_answer = format!( + "IN ===> GetValueA({} #{}{}{} peers={}) ==> {}", + key, + subkey, + debug_string_value, + if subkey_result.descriptor.is_some() { + " +desc" + } else { + "" + }, + closer_to_key_peers.len(), + msg.header.direct_sender_node_id() + ); + + log_rpc!(debug "{}", debug_string_answer); + // Make GetValue answer let get_value_a = RPCOperationGetValueA::new( subkey_result.value, diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs index aeb60bb5..9d29a432 100644 --- a/veilid-core/src/rpc_processor/rpc_set_value.rs +++ b/veilid-core/src/rpc_processor/rpc_set_value.rs @@ -54,12 +54,16 @@ impl RPCProcessor { }; let debug_string = format!( - "SetValue(key={} subkey={} value_data(writer)={} value_data(len)={} send_descriptor={}) => {}", + "OUT ==> SetValueQ({} #{} len={} writer={}{}) => {}", key, subkey, - value.value_data().writer(), value.value_data().data().len(), - send_descriptor, + value.value_data().writer(), + if send_descriptor { + " +senddesc" + } else { + "" + }, dest ); @@ -84,11 +88,14 @@ impl RPCProcessor { vcrypto: vcrypto.clone(), }); + log_rpc!(debug "{}", debug_string); + let waitable_reply = network_result_try!( self.question(dest, question, Some(question_context)) .await? ); + // Wait for reply let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? { TimeoutOr::Timeout => return Ok(NetworkResult::Timeout), @@ -106,6 +113,28 @@ impl RPCProcessor { }; let (set, value, peers) = set_value_a.destructure(); + + let debug_string_value = value.as_ref().map(|v| { + format!(" len={} writer={}", + v.value_data().data().len(), + v.value_data().writer(), + ) + }).unwrap_or_default(); + + let debug_string_answer = format!( + "OUT <== SetValueA({} #{}{}{} peers={})", + key, + subkey, + if set { + " +set" + } else { + "" + }, + debug_string_value, + peers.len(), + ); + + log_rpc!(debug "{}", debug_string_answer); // Validate peers returned are, in fact, closer to the key than the node we sent this to let valid = match RoutingTable::verify_peers_closer(vcrypto, target_node_id, key, &peers) { @@ -172,6 +201,22 @@ impl RPCProcessor { let routing_table = self.routing_table(); let closer_to_key_peers = network_result_try!(routing_table.find_peers_closer_to_key(key)); + let debug_string = format!( + "IN <=== SetValueQ({} #{} len={} writer={}{}) <== {}", + key, + subkey, + value.value_data().data().len(), + value.value_data().writer(), + if descriptor.is_some() { + " +desc" + } else { + "" + }, + msg.header.direct_sender_node_id() + ); + + log_rpc!(debug "{}", debug_string); + // If there are less than 'set_value_count' peers that are closer, then store here too let set_value_count = { let c = self.config.get(); @@ -193,6 +238,29 @@ impl RPCProcessor { (true, new_value) }; + let debug_string_value = new_value.as_ref().map(|v| { + format!(" len={} writer={}", + v.value_data().data().len(), + v.value_data().writer(), + ) + }).unwrap_or_default(); + + let debug_string_answer = format!( + "IN ===> SetValueA({} #{}{}{} peers={}) ==> {}", + key, + subkey, + if set { + " +set" + } else { + "" + }, + debug_string_value, + closer_to_key_peers.len(), + msg.header.direct_sender_node_id() + ); + + log_rpc!(debug "{}", debug_string_answer); + // Make SetValue answer let set_value_a = RPCOperationSetValueA::new(set, new_value, closer_to_key_peers)?; diff --git a/veilid-core/src/storage_manager/debug.rs b/veilid-core/src/storage_manager/debug.rs index 0c47b56a..714061c0 100644 --- a/veilid-core/src/storage_manager/debug.rs +++ b/veilid-core/src/storage_manager/debug.rs @@ -15,4 +15,24 @@ impl StorageManager { }; remote_record_store.debug_records() } + pub(crate) async fn purge_local_records(&self, reclaim: Option) -> String { + let mut inner = self.inner.lock().await; + let Some(local_record_store) = &mut inner.local_record_store else { + return "not initialized".to_owned(); + }; + let reclaimed = local_record_store + .reclaim_space(reclaim.unwrap_or(usize::MAX)) + .await; + return format!("Local records purged: reclaimed {} bytes", reclaimed); + } + pub(crate) async fn purge_remote_records(&self, reclaim: Option) -> String { + let mut inner = self.inner.lock().await; + let Some(remote_record_store) = &mut inner.remote_record_store else { + return "not initialized".to_owned(); + }; + let reclaimed = remote_record_store + .reclaim_space(reclaim.unwrap_or(usize::MAX)) + .await; + return format!("Remote records purged: reclaimed {} bytes", reclaimed); + } } diff --git a/veilid-core/src/storage_manager/limited_size.rs b/veilid-core/src/storage_manager/limited_size.rs new file mode 100644 index 00000000..dbb82bec --- /dev/null +++ b/veilid-core/src/storage_manager/limited_size.rs @@ -0,0 +1,111 @@ +use super::*; +use num_traits::{PrimInt, Unsigned}; + +#[derive(ThisError, Debug, Clone, Copy, Eq, PartialEq)] +pub enum LimitError { + #[error("limit overflow")] + OverLimit, +} + +#[derive(ThisError, Debug, Clone, Copy, Eq, PartialEq)] +pub enum NumericError { + #[error("numeric overflow")] + Overflow, + #[error("numeric underflow")] + Underflow, +} + +#[derive(Debug, Clone)] +pub struct LimitedSize { + description: String, + value: T, + limit: Option, + uncommitted_value: Option, +} + +impl LimitedSize { + pub fn new(description: &str, value: T, limit: Option) -> Self { + Self { + description: description.to_owned(), + value, + limit, + uncommitted_value: None, + } + } + + fn current_value(&self) -> T { + self.uncommitted_value.unwrap_or(self.value) + } + + pub fn set(&mut self, new_value: T) { + self.uncommitted_value = Some(new_value); + } + + pub fn add(&mut self, v: T) -> Result { + let current_value = self.current_value(); + let max_v = T::max_value() - current_value; + if v > max_v { + return Err(NumericError::Overflow); + } + let new_value = current_value + v; + self.uncommitted_value = Some(new_value); + Ok(new_value) + } + pub fn sub(&mut self, v: T) -> Result { + let current_value = self.current_value(); + let max_v = current_value - T::min_value(); + if v > max_v { + return Err(NumericError::Underflow); + } + let new_value = current_value - v; + self.uncommitted_value = Some(new_value); + Ok(new_value) + } + pub fn saturating_sub(&mut self, mut v: T) -> T { + let current_value = self.current_value(); + let max_v = current_value - T::min_value(); + if v > max_v { + log_stor!(debug "Numeric underflow ({})", self.description); + v = max_v; + } + let new_value = current_value - v; + self.uncommitted_value = Some(new_value); + new_value + } + + pub fn check_limit(&self) -> bool { + if let Some(uncommitted_value) = self.uncommitted_value { + if let Some(limit) = self.limit { + if uncommitted_value > limit { + return false; + } + } + } + true + } + + pub fn commit(&mut self) -> Result { + if let Some(uncommitted_value) = self.uncommitted_value { + if let Some(limit) = self.limit { + if uncommitted_value > limit { + log_stor!(debug "Commit over limit failed ({}): {} > {}", self.description, uncommitted_value, limit); + return Err(LimitError::OverLimit); + } + } + log_stor!(debug "Commit ({}): {} => {}", self.description, self.value, uncommitted_value); + self.value = uncommitted_value; + } + Ok(self.value) + } + + pub fn rollback(&mut self) -> T { + if let Some(uv) = self.uncommitted_value { + log_stor!(debug "Rollback ({}): {} (drop {})", self.description, self.value, uv); + } + return self.value; + } + + pub fn get(&self) -> T { + return self.value; + } +} diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 722242ee..2fb7ab80 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -1,6 +1,7 @@ mod debug; mod get_value; mod keys; +mod limited_size; mod record_store; mod record_store_limits; mod set_value; @@ -9,6 +10,7 @@ mod tasks; mod types; use keys::*; +use limited_size::*; use record_store::*; use record_store_limits::*; use storage_manager_inner::*; diff --git a/veilid-core/src/storage_manager/record_store.rs b/veilid-core/src/storage_manager/record_store.rs index 671835ce..f65e4a38 100644 --- a/veilid-core/src/storage_manager/record_store.rs +++ b/veilid-core/src/storage_manager/record_store.rs @@ -21,8 +21,8 @@ where subkey_table: Option, record_index: LruCache>, subkey_cache: LruCache, - subkey_cache_total_size: usize, - total_storage_space: usize, + subkey_cache_total_size: LimitedSize, + total_storage_space: LimitedSize, dead_records: Vec<(RecordTableKey, Record)>, changed_records: HashSet, @@ -47,6 +47,13 @@ where { pub fn new(table_store: TableStore, name: &str, limits: RecordStoreLimits) -> Self { let subkey_cache_size = limits.subkey_cache_size as usize; + let limit_subkey_cache_total_size = limits + .max_subkey_cache_memory_mb + .map(|mb| mb * 1_048_576usize); + let limit_max_storage_space = limits + .max_storage_space_mb + .map(|mb| mb as u64 * 1_048_576u64); + Self { table_store, name: name.to_owned(), @@ -55,8 +62,16 @@ where subkey_table: None, record_index: LruCache::new(limits.max_records.unwrap_or(usize::MAX)), subkey_cache: LruCache::new(subkey_cache_size), - subkey_cache_total_size: 0, - total_storage_space: 0, + subkey_cache_total_size: LimitedSize::new( + "subkey_cache_total_size", + 0, + limit_subkey_cache_total_size, + ), + total_storage_space: LimitedSize::new( + "total_storage_space", + 0, + limit_max_storage_space, + ), dead_records: Vec::new(), changed_records: HashSet::new(), purge_dead_records_mutex: Arc::new(AsyncMutex::new(())), @@ -89,8 +104,17 @@ where let mut dead_records = Vec::new(); for ri in record_index_saved { // total the storage space - self.total_storage_space += mem::size_of::(); - self.total_storage_space += ri.1.total_size(); + self.total_storage_space + .add(mem::size_of::() as u64) + .unwrap(); + self.total_storage_space + .add(ri.1.total_size() as u64) + .unwrap(); + if let Err(_) = self.total_storage_space.commit() { + // If we overflow the limit, kill off the record + dead_records.push((ri.0, ri.1)); + continue; + } // add to index and ensure we deduplicate in the case of an error if let Some(v) = self.record_index.insert(ri.0, ri.1, |k, v| { @@ -130,24 +154,32 @@ where // Old data dead_size += old_record_data.total_size(); } - self.subkey_cache_total_size -= dead_size; - self.subkey_cache_total_size += record_data_total_size; + self.subkey_cache_total_size.sub(dead_size).unwrap(); + self.subkey_cache_total_size + .add(record_data_total_size) + .unwrap(); // Purge over size limit - if let Some(max_subkey_cache_memory_mb) = self.limits.max_subkey_cache_memory_mb { - while self.subkey_cache_total_size > (max_subkey_cache_memory_mb * 1_048_576usize) { - if let Some((_, v)) = self.subkey_cache.remove_lru() { - self.subkey_cache_total_size -= v.total_size(); - } else { - break; - } + while self.subkey_cache_total_size.commit().is_err() { + if let Some((_, v)) = self.subkey_cache.remove_lru() { + self.subkey_cache_total_size.saturating_sub(v.total_size()); + } else { + self.subkey_cache_total_size.rollback(); + + log_stor!(error "subkey cache should not be empty, has {} bytes unaccounted for",self.subkey_cache_total_size.get()); + + self.subkey_cache_total_size.set(0); + self.subkey_cache_total_size.commit().unwrap(); + break; } } } fn remove_from_subkey_cache(&mut self, key: SubkeyTableKey) { if let Some(dead_record_data) = self.subkey_cache.remove(&key) { - self.subkey_cache_total_size -= dead_record_data.total_size(); + self.subkey_cache_total_size + .saturating_sub(dead_record_data.total_size()); + self.subkey_cache_total_size.commit().unwrap(); } } @@ -206,8 +238,11 @@ where } // Remove from total size - self.total_storage_space -= mem::size_of::(); - self.total_storage_space -= v.total_size(); + self.total_storage_space + .saturating_sub(mem::size_of::() as u64); + self.total_storage_space + .saturating_sub(v.total_size() as u64); + self.total_storage_space.commit().unwrap(); } if let Err(e) = rt_xact.commit().await { log_stor!(error "failed to commit record table transaction: {}", e); @@ -258,12 +293,12 @@ where }; // If over size limit, dont create record - let new_total_storage_space = - self.total_storage_space + mem::size_of::() + record.total_size(); - if let Some(max_storage_space_mb) = &self.limits.max_storage_space_mb { - if new_total_storage_space > (max_storage_space_mb * 1_048_576usize) { - apibail_try_again!(); - } + self.total_storage_space + .add((mem::size_of::() + record.total_size()) as u64) + .unwrap(); + if !self.total_storage_space.check_limit() { + self.total_storage_space.rollback(); + apibail_try_again!(); } // Save to record table @@ -286,7 +321,7 @@ where } // Update storage space - self.total_storage_space = new_total_storage_space; + self.total_storage_space.commit().unwrap(); Ok(()) } @@ -482,12 +517,14 @@ where } // Check new total storage space - let new_total_storage_space = - self.total_storage_space + new_record_data_size - prior_record_data_size; - if let Some(max_storage_space_mb) = self.limits.max_storage_space_mb { - if new_total_storage_space > (max_storage_space_mb * 1_048_576usize) { - apibail_try_again!(); - } + self.total_storage_space + .sub(prior_record_data_size as u64) + .unwrap(); + self.total_storage_space + .add(new_record_data_size as u64) + .unwrap(); + if !self.total_storage_space.check_limit() { + apibail_try_again!(); } // Write subkey @@ -506,6 +543,9 @@ where }) .expect("record should still be here"); + // Update storage space + self.total_storage_space.commit().unwrap(); + Ok(()) } @@ -513,16 +553,19 @@ where /// This will force a garbage collection of the space immediately /// If zero is passed in here, a garbage collection will be performed of dead records /// without removing any live records - pub async fn reclaim_space(&mut self, space: usize) { + pub async fn reclaim_space(&mut self, space: usize) -> usize { let mut reclaimed = 0usize; while reclaimed < space { if let Some((k, v)) = self.record_index.remove_lru() { reclaimed += mem::size_of::(); reclaimed += v.total_size(); self.add_dead_record(k, v); + } else { + break; } } self.purge_dead_records(false).await; + reclaimed } pub(super) fn debug_records(&self) -> String { @@ -532,9 +575,9 @@ where out += "Record Index:\n"; for (rik, rec) in &self.record_index { out += &format!( - " {} @ {} len={} subkeys={}\n", + " {} age={} len={} subkeys={}\n", rik.key.to_string(), - rec.last_touched().as_u64(), + debug_duration(get_timestamp() - rec.last_touched().as_u64()), rec.record_data_size(), rec.stored_subkeys(), ); @@ -542,9 +585,9 @@ where out += &format!("Subkey Cache Count: {}\n", self.subkey_cache.len()); out += &format!( "Subkey Cache Total Size: {}\n", - self.subkey_cache_total_size + self.subkey_cache_total_size.get() ); - out += &format!("Total Storage Space: {}\n", self.total_storage_space); + out += &format!("Total Storage Space: {}\n", self.total_storage_space.get()); out += &format!("Dead Records: {}\n", self.dead_records.len()); for dr in &self.dead_records { out += &format!(" {}\n", dr.0.key.to_string()); diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index 36ae4d4a..5b86a98c 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -901,6 +901,20 @@ impl VeilidAPI { return Ok(out); } + async fn debug_record_purge(&self, args: Vec) -> VeilidAPIResult { + // + let storage_manager = self.storage_manager()?; + + let scope = get_debug_argument_at(&args, 1, "debug_record_purge", "scope", get_string)?; + let bytes = get_debug_argument_at(&args, 2, "debug_record_purge", "bytes", get_number).ok(); + let out = match scope.as_str() { + "local" => storage_manager.purge_local_records(bytes).await, + "remote" => storage_manager.purge_remote_records(bytes).await, + _ => "Invalid scope\n".to_owned(), + }; + return Ok(out); + } + async fn debug_record(&self, args: String) -> VeilidAPIResult { let args: Vec = args.split_whitespace().map(|s| s.to_owned()).collect(); @@ -908,6 +922,8 @@ impl VeilidAPI { if command == "list" { self.debug_record_list(args).await + } else if command == "purge" { + self.debug_record_purge(args).await } else { Ok(">>> Unknown command\n".to_owned()) } @@ -936,7 +952,8 @@ impl VeilidAPI { list import test - record list + record list + purge [bytes] is: * direct: [+][] diff --git a/veilid-core/src/veilid_api/json_api/process.rs b/veilid-core/src/veilid_api/json_api/process.rs index 023820d0..f27a7c9a 100644 --- a/veilid-core/src/veilid_api/json_api/process.rs +++ b/veilid-core/src/veilid_api/json_api/process.rs @@ -259,11 +259,11 @@ impl JsonRequestProcessor { .add_routing_context(routing_context.clone().with_sequencing(sequencing)), } } - RoutingContextRequestOp::AppCall { target, request } => { + RoutingContextRequestOp::AppCall { target, message } => { RoutingContextResponseOp::AppCall { result: to_json_api_result_with_vec_u8( self.parse_target(target) - .then(|tr| async { routing_context.app_call(tr?, request).await }) + .then(|tr| async { routing_context.app_call(tr?, message).await }) .await, ), } diff --git a/veilid-core/src/veilid_api/json_api/routing_context.rs b/veilid-core/src/veilid_api/json_api/routing_context.rs index 94c55a9e..4ec8a881 100644 --- a/veilid-core/src/veilid_api/json_api/routing_context.rs +++ b/veilid-core/src/veilid_api/json_api/routing_context.rs @@ -29,7 +29,7 @@ pub enum RoutingContextRequestOp { target: String, #[serde(with = "json_as_base64")] #[schemars(with = "String")] - request: Vec, + message: Vec, }, AppMessage { target: String, diff --git a/veilid-core/src/veilid_api/routing_context.rs b/veilid-core/src/veilid_api/routing_context.rs index 6a928fdc..37989be1 100644 --- a/veilid-core/src/veilid_api/routing_context.rs +++ b/veilid-core/src/veilid_api/routing_context.rs @@ -137,14 +137,14 @@ impl RoutingContext { //////////////////////////////////////////////////////////////// // App-level Messaging - pub async fn app_call(&self, target: Target, request: Vec) -> VeilidAPIResult> { + pub async fn app_call(&self, target: Target, message: Vec) -> VeilidAPIResult> { let rpc_processor = self.api.rpc_processor()?; // Get destination let dest = self.get_destination(target).await?; // Send app message - let answer = match rpc_processor.rpc_call_app_call(dest, request).await { + let answer = match rpc_processor.rpc_call_app_call(dest, message).await { Ok(NetworkResult::Value(v)) => v, Ok(NetworkResult::Timeout) => apibail_timeout!(), Ok(NetworkResult::ServiceUnavailable(e)) => { diff --git a/veilid-python/tests/conftest.py b/veilid-python/tests/conftest.py index 4306ed8e..6cabbd26 100644 --- a/veilid-python/tests/conftest.py +++ b/veilid-python/tests/conftest.py @@ -22,7 +22,7 @@ def server_info() -> tuple[str, int]: return hostname, 5959 -async def simple_update_callback(api: veilid.VeilidAPI, update: veilid.VeilidUpdate): +async def simple_update_callback(update: veilid.VeilidUpdate): print(f"VeilidUpdate: {update}") diff --git a/veilid-python/tests/test_dht.py b/veilid-python/tests/test_dht.py index af17bc64..8bce06e8 100644 --- a/veilid-python/tests/test_dht.py +++ b/veilid-python/tests/test_dht.py @@ -67,10 +67,15 @@ async def test_set_get_dht_value(api_connection: veilid.VeilidAPI): vd2 = await rc.get_dht_value(rec.key, 0, False) assert vd2 != None + vd3 = await rc.get_dht_value(rec.key, 0, True) + assert vd3 != None + print("vd: {}", vd.__dict__) print("vd2: {}", vd2.__dict__) + print("vd3: {}", vd3.__dict__) assert vd == vd2 + assert vd2 == vd3 await rc.close_dht_record(rec.key) await rc.delete_dht_record(rec.key) diff --git a/veilid-python/tests/test_routing_context.py b/veilid-python/tests/test_routing_context.py index 3dd868aa..d92b133b 100644 --- a/veilid-python/tests/test_routing_context.py +++ b/veilid-python/tests/test_routing_context.py @@ -3,6 +3,7 @@ import asyncio import random import sys +import os import pytest import veilid @@ -30,7 +31,7 @@ async def test_routing_context_app_message_loopback(): # Seriously, mypy? app_message_queue: asyncio.Queue = asyncio.Queue() - async def app_message_queue_update_callback(api: veilid.VeilidAPI, update: veilid.VeilidUpdate): + async def app_message_queue_update_callback(update: veilid.VeilidUpdate): if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE: await app_message_queue.put(update) @@ -69,7 +70,7 @@ async def test_routing_context_app_message_loopback(): async def test_routing_context_app_call_loopback(): app_call_queue: asyncio.Queue = asyncio.Queue() - async def app_call_queue_update_callback(api: veilid.VeilidAPI, update: veilid.VeilidUpdate): + async def app_call_queue_update_callback(update: veilid.VeilidUpdate): if update.kind == veilid.VeilidUpdateKind.APP_CALL: await app_call_queue.put(update) @@ -120,7 +121,7 @@ async def test_routing_context_app_message_loopback_big_packets(): global got_message got_message = 0 - async def app_message_queue_update_callback(api: veilid.VeilidAPI, update: veilid.VeilidUpdate): + async def app_message_queue_update_callback(update: veilid.VeilidUpdate): if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE: global got_message got_message += 1 @@ -147,8 +148,8 @@ async def test_routing_context_app_message_loopback_big_packets(): # import it as a remote route as well so we can send to it prr = await api.import_remote_private_route(blob) - # do this test 100 times - for _ in range(100): + # do this test 1000 times + for _ in range(1000): # send a random sized random app message to our own private route message = random.randbytes(random.randint(0, 32768)) @@ -169,27 +170,39 @@ async def test_routing_context_app_message_loopback_big_packets(): @pytest.mark.asyncio async def test_routing_context_app_call_loopback_big_packets(): - - print("") - global got_message got_message = 0 - async def app_message_queue_update_callback(api: veilid.VeilidAPI, update: veilid.VeilidUpdate): + + app_call_queue: asyncio.Queue = asyncio.Queue() + + async def app_call_queue_update_callback(update: veilid.VeilidUpdate): if update.kind == veilid.VeilidUpdateKind.APP_CALL: + await app_call_queue.put(update) + + async def app_call_queue_task_handler(api: veilid.VeilidAPI): + while True: + update = await app_call_queue.get() + global got_message got_message += 1 + sys.stdout.write("{} ".format(got_message)) sys.stdout.flush() - await api.app_call_reply(update.detail.call_id, update.detail.message) + await api.app_call_reply(update.detail.call_id, update.detail.message) + hostname, port = server_info() api = await veilid.json_api_connect( - hostname, port, app_message_queue_update_callback + hostname, port, app_call_queue_update_callback ) async with api: # purge routes to ensure we start fresh await api.debug("purge routes") + app_call_task = asyncio.create_task( + app_call_queue_task_handler(api), name="app call task" + ) + # make a routing context that uses a safety route rc = await (await (await api.new_routing_context()).with_privacy()).with_sequencing(veilid.Sequencing.ENSURE_ORDERED) async with rc: @@ -200,11 +213,57 @@ async def test_routing_context_app_call_loopback_big_packets(): # import it as a remote route as well so we can send to it prr = await api.import_remote_private_route(blob) - # do this test 100 times - for _ in range(100): + # do this test 10 times + for _ in range(10): # send a random sized random app message to our own private route message = random.randbytes(random.randint(0, 32768)) out_message = await rc.app_call(prr, message) assert message == out_message + + app_call_task.cancel() + + +@pytest.mark.skipif(os.getenv("NOSKIP")!="1", reason="unneeded test, only for performance check") +@pytest.mark.asyncio +async def test_routing_context_app_message_loopback_bandwidth(): + + app_message_queue: asyncio.Queue = asyncio.Queue() + + async def app_message_queue_update_callback(update: veilid.VeilidUpdate): + if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE: + await app_message_queue.put(True) + + hostname, port = server_info() + api = await veilid.json_api_connect( + hostname, port, app_message_queue_update_callback + ) + async with api: + # purge routes to ensure we start fresh + await api.debug("purge routes") + + # make a routing context that uses a safety route + #rc = await (await (await api.new_routing_context()).with_privacy()).with_sequencing(veilid.Sequencing.ENSURE_ORDERED) + #rc = await (await api.new_routing_context()).with_privacy() + rc = await api.new_routing_context() + async with rc: + + # make a new local private route + prl, blob = await api.new_private_route() + + # import it as a remote route as well so we can send to it + prr = await api.import_remote_private_route(blob) + + # do this test 1000 times + message = random.randbytes(16384) + for _ in range(10000): + + # send a random sized random app message to our own private route + await rc.app_message(prr, message) + + # we should get the same number of messages back (not storing all that data) + for _ in range(10000): + await asyncio.wait_for( + app_message_queue.get(), timeout=10 + ) diff --git a/veilid-python/veilid/api.py b/veilid-python/veilid/api.py index e468eb6d..9008bbf4 100644 --- a/veilid-python/veilid/api.py +++ b/veilid-python/veilid/api.py @@ -166,7 +166,7 @@ class TableDb(ABC): class CryptoSystem(ABC): - + async def __aenter__(self) -> Self: return self diff --git a/veilid-python/veilid/json_api.py b/veilid-python/veilid/json_api.py index 710ff125..4725246e 100644 --- a/veilid-python/veilid/json_api.py +++ b/veilid-python/veilid/json_api.py @@ -51,7 +51,7 @@ _VALIDATOR_RECV_MESSAGE = _get_schema_validator( class _JsonVeilidAPI(VeilidAPI): reader: Optional[asyncio.StreamReader] writer: Optional[asyncio.StreamWriter] - update_callback: Callable[[VeilidAPI, VeilidUpdate], Awaitable] + update_callback: Callable[[VeilidUpdate], Awaitable] handle_recv_messages_task: Optional[asyncio.Task] validate_schema: bool done: bool @@ -64,7 +64,7 @@ class _JsonVeilidAPI(VeilidAPI): self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, - update_callback: Callable[[VeilidAPI, VeilidUpdate], Awaitable], + update_callback: Callable[[VeilidUpdate], Awaitable], validate_schema: bool = True, ): self.reader = reader @@ -115,7 +115,7 @@ class _JsonVeilidAPI(VeilidAPI): @classmethod async def connect( - cls, host: str, port: int, update_callback: Callable[[VeilidAPI, VeilidUpdate], Awaitable] + cls, host: str, port: int, update_callback: Callable[[VeilidUpdate], Awaitable] ) -> Self: reader, writer = await asyncio.open_connection(host, port) veilid_api = cls(reader, writer, update_callback) @@ -135,6 +135,8 @@ class _JsonVeilidAPI(VeilidAPI): # Resolve the request's future to the response json if reqfuture is not None: reqfuture.set_result(j) + else: + print("Missing id: {}", id) async def handle_recv_messages(self): # Read lines until we're done @@ -155,7 +157,7 @@ class _JsonVeilidAPI(VeilidAPI): if j["type"] == "Response": await self.handle_recv_message_response(j) elif j["type"] == "Update": - await self.update_callback(self, VeilidUpdate.from_json(j)) + await self.update_callback(VeilidUpdate.from_json(j)) finally: await self._cleanup_close() @@ -485,7 +487,7 @@ class _JsonRoutingContext(RoutingContext): await self.release() return self.__class__(self.api, new_rc_id) - async def app_call(self, target: TypedKey | RouteId, request: bytes) -> bytes: + async def app_call(self, target: TypedKey | RouteId, message: bytes) -> bytes: return urlsafe_b64decode_no_pad( raise_api_result( await self.api.send_ndjson_request( @@ -494,7 +496,7 @@ class _JsonRoutingContext(RoutingContext): rc_id=self.rc_id, rc_op=RoutingContextOperation.APP_CALL, target=target, - request=request, + message=message, ) ) ) @@ -1162,6 +1164,6 @@ class _JsonCryptoSystem(CryptoSystem): async def json_api_connect( - host: str, port: int, update_callback: Callable[[VeilidAPI, VeilidUpdate], Awaitable] + host: str, port: int, update_callback: Callable[[VeilidUpdate], Awaitable] ) -> _JsonVeilidAPI: return await _JsonVeilidAPI.connect(host, port, update_callback) diff --git a/veilid-python/veilid/schema/Request.json b/veilid-python/veilid/schema/Request.json index 82119da0..83ae07e8 100644 --- a/veilid-python/veilid/schema/Request.json +++ b/veilid-python/veilid/schema/Request.json @@ -259,20 +259,20 @@ { "type": "object", "required": [ + "message", "rc_op", - "request", "target" ], "properties": { + "message": { + "type": "string" + }, "rc_op": { "type": "string", "enum": [ "AppCall" ] }, - "request": { - "type": "string" - }, "target": { "type": "string" } diff --git a/veilid-tools/Cargo.toml b/veilid-tools/Cargo.toml index 51717ee2..4b3875da 100644 --- a/veilid-tools/Cargo.toml +++ b/veilid-tools/Cargo.toml @@ -48,6 +48,7 @@ tokio = { version = "^1", features = ["full"], optional = true} tokio-util = { version = "^0", features = ["compat"], optional = true} maplit = "^1" futures-util = { version = "^0", default-features = false, features = ["async-await", "sink", "std", "io"] } +chrono = "^0" libc = "^0" nix = "^0" diff --git a/veilid-tools/src/timestamp.rs b/veilid-tools/src/timestamp.rs index 4b5c187e..48fb4b89 100644 --- a/veilid-tools/src/timestamp.rs +++ b/veilid-tools/src/timestamp.rs @@ -11,8 +11,46 @@ cfg_if! { panic!("WASM requires browser environment"); } } + + pub fn debug_ts(ts: u64) -> String { + if is_browser() { + let mut now = Date::now(); + let mut date = Date::new_0(); + date.set_time((ts / 1000u64) as f64); + + let show_year = now.get_utc_full_year() != date.get_utc_full_year(); + let show_month = show_year || now.get_utc_month() != date.get_utc_month(); + let show_date = show_month || now.get_utc_date() != date.get_utc_date(); + + format!("{}{}{}{}", + if show_year { + format!("{:04}/",date.get_utc_full_year()) + } else { + "".to_owned() + }, + if show_month { + format!("{:02}/",date.get_utc_month()) + } else { + "".to_owned() + }, + if show_date { + format!("{:02}-",date.get_utc_date()) + } else { + "".to_owned() + }, + format!("{:02}:{:02}:{:02}.{:04}", + date.get_utc_hours(), + date.get_utc_minutes(), + date.get_utc_seconds(), + date.get_utc_milliseconds() + )) + } else { + panic!("WASM requires browser environment"); + } + } } else { use std::time::{SystemTime, UNIX_EPOCH}; + use chrono::{Datelike, Timelike}; pub fn get_timestamp() -> u64 { match SystemTime::now().duration_since(UNIX_EPOCH) { @@ -21,5 +59,80 @@ cfg_if! { } } + pub fn debug_ts(ts: u64) -> String { + let now = chrono::DateTime::::from(SystemTime::now()); + let date = chrono::DateTime::::from(UNIX_EPOCH + Duration::from_micros(ts)); + + let show_year = now.year() != date.year(); + let show_month = show_year || now.month() != date.month(); + let show_date = show_month || now.day() != date.day(); + + format!("{}{}{}{}", + if show_year { + format!("{:04}/",date.year()) + } else { + "".to_owned() + }, + if show_month { + format!("{:02}/",date.month()) + } else { + "".to_owned() + }, + if show_date { + format!("{:02}-",date.day()) + } else { + "".to_owned() + }, + format!("{:02}:{:02}:{:02}.{:04}", + date.hour(), + date.minute(), + date.second(), + date.nanosecond()/1_000_000 + )) + + } } } + +const DAY: u64 = 1_000_000u64 * 60 * 60 * 24; +const HOUR: u64 = 1_000_000u64 * 60 * 60; +const MIN: u64 = 1_000_000u64 * 60; +const SEC: u64 = 1_000_000u64; +const MSEC: u64 = 1_000u64; + +pub fn debug_duration(dur: u64) -> String { + let days = dur / DAY; + let dur = dur % DAY; + let hours = dur / HOUR; + let dur = dur % HOUR; + let mins = dur / MIN; + let dur = dur % MIN; + let secs = dur / SEC; + let dur = dur % SEC; + let msecs = dur / MSEC; + + format!( + "{}{}{}{}.{:03}", + if days != 0 { + format!("{}d", days) + } else { + "".to_owned() + }, + if hours != 0 { + format!("{}h", hours) + } else { + "".to_owned() + }, + if mins != 0 { + format!("{}m", mins) + } else { + "".to_owned() + }, + if secs != 0 { + format!("{}s", secs) + } else { + "".to_owned() + }, + msecs + ) +}