From 438553a1ca015df70d6a9202daa7912155d245b8 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sat, 22 Apr 2023 17:53:53 -0400 Subject: [PATCH] refactor --- veilid-core/proto/veilid.capnp | 15 +- veilid-core/src/rpc_processor/coders/mod.rs | 4 +- .../coders/operations/operation_app_call.rs | 73 +++--- .../operations/operation_app_message.rs | 33 +-- .../operations/operation_cancel_tunnel.rs | 31 +-- .../operations/operation_complete_tunnel.rs | 49 ++-- .../coders/operations/operation_find_block.rs | 67 ++--- .../coders/operations/operation_find_node.rs | 42 +-- .../coders/operations/operation_get_value.rs | 240 ++++++++++------- .../operations/operation_return_receipt.rs | 29 ++- .../coders/operations/operation_route.rs | 73 ++++-- .../coders/operations/operation_set_value.rs | 242 ++++++++++++++---- .../coders/operations/operation_signal.rs | 19 +- .../operations/operation_start_tunnel.rs | 55 +++- .../coders/operations/operation_status.rs | 48 +++- .../operations/operation_supply_block.rs | 111 +++++--- .../operation_validate_dial_info.rs | 56 +++- .../operations/operation_value_changed.rs | 52 +++- .../operations/operation_watch_value.rs | 157 +++++++++++- .../coders/operations/question.rs | 2 +- .../coders/operations/respond_to.rs | 6 +- .../src/rpc_processor/coders/value_data.rs | 30 +++ veilid-core/src/rpc_processor/mod.rs | 16 +- .../src/rpc_processor/rpc_get_value.rs | 2 +- veilid-core/src/rpc_processor/rpc_signal.rs | 3 +- .../rpc_processor/rpc_validate_dial_info.rs | 11 +- veilid-core/src/storage_manager/mod.rs | 29 ++- veilid-core/src/storage_manager/types/mod.rs | 2 - 28 files changed, 1061 insertions(+), 436 deletions(-) create mode 100644 veilid-core/src/rpc_processor/coders/value_data.rs diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index f3ec1288..344a5128 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -348,14 +348,14 @@ struct OperationGetValueQ @0xf88a5b6da5eda5d0 { struct OperationGetValueA @0xd896bb46f2e0249f { value @0 :SignedValueData; # optional: the value if successful, or if unset, no value returned peers @1 :List(PeerInfo); # returned 'closer peer' information on either success or failure - descriptor @2 :SignedValueDescriptor # optional: the descriptor if requested + descriptor @2 :SignedValueDescriptor; # optional: the descriptor if requested if the value is also returned } struct OperationSetValueQ @0xbac06191ff8bdbc5 { key @0 :TypedKey; # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ] subkey @1 :Subkey; # the index of the subkey value @2 :SignedValueData; # value or subvalue contents (older or equal seq number gets dropped) - descriptor @3 :SignedValueDescriptor # optional: the descriptor if needed + descriptor @3 :SignedValueDescriptor; # optional: the descriptor if needed } struct OperationSetValueA @0x9378d0732dc95be2 { @@ -366,10 +366,11 @@ struct OperationSetValueA @0x9378d0732dc95be2 { struct OperationWatchValueQ @0xf9a5a6c547b9b228 { key @0 :TypedKey; # key for value to watch - subkeys @1 :List(SubkeyRange); # subkey range to watch, if empty, watch everything + subkeys @1 :List(SubkeyRange); # subkey range to watch (up to 512 subranges), if empty, watch everything expiration @2 :UInt64; # requested timestamp when this watch will expire in usec since epoch (can be return less, 0 for max) count @3 :UInt32; # requested number of changes to watch for (0 = cancel, 1 = single shot, 2+ = counter, UINT32_MAX = continuous) - signature @4 :Signature; # signature of the watcher, must be one of the schema members or the key owner. signature covers: key, subkeys, expiration, count + watcher @4 :PublicKey; # the watcher performing the watch, can be the owner or a schema member + signature @5 :Signature; # signature of the watcher, must be one of the schema members or the key owner. signature covers: key, subkeys, expiration, count } struct OperationWatchValueA @0xa726cab7064ba893 { @@ -389,10 +390,8 @@ struct OperationSupplyBlockQ @0xadbf4c542d749971 { } struct OperationSupplyBlockA @0xf003822e83b5c0d7 { - union { - expiration @0 :UInt64; # when the block supplier entry will need to be refreshed - peers @1 :List(PeerInfo); # returned 'closer peer' information if not successful - } + expiration @0 :UInt64; # when the block supplier entry will need to be refreshed, or 0 if not successful + peers @1 :List(PeerInfo); # returned 'closer peer' information if not successful } struct OperationFindBlockQ @0xaf4353ff004c7156 { diff --git a/veilid-core/src/rpc_processor/coders/mod.rs b/veilid-core/src/rpc_processor/coders/mod.rs index 0d833dd9..6ad58903 100644 --- a/veilid-core/src/rpc_processor/coders/mod.rs +++ b/veilid-core/src/rpc_processor/coders/mod.rs @@ -25,7 +25,6 @@ mod socket_address; mod tunnel; mod typed_key; mod typed_signature; -mod value_detail; xxx eliminate value_detail pub use address::*; pub use address_type_set::*; @@ -54,7 +53,6 @@ pub use socket_address::*; pub use tunnel::*; pub use typed_key::*; pub use typed_signature::*; -pub use value_detail::*; use super::*; @@ -66,10 +64,12 @@ pub struct DecodeContext { #[derive(Debug, Clone)] pub enum QuestionContext { GetValue(ValidateGetValueContext), + SetValue(ValidateSetValueContext), } #[derive(Clone)] pub struct RPCValidateContext { crypto: Crypto, + rpc_processor: RPCProcessor, question_context: Option, } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_app_call.rs b/veilid-core/src/rpc_processor/coders/operations/operation_app_call.rs index 7f2d98e0..ae84a767 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_app_call.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_app_call.rs @@ -9,30 +9,15 @@ pub struct RPCOperationAppCallQ { } impl RPCOperationAppCallQ { - pub fn new(message: &[u8]) -> Result { + pub fn new(message: Vec) -> Result { if message.len() > MAX_APP_CALL_Q_MESSAGE_LEN { return Err(RPCError::protocol("AppCallQ message too long to set")); } - Ok(Self { - message: message.to_vec(), - }) + Ok(Self { message }) } pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> { Ok(()) } - pub fn decode( - reader: &veilid_capnp::operation_app_call_q::Reader, - ) -> Result { - let mr = reader.get_message().map_err(RPCError::protocol)?; - RPCOperationAppCallQ::new(mr) - } - pub fn encode( - &self, - builder: &mut veilid_capnp::operation_app_call_q::Builder, - ) -> Result<(), RPCError> { - builder.set_message(&self.message); - Ok(()) - } pub fn message(&self) -> &[u8] { &self.message @@ -41,6 +26,23 @@ impl RPCOperationAppCallQ { pub fn destructure(self) -> Vec { self.message } + + pub fn decode(reader: &veilid_capnp::operation_app_call_q::Reader) -> Result { + let mr = reader.get_message().map_err(RPCError::protocol)?; + if mr.len() > MAX_APP_CALL_Q_MESSAGE_LEN { + return Err(RPCError::protocol("AppCallQ message too long to set")); + } + Ok(Self { + message: mr.to_vec(), + }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_app_call_q::Builder, + ) -> Result<(), RPCError> { + builder.set_message(&self.message); + Ok(()) + } } #[derive(Debug, Clone)] @@ -49,31 +51,16 @@ pub struct RPCOperationAppCallA { } impl RPCOperationAppCallA { - pub fn new(message: &[u8]) -> Result { + pub fn new(message: Vec) -> Result { if message.len() > MAX_APP_CALL_A_MESSAGE_LEN { return Err(RPCError::protocol("AppCallA message too long to set")); } - Ok(Self { - message: message.to_vec(), - }) + Ok(Self { message }) } pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> { Ok(()) } - pub fn decode( - reader: &veilid_capnp::operation_app_call_a::Reader, - ) -> Result { - let mr = reader.get_message().map_err(RPCError::protocol)?; - RPCOperationAppCallA::new(mr) - } - pub fn encode( - &self, - builder: &mut veilid_capnp::operation_app_call_a::Builder, - ) -> Result<(), RPCError> { - builder.set_message(&self.message); - Ok(()) - } pub fn message(&self) -> &[u8] { &self.message @@ -82,4 +69,22 @@ impl RPCOperationAppCallA { pub fn destructure(self) -> Vec { self.message } + + pub fn decode(reader: &veilid_capnp::operation_app_call_a::Reader) -> Result { + let mr = reader.get_message().map_err(RPCError::protocol)?; + if mr.len() > MAX_APP_CALL_A_MESSAGE_LEN { + return Err(RPCError::protocol("AppCallA message too long to set")); + } + Ok(Self { + message: mr.to_vec(), + }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_app_call_a::Builder, + ) -> Result<(), RPCError> { + builder.set_message(&self.message); + Ok(()) + } + } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_app_message.rs b/veilid-core/src/rpc_processor/coders/operations/operation_app_message.rs index 17612cfc..a7456d63 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_app_message.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_app_message.rs @@ -8,23 +8,32 @@ pub struct RPCOperationAppMessage { } impl RPCOperationAppMessage { - pub fn new(message: &[u8]) -> Result { + pub fn new(message: Vec) -> Result { if message.len() > MAX_APP_MESSAGE_MESSAGE_LEN { return Err(RPCError::protocol("AppMessage message too long to set")); } - Ok(Self { - message: message.to_vec(), - }) + Ok(Self { message }) } pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> { Ok(()) } - pub fn decode( - reader: &veilid_capnp::operation_app_message::Reader, - ) -> Result { + + pub fn message(&self) -> &[u8] { + &self.message + } + pub fn destructure(self) -> Vec { + self.message + } + + pub fn decode(reader: &veilid_capnp::operation_app_message::Reader) -> Result { let mr = reader.get_message().map_err(RPCError::protocol)?; - RPCOperationAppMessage::new(mr) + if mr.len() > MAX_APP_MESSAGE_MESSAGE_LEN { + return Err(RPCError::protocol("AppMessage message too long to set")); + } + Ok(Self { + message: mr.to_vec(), + }) } pub fn encode( &self, @@ -33,12 +42,4 @@ impl RPCOperationAppMessage { builder.set_message(&self.message); Ok(()) } - - pub fn message(&self) -> &[u8] { - &self.message - } - - pub fn destructure(self) -> Vec { - self.message - } } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_cancel_tunnel.rs b/veilid-core/src/rpc_processor/coders/operations/operation_cancel_tunnel.rs index 1a892676..4bb6ac06 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_cancel_tunnel.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_cancel_tunnel.rs @@ -12,12 +12,20 @@ impl RPCOperationCancelTunnelQ { pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> { Ok(()) } + + pub fn id(&self) -> TunnelId { + self.id + } + + pub fn destructure(self) -> TunnelId { + self.id + } + pub fn decode( reader: &veilid_capnp::operation_cancel_tunnel_q::Reader, - ) -> Result { + ) -> Result { let id = TunnelId::new(reader.get_id()); - - Ok(RPCOperationCancelTunnelQ { id }) + Ok(Self { id }) } pub fn encode( &self, @@ -27,13 +35,6 @@ impl RPCOperationCancelTunnelQ { Ok(()) } - pub fn id(&self) -> TunnelId { - self.id - } - - pub fn destructure(self) -> TunnelId { - self.id - } } #[derive(Debug, Clone)] @@ -54,14 +55,14 @@ impl RPCOperationCancelTunnelA { } pub fn decode( reader: &veilid_capnp::operation_cancel_tunnel_a::Reader, - ) -> Result { + ) -> Result { match reader.which().map_err(RPCError::protocol)? { veilid_capnp::operation_cancel_tunnel_a::Which::Tunnel(r) => { - Ok(RPCOperationCancelTunnelA::Tunnel(TunnelId::new(r))) + Ok(Self::Tunnel(TunnelId::new(r))) } veilid_capnp::operation_cancel_tunnel_a::Which::Error(r) => { let tunnel_error = decode_tunnel_error(r.map_err(RPCError::protocol)?); - Ok(RPCOperationCancelTunnelA::Error(tunnel_error)) + Ok(Self::Error(tunnel_error)) } } } @@ -70,10 +71,10 @@ impl RPCOperationCancelTunnelA { builder: &mut veilid_capnp::operation_cancel_tunnel_a::Builder, ) -> Result<(), RPCError> { match self { - RPCOperationCancelTunnelA::Tunnel(p) => { + Self::Tunnel(p) => { builder.set_tunnel(p.as_u64()); } - RPCOperationCancelTunnelA::Error(e) => { + Self::Error(e) => { builder.set_error(encode_tunnel_error(*e)); } } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_complete_tunnel.rs b/veilid-core/src/rpc_processor/coders/operations/operation_complete_tunnel.rs index 8ec27b59..46b0258a 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_complete_tunnel.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_complete_tunnel.rs @@ -20,9 +20,27 @@ impl RPCOperationCompleteTunnelQ { pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> { Ok(()) } + + pub fn id(&self) -> TunnelId { + self.id + } + + pub fn local_mode(&self) -> TunnelMode { + self.local_mode + } + pub fn depth(&self) -> u8 { + self.depth + } + pub fn endpoint(&self) -> &TunnelEndpoint { + &self.endpoint + } + pub fn destructure(self) -> (TunnelId, TunnelMode, u8, TunnelEndpoint) { + (self.id, self.local_mode, self.depth, self.endpoint) + } + pub fn decode( reader: &veilid_capnp::operation_complete_tunnel_q::Reader, - ) -> Result { + ) -> Result { let id = TunnelId::new(reader.get_id()); let local_mode = match reader.get_local_mode().map_err(RPCError::protocol)? { veilid_capnp::TunnelEndpointMode::Raw => TunnelMode::Raw, @@ -32,7 +50,7 @@ impl RPCOperationCompleteTunnelQ { let te_reader = reader.get_endpoint().map_err(RPCError::protocol)?; let endpoint = decode_tunnel_endpoint(&te_reader)?; - Ok(RPCOperationCompleteTunnelQ { + Ok(Self { id, local_mode, depth, @@ -54,23 +72,6 @@ impl RPCOperationCompleteTunnelQ { Ok(()) } - pub fn id(&self) -> TunnelId { - self.id - } - - pub fn local_mode(&self) -> TunnelMode { - self.local_mode - } - pub fn depth(&self) -> u8 { - self.depth - } - pub fn endpoint(&self) -> &TunnelEndpoint { - &self.endpoint - } - - pub fn destructure(self) -> (TunnelId, TunnelMode, u8, TunnelEndpoint) { - (self.id, self.local_mode, self.depth, self.endpoint) - } } #[derive(Debug, Clone)] @@ -92,16 +93,16 @@ impl RPCOperationCompleteTunnelA { pub fn decode( reader: &veilid_capnp::operation_complete_tunnel_a::Reader, - ) -> Result { + ) -> Result { match reader.which().map_err(RPCError::protocol)? { veilid_capnp::operation_complete_tunnel_a::Which::Tunnel(r) => { let ft_reader = r.map_err(RPCError::protocol)?; let full_tunnel = decode_full_tunnel(&ft_reader)?; - Ok(RPCOperationCompleteTunnelA::Tunnel(full_tunnel)) + Ok(Self::Tunnel(full_tunnel)) } veilid_capnp::operation_complete_tunnel_a::Which::Error(r) => { let tunnel_error = decode_tunnel_error(r.map_err(RPCError::protocol)?); - Ok(RPCOperationCompleteTunnelA::Error(tunnel_error)) + Ok(Self::Error(tunnel_error)) } } } @@ -110,10 +111,10 @@ impl RPCOperationCompleteTunnelA { builder: &mut veilid_capnp::operation_complete_tunnel_a::Builder, ) -> Result<(), RPCError> { match self { - RPCOperationCompleteTunnelA::Tunnel(p) => { + Self::Tunnel(p) => { encode_full_tunnel(p, &mut builder.reborrow().init_tunnel())?; } - RPCOperationCompleteTunnelA::Error(e) => { + Self::Error(e) => { builder.set_error(encode_tunnel_error(*e)); } } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_find_block.rs b/veilid-core/src/rpc_processor/coders/operations/operation_find_block.rs index 80a2075e..b6897558 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_find_block.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_find_block.rs @@ -16,13 +16,22 @@ impl RPCOperationFindBlockQ { pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> { Ok(()) } + + pub fn block_id(&self) -> TypedKey { + self.block_id + } + + pub fn destructure(self) -> TypedKey { + self.block_id + } + pub fn decode( reader: &veilid_capnp::operation_find_block_q::Reader, ) -> Result { let bi_reader = reader.get_block_id().map_err(RPCError::protocol)?; let block_id = decode_typed_key(&bi_reader)?; - Ok(RPCOperationFindBlockQ::new(block_id)) + Ok(Self { block_id }) } pub fn encode( &self, @@ -33,14 +42,6 @@ impl RPCOperationFindBlockQ { Ok(()) } - - pub fn block_id(&self) -> TypedKey { - self.block_id - } - - pub fn destructure(self) -> TypedKey { - self.block_id - } } #[derive(Debug, Clone)] @@ -52,7 +53,7 @@ pub struct RPCOperationFindBlockA { impl RPCOperationFindBlockA { pub fn new( - data: &[u8], + data: Vec, suppliers: Vec, peers: Vec, ) -> Result { @@ -67,7 +68,7 @@ impl RPCOperationFindBlockA { } Ok(Self { - data: data.to_vec(), + data, suppliers, peers, }) @@ -78,19 +79,31 @@ impl RPCOperationFindBlockA { Ok(()) } - pub fn decode( - reader: &veilid_capnp::operation_find_block_a::Reader, - ) -> Result { - let data = reader.get_data().map_err(RPCError::protocol)?; - let suppliers_reader = reader.get_suppliers().map_err(RPCError::protocol)?; - let peers_reader = reader.get_peers().map_err(RPCError::protocol)?; + pub fn data(&self) -> &[u8] { + &self.data + } + pub fn suppliers(&self) -> &[PeerInfo] { + &self.suppliers + } + pub fn peers(&self) -> &[PeerInfo] { + &self.peers + } + pub fn destructure(self) -> (Vec, Vec, Vec) { + (self.data, self.suppliers, self.peers) + } + pub fn decode(reader: &veilid_capnp::operation_find_block_a::Reader) -> Result { + let data = reader.get_data().map_err(RPCError::protocol)?; if data.len() > MAX_FIND_BLOCK_A_DATA_LEN { return Err(RPCError::protocol("find block data length too long")); } + + let suppliers_reader = reader.get_suppliers().map_err(RPCError::protocol)?; if suppliers_reader.len() as usize > MAX_FIND_BLOCK_A_SUPPLIERS_LEN { return Err(RPCError::protocol("find block suppliers length too long")); } + + let peers_reader = reader.get_peers().map_err(RPCError::protocol)?; if peers_reader.len() as usize > MAX_FIND_BLOCK_A_PEERS_LEN { return Err(RPCError::protocol("find block peers length too long")); } @@ -117,7 +130,11 @@ impl RPCOperationFindBlockA { peers.push(peer_info); } - RPCOperationFindBlockA::new(data, suppliers, peers) + Ok(Self { + data: data.to_vec(), + suppliers, + peers, + }) } pub fn encode( @@ -150,18 +167,4 @@ impl RPCOperationFindBlockA { Ok(()) } - - pub fn data(&self) -> &[u8] { - &self.data - } - pub fn suppliers(&self) -> &[PeerInfo] { - &self.suppliers - } - pub fn peers(&self) -> &[PeerInfo] { - &self.peers - } - - pub fn destructure(self) -> (Vec, Vec, Vec) { - (self.data, self.suppliers, self.peers) - } } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs b/veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs index 3b5352d2..046cb8dc 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs @@ -14,12 +14,19 @@ impl RPCOperationFindNodeQ { pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> { Ok(()) } - pub fn decode( - reader: &veilid_capnp::operation_find_node_q::Reader, - ) -> Result { + + pub fn node_id(&self) -> &TypedKey { + &self.node_id + } + + pub fn destructure(self) -> TypedKey { + self.node_id + } + + pub fn decode(reader: &veilid_capnp::operation_find_node_q::Reader) -> Result { let ni_reader = reader.get_node_id().map_err(RPCError::protocol)?; let node_id = decode_typed_key(&ni_reader)?; - Ok(RPCOperationFindNodeQ { node_id }) + Ok(Self { node_id }) } pub fn encode( &self, @@ -29,14 +36,6 @@ impl RPCOperationFindNodeQ { encode_typed_key(&self.node_id, &mut ni_builder); Ok(()) } - - pub fn node_id(&self) -> &TypedKey { - &self.node_id - } - - pub fn destructure(self) -> TypedKey { - self.node_id - } } #[derive(Debug, Clone)] @@ -57,6 +56,15 @@ impl RPCOperationFindNodeA { PeerInfo::validate_vec(&mut self.peers, validate_context.crypto.clone()); Ok(()) } + + pub fn peers(&self) -> &[PeerInfo] { + &self.peers + } + + pub fn destructure(self) -> Vec { + self.peers + } + pub fn decode( reader: &veilid_capnp::operation_find_node_a::Reader, ) -> Result { @@ -77,7 +85,7 @@ impl RPCOperationFindNodeA { peers.push(peer_info); } - RPCOperationFindNodeA::new(peers) + Ok(Self { peers }) } pub fn encode( &self, @@ -95,12 +103,4 @@ impl RPCOperationFindNodeA { } Ok(()) } - - pub fn peers(&self) -> &[PeerInfo] { - &self.peers - } - - pub fn destructure(self) -> Vec { - self.peers - } } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs index c1c6c987..5ff3edb5 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs @@ -1,5 +1,5 @@ use super::*; -use crate::storage_manager::{SignedValueDescriptor, ValueDetail}; +use crate::storage_manager::{SignedValueData, SignedValueDescriptor}; const MAX_GET_VALUE_A_PEERS_LEN: usize = 20; @@ -39,14 +39,25 @@ impl RPCOperationGetValueQ { Ok(()) } - pub fn decode( - reader: &veilid_capnp::operation_get_value_q::Reader, - ) -> Result { + pub fn key(&self) -> &TypedKey { + &self.key + } + pub fn subkey(&self) -> ValueSubkey { + self.subkey + } + pub fn want_descriptor(&self) -> bool { + self.want_descriptor + } + pub fn destructure(self) -> (TypedKey, ValueSubkey, bool) { + (self.key, self.subkey, self.want_descriptor) + } + + pub fn decode(reader: &veilid_capnp::operation_get_value_q::Reader) -> Result { let k_reader = reader.reborrow().get_key().map_err(RPCError::protocol)?; let key = decode_typed_key(&k_reader)?; let subkey = reader.reborrow().get_subkey(); let want_descriptor = reader.reborrow().get_want_descriptor(); - Ok(RPCOperationGetValueQ { + Ok(Self { key, subkey, want_descriptor, @@ -62,110 +73,167 @@ impl RPCOperationGetValueQ { builder.set_want_descriptor(self.want_descriptor); Ok(()) } - pub fn key(&self) -> &TypedKey { - &self.key - } - pub fn subkey(&self) -> ValueSubkey { - self.subkey - } - pub fn want_descriptor(&self) -> bool { - self.want_descriptor - } - pub fn destructure(self) -> (TypedKey, ValueSubkey, bool) { - (self.key, self.subkey, self.want_descriptor) - } } #[derive(Debug, Clone)] -pub enum RPCOperationGetValueA { - Value(ValueDetail), - Peers(Vec), +pub struct RPCOperationGetValueA { + value: Option, + peers: Vec, + descriptor: Option, } impl RPCOperationGetValueA { - pub fn new_value(value: ValueDetail) -> Self { - Self::Value(value) - } - pub fn new_peers(peers: Vec) -> Result { + pub fn new( + value: Option, + peers: Vec, + descriptor: Option, + ) -> Result { if peers.len() > MAX_GET_VALUE_A_PEERS_LEN { return Err(RPCError::protocol("GetValueA peers length too long")); } - Ok(Self::Peers(peers)) + if descriptor.is_some() && !value.is_some() { + return Err(RPCError::protocol( + "GetValueA should not return descriptor without value", + )); + } + Ok(Self { + value, + peers, + descriptor, + }) } pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> { - match self { - RPCOperationGetValueA::Value(value_detail) => { - let question_context = validate_context - .question_context - .as_ref() - .expect("GetValueA requires question context"); - let QuestionContext::GetValue(get_value_context) = question_context else { - panic!("Wrong context type for GetValueA"); + let question_context = validate_context + .question_context + .as_ref() + .expect("GetValueA requires question context"); + let QuestionContext::GetValue(get_value_context) = question_context else { + panic!("Wrong context type for GetValueA"); + }; + + if let Some(value) = &self.value { + // Get descriptor to validate with + let descriptor = if let Some(descriptor) = &self.descriptor { + if let Some(last_descriptor) = &get_value_context.last_descriptor { + if descriptor.cmp_no_sig(last_descriptor) != cmp::Ordering::Equal { + return Err(RPCError::protocol( + "getvalue descriptor does not match last descriptor", + )); + } + } + descriptor + } else { + let Some(descriptor) = &get_value_context.last_descriptor else { + return Err(RPCError::protocol( + "no last descriptor, requires a descriptor", + )); }; - value_detail - .validate( - get_value_context.last_descriptor.as_ref(), - get_value_context.subkey, - get_value_context.vcrypto.clone(), - ) - .map_err(RPCError::protocol) - } - RPCOperationGetValueA::Peers(peers) => { - PeerInfo::validate_vec(peers, validate_context.crypto.clone()); - Ok(()) + descriptor + }; + // Ensure the descriptor itself validates + descriptor + .validate(get_value_context.vcrypto.clone()) + .map_err(RPCError::protocol)?; + + // And the signed value data + value + .validate( + descriptor.owner(), + get_value_context.subkey, + get_value_context.vcrypto.clone(), + ) + .map_err(RPCError::protocol)?; + } else { + // No value, should not have descriptor + if self.descriptor.is_some() { + return Err(RPCError::protocol("descriptor returned without a value")); } } + + PeerInfo::validate_vec(&mut self.peers, validate_context.crypto.clone()); + Ok(()) } - pub fn decode( - reader: &veilid_capnp::operation_get_value_a::Reader, - ) -> Result { - match reader.which().map_err(RPCError::protocol)? { - veilid_capnp::operation_get_value_a::Which::Value(r) => { - let value_detail = decode_value_detail(&r.map_err(RPCError::protocol)?)?; - Ok(RPCOperationGetValueA::Value(value_detail)) - } - veilid_capnp::operation_get_value_a::Which::Peers(r) => { - let peers_reader = r.map_err(RPCError::protocol)?; - if peers_reader.len() as usize > MAX_GET_VALUE_A_PEERS_LEN { - return Err(RPCError::protocol("GetValueA peers length too long")); - } - let mut peers = Vec::::with_capacity( - peers_reader - .len() - .try_into() - .map_err(RPCError::map_internal("too many peers"))?, - ); - for p in peers_reader.iter() { - let peer_info = decode_peer_info(&p)?; - peers.push(peer_info); - } + pub fn value(&self) -> Option<&SignedValueData> { + self.value.as_ref() + } + pub fn peers(&self) -> &[PeerInfo] { + &self.peers + } + pub fn descriptor(&self) -> Option<&SignedValueDescriptor> { + self.descriptor.as_ref() + } + pub fn destructure( + self, + ) -> ( + Option, + Vec, + Option, + ) { + (self.value, self.peers, self.descriptor) + } - Ok(RPCOperationGetValueA::Peers(peers)) - } + pub fn decode(reader: &veilid_capnp::operation_get_value_a::Reader) -> Result { + let value = if reader.has_value() { + let value_reader = reader.get_value().map_err(RPCError::protocol)?; + let value = decode_signed_value_data(&value_reader)?; + Some(value) + } else { + None + }; + + let peers_reader = reader.get_peers().map_err(RPCError::protocol)?; + if peers_reader.len() as usize > MAX_GET_VALUE_A_PEERS_LEN { + return Err(RPCError::protocol("GetValueA peers length too long")); } + let mut peers = Vec::::with_capacity( + peers_reader + .len() + .try_into() + .map_err(RPCError::map_internal("too many peers"))?, + ); + for p in peers_reader.iter() { + let peer_info = decode_peer_info(&p)?; + peers.push(peer_info); + } + + let descriptor = if reader.has_descriptor() { + let d_reader = reader.get_descriptor().map_err(RPCError::protocol)?; + let descriptor = decode_signed_value_descriptor(&d_reader)?; + Some(descriptor) + } else { + None + }; + + Ok(Self { + value, + peers, + descriptor, + }) } pub fn encode( &self, builder: &mut veilid_capnp::operation_get_value_a::Builder, ) -> Result<(), RPCError> { - match self { - RPCOperationGetValueA::Value(value_detail) => { - let mut d_builder = builder.reborrow().init_value(); - encode_value_detail(&value_detail, &mut d_builder)?; - } - RPCOperationGetValueA::Peers(peers) => { - let mut peers_builder = builder.reborrow().init_peers( - peers - .len() - .try_into() - .map_err(RPCError::map_internal("invalid peers list length"))?, - ); - for (i, peer) in peers.iter().enumerate() { - let mut pi_builder = peers_builder.reborrow().get(i as u32); - encode_peer_info(peer, &mut pi_builder)?; - } - } + if let Some(value) = &self.value { + let mut v_builder = builder.reborrow().init_value(); + encode_signed_value_data(value, &mut v_builder)?; + } + + let mut peers_builder = builder.reborrow().init_peers( + self.peers + .len() + .try_into() + .map_err(RPCError::map_internal("invalid peers list length"))?, + ); + for (i, peer) in self.peers.iter().enumerate() { + let mut pi_builder = peers_builder.reborrow().get(i as u32); + encode_peer_info(peer, &mut pi_builder)?; + } + + if let Some(descriptor) = &self.descriptor { + let mut d_builder = builder.reborrow().init_descriptor(); + encode_signed_value_descriptor(descriptor, &mut d_builder)?; } Ok(()) diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_return_receipt.rs b/veilid-core/src/rpc_processor/coders/operations/operation_return_receipt.rs index 3930c33e..62e14182 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_return_receipt.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_return_receipt.rs @@ -22,11 +22,28 @@ impl RPCOperationReturnReceipt { Ok(()) } + pub fn receipt(&self) -> &[u8] { + &self.receipt + } + + pub fn destructure(self) -> Vec { + self.receipt + } + pub fn decode( reader: &veilid_capnp::operation_return_receipt::Reader, - ) -> Result { + ) -> Result { let rr = reader.get_receipt().map_err(RPCError::protocol)?; - RPCOperationReturnReceipt::new(rr) + if rr.len() < MIN_RECEIPT_SIZE { + return Err(RPCError::protocol("ReturnReceipt receipt too short to set")); + } + if rr.len() > MAX_RECEIPT_SIZE { + return Err(RPCError::protocol("ReturnReceipt receipt too long to set")); + } + + Ok(Self { + receipt: rr.to_vec(), + }) } pub fn encode( &self, @@ -35,12 +52,4 @@ impl RPCOperationReturnReceipt { builder.set_receipt(&self.receipt); Ok(()) } - - pub fn receipt(&self) -> &[u8] { - &self.receipt - } - - pub fn destructure(self) -> Vec { - self.receipt - } } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_route.rs b/veilid-core/src/rpc_processor/coders/operations/operation_route.rs index e7d51ffe..7d72bd4c 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_route.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_route.rs @@ -2,10 +2,10 @@ use super::*; #[derive(Debug, Clone)] pub struct RoutedOperation { - pub sequencing: Sequencing, - pub signatures: Vec, - pub nonce: Nonce, - pub data: Vec, + sequencing: Sequencing, + signatures: Vec, + nonce: Nonce, + data: Vec, } impl RoutedOperation { @@ -17,10 +17,33 @@ impl RoutedOperation { data, } } + pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> { + //xxx + Ok(()) + } + pub fn sequencing(&self) -> Sequencing { + self.sequencing + } + pub fn signatures(&self) -> &[Signature] { + &self.signatures + } - pub fn decode( - reader: &veilid_capnp::routed_operation::Reader, - ) -> Result { + pub fn add_signature(&mut self, signature: Signature) { + self.signatures.push(signature); + } + + pub fn nonce(&self) -> &Nonce { + &self.nonce + } + pub fn data(&self) -> &[u8] { + &self.data + } + + pub fn destructure(self) -> (Sequencing, Vec, Nonce, Vec) { + (self.sequencing, self.signatures, self.nonce, self.data) + } + + pub fn decode(reader: &veilid_capnp::routed_operation::Reader) -> Result { let sigs_reader = reader.get_signatures().map_err(RPCError::protocol)?; let mut signatures = Vec::::with_capacity( sigs_reader @@ -36,13 +59,13 @@ impl RoutedOperation { let sequencing = decode_sequencing(reader.get_sequencing().map_err(RPCError::protocol)?); let n_reader = reader.get_nonce().map_err(RPCError::protocol)?; let nonce = decode_nonce(&n_reader); - let data = reader.get_data().map_err(RPCError::protocol)?.to_vec(); + let data = reader.get_data().map_err(RPCError::protocol)?; - Ok(RoutedOperation { + Ok(Self { sequencing, signatures, nonce, - data, + data: data.to_vec(), }) } @@ -73,21 +96,39 @@ impl RoutedOperation { #[derive(Debug, Clone)] pub struct RPCOperationRoute { - pub safety_route: SafetyRoute, - pub operation: RoutedOperation, + safety_route: SafetyRoute, + operation: RoutedOperation, } impl RPCOperationRoute { - pub fn decode( - reader: &veilid_capnp::operation_route::Reader, - ) -> Result { + pub fn new(safety_route: SafetyRoute, operation: RoutedOperation) -> Self { + Self { + safety_route, + operation, + } + } + pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> { + self.operation.validate(validate_context) + } + + pub fn safety_route(&self) -> &SafetyRoute { + &self.safety_route + } + pub fn operation(&self) -> &RoutedOperation { + &self.operation + } + pub fn destructure(self) -> (SafetyRoute, RoutedOperation) { + (self.safety_route, self.operation) + } + + pub fn decode(reader: &veilid_capnp::operation_route::Reader) -> Result { let sr_reader = reader.get_safety_route().map_err(RPCError::protocol)?; let safety_route = decode_safety_route(&sr_reader)?; let o_reader = reader.get_operation().map_err(RPCError::protocol)?; let operation = RoutedOperation::decode(&o_reader)?; - Ok(RPCOperationRoute { + Ok(Self { safety_route, operation, }) diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs index 99eed02f..f94095df 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_set_value.rs @@ -1,22 +1,95 @@ use super::*; +use crate::storage_manager::{SignedValueData, SignedValueDescriptor}; + +const MAX_SET_VALUE_A_PEERS_LEN: usize = 20; + +#[derive(Clone)] +pub struct ValidateSetValueContext { + last_descriptor: Option, + subkey: ValueSubkey, + vcrypto: CryptoSystemVersion, +} +impl fmt::Debug for ValidateSetValueContext { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ValidateSetValueContext") + .field("last_descriptor", &self.last_descriptor) + .field("subkey", &self.subkey) + .field("vcrypto", &self.vcrypto.kind().to_string()) + .finish() + } +} #[derive(Debug, Clone)] pub struct RPCOperationSetValueQ { - pub key: TypedKey, - pub subkey: ValueSubkey, - pub value: ValueData, + key: TypedKey, + subkey: ValueSubkey, + value: SignedValueData, + descriptor: Option, } impl RPCOperationSetValueQ { - pub fn decode( - reader: &veilid_capnp::operation_set_value_q::Reader, - ) -> Result { + pub fn new( + key: TypedKey, + subkey: ValueSubkey, + value: SignedValueData, + descriptor: Option, + ) -> Self { + Self { + key, + subkey, + value, + descriptor, + } + } + pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> { + Ok(()) + } + + pub fn key(&self) -> &TypedKey { + &self.key + } + + pub fn subkey(&self) -> ValueSubkey { + self.subkey + } + + pub fn value(&self) -> &SignedValueData { + &self.value + } + + pub fn descriptor(&self) -> Option<&SignedValueDescriptor> { + self.descriptor.as_ref() + } + pub fn destructure( + self, + ) -> ( + TypedKey, + ValueSubkey, + SignedValueData, + Option, + ) { + (self.key, self.subkey, self.value, self.descriptor) + } + + pub fn decode(reader: &veilid_capnp::operation_set_value_q::Reader) -> Result { let k_reader = reader.get_key().map_err(RPCError::protocol)?; let key = decode_typed_key(&k_reader)?; let subkey = reader.get_subkey(); let v_reader = reader.get_value().map_err(RPCError::protocol)?; - let value = decode_value_data(&v_reader)?; - Ok(RPCOperationSetValueQ { key, subkey, value }) + let value = decode_signed_value_data(&v_reader)?; + let descriptor = if reader.has_descriptor() { + let d_reader = reader.get_descriptor().map_err(RPCError::protocol)?; + let descriptor = decode_signed_value_descriptor(&d_reader)?; + Some(descriptor) + } else { + None + }; + Ok(Self { + key, + subkey, + value, + descriptor, + }) } pub fn encode( &self, @@ -26,64 +99,129 @@ impl RPCOperationSetValueQ { encode_typed_key(&self.key, &mut k_builder); builder.set_subkey(self.subkey); let mut v_builder = builder.reborrow().init_value(); - encode_value_data(&self.value, &mut v_builder)?; + encode_signed_value_data(&self.value, &mut v_builder)?; + if let Some(descriptor) = &self.descriptor { + let mut d_builder = builder.reborrow().init_descriptor(); + encode_signed_value_descriptor(descriptor, &mut d_builder)?; + } Ok(()) } } #[derive(Debug, Clone)] -pub enum RPCOperationSetValueA { - Data(ValueData), - Peers(Vec), +pub struct RPCOperationSetValueA { + set: bool, + value: Option, + peers: Vec, } impl RPCOperationSetValueA { - pub fn decode( - reader: &veilid_capnp::operation_set_value_a::Reader, - ) -> Result { - match reader.which().map_err(RPCError::protocol)? { - veilid_capnp::operation_set_value_a::Which::Data(r) => { - let data = decode_value_data(&r.map_err(RPCError::protocol)?)?; - Ok(RPCOperationSetValueA::Data(data)) - } - veilid_capnp::operation_set_value_a::Which::Peers(r) => { - let peers_reader = r.map_err(RPCError::protocol)?; - let mut peers = Vec::::with_capacity( - peers_reader - .len() - .try_into() - .map_err(RPCError::map_internal("too many peers"))?, - ); - for p in peers_reader.iter() { - let peer_info = decode_peer_info(&p)?; - peers.push(peer_info); - } - - Ok(RPCOperationSetValueA::Peers(peers)) - } + pub fn new( + set: bool, + value: Option, + peers: Vec, + ) -> Result { + if peers.len() as usize > MAX_SET_VALUE_A_PEERS_LEN { + return Err(RPCError::protocol("SetValueA peers length too long")); } + Ok(Self { set, value, peers }) + } + + pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> { + let question_context = validate_context + .question_context + .as_ref() + .expect("SetValueA requires question context"); + let QuestionContext::SetValue(set_value_context) = question_context else { + panic!("Wrong context type for SetValueA"); + }; + + if let Some(value) = &self.value { + // Get descriptor to validate with + let Some(descriptor) = &set_value_context.last_descriptor else { + return Err(RPCError::protocol( + "no last descriptor, requires a descriptor", + )); + }; + + // Ensure the descriptor itself validates + descriptor + .validate(set_value_context.vcrypto.clone()) + .map_err(RPCError::protocol)?; + + // And the signed value data + value + .validate( + descriptor.owner(), + set_value_context.subkey, + set_value_context.vcrypto.clone(), + ) + .map_err(RPCError::protocol)?; + } + + PeerInfo::validate_vec(&mut self.peers, validate_context.crypto.clone()); + Ok(()) + } + + pub fn set(&self) -> bool { + self.set + } + pub fn value(&self) -> Option<&SignedValueData> { + self.value.as_ref() + } + pub fn peers(&self) -> &[PeerInfo] { + &self.peers + } + pub fn destructure(self) -> (bool, Option, Vec) { + (self.set, self.value, self.peers) + } + + pub fn decode(reader: &veilid_capnp::operation_set_value_a::Reader) -> Result { + let set = reader.get_set(); + let value = if reader.has_value() { + let v_reader = reader.get_value().map_err(RPCError::protocol)?; + let value = decode_signed_value_data(&v_reader)?; + Some(value) + } else { + None + }; + let peers_reader = reader.get_peers().map_err(RPCError::protocol)?; + if peers_reader.len() as usize > MAX_SET_VALUE_A_PEERS_LEN { + return Err(RPCError::protocol("SetValueA peers length too long")); + } + let mut peers = Vec::::with_capacity( + peers_reader + .len() + .try_into() + .map_err(RPCError::map_internal("too many peers"))?, + ); + for p in peers_reader.iter() { + let peer_info = decode_peer_info(&p)?; + peers.push(peer_info); + } + + Ok(Self { set, value, peers }) } pub fn encode( &self, builder: &mut veilid_capnp::operation_set_value_a::Builder, ) -> Result<(), RPCError> { - match self { - RPCOperationSetValueA::Data(data) => { - let mut d_builder = builder.reborrow().init_data(); - encode_value_data(&data, &mut d_builder)?; - } - RPCOperationSetValueA::Peers(peers) => { - let mut peers_builder = builder.reborrow().init_peers( - peers - .len() - .try_into() - .map_err(RPCError::map_internal("invalid peers list length"))?, - ); - for (i, peer) in peers.iter().enumerate() { - let mut pi_builder = peers_builder.reborrow().get(i as u32); - encode_peer_info(peer, &mut pi_builder)?; - } - } + builder.set_set(self.set); + + if let Some(value) = &self.value { + let mut v_builder = builder.reborrow().init_value(); + encode_signed_value_data(value, &mut v_builder)?; + } + + let mut peers_builder = builder.reborrow().init_peers( + self.peers + .len() + .try_into() + .map_err(RPCError::map_internal("invalid peers list length"))?, + ); + for (i, peer) in self.peers.iter().enumerate() { + let mut pi_builder = peers_builder.reborrow().get(i as u32); + encode_peer_info(peer, &mut pi_builder)?; } Ok(()) diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_signal.rs b/veilid-core/src/rpc_processor/coders/operations/operation_signal.rs index cfc9dcab..29311316 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_signal.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_signal.rs @@ -12,11 +12,16 @@ impl RPCOperationSignal { pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> { self.signal_info.validate(validate_context.crypto.clone()) } - pub fn decode( - reader: &veilid_capnp::operation_signal::Reader, - ) -> Result { + pub fn signal_info(&self) -> &SignalInfo { + &self.signal_info + } + pub fn destructure(self) -> SignalInfo { + self.signal_info + } + + pub fn decode(reader: &veilid_capnp::operation_signal::Reader) -> Result { let signal_info = decode_signal_info(reader)?; - Ok(RPCOperationSignal { signal_info }) + Ok(Self { signal_info }) } pub fn encode( &self, @@ -25,10 +30,4 @@ impl RPCOperationSignal { encode_signal_info(&self.signal_info, builder)?; Ok(()) } - pub fn signal_info(&self) -> &SignalInfo { - &self.signal_info - } - pub fn destructure(self) -> SignalInfo { - self.signal_info - } } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_start_tunnel.rs b/veilid-core/src/rpc_processor/coders/operations/operation_start_tunnel.rs index 274b0af8..b3741462 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_start_tunnel.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_start_tunnel.rs @@ -2,15 +2,40 @@ use super::*; #[derive(Debug, Clone)] pub struct RPCOperationStartTunnelQ { - pub id: TunnelId, - pub local_mode: TunnelMode, - pub depth: u8, + id: TunnelId, + local_mode: TunnelMode, + depth: u8, } impl RPCOperationStartTunnelQ { + pub fn new(id: TunnelId, local_mode: TunnelMode, depth: u8) -> Self { + Self { + id, + local_mode, + depth, + } + } + + pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> { + Ok(()) + } + + pub fn id(&self) -> TunnelId { + self.id + } + pub fn local_mode(&self) -> TunnelMode { + self.local_mode + } + pub fn depth(&self) -> u8 { + self.depth + } + pub fn destructure(self) -> (TunnelId, TunnelMode, u8) { + (self.id, self.local_mode, self.depth) + } + pub fn decode( reader: &veilid_capnp::operation_start_tunnel_q::Reader, - ) -> Result { + ) -> Result { let id = TunnelId::new(reader.get_id()); let local_mode = match reader.get_local_mode().map_err(RPCError::protocol)? { veilid_capnp::TunnelEndpointMode::Raw => TunnelMode::Raw, @@ -18,7 +43,7 @@ impl RPCOperationStartTunnelQ { }; let depth = reader.get_depth(); - Ok(RPCOperationStartTunnelQ { + Ok(Self { id, local_mode, depth, @@ -46,18 +71,28 @@ pub enum RPCOperationStartTunnelA { } impl RPCOperationStartTunnelA { + pub fn new_partial(partial_tunnel: PartialTunnel) -> Self { + Self::Partial(partial_tunnel) + } + pub fn new_error(tunnel_error: TunnelError) -> Self { + Self::Error(tunnel_error) + } + pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> { + Ok(()) + } + pub fn decode( reader: &veilid_capnp::operation_start_tunnel_a::Reader, - ) -> Result { + ) -> Result { match reader.which().map_err(RPCError::protocol)? { veilid_capnp::operation_start_tunnel_a::Which::Partial(r) => { let pt_reader = r.map_err(RPCError::protocol)?; let partial_tunnel = decode_partial_tunnel(&pt_reader)?; - Ok(RPCOperationStartTunnelA::Partial(partial_tunnel)) + Ok(Self::Partial(partial_tunnel)) } veilid_capnp::operation_start_tunnel_a::Which::Error(r) => { let tunnel_error = decode_tunnel_error(r.map_err(RPCError::protocol)?); - Ok(RPCOperationStartTunnelA::Error(tunnel_error)) + Ok(Self::Error(tunnel_error)) } } } @@ -66,10 +101,10 @@ impl RPCOperationStartTunnelA { builder: &mut veilid_capnp::operation_start_tunnel_a::Builder, ) -> Result<(), RPCError> { match self { - RPCOperationStartTunnelA::Partial(p) => { + Self::Partial(p) => { encode_partial_tunnel(p, &mut builder.reborrow().init_partial())?; } - RPCOperationStartTunnelA::Error(e) => { + Self::Error(e) => { builder.set_error(encode_tunnel_error(*e)); } } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_status.rs b/veilid-core/src/rpc_processor/coders/operations/operation_status.rs index 96ed390c..3c9853b4 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_status.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_status.rs @@ -2,17 +2,25 @@ use super::*; #[derive(Debug, Clone)] pub struct RPCOperationStatusQ { - pub node_status: Option, + node_status: Option, } impl RPCOperationStatusQ { - pub fn validate(mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> { + pub fn new(node_status: Option) -> Self { + Self { node_status } + } + pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> { Ok(()) } - pub fn decode( - reader: &veilid_capnp::operation_status_q::Reader, - ) -> Result { + pub fn node_status(&self) -> Option<&NodeStatus> { + self.node_status.as_ref() + } + pub fn destructure(self) -> Option { + self.node_status + } + + pub fn decode(reader: &veilid_capnp::operation_status_q::Reader) -> Result { let node_status = if reader.has_node_status() { let ns_reader = reader.get_node_status().map_err(RPCError::protocol)?; let node_status = decode_node_status(&ns_reader)?; @@ -20,7 +28,7 @@ impl RPCOperationStatusQ { } else { None }; - Ok(RPCOperationStatusQ { node_status }) + Ok(Self { node_status }) } pub fn encode( &self, @@ -36,17 +44,33 @@ impl RPCOperationStatusQ { #[derive(Debug, Clone)] pub struct RPCOperationStatusA { - pub node_status: Option, - pub sender_info: Option, + node_status: Option, + sender_info: Option, } impl RPCOperationStatusA { + pub fn new(node_status: Option, sender_info: Option) -> Self { + Self { + node_status, + sender_info, + } + } + pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> { Ok(()) } - pub fn decode( - reader: &veilid_capnp::operation_status_a::Reader, - ) -> Result { + + pub fn node_status(&self) -> Option<&NodeStatus> { + self.node_status.as_ref() + } + pub fn sender_info(&self) -> Option<&SenderInfo> { + self.sender_info.as_ref() + } + pub fn destructure(self) -> (Option, Option) { + (self.node_status, self.sender_info) + } + + pub fn decode(reader: &veilid_capnp::operation_status_a::Reader) -> Result { let node_status = if reader.has_node_status() { let ns_reader = reader.get_node_status().map_err(RPCError::protocol)?; let node_status = decode_node_status(&ns_reader)?; @@ -63,7 +87,7 @@ impl RPCOperationStatusA { None }; - Ok(RPCOperationStatusA { + Ok(Self { node_status, sender_info, }) diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_supply_block.rs b/veilid-core/src/rpc_processor/coders/operations/operation_supply_block.rs index 56c7fb55..886b5bd4 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_supply_block.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_supply_block.rs @@ -1,18 +1,35 @@ use super::*; +const MAX_SUPPLY_BLOCK_A_PEERS_LEN: usize = 20; + #[derive(Debug, Clone)] pub struct RPCOperationSupplyBlockQ { - pub block_id: TypedKey, + block_id: TypedKey, } impl RPCOperationSupplyBlockQ { + pub fn new(block_id: TypedKey) -> Self { + Self { block_id } + } + pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> { + Ok(()) + } + + pub fn block_id(&self) -> &TypedKey { + &self.block_id + } + + pub fn destructure(self) -> TypedKey { + self.block_id + } + pub fn decode( reader: &veilid_capnp::operation_supply_block_q::Reader, - ) -> Result { + ) -> Result { let bi_reader = reader.get_block_id().map_err(RPCError::protocol)?; let block_id = decode_typed_key(&bi_reader)?; - Ok(RPCOperationSupplyBlockQ { block_id }) + Ok(Self { block_id }) } pub fn encode( &self, @@ -26,56 +43,68 @@ impl RPCOperationSupplyBlockQ { } #[derive(Debug, Clone)] -pub enum RPCOperationSupplyBlockA { - Expiration(u64), - Peers(Vec), +pub struct RPCOperationSupplyBlockA { + expiration: u64, + peers: Vec, } impl RPCOperationSupplyBlockA { + pub fn new(expiration: u64, peers: Vec) -> Result { + if peers.len() > MAX_SUPPLY_BLOCK_A_PEERS_LEN { + return Err(RPCError::protocol("SupplyBlockA peers length too long")); + } + Ok(Self { expiration, peers }) + } + pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> { + PeerInfo::validate_vec(&mut self.peers, validate_context.crypto.clone()); + Ok(()) + } + pub fn expiration(&self) -> u64 { + self.expiration + } + pub fn peers(&self) -> &[PeerInfo] { + &self.peers + } + pub fn destructure(self) -> (u64, Vec) { + (self.expiration, self.peers) + } + pub fn decode( reader: &veilid_capnp::operation_supply_block_a::Reader, - ) -> Result { - match reader.which().map_err(RPCError::protocol)? { - veilid_capnp::operation_supply_block_a::Which::Expiration(r) => { - Ok(RPCOperationSupplyBlockA::Expiration(r)) - } - veilid_capnp::operation_supply_block_a::Which::Peers(r) => { - let peers_reader = r.map_err(RPCError::protocol)?; - let mut peers = Vec::::with_capacity( - peers_reader - .len() - .try_into() - .map_err(RPCError::map_internal("too many peers"))?, - ); - for p in peers_reader.iter() { - let peer_info = decode_peer_info(&p)?; - peers.push(peer_info); - } + ) -> Result { + let expiration = reader.get_expiration(); - Ok(RPCOperationSupplyBlockA::Peers(peers)) - } + let peers_reader = reader.get_peers().map_err(RPCError::protocol)?; + if peers_reader.len() as usize > MAX_SUPPLY_BLOCK_A_PEERS_LEN { + return Err(RPCError::protocol("SupplyBlockA peers length too long")); } + let mut peers = Vec::::with_capacity( + peers_reader + .len() + .try_into() + .map_err(RPCError::map_internal("too many peers"))?, + ); + for p in peers_reader.iter() { + let peer_info = decode_peer_info(&p)?; + peers.push(peer_info); + } + + Ok(Self { expiration, peers }) } pub fn encode( &self, builder: &mut veilid_capnp::operation_supply_block_a::Builder, ) -> Result<(), RPCError> { - match self { - RPCOperationSupplyBlockA::Expiration(e) => { - builder.set_expiration(*e); - } - RPCOperationSupplyBlockA::Peers(peers) => { - let mut peers_builder = builder.reborrow().init_peers( - peers - .len() - .try_into() - .map_err(RPCError::map_internal("invalid peers list length"))?, - ); - for (i, peer) in peers.iter().enumerate() { - let mut pi_builder = peers_builder.reborrow().get(i as u32); - encode_peer_info(peer, &mut pi_builder)?; - } - } + builder.set_expiration(self.expiration); + let mut peers_builder = builder.reborrow().init_peers( + self.peers + .len() + .try_into() + .map_err(RPCError::map_internal("invalid peers list length"))?, + ); + for (i, peer) in self.peers.iter().enumerate() { + let mut pi_builder = peers_builder.reborrow().get(i as u32); + encode_peer_info(peer, &mut pi_builder)?; } Ok(()) diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_validate_dial_info.rs b/veilid-core/src/rpc_processor/coders/operations/operation_validate_dial_info.rs index 63a8bd40..87fac84f 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_validate_dial_info.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_validate_dial_info.rs @@ -2,22 +2,68 @@ use super::*; #[derive(Debug, Clone)] pub struct RPCOperationValidateDialInfo { - pub dial_info: DialInfo, - pub receipt: Vec, - pub redirect: bool, + dial_info: DialInfo, + receipt: Vec, + redirect: bool, } impl RPCOperationValidateDialInfo { + pub fn new(dial_info: DialInfo, receipt: Vec, redirect: bool) -> Result { + if receipt.len() < MIN_RECEIPT_SIZE { + return Err(RPCError::protocol( + "ValidateDialInfo receipt too short to set", + )); + } + if receipt.len() > MAX_RECEIPT_SIZE { + return Err(RPCError::protocol( + "ValidateDialInfo receipt too long to set", + )); + } + + Ok(Self { + dial_info, + receipt, + redirect, + }) + } + + pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> { + Ok(()) + } + pub fn dial_info(&self) -> &DialInfo { + &self.dial_info + } + pub fn receipt(&self) -> &[u8] { + &self.receipt + } + pub fn redirect(&self) -> bool { + self.redirect + } + pub fn destructure(self) -> (DialInfo, Vec, bool) { + (self.dial_info, self.receipt, self.redirect) + } + pub fn decode( reader: &veilid_capnp::operation_validate_dial_info::Reader, - ) -> Result { + ) -> Result { let di_reader = reader.get_dial_info().map_err(RPCError::protocol)?; let dial_info = decode_dial_info(&di_reader)?; let rcpt_reader = reader.get_receipt().map_err(RPCError::protocol)?; + if rcpt_reader.len() < MIN_RECEIPT_SIZE { + return Err(RPCError::protocol( + "ValidateDialInfo receipt too short to set", + )); + } + if rcpt_reader.len() > MAX_RECEIPT_SIZE { + return Err(RPCError::protocol( + "ValidateDialInfo receipt too long to set", + )); + } + let receipt = rcpt_reader.to_vec(); let redirect = reader.get_redirect(); - Ok(RPCOperationValidateDialInfo { + Ok(Self { dial_info, receipt, redirect, diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs b/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs index 86bc9c69..d7a815a2 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_value_changed.rs @@ -1,17 +1,53 @@ use super::*; +use crate::storage_manager::SignedValueData; #[derive(Debug, Clone)] pub struct RPCOperationValueChanged { - pub key: TypedKey, - pub subkeys: Vec, - pub count: u32, - pub value: ValueData, + key: TypedKey, + subkeys: Vec, + count: u32, + value: SignedValueData, } impl RPCOperationValueChanged { + pub fn new( + key: TypedKey, + subkeys: Vec, + count: u32, + value: SignedValueData, + ) -> Self { + Self { + key, + subkeys, + count, + value, + } + } + + pub fn validate(&mut self, _validate_context: &RPCValidateContext) -> Result<(), RPCError> { + // validation must be done by storage manager as this is more complicated + Ok(()) + } + + pub fn key(&self) -> &TypedKey { + &self.key + } + pub fn subkeys(&self) -> &[ValueSubkeyRange] { + &self.subkeys + } + pub fn count(&self) -> u32 { + self.count + } + pub fn value(&self) -> &SignedValueData { + &self.value + } + pub fn destructure(self) -> (TypedKey, Vec, u32, SignedValueData) { + (self.key, self.subkeys, self.count, self.value) + } + pub fn decode( reader: &veilid_capnp::operation_value_changed::Reader, - ) -> Result { + ) -> Result { let k_reader = reader.get_key().map_err(RPCError::protocol)?; let key = decode_typed_key(&k_reader)?; @@ -38,8 +74,8 @@ impl RPCOperationValueChanged { } let count = reader.get_count(); let v_reader = reader.get_value().map_err(RPCError::protocol)?; - let value = decode_value_data(&v_reader)?; - Ok(RPCOperationValueChanged { + let value = decode_signed_value_data(&v_reader)?; + Ok(Self { key, subkeys, count, @@ -68,7 +104,7 @@ impl RPCOperationValueChanged { builder.set_count(self.count); let mut v_builder = builder.reborrow().init_value(); - encode_value_data(&self.value, &mut v_builder)?; + encode_signed_value_data(&self.value, &mut v_builder)?; Ok(()) } } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs index e2eaee74..9da63df7 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_watch_value.rs @@ -1,21 +1,117 @@ use super::*; +const MAX_WATCH_VALUE_Q_SUBKEYS_LEN: usize = 512; +const MAX_WATCH_VALUE_A_PEERS_LEN: usize = 20; + #[derive(Debug, Clone)] pub struct RPCOperationWatchValueQ { - pub key: TypedKey, - pub subkeys: Vec, - pub expiration: u64, - pub count: u32, + key: TypedKey, + subkeys: Vec, + expiration: u64, + count: u32, + watcher: PublicKey, + signature: Signature, } impl RPCOperationWatchValueQ { + pub fn new( + key: TypedKey, + subkeys: Vec, + expiration: u64, + count: u32, + watcher: PublicKey, + signature: Signature, + ) -> Result { + if subkeys.len() > MAX_WATCH_VALUE_Q_SUBKEYS_LEN { + return Err(RPCError::protocol("WatchValueQ subkeys length too long")); + } + Ok(Self { + key, + subkeys, + expiration, + count, + watcher, + signature, + }) + } + + // signature covers: key, subkeys, expiration, count, using watcher key + fn make_signature_data(&self) -> Vec { + let mut sig_data = + Vec::with_capacity(PUBLIC_KEY_LENGTH + 4 + (self.subkeys.len() * 8) + 8 + 4); + sig_data.extend_from_slice(&self.key.kind.0); + sig_data.extend_from_slice(&self.key.value.bytes); + for sk in &self.subkeys { + sig_data.extend_from_slice(&sk.0.to_le_bytes()); + sig_data.extend_from_slice(&sk.1.to_le_bytes()); + } + sig_data.extend_from_slice(&self.expiration.to_le_bytes()); + sig_data.extend_from_slice(&self.count.to_le_bytes()); + sig_data + } + + pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> { + let Some(vcrypto) = validate_context.crypto.get(self.key.kind) else { + return Err(RPCError::protocol("unsupported cryptosystem")); + }; + + let sig_data = self.make_signature_data(); + vcrypto + .verify(&self.watcher, &sig_data, &self.signature) + .map_err(RPCError::protocol)?; + + Ok(()) + } + + pub fn key(&self) -> &TypedKey { + &self.key + } + pub fn subkeys(&self) -> &[ValueSubkeyRange] { + &self.subkeys + } + pub fn expiration(&self) -> u64 { + self.expiration + } + pub fn count(&self) -> u32 { + self.count + } + pub fn watcher(&self) -> &PublicKey { + &self.watcher + } + pub fn signature(&self) -> &Signature { + &self.signature + } + + pub fn destructure( + self, + ) -> ( + TypedKey, + Vec, + u64, + u32, + PublicKey, + Signature, + ) { + ( + self.key, + self.subkeys, + self.expiration, + self.count, + self.watcher, + self.signature, + ) + } + pub fn decode( reader: &veilid_capnp::operation_watch_value_q::Reader, - ) -> Result { + ) -> Result { let k_reader = reader.get_key().map_err(RPCError::protocol)?; let key = decode_typed_key(&k_reader)?; let sk_reader = reader.get_subkeys().map_err(RPCError::protocol)?; + if sk_reader.len() as usize > MAX_WATCH_VALUE_Q_SUBKEYS_LEN { + return Err(RPCError::protocol("WatchValueQ subkeys length too long")); + } let mut subkeys = Vec::::with_capacity( sk_reader .len() @@ -40,13 +136,22 @@ impl RPCOperationWatchValueQ { let expiration = reader.get_expiration(); let count = reader.get_count(); - Ok(RPCOperationWatchValueQ { + let w_reader = reader.get_watcher().map_err(RPCError::protocol)?; + let watcher = decode_key256(&w_reader); + + let s_reader = reader.get_signature().map_err(RPCError::protocol)?; + let signature = decode_signature512(&s_reader); + + Ok(Self { key, subkeys, expiration, count, + watcher, + signature, }) } + pub fn encode( &self, builder: &mut veilid_capnp::operation_watch_value_q::Builder, @@ -67,22 +172,54 @@ impl RPCOperationWatchValueQ { } builder.set_expiration(self.expiration); builder.set_count(self.count); + + let mut w_builder = builder.reborrow().init_watcher(); + encode_key256(&self.watcher, &mut w_builder); + + let mut s_builder = builder.reborrow().init_signature(); + encode_signature512(&self.signature, &mut s_builder); + Ok(()) } } #[derive(Debug, Clone)] pub struct RPCOperationWatchValueA { - pub expiration: u64, - pub peers: Vec, + expiration: u64, + peers: Vec, } impl RPCOperationWatchValueA { + pub fn new(expiration: u64, peers: Vec) -> Result { + if peers.len() > MAX_WATCH_VALUE_A_PEERS_LEN { + return Err(RPCError::protocol("WatchValueA peers length too long")); + } + Ok(Self { expiration, peers }) + } + + pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> { + PeerInfo::validate_vec(&mut self.peers, validate_context.crypto.clone()); + Ok(()) + } + + pub fn expiration(&self) -> u64 { + self.expiration + } + pub fn peers(&self) -> &[PeerInfo] { + &self.peers + } + pub fn destructure(self) -> (u64, Vec) { + (self.expiration, self.peers) + } + pub fn decode( reader: &veilid_capnp::operation_watch_value_a::Reader, - ) -> Result { + ) -> Result { let expiration = reader.get_expiration(); let peers_reader = reader.get_peers().map_err(RPCError::protocol)?; + if peers_reader.len() as usize > MAX_WATCH_VALUE_A_PEERS_LEN { + return Err(RPCError::protocol("WatchValueA peers length too long")); + } let mut peers = Vec::::with_capacity( peers_reader .len() @@ -94,7 +231,7 @@ impl RPCOperationWatchValueA { peers.push(peer_info); } - Ok(RPCOperationWatchValueA { expiration, peers }) + Ok(Self { expiration, peers }) } pub fn encode( &self, diff --git a/veilid-core/src/rpc_processor/coders/operations/question.rs b/veilid-core/src/rpc_processor/coders/operations/question.rs index 8e0db706..f0f2c056 100644 --- a/veilid-core/src/rpc_processor/coders/operations/question.rs +++ b/veilid-core/src/rpc_processor/coders/operations/question.rs @@ -11,7 +11,7 @@ impl RPCQuestion { Self { respond_to, detail } } pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> { - self.respond_to.validate(validate_context)?; + self.respond_to.validate(validate_context.crypto.clone())?; self.detail.validate(validate_context) } pub fn respond_to(&self) -> &RespondTo { diff --git a/veilid-core/src/rpc_processor/coders/operations/respond_to.rs b/veilid-core/src/rpc_processor/coders/operations/respond_to.rs index da21fefe..bff19c4f 100644 --- a/veilid-core/src/rpc_processor/coders/operations/respond_to.rs +++ b/veilid-core/src/rpc_processor/coders/operations/respond_to.rs @@ -7,12 +7,10 @@ pub enum RespondTo { } impl RespondTo { - pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> { + pub fn validate(&mut self, crypto: Crypto) -> Result<(), RPCError> { match self { RespondTo::Sender => Ok(()), - RespondTo::PrivateRoute(pr) => pr - .validate(validate_context.crypto.clone()) - .map_err(RPCError::protocol), + RespondTo::PrivateRoute(pr) => pr.validate(crypto).map_err(RPCError::protocol), } } diff --git a/veilid-core/src/rpc_processor/coders/value_data.rs b/veilid-core/src/rpc_processor/coders/value_data.rs new file mode 100644 index 00000000..c5985b6c --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/value_data.rs @@ -0,0 +1,30 @@ +use super::*; + +pub fn encode_signed_value_data( + signed_value_data: &SignedValueData, + builder: &mut veilid_capnp::signed_value_data::Builder, +) -> Result<(), RPCError> { + builder.set_seq(signed_value_data.value_data().seq()); + builder.set_data(signed_value_data.value_data().data()); + let mut wb = builder.reborrow().init_writer(); + encode_key256(signed_value_data.value_data().writer(), &mut wb); + let mut sb = builder.reborrow().init_signature(); + encode_signature512(signed_value_data.signature(), &mut sb); + Ok(()) +} + +pub fn decode_signed_value_data( + reader: &veilid_capnp::signed_value_data::Reader, +) -> Result { + let seq = reader.get_seq(); + let data = reader.get_data().map_err(RPCError::protocol)?.to_vec(); + let wr = reader.get_writer().map_err(RPCError::protocol)?; + let writer = decode_key256(&wr); + let sr = reader.get_signature().map_err(RPCError::protocol)?; + let signature = decode_signature512(&sr); + + Ok(SignedValueData { + value_data: ValueData { seq, data, writer }, + signature, + }) +} diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 024fc35f..1c9964dc 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -28,12 +28,13 @@ pub use rpc_status::*; use super::*; -use crate::crypto::*; +use crypto::*; use futures_util::StreamExt; use network_manager::*; use receipt_manager::*; use routing_table::*; use stop_token::future::FutureExt; +use storage_manager::StorageManager; ///////////////////////////////////////////////////////////////////// @@ -226,6 +227,7 @@ pub struct RPCProcessorInner { send_channel: Option, RPCMessageEncoded)>>, stop_source: Option, worker_join_handles: Vec>, + opt_storage_manager: Option, } pub struct RPCProcessorUnlockedInner { @@ -255,6 +257,7 @@ impl RPCProcessor { send_channel: None, stop_source: None, worker_join_handles: Vec::new(), + opt_storage_manager: None, } } fn new_unlocked_inner( @@ -308,6 +311,16 @@ impl RPCProcessor { self.routing_table.clone() } + pub fn set_storage_manager(&self, opt_storage_manager: Option) { + let inner = self.inner.lock(); + inner.opt_storage_manager = opt_storage_manager + } + + pub fn storage_manager(&self) -> Option { + let inner = self.inner.lock(); + inner.opt_storage_manager.clone() + } + ////////////////////////////////////////////////////////////////////// #[instrument(level = "debug", skip_all, err)] @@ -1229,6 +1242,7 @@ impl RPCProcessor { // Validate the RPC operation let validate_context = RPCValidateContext { crypto: self.crypto.clone(), + rpc_processor: self.clone(), question_context, }; operation.validate(&validate_context)?; diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index 2a7689b3..5ba248f8 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -14,7 +14,7 @@ impl RPCProcessor { key: TypedKey, subkey: ValueSubkey, last_descriptor: Option, - ) -> Result>>, RPCError> { + ) -> Result>, RPCError> { let get_value_q = RPCOperationGetValueQ::new(key, subkey, last_descriptor.is_none()); let question = RPCQuestion::new( network_result_try!(self.get_destination_respond_to(&dest)?), diff --git a/veilid-core/src/rpc_processor/rpc_signal.rs b/veilid-core/src/rpc_processor/rpc_signal.rs index 1fd6608f..f2d1d19e 100644 --- a/veilid-core/src/rpc_processor/rpc_signal.rs +++ b/veilid-core/src/rpc_processor/rpc_signal.rs @@ -55,8 +55,9 @@ impl RPCProcessor { // Handle it let network_manager = self.network_manager(); + let signal_info = signal.destructure(); network_manager - .handle_signal(signal.signal_info) + .handle_signal(signal_info) .await .map_err(RPCError::network) } diff --git a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs index 7535779e..d5d860b6 100644 --- a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs +++ b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs @@ -69,13 +69,10 @@ impl RPCProcessor { }; // Get the statement - let RPCOperationValidateDialInfo { - dial_info, - receipt, - redirect, - } = match msg.operation.into_kind() { - RPCOperationKind::Statement(s) => match s.into_detail() { - RPCStatementDetail::ValidateDialInfo(s) => s, + let (_, _, _, kind) = msg.operation.destructure(); + let (dial_info, receipt, redirect) = match kind { + RPCOperationKind::Statement(s) => match s.destructure() { + RPCStatementDetail::ValidateDialInfo(s) => s.destructure(), _ => panic!("not a validate dial info"), }, _ => panic!("not a statement"), diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index d5b3cfa3..8a4d17c0 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -23,6 +23,8 @@ const MAX_RECORD_DATA_SIZE: usize = 1_048_576; /// Locked structure for storage manager struct StorageManagerInner { + /// If we are started up + initialized: bool, /// Records that have been 'created' or 'opened' by this node local_record_store: Option, /// Records that have been pushed to this node for distribution by other nodes @@ -41,7 +43,7 @@ struct StorageManagerUnlockedInner { #[derive(Clone)] pub struct StorageManager { unlocked_inner: Arc, - inner: Arc>, + inner: Arc>, } impl StorageManager { @@ -64,6 +66,7 @@ impl StorageManager { } fn new_inner() -> StorageManagerInner { StorageManagerInner { + initialized: false, local_record_store: None, remote_record_store: None, } @@ -114,14 +117,14 @@ impl StorageManager { block_store, rpc_processor, )), - inner: Arc::new(Mutex::new(Self::new_inner())), + inner: Arc::new(AsyncMutex::new(Self::new_inner())), } } #[instrument(level = "debug", skip_all, err)] pub async fn init(&self) -> EyreResult<()> { debug!("startup storage manager"); - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().await; let local_limits = Self::local_limits_from_config(self.unlocked_inner.config.clone()); let remote_limits = Self::remote_limits_from_config(self.unlocked_inner.config.clone()); @@ -143,14 +146,25 @@ impl StorageManager { inner.local_record_store = Some(local_record_store); inner.remote_record_store = Some(remote_record_store); + inner.initialized = true; + + // Let rpc processor access storage manager + self.unlocked_inner + .rpc_processor + .set_storage_manager(Some(self.clone())); + Ok(()) } pub async fn terminate(&self) { debug!("starting storage manager shutdown"); + let mut inner = self.inner.lock().await; // Release the storage manager - *self.inner.lock() = Self::new_inner(); + *inner = Self::new_inner(); + + // Remove storage manager from rpc processor + self.unlocked_inner.rpc_processor.set_storage_manager(None); debug!("finished storage manager shutdown"); } @@ -172,10 +186,11 @@ impl StorageManager { record: Record, ) -> Result { // add value record to record store - let mut inner = self.inner.lock(); - let Some(local_record_store) = inner.local_record_store.as_mut() else { + let mut inner = self.inner.lock().await; + if !inner.initialized { apibail_generic!("not initialized"); - }; + } + let local_record_store = inner.local_record_store.as_mut().unwrap(); let key = self.get_key(vcrypto.clone(), &record); local_record_store.new_record(key, record).await?; Ok(key) diff --git a/veilid-core/src/storage_manager/types/mod.rs b/veilid-core/src/storage_manager/types/mod.rs index 9fde6117..9eea4ad8 100644 --- a/veilid-core/src/storage_manager/types/mod.rs +++ b/veilid-core/src/storage_manager/types/mod.rs @@ -1,9 +1,7 @@ mod signed_value_data; mod signed_value_descriptor; -mod value_detail; use super::*; pub use signed_value_data::*; pub use signed_value_descriptor::*; -pub use value_detail::*;