veilid/veilid-core/src/attachment_manager.rs

419 lines
14 KiB
Rust
Raw Normal View History

2022-10-30 23:29:31 +00:00
use crate::crypto::Crypto;
2021-11-22 16:28:30 +00:00
use crate::network_manager::*;
2022-03-24 14:14:50 +00:00
use crate::routing_table::*;
2021-11-22 16:28:30 +00:00
use crate::*;
use core::convert::TryFrom;
2021-12-21 00:12:30 +00:00
use core::fmt;
2022-11-11 03:11:57 +00:00
use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
2022-02-07 02:18:42 +00:00
use serde::*;
2021-11-22 16:28:30 +00:00
// Attachment state machine, declared with the `state_machine!` DSL.
// The rest of this file uses the types it produces under the names
// `AttachmentState`, `AttachmentInput` and `AttachmentOutput`.
state_machine! {
    derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize,)
    pub Attachment(Detached)
    //---
    // Rules read `State(Input) => NextState [Output]`, or grouped per state as
    // `State => { Input => NextState [Output], ... }`. The bracketed names are
    // the outputs dispatched by `AttachmentManager::handle_output`.

    // An attach request is the only input accepted while detached.
    Detached(AttachRequested) => Attaching [StartAttachment],

    // Attaching: leave as soon as a peer-count input arrives, or stop.
    Attaching => {
        AttachmentStopped => Detached,
        WeakPeers => AttachedWeak,
        GoodPeers => AttachedGood,
        StrongPeers => AttachedStrong,
        FullPeers => FullyAttached,
        TooManyPeers => OverAttached,
        DetachRequested => Detaching [StopAttachment]
    },

    // Every attached state can move to any other attachment level, fall back
    // to Attaching on NoPeers, or begin detaching on request.
    AttachedWeak => {
        NoPeers => Attaching,
        GoodPeers => AttachedGood,
        StrongPeers => AttachedStrong,
        FullPeers => FullyAttached,
        TooManyPeers => OverAttached,
        DetachRequested => Detaching [StopAttachment]
    },
    AttachedGood => {
        NoPeers => Attaching,
        WeakPeers => AttachedWeak,
        StrongPeers => AttachedStrong,
        FullPeers => FullyAttached,
        TooManyPeers => OverAttached,
        DetachRequested => Detaching [StopAttachment]
    },
    AttachedStrong => {
        NoPeers => Attaching,
        WeakPeers => AttachedWeak,
        GoodPeers => AttachedGood,
        FullPeers => FullyAttached,
        TooManyPeers => OverAttached,
        DetachRequested => Detaching [StopAttachment]
    },
    FullyAttached => {
        NoPeers => Attaching,
        WeakPeers => AttachedWeak,
        GoodPeers => AttachedGood,
        StrongPeers => AttachedStrong,
        TooManyPeers => OverAttached,
        DetachRequested => Detaching [StopAttachment]
    },
    OverAttached => {
        NoPeers => Attaching,
        WeakPeers => AttachedWeak,
        GoodPeers => AttachedGood,
        StrongPeers => AttachedStrong,
        FullPeers => FullyAttached,
        DetachRequested => Detaching [StopAttachment]
    },

    // Detaching only completes once the attachment reports it has stopped.
    Detaching => {
        AttachmentStopped => Detached,
    },
}
2021-12-21 00:12:30 +00:00
impl fmt::Display for AttachmentState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
let out = match self {
2021-11-22 16:28:30 +00:00
AttachmentState::Attaching => "attaching".to_owned(),
AttachmentState::AttachedWeak => "attached_weak".to_owned(),
AttachmentState::AttachedGood => "attached_good".to_owned(),
AttachmentState::AttachedStrong => "attached_strong".to_owned(),
AttachmentState::FullyAttached => "fully_attached".to_owned(),
AttachmentState::OverAttached => "over_attached".to_owned(),
AttachmentState::Detaching => "detaching".to_owned(),
AttachmentState::Detached => "detached".to_owned(),
2021-12-21 00:12:30 +00:00
};
write!(f, "{}", out)
2021-11-22 16:28:30 +00:00
}
}
impl TryFrom<String> for AttachmentState {
type Error = ();
fn try_from(s: String) -> Result<Self, Self::Error> {
Ok(match s.as_str() {
"attaching" => AttachmentState::Attaching,
"attached_weak" => AttachmentState::AttachedWeak,
"attached_good" => AttachmentState::AttachedGood,
"attached_strong" => AttachmentState::AttachedStrong,
"fully_attached" => AttachmentState::FullyAttached,
"over_attached" => AttachmentState::OverAttached,
"detaching" => AttachmentState::Detaching,
"detached" => AttachmentState::Detached,
_ => return Err(()),
})
}
}
pub struct AttachmentManagerInner {
attachment_machine: CallbackStateMachine<Attachment>,
maintain_peers: bool,
attach_timestamp: Option<u64>,
2022-03-24 14:14:50 +00:00
update_callback: Option<UpdateCallback>,
2022-06-13 00:58:02 +00:00
attachment_maintainer_jh: Option<MustJoinHandle<()>>,
2021-11-22 16:28:30 +00:00
}
2022-10-10 02:07:15 +00:00
/// Parts of the attachment manager that are set once at construction and read
/// without taking the `AttachmentManagerInner` mutex.
pub struct AttachmentManagerUnlockedInner {
    /// Configuration handle; clones are handed out by `AttachmentManager::config`.
    config: VeilidConfig,
    /// Network manager created together with the attachment manager.
    network_manager: NetworkManager,
}
2021-11-22 16:28:30 +00:00
#[derive(Clone)]
pub struct AttachmentManager {
inner: Arc<Mutex<AttachmentManagerInner>>,
2022-10-10 02:07:15 +00:00
unlocked_inner: Arc<AttachmentManagerUnlockedInner>,
2021-11-22 16:28:30 +00:00
}
impl AttachmentManager {
2022-10-10 02:07:15 +00:00
fn new_unlocked_inner(
2021-11-22 16:28:30 +00:00
config: VeilidConfig,
2022-10-10 02:07:15 +00:00
protected_store: ProtectedStore,
2021-11-22 16:28:30 +00:00
table_store: TableStore,
2022-10-10 02:07:15 +00:00
block_store: BlockStore,
2021-11-22 16:28:30 +00:00
crypto: Crypto,
2022-10-10 02:07:15 +00:00
) -> AttachmentManagerUnlockedInner {
AttachmentManagerUnlockedInner {
2021-11-22 16:28:30 +00:00
config: config.clone(),
2022-10-10 02:07:15 +00:00
network_manager: NetworkManager::new(
config,
protected_store,
table_store,
block_store,
crypto,
),
}
}
fn new_inner() -> AttachmentManagerInner {
AttachmentManagerInner {
2021-11-22 16:28:30 +00:00
attachment_machine: CallbackStateMachine::new(),
maintain_peers: false,
attach_timestamp: None,
2022-03-24 14:14:50 +00:00
update_callback: None,
2021-11-22 16:28:30 +00:00
attachment_maintainer_jh: None,
}
}
2022-10-10 02:07:15 +00:00
pub fn new(
config: VeilidConfig,
protected_store: ProtectedStore,
table_store: TableStore,
block_store: BlockStore,
crypto: Crypto,
) -> Self {
2021-11-22 16:28:30 +00:00
Self {
2022-10-10 02:07:15 +00:00
inner: Arc::new(Mutex::new(Self::new_inner())),
unlocked_inner: Arc::new(Self::new_unlocked_inner(
config,
protected_store,
table_store,
block_store,
crypto,
)),
2021-11-22 16:28:30 +00:00
}
}
/// Returns a clone of the configuration handle held by this manager.
pub fn config(&self) -> VeilidConfig {
    let unlocked = &self.unlocked_inner;
    unlocked.config.clone()
}
/// Returns a clone of the network manager handle held by this manager.
pub fn network_manager(&self) -> NetworkManager {
    let unlocked = &self.unlocked_inner;
    unlocked.network_manager.clone()
}
pub fn is_attached(&self) -> bool {
let s = self.inner.lock().attachment_machine.state();
2021-11-27 17:44:21 +00:00
!matches!(s, AttachmentState::Detached | AttachmentState::Detaching)
2021-11-22 16:28:30 +00:00
}
pub fn is_detached(&self) -> bool {
let s = self.inner.lock().attachment_machine.state();
2021-11-27 17:44:21 +00:00
matches!(s, AttachmentState::Detached)
2021-11-22 16:28:30 +00:00
}
/// Returns the timestamp recorded when the maintainer task started, or
/// `None` when no timestamp is currently stored.
pub fn get_attach_timestamp(&self) -> Option<u64> {
    let inner = self.inner.lock();
    inner.attach_timestamp
}
2022-03-24 14:14:50 +00:00
fn translate_routing_table_health(
health: RoutingTableHealth,
config: &VeilidConfigRoutingTable,
) -> AttachmentInput {
if health.reliable_entry_count >= config.limit_over_attached.try_into().unwrap() {
2021-11-22 16:28:30 +00:00
return AttachmentInput::TooManyPeers;
}
2022-03-24 14:14:50 +00:00
if health.reliable_entry_count >= config.limit_fully_attached.try_into().unwrap() {
return AttachmentInput::FullPeers;
}
if health.reliable_entry_count >= config.limit_attached_strong.try_into().unwrap() {
return AttachmentInput::StrongPeers;
}
if health.reliable_entry_count >= config.limit_attached_good.try_into().unwrap() {
return AttachmentInput::GoodPeers;
2021-11-22 16:28:30 +00:00
}
2022-03-24 14:14:50 +00:00
if health.reliable_entry_count >= config.limit_attached_weak.try_into().unwrap()
|| health.unreliable_entry_count >= config.limit_attached_weak.try_into().unwrap()
{
return AttachmentInput::WeakPeers;
}
AttachmentInput::NoPeers
2021-11-22 16:28:30 +00:00
}
2022-03-24 14:14:50 +00:00
fn translate_attachment_state(state: &AttachmentState) -> AttachmentInput {
2021-11-22 16:28:30 +00:00
match state {
AttachmentState::OverAttached => AttachmentInput::TooManyPeers,
AttachmentState::FullyAttached => AttachmentInput::FullPeers,
AttachmentState::AttachedStrong => AttachmentInput::StrongPeers,
AttachmentState::AttachedGood => AttachmentInput::GoodPeers,
AttachmentState::AttachedWeak => AttachmentInput::WeakPeers,
AttachmentState::Attaching => AttachmentInput::NoPeers,
_ => panic!("Invalid state"),
}
}
2022-03-24 14:14:50 +00:00
async fn update_attachment(&self) {
2021-11-22 16:28:30 +00:00
let new_peer_state_input = {
let inner = self.inner.lock();
let old_peer_state_input =
2022-03-24 14:14:50 +00:00
AttachmentManager::translate_attachment_state(&inner.attachment_machine.state());
2021-11-22 16:28:30 +00:00
2022-03-24 14:14:50 +00:00
// get reliable peer count from routing table
2022-10-10 02:07:15 +00:00
let routing_table = self.network_manager().routing_table();
2022-03-24 14:14:50 +00:00
let health = routing_table.get_routing_table_health();
2022-10-10 02:07:15 +00:00
let config = self.config();
let routing_table_config = &config.get().network.routing_table;
2021-11-22 16:28:30 +00:00
let new_peer_state_input =
2022-03-24 14:14:50 +00:00
AttachmentManager::translate_routing_table_health(health, routing_table_config);
2021-11-22 16:28:30 +00:00
if old_peer_state_input == new_peer_state_input {
None
} else {
Some(new_peer_state_input)
}
};
if let Some(next_input) = new_peer_state_input {
let _ = self.process_input(&next_input).await;
}
}
2022-06-10 21:07:10 +00:00
#[instrument(level = "debug", skip(self))]
2021-11-22 16:28:30 +00:00
async fn attachment_maintainer(self) {
2022-07-07 03:15:51 +00:00
debug!("attachment starting");
2022-11-27 02:37:23 +00:00
self.inner.lock().attach_timestamp = Some(get_timestamp());
2022-10-10 02:07:15 +00:00
let netman = self.network_manager();
2021-11-22 16:28:30 +00:00
2022-07-07 03:15:51 +00:00
let mut restart;
loop {
restart = false;
if let Err(err) = netman.startup().await {
error!("network startup failed: {}", err);
netman.shutdown().await;
break;
}
2021-11-22 16:28:30 +00:00
2022-07-07 03:15:51 +00:00
debug!("started maintaining peers");
2021-11-22 16:28:30 +00:00
while self.inner.lock().maintain_peers {
// tick network manager
if let Err(err) = netman.tick().await {
error!("Error in network manager: {}", err);
self.inner.lock().maintain_peers = false;
2022-07-07 03:15:51 +00:00
restart = true;
2021-11-22 16:28:30 +00:00
break;
}
2022-07-22 17:05:28 +00:00
// see if we need to restart the network
if netman.needs_restart() {
info!("Restarting network");
restart = true;
break;
}
2022-03-24 14:14:50 +00:00
self.update_attachment().await;
2021-11-22 16:28:30 +00:00
// sleep should be at the end in case maintain_peers changes state
2022-11-27 02:37:23 +00:00
sleep(1000).await;
2021-11-22 16:28:30 +00:00
}
2022-07-07 03:15:51 +00:00
debug!("stopped maintaining peers");
2021-11-22 16:28:30 +00:00
2022-07-07 03:15:51 +00:00
debug!("stopping network");
2021-11-22 16:28:30 +00:00
netman.shutdown().await;
2022-07-07 03:15:51 +00:00
if !restart {
break;
}
debug!("completely restarting attachment");
// chill out for a second first, give network stack time to settle out
2022-11-27 02:37:23 +00:00
sleep(1000).await;
2021-11-22 16:28:30 +00:00
}
trace!("stopping attachment");
let attachment_machine = self.inner.lock().attachment_machine.clone();
let _output = attachment_machine
.consume(&AttachmentInput::AttachmentStopped)
.await;
2022-07-07 03:15:51 +00:00
debug!("attachment stopped");
2021-11-22 16:28:30 +00:00
self.inner.lock().attach_timestamp = None;
}
2022-06-10 21:07:10 +00:00
#[instrument(level = "debug", skip_all, err)]
2022-07-10 21:36:50 +00:00
pub async fn init(&self, update_callback: UpdateCallback) -> EyreResult<()> {
2022-03-09 03:32:12 +00:00
trace!("init");
2022-10-10 02:07:15 +00:00
{
2022-03-24 14:14:50 +00:00
let mut inner = self.inner.lock();
inner.update_callback = Some(update_callback.clone());
2022-05-16 15:52:48 +00:00
let update_callback2 = update_callback.clone();
2022-03-24 14:14:50 +00:00
inner.attachment_machine.set_state_change_callback(Arc::new(
move |_old_state: AttachmentState, new_state: AttachmentState| {
2022-05-16 15:52:48 +00:00
update_callback2(VeilidUpdate::Attachment(VeilidStateAttachment {
state: new_state,
}))
2022-03-24 14:14:50 +00:00
},
));
2022-02-07 02:18:42 +00:00
};
2021-11-22 16:28:30 +00:00
2022-10-10 02:07:15 +00:00
self.network_manager().init(update_callback).await?;
2021-11-22 16:28:30 +00:00
Ok(())
}
2022-06-10 21:07:10 +00:00
#[instrument(level = "debug", skip(self))]
2021-11-22 16:28:30 +00:00
pub async fn terminate(&self) {
// Ensure we detached
self.detach().await;
2022-10-10 02:07:15 +00:00
self.network_manager().terminate().await;
self.inner.lock().update_callback = None;
2021-11-22 16:28:30 +00:00
}
2022-06-10 21:07:10 +00:00
#[instrument(level = "trace", skip(self))]
2021-11-22 16:28:30 +00:00
fn attach(&self) {
// Create long-running connection maintenance routine
2022-11-02 01:05:48 +00:00
let mut inner = self.inner.lock();
2022-10-10 02:07:15 +00:00
if inner.attachment_maintainer_jh.is_some() {
return;
}
inner.maintain_peers = true;
2022-11-27 02:37:23 +00:00
inner.attachment_maintainer_jh = Some(spawn(self.clone().attachment_maintainer()));
2021-11-22 16:28:30 +00:00
}
2022-06-10 21:07:10 +00:00
#[instrument(level = "trace", skip(self))]
2021-11-22 16:28:30 +00:00
async fn detach(&self) {
2022-10-10 02:07:15 +00:00
let attachment_maintainer_jh = {
let mut inner = self.inner.lock();
let attachment_maintainer_jh = inner.attachment_maintainer_jh.take();
if attachment_maintainer_jh.is_some() {
// Terminate long-running connection maintenance routine
inner.maintain_peers = false;
}
attachment_maintainer_jh
};
2021-11-22 16:28:30 +00:00
if let Some(jh) = attachment_maintainer_jh {
jh.await;
}
}
/// Carries out the action requested by a state machine transition:
/// `StartAttachment` calls `attach`, `StopAttachment` awaits `detach`.
async fn handle_output(&self, output: &AttachmentOutput) {
    match *output {
        AttachmentOutput::StartAttachment => {
            self.attach();
        }
        AttachmentOutput::StopAttachment => {
            self.detach().await;
        }
    }
}
2022-07-12 16:45:54 +00:00
async fn process_input(&self, input: &AttachmentInput) -> EyreResult<()> {
2021-11-22 16:28:30 +00:00
let attachment_machine = self.inner.lock().attachment_machine.clone();
let output = attachment_machine.consume(input).await;
match output {
2022-07-12 16:45:54 +00:00
Err(e) => Err(eyre!(
2022-03-09 03:32:12 +00:00
"invalid input '{:?}' for state machine in state '{:?}': {:?}",
input,
attachment_machine.state(),
e
)),
2021-11-22 16:28:30 +00:00
Ok(v) => {
if let Some(o) = v {
self.handle_output(&o).await;
}
2022-03-09 03:32:12 +00:00
Ok(())
2021-11-22 16:28:30 +00:00
}
}
}
2022-06-10 21:07:10 +00:00
#[instrument(level = "trace", skip(self), err)]
2022-07-12 16:45:54 +00:00
pub async fn request_attach(&self) -> EyreResult<()> {
2022-03-09 03:32:12 +00:00
self.process_input(&AttachmentInput::AttachRequested)
.await
2022-07-12 16:45:54 +00:00
.map_err(|e| eyre!("Attach request failed: {}", e))
2021-11-22 16:28:30 +00:00
}
2022-06-10 21:07:10 +00:00
#[instrument(level = "trace", skip(self), err)]
2022-07-12 16:45:54 +00:00
pub async fn request_detach(&self) -> EyreResult<()> {
2022-03-09 03:32:12 +00:00
self.process_input(&AttachmentInput::DetachRequested)
.await
2022-07-12 16:45:54 +00:00
.map_err(|e| eyre!("Detach request failed: {}", e))
2021-11-22 16:28:30 +00:00
}
/// Returns the current state of the attachment state machine.
pub fn get_state(&self) -> AttachmentState {
    // Clone the machine handle so the manager lock is released before querying
    let machine = {
        let inner = self.inner.lock();
        inner.attachment_machine.clone()
    };
    machine.state()
}
2022-05-16 15:52:48 +00:00
/// Returns the current attachment state wrapped in a `VeilidStateAttachment`.
pub fn get_veilid_state(&self) -> VeilidStateAttachment {
    let state = self.get_state();
    VeilidStateAttachment { state }
}
2021-11-22 16:28:30 +00:00
}