checkpoint

This commit is contained in:
John Smith
2022-11-09 17:11:35 -05:00
parent a54da97393
commit e672ae0319
39 changed files with 1676 additions and 423 deletions

View File

@@ -8,7 +8,19 @@ pub struct Bucket {
}
pub(super) type EntriesIter<'a> = alloc::collections::btree_map::Iter<'a, DHTKey, Arc<BucketEntry>>;
type BucketData = (Vec<(DHTKey, Vec<u8>)>, Option<DHTKey>);
#[derive(Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C), derive(CheckBytes))]
struct BucketEntryData {
key: DHTKey,
value: Vec<u8>,
}
#[derive(Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C), derive(CheckBytes))]
struct BucketData {
entries: Vec<BucketEntryData>,
newest_entry: Option<DHTKey>,
}
fn state_ordering(state: BucketEntryState) -> usize {
match state {
@@ -27,29 +39,33 @@ impl Bucket {
}
}
pub(super) fn load_bucket(&mut self, data: &[u8]) -> EyreResult<()> {
let bucket_data: BucketData =
serde_cbor::from_slice::<BucketData>(data).wrap_err("failed to deserialize bucket")?;
pub(super) fn load_bucket(&mut self, data: Vec<u8>) -> EyreResult<()> {
let bucket_data: BucketData = from_rkyv(data)?;
for (k, d) in bucket_data.0 {
let entryinner = serde_cbor::from_slice::<BucketEntryInner>(&d)
.wrap_err("failed to deserialize bucket entry")?;
for e in bucket_data.entries {
let entryinner = from_rkyv(e.value).wrap_err("failed to deserialize bucket entry")?;
self.entries
.insert(k, Arc::new(BucketEntry::new_with_inner(entryinner)));
.insert(e.key, Arc::new(BucketEntry::new_with_inner(entryinner)));
}
self.newest_entry = bucket_data.1;
self.newest_entry = bucket_data.newest_entry;
Ok(())
}
pub(super) fn save_bucket(&self) -> EyreResult<Vec<u8>> {
let mut entry_vec = Vec::new();
let mut entries = Vec::new();
for (k, v) in &self.entries {
let entry_bytes = v.with_mut_inner(|e| serde_cbor::to_vec(e))?;
entry_vec.push((*k, entry_bytes));
let entry_bytes = v.with_inner(|e| to_rkyv(e))?;
entries.push(BucketEntryData {
key: *k,
value: entry_bytes,
});
}
let bucket_data: BucketData = (entry_vec, self.newest_entry.clone());
let out = serde_cbor::to_vec(&bucket_data)?;
let bucket_data = BucketData {
entries,
newest_entry: self.newest_entry.clone(),
};
let out = to_rkyv(&bucket_data)?;
Ok(out)
}

View File

@@ -64,22 +64,44 @@ pub struct BucketEntryLocalNetwork {
node_status: Option<LocalNetworkNodeStatus>,
}
/// A range of cryptography versions supported by this entry
#[derive(Debug, Serialize, Deserialize)]
pub struct VersionRange {
/// The minimum cryptography version supported by this entry
min: u8,
/// The maximum cryptography version supported by this entry
max: u8,
}
/// The data associated with each bucket entry
#[derive(Debug, Serialize, Deserialize)]
pub struct BucketEntryInner {
min_max_version: Option<(u8, u8)>,
/// The minimum and maximum range of cryptography versions supported by the node,
/// inclusive of the requirements of any relay the node may be using
min_max_version: Option<VersionRange>,
/// Whether or not we have updated this peer with our node info since our network
/// and dial info has last changed, for example when our IP address changes
updated_since_last_network_change: bool,
/// The last connection descriptors used to contact this node, per protocol type
#[serde(skip)]
last_connections: BTreeMap<LastConnectionKey, (ConnectionDescriptor, u64)>,
/// The node info for this entry on the publicinternet routing domain
public_internet: BucketEntryPublicInternet,
/// The node info for this entry on the localnetwork routing domain
local_network: BucketEntryLocalNetwork,
/// Statistics gathered for the peer
peer_stats: PeerStats,
/// The accounting for the latency statistics
#[serde(skip)]
latency_stats_accounting: LatencyStatsAccounting,
/// The accounting for the transfer statistics
#[serde(skip)]
transfer_stats_accounting: TransferStatsAccounting,
/// Tracking identifier for NodeRef debugging
#[cfg(feature = "tracking")]
#[serde(skip)]
next_track_id: usize,
/// Backtraces for NodeRef debugging
#[cfg(feature = "tracking")]
#[serde(skip)]
node_ref_tracks: HashMap<usize, backtrace::Backtrace>,
@@ -190,12 +212,12 @@ impl BucketEntryInner {
// Always allow overwriting invalid/unsigned node
if current_sni.has_valid_signature() {
// If the timestamp hasn't changed or is less, ignore this update
if signed_node_info.timestamp <= current_sni.timestamp {
if signed_node_info.timestamp() <= current_sni.timestamp() {
// If we received a node update with the same timestamp
// we can make this node live again, but only if our network has recently changed
// which may make nodes that were unreachable now reachable with the same dialinfo
if !self.updated_since_last_network_change
&& signed_node_info.timestamp == current_sni.timestamp
&& signed_node_info.timestamp() == current_sni.timestamp()
{
// No need to update the signednodeinfo though since the timestamp is the same
// Touch the node and let it try to live again
@@ -207,11 +229,22 @@ impl BucketEntryInner {
}
}
// Update the protocol min/max version we have
self.min_max_version = Some((
signed_node_info.node_info.min_version,
signed_node_info.node_info.max_version,
));
// Update the protocol min/max version we have to use, to include relay requirements if needed
let mut version_range = VersionRange {
min: signed_node_info.node_info().min_version,
max: signed_node_info.node_info().max_version,
};
if let Some(relay_info) = signed_node_info.relay_info() {
version_range.min.max_assign(relay_info.min_version);
version_range.max.min_assign(relay_info.max_version);
}
if version_range.min <= version_range.max {
// Can be reached with at least one crypto version
self.min_max_version = Some(version_range);
} else {
// No valid crypto version in range
self.min_max_version = None;
}
// Update the signed node info
*opt_current_sni = Some(Box::new(signed_node_info));
@@ -238,7 +271,7 @@ impl BucketEntryInner {
RoutingDomain::LocalNetwork => &self.local_network.signed_node_info,
RoutingDomain::PublicInternet => &self.public_internet.signed_node_info,
};
opt_current_sni.as_ref().map(|s| &s.node_info)
opt_current_sni.as_ref().map(|s| s.node_info())
}
pub fn signed_node_info(&self, routing_domain: RoutingDomain) -> Option<&SignedNodeInfo> {
@@ -338,11 +371,11 @@ impl BucketEntryInner {
out
}
pub fn set_min_max_version(&mut self, min_max_version: (u8, u8)) {
pub fn set_min_max_version(&mut self, min_max_version: VersionRange) {
self.min_max_version = Some(min_max_version);
}
pub fn min_max_version(&self) -> Option<(u8, u8)> {
pub fn min_max_version(&self) -> Option<VersionRange> {
self.min_max_version
}

View File

@@ -163,7 +163,7 @@ impl RoutingTable {
// Load bucket entries from table db if possible
debug!("loading routing table entries");
if let Err(e) = self.load_buckets().await {
log_rtab!(warn "Error loading buckets from storage: {}. Resetting.", e);
log_rtab!(debug "Error loading buckets from storage: {:#?}. Resetting.", e);
let mut inner = self.inner.write();
inner.init_buckets(self.clone());
}
@@ -173,7 +173,7 @@ impl RoutingTable {
let route_spec_store = match RouteSpecStore::load(self.clone()).await {
Ok(v) => v,
Err(e) => {
log_rtab!(warn "Error loading route spec store: {}. Resetting.", e);
log_rtab!(debug "Error loading route spec store: {:#?}. Resetting.", e);
RouteSpecStore::new(self.clone())
}
};
@@ -239,7 +239,7 @@ impl RoutingTable {
let tdb = table_store.open("routing_table", 1).await?;
let bucket_count = bucketvec.len();
let mut dbx = tdb.transact();
if let Err(e) = dbx.store_cbor(0, b"bucket_count", &bucket_count) {
if let Err(e) = dbx.store_frozen(0, b"bucket_count", &bucket_count) {
dbx.rollback();
return Err(e);
}
@@ -253,14 +253,13 @@ impl RoutingTable {
async fn load_buckets(&self) -> EyreResult<()> {
// Deserialize all entries
let inner = &mut *self.inner.write();
let tstore = self.network_manager().table_store();
let tdb = tstore.open("routing_table", 1).await?;
let Some(bucket_count): Option<usize> = tdb.load_cbor(0, b"bucket_count")? else {
let Some(bucket_count): Option<usize> = tdb.load_rkyv(0, b"bucket_count")? else {
log_rtab!(debug "no bucket count in saved routing table");
return Ok(());
};
let inner = &mut *self.inner.write();
if bucket_count != inner.buckets.len() {
// Must have the same number of buckets
warn!("bucket count is different, not loading routing table");
@@ -275,8 +274,8 @@ impl RoutingTable {
};
bucketdata_vec.push(bucketdata);
}
for n in 0..bucket_count {
inner.buckets[n].load_bucket(&bucketdata_vec[n])?;
for (n, bucketdata) in bucketdata_vec.into_iter().enumerate() {
inner.buckets[n].load_bucket(bucketdata)?;
}
Ok(())
@@ -383,7 +382,7 @@ impl RoutingTable {
}
/// Return a copy of our node's signednodeinfo
pub fn get_own_signed_node_info(&self, routing_domain: RoutingDomain) -> SignedNodeInfo {
pub fn get_own_signed_node_info(&self, routing_domain: RoutingDomain) -> SignedDirectNodeInfo {
self.inner.read().get_own_signed_node_info(routing_domain)
}
@@ -526,7 +525,7 @@ impl RoutingTable {
&self,
routing_domain: RoutingDomain,
node_id: DHTKey,
signed_node_info: SignedNodeInfo,
signed_node_info: SignedDirectNodeInfo,
allow_invalid: bool,
) -> Option<NodeRef> {
self.inner.write().register_node_with_signed_node_info(

View File

@@ -385,9 +385,12 @@ impl NodeRef {
out
}
pub fn locked<'a>(&self, rti: &'a mut RoutingTableInner) -> NodeRefLocked<'a> {
pub fn locked<'a>(&self, rti: &'a RoutingTableInner) -> NodeRefLocked<'a> {
NodeRefLocked::new(rti, self.clone())
}
pub fn locked_mut<'a>(&self, rti: &'a mut RoutingTableInner) -> NodeRefLockedMut<'a> {
NodeRefLockedMut::new(rti, self.clone())
}
}
impl NodeRefBase for NodeRef {
@@ -480,12 +483,12 @@ impl Drop for NodeRef {
/// already locked a RoutingTableInner
/// Keeps entry in the routing table until all references are gone
pub struct NodeRefLocked<'a> {
inner: Mutex<&'a mut RoutingTableInner>,
inner: Mutex<&'a RoutingTableInner>,
nr: NodeRef,
}
impl<'a> NodeRefLocked<'a> {
pub fn new(inner: &'a mut RoutingTableInner, nr: NodeRef) -> Self {
pub fn new(inner: &'a RoutingTableInner, nr: NodeRef) -> Self {
Self {
inner: Mutex::new(inner),
nr,
@@ -510,6 +513,65 @@ impl<'a> NodeRefBase for NodeRefLocked<'a> {
self.nr.common.entry.with(inner, f)
}
fn operate_mut<T, F>(&self, _f: F) -> T
where
F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner) -> T,
{
panic!("need to locked_mut() for this operation")
}
}
impl<'a> fmt::Display for NodeRefLocked<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.nr)
}
}
impl<'a> fmt::Debug for NodeRefLocked<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NodeRefLocked")
.field("nr", &self.nr)
.finish()
}
}
////////////////////////////////////////////////////////////////////////////////////
/// Mutable locked reference to a routing table entry
/// For internal use inside the RoutingTable module where you have
/// already locked a RoutingTableInner
/// Keeps entry in the routing table until all references are gone
pub struct NodeRefLockedMut<'a> {
inner: Mutex<&'a mut RoutingTableInner>,
nr: NodeRef,
}
impl<'a> NodeRefLockedMut<'a> {
pub fn new(inner: &'a mut RoutingTableInner, nr: NodeRef) -> Self {
Self {
inner: Mutex::new(inner),
nr,
}
}
}
impl<'a> NodeRefBase for NodeRefLockedMut<'a> {
fn common(&self) -> &NodeRefBaseCommon {
&self.nr.common
}
fn common_mut(&mut self) -> &mut NodeRefBaseCommon {
&mut self.nr.common
}
fn operate<T, F>(&self, f: F) -> T
where
F: FnOnce(&RoutingTableInner, &BucketEntryInner) -> T,
{
let inner = &*self.inner.lock();
self.nr.common.entry.with(inner, f)
}
fn operate_mut<T, F>(&self, f: F) -> T
where
F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner) -> T,
@@ -519,15 +581,15 @@ impl<'a> NodeRefBase for NodeRefLocked<'a> {
}
}
impl<'a> fmt::Display for NodeRefLocked<'a> {
impl<'a> fmt::Display for NodeRefLockedMut<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.nr)
}
}
impl<'a> fmt::Debug for NodeRefLocked<'a> {
impl<'a> fmt::Debug for NodeRefLockedMut<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NodeRefLocked")
f.debug_struct("NodeRefLockedMut")
.field("nr", &self.nr)
.finish()
}

View File

@@ -221,11 +221,11 @@ impl RouteSpecStore {
)
};
// Get cbor blob from table store
// Get frozen blob from table store
let table_store = routing_table.network_manager().table_store();
let rsstdb = table_store.open("RouteSpecStore", 1).await?;
let mut content: RouteSpecStoreContent =
rsstdb.load_cbor(0, b"content")?.unwrap_or_default();
rsstdb.load_json(0, b"content")?.unwrap_or_default();
// Look up all route hop noderefs since we can't serialize those
let mut dead_keys = Vec::new();
@@ -246,7 +246,7 @@ impl RouteSpecStore {
// Load secrets from pstore
let pstore = routing_table.network_manager().protected_store();
let out: Vec<(DHTKey, DHTKeySecret)> = pstore
.load_user_secret_cbor("RouteSpecStore")
.load_user_secret_rkyv("RouteSpecStore")
.await?
.unwrap_or_default();
@@ -289,14 +289,14 @@ impl RouteSpecStore {
inner.content.clone()
};
// Save all the fields we care about to the cbor blob in table storage
// Save all the fields we care about to the frozen blob in table storage
let table_store = self
.unlocked_inner
.routing_table
.network_manager()
.table_store();
let rsstdb = table_store.open("RouteSpecStore", 1).await?;
rsstdb.store_cbor(0, b"content", &content)?;
rsstdb.store_json(0, b"content", &content)?;
// // Keep secrets in protected store as well
let pstore = self
@@ -310,7 +310,9 @@ impl RouteSpecStore {
out.push((*k, v.secret_key));
}
let _ = pstore.save_user_secret_cbor("RouteSpecStore", &out).await?; // ignore if this previously existed or not
let _ = pstore
.save_user_secret_frozen("RouteSpecStore", &out)
.await?; // ignore if this previously existed or not
Ok(())
}

View File

@@ -101,6 +101,45 @@ impl RoutingDomainDetailCommon {
self.network_class.unwrap_or(NetworkClass::Invalid) != NetworkClass::Invalid
}
fn make_peer_info(&self, rti: &RoutingTableInner) -> PeerInfo {
let node_info = NodeInfo {
network_class: self.network_class.unwrap_or(NetworkClass::Invalid),
outbound_protocols: self.outbound_protocols,
address_types: self.address_types,
min_version: MIN_CRYPTO_VERSION,
max_version: MAX_CRYPTO_VERSION,
dial_info_detail_list: self.dial_info_details.clone(),
};
let relay_peer_info = self
.relay_node
.as_ref()
.and_then(|rn| rn.locked(rti).make_peer_info(self.routing_domain));
let signed_node_info = match relay_peer_info {
Some(relay_pi) => SignedNodeInfo::Relayed(
SignedRelayedNodeInfo::with_secret(
NodeId::new(rti.unlocked_inner.node_id),
node_info,
relay_pi.node_id,
relay_pi.signed_node_info,
&rti.unlocked_inner.node_id_secret,
)
.unwrap(),
),
None => SignedNodeInfo::Direct(
SignedDirectNodeInfo::with_secret(
NodeId::new(rti.unlocked_inner.node_id),
node_info,
&rti.unlocked_inner.node_id_secret,
)
.unwrap(),
),
};
PeerInfo::new(NodeId::new(rti.unlocked_inner.node_id), signed_node_info)
}
pub fn with_peer_info<F, R>(&self, rti: &RoutingTableInner, f: F) -> R
where
F: FnOnce(&PeerInfo) -> R,
@@ -110,7 +149,7 @@ impl RoutingDomainDetailCommon {
// Regenerate peer info
let pi = PeerInfo::new(
NodeId::new(rti.unlocked_inner.node_id),
SignedNodeInfo::with_secret(
SignedDirectNodeInfo::with_secret(
NodeInfo {
network_class: self.network_class.unwrap_or(NetworkClass::Invalid),
outbound_protocols: self.outbound_protocols,
@@ -118,10 +157,11 @@ impl RoutingDomainDetailCommon {
min_version: MIN_CRYPTO_VERSION,
max_version: MAX_CRYPTO_VERSION,
dial_info_detail_list: self.dial_info_details.clone(),
relay_peer_info: self
.relay_node
.as_ref()
.and_then(|rn| rn.make_peer_info(self.routing_domain).map(Box::new)),
relay_peer_info: self.relay_node.as_ref().and_then(|rn| {
rn.locked(rti)
.make_peer_info(self.routing_domain)
.map(Box::new)
}),
},
NodeId::new(rti.unlocked_inner.node_id),
&rti.unlocked_inner.node_id_secret,

View File

@@ -253,7 +253,7 @@ impl RoutingTableInner {
}
/// Return a copy of our node's signednodeinfo
pub fn get_own_signed_node_info(&self, routing_domain: RoutingDomain) -> SignedNodeInfo {
pub fn get_own_signed_node_info(&self, routing_domain: RoutingDomain) -> SignedDirectNodeInfo {
self.with_routing_domain(routing_domain, |rdd| {
rdd.common()
.with_peer_info(self, |pi| pi.signed_node_info.clone())
@@ -662,7 +662,7 @@ impl RoutingTableInner {
outer_self: RoutingTable,
routing_domain: RoutingDomain,
node_id: DHTKey,
signed_node_info: SignedNodeInfo,
signed_node_info: SignedDirectNodeInfo,
allow_invalid: bool,
) -> Option<NodeRef> {
// validate signed node info is not something malicious
@@ -717,7 +717,8 @@ impl RoutingTableInner {
});
if let Some(nr) = &out {
// set the most recent node address for connection finding and udp replies
nr.locked(self).set_last_connection(descriptor, timestamp);
nr.locked_mut(self)
.set_last_connection(descriptor, timestamp);
}
out
}