more app message call

This commit is contained in:
John Smith 2022-09-30 22:37:55 -04:00
parent baa1714943
commit 046b61d5d8
15 changed files with 310 additions and 8 deletions

3
Cargo.lock generated
View File

@ -5100,6 +5100,7 @@ dependencies = [
"directories",
"flexi_logger",
"futures",
"hex",
"log",
"parking_lot 0.12.1",
"serde",
@ -5218,6 +5219,7 @@ dependencies = [
"async-std",
"backtrace",
"cfg-if 1.0.0",
"data-encoding",
"ffi-support",
"futures-util",
"hostname",
@ -5292,6 +5294,7 @@ version = "0.1.0"
dependencies = [
"cfg-if 1.0.0",
"console_error_panic_hook",
"data-encoding",
"futures-util",
"js-sys",
"lazy_static",

View File

@ -42,6 +42,7 @@ bugsalot = "^0"
flexi_logger = { version = "^0", features = ["use_chrono_for_offset"] }
thiserror = "^1"
crossbeam-channel = "^0"
hex = "^0"
veilid-core = { path = "../veilid-core", default_features = false}
[dev-dependencies]

View File

@ -76,6 +76,12 @@ impl veilid_client::Server for VeilidClientImpl {
VeilidUpdate::Log(log) => {
self.comproc.update_log(log);
}
VeilidUpdate::AppMessage(msg) => {
self.comproc.update_app_message(msg);
}
VeilidUpdate::AppCall(call) => {
self.comproc.update_app_call(call);
}
VeilidUpdate::Attachment(attachment) => {
self.comproc.update_attachment(attachment);
}
@ -365,6 +371,29 @@ impl ClientApiConnection {
res.map_err(map_to_string)
}
pub async fn server_appcall_reply(&mut self, id: u64, msg: Vec<u8>) -> Result<(), String> {
trace!("ClientApiConnection::appcall_reply");
let server = {
let inner = self.inner.borrow();
inner
.server
.as_ref()
.ok_or_else(|| "Not connected, ignoring change_log_level request".to_owned())?
.clone()
};
let mut request = server.borrow().app_call_reply_request();
request.get().set_id(id);
request.get().set_message(&msg);
let response = request.send().promise.await.map_err(map_to_string)?;
let reader = response
.get()
.map_err(map_to_string)?
.get_result()
.map_err(map_to_string)?;
let res: Result<(), VeilidAPIError> = decode_api_result(&reader);
res.map_err(map_to_string)
}
// Start Client API connection
pub async fn connect(&mut self, connect_addr: SocketAddr) -> Result<(), String> {
trace!("ClientApiConnection::connect");

View File

@ -49,6 +49,7 @@ struct CommandProcessorInner {
autoreconnect: bool,
server_addr: Option<SocketAddr>,
connection_waker: Eventual,
last_call_id: Option<u64>,
}
type Handle<T> = Rc<RefCell<T>>;
@ -70,6 +71,7 @@ impl CommandProcessor {
autoreconnect: settings.autoreconnect,
server_addr: None,
connection_waker: Eventual::new(),
last_call_id: None,
})),
}
}
@ -111,6 +113,7 @@ attach - attach the server to the Veilid network
detach - detach the server from the Veilid network
debug - send a debugging command to the Veilid server
change_log_level - change the log level for a tracing layer
reply - reply to an AppCall not handled directly by the server
"#
.to_owned(),
);
@ -225,6 +228,66 @@ change_log_level - change the log level for a tracing layer
Ok(())
}
pub fn cmd_reply(&self, rest: Option<String>, callback: UICallback) -> Result<(), String> {
trace!("CommandProcessor::cmd_reply");
let mut capi = self.capi();
let ui = self.ui();
let some_last_id = self.inner_mut().last_call_id.take();
spawn_detached_local(async move {
let (first, second) = Self::word_split(&rest.clone().unwrap_or_default());
let (id, msg) = if let Some(second) = second {
let id = match u64::from_str(&first) {
Err(e) => {
ui.add_node_event(format!("invalid appcall id: {}", e));
ui.send_callback(callback);
return;
}
Ok(v) => v,
};
(id, second)
} else {
let id = match some_last_id {
None => {
ui.add_node_event("must specify last call id".to_owned());
ui.send_callback(callback);
return;
}
Some(v) => v,
};
(id, rest.unwrap_or_default())
};
let msg = if msg[0..1] == "#".to_owned() {
match hex::decode(msg[1..].as_bytes().to_vec()) {
Err(e) => {
ui.add_node_event(format!("invalid hex message: {}", e));
ui.send_callback(callback);
return;
}
Ok(v) => v,
}
} else {
msg[1..].as_bytes().to_vec()
};
let msglen = msg.len();
match capi.server_appcall_reply(id, msg).await {
Ok(()) => {
ui.add_node_event(format!("reply sent to {} : {} bytes", id, msglen));
ui.send_callback(callback);
return;
}
Err(e) => {
ui.display_string_dialog(
"Server command 'appcall_reply' failed",
e.to_string(),
callback,
);
}
}
});
Ok(())
}
pub fn run_command(&self, command_line: &str, callback: UICallback) -> Result<(), String> {
//
let (cmd, rest) = Self::word_split(command_line);
@ -238,6 +301,7 @@ change_log_level - change the log level for a tracing layer
"detach" => self.cmd_detach(callback),
"debug" => self.cmd_debug(rest, callback),
"change_log_level" => self.cmd_change_log_level(rest, callback),
"reply" => self.cmd_reply(rest, callback),
_ => {
let ui = self.ui();
ui.send_callback(callback);
@ -344,6 +408,49 @@ change_log_level - change the log level for a tracing layer
));
}
pub fn update_app_message(&mut self, msg: veilid_core::VeilidAppMessage) {
// check is message body is ascii printable
let mut printable = true;
for c in &msg.message {
if *c < 32 || *c > 126 {
printable = false;
}
}
let strmsg = if printable {
String::from_utf8_lossy(&msg.message).to_string()
} else {
hex::encode(&msg.message)
};
self.inner()
.ui
.add_node_event(format!("AppMessage ({:?}): {}", msg.sender, strmsg));
}
pub fn update_app_call(&mut self, call: veilid_core::VeilidAppCall) {
// check is message body is ascii printable
let mut printable = true;
for c in &call.message {
if *c < 32 || *c > 126 {
printable = false;
}
}
let strmsg = if printable {
String::from_utf8_lossy(&call.message).to_string()
} else {
format!("#{}", hex::encode(&call.message))
};
self.inner().ui.add_node_event(format!(
"AppCall ({:?}) id = {:016x} : {}",
call.sender, call.id, strmsg
));
self.inner_mut().last_call_id = Some(call.id);
}
pub fn update_shutdown(&mut self) {
// Do nothing with this, we'll process shutdown when rpc connection closes
}
@ -381,7 +488,6 @@ change_log_level - change the log level for a tracing layer
// calls into client_api_connection
////////////////////////////////////////////
pub fn attach(&mut self) {
trace!("CommandProcessor::attach");
let mut capi = self.capi();
spawn_detached_local(async move {
@ -392,7 +498,6 @@ change_log_level - change the log level for a tracing layer
}
pub fn detach(&mut self) {
trace!("CommandProcessor::detach");
let mut capi = self.capi();
spawn_detached_local(async move {

View File

@ -230,18 +230,23 @@ pub struct VeilidLog {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct VeilidAppMessage {
/// Some(sender) if the message was sent directly, None if received via a private/safety route
#[serde(with = "opt_json_as_string")]
pub sender: Option<NodeId>,
/// The content of the message to deliver to the application
#[serde(with = "json_as_base64")]
pub message: Vec<u8>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct VeilidAppCall {
/// Some(sender) if the request was sent directly, None if received via a private/safety route
#[serde(with = "opt_json_as_string")]
pub sender: Option<NodeId>,
/// The content of the request to deliver to the application
#[serde(with = "json_as_base64")]
pub message: Vec<u8>,
/// The id to reply to
#[serde(with = "json_as_string")]
pub id: u64,
}
@ -301,6 +306,14 @@ impl fmt::Display for NodeId {
write!(f, "{}", self.key.encode())
}
}
impl FromStr for NodeId {
type Err = VeilidAPIError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self {
key: DHTKey::try_decode(s)?,
})
}
}
#[derive(Clone, Debug, Default, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize)]
pub struct ValueKey {

View File

@ -41,6 +41,23 @@ pub fn serialize_json<T: Serialize + Debug>(val: T) -> String {
}
}
pub mod json_as_base64 {
use data_encoding::BASE64URL_NOPAD;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub fn serialize<S: Serializer>(v: &Vec<u8>, s: S) -> Result<S::Ok, S::Error> {
let base64 = BASE64URL_NOPAD.encode(v);
String::serialize(&base64, s)
}
pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Vec<u8>, D::Error> {
let base64 = String::deserialize(d)?;
BASE64URL_NOPAD
.decode(base64.as_bytes())
.map_err(|e| serde::de::Error::custom(e))
}
}
pub mod json_as_string {
use std::fmt::Display;
use std::str::FromStr;

View File

@ -1,4 +1,6 @@
import 'dart:async';
import 'dart:typed_data';
import 'dart:convert';
import 'package:change_case/change_case.dart';
@ -1239,11 +1241,13 @@ abstract class VeilidUpdate {
}
case "AppMessage":
{
return VeilidAppMessage();
return VeilidAppMessage(
sender: json["sender"], message: json["message"]);
}
case "AppCall":
{
return VeilidAppCall();
return VeilidAppCall(
sender: json["sender"], message: json["message"], id: json["id"]);
}
case "Attachment":
{
@ -1286,6 +1290,49 @@ class VeilidLog implements VeilidUpdate {
}
}
class VeilidAppMessage implements VeilidUpdate {
final String? sender;
final Uint8List message;
//
VeilidAppMessage({
required this.sender,
required this.message,
});
@override
Map<String, dynamic> get json {
return {
'kind': "AppMessage",
'sender': sender,
'message': base64UrlEncode(message)
};
}
}
class VeilidAppCall implements VeilidUpdate {
final String? sender;
final Uint8List message;
final String id;
//
VeilidAppCall({
required this.sender,
required this.message,
required this.id,
});
@override
Map<String, dynamic> get json {
return {
'kind': "AppMessage",
'sender': sender,
'message': base64UrlEncode(message),
'id': id,
};
}
}
class VeilidUpdateAttachment implements VeilidUpdate {
final VeilidStateAttachment state;
//
@ -1580,6 +1627,7 @@ abstract class Veilid {
Future<void> detach();
Future<void> shutdownVeilidCore();
Future<String> debug(String command);
Future<void> appCallReply(String id, Uint8List message);
String veilidVersionString();
VeilidVersion veilidVersion();
}

View File

@ -3,6 +3,7 @@ import 'dart:ffi';
import 'dart:io';
import 'dart:isolate';
import 'dart:convert';
import 'dart:typed_data';
import 'package:ffi/ffi.dart';
@ -50,6 +51,10 @@ typedef _DetachDart = void Function(int);
// fn debug(port: i64, log_level: FfiStr)
typedef _DebugC = Void Function(Int64, Pointer<Utf8>);
typedef _DebugDart = void Function(int, Pointer<Utf8>);
// fn app_call_reply(port: i64, id: FfiStr, message: FfiStr)
typedef _AppCallReplyC = Void Function(Int64, Pointer<Utf8>, Pointer<Utf8>);
typedef _AppCallReplyDart = void Function(int, Pointer<Utf8>, Pointer<Utf8>);
// fn shutdown_veilid_core(port: i64)
typedef _ShutdownVeilidCoreC = Void Function(Int64);
typedef _ShutdownVeilidCoreDart = void Function(int);
@ -304,6 +309,7 @@ class VeilidFFI implements Veilid {
final _DetachDart _detach;
final _ShutdownVeilidCoreDart _shutdownVeilidCore;
final _DebugDart _debug;
final _AppCallReplyDart _appCallReply;
final _VeilidVersionStringDart _veilidVersionString;
final _VeilidVersionDart _veilidVersion;
@ -328,6 +334,8 @@ class VeilidFFI implements Veilid {
dylib.lookupFunction<_ShutdownVeilidCoreC, _ShutdownVeilidCoreDart>(
'shutdown_veilid_core'),
_debug = dylib.lookupFunction<_DebugC, _DebugDart>('debug'),
_appCallReply = dylib.lookupFunction<_AppCallReplyC, _AppCallReplyDart>(
'app_call_reply'),
_veilidVersionString = dylib.lookupFunction<_VeilidVersionStringC,
_VeilidVersionStringDart>('veilid_version_string'),
_veilidVersion =
@ -420,6 +428,16 @@ class VeilidFFI implements Veilid {
return processFuturePlain(recvPort.first);
}
@override
Future<void> appCallReply(String id, Uint8List message) async {
var nativeId = id.toNativeUtf8();
var nativeEncodedMessage = base64UrlEncode(message).toNativeUtf8();
final recvPort = ReceivePort("app_call_reply");
final sendPort = recvPort.sendPort;
_appCallReply(sendPort.nativePort, nativeId, nativeEncodedMessage);
return processFutureVoid(recvPort.first);
}
@override
String veilidVersionString() {
final versionString = _veilidVersionString();

View File

@ -5,6 +5,7 @@ import 'dart:js' as js;
import 'dart:js_util' as js_util;
import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';
//////////////////////////////////////////////////////////
@ -82,6 +83,13 @@ class VeilidJS implements Veilid {
return _wrapApiPromise(js_util.callMethod(wasm, "debug", [command]));
}
@override
Future<void> appCallReply(String id, Uint8List message) {
var encodedMessage = base64UrlEncode(message);
return _wrapApiPromise(
js_util.callMethod(wasm, "app_call_reply", [id, encodedMessage]));
}
@override
String veilidVersionString() {
return js_util.callMethod(wasm, "veilid_version_string", []);

View File

@ -20,6 +20,7 @@ serde_json = "^1"
serde = "^1"
futures-util = { version = "^0", default_features = false, features = ["alloc"] }
cfg-if = "^1"
data-encoding = { version = "^2" }
# Dependencies for native builds only
# Linux, Windows, Mac, iOS, Android

View File

@ -327,6 +327,26 @@ pub extern "C" fn debug(port: i64, command: FfiStr) {
});
}
#[no_mangle]
pub extern "C" fn app_call_reply(port: i64, id: FfiStr, message: FfiStr) {
let id = id.into_opt_string().unwrap_or_default();
let message = message.into_opt_string().unwrap_or_default();
DartIsolateWrapper::new(port).spawn_result(async move {
let id = match id.parse() {
Ok(v) => v,
Err(e) => {
return APIResult::Err(veilid_core::VeilidAPIError::invalid_argument(e, "id", id))
}
};
let message = data_encoding::BASE64URL_NOPAD
.decode(message.as_bytes())
.map_err(|e| veilid_core::VeilidAPIError::invalid_argument(e, "message", message))?;
let veilid_api = get_veilid_api().await?;
veilid_api.app_call_reply(id, message).await?;
APIRESULT_VOID
});
}
#[no_mangle]
pub extern "C" fn veilid_version_string() -> *mut c_char {
veilid_core::veilid_version_string().into_ffi_value()

View File

@ -17,6 +17,7 @@ interface VeilidServer {
shutdown @4 ();
getState @5 () -> (result :ApiResult);
changeLogLevel @6 (layer :Text, logLevel :Text) -> (result :ApiResult);
appCallReply @7 (id :UInt64, message :Data) -> (result :ApiResult);
}
interface VeilidClient {

View File

@ -236,6 +236,25 @@ impl veilid_server::Server for VeilidServerImpl {
encode_api_result(&result, &mut results.get().init_result());
Promise::ok(())
}
#[instrument(level = "trace", skip_all)]
fn app_call_reply(
&mut self,
params: veilid_server::AppCallReplyParams,
mut results: veilid_server::AppCallReplyResults,
) -> Promise<(), ::capnp::Error> {
trace!("VeilidServerImpl::app_call_reply");
let id = pry!(params.get()).get_id();
let message = pry!(pry!(params.get()).get_message()).to_owned();
let veilid_api = self.veilid_api.clone();
Promise::from_future(async move {
let result = veilid_api.app_call_reply(id, message).await;
encode_api_result(&result, &mut results.get().init_result());
Ok(())
})
}
}
// --- Client API Server-Side ---------------------------------

View File

@ -26,6 +26,7 @@ serde = "^1"
lazy_static = "^1"
send_wrapper = "^0"
futures-util = { version = "^0", default_features = false, features = ["alloc"] }
data-encoding = { version = "^2" }
[dev-dependencies]
wasm-bindgen-test = "^0"

View File

@ -268,6 +268,24 @@ pub fn debug(command: String) -> Promise {
})
}
#[wasm_bindgen()]
pub fn app_call_reply(id: String, message: String) -> Promise {
wrap_api_future(async move {
let id = match id.parse() {
Ok(v) => v,
Err(e) => {
return APIResult::Err(veilid_core::VeilidAPIError::invalid_argument(e, "id", id))
}
};
let message = data_encoding::BASE64URL_NOPAD
.decode(message.as_bytes())
.map_err(|e| veilid_core::VeilidAPIError::invalid_argument(e, "message", message))?;
let veilid_api = get_veilid_api()?;
let out = veilid_api.app_call_reply(id, message).await?;
Ok(out)
})
}
#[wasm_bindgen()]
pub fn veilid_version_string() -> String {
veilid_core::veilid_version_string()