more refactor
This commit is contained in:
@@ -73,7 +73,7 @@ pub(super) struct RoutingTableUnlockedInner {
|
||||
/// The current node's public DHT keys and secrets
|
||||
node_id_keypairs: BTreeMap<CryptoKind, KeyPair>,
|
||||
/// Buckets to kick on our next kick task
|
||||
kick_queue: Mutex<BTreeSet<usize>>,
|
||||
kick_queue: Mutex<BTreeSet<(CryptoKind, usize)>>,
|
||||
/// Background process for computing statistics
|
||||
rolling_transfers_task: TickTask<EyreReport>,
|
||||
/// Background process to purge dead routing table entries when necessary
|
||||
@@ -90,6 +90,56 @@ pub(super) struct RoutingTableUnlockedInner {
|
||||
private_route_management_task: TickTask<EyreReport>,
|
||||
}
|
||||
|
||||
impl RoutingTableUnlockedInner {
|
||||
pub fn network_manager(&self) -> NetworkManager {
|
||||
self.network_manager.clone()
|
||||
}
|
||||
pub fn crypto(&self) -> Crypto {
|
||||
self.network_manager().crypto()
|
||||
}
|
||||
pub fn rpc_processor(&self) -> RPCProcessor {
|
||||
self.network_manager().rpc_processor()
|
||||
}
|
||||
pub fn update_callback(&self) -> UpdateCallback {
|
||||
self.network_manager().update_callback()
|
||||
}
|
||||
pub fn with_config<F, R>(&self, f: F) -> R
|
||||
where
|
||||
F: FnOnce(&VeilidConfigInner) -> R,
|
||||
{
|
||||
f(&*self.config.get())
|
||||
}
|
||||
|
||||
pub fn node_id(&self, kind: CryptoKind) -> PublicKey {
|
||||
self.node_id_keypairs.get(&kind).unwrap().key
|
||||
}
|
||||
|
||||
pub fn node_id_secret(&self, kind: CryptoKind) -> SecretKey {
|
||||
self.node_id_keypairs.get(&kind).unwrap().secret
|
||||
}
|
||||
|
||||
pub fn matches_own_node_id(&self, node_ids: &[TypedKey]) -> bool {
|
||||
for ni in node_ids {
|
||||
if let Some(v) = self.node_id_keypairs.get(&ni.kind) {
|
||||
if v.key == ni.key {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
pub fn find_bucket_index(&self, node_id: TypedKey) -> Option<(CryptoKind, usize)> {
|
||||
let crypto = self.crypto();
|
||||
let self_node_id = self.node_id_keypairs.get(&node_id.kind)?.key;
|
||||
let vcrypto = crypto.get(node_id.kind)?;
|
||||
vcrypto
|
||||
.distance(&node_id.key, &self_node_id)
|
||||
.first_nonzero_bit()
|
||||
.map(|x| (node_id.kind, x))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RoutingTable {
|
||||
inner: Arc<RwLock<RoutingTableInner>>,
|
||||
@@ -142,37 +192,6 @@ impl RoutingTable {
|
||||
this
|
||||
}
|
||||
|
||||
pub fn network_manager(&self) -> NetworkManager {
|
||||
self.unlocked_inner.network_manager.clone()
|
||||
}
|
||||
pub fn crypto(&self) -> Crypto {
|
||||
self.network_manager().crypto()
|
||||
}
|
||||
pub fn rpc_processor(&self) -> RPCProcessor {
|
||||
self.network_manager().rpc_processor()
|
||||
}
|
||||
pub fn update_callback(&self) -> UpdateCallback {
|
||||
self.network_manager().update_callback()
|
||||
}
|
||||
pub fn with_config<F, R>(&self, f: F) -> R
|
||||
where
|
||||
F: FnOnce(&VeilidConfigInner) -> R,
|
||||
{
|
||||
f(&*self.unlocked_inner.config.get())
|
||||
}
|
||||
|
||||
pub fn node_id(&self, kind: CryptoKind) -> PublicKey {
|
||||
self.unlocked_inner.node_id_keypairs.get(&kind).unwrap().key
|
||||
}
|
||||
|
||||
pub fn node_id_secret(&self, kind: CryptoKind) -> SecretKey {
|
||||
self.unlocked_inner
|
||||
.node_id_keypairs
|
||||
.get(&kind)
|
||||
.unwrap()
|
||||
.secret
|
||||
}
|
||||
|
||||
/////////////////////////////////////
|
||||
/// Initialization
|
||||
|
||||
@@ -245,56 +264,93 @@ impl RoutingTable {
|
||||
debug!("finished routing table terminate");
|
||||
}
|
||||
|
||||
/// Serialize routing table to table store
|
||||
async fn save_buckets(&self) -> EyreResult<()> {
|
||||
// Serialize all entries
|
||||
let mut bucketvec: Vec<Vec<u8>> = Vec::new();
|
||||
// Since entries are shared by multiple buckets per cryptokind
|
||||
// we need to get the list of all unique entries when serializing
|
||||
let mut all_entries: Vec<Arc<BucketEntry>> = Vec::new();
|
||||
|
||||
// Serialize all buckets and get map of entries
|
||||
let mut serialized_bucket_map: BTreeMap<CryptoKind, Vec<Vec<u8>>> = BTreeMap::new();
|
||||
{
|
||||
let mut entry_map: HashMap<*const BucketEntry, u32> = HashMap::new();
|
||||
let inner = &*self.inner.read();
|
||||
for bucket in &inner.buckets {
|
||||
bucketvec.push(bucket.save_bucket()?)
|
||||
for ck in VALID_CRYPTO_KINDS {
|
||||
let buckets = inner.buckets.get(&ck).unwrap();
|
||||
let mut serialized_buckets = Vec::new();
|
||||
for bucket in buckets.iter() {
|
||||
serialized_buckets.push(bucket.save_bucket(&mut all_entries, &mut entry_map)?)
|
||||
}
|
||||
serialized_bucket_map.insert(ck, serialized_buckets);
|
||||
}
|
||||
}
|
||||
let table_store = self.network_manager().table_store();
|
||||
|
||||
// Serialize all the entries
|
||||
let mut all_entry_bytes = Vec::with_capacity(all_entries.len());
|
||||
for entry in all_entries {
|
||||
// Serialize entry
|
||||
let entry_bytes = entry.with_inner(|e| to_rkyv(e))?;
|
||||
all_entry_bytes.push(entry_bytes);
|
||||
}
|
||||
|
||||
let table_store = self.unlocked_inner.network_manager().table_store();
|
||||
let tdb = table_store.open("routing_table", 1).await?;
|
||||
let bucket_count = bucketvec.len();
|
||||
let dbx = tdb.transact();
|
||||
if let Err(e) = dbx.store_rkyv(0, b"bucket_count", &bucket_count) {
|
||||
if let Err(e) = dbx.store_rkyv(0, b"serialized_bucket_map", &serialized_bucket_map) {
|
||||
dbx.rollback();
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
for (n, b) in bucketvec.iter().enumerate() {
|
||||
dbx.store(0, format!("bucket_{}", n).as_bytes(), b)
|
||||
if let Err(e) = dbx.store_rkyv(0, b"all_entry_bytes", &all_entry_bytes) {
|
||||
dbx.rollback();
|
||||
return Err(e);
|
||||
}
|
||||
dbx.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Deserialize routing table from table store
|
||||
async fn load_buckets(&self) -> EyreResult<()> {
|
||||
// Deserialize all entries
|
||||
let tstore = self.network_manager().table_store();
|
||||
// Deserialize bucket map and all entries from the table store
|
||||
let tstore = self.unlocked_inner.network_manager().table_store();
|
||||
let tdb = tstore.open("routing_table", 1).await?;
|
||||
let Some(bucket_count): Option<usize> = tdb.load_rkyv(0, b"bucket_count")? else {
|
||||
log_rtab!(debug "no bucket count in saved routing table");
|
||||
let Some(serialized_bucket_map): Option<BTreeMap<CryptoKind, Vec<Vec<u8>>>> = tdb.load_rkyv(0, b"serialized_bucket_map")? else {
|
||||
log_rtab!(debug "no bucket map in saved routing table");
|
||||
return Ok(());
|
||||
};
|
||||
let inner = &mut *self.inner.write();
|
||||
if bucket_count != inner.buckets.len() {
|
||||
// Must have the same number of buckets
|
||||
warn!("bucket count is different, not loading routing table");
|
||||
let Some(all_entry_bytes): Option<Vec<Vec<u8>>> = tdb.load_rkyv(0, b"all_entry_bytes")? else {
|
||||
log_rtab!(debug "no all_entry_bytes in saved routing table");
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
// Reconstruct all entries
|
||||
let mut all_entries: Vec<Arc<BucketEntry>> = Vec::with_capacity(all_entry_bytes.len());
|
||||
for entry_bytes in all_entry_bytes {
|
||||
let entryinner =
|
||||
from_rkyv(entry_bytes).wrap_err("failed to deserialize bucket entry")?;
|
||||
all_entries.push(Arc::new(BucketEntry::new_with_inner(entryinner)));
|
||||
}
|
||||
let mut bucketdata_vec: Vec<Vec<u8>> = Vec::new();
|
||||
for n in 0..bucket_count {
|
||||
let Some(bucketdata): Option<Vec<u8>> =
|
||||
tdb.load(0, format!("bucket_{}", n).as_bytes())? else {
|
||||
warn!("bucket data not loading, skipping loading routing table");
|
||||
return Ok(());
|
||||
};
|
||||
bucketdata_vec.push(bucketdata);
|
||||
|
||||
// Validate serialized bucket map
|
||||
for (k, v) in &serialized_bucket_map {
|
||||
if !VALID_CRYPTO_KINDS.contains(k) {
|
||||
warn!("crypto kind is not valid, not loading routing table");
|
||||
return Ok(());
|
||||
}
|
||||
if v.len() != PUBLIC_KEY_LENGTH * 8 {
|
||||
warn!("bucket count is different, not loading routing table");
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
for (n, bucketdata) in bucketdata_vec.into_iter().enumerate() {
|
||||
inner.buckets[n].load_bucket(bucketdata)?;
|
||||
|
||||
// Recreate buckets
|
||||
let inner = &mut *self.inner.write();
|
||||
|
||||
for (k, v) in serialized_bucket_map {
|
||||
let buckets = inner.buckets.get_mut(&k).unwrap();
|
||||
|
||||
for n in 0..v.len() {
|
||||
buckets[n].load_bucket(v[n].clone(), &all_entries)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -469,14 +525,6 @@ impl RoutingTable {
|
||||
self.inner.write().purge_last_connections();
|
||||
}
|
||||
|
||||
fn find_bucket_index(&self, node_id: TypedKey) -> usize {
|
||||
let crypto = self.crypto().get(node_id.kind).unwrap();
|
||||
|
||||
.distance(&node_id, &self.unlocked_inner.node_id)
|
||||
.first_nonzero_bit()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub fn get_entry_count(
|
||||
&self,
|
||||
routing_domain_set: RoutingDomainSet,
|
||||
@@ -502,23 +550,11 @@ impl RoutingTable {
|
||||
inner.get_all_nodes(self.clone(), cur_ts)
|
||||
}
|
||||
|
||||
fn queue_bucket_kick(&self, node_id: PublicKey) {
|
||||
let idx = self.find_bucket_index(node_id);
|
||||
fn queue_bucket_kick(&self, node_id: TypedKey) {
|
||||
let idx = self.unlocked_inner.find_bucket_index(node_id).unwrap();
|
||||
self.unlocked_inner.kick_queue.lock().insert(idx);
|
||||
}
|
||||
|
||||
/// Create a node reference, possibly creating a bucket entry
|
||||
/// the 'update_func' closure is called on the node, and, if created,
|
||||
/// in a locked fashion as to ensure the bucket entry state is always valid
|
||||
pub fn create_node_ref<F>(&self, node_id: PublicKey, update_func: F) -> Option<NodeRef>
|
||||
where
|
||||
F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner),
|
||||
{
|
||||
self.inner
|
||||
.write()
|
||||
.create_node_ref(self.clone(), node_id, update_func)
|
||||
}
|
||||
|
||||
/// Resolve an existing routing table entry and return a reference to it
|
||||
pub fn lookup_node_ref(&self, node_id: PublicKey) -> Option<NodeRef> {
|
||||
self.inner.read().lookup_node_ref(self.clone(), node_id)
|
||||
@@ -542,18 +578,16 @@ impl RoutingTable {
|
||||
/// Shortcut function to add a node to our routing table if it doesn't exist
|
||||
/// and add the dial info we have for it. Returns a noderef filtered to
|
||||
/// the routing domain in which this node was registered for convenience.
|
||||
pub fn register_node_with_signed_node_info(
|
||||
pub fn register_node_with_peer_info(
|
||||
&self,
|
||||
routing_domain: RoutingDomain,
|
||||
node_id: PublicKey,
|
||||
signed_node_info: SignedNodeInfo,
|
||||
peer_info: PeerInfo,
|
||||
allow_invalid: bool,
|
||||
) -> Option<NodeRef> {
|
||||
self.inner.write().register_node_with_signed_node_info(
|
||||
self.inner.write().register_node_with_peer_info(
|
||||
self.clone(),
|
||||
routing_domain,
|
||||
node_id,
|
||||
signed_node_info,
|
||||
peer_info,
|
||||
allow_invalid,
|
||||
)
|
||||
}
|
||||
@@ -844,30 +878,13 @@ impl RoutingTable {
|
||||
|
||||
#[instrument(level = "trace", skip(self), ret)]
|
||||
pub fn register_find_node_answer(&self, peers: Vec<PeerInfo>) -> Vec<NodeRef> {
|
||||
let node_id = self.node_id();
|
||||
|
||||
// register nodes we'd found
|
||||
let mut out = Vec::<NodeRef>::with_capacity(peers.len());
|
||||
for p in peers {
|
||||
// if our own node if is in the list then ignore it, as we don't add ourselves to our own routing table
|
||||
if p.node_id.key == node_id {
|
||||
continue;
|
||||
}
|
||||
|
||||
// node can not be its own relay
|
||||
if let Some(rid) = &p.signed_node_info.relay_id() {
|
||||
if rid.key == p.node_id.key {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// register the node if it's new
|
||||
if let Some(nr) = self.register_node_with_signed_node_info(
|
||||
RoutingDomain::PublicInternet,
|
||||
p.node_id.key,
|
||||
p.signed_node_info.clone(),
|
||||
false,
|
||||
) {
|
||||
if let Some(nr) =
|
||||
self.register_node_with_peer_info(RoutingDomain::PublicInternet, p, false)
|
||||
{
|
||||
out.push(nr);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user