This commit is contained in:
John Smith 2022-09-09 16:27:13 -04:00
parent b13f8947df
commit 72b03939ef
10 changed files with 99 additions and 51 deletions

View File

@ -1425,14 +1425,13 @@ impl NetworkManager {
}; };
// Send boot magic to requested peer address // Send boot magic to requested peer address
let data = BOOT_MAGIC.to_vec(); let data = BOOT_MAGIC.to_vec();
let out_data: Vec<u8> = match self let out_data: Vec<u8> = network_result_value_or_log!(debug self
.net() .net()
.send_recv_data_unbound_to_dial_info(dial_info, data, timeout_ms) .send_recv_data_unbound_to_dial_info(dial_info, data, timeout_ms)
.await? .await? =>
{ {
NetworkResult::Value(v) => v, return Ok(Vec::new());
_ => return Ok(Vec::new()), });
};
let bootstrap_peerinfo: Vec<PeerInfo> = let bootstrap_peerinfo: Vec<PeerInfo> =
deserialize_json(std::str::from_utf8(&out_data).wrap_err("bad utf8 in boot peerinfo")?) deserialize_json(std::str::from_utf8(&out_data).wrap_err("bad utf8 in boot peerinfo")?)

View File

@ -181,8 +181,12 @@ impl NetworkManager {
let routing_table = self.routing_table(); let routing_table = self.routing_table();
for bootstrap_di in bootstrap_dialinfos { for bootstrap_di in bootstrap_dialinfos {
log_net!(debug "direct bootstrap with: {}", bootstrap_di);
let peer_info = self.boot_request(bootstrap_di).await?; let peer_info = self.boot_request(bootstrap_di).await?;
log_net!(debug " direct bootstrap peerinfo: {:?}", peer_info);
// Got peer info, let's add it to the routing table // Got peer info, let's add it to the routing table
for pi in peer_info { for pi in peer_info {
let k = pi.node_id.key; let k = pi.node_id.key;

View File

@ -741,6 +741,8 @@ impl RoutingTable {
signed_node_info: SignedNodeInfo, signed_node_info: SignedNodeInfo,
allow_invalid: bool, allow_invalid: bool,
) -> Option<NodeRef> { ) -> Option<NodeRef> {
log_rtab!("register_node_with_signed_node_info: routing_domain: {:?}, node_id: {:?}, signed_node_info: {:?}, allow_invalid: {:?}", routing_domain, node_id, signed_node_info, allow_invalid );
// validate signed node info is not something malicious // validate signed node info is not something malicious
if node_id == self.node_id() { if node_id == self.node_id() {
log_rtab!(debug "can't register own node id in routing table"); log_rtab!(debug "can't register own node id in routing table");

View File

@ -853,9 +853,13 @@ impl fmt::Debug for DialInfoFilter {
let mut out = String::new(); let mut out = String::new();
if self.protocol_type_set != ProtocolTypeSet::all() { if self.protocol_type_set != ProtocolTypeSet::all() {
out += &format!("+{:?}", self.protocol_type_set); out += &format!("+{:?}", self.protocol_type_set);
} else {
out += "*";
} }
if self.address_type_set != AddressTypeSet::all() { if self.address_type_set != AddressTypeSet::all() {
out += &format!("+{:?}", self.address_type_set); out += &format!("+{:?}", self.address_type_set);
} else {
out += "*";
} }
write!(f, "[{}]", out) write!(f, "[{}]", out)
} }

View File

@ -11,7 +11,7 @@ import 'package:loggy/loggy.dart';
import 'config.dart'; import 'config.dart';
// Loggy tools // Loggy tools
const LogLevel traceLevel = LogLevel('trace', 1); const LogLevel traceLevel = LogLevel('Trace', 1);
class ConsolePrinter extends LoggyPrinter { class ConsolePrinter extends LoggyPrinter {
ConsolePrinter(this.childPrinter) : super(); ConsolePrinter(this.childPrinter) : super();
@ -226,7 +226,8 @@ class _MyAppState extends State<MyApp> with UiLoggy {
onPressed: _updateStream != null onPressed: _updateStream != null
? null ? null
: () async { : () async {
var updateStream = Veilid.instance.startupVeilidCore( var updateStream = await Veilid.instance
.startupVeilidCore(
await getDefaultVeilidConfig()); await getDefaultVeilidConfig());
setState(() { setState(() {
_updateStream = updateStream; _updateStream = updateStream;

View File

@ -1566,7 +1566,7 @@ abstract class Veilid {
void initializeVeilidCore(Map<String, dynamic> platformConfigJson); void initializeVeilidCore(Map<String, dynamic> platformConfigJson);
void changeLogLevel(String layer, VeilidConfigLogLevel logLevel); void changeLogLevel(String layer, VeilidConfigLogLevel logLevel);
Stream<VeilidUpdate> startupVeilidCore(VeilidConfig config); Future<Stream<VeilidUpdate>> startupVeilidCore(VeilidConfig config);
Future<VeilidState> getVeilidState(); Future<VeilidState> getVeilidState();
Future<void> attach(); Future<void> attach();
Future<void> detach(); Future<void> detach();

View File

@ -36,8 +36,8 @@ typedef _InitializeVeilidCoreDart = void Function(Pointer<Utf8>);
typedef _ChangeLogLevelC = Void Function(Pointer<Utf8>, Pointer<Utf8>); typedef _ChangeLogLevelC = Void Function(Pointer<Utf8>, Pointer<Utf8>);
typedef _ChangeLogLevelDart = void Function(Pointer<Utf8>, Pointer<Utf8>); typedef _ChangeLogLevelDart = void Function(Pointer<Utf8>, Pointer<Utf8>);
// fn startup_veilid_core(port: i64, config: FfiStr) // fn startup_veilid_core(port: i64, config: FfiStr)
typedef _StartupVeilidCoreC = Void Function(Int64, Pointer<Utf8>); typedef _StartupVeilidCoreC = Void Function(Int64, Int64, Pointer<Utf8>);
typedef _StartupVeilidCoreDart = void Function(int, Pointer<Utf8>); typedef _StartupVeilidCoreDart = void Function(int, int, Pointer<Utf8>);
// fn get_veilid_state(port: i64) // fn get_veilid_state(port: i64)
typedef _GetVeilidStateC = Void Function(Int64); typedef _GetVeilidStateC = Void Function(Int64);
typedef _GetVeilidStateDart = void Function(int); typedef _GetVeilidStateDart = void Function(int);
@ -199,6 +199,51 @@ Future<void> processFutureVoid(Future<dynamic> future) {
}); });
} }
Future<Stream<T>> processFutureStream<T>(
Stream<T> returnStream, Future<dynamic> future) {
return future.then((value) {
final list = value as List<dynamic>;
switch (list[0] as int) {
case messageOk:
{
if (list[1] != null) {
throw VeilidAPIExceptionInternal(
"Unexpected MESSAGE_OK value '${list[1]}' where null expected");
}
return returnStream;
}
case messageErr:
{
throw VeilidAPIExceptionInternal("Internal API Error: ${list[1]}");
}
case messageOkJson:
{
var ret = jsonDecode(list[1] as String);
if (ret != null) {
throw VeilidAPIExceptionInternal(
"Unexpected MESSAGE_OK_JSON value '$ret' where null expected");
}
return returnStream;
}
case messageErrJson:
{
throw VeilidAPIException.fromJson(jsonDecode(list[1] as String));
}
default:
{
throw VeilidAPIExceptionInternal(
"Unexpected async return message type: ${list[0]}");
}
}
}).catchError((e) {
// Wrap all other errors in VeilidAPIExceptionInternal
throw VeilidAPIExceptionInternal(e.toString());
}, test: (e) {
// Pass errors that are already VeilidAPIException through without wrapping
return e is! VeilidAPIException;
});
}
Stream<T> processStreamJson<T>( Stream<T> processStreamJson<T>(
T Function(Map<String, dynamic>) jsonConstructor, ReceivePort port) async* { T Function(Map<String, dynamic>) jsonConstructor, ReceivePort port) async* {
try { try {
@ -318,15 +363,20 @@ class VeilidFFI implements Veilid {
} }
@override @override
Stream<VeilidUpdate> startupVeilidCore(VeilidConfig config) { Future<Stream<VeilidUpdate>> startupVeilidCore(VeilidConfig config) {
var nativeConfig = var nativeConfig =
jsonEncode(config.json, toEncodable: veilidApiToEncodable) jsonEncode(config.json, toEncodable: veilidApiToEncodable)
.toNativeUtf8(); .toNativeUtf8();
final recvStreamPort = ReceivePort("veilid_api_stream");
final sendStreamPort = recvStreamPort.sendPort;
final recvPort = ReceivePort("startup_veilid_core"); final recvPort = ReceivePort("startup_veilid_core");
final sendPort = recvPort.sendPort; final sendPort = recvPort.sendPort;
_startupVeilidCore(sendPort.nativePort, nativeConfig); _startupVeilidCore(
sendPort.nativePort, sendStreamPort.nativePort, nativeConfig);
malloc.free(nativeConfig); malloc.free(nativeConfig);
return processStreamJson(VeilidUpdate.fromJson, recvPort); return processFutureStream(
processStreamJson(VeilidUpdate.fromJson, recvStreamPort),
recvPort.first);
} }
@override @override

View File

@ -35,7 +35,7 @@ class VeilidJS implements Veilid {
} }
@override @override
Stream<VeilidUpdate> startupVeilidCore(VeilidConfig config) async* { Future<Stream<VeilidUpdate>> startupVeilidCore(VeilidConfig config) async {
var streamController = StreamController<VeilidUpdate>(); var streamController = StreamController<VeilidUpdate>();
updateCallback(String update) { updateCallback(String update) {
var updateJson = jsonDecode(update); var updateJson = jsonDecode(update);
@ -51,7 +51,8 @@ class VeilidJS implements Veilid {
js.allowInterop(updateCallback), js.allowInterop(updateCallback),
jsonEncode(config.json, toEncodable: veilidApiToEncodable) jsonEncode(config.json, toEncodable: veilidApiToEncodable)
])); ]));
yield* streamController.stream;
return streamController.stream;
} }
@override @override

View File

@ -48,19 +48,6 @@ define_string_destructor!(free_string);
type APIResult<T> = Result<T, veilid_core::VeilidAPIError>; type APIResult<T> = Result<T, veilid_core::VeilidAPIError>;
const APIRESULT_VOID: APIResult<()> = APIResult::Ok(()); const APIRESULT_VOID: APIResult<()> = APIResult::Ok(());
// Stream abort macro for simplified error handling
macro_rules! check_err_json {
($stream:expr, $ex:expr) => {
match $ex {
Ok(v) => v,
Err(e) => {
$stream.abort_json(e);
return;
}
}
};
}
///////////////////////////////////////// /////////////////////////////////////////
// FFI-specific cofnig // FFI-specific cofnig
@ -253,25 +240,24 @@ pub extern "C" fn change_log_level(layer: FfiStr, log_level: FfiStr) {
#[no_mangle] #[no_mangle]
#[instrument] #[instrument]
pub extern "C" fn startup_veilid_core(port: i64, config: FfiStr) { pub extern "C" fn startup_veilid_core(port: i64, stream_port: i64, config: FfiStr) {
let config = config.into_opt_string(); let config = config.into_opt_string();
let stream = DartIsolateStream::new(port); let stream = DartIsolateStream::new(stream_port);
spawn(async move { DartIsolateWrapper::new(port).spawn_result_json(async move {
let config_json = match config { let config_json = match config {
Some(v) => v, Some(v) => v,
None => { None => {
stream.abort_json(veilid_core::VeilidAPIError::MissingArgument { let err = veilid_core::VeilidAPIError::MissingArgument {
context: "startup_veilid_core".to_owned(), context: "startup_veilid_core".to_owned(),
argument: "config".to_owned(), argument: "config".to_owned(),
}); };
return; return APIResult::Err(err);
} }
}; };
let mut api_lock = VEILID_API.lock().await; let mut api_lock = VEILID_API.lock().await;
if api_lock.is_some() { if api_lock.is_some() {
stream.abort_json(veilid_core::VeilidAPIError::AlreadyInitialized); return APIResult::Err(veilid_core::VeilidAPIError::AlreadyInitialized);
return;
} }
let sink = stream.clone(); let sink = stream.clone();
@ -287,9 +273,10 @@ pub extern "C" fn startup_veilid_core(port: i64, config: FfiStr) {
} }
}); });
let res = veilid_core::api_startup_json(update_callback, config_json).await; let veilid_api = veilid_core::api_startup_json(update_callback, config_json).await?;
let veilid_api = check_err_json!(stream, res);
*api_lock = Some(veilid_api); *api_lock = Some(veilid_api);
APIRESULT_VOID
}); });
} }

View File

@ -20,7 +20,7 @@ const MESSAGE_ERR_JSON: i32 = 3;
//const MESSAGE_STREAM_ITEM: i32 = 4; //const MESSAGE_STREAM_ITEM: i32 = 4;
const MESSAGE_STREAM_ITEM_JSON: i32 = 5; const MESSAGE_STREAM_ITEM_JSON: i32 = 5;
//const MESSAGE_STREAM_ABORT: i32 = 6; //const MESSAGE_STREAM_ABORT: i32 = 6;
const MESSAGE_STREAM_ABORT_JSON: i32 = 7; //const MESSAGE_STREAM_ABORT_JSON: i32 = 7;
const MESSAGE_STREAM_CLOSE: i32 = 8; const MESSAGE_STREAM_CLOSE: i32 = 8;
impl DartIsolateWrapper { impl DartIsolateWrapper {
@ -148,17 +148,17 @@ impl DartIsolateStream {
// } // }
// } // }
pub fn abort_json<E: Serialize + Debug>(self, error: E) -> bool { // pub fn abort_json<E: Serialize + Debug>(self, error: E) -> bool {
let mut inner = self.inner.lock(); // let mut inner = self.inner.lock();
if let Some(isolate) = inner.isolate.take() { // if let Some(isolate) = inner.isolate.take() {
isolate.post(vec![ // isolate.post(vec![
MESSAGE_STREAM_ABORT_JSON.into_dart(), // MESSAGE_STREAM_ABORT_JSON.into_dart(),
veilid_core::serialize_json(error).into_dart(), // veilid_core::serialize_json(error).into_dart(),
]) // ])
} else { // } else {
false // false
} // }
} // }
pub fn close(self) -> bool { pub fn close(self) -> bool {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();