Merge branch veilid:main into fix-lockapi-dependency

This commit is contained in:
Pedro Nunes 2023-08-23 15:52:56 +00:00
commit aa824c176f
36 changed files with 1058 additions and 929 deletions

View File

@ -10,45 +10,16 @@ stages:
- release - release
- distribute - distribute
#before_script:
# - earthly bootstrap
create_test_machine:
stage: test
only:
- main
- merge_requests
tags:
- build-orchestration
script:
- /home/gitlab-runner/build-machine-ctl.sh create amd64-deb
when: manual
test_amd64: test_amd64:
stage: test stage: test
image: earthly/earthly:v0.6.30 image: earthly/earthly:v0.7.15
only: only:
- main - main
- merge_requests - merge_requests
needs:
- create_test_machine
tags:
- earthly-tests
script: script:
- earthly bootstrap - earthly bootstrap
- earthly --ci +unit-tests-linux-amd64 - earthly --ci +unit-tests-linux-amd64
when: manual
delete_test_machine:
stage: test
only:
- main
- merge_requests
needs:
- test_amd64
tags:
- build-orchestration
script:
- /home/gitlab-runner/build-machine-ctl.sh delete amd64-deb
release_job: release_job:
stage: release stage: release

83
Cargo.lock generated
View File

@ -854,9 +854,9 @@ dependencies = [
[[package]] [[package]]
name = "cc" name = "cc"
version = "1.0.82" version = "1.0.83"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "305fe645edc1442a0fa8b6726ba61d422798d37a52e12eaecf4b022ebbb88f01" checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0"
dependencies = [ dependencies = [
"libc", "libc",
] ]
@ -977,26 +977,11 @@ dependencies = [
"atty", "atty",
"bitflags 1.3.2", "bitflags 1.3.2",
"strsim 0.8.0", "strsim 0.8.0",
"textwrap 0.11.0", "textwrap",
"unicode-width", "unicode-width",
"vec_map", "vec_map",
] ]
[[package]]
name = "clap"
version = "3.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123"
dependencies = [
"atty",
"bitflags 1.3.2",
"clap_lex 0.2.4",
"indexmap 1.9.3",
"strsim 0.10.0",
"termcolor",
"textwrap 0.16.0",
]
[[package]] [[package]]
name = "clap" name = "clap"
version = "4.3.23" version = "4.3.23"
@ -1016,8 +1001,9 @@ checksum = "f8ce6fffb678c9b80a70b6b6de0aad31df727623a70fd9a842c30cd573e2fa98"
dependencies = [ dependencies = [
"anstream", "anstream",
"anstyle", "anstyle",
"clap_lex 0.5.0", "clap_lex",
"strsim 0.10.0", "strsim 0.10.0",
"terminal_size",
] ]
[[package]] [[package]]
@ -1032,15 +1018,6 @@ dependencies = [
"syn 2.0.29", "syn 2.0.29",
] ]
[[package]]
name = "clap_lex"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5"
dependencies = [
"os_str_bytes",
]
[[package]] [[package]]
name = "clap_lex" name = "clap_lex"
version = "0.5.0" version = "0.5.0"
@ -1533,9 +1510,9 @@ dependencies = [
[[package]] [[package]]
name = "dashmap" name = "dashmap"
version = "5.5.0" version = "5.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6943ae99c34386c84a470c499d3414f66502a41340aa895406e0d2e4a207b91d" checksum = "edd72493923899c6f10c641bdbdeddc7183d6396641d99c1a0d1597f37f92e28"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"hashbrown 0.14.0", "hashbrown 0.14.0",
@ -2249,9 +2226,9 @@ dependencies = [
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.3.20" version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97ec8491ebaf99c8eaa73058b045fe58073cd6be7f596ac993ced0b0a0c01049" checksum = "91fc23aa11be92976ef4729127f1a74adf36d8436f7816b185d18df956790833"
dependencies = [ dependencies = [
"bytes 1.4.0", "bytes 1.4.0",
"fnv", "fnv",
@ -3570,12 +3547,6 @@ dependencies = [
"hashbrown 0.12.3", "hashbrown 0.12.3",
] ]
[[package]]
name = "os_str_bytes"
version = "6.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d5d9eb14b174ee9aa2ef96dc2b94637a2d4b6e7cb873c7e171f0c20c6cf3eac"
[[package]] [[package]]
name = "oslog" name = "oslog"
version = "0.2.0" version = "0.2.0"
@ -3757,12 +3728,12 @@ dependencies = [
[[package]] [[package]]
name = "petgraph" name = "petgraph"
version = "0.6.3" version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dd7d28ee937e54fe3080c91faa1c3a46c06de6252988a7f4592ba2310ef22a4" checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9"
dependencies = [ dependencies = [
"fixedbitset", "fixedbitset",
"indexmap 1.9.3", "indexmap 2.0.0",
] ]
[[package]] [[package]]
@ -4463,9 +4434,9 @@ dependencies = [
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.183" version = "1.0.185"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c" checksum = "be9b6f69f1dfd54c3b568ffa45c310d6973a5e5148fd40cf515acaf38cf5bc31"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
@ -4491,9 +4462,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.183" version = "1.0.185"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816" checksum = "dc59dfdcbad1437773485e0367fea4b090a2e0a16d9ffc46af47764536a298ec"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -4750,9 +4721,9 @@ dependencies = [
[[package]] [[package]]
name = "slab" name = "slab"
version = "0.4.8" version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6528351c9bc8ab22353f9d776db39a20288e8d6c37ef8cfe3317cf875eecfc2d" checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67"
dependencies = [ dependencies = [
"autocfg", "autocfg",
] ]
@ -4942,6 +4913,16 @@ dependencies = [
"winapi-util", "winapi-util",
] ]
[[package]]
name = "terminal_size"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e6bf6f19e9f8ed8d4048dc22981458ebcf406d67e94cd422e5ecd73d63b3237"
dependencies = [
"rustix 0.37.23",
"windows-sys 0.48.0",
]
[[package]] [[package]]
name = "textwrap" name = "textwrap"
version = "0.11.0" version = "0.11.0"
@ -4951,12 +4932,6 @@ dependencies = [
"unicode-width", "unicode-width",
] ]
[[package]]
name = "textwrap"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d"
[[package]] [[package]]
name = "thiserror" name = "thiserror"
version = "1.0.47" version = "1.0.47"
@ -5818,7 +5793,7 @@ dependencies = [
"backtrace", "backtrace",
"bugsalot", "bugsalot",
"cfg-if 1.0.0", "cfg-if 1.0.0",
"clap 3.2.25", "clap 4.3.23",
"color-eyre", "color-eyre",
"config", "config",
"console-subscriber", "console-subscriber",

View File

@ -125,7 +125,7 @@
</p> </p>
<p> <p>
Another example of this, but with even more tenuous connection between the block store data, is the notion of a profile picture. "Marquette's Profile Picture" is a really abstracted notion, and precisely which bits it corresponds to can vary wildly over time, not just being different versions of the picture but completely different pictures entirely. Maybe one day its a photo of Marquette and the next day it's a photo of a flower. Another example of this, but with even more tenuous connection between the block store data, is the notion of a profile picture. "Marquette's Profile Picture" is a really abstracted notion, and precisely which bits it corresponds to can vary wildly over time, not just being different versions of the picture but completely different pictures entirely. Maybe one day it's a photo of Marquette and the next day it's a photo of a flower.
</p> </p>
<p> <p>
@ -157,7 +157,7 @@
<h3 id="peer-network-revisited">Peer Network, Revisited</h3> <h3 id="peer-network-revisited">Peer Network, Revisited</h3>
<p> <p>
First, let's look at the peer network, since it's structure forms the basis for the remainder of the data storage approach. Veilid's peer network is similar to other peer-to-peer systems in that it's overlaid on top of other protocols. Veilid tries to be somewhat protocol-agnostic, however, and currently is designed to use TCP, UDP, WebSockets, and WebRTC, as well as various methods of traversing NATs so that Veilid peers can be smartphones, personal computers on hostile ISPs, etc. To facilitate this, peers are identified not by some network identity like an IP address, but instead by peer-chosen cryptographic key-pairs. Each peer also advertises a variety of options for how to communicate with it, called dial info, and when one peer wants to talk to another, it gets the dial info for that peer from the network and then uses it to communicate. First, let's look at the peer network, since its structure forms the basis for the remainder of the data storage approach. Veilid's peer network is similar to other peer-to-peer systems in that it's overlaid on top of other protocols. Veilid tries to be somewhat protocol-agnostic, however, and currently is designed to use TCP, UDP, WebSockets, and WebRTC, as well as various methods of traversing NATs so that Veilid peers can be smartphones, personal computers on hostile ISPs, etc. To facilitate this, peers are identified not by some network identity like an IP address, but instead by peer-chosen cryptographic key-pairs. Each peer also advertises a variety of options for how to communicate with it, called dial info, and when one peer wants to talk to another, it gets the dial info for that peer from the network and then uses it to communicate.
</p> </p>
<p> <p>
@ -165,7 +165,7 @@
</p> </p>
<p> <p>
To talk to a specific peer, it's dial info is looked up in the routing table. If there is dial info present, then the options are attempted in order of the priority specified in the routing table. Otherwise, the peer has to request the dial info from the network, so it looks through its routing table to find the peer who's ID is nearest the target peer according to the XOR metric, and sends it an RPC call with a procedure named <code>find_node</code>. Given any particular peer ID, the receiver of a <code>find_node</code> call returns dial info for the peers in its routing table that are nearest the given ID. This gets the peer closer to its destination, at least in the direction of the other peer it asked. If the desired peer's information was in the result of the call, then it's done, otherwise it calls <code>find_node</code> again to get closer. It iterates in this way, possibly trying alternate peers, as necessary, in a nearest-first fashion until it either finds the desire'd peer's dial info, has exhausted the entire network, or gives up. To talk to a specific peer, its dial info is looked up in the routing table. If there is dial info present, then the options are attempted in order of the priority specified in the routing table. Otherwise, the peer has to request the dial info from the network, so it looks through its routing table to find the peer who's ID is nearest the target peer according to the XOR metric, and sends it an RPC call with a procedure named <code>find_node</code>. Given any particular peer ID, the receiver of a <code>find_node</code> call returns dial info for the peers in its routing table that are nearest the given ID. This gets the peer closer to its destination, at least in the direction of the other peer it asked. If the desired peer's information was in the result of the call, then it's done, otherwise it calls <code>find_node</code> again to get closer. It iterates in this way, possibly trying alternate peers, as necessary, in a nearest-first fashion until it either finds the desire'd peer's dial info, has exhausted the entire network, or gives up.
</p> </p>
<h3 id="user-privacy">User Privacy</h3> <h3 id="user-privacy">User Privacy</h3>

View File

@ -53,7 +53,7 @@ KV store data is also stateful, so that updates to it can be made. Boone's bio,
The combination of block storage and key-value storage together makes it possible to have higher-level concepts as well. A song, for instance, might be represented in two places in Veilid: the block store would hold the raw data, while the KV store would store a representation of the idea of the song. Maybe that would consist of a JSON object with metadata about the song, like the title, composer, date, encoding information, etc. as well as the ID of the block store data. We can then also store different _versions_ of that JSON data, as the piece is updated, upsampled, remastered, or whatever, each one pointing to a different block in the block store. It's still "the same song", at a conceptual level, so it has the same identifier in the KV store, but the raw bits associated with each version differ. The combination of block storage and key-value storage together makes it possible to have higher-level concepts as well. A song, for instance, might be represented in two places in Veilid: the block store would hold the raw data, while the KV store would store a representation of the idea of the song. Maybe that would consist of a JSON object with metadata about the song, like the title, composer, date, encoding information, etc. as well as the ID of the block store data. We can then also store different _versions_ of that JSON data, as the piece is updated, upsampled, remastered, or whatever, each one pointing to a different block in the block store. It's still "the same song", at a conceptual level, so it has the same identifier in the KV store, but the raw bits associated with each version differ.
Another example of this, but with even more tenuous connection between the block store data, is the notion of a profile picture. "Marquette's Profile Picture" is a really abstracted notion, and precisely which bits it corresponds to can vary wildly over time, not just being different versions of the picture but completely different pictures entirely. Maybe one day its a photo of Marquette and the next day it's a photo of a flower. Another example of this, but with even more tenuous connection between the block store data, is the notion of a profile picture. "Marquette's Profile Picture" is a really abstracted notion, and precisely which bits it corresponds to can vary wildly over time, not just being different versions of the picture but completely different pictures entirely. Maybe one day it's a photo of Marquette and the next day it's a photo of a flower.
Social media offers many examples of these concepts. Friends lists, block lists, post indexes, favorites. These are all stateful notions, in a sense: a stable reference to a thing, but the precise content of the thing changes over time. These are exactly what we would put in the KV store, as opposed to in the block store, even if this data makes reference to content in the block store. Social media offers many examples of these concepts. Friends lists, block lists, post indexes, favorites. These are all stateful notions, in a sense: a stable reference to a thing, but the precise content of the thing changes over time. These are exactly what we would put in the KV store, as opposed to in the block store, even if this data makes reference to content in the block store.
@ -73,11 +73,11 @@ The bird's eye view of things makes it possible to hold it all in mind at once,
### Peer Network, Revisited ### Peer Network, Revisited
First, let's look at the peer network, since it's structure forms the basis for the remainder of the data storage approach. Veilid's peer network is similar to other peer-to-peer systems in that it's overlaid on top of other protocols. Veilid tries to be somewhat protocol-agnostic, however, and currently is designed to use TCP, UDP, WebSockets, and WebRTC, as well as various methods of traversing NATs so that Veilid peers can be smartphones, personal computers on hostile ISPs, etc. To facilitate this, peers are identified not by some network identity like an IP address, but instead by peer-chosen cryptographic key-pairs. Each peer also advertises a variety of options for how to communicate with it, called dial info, and when one peer wants to talk to another, it gets the dial info for that peer from the network and then uses it to communicate. First, let's look at the peer network, since its structure forms the basis for the remainder of the data storage approach. Veilid's peer network is similar to other peer-to-peer systems in that it's overlaid on top of other protocols. Veilid tries to be somewhat protocol-agnostic, however, and currently is designed to use TCP, UDP, WebSockets, and WebRTC, as well as various methods of traversing NATs so that Veilid peers can be smartphones, personal computers on hostile ISPs, etc. To facilitate this, peers are identified not by some network identity like an IP address, but instead by peer-chosen cryptographic key-pairs. Each peer also advertises a variety of options for how to communicate with it, called dial info, and when one peer wants to talk to another, it gets the dial info for that peer from the network and then uses it to communicate.
When a peer first connects to Veilid, it does so by contacting bootstrap peers, which have simple IP address dial info that is guaranteed to be stable by the maintainers of the network. These bootstrap peers are the first entries in the peer's routing table -- an address book of sorts, which it uses to figure out how to talk to a peer. The routing table consists of a mapping from peer public keys to prioritized choices for dial info. To populate the routing table, the peer asks other peers what its neighbors are in the network. The notion of neighbor here is defined by a similarity metric on peer IDs, in particular an XOR metric like many DHTs use. Over the course of interacting with the network, the peer will keep dial info up to date when it detects changes. It may also add dial info for peers it discovers along the way, depending on the peer ID. When a peer first connects to Veilid, it does so by contacting bootstrap peers, which have simple IP address dial info that is guaranteed to be stable by the maintainers of the network. These bootstrap peers are the first entries in the peer's routing table -- an address book of sorts, which it uses to figure out how to talk to a peer. The routing table consists of a mapping from peer public keys to prioritized choices for dial info. To populate the routing table, the peer asks other peers what its neighbors are in the network. The notion of neighbor here is defined by a similarity metric on peer IDs, in particular an XOR metric like many DHTs use. Over the course of interacting with the network, the peer will keep dial info up to date when it detects changes. It may also add dial info for peers it discovers along the way, depending on the peer ID.
To talk to a specific peer, it's dial info is looked up in the routing table. If there is dial info present, then the options are attempted in order of the priority specified in the routing table. Otherwise, the peer has to request the dial info from the network, so it looks through its routing table to find the peer who's ID is nearest the target peer according to the XOR metric, and sends it an RPC call with a procedure named `find_node`. Given any particular peer ID, the receiver of a `find_node` call returns dial info for the peers in its routing table that are nearest the given ID. This gets the peer closer to its destination, at least in the direction of the other peer it asked. If the desired peer's information was in the result of the call, then it's done, otherwise it calls `find_node` again to get closer. It iterates in this way, possibly trying alternate peers, as necessary, in a nearest-first fashion until it either finds the desire'd peer's dial info, has exhausted the entire network, or gives up. To talk to a specific peer, its dial info is looked up in the routing table. If there is dial info present, then the options are attempted in order of the priority specified in the routing table. Otherwise, the peer has to request the dial info from the network, so it looks through its routing table to find the peer who's ID is nearest the target peer according to the XOR metric, and sends it an RPC call with a procedure named `find_node`. Given any particular peer ID, the receiver of a `find_node` call returns dial info for the peers in its routing table that are nearest the given ID. This gets the peer closer to its destination, at least in the direction of the other peer it asked. If the desired peer's information was in the result of the call, then it's done, otherwise it calls `find_node` again to get closer. It iterates in this way, possibly trying alternate peers, as necessary, in a nearest-first fashion until it either finds the desire'd peer's dial info, has exhausted the entire network, or gives up.
### User Privacy ### User Privacy

View File

@ -36,6 +36,6 @@ For now, all secrets are encrypted using a single "database key", which is store
secretsd -k kwallet: secretsd -k kwallet:
secretsd -k exec:"pass Apps/secretsd" secretsd -k exec:"pass Apps/secretsd"
(As secretsd is supposed to be a background service, it is strongly advised to _not_ use an external program which would show interactive prompts. And in particular avoid those which use GnuPG pinentry or otherwise make use of libsecret, for hopefuly obvious reasons.) (As secretsd is supposed to be a background service, it is strongly advised to _not_ use an external program which would show interactive prompts. And in particular avoid those which use GnuPG pinentry or otherwise make use of libsecret, for hopefully obvious reasons.)
Individually encrypted collections are not yet supported, but planned in the future. (This will most likely be a fully separate layer of encryption, in addition to the database key.) Individually encrypted collections are not yet supported, but planned in the future. (This will most likely be a fully separate layer of encryption, in addition to the database key.)

View File

@ -27,7 +27,7 @@ cursive_buffered_backend = { path = "../external/cursive_buffered_backend" }
cursive_table_view = "0.14.0" cursive_table_view = "0.14.0"
arboard = "3.2.0" arboard = "3.2.0"
# cursive-tabs = "0.5.0" # cursive-tabs = "0.5.0"
clap = {version= "4", features = ["derive"]} clap = { version= "4", features = ["derive"] }
directories = "^4" directories = "^4"
log = "^0" log = "^0"
futures = "^0" futures = "^0"

View File

@ -13,8 +13,24 @@ crate-type = ["cdylib", "staticlib", "rlib"]
# Common features # Common features
default = ["enable-crypto-vld0"] default = ["enable-crypto-vld0"]
rt-async-std = ["async-std", "async-std-resolver", "async_executors/async_std", "rtnetlink/smol_socket", "veilid-tools/rt-async-std"] rt-async-std = [
rt-tokio = ["tokio", "tokio-util", "tokio-stream", "trust-dns-resolver/tokio-runtime", "async_executors/tokio_tp", "async_executors/tokio_io", "async_executors/tokio_timer", "rtnetlink/tokio_socket", "veilid-tools/rt-tokio"] "async-std",
"async-std-resolver",
"async_executors/async_std",
"rtnetlink/smol_socket",
"veilid-tools/rt-async-std",
]
rt-tokio = [
"tokio",
"tokio-util",
"tokio-stream",
"trust-dns-resolver/tokio-runtime",
"async_executors/tokio_tp",
"async_executors/tokio_io",
"async_executors/tokio_timer",
"rtnetlink/tokio_socket",
"veilid-tools/rt-tokio",
]
rt-wasm-bindgen = ["veilid-tools/rt-wasm-bindgen", "async_executors/bindgen"] rt-wasm-bindgen = ["veilid-tools/rt-wasm-bindgen", "async_executors/bindgen"]
# Crypto support features # Crypto support features
@ -36,7 +52,7 @@ network-result-extra = ["veilid-tools/network-result-extra"]
[dependencies] [dependencies]
# Tools # Tools
veilid-tools = { path = "../veilid-tools", features = [ "tracing" ] } veilid-tools = { path = "../veilid-tools", features = ["tracing"] }
paste = "1.0.14" paste = "1.0.14"
once_cell = "1.18.0" once_cell = "1.18.0"
owning_ref = "0.4.1" owning_ref = "0.4.1"
@ -57,7 +73,7 @@ eyre = "0.6.8"
thiserror = "1.0.47" thiserror = "1.0.47"
# Data structures # Data structures
enumset = { version= "1.1.2", features = ["serde"] } enumset = { version = "1.1.2", features = ["serde"] }
keyvaluedb = { path = "../external/keyvaluedb/keyvaluedb" } keyvaluedb = { path = "../external/keyvaluedb/keyvaluedb" }
range-set-blaze = "0.1.9" range-set-blaze = "0.1.9"
weak-table = "0.3.2" weak-table = "0.3.2"
@ -65,16 +81,32 @@ generic-array = "0.14.7"
hashlink = { path = "../external/hashlink", features = ["serde_impl"] } hashlink = { path = "../external/hashlink", features = ["serde_impl"] }
# System # System
futures-util = { version = "0.3.28", default_features = false, features = ["alloc"] } futures-util = { version = "0.3.28", default_features = false, features = [
"alloc",
] }
flume = { version = "0.11.0", features = ["async"] } flume = { version = "0.11.0", features = ["async"] }
parking_lot = "0.12.1" parking_lot = "0.12.1"
lock_api = "0.4.10" lock_api = "0.4.10"
stop-token = { version = "0.7.0", default-features = false } stop-token = { version = "0.7.0", default-features = false }
# Crypto # Crypto
ed25519-dalek = { version = "2.0.0", default_features = false, features = ["alloc", "rand_core", "digest"] } ed25519-dalek = { version = "2.0.0", default_features = false, features = [
x25519-dalek = { version = "2.0.0", default_features = false, features = ["alloc", "static_secrets"] } "alloc",
curve25519-dalek = { version = "4.0.0", default_features = false, features = ["alloc"] } "rand_core",
"digest",
"zeroize",
] }
x25519-dalek = { version = "2.0.0", default_features = false, features = [
"alloc",
"static_secrets",
"zeroize",
"precomputed-tables",
] }
curve25519-dalek = { version = "4.0.0", default_features = false, features = [
"alloc",
"zeroize",
"precomputed-tables",
] }
blake3 = { version = "1.4.1" } blake3 = { version = "1.4.1" }
chacha20poly1305 = "0.10.1" chacha20poly1305 = "0.10.1"
chacha20 = "0.9.1" chacha20 = "0.9.1"
@ -83,17 +115,20 @@ argon2 = "0.5.1"
# Network # Network
async-std-resolver = { version = "0.22.0", optional = true } async-std-resolver = { version = "0.22.0", optional = true }
trust-dns-resolver = { version = "0.22.0", optional = true } trust-dns-resolver = { version = "0.22.0", optional = true }
enum-as-inner = "=0.5.1" # temporary fix for trust-dns-resolver v0.22.0 enum-as-inner = "=0.5.1" # temporary fix for trust-dns-resolver v0.22.0
# Serialization # Serialization
capnp = { version = "0.17.2", default_features = false } capnp = { version = "0.17.2", default_features = false }
serde = { version = "1.0.183", features = ["derive" ] } serde = { version = "1.0.183", features = ["derive"] }
serde_json = { version = "1.0.105" } serde_json = { version = "1.0.105" }
serde-big-array = "0.5.1" serde-big-array = "0.5.1"
json = "0.12.4" json = "0.12.4"
data-encoding = { version = "2.4.0" } data-encoding = { version = "2.4.0" }
schemars = "0.8.12" schemars = "0.8.12"
lz4_flex = { version = "0.11.1", default-features = false, features = ["safe-encode", "safe-decode"] } lz4_flex = { version = "0.11.1", default-features = false, features = [
"safe-encode",
"safe-decode",
] }
# Dependencies for native builds only # Dependencies for native builds only
# Linux, Windows, Mac, iOS, Android # Linux, Windows, Mac, iOS, Android
@ -107,12 +142,17 @@ libc = "0.2.147"
nix = "0.26.2" nix = "0.26.2"
# System # System
async-std = { version = "1.12.0", features = ["unstable"], optional = true} async-std = { version = "1.12.0", features = ["unstable"], optional = true }
tokio = { version = "1.32.0", features = ["full"], optional = true} tokio = { version = "1.32.0", features = ["full"], optional = true }
tokio-util = { version = "0.7.8", features = ["compat"], optional = true} tokio-util = { version = "0.7.8", features = ["compat"], optional = true }
tokio-stream = { version = "0.1.14", features = ["net"], optional = true} tokio-stream = { version = "0.1.14", features = ["net"], optional = true }
async-io = { version = "1.13.0" } async-io = { version = "1.13.0" }
futures-util = { version = "0.3.28", default-features = false, features = ["async-await", "sink", "std", "io"] } futures-util = { version = "0.3.28", default-features = false, features = [
"async-await",
"sink",
"std",
"io",
] }
# Data structures # Data structures
keyring-manager = { path = "../external/keyring-manager" } keyring-manager = { path = "../external/keyring-manager" }
@ -120,7 +160,7 @@ keyvaluedb-sqlite = { path = "../external/keyvaluedb/keyvaluedb-sqlite" }
# Network # Network
async-tungstenite = { version = "0.23.0", features = ["async-tls"] } async-tungstenite = { version = "0.23.0", features = ["async-tls"] }
igd = { path = "../external/rust-igd" } igd = { path = "../external/rust-igd" }
async-tls = "0.12.0" async-tls = "0.12.0"
webpki = "0.22.0" webpki = "0.22.0"
webpki-roots = "0.25.2" webpki-roots = "0.25.2"
@ -135,7 +175,10 @@ socket2 = { version = "0.5.3", features = ["all"] }
getrandom = { version = "0.2.4", features = ["js"] } getrandom = { version = "0.2.4", features = ["js"] }
# System # System
async_executors = { version = "0.7.0", default-features = false, features = [ "bindgen", "timer" ]} async_executors = { version = "0.7.0", default-features = false, features = [
"bindgen",
"timer",
] }
async-lock = "2.8.0" async-lock = "2.8.0"
wasm-bindgen = "0.2.87" wasm-bindgen = "0.2.87"
js-sys = "0.3.64" js-sys = "0.3.64"
@ -182,14 +225,17 @@ ifstructs = "0.1.1"
# Dependencies for Linux or Android # Dependencies for Linux or Android
[target.'cfg(any(target_os = "android", target_os = "linux"))'.dependencies] [target.'cfg(any(target_os = "android", target_os = "linux"))'.dependencies]
rtnetlink = { version = "=0.13.0", default-features = false} rtnetlink = { version = "=0.13.0", default-features = false }
netlink-sys = { version = "=0.8.5" } netlink-sys = { version = "=0.8.5" }
netlink-packet-route = { version = "=0.17.0" } netlink-packet-route = { version = "=0.17.0" }
# Dependencies for Windows # Dependencies for Windows
[target.'cfg(target_os = "windows")'.dependencies] [target.'cfg(target_os = "windows")'.dependencies]
winapi = { version = "0.3.9", features = [ "iptypes", "iphlpapi" ] } winapi = { version = "0.3.9", features = ["iptypes", "iphlpapi"] }
windows = { version = "0.51.1", features = [ "Win32_NetworkManagement_Dns", "Win32_Foundation" ]} windows = { version = "0.51.1", features = [
"Win32_NetworkManagement_Dns",
"Win32_Foundation",
] }
windows-permissions = "0.2.4" windows-permissions = "0.2.4"
# Dependencies for iOS # Dependencies for iOS
@ -208,7 +254,7 @@ features = ["bundled"]
serial_test = "2.0.0" serial_test = "2.0.0"
[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
simplelog = { version = "0.12.1", features=["test"] } simplelog = { version = "0.12.1", features = ["test"] }
[target.'cfg(target_arch = "wasm32")'.dev-dependencies] [target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen-test = "0.3.37" wasm-bindgen-test = "0.3.37"

View File

@ -61,7 +61,7 @@ pub fn veilid_version() -> (u32, u32, u32) {
#[cfg(target_os = "android")] #[cfg(target_os = "android")]
pub use intf::android::veilid_core_setup_android; pub use intf::android::veilid_core_setup_android;
pub static DEFAULT_LOG_IGNORE_LIST: [&str; 21] = [ pub static DEFAULT_LOG_IGNORE_LIST: [&str; 23] = [
"mio", "mio",
"h2", "h2",
"hyper", "hyper",
@ -83,6 +83,8 @@ pub static DEFAULT_LOG_IGNORE_LIST: [&str; 21] = [
"trust_dns_resolver", "trust_dns_resolver",
"trust_dns_proto", "trust_dns_proto",
"attohttpc", "attohttpc",
"ws_stream_wasm",
"keyvaluedb_web",
]; ];
use cfg_if::*; use cfg_if::*;

View File

@ -4,6 +4,8 @@ use alloc::collections::btree_map::Entry;
// XXX: Move to config eventually? // XXX: Move to config eventually?
const PUNISHMENT_DURATION_MIN: usize = 60; const PUNISHMENT_DURATION_MIN: usize = 60;
const MAX_PUNISHMENTS_BY_NODE_ID: usize = 65536; const MAX_PUNISHMENTS_BY_NODE_ID: usize = 65536;
const DIAL_INFO_FAILURE_DURATION_MIN: usize = 10;
const MAX_DIAL_INFO_FAILURES: usize = 65536;
#[derive(ThisError, Debug, Clone, Copy, PartialEq, Eq)] #[derive(ThisError, Debug, Clone, Copy, PartialEq, Eq)]
pub enum AddressFilterError { pub enum AddressFilterError {
@ -28,6 +30,7 @@ struct AddressFilterInner {
punishments_by_ip4: BTreeMap<Ipv4Addr, Timestamp>, punishments_by_ip4: BTreeMap<Ipv4Addr, Timestamp>,
punishments_by_ip6_prefix: BTreeMap<Ipv6Addr, Timestamp>, punishments_by_ip6_prefix: BTreeMap<Ipv6Addr, Timestamp>,
punishments_by_node_id: BTreeMap<TypedKey, Timestamp>, punishments_by_node_id: BTreeMap<TypedKey, Timestamp>,
dial_info_failures: BTreeMap<DialInfo, Timestamp>,
} }
struct AddressFilterUnlockedInner { struct AddressFilterUnlockedInner {
@ -36,6 +39,7 @@ struct AddressFilterUnlockedInner {
max_connections_per_ip6_prefix_size: usize, max_connections_per_ip6_prefix_size: usize,
max_connection_frequency_per_min: usize, max_connection_frequency_per_min: usize,
punishment_duration_min: usize, punishment_duration_min: usize,
dial_info_failure_duration_min: usize,
routing_table: RoutingTable, routing_table: RoutingTable,
} }
@ -56,6 +60,10 @@ impl fmt::Debug for AddressFilterUnlockedInner {
&self.max_connection_frequency_per_min, &self.max_connection_frequency_per_min,
) )
.field("punishment_duration_min", &self.punishment_duration_min) .field("punishment_duration_min", &self.punishment_duration_min)
.field(
"dial_info_failure_duration_min",
&self.dial_info_failure_duration_min,
)
.finish() .finish()
} }
} }
@ -78,6 +86,7 @@ impl AddressFilter {
max_connection_frequency_per_min: c.network.max_connection_frequency_per_min max_connection_frequency_per_min: c.network.max_connection_frequency_per_min
as usize, as usize,
punishment_duration_min: PUNISHMENT_DURATION_MIN, punishment_duration_min: PUNISHMENT_DURATION_MIN,
dial_info_failure_duration_min: DIAL_INFO_FAILURE_DURATION_MIN,
routing_table, routing_table,
}), }),
inner: Arc::new(Mutex::new(AddressFilterInner { inner: Arc::new(Mutex::new(AddressFilterInner {
@ -88,10 +97,17 @@ impl AddressFilter {
punishments_by_ip4: BTreeMap::new(), punishments_by_ip4: BTreeMap::new(),
punishments_by_ip6_prefix: BTreeMap::new(), punishments_by_ip6_prefix: BTreeMap::new(),
punishments_by_node_id: BTreeMap::new(), punishments_by_node_id: BTreeMap::new(),
dial_info_failures: BTreeMap::new(),
})), })),
} }
} }
// When the network restarts, some of the address filter can be cleared
pub fn restart(&self) {
let mut inner = self.inner.lock();
inner.dial_info_failures.clear();
}
fn purge_old_timestamps(&self, inner: &mut AddressFilterInner, cur_ts: Timestamp) { fn purge_old_timestamps(&self, inner: &mut AddressFilterInner, cur_ts: Timestamp) {
// v4 // v4
{ {
@ -180,6 +196,22 @@ impl AddressFilter {
} }
} }
} }
// dial info
{
let mut dead_keys = Vec::<DialInfo>::new();
for (key, value) in &mut inner.dial_info_failures {
// Drop failures older than the failure duration
if cur_ts.as_u64().saturating_sub(value.as_u64())
> self.unlocked_inner.dial_info_failure_duration_min as u64 * 60_000_000u64
{
dead_keys.push(key.clone());
}
}
for key in dead_keys {
log_net!(debug ">>> DIALINFO PERMIT: {}", key);
inner.dial_info_failures.remove(&key);
}
}
} }
fn is_ip_addr_punished_inner(&self, inner: &AddressFilterInner, ipblock: IpAddr) -> bool { fn is_ip_addr_punished_inner(&self, inner: &AddressFilterInner, ipblock: IpAddr) -> bool {
@ -198,6 +230,14 @@ impl AddressFilter {
false false
} }
fn get_dial_info_failed_ts_inner(
&self,
inner: &AddressFilterInner,
dial_info: &DialInfo,
) -> Option<Timestamp> {
inner.dial_info_failures.get(dial_info).copied()
}
pub fn is_ip_addr_punished(&self, addr: IpAddr) -> bool { pub fn is_ip_addr_punished(&self, addr: IpAddr) -> bool {
let inner = self.inner.lock(); let inner = self.inner.lock();
let ipblock = ip_to_ipblock( let ipblock = ip_to_ipblock(
@ -207,6 +247,27 @@ impl AddressFilter {
self.is_ip_addr_punished_inner(&*inner, ipblock) self.is_ip_addr_punished_inner(&*inner, ipblock)
} }
pub fn get_dial_info_failed_ts(&self, dial_info: &DialInfo) -> Option<Timestamp> {
let inner = self.inner.lock();
self.get_dial_info_failed_ts_inner(&*inner, dial_info)
}
pub fn set_dial_info_failed(&self, dial_info: DialInfo) {
let ts = get_aligned_timestamp();
let mut inner = self.inner.lock();
if inner.dial_info_failures.len() >= MAX_DIAL_INFO_FAILURES {
log_net!(debug ">>> DIALINFO FAILURE TABLE FULL: {}", dial_info);
return;
}
log_net!(debug ">>> DIALINFO FAILURE: {:?}", dial_info);
inner
.dial_info_failures
.entry(dial_info)
.and_modify(|v| *v = ts)
.or_insert(ts);
}
pub fn punish_ip_addr(&self, addr: IpAddr) { pub fn punish_ip_addr(&self, addr: IpAddr) {
log_net!(debug ">>> PUNISHED: {}", addr); log_net!(debug ">>> PUNISHED: {}", addr);
let ts = get_aligned_timestamp(); let ts = get_aligned_timestamp();

View File

@ -211,20 +211,7 @@ impl NetworkManager {
// Make the network key // Make the network key
let network_key = { let network_key = {
let c = config.get(); let c = config.get();
let network_key_password = if let Some(nkp) = c.network.network_key_password.clone() { let network_key_password = c.network.network_key_password.clone();
Some(nkp)
} else {
if c.network
.routing_table
.bootstrap
.contains(&"bootstrap.veilid.net".to_owned())
{
None
} else {
Some(c.network.routing_table.bootstrap.join(","))
}
};
let network_key = if let Some(network_key_password) = network_key_password { let network_key = if let Some(network_key_password) = network_key_password {
if !network_key_password.is_empty() { if !network_key_password.is_empty() {
info!("Using network key"); info!("Using network key");
@ -380,6 +367,9 @@ impl NetworkManager {
return Ok(()); return Ok(());
} }
// Clean address filter for things that should not be persistent
self.address_filter().restart();
// Create network components // Create network components
let connection_manager = ConnectionManager::new(self.clone()); let connection_manager = ConnectionManager::new(self.clone());
let net = Network::new( let net = Network::new(

View File

@ -384,6 +384,21 @@ impl Network {
//////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////
// Record DialInfo failures
pub async fn record_dial_info_failure<T, F: Future<Output = EyreResult<NetworkResult<T>>>>(
&self,
dial_info: DialInfo,
fut: F,
) -> EyreResult<NetworkResult<T>> {
let network_result = fut.await?;
if matches!(network_result, NetworkResult::NoConnection(_)) {
self.network_manager()
.address_filter()
.set_dial_info_failed(dial_info);
}
Ok(network_result)
}
// Send data to a dial info, unbound, using a new connection from a random port // Send data to a dial info, unbound, using a new connection from a random port
// This creates a short-lived connection in the case of connection-oriented protocols // This creates a short-lived connection in the case of connection-oriented protocols
// for the purpose of sending this one message. // for the purpose of sending this one message.
@ -394,59 +409,62 @@ impl Network {
dial_info: DialInfo, dial_info: DialInfo,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<()>> { ) -> EyreResult<NetworkResult<()>> {
let data_len = data.len(); self.record_dial_info_failure(dial_info.clone(), async move {
let connect_timeout_ms = { let data_len = data.len();
let c = self.config.get(); let connect_timeout_ms = {
c.network.connection_initial_timeout_ms let c = self.config.get();
}; c.network.connection_initial_timeout_ms
};
if self if self
.network_manager() .network_manager()
.address_filter() .address_filter()
.is_ip_addr_punished(dial_info.address().to_ip_addr()) .is_ip_addr_punished(dial_info.address().to_ip_addr())
{ {
return Ok(NetworkResult::no_connection_other("punished")); return Ok(NetworkResult::no_connection_other("punished"));
} }
match dial_info.protocol_type() { match dial_info.protocol_type() {
ProtocolType::UDP => { ProtocolType::UDP => {
let peer_socket_addr = dial_info.to_socket_addr(); let peer_socket_addr = dial_info.to_socket_addr();
let h = RawUdpProtocolHandler::new_unspecified_bound_handler(&peer_socket_addr) let h = RawUdpProtocolHandler::new_unspecified_bound_handler(&peer_socket_addr)
.await
.wrap_err("create socket failure")?;
let _ = network_result_try!(h
.send_message(data, peer_socket_addr)
.await
.map(NetworkResult::Value)
.wrap_err("send message failure")?);
}
ProtocolType::TCP => {
let peer_socket_addr = dial_info.to_socket_addr();
let pnc = network_result_try!(RawTcpProtocolHandler::connect(
None,
peer_socket_addr,
connect_timeout_ms
)
.await .await
.wrap_err("create socket failure")?; .wrap_err("connect failure")?);
let _ = network_result_try!(h network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
.send_message(data, peer_socket_addr) }
ProtocolType::WS | ProtocolType::WSS => {
let pnc = network_result_try!(WebsocketProtocolHandler::connect(
None,
&dial_info,
connect_timeout_ms
)
.await .await
.map(NetworkResult::Value) .wrap_err("connect failure")?);
.wrap_err("send message failure")?); network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
}
} }
ProtocolType::TCP => { // Network accounting
let peer_socket_addr = dial_info.to_socket_addr(); self.network_manager()
let pnc = network_result_try!(RawTcpProtocolHandler::connect( .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
None,
peer_socket_addr,
connect_timeout_ms
)
.await
.wrap_err("connect failure")?);
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
}
ProtocolType::WS | ProtocolType::WSS => {
let pnc = network_result_try!(WebsocketProtocolHandler::connect(
None,
&dial_info,
connect_timeout_ms
)
.await
.wrap_err("connect failure")?);
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
}
}
// Network accounting
self.network_manager()
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
Ok(NetworkResult::Value(())) Ok(NetworkResult::Value(()))
})
.await
} }
// Send data to a dial info, unbound, using a new connection from a random port // Send data to a dial info, unbound, using a new connection from a random port
@ -461,85 +479,95 @@ impl Network {
data: Vec<u8>, data: Vec<u8>,
timeout_ms: u32, timeout_ms: u32,
) -> EyreResult<NetworkResult<Vec<u8>>> { ) -> EyreResult<NetworkResult<Vec<u8>>> {
let data_len = data.len(); self.record_dial_info_failure(dial_info.clone(), async move {
let connect_timeout_ms = { let data_len = data.len();
let c = self.config.get(); let connect_timeout_ms = {
c.network.connection_initial_timeout_ms let c = self.config.get();
}; c.network.connection_initial_timeout_ms
};
if self if self
.network_manager() .network_manager()
.address_filter() .address_filter()
.is_ip_addr_punished(dial_info.address().to_ip_addr()) .is_ip_addr_punished(dial_info.address().to_ip_addr())
{ {
return Ok(NetworkResult::no_connection_other("punished")); return Ok(NetworkResult::no_connection_other("punished"));
}
match dial_info.protocol_type() {
ProtocolType::UDP => {
let peer_socket_addr = dial_info.to_socket_addr();
let h = RawUdpProtocolHandler::new_unspecified_bound_handler(&peer_socket_addr)
.await
.wrap_err("create socket failure")?;
network_result_try!(h
.send_message(data, peer_socket_addr)
.await
.wrap_err("send message failure")?);
self.network_manager()
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
// receive single response
let mut out = vec![0u8; MAX_MESSAGE_SIZE];
let (recv_len, recv_addr) = network_result_try!(timeout(
timeout_ms,
h.recv_message(&mut out).instrument(Span::current())
)
.await
.into_network_result())
.wrap_err("recv_message failure")?;
let recv_socket_addr = recv_addr.remote_address().to_socket_addr();
self.network_manager()
.stats_packet_rcvd(recv_socket_addr.ip(), ByteCount::new(recv_len as u64));
// if the from address is not the same as the one we sent to, then drop this
if recv_socket_addr != peer_socket_addr {
bail!("wrong address");
}
out.resize(recv_len, 0u8);
Ok(NetworkResult::Value(out))
} }
ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS => {
let pnc = network_result_try!(match dial_info.protocol_type() {
ProtocolType::UDP => unreachable!(),
ProtocolType::TCP => {
let peer_socket_addr = dial_info.to_socket_addr();
RawTcpProtocolHandler::connect(None, peer_socket_addr, connect_timeout_ms)
.await
.wrap_err("connect failure")?
}
ProtocolType::WS | ProtocolType::WSS => {
WebsocketProtocolHandler::connect(None, &dial_info, connect_timeout_ms)
.await
.wrap_err("connect failure")?
}
});
network_result_try!(pnc.send(data).await.wrap_err("send failure")?); match dial_info.protocol_type() {
self.network_manager() ProtocolType::UDP => {
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); let peer_socket_addr = dial_info.to_socket_addr();
let h = RawUdpProtocolHandler::new_unspecified_bound_handler(&peer_socket_addr)
.await
.wrap_err("create socket failure")?;
network_result_try!(h
.send_message(data, peer_socket_addr)
.await
.wrap_err("send message failure")?);
self.network_manager()
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
let out = network_result_try!(network_result_try!(timeout(timeout_ms, pnc.recv()) // receive single response
let mut out = vec![0u8; MAX_MESSAGE_SIZE];
let (recv_len, recv_addr) = network_result_try!(timeout(
timeout_ms,
h.recv_message(&mut out).instrument(Span::current())
)
.await .await
.into_network_result()) .into_network_result())
.wrap_err("recv failure")?); .wrap_err("recv_message failure")?;
self.network_manager() let recv_socket_addr = recv_addr.remote_address().to_socket_addr();
.stats_packet_rcvd(dial_info.to_ip_addr(), ByteCount::new(out.len() as u64)); self.network_manager()
.stats_packet_rcvd(recv_socket_addr.ip(), ByteCount::new(recv_len as u64));
Ok(NetworkResult::Value(out)) // if the from address is not the same as the one we sent to, then drop this
if recv_socket_addr != peer_socket_addr {
bail!("wrong address");
}
out.resize(recv_len, 0u8);
Ok(NetworkResult::Value(out))
}
ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS => {
let pnc = network_result_try!(match dial_info.protocol_type() {
ProtocolType::UDP => unreachable!(),
ProtocolType::TCP => {
let peer_socket_addr = dial_info.to_socket_addr();
RawTcpProtocolHandler::connect(
None,
peer_socket_addr,
connect_timeout_ms,
)
.await
.wrap_err("connect failure")?
}
ProtocolType::WS | ProtocolType::WSS => {
WebsocketProtocolHandler::connect(None, &dial_info, connect_timeout_ms)
.await
.wrap_err("connect failure")?
}
});
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
self.network_manager()
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
let out =
network_result_try!(network_result_try!(timeout(timeout_ms, pnc.recv())
.await
.into_network_result())
.wrap_err("recv failure")?);
self.network_manager().stats_packet_rcvd(
dial_info.to_ip_addr(),
ByteCount::new(out.len() as u64),
);
Ok(NetworkResult::Value(out))
}
} }
} })
.await
} }
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))] #[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
@ -561,7 +589,7 @@ impl Network {
network_result_value_or_log!(ph.clone() network_result_value_or_log!(ph.clone()
.send_message(data.clone(), peer_socket_addr) .send_message(data.clone(), peer_socket_addr)
.await .await
.wrap_err("sending data to existing conection")? => [ format!(": data.len={}, descriptor={:?}", data.len(), descriptor) ] .wrap_err("sending data to existing connection")? => [ format!(": data.len={}, descriptor={:?}", data.len(), descriptor) ]
{ return Ok(Some(data)); } ); { return Ok(Some(data)); } );
// Network accounting // Network accounting
@ -609,41 +637,44 @@ impl Network {
dial_info: DialInfo, dial_info: DialInfo,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<ConnectionDescriptor>> { ) -> EyreResult<NetworkResult<ConnectionDescriptor>> {
let data_len = data.len(); self.record_dial_info_failure(dial_info.clone(), async move {
let connection_descriptor; let data_len = data.len();
if dial_info.protocol_type() == ProtocolType::UDP { let connection_descriptor;
// Handle connectionless protocol if dial_info.protocol_type() == ProtocolType::UDP {
let peer_socket_addr = dial_info.to_socket_addr(); // Handle connectionless protocol
let ph = match self.find_best_udp_protocol_handler(&peer_socket_addr, &None) { let peer_socket_addr = dial_info.to_socket_addr();
Some(ph) => ph, let ph = match self.find_best_udp_protocol_handler(&peer_socket_addr, &None) {
None => bail!("no appropriate UDP protocol handler for dial_info"), Some(ph) => ph,
}; None => bail!("no appropriate UDP protocol handler for dial_info"),
connection_descriptor = network_result_try!(ph };
.send_message(data, peer_socket_addr) connection_descriptor = network_result_try!(ph
.await .send_message(data, peer_socket_addr)
.wrap_err("failed to send data to dial info")?); .await
} else { .wrap_err("failed to send data to dial info")?);
// Handle connection-oriented protocols } else {
let conn = network_result_try!( // Handle connection-oriented protocols
self.connection_manager() let conn = network_result_try!(
.get_or_create_connection(dial_info.clone()) self.connection_manager()
.await? .get_or_create_connection(dial_info.clone())
); .await?
);
if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await { if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await {
return Ok(NetworkResult::NoConnection(io::Error::new( return Ok(NetworkResult::NoConnection(io::Error::new(
io::ErrorKind::ConnectionReset, io::ErrorKind::ConnectionReset,
"failed to send", "failed to send",
))); )));
}
connection_descriptor = conn.connection_descriptor();
} }
connection_descriptor = conn.connection_descriptor();
}
// Network accounting // Network accounting
self.network_manager() self.network_manager()
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
Ok(NetworkResult::value(connection_descriptor)) Ok(NetworkResult::value(connection_descriptor))
})
.await
} }
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////

View File

@ -357,6 +357,24 @@ impl NetworkManager {
// log_net!(debug "Node contact failing over to Ordered for {}", target_node_ref.to_string().cyan()); // log_net!(debug "Node contact failing over to Ordered for {}", target_node_ref.to_string().cyan());
// sequencing = Sequencing::PreferOrdered; // sequencing = Sequencing::PreferOrdered;
// } // }
// Deprioritize dial info that have recently failed
let address_filter = self.address_filter();
let mut dial_info_failures_map = BTreeMap::<DialInfo, Timestamp>::new();
for did in peer_b.signed_node_info().node_info().all_filtered_dial_info_details(DialInfoDetail::NO_SORT, |_| true) {
if let Some(ts) = address_filter.get_dial_info_failed_ts(&did.dial_info) {
dial_info_failures_map.insert(did.dial_info, ts);
}
}
let dif_sort: Option<Arc<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>> = if dial_info_failures_map.is_empty() {
None
} else {
Some(Arc::new(move |a: &DialInfoDetail, b: &DialInfoDetail| {
let ats = dial_info_failures_map.get(&a.dial_info).copied().unwrap_or_default();
let bts = dial_info_failures_map.get(&b.dial_info).copied().unwrap_or_default();
ats.cmp(&bts)
}))
};
// Get the best contact method with these parameters from the routing domain // Get the best contact method with these parameters from the routing domain
let cm = routing_table.get_contact_method( let cm = routing_table.get_contact_method(
@ -365,6 +383,7 @@ impl NetworkManager {
&peer_b, &peer_b,
dial_info_filter, dial_info_filter,
sequencing, sequencing,
dif_sort,
); );
// Translate the raw contact method to a referenced contact method // Translate the raw contact method to a referenced contact method

View File

@ -118,47 +118,66 @@ impl Network {
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
// Record DialInfo failures
pub async fn record_dial_info_failure<T, F: Future<Output = EyreResult<NetworkResult<T>>>>(
&self,
dial_info: DialInfo,
fut: F,
) -> EyreResult<NetworkResult<T>> {
let network_result = fut.await?;
if matches!(network_result, NetworkResult::NoConnection(_)) {
self.network_manager()
.address_filter()
.set_dial_info_failed(dial_info);
}
Ok(network_result)
}
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))] #[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
pub async fn send_data_unbound_to_dial_info( pub async fn send_data_unbound_to_dial_info(
&self, &self,
dial_info: DialInfo, dial_info: DialInfo,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<()>> { ) -> EyreResult<NetworkResult<()>> {
let data_len = data.len(); self.record_dial_info_failure(dial_info.clone(), async move {
let timeout_ms = { let data_len = data.len();
let c = self.config.get(); let timeout_ms = {
c.network.connection_initial_timeout_ms let c = self.config.get();
}; c.network.connection_initial_timeout_ms
};
if self if self
.network_manager() .network_manager()
.address_filter() .address_filter()
.is_ip_addr_punished(dial_info.address().to_ip_addr()) .is_ip_addr_punished(dial_info.address().to_ip_addr())
{ {
return Ok(NetworkResult::no_connection_other("punished")); return Ok(NetworkResult::no_connection_other("punished"));
}
match dial_info.protocol_type() {
ProtocolType::UDP => {
bail!("no support for UDP protocol")
} }
ProtocolType::TCP => {
bail!("no support for TCP protocol")
}
ProtocolType::WS | ProtocolType::WSS => {
let pnc =
network_result_try!(WebsocketProtocolHandler::connect(&dial_info, timeout_ms)
.await
.wrap_err("connect failure")?);
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
}
};
// Network accounting match dial_info.protocol_type() {
self.network_manager() ProtocolType::UDP => {
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); bail!("no support for UDP protocol")
}
ProtocolType::TCP => {
bail!("no support for TCP protocol")
}
ProtocolType::WS | ProtocolType::WSS => {
let pnc = network_result_try!(WebsocketProtocolHandler::connect(
&dial_info, timeout_ms
)
.await
.wrap_err("connect failure")?);
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
}
};
Ok(NetworkResult::Value(())) // Network accounting
self.network_manager()
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
Ok(NetworkResult::Value(()))
})
.await
} }
// Send data to a dial info, unbound, using a new connection from a random port // Send data to a dial info, unbound, using a new connection from a random port
@ -173,53 +192,59 @@ impl Network {
data: Vec<u8>, data: Vec<u8>,
timeout_ms: u32, timeout_ms: u32,
) -> EyreResult<NetworkResult<Vec<u8>>> { ) -> EyreResult<NetworkResult<Vec<u8>>> {
let data_len = data.len(); self.record_dial_info_failure(dial_info.clone(), async move {
let connect_timeout_ms = { let data_len = data.len();
let c = self.config.get(); let connect_timeout_ms = {
c.network.connection_initial_timeout_ms let c = self.config.get();
}; c.network.connection_initial_timeout_ms
};
if self if self
.network_manager() .network_manager()
.address_filter() .address_filter()
.is_ip_addr_punished(dial_info.address().to_ip_addr()) .is_ip_addr_punished(dial_info.address().to_ip_addr())
{ {
return Ok(NetworkResult::no_connection_other("punished")); return Ok(NetworkResult::no_connection_other("punished"));
} }
match dial_info.protocol_type() { match dial_info.protocol_type() {
ProtocolType::UDP => { ProtocolType::UDP => {
bail!("no support for UDP protocol") bail!("no support for UDP protocol")
} }
ProtocolType::TCP => { ProtocolType::TCP => {
bail!("no support for TCP protocol") bail!("no support for TCP protocol")
} }
ProtocolType::WS | ProtocolType::WSS => { ProtocolType::WS | ProtocolType::WSS => {
let pnc = network_result_try!(match dial_info.protocol_type() { let pnc = network_result_try!(match dial_info.protocol_type() {
ProtocolType::UDP => unreachable!(), ProtocolType::UDP => unreachable!(),
ProtocolType::TCP => unreachable!(), ProtocolType::TCP => unreachable!(),
ProtocolType::WS | ProtocolType::WSS => { ProtocolType::WS | ProtocolType::WSS => {
WebsocketProtocolHandler::connect(&dial_info, connect_timeout_ms) WebsocketProtocolHandler::connect(&dial_info, connect_timeout_ms)
.await
.wrap_err("connect failure")?
}
});
network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
self.network_manager()
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
let out =
network_result_try!(network_result_try!(timeout(timeout_ms, pnc.recv())
.await .await
.wrap_err("connect failure")? .into_network_result())
} .wrap_err("recv failure")?);
});
network_result_try!(pnc.send(data).await.wrap_err("send failure")?); self.network_manager().stats_packet_rcvd(
self.network_manager() dial_info.to_ip_addr(),
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); ByteCount::new(out.len() as u64),
);
let out = network_result_try!(network_result_try!(timeout(timeout_ms, pnc.recv()) Ok(NetworkResult::Value(out))
.await }
.into_network_result())
.wrap_err("recv failure")?);
self.network_manager()
.stats_packet_rcvd(dial_info.to_ip_addr(), ByteCount::new(out.len() as u64));
Ok(NetworkResult::Value(out))
} }
} })
.await
} }
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))] #[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
@ -273,34 +298,37 @@ impl Network {
dial_info: DialInfo, dial_info: DialInfo,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<ConnectionDescriptor>> { ) -> EyreResult<NetworkResult<ConnectionDescriptor>> {
let data_len = data.len(); self.record_dial_info_failure(dial_info.clone(), async move {
if dial_info.protocol_type() == ProtocolType::UDP { let data_len = data.len();
bail!("no support for UDP protocol"); if dial_info.protocol_type() == ProtocolType::UDP {
} bail!("no support for UDP protocol");
if dial_info.protocol_type() == ProtocolType::TCP { }
bail!("no support for TCP protocol"); if dial_info.protocol_type() == ProtocolType::TCP {
} bail!("no support for TCP protocol");
}
// Handle connection-oriented protocols // Handle connection-oriented protocols
let conn = network_result_try!( let conn = network_result_try!(
self.connection_manager() self.connection_manager()
.get_or_create_connection(dial_info.clone()) .get_or_create_connection(dial_info.clone())
.await? .await?
); );
if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await { if let ConnectionHandleSendResult::NotSent(_) = conn.send_async(data).await {
return Ok(NetworkResult::NoConnection(io::Error::new( return Ok(NetworkResult::NoConnection(io::Error::new(
io::ErrorKind::ConnectionReset, io::ErrorKind::ConnectionReset,
"failed to send", "failed to send",
))); )));
} }
let connection_descriptor = conn.connection_descriptor(); let connection_descriptor = conn.connection_descriptor();
// Network accounting // Network accounting
self.network_manager() self.network_manager()
.stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64)); .stats_packet_sent(dial_info.to_ip_addr(), ByteCount::new(data_len as u64));
Ok(NetworkResult::value(connection_descriptor)) Ok(NetworkResult::value(connection_descriptor))
})
.await
} }
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
@ -320,8 +348,8 @@ impl Network {
} }
// XXX: See issue #92 // XXX: See issue #92
let family_global = AddressTypeSet::all(); let family_global = AddressTypeSet::from(AddressType::IPV4);
let family_local = AddressTypeSet::all(); let family_local = AddressTypeSet::from(AddressType::IPV4);
ProtocolConfig { ProtocolConfig {
outbound, outbound,

View File

@ -297,7 +297,7 @@ impl BucketEntryInner {
// If we're updating an entry's node info, purge all // If we're updating an entry's node info, purge all
// but the last connection in our last connections list // but the last connection in our last connections list
// because the dial info could have changed and its safer to just reconnect. // because the dial info could have changed and it's safer to just reconnect.
// The latest connection would have been the once we got the new node info // The latest connection would have been the once we got the new node info
// over so that connection is still valid. // over so that connection is still valid.
if node_info_changed { if node_info_changed {

View File

@ -538,6 +538,7 @@ impl RoutingTable {
peer_b: &PeerInfo, peer_b: &PeerInfo,
dial_info_filter: DialInfoFilter, dial_info_filter: DialInfoFilter,
sequencing: Sequencing, sequencing: Sequencing,
dif_sort: Option<Arc<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>>,
) -> ContactMethod { ) -> ContactMethod {
self.inner.read().get_contact_method( self.inner.read().get_contact_method(
routing_domain, routing_domain,
@ -545,6 +546,7 @@ impl RoutingTable {
peer_b, peer_b,
dial_info_filter, dial_info_filter,
sequencing, sequencing,
dif_sort,
) )
} }

View File

@ -401,6 +401,7 @@ impl RouteSpecStore {
current_node, current_node,
DialInfoFilter::all(), DialInfoFilter::all(),
sequencing, sequencing,
None,
); );
if matches!(cm, ContactMethod::Unreachable) { if matches!(cm, ContactMethod::Unreachable) {
reachable = false; reachable = false;
@ -415,6 +416,7 @@ impl RouteSpecStore {
current_node, current_node,
DialInfoFilter::all(), DialInfoFilter::all(),
Sequencing::EnsureOrdered, Sequencing::EnsureOrdered,
None,
); );
if matches!(cm, ContactMethod::Unreachable) { if matches!(cm, ContactMethod::Unreachable) {
can_do_sequenced = false; can_do_sequenced = false;
@ -438,6 +440,7 @@ impl RouteSpecStore {
current_node, current_node,
DialInfoFilter::all(), DialInfoFilter::all(),
sequencing, sequencing,
None,
); );
if matches!(cm, ContactMethod::Unreachable) { if matches!(cm, ContactMethod::Unreachable) {
reachable = false; reachable = false;
@ -452,6 +455,7 @@ impl RouteSpecStore {
current_node, current_node,
DialInfoFilter::all(), DialInfoFilter::all(),
Sequencing::EnsureOrdered, Sequencing::EnsureOrdered,
None,
); );
if matches!(cm, ContactMethod::Unreachable) { if matches!(cm, ContactMethod::Unreachable) {
can_do_sequenced = false; can_do_sequenced = false;

View File

@ -220,6 +220,7 @@ pub trait RoutingDomainDetail {
peer_b: &PeerInfo, peer_b: &PeerInfo,
dial_info_filter: DialInfoFilter, dial_info_filter: DialInfoFilter,
sequencing: Sequencing, sequencing: Sequencing,
dif_sort: Option<Arc<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>>,
) -> ContactMethod; ) -> ContactMethod;
} }
@ -245,6 +246,7 @@ fn first_filtered_dial_info_detail_between_nodes(
to_node: &NodeInfo, to_node: &NodeInfo,
dial_info_filter: &DialInfoFilter, dial_info_filter: &DialInfoFilter,
sequencing: Sequencing, sequencing: Sequencing,
dif_sort: Option<Arc<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>>
) -> Option<DialInfoDetail> { ) -> Option<DialInfoDetail> {
let dial_info_filter = dial_info_filter.clone().filtered( let dial_info_filter = dial_info_filter.clone().filtered(
&DialInfoFilter::all() &DialInfoFilter::all()
@ -253,11 +255,28 @@ fn first_filtered_dial_info_detail_between_nodes(
); );
// Apply sequencing and get sort // Apply sequencing and get sort
// Include sorting by external dial info sort for rotating through dialinfo
// based on an external preference table, for example the one kept by
// AddressFilter to deprioritize dialinfo that have recently failed to connect
let (ordered, dial_info_filter) = dial_info_filter.with_sequencing(sequencing); let (ordered, dial_info_filter) = dial_info_filter.with_sequencing(sequencing);
let sort = if ordered { let sort: Option<Box<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>> = if ordered {
Some(DialInfoDetail::ordered_sequencing_sort) if let Some(dif_sort) = dif_sort {
Some(Box::new(move |a, b| {
let mut ord = dif_sort(a,b);
if ord == core::cmp::Ordering::Equal {
ord = DialInfoDetail::ordered_sequencing_sort(a,b);
}
ord
}))
} else {
Some(Box::new(move |a,b| { DialInfoDetail::ordered_sequencing_sort(a,b) }))
}
} else { } else {
None if let Some(dif_sort) = dif_sort {
Some(Box::new(move |a,b| { dif_sort(a,b) }))
} else {
None
}
}; };
// If the filter is dead then we won't be able to connect // If the filter is dead then we won't be able to connect
@ -287,6 +306,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
peer_b: &PeerInfo, peer_b: &PeerInfo,
dial_info_filter: DialInfoFilter, dial_info_filter: DialInfoFilter,
sequencing: Sequencing, sequencing: Sequencing,
dif_sort: Option<Arc<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>>,
) -> ContactMethod { ) -> ContactMethod {
// Get the nodeinfos for convenience // Get the nodeinfos for convenience
let node_a = peer_a.signed_node_info().node_info(); let node_a = peer_a.signed_node_info().node_info();
@ -304,7 +324,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
// Get the best match dial info for node B if we have it // Get the best match dial info for node B if we have it
if let Some(target_did) = if let Some(target_did) =
first_filtered_dial_info_detail_between_nodes(node_a, node_b, &dial_info_filter, sequencing) first_filtered_dial_info_detail_between_nodes(node_a, node_b, &dial_info_filter, sequencing, dif_sort.clone())
{ {
// Do we need to signal before going inbound? // Do we need to signal before going inbound?
if !target_did.class.requires_signal() { if !target_did.class.requires_signal() {
@ -334,6 +354,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
node_b_relay, node_b_relay,
&dial_info_filter, &dial_info_filter,
sequencing, sequencing,
dif_sort.clone(),
) )
.is_some() .is_some()
{ {
@ -347,6 +368,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
node_a, node_a,
&dial_info_filter, &dial_info_filter,
sequencing, sequencing,
dif_sort.clone()
) { ) {
// Ensure we aren't on the same public IP address (no hairpin nat) // Ensure we aren't on the same public IP address (no hairpin nat)
if reverse_did.dial_info.to_ip_addr() if reverse_did.dial_info.to_ip_addr()
@ -373,6 +395,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
node_b, node_b,
&udp_dial_info_filter, &udp_dial_info_filter,
sequencing, sequencing,
dif_sort.clone()
) { ) {
// Does node A have a direct udp dialinfo that node B can reach? // Does node A have a direct udp dialinfo that node B can reach?
if let Some(reverse_udp_did) = first_filtered_dial_info_detail_between_nodes( if let Some(reverse_udp_did) = first_filtered_dial_info_detail_between_nodes(
@ -380,6 +403,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
node_a, node_a,
&udp_dial_info_filter, &udp_dial_info_filter,
sequencing, sequencing,
dif_sort.clone(),
) { ) {
// Ensure we aren't on the same public IP address (no hairpin nat) // Ensure we aren't on the same public IP address (no hairpin nat)
if reverse_udp_did.dial_info.to_ip_addr() if reverse_udp_did.dial_info.to_ip_addr()
@ -422,6 +446,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
&node_b_relay, &node_b_relay,
&dial_info_filter, &dial_info_filter,
sequencing, sequencing,
dif_sort.clone()
) )
.is_some() .is_some()
{ {
@ -496,6 +521,7 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
peer_b: &PeerInfo, peer_b: &PeerInfo,
dial_info_filter: DialInfoFilter, dial_info_filter: DialInfoFilter,
sequencing: Sequencing, sequencing: Sequencing,
dif_sort: Option<Arc<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>>,
) -> ContactMethod { ) -> ContactMethod {
// Scope the filter down to protocols node A can do outbound // Scope the filter down to protocols node A can do outbound
let dial_info_filter = dial_info_filter.filtered( let dial_info_filter = dial_info_filter.filtered(
@ -504,20 +530,31 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
.with_protocol_type_set(peer_a.signed_node_info().node_info().outbound_protocols()), .with_protocol_type_set(peer_a.signed_node_info().node_info().outbound_protocols()),
); );
// Get first filtered dialinfo // Apply sequencing and get sort
let (sort, dial_info_filter) = match sequencing { // Include sorting by external dial info sort for rotating through dialinfo
Sequencing::NoPreference => (None, dial_info_filter), // based on an external preference table, for example the one kept by
Sequencing::PreferOrdered => ( // AddressFilter to deprioritize dialinfo that have recently failed to connect
Some(DialInfoDetail::ordered_sequencing_sort), let (ordered, dial_info_filter) = dial_info_filter.with_sequencing(sequencing);
dial_info_filter, let sort: Option<Box<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>> = if ordered {
), if let Some(dif_sort) = dif_sort {
Sequencing::EnsureOrdered => ( Some(Box::new(move |a, b| {
Some(DialInfoDetail::ordered_sequencing_sort), let mut ord = dif_sort(a,b);
dial_info_filter.filtered( if ord == core::cmp::Ordering::Equal {
&DialInfoFilter::all().with_protocol_type_set(ProtocolType::all_ordered_set()), ord = DialInfoDetail::ordered_sequencing_sort(a,b);
), }
), ord
}))
} else {
Some(Box::new(move |a,b| { DialInfoDetail::ordered_sequencing_sort(a,b) }))
}
} else {
if let Some(dif_sort) = dif_sort {
Some(Box::new(move |a,b| { dif_sort(a,b) }))
} else {
None
}
}; };
// If the filter is dead then we won't be able to connect // If the filter is dead then we won't be able to connect
if dial_info_filter.is_dead() { if dial_info_filter.is_dead() {
return ContactMethod::Unreachable; return ContactMethod::Unreachable;

View File

@ -226,9 +226,10 @@ impl RoutingTableInner {
peer_b: &PeerInfo, peer_b: &PeerInfo,
dial_info_filter: DialInfoFilter, dial_info_filter: DialInfoFilter,
sequencing: Sequencing, sequencing: Sequencing,
dif_sort: Option<Arc<dyn Fn(&DialInfoDetail, &DialInfoDetail) -> core::cmp::Ordering>>,
) -> ContactMethod { ) -> ContactMethod {
self.with_routing_domain(routing_domain, |rdd| { self.with_routing_domain(routing_domain, |rdd| {
rdd.get_contact_method(self, peer_a, peer_b, dial_info_filter, sequencing) rdd.get_contact_method(self, peer_a, peer_b, dial_info_filter, sequencing, dif_sort)
}) })
} }

View File

@ -25,7 +25,7 @@ pub fn decode_signed_value_data(
let signature = decode_signature512(&sr); let signature = decode_signature512(&sr);
Ok(SignedValueData::new( Ok(SignedValueData::new(
ValueData::new_with_seq(seq, data, writer), ValueData::new_with_seq(seq, data, writer).map_err(RPCError::protocol)?,
signature, signature,
)) ))
} }

View File

@ -203,7 +203,7 @@ impl StorageManager {
} }
} }
/// Handle a recieved 'Get Value' query /// Handle a received 'Get Value' query
pub async fn inbound_get_value( pub async fn inbound_get_value(
&self, &self,
key: TypedKey, key: TypedKey,

View File

@ -402,9 +402,9 @@ impl StorageManager {
return Ok(None); return Ok(None);
} }
let seq = last_signed_value_data.value_data().seq(); let seq = last_signed_value_data.value_data().seq();
ValueData::new_with_seq(seq + 1, data, writer.key) ValueData::new_with_seq(seq + 1, data, writer.key)?
} else { } else {
ValueData::new(data, writer.key) ValueData::new(data, writer.key)?
}; };
// Validate with schema // Validate with schema

View File

@ -173,7 +173,7 @@ impl StorageManager {
} }
} }
/// Handle a recieved 'Set Value' query /// Handle a received 'Set Value' query
/// Returns a None if the value passed in was set /// Returns a None if the value passed in was set
/// Returns a Some(current value) if the value was older and the current value was kept /// Returns a Some(current value) if the value was older and the current value was kept
pub async fn inbound_set_value( pub async fn inbound_set_value(

View File

@ -167,8 +167,8 @@ pub fn setup_veilid_core() -> (UpdateCallback, ConfigCallback) {
fn config_callback(key: String) -> ConfigCallbackReturn { fn config_callback(key: String) -> ConfigCallbackReturn {
match key.as_str() { match key.as_str() {
"program_name" => Ok(Box::new(String::from("VeilidCoreTests"))), "program_name" => Ok(Box::new(String::from("VeilidCoreTests"))),
"namespace" => Ok(Box::new(String::from(""))), "namespace" => Ok(Box::<String>::default()),
"capabilities.disable" => Ok(Box::new(Vec::<FourCC>::new())), "capabilities.disable" => Ok(Box::<Vec::<FourCC>>::default()),
"table_store.directory" => Ok(Box::new(get_table_store_path())), "table_store.directory" => Ok(Box::new(get_table_store_path())),
"table_store.delete" => Ok(Box::new(true)), "table_store.delete" => Ok(Box::new(true)),
"block_store.directory" => Ok(Box::new(get_block_store_path())), "block_store.directory" => Ok(Box::new(get_block_store_path())),
@ -193,7 +193,7 @@ fn config_callback(key: String) -> ConfigCallbackReturn {
"network.network_key_password" => Ok(Box::new(Option::<String>::None)), "network.network_key_password" => Ok(Box::new(Option::<String>::None)),
"network.routing_table.node_id" => Ok(Box::new(TypedKeyGroup::new())), "network.routing_table.node_id" => Ok(Box::new(TypedKeyGroup::new())),
"network.routing_table.node_id_secret" => Ok(Box::new(TypedSecretGroup::new())), "network.routing_table.node_id_secret" => Ok(Box::new(TypedSecretGroup::new())),
"network.routing_table.bootstrap" => Ok(Box::new(Vec::<String>::new())), "network.routing_table.bootstrap" => Ok(Box::<Vec::<String>>::default()),
"network.routing_table.limit_over_attached" => Ok(Box::new(64u32)), "network.routing_table.limit_over_attached" => Ok(Box::new(64u32)),
"network.routing_table.limit_fully_attached" => Ok(Box::new(32u32)), "network.routing_table.limit_fully_attached" => Ok(Box::new(32u32)),
"network.routing_table.limit_attached_strong" => Ok(Box::new(16u32)), "network.routing_table.limit_attached_strong" => Ok(Box::new(16u32)),

View File

@ -207,6 +207,6 @@ pub fn fix_veilidvaluechange() -> VeilidValueChange {
key: fix_typedkey(), key: fix_typedkey(),
subkeys: vec![1, 2, 3, 4], subkeys: vec![1, 2, 3, 4],
count: 5, count: 5,
value: ValueData::new_with_seq(23, b"ValueData".to_vec(), fix_cryptokey()), value: ValueData::new_with_seq(23, b"ValueData".to_vec(), fix_cryptokey()).unwrap(),
} }
} }

View File

@ -1,4 +1,5 @@
use super::*; use super::*;
use veilid_api::VeilidAPIResult;
#[derive(Clone, Default, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize, JsonSchema)] #[derive(Clone, Default, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize, JsonSchema)]
pub struct ValueData { pub struct ValueData {
@ -17,17 +18,25 @@ pub struct ValueData {
impl ValueData { impl ValueData {
pub const MAX_LEN: usize = 32768; pub const MAX_LEN: usize = 32768;
pub fn new(data: Vec<u8>, writer: PublicKey) -> Self { pub fn new(data: Vec<u8>, writer: PublicKey) -> VeilidAPIResult<Self> {
assert!(data.len() <= Self::MAX_LEN); if data.len() > Self::MAX_LEN {
Self { apibail_generic!("invalid size");
}
Ok(Self {
seq: 0, seq: 0,
data, data,
writer, writer,
} })
} }
pub fn new_with_seq(seq: ValueSeqNum, data: Vec<u8>, writer: PublicKey) -> Self { pub fn new_with_seq(
assert!(data.len() <= Self::MAX_LEN); seq: ValueSeqNum,
Self { seq, data, writer } data: Vec<u8>,
writer: PublicKey,
) -> VeilidAPIResult<Self> {
if data.len() > Self::MAX_LEN {
apibail_generic!("invalid size");
}
Ok(Self { seq, data, writer })
} }
pub fn seq(&self) -> ValueSeqNum { pub fn seq(&self) -> ValueSeqNum {
@ -56,3 +65,34 @@ impl fmt::Debug for ValueData {
.finish() .finish()
} }
} }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn value_data_ok() {
assert!(ValueData::new(vec![0; ValueData::MAX_LEN], CryptoKey { bytes: [0; 32] }).is_ok());
assert!(ValueData::new_with_seq(
0,
vec![0; ValueData::MAX_LEN],
CryptoKey { bytes: [0; 32] }
)
.is_ok());
}
#[test]
fn value_data_too_long() {
assert!(ValueData::new(
vec![0; ValueData::MAX_LEN + 1],
CryptoKey { bytes: [0; 32] }
)
.is_err());
assert!(ValueData::new_with_seq(
0,
vec![0; ValueData::MAX_LEN + 1],
CryptoKey { bytes: [0; 32] }
)
.is_err());
}
}

View File

@ -403,7 +403,7 @@ packages:
path: ".." path: ".."
relative: true relative: true
source: path source: path
version: "0.1.9" version: "0.1.10"
web: web:
dependency: transitive dependency: transitive
description: description:

View File

@ -1,2 +1,2 @@
@echo off @echo off
flutter run --dart-define=DELETE_TABLE_STORE=1 --dart-define=DELETE_PROTECTED_STORE=1 --dart-define=DELETE_BLOCK_STORE=1 flutter run --dart-define=DELETE_TABLE_STORE=1 --dart-define=DELETE_PROTECTED_STORE=1 --dart-define=DELETE_BLOCK_STORE=1 %*

View File

@ -1,2 +1,2 @@
#!/bin/bash #!/bin/bash
flutter run --dart-define=DELETE_TABLE_STORE=1 --dart-define=DELETE_PROTECTED_STORE=1 --dart-define=DELETE_BLOCK_STORE=1 flutter run --dart-define=DELETE_TABLE_STORE=1 --dart-define=DELETE_PROTECTED_STORE=1 --dart-define=DELETE_BLOCK_STORE=1 $@

View File

@ -58,129 +58,133 @@ int getRemoteMaxStorageSpaceMb() {
return 256; return 256;
} }
Future<VeilidConfig> getDefaultVeilidConfig(String programName) async => Future<VeilidConfig> getDefaultVeilidConfig(String programName) async {
VeilidConfig( // ignore: do_not_use_environment
programName: programName, const bootstrap = String.fromEnvironment('BOOTSTRAP');
namespace: '', return VeilidConfig(
capabilities: const VeilidConfigCapabilities(disable: []), programName: programName,
protectedStore: const VeilidConfigProtectedStore( namespace: '',
allowInsecureFallback: false, capabilities: const VeilidConfigCapabilities(disable: []),
alwaysUseInsecureStorage: false, protectedStore: const VeilidConfigProtectedStore(
directory: '', allowInsecureFallback: false,
delete: false, alwaysUseInsecureStorage: false,
deviceEncryptionKeyPassword: '', directory: '',
delete: false,
deviceEncryptionKeyPassword: '',
),
tableStore: VeilidConfigTableStore(
directory: kIsWeb
? ''
: p.join((await getApplicationSupportDirectory()).absolute.path,
'table_store'),
delete: false,
),
blockStore: VeilidConfigBlockStore(
directory: kIsWeb
? ''
: p.join((await getApplicationSupportDirectory()).absolute.path,
'block_store'),
delete: false,
),
network: VeilidConfigNetwork(
connectionInitialTimeoutMs: 2000,
connectionInactivityTimeoutMs: 60000,
maxConnectionsPerIp4: 32,
maxConnectionsPerIp6Prefix: 32,
maxConnectionsPerIp6PrefixSize: 56,
maxConnectionFrequencyPerMin: 128,
clientWhitelistTimeoutMs: 300000,
reverseConnectionReceiptTimeMs: 5000,
holePunchReceiptTimeMs: 5000,
routingTable: VeilidConfigRoutingTable(
nodeId: [],
nodeIdSecret: [],
bootstrap: bootstrap.isNotEmpty
? bootstrap.split(',')
: (kIsWeb
? ['ws://bootstrap.veilid.net:5150/ws']
: ['bootstrap.veilid.net']),
limitOverAttached: 64,
limitFullyAttached: 32,
limitAttachedStrong: 16,
limitAttachedGood: 8,
limitAttachedWeak: 4,
), ),
tableStore: VeilidConfigTableStore( rpc: const VeilidConfigRPC(
directory: kIsWeb concurrency: 0,
? '' queueSize: 1024,
: p.join((await getApplicationSupportDirectory()).absolute.path, maxTimestampBehindMs: 10000,
'table_store'), maxTimestampAheadMs: 10000,
delete: false, timeoutMs: 5000,
maxRouteHopCount: 4,
defaultRouteHopCount: 1,
), ),
blockStore: VeilidConfigBlockStore( dht: VeilidConfigDHT(
directory: kIsWeb resolveNodeTimeoutMs: 10000,
? '' resolveNodeCount: 1,
: p.join((await getApplicationSupportDirectory()).absolute.path, resolveNodeFanout: 4,
'block_store'), maxFindNodeCount: 20,
delete: false, getValueTimeoutMs: 10000,
), getValueCount: 3,
network: VeilidConfigNetwork( getValueFanout: 4,
setValueTimeoutMs: 10000,
setValueCount: 4,
setValueFanout: 6,
minPeerCount: 20,
minPeerRefreshTimeMs: 60000,
validateDialInfoReceiptTimeMs: 2000,
localSubkeyCacheSize: getLocalSubkeyCacheSize(),
localMaxSubkeyCacheMemoryMb: await getLocalMaxSubkeyCacheMemoryMb(),
remoteSubkeyCacheSize: getRemoteSubkeyCacheSize(),
remoteMaxRecords: getRemoteMaxRecords(),
remoteMaxSubkeyCacheMemoryMb: await getRemoteMaxSubkeyCacheMemoryMb(),
remoteMaxStorageSpaceMb: getRemoteMaxStorageSpaceMb()),
upnp: true,
detectAddressChanges: true,
restrictedNatRetries: 0,
tls: const VeilidConfigTLS(
certificatePath: '',
privateKeyPath: '',
connectionInitialTimeoutMs: 2000, connectionInitialTimeoutMs: 2000,
connectionInactivityTimeoutMs: 60000, ),
maxConnectionsPerIp4: 32, application: const VeilidConfigApplication(
maxConnectionsPerIp6Prefix: 32, https: VeilidConfigHTTPS(
maxConnectionsPerIp6PrefixSize: 56, enabled: false,
maxConnectionFrequencyPerMin: 128, listenAddress: '',
clientWhitelistTimeoutMs: 300000, path: '',
reverseConnectionReceiptTimeMs: 5000, ),
holePunchReceiptTimeMs: 5000, http: VeilidConfigHTTP(
routingTable: const VeilidConfigRoutingTable( enabled: false,
nodeId: [], listenAddress: '',
nodeIdSecret: [], path: '',
bootstrap: kIsWeb )),
? ['ws://bootstrap.veilid.net:5150/ws'] protocol: const VeilidConfigProtocol(
: ['bootstrap.veilid.net'], udp: VeilidConfigUDP(
limitOverAttached: 64, enabled: !kIsWeb,
limitFullyAttached: 32, socketPoolSize: 0,
limitAttachedStrong: 16, listenAddress: '',
limitAttachedGood: 8,
limitAttachedWeak: 4,
), ),
rpc: const VeilidConfigRPC( tcp: VeilidConfigTCP(
concurrency: 0, connect: !kIsWeb,
queueSize: 1024, listen: !kIsWeb,
maxTimestampBehindMs: 10000, maxConnections: 32,
maxTimestampAheadMs: 10000, listenAddress: '',
timeoutMs: 5000,
maxRouteHopCount: 4,
defaultRouteHopCount: 1,
), ),
dht: VeilidConfigDHT( ws: VeilidConfigWS(
resolveNodeTimeoutMs: 10000, connect: true,
resolveNodeCount: 1, listen: !kIsWeb,
resolveNodeFanout: 4, maxConnections: 32,
maxFindNodeCount: 20, listenAddress: '',
getValueTimeoutMs: 10000, path: 'ws',
getValueCount: 3,
getValueFanout: 4,
setValueTimeoutMs: 10000,
setValueCount: 4,
setValueFanout: 6,
minPeerCount: 20,
minPeerRefreshTimeMs: 60000,
validateDialInfoReceiptTimeMs: 2000,
localSubkeyCacheSize: getLocalSubkeyCacheSize(),
localMaxSubkeyCacheMemoryMb: await getLocalMaxSubkeyCacheMemoryMb(),
remoteSubkeyCacheSize: getRemoteSubkeyCacheSize(),
remoteMaxRecords: getRemoteMaxRecords(),
remoteMaxSubkeyCacheMemoryMb:
await getRemoteMaxSubkeyCacheMemoryMb(),
remoteMaxStorageSpaceMb: getRemoteMaxStorageSpaceMb()),
upnp: true,
detectAddressChanges: true,
restrictedNatRetries: 0,
tls: const VeilidConfigTLS(
certificatePath: '',
privateKeyPath: '',
connectionInitialTimeoutMs: 2000,
), ),
application: const VeilidConfigApplication( wss: VeilidConfigWSS(
https: VeilidConfigHTTPS( connect: true,
enabled: false, listen: false,
listenAddress: '', maxConnections: 32,
path: '', listenAddress: '',
), path: 'ws',
http: VeilidConfigHTTP(
enabled: false,
listenAddress: '',
path: '',
)),
protocol: const VeilidConfigProtocol(
udp: VeilidConfigUDP(
enabled: !kIsWeb,
socketPoolSize: 0,
listenAddress: '',
),
tcp: VeilidConfigTCP(
connect: !kIsWeb,
listen: !kIsWeb,
maxConnections: 32,
listenAddress: '',
),
ws: VeilidConfigWS(
connect: true,
listen: !kIsWeb,
maxConnections: 16,
listenAddress: '',
path: 'ws',
),
wss: VeilidConfigWSS(
connect: true,
listen: false,
maxConnections: 16,
listenAddress: '',
path: 'ws',
),
), ),
), ),
); ),
);
}

View File

@ -1,5 +1,6 @@
[package] [package]
name = "veilid-server" name = "veilid-server"
description = "Veilid Server"
version = "0.1.10" version = "0.1.10"
authors = ["Veilid Team <contact@veilid.com>"] authors = ["Veilid Team <contact@veilid.com>"]
license = "MPL-2.0" license = "MPL-2.0"
@ -37,7 +38,7 @@ tokio-util = { version = "^0", features = ["compat"], optional = true}
async-tungstenite = { version = "^0", features = ["async-tls"] } async-tungstenite = { version = "^0", features = ["async-tls"] }
color-eyre = { version = "^0", default-features = false } color-eyre = { version = "^0", default-features = false }
backtrace = "^0" backtrace = "^0"
clap = "^3" clap = { version= "4", features = ["derive", "string", "wrap_help"] }
directories = "^4" directories = "^4"
parking_lot = "^0" parking_lot = "^0"
config = { version = "^0", features = ["yaml"] } config = { version = "^0", features = ["yaml"] }

View File

@ -1,337 +0,0 @@
use crate::settings::*;
use crate::*;
use clap::{Arg, ArgMatches, Command};
use std::ffi::OsStr;
use std::path::Path;
use std::str::FromStr;
use veilid_core::{TypedKeyGroup, TypedSecretGroup};
fn do_clap_matches(default_config_path: &OsStr) -> Result<clap::ArgMatches, clap::Error> {
let matches = Command::new("veilid-server")
.version(env!("CARGO_PKG_VERSION"))
.about("Veilid Server")
.color(clap::ColorChoice::Auto)
.arg(
Arg::new("daemon")
.long("daemon")
.short('d')
.help("Run in daemon mode in the background"),
)
.arg(
Arg::new("foreground")
.long("foreground")
.short('f')
.conflicts_with("daemon")
.help("Run in the foreground"),
)
.arg(
Arg::new("config-file")
.short('c')
.long("config-file")
.takes_value(true)
.value_name("FILE")
.default_value_os(default_config_path)
.allow_invalid_utf8(true)
.help("Specify a configuration file to use"),
)
.arg(
Arg::new("set-config")
.short('s')
.long("set-config")
.takes_value(true)
.multiple_occurrences(true)
.help("Specify configuration value to set (key in dot format, value in json format), eg: logging.api.enabled=true")
)
.arg(
Arg::new("password")
.short('p')
.long("password")
.takes_value(true)
.help("Specify password to use to protect the device encryption key")
)
.arg(
Arg::new("new-password")
.long("new-password")
.takes_value(true)
.help("Change password used to protect the device encryption key. Device storage will be migrated.")
)
.arg(
Arg::new("attach")
.long("attach")
.takes_value(true)
.value_name("BOOL")
.possible_values(&["false", "true"])
.help("Automatically attach the server to the Veilid network"),
)
// Dev options
.arg(
Arg::new("debug")
.long("debug")
.help("Turn on debug logging on the terminal"),
)
.arg(
Arg::new("trace")
.long("trace")
.conflicts_with("debug")
.help("Turn on trace logging on the terminal"),
)
.arg(
Arg::new("otlp")
.long("otlp")
.takes_value(true)
.value_name("endpoint")
.default_missing_value("localhost:4317")
.help("Turn on OpenTelemetry tracing")
.long_help("This option uses the GRPC OpenTelemetry protocol, not HTTP. The format for the endpoint is host:port, like 'localhost:4317'"),
)
.arg(
Arg::new("subnode-index")
.long("subnode-index")
.takes_value(true)
.help("Run as an extra daemon on the same machine for testing purposes, specify a number greater than zero to offset the listening ports"),
)
.arg(
Arg::new("generate-key-pair")
.long("generate-key-pair")
.takes_value(true)
.value_name("crypto_kind")
.default_missing_value("")
.help("Only generate a new keypair and print it")
.long_help("Generate a new keypair for a specific crypto kind and print both the key and its secret to the terminal, then exit immediately."),
)
.arg(
Arg::new("set-node-id")
.long("set-node-id")
.takes_value(true)
.value_name("key_set")
.help("Set the node ids and secret keys")
.long_help("Specify node ids in typed key set format ('[VLD0:xxxx,VLD1:xxxx]') on the command line, a prompt appears to enter the secret key set interactively.")
)
.arg(
Arg::new("delete-protected-store")
.long("delete-protected-store")
.help("Delete the entire contents of the protected store (DANGER, NO UNDO!)"),
)
.arg(
Arg::new("delete-table-store")
.long("delete-table-store")
.help("Delete the entire contents of the table store (DANGER, NO UNDO!)"),
)
.arg(
Arg::new("delete-block-store")
.long("delete-block-store")
.help("Delete the entire contents of the block store (DANGER, NO UNDO!)"),
)
.arg(
Arg::new("dump-config")
.long("dump-config")
.help("Instead of running the server, print the configuration it would use to the console"),
)
.arg(
Arg::new("dump-txt-record")
.long("dump-txt-record")
.help("Prints the bootstrap TXT record for this node and then quits")
)
.arg(
Arg::new("emit-schema")
.long("emit-schema")
.takes_value(true)
.value_name("schema_name")
.default_missing_value("")
.help("Emits a JSON-Schema for a named type")
)
.arg(
Arg::new("bootstrap")
.long("bootstrap")
.takes_value(true)
.value_name("BOOTSTRAP_LIST")
.help("Specify a list of bootstrap hostnames to use")
)
.arg(
Arg::new("panic")
.long("panic")
.help("panic on ctrl-c instead of graceful shutdown"),
)
.arg(
Arg::new("network-key")
.long("network-key")
.takes_value(true)
.help("password override to use for network isolation"),
)
;
#[cfg(feature = "rt-tokio")]
let matches = matches.arg(
Arg::new("console")
.long("console")
.help("enable tokio console"),
);
#[cfg(debug_assertions)]
let matches = matches.arg(
Arg::new("wait-for-debug")
.long("wait-for-debug")
.help("Wait for debugger to attach"),
);
Ok(matches.get_matches())
}
pub fn process_command_line() -> EyreResult<(Settings, ArgMatches)> {
// Get command line options
let default_config_path = Settings::get_default_config_path();
let matches = do_clap_matches(default_config_path.as_os_str())
.wrap_err("failed to parse command line: {}")?;
// Check for one-off commands
#[cfg(debug_assertions)]
if matches.occurrences_of("wait-for-debug") != 0 {
use bugsalot::debugger;
debugger::wait_until_attached(None).expect("state() not implemented on this platform");
}
// Attempt to load configuration
let settings_path = if let Some(config_file) = matches.value_of_os("config-file") {
if Path::new(config_file).exists() {
Some(config_file)
} else {
None
}
} else {
None
};
let settings = Settings::new(settings_path).wrap_err("configuration is invalid")?;
// write lock the settings
let mut settingsrw = settings.write();
// Set config from command line
if matches.occurrences_of("daemon") != 0 {
settingsrw.daemon.enabled = true;
settingsrw.logging.terminal.enabled = false;
}
if matches.occurrences_of("foreground") != 0 {
settingsrw.daemon.enabled = false;
}
if matches.occurrences_of("subnode-index") != 0 {
let subnode_index = match matches.value_of("subnode-index") {
Some(x) => x.parse().wrap_err("couldn't parse subnode index")?,
None => {
bail!("value not specified for subnode-index");
}
};
if subnode_index == 0 {
bail!("value of subnode_index should be between 1 and 65535");
}
settingsrw.testing.subnode_index = subnode_index;
}
if matches.occurrences_of("debug") != 0 {
settingsrw.logging.terminal.enabled = true;
settingsrw.logging.terminal.level = LogLevel::Debug;
}
if matches.occurrences_of("trace") != 0 {
settingsrw.logging.terminal.enabled = true;
settingsrw.logging.terminal.level = LogLevel::Trace;
}
if matches.occurrences_of("otlp") != 0 {
settingsrw.logging.otlp.enabled = true;
settingsrw.logging.otlp.grpc_endpoint = NamedSocketAddrs::from_str(
&matches
.value_of("otlp")
.expect("should not be null because of default missing value")
.to_string(),
)
.wrap_err("failed to parse OTLP address")?;
settingsrw.logging.otlp.level = LogLevel::Trace;
}
if matches.is_present("attach") {
settingsrw.auto_attach = !matches!(matches.value_of("attach"), Some("true"));
}
if matches.occurrences_of("delete-protected-store") != 0 {
settingsrw.core.protected_store.delete = true;
}
if matches.occurrences_of("delete-block-store") != 0 {
settingsrw.core.block_store.delete = true;
}
if matches.occurrences_of("delete-table-store") != 0 {
settingsrw.core.table_store.delete = true;
}
if matches.occurrences_of("password") != 0 {
settingsrw.core.protected_store.device_encryption_key_password = matches.value_of("password").unwrap().to_owned();
}
if matches.occurrences_of("new-password") != 0 {
settingsrw.core.protected_store.new_device_encryption_key_password = Some(matches.value_of("new-password").unwrap().to_owned());
}
if matches.occurrences_of("network-key") != 0 {
settingsrw.core.network.network_key_password = Some(matches.value_of("network-key").unwrap().to_owned());
}
if matches.occurrences_of("dump-txt-record") != 0 {
// Turn off terminal logging so we can be interactive
settingsrw.logging.terminal.enabled = false;
}
if let Some(v) = matches.value_of("set-node-id") {
// Turn off terminal logging so we can be interactive
settingsrw.logging.terminal.enabled = false;
// Split or get secret
let tks =
TypedKeyGroup::from_str(v).wrap_err("failed to decode node id set from command line")?;
let buffer = rpassword::prompt_password("Enter secret key set (will not echo): ")
.wrap_err("invalid secret key")?;
let buffer = buffer.trim().to_string();
let tss = TypedSecretGroup::from_str(&buffer).wrap_err("failed to decode secret set")?;
settingsrw.core.network.routing_table.node_id = Some(tks);
settingsrw.core.network.routing_table.node_id_secret = Some(tss);
}
if matches.occurrences_of("bootstrap") != 0 {
let bootstrap_list = match matches.value_of("bootstrap") {
Some(x) => {
println!("Overriding bootstrap list with: ");
let mut out: Vec<String> = Vec::new();
for x in x.split(',') {
let x = x.trim().to_string();
if !x.is_empty() {
println!(" {}", x);
out.push(x);
}
}
out
}
None => {
bail!("value not specified for bootstrap");
}
};
settingsrw.core.network.routing_table.bootstrap = bootstrap_list;
}
#[cfg(feature = "rt-tokio")]
if matches.occurrences_of("console") != 0 {
settingsrw.logging.console.enabled = true;
}
drop(settingsrw);
// Set specific config settings
if let Some(set_configs) = matches.values_of("set-config") {
for set_config in set_configs {
if let Some((k, v)) = set_config.split_once('=') {
let k = k.trim();
let v = v.trim();
settings.set(k, v)?;
}
}
}
// Apply subnode index if we're testing
settings
.apply_subnode_index()
.wrap_err("failed to apply subnode index")?;
Ok((settings, matches))
}

View File

@ -4,7 +4,6 @@
#![recursion_limit = "256"] #![recursion_limit = "256"]
mod client_api; mod client_api;
mod cmdline;
mod server; mod server;
mod settings; mod settings;
mod tools; mod tools;
@ -14,82 +13,337 @@ mod veilid_logs;
#[cfg(windows)] #[cfg(windows)]
mod windows; mod windows;
use crate::settings::*;
use clap::{Args, Parser};
use server::*; use server::*;
use settings::LogLevel;
use std::collections::HashMap; use std::collections::HashMap;
use std::ffi::{OsString, OsStr};
use std::path::Path;
use std::str::FromStr; use std::str::FromStr;
use tools::*; use tools::*;
use veilid_core::{TypedKeyGroup, TypedSecretGroup};
use veilid_logs::*; use veilid_logs::*;
#[derive(Args, Debug, Clone)]
#[group(multiple = false)]
pub struct Logging {
/// Turn on debug logging on the terminal
#[arg(long)]
debug: bool,
/// Turn on trace logging on the terminal
#[arg(long)]
trace: bool,
}
#[derive(Parser, Debug, Clone)]
#[command(author, version, about)]
pub struct CmdlineArgs {
/// Run in daemon mode in the background
#[arg(short, long)]
daemon: bool,
/// Run in the foreground
#[arg(short, long)]
foreground: bool,
/// Specify a configuration file to use
#[arg(short, long, value_name = "FILE", default_value = OsString::from(Settings::get_default_config_path()))]
config_file: Option<OsString>,
/// Specify configuration value to set (key in dot format, value in json format), eg: logging.api.enabled=true
#[arg(short, long, value_name = "CONFIG")]
set_config: Vec<String>,
/// Specify password to use to protect the device encryption key
#[arg(short, long, value_name = "PASSWORD")]
password: Option<String>,
/// Change password used to protect the device encryption key. Device storage will be migrated.
#[arg(long, value_name = "PASSWORD")]
new_password: Option<String>,
/// Do not automatically attach the server to the Veilid network
///
/// Default behaviour is to automatically attach the server to the Veilid network, this option disables this behaviour.
#[arg(long, value_name = "BOOL")]
no_attach: bool,
#[command(flatten)]
logging: Logging,
/// Turn on OpenTelemetry tracing
///
/// This option uses the GRPC OpenTelemetry protocol, not HTTP. The format for the endpoint is host:port, like 'localhost:4317'
#[arg(long, value_name = "endpoint")]
otlp: Option<String>,
/// Run as an extra daemon on the same machine for testing purposes, specify a number greater than zero to offset the listening ports
#[arg(long)]
subnode_index: Option<u16>,
/// Only generate a new keypair and print it
///
/// Generate a new keypair for a specific crypto kind and print both the key and its secret to the terminal, then exit immediately.
#[arg(long, value_name = "crypto_kind")]
generate_key_pair: Option<String>,
/// Set the node ids and secret keys
///
/// Specify node ids in typed key set format ('[VLD0:xxxx,VLD1:xxxx]') on the command line, a prompt appears to enter the secret key set interactively.
#[arg(long, value_name = "key_set")]
set_node_id: Option<String>,
/// Delete the entire contents of the protected store (DANGER, NO UNDO!)
#[arg(long)]
delete_protected_store: bool,
/// Delete the entire contents of the table store (DANGER, NO UNDO!)
#[arg(long)]
delete_table_store: bool,
/// Delete the entire contents of the block store (DANGER, NO UNDO!)
#[arg(long)]
delete_block_store: bool,
/// Instead of running the server, print the configuration it would use to the console
#[arg(long)]
dump_config: bool,
/// Prints the bootstrap TXT record for this node and then quits
#[arg(long)]
dump_txt_record: bool,
/// Emits a JSON-Schema for a named type
#[arg(long, value_name = "schema_name")]
emit_schema: Option<String>,
/// Specify a list of bootstrap hostnames to use
#[arg(long, value_name = "BOOTSTRAP_LIST")]
bootstrap: Option<String>,
/// panic on ctrl-c instead of graceful shutdown
#[arg(long)]
panic: bool,
/// password override to use for network isolation
#[arg(long, value_name = "KEY")]
network_key: Option<String>,
/// Wait for debugger to attach
#[cfg(debug_assertions)]
#[arg(long)]
wait_for_debug: bool,
/// enable tokio console
#[cfg(feature = "rt-tokio")]
#[arg(long)]
console: bool,
}
#[instrument(err)] #[instrument(err)]
fn main() -> EyreResult<()> { fn main() -> EyreResult<()> {
#[cfg(windows)] #[cfg(windows)]
let _ = ansi_term::enable_ansi_support(); let _ = ansi_term::enable_ansi_support();
color_eyre::install()?; color_eyre::install()?;
let (settings, matches) = cmdline::process_command_line()?; // Get command line options
let args = CmdlineArgs::parse();
let svc_args = args.clone();
// Check for one-off commands
#[cfg(debug_assertions)]
if args.wait_for_debug{
use bugsalot::debugger;
debugger::wait_until_attached(None).expect("state() not implemented on this platform");
}
// Attempt to load configuration
let settings_path: Option<&OsStr> = if let Some(config_file) = &args.config_file {
if Path::new(&config_file).exists() {
Some(config_file)
} else {
None
}
} else {
None
};
let settings = Settings::new(settings_path).wrap_err("configuration is invalid")?;
// write lock the settings
let mut settingsrw = settings.write();
// Set config from command line
if args.daemon {
settingsrw.daemon.enabled = true;
settingsrw.logging.terminal.enabled = false;
}
if args.foreground {
settingsrw.daemon.enabled = false;
}
if let Some(subnode_index) = args.subnode_index {
if subnode_index == 0 {
bail!("value of subnode_index should be between 1 and 65535");
}
settingsrw.testing.subnode_index = subnode_index;
};
if args.logging.debug {
settingsrw.logging.terminal.enabled = true;
settingsrw.logging.terminal.level = LogLevel::Debug;
}
if args.logging.trace {
settingsrw.logging.terminal.enabled = true;
settingsrw.logging.terminal.level = LogLevel::Trace;
}
if args.otlp.is_some() {
println!("Enabling OTLP tracing");
settingsrw.logging.otlp.enabled = true;
settingsrw.logging.otlp.grpc_endpoint = NamedSocketAddrs::from_str(
args.otlp.expect("should not be null because of default missing value").as_str(),
)
.wrap_err("failed to parse OTLP address")?;
settingsrw.logging.otlp.level = LogLevel::Trace;
}
if args.no_attach {
settingsrw.auto_attach = false;
}
if args.delete_protected_store {
settingsrw.core.protected_store.delete = true;
}
if args.delete_block_store {
settingsrw.core.block_store.delete = true;
}
if args.delete_table_store {
settingsrw.core.table_store.delete = true;
}
if let Some(password) = args.password {
settingsrw.core.protected_store.device_encryption_key_password = password;
}
if let Some(new_password) = args.new_password {
settingsrw.core.protected_store.new_device_encryption_key_password = Some(new_password);
}
if let Some(network_key) = args.network_key {
settingsrw.core.network.network_key_password = Some(network_key);
}
if args.dump_txt_record {
// Turn off terminal logging so we can be interactive
settingsrw.logging.terminal.enabled = false;
}
let mut node_id_set = false;
if let Some(key_set) = args.set_node_id {
node_id_set = true;
// Turn off terminal logging so we can be interactive
settingsrw.logging.terminal.enabled = false;
// Split or get secret
let tks =
TypedKeyGroup::from_str(&key_set).wrap_err("failed to decode node id set from command line")?;
let buffer = rpassword::prompt_password("Enter secret key set (will not echo): ")
.wrap_err("invalid secret key")?;
let buffer = buffer.trim().to_string();
let tss = TypedSecretGroup::from_str(&buffer).wrap_err("failed to decode secret set")?;
settingsrw.core.network.routing_table.node_id = Some(tks);
settingsrw.core.network.routing_table.node_id_secret = Some(tss);
}
if let Some(bootstrap) = args.bootstrap {
println!("Overriding bootstrap list with: ");
let mut bootstrap_list: Vec<String> = Vec::new();
for x in bootstrap.split(',') {
let x = x.trim().to_string();
if !x.is_empty() {
println!(" {}", x);
bootstrap_list.push(x);
}
}
settingsrw.core.network.routing_table.bootstrap = bootstrap_list;
};
#[cfg(feature = "rt-tokio")]
if args.console {
settingsrw.logging.console.enabled = true;
}
drop(settingsrw);
// Set specific config settings
for set_config in args.set_config {
if let Some((k, v)) = set_config.split_once('=') {
let k = k.trim();
let v = v.trim();
settings.set(k, v)?;
}
}
// Apply subnode index if we're testing
settings
.apply_subnode_index()
.wrap_err("failed to apply subnode index")?;
// --- Dump Config --- // --- Dump Config ---
if matches.occurrences_of("dump-config") != 0 { if args.dump_config {
return serde_yaml::to_writer(std::io::stdout(), &*settings.read()) return serde_yaml::to_writer(std::io::stdout(), &*settings.read())
.wrap_err("failed to write yaml"); .wrap_err("failed to write yaml");
} }
// --- Generate DHT Key --- // --- Generate DHT Key ---
if matches.occurrences_of("generate-key-pair") != 0 { if let Some(ckstr) = args.generate_key_pair {
if let Some(ckstr) = matches.get_one::<String>("generate-key-pair") { if ckstr == "" {
if ckstr == "" { let mut tks = veilid_core::TypedKeyGroup::new();
let mut tks = veilid_core::TypedKeyGroup::new(); let mut tss = veilid_core::TypedSecretGroup::new();
let mut tss = veilid_core::TypedSecretGroup::new(); for ck in veilid_core::VALID_CRYPTO_KINDS {
for ck in veilid_core::VALID_CRYPTO_KINDS { let tkp = veilid_core::Crypto::generate_keypair(ck)
let tkp = veilid_core::Crypto::generate_keypair(ck) .wrap_err("invalid crypto kind")?;
.wrap_err("invalid crypto kind")?; tks.add(veilid_core::TypedKey::new(tkp.kind, tkp.value.key));
tks.add(veilid_core::TypedKey::new(tkp.kind, tkp.value.key)); tss.add(veilid_core::TypedSecret::new(tkp.kind, tkp.value.secret));
tss.add(veilid_core::TypedSecret::new(tkp.kind, tkp.value.secret));
}
println!(
"Public Keys:\n{}\nSecret Keys:\n{}\n",
tks.to_string(),
tss.to_string()
);
} else {
let ck: veilid_core::CryptoKind =
veilid_core::FourCC::from_str(ckstr).wrap_err("couldn't parse crypto kind")?;
let tkp =
veilid_core::Crypto::generate_keypair(ck).wrap_err("invalid crypto kind")?;
println!("{}", tkp.to_string());
} }
return Ok(()); println!(
"Public Keys:\n{}\nSecret Keys:\n{}\n",
tks.to_string(),
tss.to_string()
);
} else { } else {
bail!("missing crypto kind"); let ck: veilid_core::CryptoKind =
veilid_core::FourCC::from_str(&ckstr).wrap_err("couldn't parse crypto kind")?;
let tkp =
veilid_core::Crypto::generate_keypair(ck).wrap_err("invalid crypto kind")?;
println!("{}", tkp.to_string());
} }
return Ok(());
} }
// -- Emit JSON-Schema -- // -- Emit JSON-Schema --
if matches.occurrences_of("emit-schema") != 0 { if let Some(esstr) = args.emit_schema {
if let Some(esstr) = matches.value_of("emit-schema") { let mut schemas = HashMap::<String, String>::new();
let mut schemas = HashMap::<String, String>::new(); veilid_core::json_api::emit_schemas(&mut schemas);
veilid_core::json_api::emit_schemas(&mut schemas);
if let Some(schema) = schemas.get(esstr) { if let Some(schema) = schemas.get(&esstr) {
println!("{}", schema); println!("{}", schema);
} else { } else {
println!("Valid schemas:"); println!("Valid schemas:");
for s in schemas.keys() { for s in schemas.keys() {
println!(" {}", s); println!(" {}", s);
}
} }
return Ok(());
} }
return Ok(());
} }
// See if we're just running a quick command // See if we're just running a quick command
let (server_mode, success, failure) = if matches.occurrences_of("set-node-id") != 0 { let (server_mode, success, failure) = if node_id_set {
( (
ServerMode::ShutdownImmediate, ServerMode::ShutdownImmediate,
"Node Id and Secret set successfully", "Node Id and Secret set successfully",
"Failed to set Node Id and Secret", "Failed to set Node Id and Secret",
) )
} else if matches.occurrences_of("dump-txt-record") != 0 { } else if args.dump_txt_record {
(ServerMode::DumpTXTRecord, "", "Failed to dump txt record") (ServerMode::DumpTXTRecord, "", "Failed to dump txt record")
} else { } else {
(ServerMode::Normal, "", "") (ServerMode::Normal, "", "")
@ -118,9 +372,9 @@ fn main() -> EyreResult<()> {
if settings.read().daemon.enabled { if settings.read().daemon.enabled {
cfg_if! { cfg_if! {
if #[cfg(windows)] { if #[cfg(windows)] {
return windows::run_service(settings, matches); return windows::run_service(settings, svc_args);
} else if #[cfg(unix)] { } else if #[cfg(unix)] {
return unix::run_daemon(settings, matches); return unix::run_daemon(settings, svc_args);
} }
} }
} }
@ -138,7 +392,7 @@ fn main() -> EyreResult<()> {
std::process::exit(1); std::process::exit(1);
})); }));
let panic_on_shutdown = matches.occurrences_of("panic") != 0; let panic_on_shutdown = args.panic;
ctrlc::set_handler(move || { ctrlc::set_handler(move || {
if panic_on_shutdown { if panic_on_shutdown {
panic!("panic requested"); panic!("panic requested");

View File

@ -1,5 +1,6 @@
#![allow(clippy::bool_assert_comparison)] #![allow(clippy::bool_assert_comparison)]
use clap::ValueEnum;
use directories::*; use directories::*;
use crate::tools::*; use crate::tools::*;
@ -143,14 +144,14 @@ core:
ws: ws:
connect: true connect: true
listen: true listen: true
max_connections: 16 max_connections: 32
listen_address: '' listen_address: ''
path: 'ws' path: 'ws'
# url: 'ws://localhost:5150/ws' # url: 'ws://localhost:5150/ws'
wss: wss:
connect: true connect: true
listen: false listen: false
max_connections: 16 max_connections: 32
listen_address: '' listen_address: ''
path: 'ws' path: 'ws'
# url: '' # url: ''
@ -229,7 +230,7 @@ pub fn load_config(cfg: config::Config, config_file: &Path) -> EyreResult<config
} }
} }
#[derive(Copy, Clone, Debug, PartialEq)] #[derive(Copy, Clone, Debug, PartialEq, ValueEnum)]
pub enum LogLevel { pub enum LogLevel {
Off, Off,
Error, Error,
@ -1686,7 +1687,7 @@ mod tests {
// //
assert_eq!(s.core.network.protocol.ws.connect, true); assert_eq!(s.core.network.protocol.ws.connect, true);
assert_eq!(s.core.network.protocol.ws.listen, true); assert_eq!(s.core.network.protocol.ws.listen, true);
assert_eq!(s.core.network.protocol.ws.max_connections, 16); assert_eq!(s.core.network.protocol.ws.max_connections, 32);
assert_eq!(s.core.network.protocol.ws.listen_address.name, ""); assert_eq!(s.core.network.protocol.ws.listen_address.name, "");
assert_eq!(s.core.network.protocol.ws.listen_address.addrs, vec![]); assert_eq!(s.core.network.protocol.ws.listen_address.addrs, vec![]);
assert_eq!( assert_eq!(
@ -1697,7 +1698,7 @@ mod tests {
// //
assert_eq!(s.core.network.protocol.wss.connect, true); assert_eq!(s.core.network.protocol.wss.connect, true);
assert_eq!(s.core.network.protocol.wss.listen, false); assert_eq!(s.core.network.protocol.wss.listen, false);
assert_eq!(s.core.network.protocol.wss.max_connections, 16); assert_eq!(s.core.network.protocol.wss.max_connections, 32);
assert_eq!(s.core.network.protocol.wss.listen_address.name, ""); assert_eq!(s.core.network.protocol.wss.listen_address.name, "");
assert_eq!(s.core.network.protocol.wss.listen_address.addrs, vec![]); assert_eq!(s.core.network.protocol.wss.listen_address.addrs, vec![]);
assert_eq!( assert_eq!(

View File

@ -1,8 +1,8 @@
use crate::*;
use crate::server::*; use crate::server::*;
use crate::settings::Settings; use crate::settings::Settings;
use crate::tools::*; use crate::tools::*;
use crate::veilid_logs::*; use crate::veilid_logs::*;
use clap::ArgMatches;
use futures_util::StreamExt; use futures_util::StreamExt;
use signal_hook::consts::signal::*; use signal_hook::consts::signal::*;
use signal_hook_async_std::Signals; use signal_hook_async_std::Signals;
@ -26,7 +26,7 @@ async fn handle_signals(mut signals: Signals) {
#[warn(missing_docs)] #[warn(missing_docs)]
#[instrument(err)] #[instrument(err)]
pub fn run_daemon(settings: Settings, _matches: ArgMatches) -> EyreResult<()> { pub fn run_daemon(settings: Settings, _args: CmdlineArgs) -> EyreResult<()> {
let daemon = { let daemon = {
let mut daemon = daemonize::Daemonize::new(); let mut daemon = daemonize::Daemonize::new();
let s = settings.read(); let s = settings.read();

View File

@ -1,6 +1,5 @@
use crate::settings::*; use crate::settings::*;
use crate::*; use crate::*;
use clap::ArgMatches;
use std::ffi::OsString; use std::ffi::OsString;
use std::time::Duration; use std::time::Duration;
use tracing::*; use tracing::*;
@ -12,7 +11,7 @@ use windows_service::*;
// Register generated `ffi_service_main` with the system and start the service, blocking // Register generated `ffi_service_main` with the system and start the service, blocking
// this thread until the service is stopped. // this thread until the service is stopped.
pub fn run_service(settings: Settings, matches: ArgMatches) -> EyreResult<()> { pub fn run_service(settings: Settings, _args: CmdlineArgs) -> EyreResult<()> {
eprintln!("Windows Service mode not implemented yet."); eprintln!("Windows Service mode not implemented yet.");
//service_dispatcher::start("veilid-server", ffi_veilid_service_main)?; //service_dispatcher::start("veilid-server", ffi_veilid_service_main)?;

View File

@ -34,7 +34,7 @@ else
OUTPUTDIR=../target/wasm32-unknown-unknown/debug/pkg OUTPUTDIR=../target/wasm32-unknown-unknown/debug/pkg
INPUTDIR=../target/wasm32-unknown-unknown/debug INPUTDIR=../target/wasm32-unknown-unknown/debug
RUSTFLAGS="-O -g" cargo build --target wasm32-unknown-unknown RUSTFLAGS="-O -g $RUSTFLAGS" cargo build --target wasm32-unknown-unknown
mkdir -p $OUTPUTDIR mkdir -p $OUTPUTDIR
wasm-bindgen --out-dir $OUTPUTDIR --target web --keep-debug --debug $INPUTDIR/veilid_wasm.wasm wasm-bindgen --out-dir $OUTPUTDIR --target web --keep-debug --debug $INPUTDIR/veilid_wasm.wasm
./wasm-sourcemap.py $OUTPUTDIR/veilid_wasm_bg.wasm -o $OUTPUTDIR/veilid_wasm_bg.wasm.map --dwarfdump $DWARFDUMP ./wasm-sourcemap.py $OUTPUTDIR/veilid_wasm_bg.wasm -o $OUTPUTDIR/veilid_wasm_bg.wasm.map --dwarfdump $DWARFDUMP
@ -44,4 +44,4 @@ fi
popd &> /dev/null popd &> /dev/null
# Print for use with scripts # Print for use with scripts
echo SUCCESS:OUTPUTDIR=$(get_abs_filename $OUTPUTDIR) echo SUCCESS:OUTPUTDIR=$(get_abs_filename $OUTPUTDIR)