From 046b61d5d8a684c133fc4f6df2bba7dd546a69ea Mon Sep 17 00:00:00 2001 From: John Smith Date: Fri, 30 Sep 2022 22:37:55 -0400 Subject: [PATCH] more app message call --- Cargo.lock | 3 + veilid-cli/Cargo.toml | 1 + veilid-cli/src/client_api_connection.rs | 29 +++++ veilid-cli/src/command_processor.rs | 109 +++++++++++++++++- veilid-core/src/veilid_api/mod.rs | 13 +++ .../src/veilid_api/serialize_helpers.rs | 17 +++ veilid-flutter/lib/veilid.dart | 52 ++++++++- veilid-flutter/lib/veilid_ffi.dart | 18 +++ veilid-flutter/lib/veilid_js.dart | 8 ++ veilid-flutter/rust/Cargo.toml | 1 + veilid-flutter/rust/src/dart_ffi.rs | 20 ++++ veilid-server/proto/veilid-client.capnp | 9 +- veilid-server/src/client_api.rs | 19 +++ veilid-wasm/Cargo.toml | 1 + veilid-wasm/src/lib.rs | 18 +++ 15 files changed, 310 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0c1f5cbc..66988967 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5100,6 +5100,7 @@ dependencies = [ "directories", "flexi_logger", "futures", + "hex", "log", "parking_lot 0.12.1", "serde", @@ -5218,6 +5219,7 @@ dependencies = [ "async-std", "backtrace", "cfg-if 1.0.0", + "data-encoding", "ffi-support", "futures-util", "hostname", @@ -5292,6 +5294,7 @@ version = "0.1.0" dependencies = [ "cfg-if 1.0.0", "console_error_panic_hook", + "data-encoding", "futures-util", "js-sys", "lazy_static", diff --git a/veilid-cli/Cargo.toml b/veilid-cli/Cargo.toml index bc2e5e7a..eb4cc69e 100644 --- a/veilid-cli/Cargo.toml +++ b/veilid-cli/Cargo.toml @@ -42,6 +42,7 @@ bugsalot = "^0" flexi_logger = { version = "^0", features = ["use_chrono_for_offset"] } thiserror = "^1" crossbeam-channel = "^0" +hex = "^0" veilid-core = { path = "../veilid-core", default_features = false} [dev-dependencies] diff --git a/veilid-cli/src/client_api_connection.rs b/veilid-cli/src/client_api_connection.rs index 22828531..682269c3 100644 --- a/veilid-cli/src/client_api_connection.rs +++ b/veilid-cli/src/client_api_connection.rs @@ -76,6 +76,12 @@ impl veilid_client::Server for VeilidClientImpl { VeilidUpdate::Log(log) => { self.comproc.update_log(log); } + VeilidUpdate::AppMessage(msg) => { + self.comproc.update_app_message(msg); + } + VeilidUpdate::AppCall(call) => { + self.comproc.update_app_call(call); + } VeilidUpdate::Attachment(attachment) => { self.comproc.update_attachment(attachment); } @@ -365,6 +371,29 @@ impl ClientApiConnection { res.map_err(map_to_string) } + pub async fn server_appcall_reply(&mut self, id: u64, msg: Vec) -> Result<(), String> { + trace!("ClientApiConnection::appcall_reply"); + let server = { + let inner = self.inner.borrow(); + inner + .server + .as_ref() + .ok_or_else(|| "Not connected, ignoring change_log_level request".to_owned())? + .clone() + }; + let mut request = server.borrow().app_call_reply_request(); + request.get().set_id(id); + request.get().set_message(&msg); + let response = request.send().promise.await.map_err(map_to_string)?; + let reader = response + .get() + .map_err(map_to_string)? + .get_result() + .map_err(map_to_string)?; + let res: Result<(), VeilidAPIError> = decode_api_result(&reader); + res.map_err(map_to_string) + } + // Start Client API connection pub async fn connect(&mut self, connect_addr: SocketAddr) -> Result<(), String> { trace!("ClientApiConnection::connect"); diff --git a/veilid-cli/src/command_processor.rs b/veilid-cli/src/command_processor.rs index 1e6d4ad6..a9c3877e 100644 --- a/veilid-cli/src/command_processor.rs +++ b/veilid-cli/src/command_processor.rs @@ -49,6 +49,7 @@ struct CommandProcessorInner { autoreconnect: bool, server_addr: Option, connection_waker: Eventual, + last_call_id: Option, } type Handle = Rc>; @@ -70,6 +71,7 @@ impl CommandProcessor { autoreconnect: settings.autoreconnect, server_addr: None, connection_waker: Eventual::new(), + last_call_id: None, })), } } @@ -111,6 +113,7 @@ attach - attach the server to the Veilid network detach - detach the server from the Veilid network debug - send a debugging command to the Veilid server change_log_level - change the log level for a tracing layer +reply - reply to an AppCall not handled directly by the server "# .to_owned(), ); @@ -225,6 +228,66 @@ change_log_level - change the log level for a tracing layer Ok(()) } + pub fn cmd_reply(&self, rest: Option, callback: UICallback) -> Result<(), String> { + trace!("CommandProcessor::cmd_reply"); + + let mut capi = self.capi(); + let ui = self.ui(); + let some_last_id = self.inner_mut().last_call_id.take(); + spawn_detached_local(async move { + let (first, second) = Self::word_split(&rest.clone().unwrap_or_default()); + let (id, msg) = if let Some(second) = second { + let id = match u64::from_str(&first) { + Err(e) => { + ui.add_node_event(format!("invalid appcall id: {}", e)); + ui.send_callback(callback); + return; + } + Ok(v) => v, + }; + (id, second) + } else { + let id = match some_last_id { + None => { + ui.add_node_event("must specify last call id".to_owned()); + ui.send_callback(callback); + return; + } + Some(v) => v, + }; + (id, rest.unwrap_or_default()) + }; + let msg = if msg[0..1] == "#".to_owned() { + match hex::decode(msg[1..].as_bytes().to_vec()) { + Err(e) => { + ui.add_node_event(format!("invalid hex message: {}", e)); + ui.send_callback(callback); + return; + } + Ok(v) => v, + } + } else { + msg[1..].as_bytes().to_vec() + }; + let msglen = msg.len(); + match capi.server_appcall_reply(id, msg).await { + Ok(()) => { + ui.add_node_event(format!("reply sent to {} : {} bytes", id, msglen)); + ui.send_callback(callback); + return; + } + Err(e) => { + ui.display_string_dialog( + "Server command 'appcall_reply' failed", + e.to_string(), + callback, + ); + } + } + }); + Ok(()) + } + pub fn run_command(&self, command_line: &str, callback: UICallback) -> Result<(), String> { // let (cmd, rest) = Self::word_split(command_line); @@ -238,6 +301,7 @@ change_log_level - change the log level for a tracing layer "detach" => self.cmd_detach(callback), "debug" => self.cmd_debug(rest, callback), "change_log_level" => self.cmd_change_log_level(rest, callback), + "reply" => self.cmd_reply(rest, callback), _ => { let ui = self.ui(); ui.send_callback(callback); @@ -344,6 +408,49 @@ change_log_level - change the log level for a tracing layer )); } + pub fn update_app_message(&mut self, msg: veilid_core::VeilidAppMessage) { + // check is message body is ascii printable + let mut printable = true; + for c in &msg.message { + if *c < 32 || *c > 126 { + printable = false; + } + } + + let strmsg = if printable { + String::from_utf8_lossy(&msg.message).to_string() + } else { + hex::encode(&msg.message) + }; + + self.inner() + .ui + .add_node_event(format!("AppMessage ({:?}): {}", msg.sender, strmsg)); + } + + pub fn update_app_call(&mut self, call: veilid_core::VeilidAppCall) { + // check is message body is ascii printable + let mut printable = true; + for c in &call.message { + if *c < 32 || *c > 126 { + printable = false; + } + } + + let strmsg = if printable { + String::from_utf8_lossy(&call.message).to_string() + } else { + format!("#{}", hex::encode(&call.message)) + }; + + self.inner().ui.add_node_event(format!( + "AppCall ({:?}) id = {:016x} : {}", + call.sender, call.id, strmsg + )); + + self.inner_mut().last_call_id = Some(call.id); + } + pub fn update_shutdown(&mut self) { // Do nothing with this, we'll process shutdown when rpc connection closes } @@ -381,7 +488,6 @@ change_log_level - change the log level for a tracing layer // calls into client_api_connection //////////////////////////////////////////// pub fn attach(&mut self) { - trace!("CommandProcessor::attach"); let mut capi = self.capi(); spawn_detached_local(async move { @@ -392,7 +498,6 @@ change_log_level - change the log level for a tracing layer } pub fn detach(&mut self) { - trace!("CommandProcessor::detach"); let mut capi = self.capi(); spawn_detached_local(async move { diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 50835a13..0934ff5c 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -230,18 +230,23 @@ pub struct VeilidLog { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct VeilidAppMessage { /// Some(sender) if the message was sent directly, None if received via a private/safety route + #[serde(with = "opt_json_as_string")] pub sender: Option, /// The content of the message to deliver to the application + #[serde(with = "json_as_base64")] pub message: Vec, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct VeilidAppCall { /// Some(sender) if the request was sent directly, None if received via a private/safety route + #[serde(with = "opt_json_as_string")] pub sender: Option, /// The content of the request to deliver to the application + #[serde(with = "json_as_base64")] pub message: Vec, /// The id to reply to + #[serde(with = "json_as_string")] pub id: u64, } @@ -301,6 +306,14 @@ impl fmt::Display for NodeId { write!(f, "{}", self.key.encode()) } } +impl FromStr for NodeId { + type Err = VeilidAPIError; + fn from_str(s: &str) -> Result { + Ok(Self { + key: DHTKey::try_decode(s)?, + }) + } +} #[derive(Clone, Debug, Default, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize)] pub struct ValueKey { diff --git a/veilid-core/src/veilid_api/serialize_helpers.rs b/veilid-core/src/veilid_api/serialize_helpers.rs index f2d6d28d..58b51909 100644 --- a/veilid-core/src/veilid_api/serialize_helpers.rs +++ b/veilid-core/src/veilid_api/serialize_helpers.rs @@ -41,6 +41,23 @@ pub fn serialize_json(val: T) -> String { } } +pub mod json_as_base64 { + use data_encoding::BASE64URL_NOPAD; + use serde::{Deserialize, Deserializer, Serialize, Serializer}; + + pub fn serialize(v: &Vec, s: S) -> Result { + let base64 = BASE64URL_NOPAD.encode(v); + String::serialize(&base64, s) + } + + pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result, D::Error> { + let base64 = String::deserialize(d)?; + BASE64URL_NOPAD + .decode(base64.as_bytes()) + .map_err(|e| serde::de::Error::custom(e)) + } +} + pub mod json_as_string { use std::fmt::Display; use std::str::FromStr; diff --git a/veilid-flutter/lib/veilid.dart b/veilid-flutter/lib/veilid.dart index 85eb8f8f..843e2938 100644 --- a/veilid-flutter/lib/veilid.dart +++ b/veilid-flutter/lib/veilid.dart @@ -1,4 +1,6 @@ import 'dart:async'; +import 'dart:typed_data'; +import 'dart:convert'; import 'package:change_case/change_case.dart'; @@ -1239,11 +1241,13 @@ abstract class VeilidUpdate { } case "AppMessage": { - return VeilidAppMessage(); + return VeilidAppMessage( + sender: json["sender"], message: json["message"]); } case "AppCall": { - return VeilidAppCall(); + return VeilidAppCall( + sender: json["sender"], message: json["message"], id: json["id"]); } case "Attachment": { @@ -1286,6 +1290,49 @@ class VeilidLog implements VeilidUpdate { } } +class VeilidAppMessage implements VeilidUpdate { + final String? sender; + final Uint8List message; + + // + VeilidAppMessage({ + required this.sender, + required this.message, + }); + + @override + Map get json { + return { + 'kind': "AppMessage", + 'sender': sender, + 'message': base64UrlEncode(message) + }; + } +} + +class VeilidAppCall implements VeilidUpdate { + final String? sender; + final Uint8List message; + final String id; + + // + VeilidAppCall({ + required this.sender, + required this.message, + required this.id, + }); + + @override + Map get json { + return { + 'kind': "AppMessage", + 'sender': sender, + 'message': base64UrlEncode(message), + 'id': id, + }; + } +} + class VeilidUpdateAttachment implements VeilidUpdate { final VeilidStateAttachment state; // @@ -1580,6 +1627,7 @@ abstract class Veilid { Future detach(); Future shutdownVeilidCore(); Future debug(String command); + Future appCallReply(String id, Uint8List message); String veilidVersionString(); VeilidVersion veilidVersion(); } diff --git a/veilid-flutter/lib/veilid_ffi.dart b/veilid-flutter/lib/veilid_ffi.dart index 4b41bd83..23ce9ef6 100644 --- a/veilid-flutter/lib/veilid_ffi.dart +++ b/veilid-flutter/lib/veilid_ffi.dart @@ -3,6 +3,7 @@ import 'dart:ffi'; import 'dart:io'; import 'dart:isolate'; import 'dart:convert'; +import 'dart:typed_data'; import 'package:ffi/ffi.dart'; @@ -50,6 +51,10 @@ typedef _DetachDart = void Function(int); // fn debug(port: i64, log_level: FfiStr) typedef _DebugC = Void Function(Int64, Pointer); typedef _DebugDart = void Function(int, Pointer); +// fn app_call_reply(port: i64, id: FfiStr, message: FfiStr) +typedef _AppCallReplyC = Void Function(Int64, Pointer, Pointer); +typedef _AppCallReplyDart = void Function(int, Pointer, Pointer); + // fn shutdown_veilid_core(port: i64) typedef _ShutdownVeilidCoreC = Void Function(Int64); typedef _ShutdownVeilidCoreDart = void Function(int); @@ -304,6 +309,7 @@ class VeilidFFI implements Veilid { final _DetachDart _detach; final _ShutdownVeilidCoreDart _shutdownVeilidCore; final _DebugDart _debug; + final _AppCallReplyDart _appCallReply; final _VeilidVersionStringDart _veilidVersionString; final _VeilidVersionDart _veilidVersion; @@ -328,6 +334,8 @@ class VeilidFFI implements Veilid { dylib.lookupFunction<_ShutdownVeilidCoreC, _ShutdownVeilidCoreDart>( 'shutdown_veilid_core'), _debug = dylib.lookupFunction<_DebugC, _DebugDart>('debug'), + _appCallReply = dylib.lookupFunction<_AppCallReplyC, _AppCallReplyDart>( + 'app_call_reply'), _veilidVersionString = dylib.lookupFunction<_VeilidVersionStringC, _VeilidVersionStringDart>('veilid_version_string'), _veilidVersion = @@ -420,6 +428,16 @@ class VeilidFFI implements Veilid { return processFuturePlain(recvPort.first); } + @override + Future appCallReply(String id, Uint8List message) async { + var nativeId = id.toNativeUtf8(); + var nativeEncodedMessage = base64UrlEncode(message).toNativeUtf8(); + final recvPort = ReceivePort("app_call_reply"); + final sendPort = recvPort.sendPort; + _appCallReply(sendPort.nativePort, nativeId, nativeEncodedMessage); + return processFutureVoid(recvPort.first); + } + @override String veilidVersionString() { final versionString = _veilidVersionString(); diff --git a/veilid-flutter/lib/veilid_js.dart b/veilid-flutter/lib/veilid_js.dart index ba9a4dc1..1d5e907c 100644 --- a/veilid-flutter/lib/veilid_js.dart +++ b/veilid-flutter/lib/veilid_js.dart @@ -5,6 +5,7 @@ import 'dart:js' as js; import 'dart:js_util' as js_util; import 'dart:async'; import 'dart:convert'; +import 'dart:typed_data'; ////////////////////////////////////////////////////////// @@ -82,6 +83,13 @@ class VeilidJS implements Veilid { return _wrapApiPromise(js_util.callMethod(wasm, "debug", [command])); } + @override + Future appCallReply(String id, Uint8List message) { + var encodedMessage = base64UrlEncode(message); + return _wrapApiPromise( + js_util.callMethod(wasm, "app_call_reply", [id, encodedMessage])); + } + @override String veilidVersionString() { return js_util.callMethod(wasm, "veilid_version_string", []); diff --git a/veilid-flutter/rust/Cargo.toml b/veilid-flutter/rust/Cargo.toml index deeea4d6..1967f5e1 100644 --- a/veilid-flutter/rust/Cargo.toml +++ b/veilid-flutter/rust/Cargo.toml @@ -20,6 +20,7 @@ serde_json = "^1" serde = "^1" futures-util = { version = "^0", default_features = false, features = ["alloc"] } cfg-if = "^1" +data-encoding = { version = "^2" } # Dependencies for native builds only # Linux, Windows, Mac, iOS, Android diff --git a/veilid-flutter/rust/src/dart_ffi.rs b/veilid-flutter/rust/src/dart_ffi.rs index 3048277f..00080298 100644 --- a/veilid-flutter/rust/src/dart_ffi.rs +++ b/veilid-flutter/rust/src/dart_ffi.rs @@ -327,6 +327,26 @@ pub extern "C" fn debug(port: i64, command: FfiStr) { }); } +#[no_mangle] +pub extern "C" fn app_call_reply(port: i64, id: FfiStr, message: FfiStr) { + let id = id.into_opt_string().unwrap_or_default(); + let message = message.into_opt_string().unwrap_or_default(); + DartIsolateWrapper::new(port).spawn_result(async move { + let id = match id.parse() { + Ok(v) => v, + Err(e) => { + return APIResult::Err(veilid_core::VeilidAPIError::invalid_argument(e, "id", id)) + } + }; + let message = data_encoding::BASE64URL_NOPAD + .decode(message.as_bytes()) + .map_err(|e| veilid_core::VeilidAPIError::invalid_argument(e, "message", message))?; + let veilid_api = get_veilid_api().await?; + veilid_api.app_call_reply(id, message).await?; + APIRESULT_VOID + }); +} + #[no_mangle] pub extern "C" fn veilid_version_string() -> *mut c_char { veilid_core::veilid_version_string().into_ffi_value() diff --git a/veilid-server/proto/veilid-client.capnp b/veilid-server/proto/veilid-client.capnp index 1d4066b1..320ffea4 100644 --- a/veilid-server/proto/veilid-client.capnp +++ b/veilid-server/proto/veilid-client.capnp @@ -10,15 +10,16 @@ struct ApiResult { interface Registration {} interface VeilidServer { - register @0 (veilidClient: VeilidClient) -> (registration: Registration, state: Text); - debug @1 (command: Text) -> (result :ApiResult); + register @0 (veilidClient :VeilidClient) -> (registration :Registration, state :Text); + debug @1 (command :Text) -> (result :ApiResult); attach @2 () -> (result :ApiResult); detach @3 () -> (result :ApiResult); shutdown @4 (); getState @5 () -> (result :ApiResult); - changeLogLevel @6 (layer: Text, logLevel: Text) -> (result :ApiResult); + changeLogLevel @6 (layer :Text, logLevel :Text) -> (result :ApiResult); + appCallReply @7 (id :UInt64, message :Data) -> (result :ApiResult); } interface VeilidClient { - update @0 (veilidUpdate: Text); + update @0 (veilidUpdate :Text); } \ No newline at end of file diff --git a/veilid-server/src/client_api.rs b/veilid-server/src/client_api.rs index cafe89cb..6207cb5f 100644 --- a/veilid-server/src/client_api.rs +++ b/veilid-server/src/client_api.rs @@ -236,6 +236,25 @@ impl veilid_server::Server for VeilidServerImpl { encode_api_result(&result, &mut results.get().init_result()); Promise::ok(()) } + + #[instrument(level = "trace", skip_all)] + fn app_call_reply( + &mut self, + params: veilid_server::AppCallReplyParams, + mut results: veilid_server::AppCallReplyResults, + ) -> Promise<(), ::capnp::Error> { + trace!("VeilidServerImpl::app_call_reply"); + + let id = pry!(params.get()).get_id(); + let message = pry!(pry!(params.get()).get_message()).to_owned(); + + let veilid_api = self.veilid_api.clone(); + Promise::from_future(async move { + let result = veilid_api.app_call_reply(id, message).await; + encode_api_result(&result, &mut results.get().init_result()); + Ok(()) + }) + } } // --- Client API Server-Side --------------------------------- diff --git a/veilid-wasm/Cargo.toml b/veilid-wasm/Cargo.toml index c5cd2426..7ca4e42f 100644 --- a/veilid-wasm/Cargo.toml +++ b/veilid-wasm/Cargo.toml @@ -26,6 +26,7 @@ serde = "^1" lazy_static = "^1" send_wrapper = "^0" futures-util = { version = "^0", default_features = false, features = ["alloc"] } +data-encoding = { version = "^2" } [dev-dependencies] wasm-bindgen-test = "^0" diff --git a/veilid-wasm/src/lib.rs b/veilid-wasm/src/lib.rs index 9f0e1b4b..6baf1e80 100644 --- a/veilid-wasm/src/lib.rs +++ b/veilid-wasm/src/lib.rs @@ -268,6 +268,24 @@ pub fn debug(command: String) -> Promise { }) } +#[wasm_bindgen()] +pub fn app_call_reply(id: String, message: String) -> Promise { + wrap_api_future(async move { + let id = match id.parse() { + Ok(v) => v, + Err(e) => { + return APIResult::Err(veilid_core::VeilidAPIError::invalid_argument(e, "id", id)) + } + }; + let message = data_encoding::BASE64URL_NOPAD + .decode(message.as_bytes()) + .map_err(|e| veilid_core::VeilidAPIError::invalid_argument(e, "message", message))?; + let veilid_api = get_veilid_api()?; + let out = veilid_api.app_call_reply(id, message).await?; + Ok(out) + }) +} + #[wasm_bindgen()] pub fn veilid_version_string() -> String { veilid_core::veilid_version_string()