This commit is contained in:
John Smith 2023-06-16 11:57:55 -04:00
parent d114ea3b72
commit 14ba85efda
18 changed files with 158 additions and 75 deletions

View File

@ -488,7 +488,7 @@ reply - reply to an AppCall not handled directly by the server
format!("#{}", hex::encode(&message)) format!("#{}", hex::encode(&message))
}; };
let id = json_str_u64(&call["id"]); let id = json_str_u64(&call["call_id"]);
self.inner().ui_sender.add_node_event(format!( self.inner().ui_sender.add_node_event(format!(
"AppCall ({:?}) id = {:016x} : {}", "AppCall ({:?}) id = {:016x} : {}",

View File

@ -1331,9 +1331,9 @@ impl RouteSpecStore {
} }
// ensure this isn't also an allocated route // ensure this isn't also an allocated route
if inner.content.get_id_by_key(&private_route.public_key.value).is_some() { // if inner.content.get_id_by_key(&private_route.public_key.value).is_some() {
bail!("should not import allocated route"); // bail!("should not import allocated route");
} // }
} }
inner.cache.cache_remote_private_route(cur_ts, id, private_routes); inner.cache.cache_remote_private_route(cur_ts, id, private_routes);

View File

@ -99,10 +99,14 @@ impl RPCProcessor {
} }
/// Exposed to API for apps to return app call answers /// Exposed to API for apps to return app call answers
pub async fn app_call_reply(&self, id: OperationId, message: Vec<u8>) -> Result<(), RPCError> { pub async fn app_call_reply(
&self,
call_id: OperationId,
message: Vec<u8>,
) -> Result<(), RPCError> {
self.unlocked_inner self.unlocked_inner
.waiting_app_call_table .waiting_app_call_table
.complete_op_waiter(id, message) .complete_op_waiter(call_id, message)
.await .await
} }
} }

View File

