From d290a66f32288f0fab08ade4531d4da93cd59a1c Mon Sep 17 00:00:00 2001 From: John Smith Date: Sat, 1 Jul 2023 10:45:31 -0400 Subject: [PATCH] dht fixes --- Cargo.lock | 7 + veilid-core/Cargo.toml | 1 + .../coders/operations/operation_get_value.rs | 10 - .../src/rpc_processor/rpc_get_value.rs | 6 +- .../src/rpc_processor/rpc_set_value.rs | 6 +- veilid-core/src/storage_manager/debug.rs | 40 +++ veilid-core/src/storage_manager/mod.rs | 6 +- .../src/storage_manager/record_store.rs | 100 +++++- veilid-core/src/storage_manager/set_value.rs | 22 +- .../storage_manager/storage_manager_inner.rs | 67 +++- .../src/storage_manager/types/record.rs | 4 +- .../types/signed_value_descriptor.rs | 11 +- veilid-core/src/veilid_api/debug.rs | 303 +++++++++++++----- .../types/dht/dht_record_descriptor.rs | 3 + .../src/veilid_api/types/dht/value_data.rs | 11 +- veilid-python/tests/test_dht.py | 16 +- 16 files changed, 491 insertions(+), 122 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bcd31bd5..97c76291 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5232,6 +5232,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shell-words" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde" + [[package]] name = "shlex" version = "0.1.1" @@ -6493,6 +6499,7 @@ dependencies = [ "serde-big-array", "serde_json", "serial_test", + "shell-words", "simplelog 0.12.1", "socket2 0.5.3", "static_assertions", diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index 783b966d..ffa7f6b9 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -56,6 +56,7 @@ enumset = { version= "^1", features = ["serde"] } backtrace = { version = "^0" } stop-token = { version = "^0", default-features = false } num-traits = "0.2.15" +shell-words = "1.1.0" ed25519-dalek = { version = "^1", default_features = false, features = ["alloc", "u64_backend"] } x25519-dalek = { version = "^1", default_features = false, features = ["u64_backend"] } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs index 4261e461..72224952 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs @@ -91,11 +91,6 @@ impl RPCOperationGetValueA { if peers.len() > MAX_GET_VALUE_A_PEERS_LEN { return Err(RPCError::protocol("GetValueA peers length too long")); } - if descriptor.is_some() && !value.is_some() { - return Err(RPCError::protocol( - "GetValueA should not return descriptor without value", - )); - } Ok(Self { value, peers, @@ -144,11 +139,6 @@ impl RPCOperationGetValueA { get_value_context.vcrypto.clone(), ) .map_err(RPCError::protocol)?; - } else { - // No value, should not have descriptor - if self.descriptor.is_some() { - return Err(RPCError::protocol("descriptor returned without a value")); - } } PeerInfo::validate_vec(&mut self.peers, validate_context.crypto.clone()); diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index 324279f3..d484b51b 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -100,8 +100,9 @@ impl RPCProcessor { let (value, peers, descriptor) = get_value_a.destructure(); let debug_string_value = value.as_ref().map(|v| { - format!(" len={} writer={}", + format!(" len={} seq={} writer={}", v.value_data().data().len(), + v.value_data().seq(), v.value_data().writer(), ) }).unwrap_or_default(); @@ -210,8 +211,9 @@ impl RPCProcessor { .map_err(RPCError::internal)?); let debug_string_value = subkey_result.value.as_ref().map(|v| { - format!(" len={} writer={}", + format!(" len={} seq={} writer={}", v.value_data().data().len(), + v.value_data().seq(), v.value_data().writer(), ) }).unwrap_or_default(); diff --git a/veilid-core/src/rpc_processor/rpc_set_value.rs b/veilid-core/src/rpc_processor/rpc_set_value.rs index 9d29a432..e44b3569 100644 --- a/veilid-core/src/rpc_processor/rpc_set_value.rs +++ b/veilid-core/src/rpc_processor/rpc_set_value.rs @@ -202,10 +202,11 @@ impl RPCProcessor { let closer_to_key_peers = network_result_try!(routing_table.find_peers_closer_to_key(key)); let debug_string = format!( - "IN <=== SetValueQ({} #{} len={} writer={}{}) <== {}", + "IN <=== SetValueQ({} #{} len={} seq={} writer={}{}) <== {}", key, subkey, value.value_data().data().len(), + value.value_data().seq(), value.value_data().writer(), if descriptor.is_some() { " +desc" @@ -239,8 +240,9 @@ impl RPCProcessor { }; let debug_string_value = new_value.as_ref().map(|v| { - format!(" len={} writer={}", + format!(" len={} seq={} writer={}", v.value_data().data().len(), + v.value_data().seq(), v.value_data().writer(), ) }).unwrap_or_default(); diff --git a/veilid-core/src/storage_manager/debug.rs b/veilid-core/src/storage_manager/debug.rs index 714061c0..ba2c43c3 100644 --- a/veilid-core/src/storage_manager/debug.rs +++ b/veilid-core/src/storage_manager/debug.rs @@ -35,4 +35,44 @@ impl StorageManager { .await; return format!("Remote records purged: reclaimed {} bytes", reclaimed); } + pub(crate) async fn debug_local_record_subkey_info( + &self, + key: TypedKey, + subkey: ValueSubkey, + ) -> String { + let inner = self.inner.lock().await; + let Some(local_record_store) = &inner.local_record_store else { + return "not initialized".to_owned(); + }; + local_record_store + .debug_record_subkey_info(key, subkey) + .await + } + pub(crate) async fn debug_remote_record_subkey_info( + &self, + key: TypedKey, + subkey: ValueSubkey, + ) -> String { + let inner = self.inner.lock().await; + let Some(remote_record_store) = &inner.remote_record_store else { + return "not initialized".to_owned(); + }; + remote_record_store + .debug_record_subkey_info(key, subkey) + .await + } + pub(crate) async fn debug_local_record_info(&self, key: TypedKey) -> String { + let inner = self.inner.lock().await; + let Some(local_record_store) = &inner.local_record_store else { + return "not initialized".to_owned(); + }; + local_record_store.debug_record_info(key) + } + pub(crate) async fn debug_remote_record_info(&self, key: TypedKey) -> String { + let inner = self.inner.lock().await; + let Some(remote_record_store) = &inner.remote_record_store else { + return "not initialized".to_owned(); + }; + remote_record_store.debug_record_info(key) + } } diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 4fcf20f5..165e8809 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -144,6 +144,7 @@ impl StorageManager { // The initial writer is the owner of the record inner .open_existing_record(key, Some(owner), safety_selection) + .await .map(|r| r.unwrap()) } @@ -159,7 +160,10 @@ impl StorageManager { let mut inner = self.lock().await?; // See if we have a local record already or not - if let Some(res) = inner.open_existing_record(key, writer, safety_selection)? { + if let Some(res) = inner + .open_existing_record(key, writer, safety_selection) + .await? + { return Ok(res); } diff --git a/veilid-core/src/storage_manager/record_store.rs b/veilid-core/src/storage_manager/record_store.rs index f65e4a38..b9208635 100644 --- a/veilid-core/src/storage_manager/record_store.rs +++ b/veilid-core/src/storage_manager/record_store.rs @@ -9,7 +9,7 @@ use hashlink::LruCache; pub struct RecordStore where - D: Clone + RkyvArchive + RkyvSerialize, + D: fmt::Debug + Clone + RkyvArchive + RkyvSerialize, for<'t> ::Archived: CheckBytes>, ::Archived: RkyvDeserialize, { @@ -41,7 +41,7 @@ pub struct SubkeyResult { impl RecordStore where - D: Clone + RkyvArchive + RkyvSerialize, + D: fmt::Debug + Clone + RkyvArchive + RkyvSerialize, for<'t> ::Archived: CheckBytes>, ::Archived: RkyvDeserialize, { @@ -363,6 +363,20 @@ where out } + pub(super) fn peek_record(&self, key: TypedKey, f: F) -> Option + where + F: FnOnce(&Record) -> R, + { + // Get record from index + let mut out = None; + let rtk = RecordTableKey { key }; + if let Some(record) = self.record_index.peek(&rtk) { + // Callback + out = Some(f(record)); + } + out + } + pub(super) fn with_record_mut(&mut self, key: TypedKey, f: F) -> Option where F: FnOnce(&mut Record) -> R, @@ -454,6 +468,69 @@ where })); } + pub(crate) async fn peek_subkey( + &self, + key: TypedKey, + subkey: ValueSubkey, + want_descriptor: bool, + ) -> VeilidAPIResult> { + // record from index + let Some((subkey_count, has_subkey, opt_descriptor)) = self.peek_record(key, |record| { + (record.subkey_count(), record.stored_subkeys().contains(subkey), if want_descriptor { + Some(record.descriptor().clone()) + } else { + None + }) + }) else { + // Record not available + return Ok(None); + }; + + // Check if the subkey is in range + if subkey as usize >= subkey_count { + apibail_invalid_argument!("subkey out of range", "subkey", subkey); + } + + // See if we have this subkey stored + if !has_subkey { + // If not, return no value but maybe with descriptor + return Ok(Some(SubkeyResult { + value: None, + descriptor: opt_descriptor, + })); + } + + // Get subkey table + let Some(subkey_table) = self.subkey_table.clone() else { + apibail_internal!("record store not initialized"); + }; + + // If subkey exists in subkey cache, use that + let stk = SubkeyTableKey { key, subkey }; + if let Some(record_data) = self.subkey_cache.peek(&stk) { + let out = record_data.signed_value_data().clone(); + + return Ok(Some(SubkeyResult { + value: Some(out), + descriptor: opt_descriptor, + })); + } + // If not in cache, try to pull from table store if it is in our stored subkey set + let Some(record_data) = subkey_table + .load_rkyv::(0, &stk.bytes()) + .await + .map_err(VeilidAPIError::internal)? else { + apibail_internal!("failed to peek subkey that was stored"); + }; + + let out = record_data.signed_value_data().clone(); + + return Ok(Some(SubkeyResult { + value: Some(out), + descriptor: opt_descriptor, + })); + } + pub async fn set_subkey( &mut self, key: TypedKey, @@ -599,4 +676,23 @@ where out } + + pub(super) fn debug_record_info(&self, key: TypedKey) -> String { + self.peek_record(key, |r| format!("{:#?}", r)) + .unwrap_or("Not found".to_owned()) + } + + pub(super) async fn debug_record_subkey_info( + &self, + key: TypedKey, + subkey: ValueSubkey, + ) -> String { + match self.peek_subkey(key, subkey, true).await { + Ok(Some(v)) => { + format!("{:#?}", v) + } + Ok(None) => "Subkey not available".to_owned(), + Err(e) => format!("{}", e), + } + } } diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs index 74e50181..a9264ed3 100644 --- a/veilid-core/src/storage_manager/set_value.rs +++ b/veilid-core/src/storage_manager/set_value.rs @@ -166,8 +166,19 @@ impl StorageManager { pub async fn inbound_set_value(&self, key: TypedKey, subkey: ValueSubkey, value: SignedValueData, descriptor: Option) -> VeilidAPIResult>> { let mut inner = self.lock().await?; - // See if the subkey we are modifying has a last known local value - let last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?; + // See if this is a remote or local value + let (is_local, last_subkey_result) = { + // See if the subkey we are modifying has a last known local value + let last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?; + // If this is local, it must have a descriptor already + if last_subkey_result.descriptor.is_some() { + (true, last_subkey_result) + } else { + // See if the subkey we are modifying has a last known remote value + let last_subkey_result = inner.handle_get_remote_value(key, subkey, true).await?; + (false, last_subkey_result) + } + }; // Make sure this value would actually be newer if let Some(last_value) = &last_subkey_result.value { @@ -210,7 +221,12 @@ impl StorageManager { } // Do the set and return no new value - match inner.handle_set_remote_value(key, subkey, value, actual_descriptor).await { + let res = if is_local { + inner.handle_set_local_value(key, subkey, value).await + } else { + inner.handle_set_remote_value(key, subkey, value, actual_descriptor).await + }; + match res { Ok(()) => {}, Err(VeilidAPIError::Internal { message }) => { apibail_internal!(message); diff --git a/veilid-core/src/storage_manager/storage_manager_inner.rs b/veilid-core/src/storage_manager/storage_manager_inner.rs index e50cbcc3..bf06161e 100644 --- a/veilid-core/src/storage_manager/storage_manager_inner.rs +++ b/veilid-core/src/storage_manager/storage_manager_inner.rs @@ -209,7 +209,57 @@ impl StorageManagerInner { Ok((dht_key, owner)) } - pub fn open_existing_record( + async fn move_remote_record_to_local(&mut self, key: TypedKey, safety_selection: SafetySelection) -> VeilidAPIResult> + { + // Get local record store + let Some(local_record_store) = self.local_record_store.as_mut() else { + apibail_not_initialized!(); + }; + + // Get remote record store + let Some(remote_record_store) = self.remote_record_store.as_mut() else { + apibail_not_initialized!(); + }; + + let rcb = |r: &Record| { + // Return record details + r.clone() + }; + let Some(remote_record) = remote_record_store.with_record(key, rcb) else { + // No local or remote record found, return None + return Ok(None); + }; + + // Make local record + let cur_ts = get_aligned_timestamp(); + let local_record = Record::new(cur_ts, remote_record.descriptor().clone(), LocalRecordDetail { + safety_selection + })?; + local_record_store.new_record(key, local_record).await?; + + // Move copy subkey data from remote to local store + for subkey in remote_record.stored_subkeys().iter() { + let Some(subkey_result) = remote_record_store.get_subkey(key, subkey, false).await? else { + // Subkey was missing + warn!("Subkey was missing: {} #{}",key, subkey); + continue; + }; + let Some(subkey_data) = subkey_result.value else { + // Subkey was missing + warn!("Subkey data was missing: {} #{}",key, subkey); + continue; + }; + local_record_store.set_subkey(key, subkey, subkey_data).await?; + } + + // Delete remote record from store + remote_record_store.delete_record(key).await?; + + // Return record information as transferred to local record + Ok(Some((remote_record.owner().clone(), remote_record.schema()))) + } + + pub async fn open_existing_record( &mut self, key: TypedKey, writer: Option, @@ -235,8 +285,17 @@ impl StorageManagerInner { // Return record details (r.owner().clone(), r.schema()) }; - let Some((owner, schema)) = local_record_store.with_record_mut(key, cb) else { - return Ok(None); + let (owner, schema) = match local_record_store.with_record_mut(key, cb){ + Some(v) => v, + None => { + // If we don't have a local record yet, check to see if we have a remote record + // if so, migrate it to a local record + let Some(v) = self.move_remote_record_to_local(key, safety_selection).await? else { + // No remote record either + return Ok(None); + }; + v + } }; // Had local record @@ -424,7 +483,7 @@ impl StorageManagerInner { /// # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ] fn get_key(vcrypto: CryptoSystemVersion, record: &Record) -> TypedKey where - D: Clone + RkyvArchive + RkyvSerialize, + D: fmt::Debug + Clone + RkyvArchive + RkyvSerialize, for<'t> ::Archived: CheckBytes>, ::Archived: RkyvDeserialize, { diff --git a/veilid-core/src/storage_manager/types/record.rs b/veilid-core/src/storage_manager/types/record.rs index c146380d..86786c96 100644 --- a/veilid-core/src/storage_manager/types/record.rs +++ b/veilid-core/src/storage_manager/types/record.rs @@ -6,7 +6,7 @@ use super::*; #[archive_attr(repr(C), derive(CheckBytes))] pub struct Record where - D: Clone + RkyvArchive + RkyvSerialize, + D: fmt::Debug + Clone + RkyvArchive + RkyvSerialize, for<'t> ::Archived: CheckBytes>, ::Archived: RkyvDeserialize, { @@ -20,7 +20,7 @@ where impl Record where - D: Clone + RkyvArchive + RkyvSerialize, + D: fmt::Debug + Clone + RkyvArchive + RkyvSerialize, for<'t> ::Archived: CheckBytes>, ::Archived: RkyvDeserialize, { diff --git a/veilid-core/src/storage_manager/types/signed_value_descriptor.rs b/veilid-core/src/storage_manager/types/signed_value_descriptor.rs index fa718dcb..ec5a3f38 100644 --- a/veilid-core/src/storage_manager/types/signed_value_descriptor.rs +++ b/veilid-core/src/storage_manager/types/signed_value_descriptor.rs @@ -5,7 +5,6 @@ use super::*; #[derive( Clone, - Debug, PartialOrd, PartialEq, Eq, @@ -79,3 +78,13 @@ impl SignedValueDescriptor { self.schema_data.cmp(&other.schema_data) } } + +impl fmt::Debug for SignedValueDescriptor { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("SignedValueDescriptor") + .field("owner", &self.owner) + .field("schema_data", &format!("{:?}", &self.schema_data)) + .field("signature", &self.signature) + .finish() + } +} diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index 4e2e2224..01f02043 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -31,6 +31,19 @@ fn get_string(text: &str) -> Option { Some(text.to_owned()) } +fn get_data(text: &str) -> Option> { + if text.starts_with("#") { + hex::decode(&text[1..]).ok() + } else if text.starts_with("\"") || text.starts_with("'") { + json::parse(text) + .ok()? + .as_str() + .map(|x| x.to_owned().as_bytes().to_vec()) + } else { + Some(text.as_bytes().to_vec()) + } +} + fn get_subkeys(text: &str) -> Option { if let Some(n) = get_number(text) { Some(ValueSubkeyRangeSet::single(n.try_into().ok()?)) @@ -88,44 +101,50 @@ fn get_route_id( }; } -fn get_safety_selection(text: &str, routing_table: RoutingTable) -> Option { - let rss = routing_table.route_spec_store(); - let default_route_hop_count = - routing_table.with_config(|c| c.network.rpc.default_route_hop_count as usize); +fn get_dht_schema(text: &str) -> Option { + deserialize_json::(text).ok() +} - if text.len() != 0 && &text[0..1] == "-" { - // Unsafe - let text = &text[1..]; - let seq = get_sequencing(text).unwrap_or_default(); - Some(SafetySelection::Unsafe(seq)) - } else { - // Safe - let mut preferred_route = None; - let mut hop_count = default_route_hop_count; - let mut stability = Stability::default(); - let mut sequencing = Sequencing::default(); - for x in text.split(",") { - let x = x.trim(); - if let Some(pr) = get_route_id(rss.clone(), true, false)(x) { - preferred_route = Some(pr) - } - if let Some(n) = get_number(x) { - hop_count = n; - } - if let Some(s) = get_stability(x) { - stability = s; - } - if let Some(s) = get_sequencing(x) { - sequencing = s; +fn get_safety_selection(routing_table: RoutingTable) -> impl Fn(&str) -> Option { + move |text| { + let rss = routing_table.route_spec_store(); + let default_route_hop_count = + routing_table.with_config(|c| c.network.rpc.default_route_hop_count as usize); + + if text.len() != 0 && &text[0..1] == "-" { + // Unsafe + let text = &text[1..]; + let seq = get_sequencing(text).unwrap_or_default(); + Some(SafetySelection::Unsafe(seq)) + } else { + // Safe + let mut preferred_route = None; + let mut hop_count = default_route_hop_count; + let mut stability = Stability::default(); + let mut sequencing = Sequencing::default(); + for x in text.split(",") { + let x = x.trim(); + if let Some(pr) = get_route_id(rss.clone(), true, false)(x) { + preferred_route = Some(pr) + } + if let Some(n) = get_number(x) { + hop_count = n; + } + if let Some(s) = get_stability(x) { + stability = s; + } + if let Some(s) = get_sequencing(x) { + sequencing = s; + } } + let ss = SafetySpec { + preferred_route, + hop_count, + stability, + sequencing, + }; + Some(SafetySelection::Safe(ss)) } - let ss = SafetySpec { - preferred_route, - hop_count, - stability, - sequencing, - }; - Some(SafetySelection::Safe(ss)) } } @@ -150,7 +169,7 @@ fn get_destination(routing_table: RoutingTable) -> impl FnOnce(&str) -> Option Option { fn get_public_key(text: &str) -> Option { PublicKey::from_str(text).ok() } +fn get_keypair(text: &str) -> Option { + KeyPair::from_str(text).ok() +} fn get_crypto_system_version(crypto: Crypto) -> impl FnOnce(&str) -> Option { move |text| { @@ -249,7 +271,7 @@ fn get_dht_key( move |text| { // Safety selection let (text, ss) = if let Some((first, second)) = text.split_once('+') { - let ss = get_safety_selection(second, routing_table.clone())?; + let ss = get_safety_selection(routing_table.clone())(second)?; (first, Some(ss)) } else { (text, None) @@ -389,25 +411,32 @@ fn get_debug_argument_at Option>( Ok(val) } -fn print_data_truncated(data: &[u8]) -> String { +pub fn print_data(data: &[u8], truncate_len: Option) -> String { // check is message body is ascii printable let mut printable = true; for c in data { if *c < 32 || *c > 126 { printable = false; + break; } } - let (data, truncated) = if data.len() > 64 { + let (data, truncated) = if truncate_len.is_some() && data.len() > truncate_len.unwrap() { (&data[0..64], true) } else { (&data[..], false) }; let strdata = if printable { - format!("\"{}\"", String::from_utf8_lossy(&data).to_string()) + format!("{}", String::from_utf8_lossy(&data).to_string()) } else { - hex::encode(data) + let sw = shell_words::quote(&String::from_utf8_lossy(&data).to_string()).to_string(); + let h = hex::encode(data); + if h.len() < sw.len() { + h + } else { + sw + } }; if truncated { format!("{}...", strdata) @@ -416,14 +445,6 @@ fn print_data_truncated(data: &[u8]) -> String { } } -fn print_value_data(value_data: ValueData) -> String { - format!( - "ValueData {{\n seq: {},\n writer: {},\n data: {}\n}}\n", - value_data.seq(), - value_data.writer(), - print_data_truncated(value_data.data()) - ) -} impl VeilidAPI { async fn debug_buckets(&self, args: String) -> VeilidAPIResult { let args: Vec = args.split_whitespace().map(|s| s.to_owned()).collect(); @@ -1010,6 +1031,62 @@ impl VeilidAPI { }; return Ok(out); } + + async fn debug_record_create(&self, args: Vec) -> VeilidAPIResult { + let netman = self.network_manager()?; + let routing_table = netman.routing_table(); + let crypto = self.crypto()?; + + let csv = get_debug_argument_at( + &args, + 1, + "debug_record_create", + "kind", + get_crypto_system_version(crypto.clone()), + ) + .unwrap_or_else(|_| crypto.best()); + let schema = get_debug_argument_at( + &args, + 2, + "debug_record_create", + "dht_schema", + get_dht_schema, + ) + .unwrap_or_else(|_| DHTSchema::dflt(1)); + let ss = get_debug_argument_at( + &args, + 3, + "debug_record_create", + "safety_selection", + get_safety_selection(routing_table), + ) + .ok(); + + // Get routing context with optional privacy + let rc = self.routing_context(); + let rc = if let Some(ss) = ss { + let rcp = match rc.with_custom_privacy(ss) { + Err(e) => return Ok(format!("Can't use safety selection: {}", e)), + Ok(v) => v, + }; + rcp + } else { + rc + }; + + // Do a record get + let record = match rc.create_dht_record(csv.kind(), schema).await { + Err(e) => return Ok(format!("Can't open DHT record: {}", e)), + Ok(v) => v, + }; + match rc.close_dht_record(*record.key()).await { + Err(e) => return Ok(format!("Can't close DHT record: {}", e)), + Ok(v) => v, + }; + debug!("DHT Record Created:\n{:#?}", record); + return Ok(format!("{:?}", record)); + } + async fn debug_record_get(&self, args: Vec) -> VeilidAPIResult { let netman = self.network_manager()?; let routing_table = netman.routing_table(); @@ -1080,7 +1157,66 @@ impl VeilidAPI { Ok(v) => v, }; let out = if let Some(value) = value { - print_value_data(value) + format!("{:?}", value) + } else { + "No value data returned".to_owned() + }; + match rc.close_dht_record(key).await { + Err(e) => return Ok(format!("Can't close DHT record: {}", e)), + Ok(v) => v, + }; + return Ok(out); + } + + async fn debug_record_set(&self, args: Vec) -> VeilidAPIResult { + let netman = self.network_manager()?; + let routing_table = netman.routing_table(); + + let (key, ss) = get_debug_argument_at( + &args, + 1, + "debug_record_set", + "key", + get_dht_key(routing_table), + )?; + let subkey = get_debug_argument_at(&args, 2, "debug_record_set", "subkey", get_number)?; + let writer = get_debug_argument_at(&args, 3, "debug_record_set", "writer", get_keypair)?; + let data = get_debug_argument_at(&args, 4, "debug_record_set", "data", get_data)?; + + // Get routing context with optional privacy + let rc = self.routing_context(); + let rc = if let Some(ss) = ss { + let rcp = match rc.with_custom_privacy(ss) { + Err(e) => return Ok(format!("Can't use safety selection: {}", e)), + Ok(v) => v, + }; + rcp + } else { + rc + }; + + // Do a record get + let _record = match rc.open_dht_record(key, Some(writer)).await { + Err(e) => return Ok(format!("Can't open DHT record: {}", e)), + Ok(v) => v, + }; + let value = match rc.set_dht_value(key, subkey as ValueSubkey, data).await { + Err(e) => { + match rc.close_dht_record(key).await { + Err(e) => { + return Ok(format!( + "Can't set DHT value and can't close DHT record: {}", + e + )) + } + Ok(v) => v, + }; + return Ok(format!("Can't set DHT value: {}", e)); + } + Ok(v) => v, + }; + let out = if let Some(value) = value { + format!("{:?}", value) } else { "No value data returned".to_owned() }; @@ -1104,46 +1240,35 @@ impl VeilidAPI { } async fn debug_record_info(&self, args: Vec) -> VeilidAPIResult { - let netman = self.network_manager()?; - let routing_table = netman.routing_table(); + let storage_manager = self.storage_manager()?; - let (key, ss) = get_debug_argument_at( - &args, - 1, - "debug_record_info", - "key", - get_dht_key(routing_table), - )?; + let key = get_debug_argument_at(&args, 1, "debug_record_info", "key", get_typed_key)?; - // Get routing context with optional privacy - let rc = self.routing_context(); - let rc = if let Some(ss) = ss { - let rcp = match rc.with_custom_privacy(ss) { - Err(e) => return Ok(format!("Can't use safety selection: {}", e)), - Ok(v) => v, - }; - rcp + let subkey = + get_debug_argument_at(&args, 2, "debug_record_info", "subkey", get_number).ok(); + + let out = if let Some(subkey) = subkey { + let li = storage_manager + .debug_local_record_subkey_info(key, subkey as ValueSubkey) + .await; + let ri = storage_manager + .debug_remote_record_subkey_info(key, subkey as ValueSubkey) + .await; + format!( + "Local Subkey Info:\n{}\n\nRemote Subkey Info:\n{}\n", + li, ri + ) } else { - rc - }; - - // Do a record get - let record = match rc.open_dht_record(key, None).await { - Err(e) => return Ok(format!("Can't open DHT record: {}", e)), - Ok(v) => v, - }; - - let out = format!("{:#?}", record); - - match rc.close_dht_record(key).await { - Err(e) => return Ok(format!("Can't close DHT record: {}", e)), - Ok(v) => v, + let li = storage_manager.debug_local_record_info(key).await; + let ri = storage_manager.debug_remote_record_info(key).await; + format!("Local Info:\n{}\n\nRemote Info:\n{}\n", li, ri) }; return Ok(out); } async fn debug_record(&self, args: String) -> VeilidAPIResult { - let args: Vec = args.split_whitespace().map(|s| s.to_owned()).collect(); + let args: Vec = + shell_words::split(&args).map_err(|e| VeilidAPIError::parse_error(e, args))?; let command = get_debug_argument_at(&args, 0, "debug_record", "command", get_string)?; @@ -1151,8 +1276,12 @@ impl VeilidAPI { self.debug_record_list(args).await } else if command == "purge" { self.debug_record_purge(args).await + } else if command == "create" { + self.debug_record_create(args).await } else if command == "get" { self.debug_record_get(args).await + } else if command == "set" { + self.debug_record_set(args).await } else if command == "delete" { self.debug_record_delete(args).await } else if command == "info" { @@ -1187,9 +1316,11 @@ route allocate [ord|*ord] [rel] [] [in|out] test record list purge [bytes] + create + set [+] get [+] [force] delete - info + info [subkey] -------------------------------------------------------------------- is: VLD0:GsgXCRPrzSK6oBNgxhNpm-rTYFd02R0ySx6j9vbQBG4 * also , , , @@ -1205,10 +1336,16 @@ record list is: udp|tcp|ws|wss is: ipv4|ipv6 is: public|local + is: VLD0 + is: a json dht schema, default is '{"kind":"DFLT","o_cnt":1}' is: a number: 2 is: * a number: 2 * a comma-separated inclusive range list: 1..=3,5..=8 + is: + * a single-word string: foobar + * a shell-quoted string: "foo\nbar\n" + * a '#' followed by hex data: #12AB34CD... "# .to_owned()) } diff --git a/veilid-core/src/veilid_api/types/dht/dht_record_descriptor.rs b/veilid-core/src/veilid_api/types/dht/dht_record_descriptor.rs index 6162fd9f..6bbbe597 100644 --- a/veilid-core/src/veilid_api/types/dht/dht_record_descriptor.rs +++ b/veilid-core/src/veilid_api/types/dht/dht_record_descriptor.rs @@ -46,6 +46,9 @@ impl DHTRecordDescriptor { } } + pub fn key(&self) -> &TypedKey { + &self.key + } pub fn owner(&self) -> &PublicKey { &self.owner } diff --git a/veilid-core/src/veilid_api/types/dht/value_data.rs b/veilid-core/src/veilid_api/types/dht/value_data.rs index c57fc2e5..b71ba9fc 100644 --- a/veilid-core/src/veilid_api/types/dht/value_data.rs +++ b/veilid-core/src/veilid_api/types/dht/value_data.rs @@ -2,7 +2,6 @@ use super::*; #[derive( Clone, - Debug, Default, PartialOrd, PartialEq, @@ -61,3 +60,13 @@ impl ValueData { mem::size_of::() + self.data.len() } } + +impl fmt::Debug for ValueData { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("ValueData") + .field("seq", &self.seq) + .field("data", &print_data(&self.data, None)) + .field("writer", &self.writer) + .finish() + } +} diff --git a/veilid-python/tests/test_dht.py b/veilid-python/tests/test_dht.py index 36a2f1c6..79dc9bd1 100644 --- a/veilid-python/tests/test_dht.py +++ b/veilid-python/tests/test_dht.py @@ -193,18 +193,12 @@ async def test_open_writer_dht_value(api_connection: veilid.VeilidAPI): assert rec.schema.o_cnt == 2 # Verify subkey 1 can NOT be set because we have the wrong writer - vdtemp = await rec.set_dht_value(key, 1, va) - assert vdtemp != None - assert vdtemp.data == vc - assert vdtemp.seq == 2 - assert vdtemp.writer == owner - + with pytest.raises(veilid.VeilidAPIError): + vdtemp = await rc.set_dht_value(key, 1, va) + # Verify subkey 0 can NOT be set because we have the wrong writer - vdtemp = await rec.set_dht_value(key, 0, va) - assert vdtemp != None - assert vdtemp.data == vb - assert vdtemp.seq == 1 - assert vdtemp.writer == owner + with pytest.raises(veilid.VeilidAPIError): + vdtemp = await rc.set_dht_value(key, 0, va) # Clean up await rc.close_dht_record(key)