diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index c7c12da5..36fadf67 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -1425,14 +1425,13 @@ impl NetworkManager { }; // Send boot magic to requested peer address let data = BOOT_MAGIC.to_vec(); - let out_data: Vec = match self + let out_data: Vec = network_result_value_or_log!(debug self .net() .send_recv_data_unbound_to_dial_info(dial_info, data, timeout_ms) - .await? + .await? => { - NetworkResult::Value(v) => v, - _ => return Ok(Vec::new()), - }; + return Ok(Vec::new()); + }); let bootstrap_peerinfo: Vec = deserialize_json(std::str::from_utf8(&out_data).wrap_err("bad utf8 in boot peerinfo")?) diff --git a/veilid-core/src/network_manager/tasks.rs b/veilid-core/src/network_manager/tasks.rs index 26cd7ffb..ebb9dea0 100644 --- a/veilid-core/src/network_manager/tasks.rs +++ b/veilid-core/src/network_manager/tasks.rs @@ -181,8 +181,12 @@ impl NetworkManager { let routing_table = self.routing_table(); for bootstrap_di in bootstrap_dialinfos { + log_net!(debug "direct bootstrap with: {}", bootstrap_di); + let peer_info = self.boot_request(bootstrap_di).await?; + log_net!(debug " direct bootstrap peerinfo: {:?}", peer_info); + // Got peer info, let's add it to the routing table for pi in peer_info { let k = pi.node_id.key; diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index e88c5994..1db34935 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -741,6 +741,8 @@ impl RoutingTable { signed_node_info: SignedNodeInfo, allow_invalid: bool, ) -> Option { + log_rtab!("register_node_with_signed_node_info: routing_domain: {:?}, node_id: {:?}, signed_node_info: {:?}, allow_invalid: {:?}", routing_domain, node_id, signed_node_info, allow_invalid ); + // validate signed node info is not something malicious if node_id == self.node_id() { log_rtab!(debug "can't register own node id in routing table"); diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 960df4c0..bd8fbfd3 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -853,9 +853,13 @@ impl fmt::Debug for DialInfoFilter { let mut out = String::new(); if self.protocol_type_set != ProtocolTypeSet::all() { out += &format!("+{:?}", self.protocol_type_set); + } else { + out += "*"; } if self.address_type_set != AddressTypeSet::all() { out += &format!("+{:?}", self.address_type_set); + } else { + out += "*"; } write!(f, "[{}]", out) } diff --git a/veilid-flutter/example/lib/main.dart b/veilid-flutter/example/lib/main.dart index df760c34..e635f0dd 100644 --- a/veilid-flutter/example/lib/main.dart +++ b/veilid-flutter/example/lib/main.dart @@ -11,7 +11,7 @@ import 'package:loggy/loggy.dart'; import 'config.dart'; // Loggy tools -const LogLevel traceLevel = LogLevel('trace', 1); +const LogLevel traceLevel = LogLevel('Trace', 1); class ConsolePrinter extends LoggyPrinter { ConsolePrinter(this.childPrinter) : super(); @@ -226,8 +226,9 @@ class _MyAppState extends State with UiLoggy { onPressed: _updateStream != null ? null : () async { - var updateStream = Veilid.instance.startupVeilidCore( - await getDefaultVeilidConfig()); + var updateStream = await Veilid.instance + .startupVeilidCore( + await getDefaultVeilidConfig()); setState(() { _updateStream = updateStream; _updateProcessor = processUpdates(); diff --git a/veilid-flutter/lib/veilid.dart b/veilid-flutter/lib/veilid.dart index b7b48c4e..b859eca5 100644 --- a/veilid-flutter/lib/veilid.dart +++ b/veilid-flutter/lib/veilid.dart @@ -1566,7 +1566,7 @@ abstract class Veilid { void initializeVeilidCore(Map platformConfigJson); void changeLogLevel(String layer, VeilidConfigLogLevel logLevel); - Stream startupVeilidCore(VeilidConfig config); + Future> startupVeilidCore(VeilidConfig config); Future getVeilidState(); Future attach(); Future detach(); diff --git a/veilid-flutter/lib/veilid_ffi.dart b/veilid-flutter/lib/veilid_ffi.dart index 55e4f5c0..4b41bd83 100644 --- a/veilid-flutter/lib/veilid_ffi.dart +++ b/veilid-flutter/lib/veilid_ffi.dart @@ -36,8 +36,8 @@ typedef _InitializeVeilidCoreDart = void Function(Pointer); typedef _ChangeLogLevelC = Void Function(Pointer, Pointer); typedef _ChangeLogLevelDart = void Function(Pointer, Pointer); // fn startup_veilid_core(port: i64, config: FfiStr) -typedef _StartupVeilidCoreC = Void Function(Int64, Pointer); -typedef _StartupVeilidCoreDart = void Function(int, Pointer); +typedef _StartupVeilidCoreC = Void Function(Int64, Int64, Pointer); +typedef _StartupVeilidCoreDart = void Function(int, int, Pointer); // fn get_veilid_state(port: i64) typedef _GetVeilidStateC = Void Function(Int64); typedef _GetVeilidStateDart = void Function(int); @@ -199,6 +199,51 @@ Future processFutureVoid(Future future) { }); } +Future> processFutureStream( + Stream returnStream, Future future) { + return future.then((value) { + final list = value as List; + switch (list[0] as int) { + case messageOk: + { + if (list[1] != null) { + throw VeilidAPIExceptionInternal( + "Unexpected MESSAGE_OK value '${list[1]}' where null expected"); + } + return returnStream; + } + case messageErr: + { + throw VeilidAPIExceptionInternal("Internal API Error: ${list[1]}"); + } + case messageOkJson: + { + var ret = jsonDecode(list[1] as String); + if (ret != null) { + throw VeilidAPIExceptionInternal( + "Unexpected MESSAGE_OK_JSON value '$ret' where null expected"); + } + return returnStream; + } + case messageErrJson: + { + throw VeilidAPIException.fromJson(jsonDecode(list[1] as String)); + } + default: + { + throw VeilidAPIExceptionInternal( + "Unexpected async return message type: ${list[0]}"); + } + } + }).catchError((e) { + // Wrap all other errors in VeilidAPIExceptionInternal + throw VeilidAPIExceptionInternal(e.toString()); + }, test: (e) { + // Pass errors that are already VeilidAPIException through without wrapping + return e is! VeilidAPIException; + }); +} + Stream processStreamJson( T Function(Map) jsonConstructor, ReceivePort port) async* { try { @@ -318,15 +363,20 @@ class VeilidFFI implements Veilid { } @override - Stream startupVeilidCore(VeilidConfig config) { + Future> startupVeilidCore(VeilidConfig config) { var nativeConfig = jsonEncode(config.json, toEncodable: veilidApiToEncodable) .toNativeUtf8(); + final recvStreamPort = ReceivePort("veilid_api_stream"); + final sendStreamPort = recvStreamPort.sendPort; final recvPort = ReceivePort("startup_veilid_core"); final sendPort = recvPort.sendPort; - _startupVeilidCore(sendPort.nativePort, nativeConfig); + _startupVeilidCore( + sendPort.nativePort, sendStreamPort.nativePort, nativeConfig); malloc.free(nativeConfig); - return processStreamJson(VeilidUpdate.fromJson, recvPort); + return processFutureStream( + processStreamJson(VeilidUpdate.fromJson, recvStreamPort), + recvPort.first); } @override diff --git a/veilid-flutter/lib/veilid_js.dart b/veilid-flutter/lib/veilid_js.dart index 7958eb33..ba9a4dc1 100644 --- a/veilid-flutter/lib/veilid_js.dart +++ b/veilid-flutter/lib/veilid_js.dart @@ -35,7 +35,7 @@ class VeilidJS implements Veilid { } @override - Stream startupVeilidCore(VeilidConfig config) async* { + Future> startupVeilidCore(VeilidConfig config) async { var streamController = StreamController(); updateCallback(String update) { var updateJson = jsonDecode(update); @@ -51,7 +51,8 @@ class VeilidJS implements Veilid { js.allowInterop(updateCallback), jsonEncode(config.json, toEncodable: veilidApiToEncodable) ])); - yield* streamController.stream; + + return streamController.stream; } @override diff --git a/veilid-flutter/rust/src/dart_ffi.rs b/veilid-flutter/rust/src/dart_ffi.rs index e0430c93..3048277f 100644 --- a/veilid-flutter/rust/src/dart_ffi.rs +++ b/veilid-flutter/rust/src/dart_ffi.rs @@ -48,19 +48,6 @@ define_string_destructor!(free_string); type APIResult = Result; const APIRESULT_VOID: APIResult<()> = APIResult::Ok(()); -// Stream abort macro for simplified error handling -macro_rules! check_err_json { - ($stream:expr, $ex:expr) => { - match $ex { - Ok(v) => v, - Err(e) => { - $stream.abort_json(e); - return; - } - } - }; -} - ///////////////////////////////////////// // FFI-specific cofnig @@ -253,25 +240,24 @@ pub extern "C" fn change_log_level(layer: FfiStr, log_level: FfiStr) { #[no_mangle] #[instrument] -pub extern "C" fn startup_veilid_core(port: i64, config: FfiStr) { +pub extern "C" fn startup_veilid_core(port: i64, stream_port: i64, config: FfiStr) { let config = config.into_opt_string(); - let stream = DartIsolateStream::new(port); - spawn(async move { + let stream = DartIsolateStream::new(stream_port); + DartIsolateWrapper::new(port).spawn_result_json(async move { let config_json = match config { Some(v) => v, None => { - stream.abort_json(veilid_core::VeilidAPIError::MissingArgument { + let err = veilid_core::VeilidAPIError::MissingArgument { context: "startup_veilid_core".to_owned(), argument: "config".to_owned(), - }); - return; + }; + return APIResult::Err(err); } }; let mut api_lock = VEILID_API.lock().await; if api_lock.is_some() { - stream.abort_json(veilid_core::VeilidAPIError::AlreadyInitialized); - return; + return APIResult::Err(veilid_core::VeilidAPIError::AlreadyInitialized); } let sink = stream.clone(); @@ -287,9 +273,10 @@ pub extern "C" fn startup_veilid_core(port: i64, config: FfiStr) { } }); - let res = veilid_core::api_startup_json(update_callback, config_json).await; - let veilid_api = check_err_json!(stream, res); + let veilid_api = veilid_core::api_startup_json(update_callback, config_json).await?; *api_lock = Some(veilid_api); + + APIRESULT_VOID }); } diff --git a/veilid-flutter/rust/src/dart_isolate_wrapper.rs b/veilid-flutter/rust/src/dart_isolate_wrapper.rs index a3a92566..2c0bb371 100644 --- a/veilid-flutter/rust/src/dart_isolate_wrapper.rs +++ b/veilid-flutter/rust/src/dart_isolate_wrapper.rs @@ -20,7 +20,7 @@ const MESSAGE_ERR_JSON: i32 = 3; //const MESSAGE_STREAM_ITEM: i32 = 4; const MESSAGE_STREAM_ITEM_JSON: i32 = 5; //const MESSAGE_STREAM_ABORT: i32 = 6; -const MESSAGE_STREAM_ABORT_JSON: i32 = 7; +//const MESSAGE_STREAM_ABORT_JSON: i32 = 7; const MESSAGE_STREAM_CLOSE: i32 = 8; impl DartIsolateWrapper { @@ -148,17 +148,17 @@ impl DartIsolateStream { // } // } - pub fn abort_json(self, error: E) -> bool { - let mut inner = self.inner.lock(); - if let Some(isolate) = inner.isolate.take() { - isolate.post(vec![ - MESSAGE_STREAM_ABORT_JSON.into_dart(), - veilid_core::serialize_json(error).into_dart(), - ]) - } else { - false - } - } + // pub fn abort_json(self, error: E) -> bool { + // let mut inner = self.inner.lock(); + // if let Some(isolate) = inner.isolate.take() { + // isolate.post(vec![ + // MESSAGE_STREAM_ABORT_JSON.into_dart(), + // veilid_core::serialize_json(error).into_dart(), + // ]) + // } else { + // false + // } + // } pub fn close(self) -> bool { let mut inner = self.inner.lock();