@ -258,10 +258,14 @@ impl VeilidAPI {
// App Calls // App Calls
#[instrument(level = "debug", skip(self))] #[instrument(level = "debug", skip(self))]
pub async fn app_call_reply(&self, id: OperationId, message: Vec<u8>) -> VeilidAPIResult<()> { pub async fn app_call_reply(
&self,
call_id: OperationId,
message: Vec<u8>,
) -> VeilidAPIResult<()> {
let rpc_processor = self.rpc_processor()?; let rpc_processor = self.rpc_processor()?;
rpc_processor rpc_processor
.app_call_reply(id, message) .app_call_reply(call_id, message)
.await .await
.map_err(|e| e.into()) .map_err(|e| e.into())
} }

View File

@ -31,17 +31,23 @@ fn get_string(text: &str) -> Option<String> {
Some(text.to_owned()) Some(text.to_owned())
} }
fn get_route_id(rss: RouteSpecStore, allow_remote: bool) -> impl Fn(&str) -> Option<RouteId> { fn get_route_id(
rss: RouteSpecStore,
allow_allocated: bool,
allow_remote: bool,
) -> impl Fn(&str) -> Option<RouteId> {
return move |text: &str| { return move |text: &str| {
if text.is_empty() { if text.is_empty() {
return None; return None;
} }
match RouteId::from_str(text).ok() { match RouteId::from_str(text).ok() {
Some(key) => { Some(key) => {
if allow_allocated {
let routes = rss.list_allocated_routes(|k, _| Some(*k)); let routes = rss.list_allocated_routes(|k, _| Some(*k));
if routes.contains(&key) { if routes.contains(&key) {
return Some(key); return Some(key);
} }
}
if allow_remote { if allow_remote {
let rroutes = rss.list_remote_routes(|k, _| Some(*k)); let rroutes = rss.list_remote_routes(|k, _| Some(*k));
if rroutes.contains(&key) { if rroutes.contains(&key) {
@ -50,6 +56,7 @@ fn get_route_id(rss: RouteSpecStore, allow_remote: bool) -> impl Fn(&str) -> Opt
} }
} }
None => { None => {
if allow_allocated {
let routes = rss.list_allocated_routes(|k, _| Some(*k)); let routes = rss.list_allocated_routes(|k, _| Some(*k));
for r in routes { for r in routes {
let rkey = r.encode(); let rkey = r.encode();
@ -57,6 +64,7 @@ fn get_route_id(rss: RouteSpecStore, allow_remote: bool) -> impl Fn(&str) -> Opt
return Some(r); return Some(r);
} }
} }
}
if allow_remote { if allow_remote {
let routes = rss.list_remote_routes(|k, _| Some(*k)); let routes = rss.list_remote_routes(|k, _| Some(*k));
for r in routes { for r in routes {
@ -90,7 +98,7 @@ fn get_safety_selection(text: &str, routing_table: RoutingTable) -> Option<Safet
let mut sequencing = Sequencing::default(); let mut sequencing = Sequencing::default();
for x in text.split(",") { for x in text.split(",") {
let x = x.trim(); let x = x.trim();
if let Some(pr) = get_route_id(rss.clone(), false)(x) { if let Some(pr) = get_route_id(rss.clone(), true, false)(x) {
preferred_route = Some(pr) preferred_route = Some(pr)
} }
if let Some(n) = get_number(x) { if let Some(n) = get_number(x) {
@ -143,19 +151,29 @@ fn get_destination(routing_table: RoutingTable) -> impl FnOnce(&str) -> Option<D
return None; return None;
} }
if &text[0..1] == "#" { if &text[0..1] == "#" {
let rss = routing_table.route_spec_store();
// Private route // Private route
let text = &text[1..]; let text = &text[1..];
let n = get_number(text)?;
let mut dc = DEBUG_CACHE.lock();
let private_route_id = dc.imported_routes.get(n)?.clone();
let rss = routing_table.route_spec_store(); let private_route = if let Some(prid) = get_route_id(rss.clone(), false, true)(text) {
let Some(private_route) = rss.best_remote_private_route(&private_route_id) else { let Some(private_route) = rss.best_remote_private_route(&prid) else {
return None;
};
private_route
} else {
let mut dc = DEBUG_CACHE.lock();
let n = get_number(text)?;
let prid = dc.imported_routes.get(n)?.clone();
let Some(private_route) = rss.best_remote_private_route(&prid) else {
// Remove imported route // Remove imported route
dc.imported_routes.remove(n); dc.imported_routes.remove(n);
info!("removed dead imported route {}", n); info!("removed dead imported route {}", n);
return None; return None;
}; };
private_route
};
Some(Destination::private_route( Some(Destination::private_route(
private_route, private_route,
ss.unwrap_or(SafetySelection::Unsafe(Sequencing::default())), ss.unwrap_or(SafetySelection::Unsafe(Sequencing::default())),
@ -663,7 +681,7 @@ impl VeilidAPI {
1, 1,
"debug_route", "debug_route",
"route_id", "route_id",
get_route_id(rss.clone(), true), get_route_id(rss.clone(), true, true),
)?; )?;
// Release route // Release route
@ -695,7 +713,7 @@ impl VeilidAPI {
1, 1,
"debug_route", "debug_route",
"route_id", "route_id",
get_route_id(rss.clone(), false), get_route_id(rss.clone(), true, false),
)?; )?;
let full = { let full = {
if args.len() > 2 { if args.len() > 2 {
@ -747,7 +765,7 @@ impl VeilidAPI {
1, 1,
"debug_route", "debug_route",
"route_id", "route_id",
get_route_id(rss.clone(), false), get_route_id(rss.clone(), true, false),
)?; )?;
// Unpublish route // Unpublish route
@ -769,7 +787,7 @@ impl VeilidAPI {
1, 1,
"debug_route", "debug_route",
"route_id", "route_id",
get_route_id(rss.clone(), true), get_route_id(rss.clone(), true, true),
)?; )?;
match rss.debug_route(&route_id) { match rss.debug_route(&route_id) {
@ -831,7 +849,7 @@ impl VeilidAPI {
1, 1,
"debug_route", "debug_route",
"route_id", "route_id",
get_route_id(rss.clone(), true), get_route_id(rss.clone(), true, true),
)?; )?;
let success = rss let success = rss

View File

@ -67,15 +67,15 @@ pub struct VeilidAppCall {
/// The id to reply to /// The id to reply to
#[serde(with = "json_as_string")] #[serde(with = "json_as_string")]
#[schemars(with = "String")] #[schemars(with = "String")]
id: OperationId, call_id: OperationId,
} }
impl VeilidAppCall { impl VeilidAppCall {
pub fn new(sender: Option<TypedKey>, message: Vec<u8>, id: OperationId) -> Self { pub fn new(sender: Option<TypedKey>, message: Vec<u8>, call_id: OperationId) -> Self {
Self { Self {
sender, sender,
message, message,
id, call_id,
} }
} }
@ -86,6 +86,6 @@ impl VeilidAppCall {
&self.message &self.message
} }
pub fn id(&self) -> OperationId { pub fn id(&self) -> OperationId {
self.id self.call_id
} }
} }

View File

@ -1560,12 +1560,12 @@ class VeilidFFI implements Veilid {
} }
@override @override
Future<void> appCallReply(String id, Uint8List message) { Future<void> appCallReply(String call_id, Uint8List message) {
final nativeId = id.toNativeUtf8(); final nativeCallId = call_id.toNativeUtf8();
final nativeEncodedMessage = base64UrlNoPadEncode(message).toNativeUtf8(); final nativeEncodedMessage = base64UrlNoPadEncode(message).toNativeUtf8();
final recvPort = ReceivePort("app_call_reply"); final recvPort = ReceivePort("app_call_reply");
final sendPort = recvPort.sendPort; final sendPort = recvPort.sendPort;
_appCallReply(sendPort.nativePort, nativeId, nativeEncodedMessage); _appCallReply(sendPort.nativePort, nativeCallId, nativeEncodedMessage);
return processFutureVoid(recvPort.first); return processFutureVoid(recvPort.first);
} }

View File

@ -580,10 +580,10 @@ class VeilidJS implements Veilid {
} }
@override @override
Future<void> appCallReply(String id, Uint8List message) { Future<void> appCallReply(String callId, Uint8List message) {
var encodedMessage = base64UrlNoPadEncode(message); var encodedMessage = base64UrlNoPadEncode(message);
return _wrapApiPromise( return _wrapApiPromise(
js_util.callMethod(wasm, "app_call_reply", [id, encodedMessage])); js_util.callMethod(wasm, "app_call_reply", [callId, encodedMessage]));
} }
@override @override

View File

@ -262,7 +262,9 @@ abstract class VeilidUpdate {
case "AppCall": case "AppCall":
{ {
return VeilidAppCall( return VeilidAppCall(
sender: json["sender"], message: json["message"], id: json["id"]); sender: json["sender"],
message: json["message"],
callId: json["call_id"]);
} }
case "Attachment": case "Attachment":
{ {
@ -348,22 +350,22 @@ class VeilidAppMessage implements VeilidUpdate {
class VeilidAppCall implements VeilidUpdate { class VeilidAppCall implements VeilidUpdate {
final String? sender; final String? sender;
final Uint8List message; final Uint8List message;
final String id; final String callId;
// //
VeilidAppCall({ VeilidAppCall({
required this.sender, required this.sender,
required this.message, required this.message,
required this.id, required this.callId,
}); });
@override @override
Map<String, dynamic> toJson() { Map<String, dynamic> toJson() {
return { return {
'kind': "AppMessage", 'kind': "AppCall",
'sender': sender, 'sender': sender,
'message': base64UrlNoPadEncode(message), 'message': base64UrlNoPadEncode(message),
'id': id, 'call_id': callId,
}; };
} }
} }

View File

@ -707,21 +707,21 @@ pub extern "C" fn release_private_route(port: i64, route_id: FfiStr) {
} }
#[no_mangle] #[no_mangle]
pub extern "C" fn app_call_reply(port: i64, id: FfiStr, message: FfiStr) { pub extern "C" fn app_call_reply(port: i64, call_id: FfiStr, message: FfiStr) {
let id = id.into_opt_string().unwrap_or_default(); let call_id = call_id.into_opt_string().unwrap_or_default();
let message = message.into_opt_string().unwrap_or_default(); let message = message.into_opt_string().unwrap_or_default();
DartIsolateWrapper::new(port).spawn_result(async move { DartIsolateWrapper::new(port).spawn_result(async move {
let id = match id.parse() { let call_id = match call_id.parse() {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {
return APIResult::Err(veilid_core::VeilidAPIError::invalid_argument(e, "id", id)) return APIResult::Err(veilid_core::VeilidAPIError::invalid_argument(e, "call_id", call_id))
} }
}; };
let message = data_encoding::BASE64URL_NOPAD let message = data_encoding::BASE64URL_NOPAD
.decode(message.as_bytes()) .decode(message.as_bytes())
.map_err(|e| veilid_core::VeilidAPIError::invalid_argument(e, "message", message))?; .map_err(|e| veilid_core::VeilidAPIError::invalid_argument(e, "message", message))?;
let veilid_api = get_veilid_api().await?; let veilid_api = get_veilid_api().await?;
veilid_api.app_call_reply(id, message).await?; veilid_api.app_call_reply(call_id, message).await?;
APIRESULT_VOID APIRESULT_VOID
}); });
} }

View File

@ -12,6 +12,15 @@ async def test_connect():
pass pass
await simple_connect_and_run(func) await simple_connect_and_run(func)
@pytest.mark.asyncio
async def test_get_node_id():
async def func(api: veilid.VeilidAPI):
# get our own node id
state = await api.get_state()
node_id = state.config.config.network.routing_table.node_id.pop()
await simple_connect_and_run(func)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_fail_connect(): async def test_fail_connect():
with pytest.raises(Exception): with pytest.raises(Exception):
@ -27,3 +36,4 @@ async def test_version():
vstr = await api.veilid_version_string() vstr = await api.veilid_version_string()
print("veilid_version_string: {}".format(vstr)) print("veilid_version_string: {}".format(vstr))
await simple_connect_and_run(func) await simple_connect_and_run(func)

View File

@ -32,16 +32,56 @@ async def test_routing_context_app_message_loopback():
# make a routing context that uses a safety route # make a routing context that uses a safety route
rc = await (await api.new_routing_context()).with_privacy() rc = await (await api.new_routing_context()).with_privacy()
# get our own node id # make a new local private route
state = await api.get_state() prl, blob = await api.new_private_route()
node_id = state.config.config.network.routing_table.node_id.pop()
# send an app message to our node id # import it as a remote route as well so we can send to it
prr = await api.import_remote_private_route(blob)
# send an app message to our own private route
message = b"abcd1234" message = b"abcd1234"
await rc.app_message(node_id, message) await rc.app_message(prr, message)
# we should get the same message back # we should get the same message back
#update: veilid.VeilidUpdate = await asyncio.wait_for(app_message_queue.get(), timeout=10) update: veilid.VeilidUpdate = await asyncio.wait_for(app_message_queue.get(), timeout=10)
#appmsg: veilid.VeilidAppMessage = update.detail appmsg: veilid.VeilidAppMessage = update.detail
#assert appmsg.message == message assert appmsg.message == message
@pytest.mark.asyncio
async def test_routing_context_app_call_loopback():
app_call_queue = asyncio.Queue()
async def app_call_queue_update_callback(update: veilid.VeilidUpdate):
if update.kind == veilid.VeilidUpdateKind.APP_CALL:
await app_call_queue.put(update)
api = await veilid.json_api_connect(VEILID_SERVER, VEILID_SERVER_PORT, app_call_queue_update_callback)
async with api:
# make a routing context that uses a safety route
rc = await (await api.new_routing_context()).with_privacy()
# make a new local private route
prl, blob = await api.new_private_route()
# import it as a remote route as well so we can send to it
prr = await api.import_remote_private_route(blob)
# send an app message to our own private route
request = b"abcd1234"
app_call_task = asyncio.create_task(rc.app_call(prr, request), name = "app call task")
# we should get the same request back
update: veilid.VeilidUpdate = await asyncio.wait_for(app_call_queue.get(), timeout=10)
appcall: veilid.VeilidAppCall = update.detail
assert appcall.message == request
# now we reply to the request
reply = b"qwer5678"
await api.app_call_reply(appcall.call_id, reply)
# now we should get the reply from the call
result = await app_call_task
assert result == reply

View File

@ -159,10 +159,10 @@ class VeilidAPI(ABC):
async def detach(self): async def detach(self):
pass pass
@abstractmethod @abstractmethod
async def new_private_route(self) -> NewPrivateRouteResult: async def new_private_route(self) -> Tuple[RouteId, bytes]:
pass pass
@abstractmethod @abstractmethod
async def new_custom_private_route(self, kinds: list[CryptoKind], stability: Stability, sequencing: Sequencing) -> NewPrivateRouteResult: async def new_custom_private_route(self, kinds: list[CryptoKind], stability: Stability, sequencing: Sequencing) -> Tuple[RouteId, bytes]:
pass pass
@abstractmethod @abstractmethod
async def import_remote_private_route(self, blob: bytes) -> RouteId: async def import_remote_private_route(self, blob: bytes) -> RouteId:

View File

@ -223,15 +223,15 @@ class _JsonVeilidAPI(VeilidAPI):
raise_api_result(await self.send_ndjson_request(Operation.ATTACH)) raise_api_result(await self.send_ndjson_request(Operation.ATTACH))
async def detach(self): async def detach(self):
raise_api_result(await self.send_ndjson_request(Operation.DETACH)) raise_api_result(await self.send_ndjson_request(Operation.DETACH))
async def new_private_route(self) -> NewPrivateRouteResult: async def new_private_route(self) -> Tuple[RouteId, bytes]:
return NewPrivateRouteResult.from_json(raise_api_result(await self.send_ndjson_request(Operation.NEW_PRIVATE_ROUTE))) return NewPrivateRouteResult.from_json(raise_api_result(await self.send_ndjson_request(Operation.NEW_PRIVATE_ROUTE))).to_tuple()
async def new_custom_private_route(self, kinds: list[CryptoKind], stability: Stability, sequencing: Sequencing) -> NewPrivateRouteResult: async def new_custom_private_route(self, kinds: list[CryptoKind], stability: Stability, sequencing: Sequencing) -> Tuple[RouteId, bytes]:
return NewPrivateRouteResult.from_json(raise_api_result( return NewPrivateRouteResult.from_json(raise_api_result(
await self.send_ndjson_request(Operation.NEW_CUSTOM_PRIVATE_ROUTE, await self.send_ndjson_request(Operation.NEW_CUSTOM_PRIVATE_ROUTE,
kinds = kinds, kinds = kinds,
stability = stability, stability = stability,
sequencing = sequencing) sequencing = sequencing)
)) )).to_tuple()
async def import_remote_private_route(self, blob: bytes) -> RouteId: async def import_remote_private_route(self, blob: bytes) -> RouteId:
return RouteId(raise_api_result( return RouteId(raise_api_result(
await self.send_ndjson_request(Operation.IMPORT_REMOTE_PRIVATE_ROUTE, await self.send_ndjson_request(Operation.IMPORT_REMOTE_PRIVATE_ROUTE,

View File

@ -2347,12 +2347,12 @@
"description": "Direct question blob passed to hosting application for processing to send an eventual AppReply", "description": "Direct question blob passed to hosting application for processing to send an eventual AppReply",
"type": "object", "type": "object",
"required": [ "required": [
"id", "call_id",
"kind", "kind",
"message" "message"
], ],
"properties": { "properties": {
"id": { "call_id": {
"description": "The id to reply to", "description": "The id to reply to",
"type": "string" "type": "string"
}, },

View File

@ -250,12 +250,12 @@ class VeilidAppMessage:
class VeilidAppCall: class VeilidAppCall:
sender: Optional[TypedKey] sender: Optional[TypedKey]
message: bytes message: bytes
operation_id: str call_id: str
def __init__(self, sender: Optional[TypedKey], message: bytes, operation_id: str): def __init__(self, sender: Optional[TypedKey], message: bytes, call_id: str):
self.sender = sender self.sender = sender
self.message = message self.message = message
self.operation_id = operation_id self.call_id = call_id
@staticmethod @staticmethod
def from_json(j: dict) -> Self: def from_json(j: dict) -> Self:
@ -263,7 +263,7 @@ class VeilidAppCall:
return VeilidAppCall( return VeilidAppCall(
None if j['sender'] is None else TypedKey(j['sender']), None if j['sender'] is None else TypedKey(j['sender']),
urlsafe_b64decode_no_pad(j['message']), urlsafe_b64decode_no_pad(j['message']),
j['operation_id']) j['call_id'])
class VeilidRouteChange: class VeilidRouteChange:
dead_routes: list[RouteId] dead_routes: list[RouteId]

View File

@ -204,6 +204,9 @@ class NewPrivateRouteResult:
self.route_id = route_id self.route_id = route_id
self.blob = blob self.blob = blob
def to_tuple(self) -> Tuple[RouteId, bytes]:
return (self.route_id, self.blob)
@staticmethod @staticmethod
def from_json(j: dict) -> Self: def from_json(j: dict) -> Self:
return NewPrivateRouteResult( return NewPrivateRouteResult(

View File

@ -641,19 +641,21 @@ pub fn release_private_route(route_id: String) -> Promise {
} }
#[wasm_bindgen()] #[wasm_bindgen()]
pub fn app_call_reply(id: String, message: String) -> Promise { pub fn app_call_reply(call_id: String, message: String) -> Promise {
let message: Vec<u8> = data_encoding::BASE64URL_NOPAD let message: Vec<u8> = data_encoding::BASE64URL_NOPAD
.decode(message.as_bytes()) .decode(message.as_bytes())
.unwrap(); .unwrap();
wrap_api_future_void(async move { wrap_api_future_void(async move {
let id = match id.parse() { let call_id = match call_id.parse() {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {
return APIResult::Err(veilid_core::VeilidAPIError::invalid_argument(e, "id", id)) return APIResult::Err(veilid_core::VeilidAPIError::invalid_argument(
e, "call_id", call_id,
))
} }
}; };
let veilid_api = get_veilid_api()?; let veilid_api = get_veilid_api()?;
veilid_api.app_call_reply(id, message).await?; veilid_api.app_call_reply(call_id, message).await?;
APIRESULT_UNDEFINED APIRESULT_UNDEFINED
}) })
} }