From f65400a1cee66bde16508512b0dcbe571dd62dfc Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Wed, 19 Jul 2023 10:07:51 -0400 Subject: [PATCH] network fixes --- Cargo.lock | 1 - doc/config/sample.config | 2 +- doc/config/veilid-server-config.md | 2 +- veilid-core/src/routing_table/bucket_entry.rs | 4 +- veilid-core/src/routing_table/mod.rs | 2 +- .../tasks/peer_minimum_refresh.rs | 34 +++- veilid-core/src/rpc_processor/mod.rs | 5 +- .../src/tests/common/test_veilid_config.rs | 4 +- veilid-flutter/lib/default_config.dart | 2 +- veilid-flutter/lib/routing_context.dart | 2 + veilid-flutter/lib/veilid_crypto.dart | 8 + veilid-flutter/lib/veilid_ffi.dart | 171 ++++++++++++----- veilid-flutter/lib/veilid_js.dart | 173 ++++++++++++------ veilid-flutter/lib/veilid_table_db.dart | 2 + veilid-server/src/main.rs | 22 ++- veilid-server/src/settings.rs | 4 +- 16 files changed, 309 insertions(+), 129 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0ad7010c..ec8c24f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5872,7 +5872,6 @@ dependencies = [ "oslog", "paranoid-android", "parking_lot 0.11.2", - "parking_lot 0.12.1", "rand 0.7.3", "range-set-blaze", "rust-fsm", diff --git a/doc/config/sample.config b/doc/config/sample.config index c2af31a9..5b752db1 100644 --- a/doc/config/sample.config +++ b/doc/config/sample.config @@ -80,7 +80,7 @@ core: set_value_count: 5 set_value_fanout: 4 min_peer_count: 20 - min_peer_refresh_time_ms: 2000 + min_peer_refresh_time_ms: 60000 validate_dial_info_receipt_time_ms: 2000 local_subkey_cache_size: 128 local_max_subkey_cache_memory_mb: 256 diff --git a/doc/config/veilid-server-config.md b/doc/config/veilid-server-config.md index 5c81b6b3..a503c7c5 100644 --- a/doc/config/veilid-server-config.md +++ b/doc/config/veilid-server-config.md @@ -247,7 +247,7 @@ dht: set_value_count: 5 set_value_fanout: 4 min_peer_count: 20 - min_peer_refresh_time_ms: 2000 + min_peer_refresh_time_ms: 60000 validate_dial_info_receipt_time_ms: 2000 local_subkey_cache_size: 128 local_max_subkey_cache_memory_mb: 256 diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index b2066539..4ad8b42d 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -261,7 +261,7 @@ impl BucketEntryInner { // See if we have an existing signed_node_info to update or not let mut node_info_changed = false; if let Some(current_sni) = opt_current_sni { - // Always allow overwriting invalid/unsigned node + // Always allow overwriting unsigned node (bootstrap) if current_sni.has_any_signature() { // If the timestamp hasn't changed or is less, ignore this update if signed_node_info.timestamp() <= current_sni.timestamp() { @@ -424,7 +424,7 @@ impl BucketEntryInner { } // Removes a connection descriptor in this entry's table of last connections - pub fn clear_last_connection(&mut self, last_connection: ConnectionDescriptor) { + pub fn clear_last_connection(&mut self, last_connection: ConnectionDescriptor) { let key = self.descriptor_to_key(last_connection); self.last_connections .remove(&key); diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index a44223bb..a1106d83 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -208,7 +208,7 @@ impl RoutingTable { rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS), kick_buckets_task: TickTask::new(1), bootstrap_task: TickTask::new(1), - peer_minimum_refresh_task: TickTask::new_ms(c.network.dht.min_peer_refresh_time_ms), + peer_minimum_refresh_task: TickTask::new(1), ping_validator_task: TickTask::new(1), relay_management_task: TickTask::new(RELAY_MANAGEMENT_INTERVAL_SECS), private_route_management_task: TickTask::new(PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS), diff --git a/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs b/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs index 97cc9c01..f2dc92fc 100644 --- a/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs +++ b/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs @@ -19,12 +19,18 @@ impl RoutingTable { // Get counts by crypto kind let entry_count = self.inner.read().cached_entry_counts(); - let min_peer_count = self.with_config(|c| c.network.dht.min_peer_count as usize); + let (min_peer_count, min_peer_refresh_time_ms) = self.with_config(|c| { + ( + c.network.dht.min_peer_count as usize, + c.network.dht.min_peer_refresh_time_ms, + ) + }); // For the PublicInternet routing domain, get list of all peers we know about // even the unreliable ones, and ask them to find nodes close to our node too let mut ord = FuturesOrdered::new(); + let cur_ts = get_timestamp(); for crypto_kind in VALID_CRYPTO_KINDS { // Do we need to peer minimum refresh this crypto kind? @@ -37,16 +43,26 @@ impl RoutingTable { } let routing_table = self.clone(); - let mut filters = VecDeque::new(); let filter = Box::new( - move |_rti: &RoutingTableInner, opt_entry: Option>| { - // Keep only the entries that contain the crypto kind we're looking for - if let Some(entry) = opt_entry { - entry.with_inner(|e| e.crypto_kinds().contains(&crypto_kind)) - } else { - VALID_CRYPTO_KINDS.contains(&crypto_kind) - } + move |rti: &RoutingTableInner, opt_entry: Option>| { + let entry = opt_entry.unwrap().clone(); + entry.with(rti, |_rti, e| { + // Keep only the entries that contain the crypto kind we're looking for + let compatible_crypto = e.crypto_kinds().contains(&crypto_kind); + if !compatible_crypto { + return false; + } + // Keep only the entries we haven't talked to in the min_peer_refresh_time + if let Some(last_q_ts) = e.peer_stats().rpc_stats.last_question_ts { + if cur_ts.saturating_sub(last_q_ts.as_u64()) + < (min_peer_refresh_time_ms as u64 * 1_000u64) + { + return false; + } + } + true + }) }, ) as RoutingTableEntryFilter; filters.push_front(filter); diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 3f73c048..bcdf9aa2 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -324,10 +324,13 @@ impl RPCProcessor { let timeout_us = TimestampDuration::new(ms_to_us(c.network.rpc.timeout_ms)); let max_route_hop_count = c.network.rpc.max_route_hop_count as usize; if concurrency == 0 { - concurrency = get_concurrency() / 2; + concurrency = get_concurrency(); if concurrency == 0 { concurrency = 1; } + + // Default RPC concurrency is the number of CPUs * 16 rpc workers per core, as a single worker takes about 1% CPU when relaying and 16% is reasonable for baseline plus relay + concurrency *= 16; } let validate_dial_info_receipt_time_ms = c.network.dht.validate_dial_info_receipt_time_ms; diff --git a/veilid-core/src/tests/common/test_veilid_config.rs b/veilid-core/src/tests/common/test_veilid_config.rs index 9ba054ac..4211cf24 100644 --- a/veilid-core/src/tests/common/test_veilid_config.rs +++ b/veilid-core/src/tests/common/test_veilid_config.rs @@ -217,7 +217,7 @@ fn config_callback(key: String) -> ConfigCallbackReturn { "network.dht.set_value_count" => Ok(Box::new(5u32)), "network.dht.set_value_fanout" => Ok(Box::new(4u32)), "network.dht.min_peer_count" => Ok(Box::new(20u32)), - "network.dht.min_peer_refresh_time_ms" => Ok(Box::new(2_000u32)), + "network.dht.min_peer_refresh_time_ms" => Ok(Box::new(60_000u32)), "network.dht.validate_dial_info_receipt_time_ms" => Ok(Box::new(5_000u32)), "network.dht.local_subkey_cache_size" => Ok(Box::new(128u32)), "network.dht.local_max_subkey_cache_memory_mb" => Ok(Box::new(256u32)), @@ -345,7 +345,7 @@ pub async fn test_config() { assert_eq!(inner.network.dht.set_value_count, 5u32); assert_eq!(inner.network.dht.set_value_fanout, 4u32); assert_eq!(inner.network.dht.min_peer_count, 20u32); - assert_eq!(inner.network.dht.min_peer_refresh_time_ms, 2_000u32); + assert_eq!(inner.network.dht.min_peer_refresh_time_ms, 60_000u32); assert_eq!( inner.network.dht.validate_dial_info_receipt_time_ms, 5_000u32 diff --git a/veilid-flutter/lib/default_config.dart b/veilid-flutter/lib/default_config.dart index a7f15074..3917842f 100644 --- a/veilid-flutter/lib/default_config.dart +++ b/veilid-flutter/lib/default_config.dart @@ -118,7 +118,7 @@ Future getDefaultVeilidConfig(String programName) async { setValueCount: 20, setValueFanout: 5, minPeerCount: 20, - minPeerRefreshTimeMs: 2000, + minPeerRefreshTimeMs: 60000, validateDialInfoReceiptTimeMs: 2000, localSubkeyCacheSize: getLocalSubkeyCacheSize(), localMaxSubkeyCacheMemoryMb: getLocalMaxSubkeyCacheMemoryMb(), diff --git a/veilid-flutter/lib/routing_context.dart b/veilid-flutter/lib/routing_context.dart index 84dd7eaa..778c3fbe 100644 --- a/veilid-flutter/lib/routing_context.dart +++ b/veilid-flutter/lib/routing_context.dart @@ -226,6 +226,8 @@ class RouteBlob with _$RouteBlob { /// VeilidRoutingContext abstract class VeilidRoutingContext { + void close(); + // Modifiers VeilidRoutingContext withPrivacy(); VeilidRoutingContext withCustomPrivacy(SafetySelection safetySelection); diff --git a/veilid-flutter/lib/veilid_crypto.dart b/veilid-flutter/lib/veilid_crypto.dart index 6801f6e5..4a38841f 100644 --- a/veilid-flutter/lib/veilid_crypto.dart +++ b/veilid-flutter/lib/veilid_crypto.dart @@ -176,10 +176,18 @@ abstract class VeilidCryptoSystem { Future generateHash(Uint8List data); //Future generateHashReader(Stream> reader); Future validateKeyPair(PublicKey key, SecretKey secret); + Future validateKeyPairWithKeyPair(KeyPair keyPair) { + return validateKeyPair(keyPair.key, keyPair.secret); + } + Future validateHash(Uint8List data, HashDigest hash); //Future validateHashReader(Stream> reader, HashDigest hash); Future distance(CryptoKey key1, CryptoKey key2); Future sign(PublicKey key, SecretKey secret, Uint8List data); + Future signWithKeyPair(KeyPair keyPair, Uint8List data) { + return sign(keyPair.key, keyPair.secret, data); + } + Future verify(PublicKey key, Uint8List data, Signature signature); Future aeadOverhead(); Future decryptAead(Uint8List body, Nonce nonce, diff --git a/veilid-flutter/lib/veilid_ffi.dart b/veilid-flutter/lib/veilid_ffi.dart index f3fe37c7..cfadb6a4 100644 --- a/veilid-flutter/lib/veilid_ffi.dart +++ b/veilid-flutter/lib/veilid_ffi.dart @@ -536,7 +536,7 @@ Stream processStreamJson( case messageStreamItemJson: { if (list[1] == null) { - throw VeilidAPIExceptionInternal( + throw const VeilidAPIExceptionInternal( "Null MESSAGE_STREAM_ITEM_JSON value"); } var ret = jsonDecode(list[1] as String); @@ -573,49 +573,70 @@ Stream processStreamJson( } class _Ctx { - final int id; + int? id; final VeilidFFI ffi; - _Ctx(this.id, this.ffi); + _Ctx(int this.id, this.ffi); + + void ensureValid() { + if (id == null) { + throw VeilidAPIExceptionNotInitialized(); + } + } + + void close() { + if (id != null) { + ffi._releaseRoutingContext(id!); + id = null; + } + } } // FFI implementation of VeilidRoutingContext -class VeilidRoutingContextFFI implements VeilidRoutingContext { +class VeilidRoutingContextFFI extends VeilidRoutingContext { final _Ctx _ctx; - static final Finalizer<_Ctx> _finalizer = - Finalizer((ctx) => ctx.ffi._releaseRoutingContext(ctx.id)); + static final Finalizer<_Ctx> _finalizer = Finalizer((ctx) => ctx.close()); VeilidRoutingContextFFI._(this._ctx) { _finalizer.attach(this, _ctx, detach: this); } + @override + void close() { + _ctx.close(); + } + @override VeilidRoutingContextFFI withPrivacy() { - final newId = _ctx.ffi._routingContextWithPrivacy(_ctx.id); + _ctx.ensureValid(); + final newId = _ctx.ffi._routingContextWithPrivacy(_ctx.id!); return VeilidRoutingContextFFI._(_Ctx(newId, _ctx.ffi)); } @override VeilidRoutingContextFFI withCustomPrivacy(SafetySelection safetySelection) { + _ctx.ensureValid(); final newId = _ctx.ffi._routingContextWithCustomPrivacy( - _ctx.id, jsonEncode(safetySelection).toNativeUtf8()); + _ctx.id!, jsonEncode(safetySelection).toNativeUtf8()); return VeilidRoutingContextFFI._(_Ctx(newId, _ctx.ffi)); } @override VeilidRoutingContextFFI withSequencing(Sequencing sequencing) { + _ctx.ensureValid(); final newId = _ctx.ffi._routingContextWithSequencing( - _ctx.id, jsonEncode(sequencing).toNativeUtf8()); + _ctx.id!, jsonEncode(sequencing).toNativeUtf8()); return VeilidRoutingContextFFI._(_Ctx(newId, _ctx.ffi)); } @override Future appCall(String target, Uint8List request) async { + _ctx.ensureValid(); var nativeEncodedTarget = target.toNativeUtf8(); var nativeEncodedRequest = base64UrlNoPadEncode(request).toNativeUtf8(); final recvPort = ReceivePort("routing_context_app_call"); final sendPort = recvPort.sendPort; - _ctx.ffi._routingContextAppCall(sendPort.nativePort, _ctx.id, + _ctx.ffi._routingContextAppCall(sendPort.nativePort, _ctx.id!, nativeEncodedTarget, nativeEncodedRequest); final out = await processFuturePlain(recvPort.first); return base64UrlNoPadDecode(out); @@ -623,12 +644,13 @@ class VeilidRoutingContextFFI implements VeilidRoutingContext { @override Future appMessage(String target, Uint8List message) { + _ctx.ensureValid(); final nativeEncodedTarget = target.toNativeUtf8(); final nativeEncodedMessage = base64UrlNoPadEncode(message).toNativeUtf8(); final recvPort = ReceivePort("routing_context_app_message"); final sendPort = recvPort.sendPort; - _ctx.ffi._routingContextAppMessage(sendPort.nativePort, _ctx.id, + _ctx.ffi._routingContextAppMessage(sendPort.nativePort, _ctx.id!, nativeEncodedTarget, nativeEncodedMessage); return processFutureVoid(recvPort.first); } @@ -636,11 +658,12 @@ class VeilidRoutingContextFFI implements VeilidRoutingContext { @override Future createDHTRecord(DHTSchema schema, {CryptoKind kind = 0}) async { + _ctx.ensureValid(); final nativeSchema = jsonEncode(schema).toNativeUtf8(); final recvPort = ReceivePort("routing_context_create_dht_record"); final sendPort = recvPort.sendPort; _ctx.ffi._routingContextCreateDHTRecord( - sendPort.nativePort, _ctx.id, nativeSchema, kind); + sendPort.nativePort, _ctx.id!, nativeSchema, kind); final dhtRecordDescriptor = await processFutureJson(DHTRecordDescriptor.fromJson, recvPort.first); return dhtRecordDescriptor; @@ -649,13 +672,14 @@ class VeilidRoutingContextFFI implements VeilidRoutingContext { @override Future openDHTRecord( TypedKey key, KeyPair? writer) async { + _ctx.ensureValid(); final nativeKey = jsonEncode(key).toNativeUtf8(); final nativeWriter = writer != null ? jsonEncode(key).toNativeUtf8() : nullptr; final recvPort = ReceivePort("routing_context_open_dht_record"); final sendPort = recvPort.sendPort; _ctx.ffi._routingContextOpenDHTRecord( - sendPort.nativePort, _ctx.id, nativeKey, nativeWriter); + sendPort.nativePort, _ctx.id!, nativeKey, nativeWriter); final dhtRecordDescriptor = await processFutureJson(DHTRecordDescriptor.fromJson, recvPort.first); return dhtRecordDescriptor; @@ -663,32 +687,35 @@ class VeilidRoutingContextFFI implements VeilidRoutingContext { @override Future closeDHTRecord(TypedKey key) { + _ctx.ensureValid(); final nativeKey = jsonEncode(key).toNativeUtf8(); final recvPort = ReceivePort("routing_context_close_dht_record"); final sendPort = recvPort.sendPort; - _ctx.ffi - ._routingContextCloseDHTRecord(sendPort.nativePort, _ctx.id, nativeKey); + _ctx.ffi._routingContextCloseDHTRecord( + sendPort.nativePort, _ctx.id!, nativeKey); return processFutureVoid(recvPort.first); } @override Future deleteDHTRecord(TypedKey key) { + _ctx.ensureValid(); final nativeKey = jsonEncode(key).toNativeUtf8(); final recvPort = ReceivePort("routing_context_delete_dht_record"); final sendPort = recvPort.sendPort; _ctx.ffi._routingContextDeleteDHTRecord( - sendPort.nativePort, _ctx.id, nativeKey); + sendPort.nativePort, _ctx.id!, nativeKey); return processFutureVoid(recvPort.first); } @override Future getDHTValue( TypedKey key, int subkey, bool forceRefresh) async { + _ctx.ensureValid(); final nativeKey = jsonEncode(key).toNativeUtf8(); final recvPort = ReceivePort("routing_context_get_dht_value"); final sendPort = recvPort.sendPort; _ctx.ffi._routingContextGetDHTValue( - sendPort.nativePort, _ctx.id, nativeKey, subkey, forceRefresh); + sendPort.nativePort, _ctx.id!, nativeKey, subkey, forceRefresh); final valueData = await processFutureJson( optFromJson(ValueData.fromJson), recvPort.first); return valueData; @@ -697,13 +724,14 @@ class VeilidRoutingContextFFI implements VeilidRoutingContext { @override Future setDHTValue( TypedKey key, int subkey, Uint8List data) async { + _ctx.ensureValid(); final nativeKey = jsonEncode(key).toNativeUtf8(); final nativeData = base64UrlNoPadEncode(data).toNativeUtf8(); final recvPort = ReceivePort("routing_context_set_dht_value"); final sendPort = recvPort.sendPort; _ctx.ffi._routingContextSetDHTValue( - sendPort.nativePort, _ctx.id, nativeKey, subkey, nativeData); + sendPort.nativePort, _ctx.id!, nativeKey, subkey, nativeData); final valueData = await processFutureJson( optFromJson(ValueData.fromJson), recvPort.first); return valueData; @@ -712,13 +740,14 @@ class VeilidRoutingContextFFI implements VeilidRoutingContext { @override Future watchDHTValues(TypedKey key, List subkeys, Timestamp expiration, int count) async { + _ctx.ensureValid(); final nativeKey = jsonEncode(key).toNativeUtf8(); final nativeSubkeys = jsonEncode(subkeys).toNativeUtf8(); final nativeExpiration = expiration.value.toInt(); final recvPort = ReceivePort("routing_context_watch_dht_values"); final sendPort = recvPort.sendPort; - _ctx.ffi._routingContextWatchDHTValues(sendPort.nativePort, _ctx.id, + _ctx.ffi._routingContextWatchDHTValues(sendPort.nativePort, _ctx.id!, nativeKey, nativeSubkeys, nativeExpiration, count); final actualExpiration = Timestamp( value: BigInt.from(await processFuturePlain(recvPort.first))); @@ -728,60 +757,82 @@ class VeilidRoutingContextFFI implements VeilidRoutingContext { @override Future cancelDHTWatch( TypedKey key, List subkeys) async { + _ctx.ensureValid(); final nativeKey = jsonEncode(key).toNativeUtf8(); final nativeSubkeys = jsonEncode(subkeys).toNativeUtf8(); final recvPort = ReceivePort("routing_context_cancel_dht_watch"); final sendPort = recvPort.sendPort; _ctx.ffi._routingContextCancelDHTWatch( - sendPort.nativePort, _ctx.id, nativeKey, nativeSubkeys); + sendPort.nativePort, _ctx.id!, nativeKey, nativeSubkeys); final cancelled = await processFuturePlain(recvPort.first); return cancelled; } } class _TDBT { - final int id; - VeilidTableDBFFI tdbffi; - VeilidFFI ffi; + int? id; + final VeilidTableDBFFI tdbffi; + final VeilidFFI ffi; - _TDBT(this.id, this.tdbffi, this.ffi); + _TDBT(int this.id, this.tdbffi, this.ffi); + void ensureValid() { + if (id == null) { + throw VeilidAPIExceptionNotInitialized(); + } + } + + void close() { + if (id != null) { + ffi._releaseTableDbTransaction(id!); + id = null; + } + } } // FFI implementation of VeilidTableDBTransaction class VeilidTableDBTransactionFFI extends VeilidTableDBTransaction { final _TDBT _tdbt; - static final Finalizer<_TDBT> _finalizer = - Finalizer((tdbt) => tdbt.ffi._releaseTableDbTransaction(tdbt.id)); + static final Finalizer<_TDBT> _finalizer = Finalizer((tdbt) => tdbt.close()); VeilidTableDBTransactionFFI._(this._tdbt) { _finalizer.attach(this, _tdbt, detach: this); } @override - Future commit() { + bool isDone() { + return _tdbt.id == null; + } + + @override + Future commit() async { + _tdbt.ensureValid(); final recvPort = ReceivePort("veilid_table_db_transaction_commit"); final sendPort = recvPort.sendPort; _tdbt.ffi._tableDbTransactionCommit( sendPort.nativePort, - _tdbt.id, + _tdbt.id!, ); - return processFutureVoid(recvPort.first); + await processFutureVoid(recvPort.first); + _tdbt.close(); } @override - Future rollback() { + Future rollback() async { + _tdbt.ensureValid(); final recvPort = ReceivePort("veilid_table_db_transaction_rollback"); final sendPort = recvPort.sendPort; _tdbt.ffi._tableDbTransactionRollback( sendPort.nativePort, - _tdbt.id, + _tdbt.id!, ); - return processFutureVoid(recvPort.first); + await processFutureVoid(recvPort.first); + _tdbt.close(); } @override Future store(int col, Uint8List key, Uint8List value) { + _tdbt.ensureValid(); final nativeEncodedKey = base64UrlNoPadEncode(key).toNativeUtf8(); final nativeEncodedValue = base64UrlNoPadEncode(value).toNativeUtf8(); @@ -789,7 +840,7 @@ class VeilidTableDBTransactionFFI extends VeilidTableDBTransaction { final sendPort = recvPort.sendPort; _tdbt.ffi._tableDbTransactionStore( sendPort.nativePort, - _tdbt.id, + _tdbt.id!, col, nativeEncodedKey, nativeEncodedValue, @@ -799,13 +850,14 @@ class VeilidTableDBTransactionFFI extends VeilidTableDBTransaction { @override Future delete(int col, Uint8List key) { + _tdbt.ensureValid(); final nativeEncodedKey = base64UrlNoPadEncode(key).toNativeUtf8(); final recvPort = ReceivePort("veilid_table_db_transaction_delete"); final sendPort = recvPort.sendPort; _tdbt.ffi._tableDbTransactionDelete( sendPort.nativePort, - _tdbt.id, + _tdbt.id!, col, nativeEncodedKey, ); @@ -814,32 +866,51 @@ class VeilidTableDBTransactionFFI extends VeilidTableDBTransaction { } class _TDB { - final int id; - VeilidFFI ffi; - _TDB(this.id, this.ffi); + int? id; + final VeilidFFI ffi; + _TDB(int this.id, this.ffi); + void ensureValid() { + if (id == null) { + throw VeilidAPIExceptionNotInitialized(); + } + } + + void close() { + if (id != null) { + ffi._releaseTableDb(id!); + id = null; + } + } } // FFI implementation of VeilidTableDB class VeilidTableDBFFI extends VeilidTableDB { final _TDB _tdb; - static final Finalizer<_TDB> _finalizer = - Finalizer((tdb) => tdb.ffi._releaseTableDb(tdb.id)); + static final Finalizer<_TDB> _finalizer = Finalizer((tdb) => tdb.close()); VeilidTableDBFFI._(this._tdb) { _finalizer.attach(this, _tdb, detach: this); } + @override + void close() { + _tdb.close(); + } + @override int getColumnCount() { - return _tdb.ffi._tableDbGetColumnCount(_tdb.id); + _tdb.ensureValid(); + return _tdb.ffi._tableDbGetColumnCount(_tdb.id!); } @override Future> getKeys(int col) { + _tdb.ensureValid(); + final recvPort = ReceivePort("veilid_table_db_get_keys"); final sendPort = recvPort.sendPort; - _tdb.ffi._tableDbGetKeys(sendPort.nativePort, _tdb.id, col); + _tdb.ffi._tableDbGetKeys(sendPort.nativePort, _tdb.id!, col); return processFutureJson( jsonListConstructor(base64UrlNoPadDecodeDynamic), @@ -848,12 +919,16 @@ class VeilidTableDBFFI extends VeilidTableDB { @override VeilidTableDBTransaction transact() { - final id = _tdb.ffi._tableDbTransact(_tdb.id); + _tdb.ensureValid(); + + final id = _tdb.ffi._tableDbTransact(_tdb.id!); return VeilidTableDBTransactionFFI._(_TDBT(id, this, _tdb.ffi)); } @override Future store(int col, Uint8List key, Uint8List value) { + _tdb.ensureValid(); + final nativeEncodedKey = base64UrlNoPadEncode(key).toNativeUtf8(); final nativeEncodedValue = base64UrlNoPadEncode(value).toNativeUtf8(); @@ -861,7 +936,7 @@ class VeilidTableDBFFI extends VeilidTableDB { final sendPort = recvPort.sendPort; _tdb.ffi._tableDbStore( sendPort.nativePort, - _tdb.id, + _tdb.id!, col, nativeEncodedKey, nativeEncodedValue, @@ -871,13 +946,14 @@ class VeilidTableDBFFI extends VeilidTableDB { @override Future load(int col, Uint8List key) async { + _tdb.ensureValid(); final nativeEncodedKey = base64UrlNoPadEncode(key).toNativeUtf8(); final recvPort = ReceivePort("veilid_table_db_load"); final sendPort = recvPort.sendPort; _tdb.ffi._tableDbLoad( sendPort.nativePort, - _tdb.id, + _tdb.id!, col, nativeEncodedKey, ); @@ -890,13 +966,14 @@ class VeilidTableDBFFI extends VeilidTableDB { @override Future delete(int col, Uint8List key) async { + _tdb.ensureValid(); final nativeEncodedKey = base64UrlNoPadEncode(key).toNativeUtf8(); final recvPort = ReceivePort("veilid_table_db_delete"); final sendPort = recvPort.sendPort; _tdb.ffi._tableDbDelete( sendPort.nativePort, - _tdb.id, + _tdb.id!, col, nativeEncodedKey, ); @@ -909,7 +986,7 @@ class VeilidTableDBFFI extends VeilidTableDB { } // FFI implementation of VeilidCryptoSystem -class VeilidCryptoSystemFFI implements VeilidCryptoSystem { +class VeilidCryptoSystemFFI extends VeilidCryptoSystem { final CryptoKind _kind; final VeilidFFI _ffi; @@ -1154,7 +1231,7 @@ class VeilidCryptoSystemFFI implements VeilidCryptoSystem { } // FFI implementation of high level Veilid API -class VeilidFFI implements Veilid { +class VeilidFFI extends Veilid { // veilid_core shared library final DynamicLibrary _dylib; diff --git a/veilid-flutter/lib/veilid_js.dart b/veilid-flutter/lib/veilid_js.dart index c9f1ebf3..eadd6d28 100644 --- a/veilid-flutter/lib/veilid_js.dart +++ b/veilid-flutter/lib/veilid_js.dart @@ -22,75 +22,98 @@ Future _wrapApiPromise(Object p) { } class _Ctx { - final int id; + int? id; final VeilidJS js; - _Ctx(this.id, this.js); + _Ctx(int this.id, this.js); + void ensureValid() { + if (id == null) { + throw VeilidAPIExceptionNotInitialized(); + } + } + + void close() { + if (id != null) { + js_util.callMethod(wasm, "release_routing_context", [id!]); + id = null; + } + } } // JS implementation of VeilidRoutingContext -class VeilidRoutingContextJS implements VeilidRoutingContext { +class VeilidRoutingContextJS extends VeilidRoutingContext { final _Ctx _ctx; - static final Finalizer<_Ctx> _finalizer = Finalizer( - (ctx) => js_util.callMethod(wasm, "release_routing_context", [ctx.id])); + static final Finalizer<_Ctx> _finalizer = Finalizer((ctx) => ctx.close()); VeilidRoutingContextJS._(this._ctx) { _finalizer.attach(this, _ctx, detach: this); } + @override + void close() { + _ctx.close(); + } + @override VeilidRoutingContextJS withPrivacy() { + _ctx.ensureValid(); int newId = - js_util.callMethod(wasm, "routing_context_with_privacy", [_ctx.id]); + js_util.callMethod(wasm, "routing_context_with_privacy", [_ctx.id!]); return VeilidRoutingContextJS._(_Ctx(newId, _ctx.js)); } @override VeilidRoutingContextJS withCustomPrivacy(SafetySelection safetySelection) { + _ctx.ensureValid(); final newId = js_util.callMethod( wasm, "routing_context_with_custom_privacy", - [_ctx.id, jsonEncode(safetySelection)]); + [_ctx.id!, jsonEncode(safetySelection)]); return VeilidRoutingContextJS._(_Ctx(newId, _ctx.js)); } @override VeilidRoutingContextJS withSequencing(Sequencing sequencing) { + _ctx.ensureValid(); final newId = js_util.callMethod(wasm, "routing_context_with_sequencing", - [_ctx.id, jsonEncode(sequencing)]); + [_ctx.id!, jsonEncode(sequencing)]); return VeilidRoutingContextJS._(_Ctx(newId, _ctx.js)); } @override Future appCall(String target, Uint8List request) async { + _ctx.ensureValid(); var encodedRequest = base64UrlNoPadEncode(request); return base64UrlNoPadDecode(await _wrapApiPromise(js_util.callMethod( - wasm, "routing_context_app_call", [_ctx.id, target, encodedRequest]))); + wasm, "routing_context_app_call", [_ctx.id!, target, encodedRequest]))); } @override Future appMessage(String target, Uint8List message) { + _ctx.ensureValid(); var encodedMessage = base64UrlNoPadEncode(message); return _wrapApiPromise(js_util.callMethod(wasm, - "routing_context_app_message", [_ctx.id, target, encodedMessage])); + "routing_context_app_message", [_ctx.id!, target, encodedMessage])); } @override Future createDHTRecord(DHTSchema schema, {CryptoKind kind = 0}) async { + _ctx.ensureValid(); return DHTRecordDescriptor.fromJson(jsonDecode(await _wrapApiPromise(js_util .callMethod(wasm, "routing_context_create_dht_record", - [_ctx.id, jsonEncode(schema), kind])))); + [_ctx.id!, jsonEncode(schema), kind])))); } @override Future openDHTRecord( TypedKey key, KeyPair? writer) async { + _ctx.ensureValid(); return DHTRecordDescriptor.fromJson(jsonDecode(await _wrapApiPromise(js_util .callMethod(wasm, "routing_context_open_dht_record", [ - _ctx.id, + _ctx.id!, jsonEncode(key), writer != null ? jsonEncode(writer) : null ])))); @@ -98,42 +121,47 @@ class VeilidRoutingContextJS implements VeilidRoutingContext { @override Future closeDHTRecord(TypedKey key) { + _ctx.ensureValid(); return _wrapApiPromise(js_util.callMethod( - wasm, "routing_context_close_dht_record", [_ctx.id, jsonEncode(key)])); + wasm, "routing_context_close_dht_record", [_ctx.id!, jsonEncode(key)])); } @override Future deleteDHTRecord(TypedKey key) { - return _wrapApiPromise(js_util.callMethod( - wasm, "routing_context_delete_dht_record", [_ctx.id, jsonEncode(key)])); + _ctx.ensureValid(); + return _wrapApiPromise(js_util.callMethod(wasm, + "routing_context_delete_dht_record", [_ctx.id!, jsonEncode(key)])); } @override Future getDHTValue( TypedKey key, int subkey, bool forceRefresh) async { + _ctx.ensureValid(); final opt = await _wrapApiPromise(js_util.callMethod( wasm, "routing_context_get_dht_value", - [_ctx.id, jsonEncode(key), subkey, forceRefresh])); + [_ctx.id!, jsonEncode(key), subkey, forceRefresh])); return opt == null ? null : ValueData.fromJson(jsonDecode(opt)); } @override Future setDHTValue( TypedKey key, int subkey, Uint8List data) async { + _ctx.ensureValid(); final opt = await _wrapApiPromise(js_util.callMethod( wasm, "routing_context_set_dht_value", - [_ctx.id, jsonEncode(key), subkey, base64UrlNoPadEncode(data)])); + [_ctx.id!, jsonEncode(key), subkey, base64UrlNoPadEncode(data)])); return opt == null ? null : ValueData.fromJson(jsonDecode(opt)); } @override Future watchDHTValues(TypedKey key, List subkeys, Timestamp expiration, int count) async { + _ctx.ensureValid(); final ts = await _wrapApiPromise(js_util.callMethod( wasm, "routing_context_watch_dht_values", [ - _ctx.id, + _ctx.id!, jsonEncode(key), jsonEncode(subkeys), expiration.toString(), @@ -144,15 +172,16 @@ class VeilidRoutingContextJS implements VeilidRoutingContext { @override Future cancelDHTWatch(TypedKey key, List subkeys) { + _ctx.ensureValid(); return _wrapApiPromise(js_util.callMethod( wasm, "routing_context_cancel_dht_watch", - [_ctx.id, jsonEncode(key), jsonEncode(subkeys)])); + [_ctx.id!, jsonEncode(key), jsonEncode(subkeys)])); } } // JS implementation of VeilidCryptoSystem -class VeilidCryptoSystemJS implements VeilidCryptoSystem { +class VeilidCryptoSystemJS extends VeilidCryptoSystem { final CryptoKind _kind; final VeilidJS _js; @@ -327,105 +356,146 @@ class VeilidCryptoSystemJS implements VeilidCryptoSystem { } class _TDBT { - final int id; - VeilidTableDBJS tdbjs; - VeilidJS js; + int? id; + final VeilidTableDBJS tdbjs; + final VeilidJS js; _TDBT(this.id, this.tdbjs, this.js); + void ensureValid() { + if (id == null) { + throw VeilidAPIExceptionNotInitialized(); + } + } + + void close() { + if (id != null) { + js_util.callMethod(wasm, "release_table_db_transaction", [id!]); + id = null; + } + } } // JS implementation of VeilidTableDBTransaction class VeilidTableDBTransactionJS extends VeilidTableDBTransaction { final _TDBT _tdbt; - static final Finalizer<_TDBT> _finalizer = Finalizer((tdbt) => - js_util.callMethod(wasm, "release_table_db_transaction", [tdbt.id])); + static final Finalizer<_TDBT> _finalizer = Finalizer((tdbt) => tdbt.close()); VeilidTableDBTransactionJS._(this._tdbt) { _finalizer.attach(this, _tdbt, detach: this); } @override - Future commit() { - return _wrapApiPromise( - js_util.callMethod(wasm, "table_db_transaction_commit", [_tdbt.id])); + bool isDone() { + return _tdbt.id == null; } @override - Future rollback() { - return _wrapApiPromise( - js_util.callMethod(wasm, "table_db_transaction_rollback", [_tdbt.id])); + Future commit() async { + _tdbt.ensureValid(); + await _wrapApiPromise( + js_util.callMethod(wasm, "table_db_transaction_commit", [_tdbt.id!])); + _tdbt.close(); } @override - Future store(int col, Uint8List key, Uint8List value) { + Future rollback() async { + _tdbt.ensureValid(); + await _wrapApiPromise( + js_util.callMethod(wasm, "table_db_transaction_rollback", [_tdbt.id!])); + _tdbt.close(); + } + + @override + Future store(int col, Uint8List key, Uint8List value) async { + _tdbt.ensureValid(); final encodedKey = base64UrlNoPadEncode(key); final encodedValue = base64UrlNoPadEncode(value); - return _wrapApiPromise(js_util.callMethod( - wasm, - "table_db_transaction_store", - [_tdbt.id, col, encodedKey, encodedValue])); + await _wrapApiPromise(js_util.callMethod(wasm, "table_db_transaction_store", + [_tdbt.id!, col, encodedKey, encodedValue])); } @override - Future delete(int col, Uint8List key) { + Future delete(int col, Uint8List key) async { + _tdbt.ensureValid(); final encodedKey = base64UrlNoPadEncode(key); - return _wrapApiPromise(js_util.callMethod( - wasm, "table_db_transaction_delete", [_tdbt.id, col, encodedKey])); + await _wrapApiPromise(js_util.callMethod( + wasm, "table_db_transaction_delete", [_tdbt.id!, col, encodedKey])); } } class _TDB { - final int id; - VeilidJS js; + int? id; + final VeilidJS js; - _TDB(this.id, this.js); + _TDB(int this.id, this.js); + void ensureValid() { + if (id == null) { + throw VeilidAPIExceptionNotInitialized(); + } + } + + void close() { + if (id != null) { + js_util.callMethod(wasm, "release_table_db", [id!]); + id = null; + } + } } // JS implementation of VeilidTableDB class VeilidTableDBJS extends VeilidTableDB { final _TDB _tdb; - static final Finalizer<_TDB> _finalizer = Finalizer( - (tdb) => js_util.callMethod(wasm, "release_table_db", [tdb.id])); + static final Finalizer<_TDB> _finalizer = Finalizer((tdb) => tdb.close()); VeilidTableDBJS._(this._tdb) { _finalizer.attach(this, _tdb, detach: this); } + @override + void close() { + _tdb.close(); + } + @override int getColumnCount() { - return js_util.callMethod(wasm, "table_db_get_column_count", [_tdb.id]); + _tdb.ensureValid(); + return js_util.callMethod(wasm, "table_db_get_column_count", [_tdb.id!]); } @override Future> getKeys(int col) async { + _tdb.ensureValid(); return jsonListConstructor(base64UrlNoPadDecodeDynamic)(jsonDecode( - await js_util.callMethod(wasm, "table_db_get_keys", [_tdb.id, col]))); + await js_util.callMethod(wasm, "table_db_get_keys", [_tdb.id!, col]))); } @override VeilidTableDBTransaction transact() { - final id = js_util.callMethod(wasm, "table_db_transact", [_tdb.id]); + _tdb.ensureValid(); + final id = js_util.callMethod(wasm, "table_db_transact", [_tdb.id!]); return VeilidTableDBTransactionJS._(_TDBT(id, this, _tdb.js)); } @override Future store(int col, Uint8List key, Uint8List value) { + _tdb.ensureValid(); final encodedKey = base64UrlNoPadEncode(key); final encodedValue = base64UrlNoPadEncode(value); return _wrapApiPromise(js_util.callMethod( - wasm, "table_db_store", [_tdb.id, col, encodedKey, encodedValue])); + wasm, "table_db_store", [_tdb.id!, col, encodedKey, encodedValue])); } @override Future load(int col, Uint8List key) async { + _tdb.ensureValid(); final encodedKey = base64UrlNoPadEncode(key); String? out = await _wrapApiPromise( - js_util.callMethod(wasm, "table_db_load", [_tdb.id, col, encodedKey])); + js_util.callMethod(wasm, "table_db_load", [_tdb.id!, col, encodedKey])); if (out == null) { return null; } @@ -434,16 +504,17 @@ class VeilidTableDBJS extends VeilidTableDB { @override Future delete(int col, Uint8List key) { + _tdb.ensureValid(); final encodedKey = base64UrlNoPadEncode(key); return _wrapApiPromise(js_util - .callMethod(wasm, "table_db_delete", [_tdb.id, col, encodedKey])); + .callMethod(wasm, "table_db_delete", [_tdb.id!, col, encodedKey])); } } // JS implementation of high level Veilid API -class VeilidJS implements Veilid { +class VeilidJS extends Veilid { @override void initializeVeilidCore(Map platformConfigJson) { var platformConfigJsonString = jsonEncode(platformConfigJson); diff --git a/veilid-flutter/lib/veilid_table_db.dart b/veilid-flutter/lib/veilid_table_db.dart index 19bee7f2..43868c0b 100644 --- a/veilid-flutter/lib/veilid_table_db.dart +++ b/veilid-flutter/lib/veilid_table_db.dart @@ -5,6 +5,7 @@ import 'dart:convert'; ///////////////////////////////////// /// VeilidTableDB abstract class VeilidTableDBTransaction { + bool isDone(); Future commit(); Future rollback(); Future store(int col, Uint8List key, Uint8List value); @@ -24,6 +25,7 @@ abstract class VeilidTableDBTransaction { } abstract class VeilidTableDB { + void close(); int getColumnCount(); Future> getKeys(int col); VeilidTableDBTransaction transact(); diff --git a/veilid-server/src/main.rs b/veilid-server/src/main.rs index 941711f2..b8d77e94 100644 --- a/veilid-server/src/main.rs +++ b/veilid-server/src/main.rs @@ -126,19 +126,21 @@ fn main() -> EyreResult<()> { } // --- Normal Startup --- + let orig_hook = std::panic::take_hook(); + std::panic::set_hook(Box::new(move |panic_info| { + // invoke the default handler and exit the process + orig_hook(panic_info); + + let backtrace = backtrace::Backtrace::new(); + eprintln!("Backtrace:\n{:?}", backtrace); + + eprintln!("exiting!"); + std::process::exit(1); + })); + let panic_on_shutdown = matches.occurrences_of("panic") != 0; ctrlc::set_handler(move || { if panic_on_shutdown { - let orig_hook = std::panic::take_hook(); - std::panic::set_hook(Box::new(move |panic_info| { - // invoke the default handler and exit the process - orig_hook(panic_info); - - let backtrace = backtrace::Backtrace::new(); - eprintln!("Backtrace:\n{:?}", backtrace); - - std::process::exit(1); - })); panic!("panic requested"); } else { shutdown(); diff --git a/veilid-server/src/settings.rs b/veilid-server/src/settings.rs index 1d7c0b3d..3e41eac3 100644 --- a/veilid-server/src/settings.rs +++ b/veilid-server/src/settings.rs @@ -102,7 +102,7 @@ core: set_value_count: 5 set_value_fanout: 4 min_peer_count: 20 - min_peer_refresh_time_ms: 2000 + min_peer_refresh_time_ms: 60000 validate_dial_info_receipt_time_ms: 2000 local_subkey_cache_size: 128 local_max_subkey_cache_memory_mb: 256 @@ -1620,7 +1620,7 @@ mod tests { assert_eq!(s.core.network.dht.set_value_count, 5u32); assert_eq!(s.core.network.dht.set_value_fanout, 4u32); assert_eq!(s.core.network.dht.min_peer_count, 20u32); - assert_eq!(s.core.network.dht.min_peer_refresh_time_ms, 2_000u32); + assert_eq!(s.core.network.dht.min_peer_refresh_time_ms, 60_000u32); assert_eq!( s.core.network.dht.validate_dial_info_receipt_time_ms, 2_000u32