windows network support

This commit is contained in:
John Smith
2022-01-15 18:24:37 -05:00
parent 925c42b275
commit 60c8cd7f03
14 changed files with 920 additions and 442 deletions

View File

@@ -12,7 +12,7 @@ use std::sync::mpsc::SyncSender as StdSender;
use std::sync::mpsc::TrySendError as StdTrySendError;
//////////////////////////////////////////
#[derive(Clone)]
pub struct ClientLogChannelCloser {
sender: Arc<Mutex<Option<StdSender<String>>>>,
}
@@ -59,6 +59,7 @@ pub type ClientLogChannelWriter = std::io::LineWriter<ClientLogChannelWriterShim
//////////////////////////////////////////
#[derive(Clone)]
pub struct ClientLogChannel {
async_receiver: AsyncReceiver<String>,
}

View File

@@ -0,0 +1,173 @@
use crate::settings::*;
use std::ffi::OsStr;
use clap::{App, Arg, ArgMatches};
use std::str::FromStr;
fn do_clap_matches(default_config_path: &OsStr) -> Result<clap::ArgMatches, clap::Error> {
let matches = App::new("veilid-server")
.version("0.1")
.about("Veilid Server")
.color(clap::ColorChoice::Auto)
.arg(
Arg::new("daemon")
.long("daemon")
.short('d')
.help("Run in daemon mode in the background"),
)
.arg(
Arg::new("config-file")
.short('c')
.long("config-file")
.takes_value(true)
.value_name("FILE")
.default_value_os(default_config_path)
.help("Specify a configuration file to use"),
).arg(
Arg::new("attach")
.long("attach")
.takes_value(true)
.value_name("BOOL")
.possible_values(&["false", "true"])
.help("Automatically attach the server to the Veilid network"),
)
// Dev options
.arg(
Arg::new("debug")
.long("debug")
.help("Turn on debug logging on the terminal"),
)
.arg(
Arg::new("trace")
.long("trace")
.conflicts_with("debug")
.help("Turn on trace logging on the terminal"),
)
.arg(
Arg::new("subnode_index")
.long("subnode_index")
.takes_value(true)
.help("Run as an extra daemon on the same machine for testing purposes, specify a number greater than zero to offset the listening ports"),
)
.arg(
Arg::new("generate-dht-key")
.long("generate-dht-key")
.help("Only generate a new dht key and print it"),
)
.arg(
Arg::new("dump-config")
.long("dump-config")
.help("Instead of running the server, print the configuration it would use to the console"),
)
.arg(
Arg::new("bootstrap")
.long("bootstrap")
.takes_value(true)
.value_name("BOOTSTRAP_LIST")
.help("Specify a list of bootstrap servers to use"),
)
.arg(
Arg::new("local")
.long("local")
.help("Enable local peer scope")
);
#[cfg(debug_assertions)]
let matches = matches.arg(
Arg::new("wait-for-debug")
.long("wait-for-debug")
.help("Wait for debugger to attach"),
);
Ok(matches.get_matches())
}
pub fn process_command_line() -> Result<(Settings, ArgMatches), String> {
// Get command line options
let default_config_path = Settings::get_default_config_path();
let matches = do_clap_matches(default_config_path.as_os_str())
.map_err(|e| format!("failed to parse command line: {}", e))?;
// Check for one-off commands
#[cfg(debug_assertions)]
if matches.occurrences_of("wait-for-debug") != 0 {
use bugsalot::debugger;
debugger::wait_until_attached(None).expect("state() not implemented on this platform");
}
// Attempt to load configuration
let settings = Settings::new(
matches.occurrences_of("config-file") == 0,
matches.value_of_os("config-file").unwrap(),
)
.map_err(|e| format!("configuration is invalid: {}", e))?;
// write lock the settings
let mut settingsrw = settings.write();
// Set config from command line
if matches.occurrences_of("daemon") != 0 {
settingsrw.daemon = true;
settingsrw.logging.terminal.enabled = false;
}
if matches.occurrences_of("subnode_index") != 0 {
let subnode_index = match matches.value_of("subnode_index") {
Some(x) => x
.parse()
.map_err(|e| format!("couldn't parse subnode index: {}", e))?,
None => {
return Err("value not specified for subnode_index".to_owned());
}
};
if subnode_index == 0 {
return Err("value of subnode_index should be between 1 and 65535".to_owned());
}
settingsrw.testing.subnode_index = subnode_index;
}
if matches.occurrences_of("debug") != 0 {
settingsrw.logging.terminal.enabled = true;
settingsrw.logging.terminal.level = LogLevel::Debug;
}
if matches.occurrences_of("trace") != 0 {
settingsrw.logging.terminal.enabled = true;
settingsrw.logging.terminal.level = LogLevel::Trace;
}
if matches.is_present("attach") {
settingsrw.auto_attach = !matches!(matches.value_of("attach"), Some("true"));
}
if matches.is_present("local") {
settingsrw.core.network.enable_local_peer_scope = true;
}
if matches.occurrences_of("bootstrap") != 0 {
let bootstrap = match matches.value_of("bootstrap") {
Some(x) => {
println!("Overriding bootstrap with: ");
let mut out: Vec<ParsedNodeDialInfo> = Vec::new();
for x in x.split(',') {
println!(" {}", x);
out.push(ParsedNodeDialInfo::from_str(x).map_err(|e| {
format!(
"unable to parse dial info in bootstrap list: {} for {}",
e, x
)
})?);
}
out
}
None => {
return Err("value not specified for bootstrap".to_owned());
}
};
settingsrw.core.network.bootstrap = bootstrap;
}
// Apply subnode index if we're testing
drop(settingsrw);
settings
.apply_subnode_index()
.map_err(|_| "failed to apply subnode index".to_owned())?;
Ok((settings, matches))
}

View File

@@ -4,26 +4,62 @@
mod client_api;
mod client_log_channel;
mod cmdline;
mod server;
mod settings;
#[cfg(unix)]
mod unix;
mod veilid_logs;
#[cfg(windows)]
mod windows;
use async_std::task;
use cfg_if::*;
use server::*;
use veilid_logs::*;
#[allow(clippy::all)]
pub mod veilid_client_capnp {
include!(concat!(env!("OUT_DIR"), "/proto/veilid_client_capnp.rs"));
}
cfg_if::cfg_if! {
if #[cfg(windows)] {
mod windows;
fn main() -> Result<(), String> {
let (settings, matches) = cmdline::process_command_line()?;
fn main() -> windows_service::Result<(), String> {
windows::main()
// --- Dump Config ---
if matches.occurrences_of("dump-config") != 0 {
//let cfg = config::Config::try_from(&*settingsr);
return serde_yaml::to_writer(std::io::stdout(), &*settings.read())
.map_err(|e| e.to_string());
}
// --- Generate Id ---
if matches.occurrences_of("generate-id") != 0 {
let (key, secret) = veilid_core::generate_secret();
println!("Public: {}\nSecret: {}", key.encode(), secret.encode());
return Ok(());
}
// --- Daemon Mode ----
if settings.read().daemon {
cfg_if! {
if #[cfg(windows)] {
return windows::run_service(settings, matches).map_err(|e| format!("{}", e));
} else if #[cfg(unix)] {
return unix::run_daemon(settings, matches);
}
}
}
else {
mod unix;
fn main() -> Result<(), String> {
async_std::task::block_on(unix::main())
}
}
// Init combined console/file logger
let logs = VeilidLogs::setup_normal_logs(settings.clone())?;
// --- Normal Startup ---
ctrlc::set_handler(move || {
shutdown();
})
.expect("Error setting Ctrl-C handler");
// Run the server loop
task::block_on(async { run_veilid_server(settings, logs).await })
}

161
veilid-server/src/server.rs Normal file
View File

@@ -0,0 +1,161 @@
use crate::client_api;
use crate::settings::*;
use crate::veilid_logs::*;
use async_std::channel::{bounded, Receiver, Sender};
use lazy_static::*;
use log::*;
use parking_lot::Mutex;
use std::sync::Arc;
use std::time::{Duration, Instant};
use veilid_core::xx::SingleShotEventual;
lazy_static! {
static ref SHUTDOWN_SWITCH: Mutex<Option<SingleShotEventual<()>>> =
Mutex::new(Some(SingleShotEventual::new(())));
}
pub fn shutdown() {
let shutdown_switch = SHUTDOWN_SWITCH.lock().take();
if let Some(shutdown_switch) = shutdown_switch {
shutdown_switch.resolve(());
}
}
pub async fn run_veilid_server(settings: Settings, logs: VeilidLogs) -> Result<(), String> {
let settingsr = settings.read();
// Create Veilid Core
let veilid_core = veilid_core::VeilidCore::new();
// Create client api state change pipe
let (sender, receiver): (
Sender<veilid_core::VeilidStateChange>,
Receiver<veilid_core::VeilidStateChange>,
) = bounded(1);
// Create VeilidCore setup
let vcs = veilid_core::VeilidCoreSetup {
state_change_callback: Arc::new(
move |change: veilid_core::VeilidStateChange| -> veilid_core::SystemPinBoxFuture<()> {
let sender = sender.clone();
Box::pin(async move {
if sender.send(change).await.is_err() {
error!("error sending state change callback");
}
})
},
),
config_callback: settings.get_core_config_callback(),
};
// Start Veilid Core and get API
let veilid_api = veilid_core
.startup(vcs)
.await
.map_err(|e| format!("VeilidCore startup failed: {}", e))?;
// Start client api if one is requested
let mut capi = if settingsr.client_api.enabled {
let some_capi = client_api::ClientApi::new(veilid_api.clone());
some_capi
.clone()
.run(settingsr.client_api.listen_address.addrs.clone());
Some(some_capi)
} else {
None
};
// Drop rwlock on settings
let auto_attach = settingsr.auto_attach;
drop(settingsr);
// Handle state changes on main thread for capnproto rpc
let state_change_receiver_jh = capi.clone().map(|capi| {
async_std::task::spawn_local(async move {
while let Ok(change) = receiver.recv().await {
capi.clone().handle_state_change(change);
}
})
});
// Handle log messages on main thread for capnproto rpc
let client_log_receiver_jh = capi
.clone()
.map(|capi| {
logs.client_log_channel
.clone()
.map(|mut client_log_channel| {
async_std::task::spawn_local(async move {
// Batch messages to either 16384 chars at once or every second to minimize packets
let rate = Duration::from_secs(1);
let mut start = Instant::now();
let mut messages = String::new();
loop {
let timeout_dur =
rate.checked_sub(start.elapsed()).unwrap_or(Duration::ZERO);
match async_std::future::timeout(timeout_dur, client_log_channel.recv())
.await
{
Ok(Ok(message)) => {
messages += &message;
if messages.len() > 16384 {
capi.clone()
.handle_client_log(core::mem::take(&mut messages));
start = Instant::now();
}
}
Ok(Err(_)) => break,
Err(_) => {
capi.clone()
.handle_client_log(core::mem::take(&mut messages));
start = Instant::now();
}
}
}
})
})
})
.flatten();
// Auto-attach if desired
if auto_attach {
info!("Auto-attach to the Veilid network");
if let Err(e) = veilid_api.attach().await {
error!("Auto-attaching to the Veilid network failed: {:?}", e);
shutdown();
}
}
// Idle while waiting to exit
let shutdown_switch = {
let shutdown_switch_locked = SHUTDOWN_SWITCH.lock();
(*shutdown_switch_locked).as_ref().map(|ss| ss.instance())
};
if let Some(shutdown_switch) = shutdown_switch {
shutdown_switch.await;
}
// Stop the client api if we have one
if let Some(c) = capi.as_mut().cloned() {
c.stop().await;
}
// Shut down Veilid API to release state change sender
veilid_api.shutdown().await;
// Close the client api log channel if it is open to release client log sender
if let Some(client_log_channel_closer) = logs.client_log_channel_closer {
client_log_channel_closer.close();
}
// Wait for state change receiver to exit
if let Some(state_change_receiver_jh) = state_change_receiver_jh {
state_change_receiver_jh.await;
}
// Wait for client api log receiver to exit
if let Some(client_log_receiver_jh) = client_log_receiver_jh {
client_log_receiver_jh.await;
}
Ok(())
}

View File

@@ -18,34 +18,34 @@ pub fn load_default_config(cfg: &mut config::Config) -> Result<(), config::Confi
daemon: false
client_api:
enabled: true
listen_address: "localhost:5959"
listen_address: 'localhost:5959'
auto_attach: true
logging:
terminal:
enabled: true
level: "info"
level: 'info'
file:
enabled: false
path: ""
path: ''
append: true
level: "info"
level: 'info'
client:
enabled: true
level: "info"
level: 'info'
testing:
subnode_index: 0
core:
protected_store:
allow_insecure_fallback: true
always_use_insecure_storage: false
insecure_fallback_directory: "%INSECURE_FALLBACK_DIRECTORY%"
insecure_fallback_directory: '%INSECURE_FALLBACK_DIRECTORY%'
table_store:
directory: "%TABLE_STORE_DIRECTORY%"
directory: '%TABLE_STORE_DIRECTORY%'
network:
max_connections: 16
connection_initial_timeout: 2000000
node_id: ""
node_id_secret: ""
node_id: ''
node_id_secret: ''
bootstrap: []
rpc:
concurrency: 0
@@ -73,46 +73,46 @@ core:
enable_local_peer_scope: false
restricted_nat_retries: 3
tls:
certificate_path: "/etc/veilid/server.crt"
private_key_path: "/etc/veilid/private/server.key"
certificate_path: '/etc/veilid/server.crt'
private_key_path: '/etc/veilid/private/server.key'
connection_initial_timeout: 2000000
application:
https:
enabled: false
listen_address: "[::]:5150"
path: "app"
# url: "https://localhost:5150"
listen_address: '[::]:5150'
path: 'app'
# url: 'https://localhost:5150'
http:
enabled: false
listen_address: "[::]:5150"
path: "app"
# url: "http://localhost:5150"
listen_address: '[::]:5150'
path: 'app'
# url: 'http://localhost:5150'
protocol:
udp:
enabled: true
socket_pool_size: 0
listen_address: "[::]:5150"
# public_address: ""
listen_address: '[::]:5150'
# public_address: ''
tcp:
connect: true
listen: true
max_connections: 32
listen_address: "[::]:5150"
# "public_address": ""
listen_address: '[::]:5150'
#'public_address: ''
ws:
connect: true
listen: true
max_connections: 16
listen_address: "[::]:5150"
path: "ws"
# url: "ws://localhost:5150/ws"
listen_address: '[::]:5150'
path: 'ws'
# url: 'ws://localhost:5150/ws'
wss:
connect: true
listen: false
max_connections: 16
listen_address: "[::]:5150"
path: "ws"
# url: ""
listen_address: '[::]:5150'
path: 'ws'
# url: ''
leases:
max_server_signal_leases: 256
max_server_relay_leases: 8
@@ -558,10 +558,9 @@ pub struct SettingsInner {
pub core: Core,
}
type Handle<T> = Arc<RwLock<T>>;
#[derive(Clone, Debug)]
pub struct Settings {
inner: Handle<SettingsInner>,
inner: Arc<RwLock<SettingsInner>>,
}
impl Settings {

View File

@@ -1,4 +1,3 @@
#[cfg(unix)]
use crate::client_api;
use crate::client_log_channel::*;
use crate::settings;
@@ -16,392 +15,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use veilid_core::xx::SingleShotEventual;
fn parse_command_line(default_config_path: &OsStr) -> Result<clap::ArgMatches, clap::Error> {
let matches = App::new("veilid-server")
.version("0.1")
.about("Veilid Server")
.color(clap::ColorChoice::Auto)
.arg(
Arg::new("daemon")
.long("daemon")
.short('d')
.help("Run in daemon mode in the background"),
)
.arg(
Arg::new("config-file")
.short('c')
.long("config-file")
.takes_value(true)
.value_name("FILE")
.default_value_os(default_config_path)
.help("Specify a configuration file to use"),
).arg(
Arg::new("attach")
.long("attach")
.takes_value(true)
.value_name("BOOL")
.possible_values(&["false", "true"])
.help("Automatically attach the server to the Veilid network"),
)
// Dev options
.arg(
Arg::new("debug")
.long("debug")
.help("Turn on debug logging on the terminal"),
)
.arg(
Arg::new("trace")
.long("trace")
.conflicts_with("debug")
.help("Turn on trace logging on the terminal"),
)
.arg(
Arg::new("subnode_index")
.long("subnode_index")
.takes_value(true)
.help("Run as an extra daemon on the same machine for testing purposes, specify a number greater than zero to offset the listening ports"),
)
.arg(
Arg::new("generate-dht-key")
.long("generate-dht-key")
.help("Only generate a new dht key and print it"),
)
.arg(
Arg::new("dump-config")
.long("dump-config")
.help("Instead of running the server, print the configuration it would use to the console"),
)
.arg(
Arg::new("bootstrap")
.long("bootstrap")
.takes_value(true)
.value_name("BOOTSTRAP_LIST")
.help("Specify a list of bootstrap servers to use"),
)
.arg(
Arg::new("local")
.long("local")
.help("Enable local peer scope")
);
#[cfg(debug_assertions)]
let matches = matches.arg(
Arg::new("wait-for-debug")
.long("wait-for-debug")
.help("Wait for debugger to attach"),
);
Ok(matches.get_matches())
}
lazy_static! {
static ref SHUTDOWN_SWITCH: Mutex<Option<SingleShotEventual<()>>> =
Mutex::new(Some(SingleShotEventual::new(())));
}
pub fn shutdown() {
let shutdown_switch = SHUTDOWN_SWITCH.lock().take();
if let Some(shutdown_switch) = shutdown_switch {
shutdown_switch.resolve(());
}
}
pub async fn main() -> Result<(), String> {
// Wait until signal
ctrlc::set_handler(move || {
shutdown();
})
.expect("Error setting Ctrl-C handler");
// Get command line options
let default_config_path = settings::Settings::get_default_config_path();
let matches = parse_command_line(default_config_path.as_os_str())
.map_err(|e| format!("failed to parse command line: {}", e))?;
// Check for one-off commands
#[cfg(debug_assertions)]
if matches.occurrences_of("wait-for-debug") != 0 {
use bugsalot::debugger;
debugger::wait_until_attached(None).expect("state() not implemented on this platform");
}
if matches.occurrences_of("generate-id") != 0 {
let (key, secret) = veilid_core::generate_secret();
println!("Public: {}\nSecret: {}", key.encode(), secret.encode());
return Ok(());
}
// Attempt to load configuration
let settings = settings::Settings::new(
matches.occurrences_of("config-file") == 0,
matches.value_of_os("config-file").unwrap(),
)
.map_err(|e| format!("configuration is invalid: {}", e))?;
// write lock the settings
let mut settingsrw = settings.write();
// Set config from command line
if matches.occurrences_of("daemon") != 0 {
settingsrw.daemon = true;
settingsrw.logging.terminal.enabled = false;
}
if matches.occurrences_of("subnode_index") != 0 {
let subnode_index = match matches.value_of("subnode_index") {
Some(x) => x
.parse()
.map_err(|e| format!("couldn't parse subnode index: {}", e))?,
None => {
return Err("value not specified for subnode_index".to_owned());
}
};
if subnode_index == 0 {
return Err("value of subnode_index should be between 1 and 65535".to_owned());
}
settingsrw.testing.subnode_index = subnode_index;
}
if matches.occurrences_of("debug") != 0 {
settingsrw.logging.terminal.enabled = true;
settingsrw.logging.terminal.level = settings::LogLevel::Debug;
}
if matches.occurrences_of("trace") != 0 {
settingsrw.logging.terminal.enabled = true;
settingsrw.logging.terminal.level = settings::LogLevel::Trace;
}
if matches.is_present("attach") {
settingsrw.auto_attach = !matches!(matches.value_of("attach"), Some("true"));
}
if matches.is_present("local") {
settingsrw.core.network.enable_local_peer_scope = true;
}
if matches.occurrences_of("bootstrap") != 0 {
let bootstrap = match matches.value_of("bootstrap") {
Some(x) => {
println!("Overriding bootstrap with: ");
let mut out: Vec<settings::ParsedNodeDialInfo> = Vec::new();
for x in x.split(',') {
println!(" {}", x);
out.push(settings::ParsedNodeDialInfo::from_str(x).map_err(|e| {
format!(
"unable to parse dial info in bootstrap list: {} for {}",
e, x
)
})?);
}
out
}
None => {
return Err("value not specified for bootstrap".to_owned());
}
};
settingsrw.core.network.bootstrap = bootstrap;
}
// Apply subnode index if we're testing
drop(settingsrw);
settings
.apply_subnode_index()
.map_err(|_| "failed to apply subnode index".to_owned())?;
let settingsr = settings.read();
// Set up loggers
let mut logs: Vec<Box<dyn SharedLogger>> = Vec::new();
let mut client_log_channel: Option<ClientLogChannel> = None;
let mut client_log_channel_closer: Option<ClientLogChannelCloser> = None;
let mut cb = ConfigBuilder::new();
cb.add_filter_ignore_str("async_std");
cb.add_filter_ignore_str("async_io");
cb.add_filter_ignore_str("polling");
cb.add_filter_ignore_str("rustls");
cb.add_filter_ignore_str("async_tungstenite");
cb.add_filter_ignore_str("tungstenite");
cb.add_filter_ignore_str("netlink_proto");
cb.add_filter_ignore_str("netlink_sys");
if settingsr.logging.terminal.enabled {
logs.push(TermLogger::new(
settings::convert_loglevel(settingsr.logging.terminal.level),
cb.build(),
TerminalMode::Mixed,
ColorChoice::Auto,
))
}
if settingsr.logging.file.enabled {
let log_path = Path::new(&settingsr.logging.file.path);
let logfile;
if settingsr.logging.file.append {
logfile = OpenOptions::new()
.create(true)
.append(true)
.open(log_path)
.map_err(|e| format!("failed to open log file: {}", e))?
} else {
logfile = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(log_path)
.map_err(|e| format!("failed to open log file: {}", e))?
}
logs.push(WriteLogger::new(
settings::convert_loglevel(settingsr.logging.file.level),
cb.build(),
logfile,
))
}
if settingsr.logging.client.enabled {
let (clog, clogwriter, clogcloser) = ClientLogChannel::new();
client_log_channel = Some(clog);
client_log_channel_closer = Some(clogcloser);
logs.push(WriteLogger::new(
settings::convert_loglevel(settingsr.logging.client.level),
cb.build(),
clogwriter,
))
}
// --- Dump Config ---
if matches.occurrences_of("dump-config") != 0 {
//let cfg = config::Config::try_from(&*settingsr);
return serde_yaml::to_writer(std::io::stdout(), &*settingsr).map_err(|e| e.to_string());
}
// --- Normal Startup ---
CombinedLogger::init(logs).map_err(|e| format!("failed to init logs: {}", e))?;
// Create Veilid Core
let veilid_core = veilid_core::VeilidCore::new();
// Create client api state change pipe
let (sender, receiver): (
Sender<veilid_core::VeilidStateChange>,
Receiver<veilid_core::VeilidStateChange>,
) = bounded(1);
// Create VeilidCore setup
let vcs = veilid_core::VeilidCoreSetup {
state_change_callback: Arc::new(
move |change: veilid_core::VeilidStateChange| -> veilid_core::SystemPinBoxFuture<()> {
let sender = sender.clone();
Box::pin(async move {
if sender.send(change).await.is_err() {
error!("error sending state change callback");
}
})
},
),
config_callback: settings.get_core_config_callback(),
};
// Start Veilid Core and get API
let veilid_api = veilid_core
.startup(vcs)
.await
.map_err(|e| format!("VeilidCore startup failed: {}", e))?;
// Start client api if one is requested
let mut capi = if settingsr.client_api.enabled {
let some_capi = client_api::ClientApi::new(veilid_api.clone());
some_capi
.clone()
.run(settingsr.client_api.listen_address.addrs.clone());
Some(some_capi)
} else {
None
};
// Drop rwlock on settings
let auto_attach = settingsr.auto_attach;
drop(settingsr);
// Handle state changes on main thread for capnproto rpc
let state_change_receiver_jh = capi.clone().map(|capi| {
async_std::task::spawn_local(async move {
while let Ok(change) = receiver.recv().await {
capi.clone().handle_state_change(change);
}
})
});
// Handle log messages on main thread for capnproto rpc
let client_log_receiver_jh = capi
.clone()
.map(|capi| {
client_log_channel.take().map(|mut client_log_channel| {
async_std::task::spawn_local(async move {
// Batch messages to either 16384 chars at once or every second to minimize packets
let rate = Duration::from_secs(1);
let mut start = Instant::now();
let mut messages = String::new();
loop {
let timeout_dur =
rate.checked_sub(start.elapsed()).unwrap_or(Duration::ZERO);
match async_std::future::timeout(timeout_dur, client_log_channel.recv())
.await
{
Ok(Ok(message)) => {
messages += &message;
if messages.len() > 16384 {
capi.clone()
.handle_client_log(core::mem::take(&mut messages));
start = Instant::now();
}
}
Ok(Err(_)) => break,
Err(_) => {
capi.clone()
.handle_client_log(core::mem::take(&mut messages));
start = Instant::now();
}
}
}
})
})
})
.flatten();
// Auto-attach if desired
if auto_attach {
info!("Auto-attach to the Veilid network");
if let Err(e) = veilid_api.attach().await {
error!("Auto-attaching to the Veilid network failed: {:?}", e);
shutdown();
}
}
// Idle while waiting to exit
let shutdown_switch = {
let shutdown_switch_locked = SHUTDOWN_SWITCH.lock();
(*shutdown_switch_locked).as_ref().map(|ss| ss.instance())
};
if let Some(shutdown_switch) = shutdown_switch {
shutdown_switch.await;
}
// Stop the client api if we have one
if let Some(c) = capi.as_mut().cloned() {
c.stop().await;
}
// Shut down Veilid API to release state change sender
veilid_api.shutdown().await;
// Close the client api log channel if it is open to release client log sender
if let Some(client_log_channel_closer) = client_log_channel_closer {
client_log_channel_closer.close();
}
// Wait for state change receiver to exit
if let Some(state_change_receiver_jh) = state_change_receiver_jh {
state_change_receiver_jh.await;
}
// Wait for client api log receiver to exit
if let Some(client_log_receiver_jh) = client_log_receiver_jh {
client_log_receiver_jh.await;
}
pub fn run_daemon(settings: Settings, matches: ArgMatches) -> Result<(), String> {
eprintln!("Windows Service mode not implemented yet.");
Ok(())
}

View File

@@ -0,0 +1,80 @@
use crate::client_log_channel::*;
use crate::settings::*;
use simplelog::*;
use std::fs::OpenOptions;
use std::path::Path;
pub struct VeilidLogs {
pub client_log_channel: Option<ClientLogChannel>,
pub client_log_channel_closer: Option<ClientLogChannelCloser>,
}
impl VeilidLogs {
pub fn setup_normal_logs(settings: Settings) -> Result<VeilidLogs, String> {
let settingsr = settings.read();
// Set up loggers
let mut logs: Vec<Box<dyn SharedLogger>> = Vec::new();
let mut client_log_channel: Option<ClientLogChannel> = None;
let mut client_log_channel_closer: Option<ClientLogChannelCloser> = None;
let mut cb = ConfigBuilder::new();
cb.add_filter_ignore_str("async_std");
cb.add_filter_ignore_str("async_io");
cb.add_filter_ignore_str("polling");
cb.add_filter_ignore_str("rustls");
cb.add_filter_ignore_str("async_tungstenite");
cb.add_filter_ignore_str("tungstenite");
cb.add_filter_ignore_str("netlink_proto");
cb.add_filter_ignore_str("netlink_sys");
if settingsr.logging.terminal.enabled {
logs.push(TermLogger::new(
convert_loglevel(settingsr.logging.terminal.level),
cb.build(),
TerminalMode::Mixed,
ColorChoice::Auto,
))
}
if settingsr.logging.file.enabled {
let log_path = Path::new(&settingsr.logging.file.path);
let logfile;
if settingsr.logging.file.append {
logfile = OpenOptions::new()
.create(true)
.append(true)
.open(log_path)
.map_err(|e| format!("failed to open log file: {}", e))?
} else {
logfile = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(log_path)
.map_err(|e| format!("failed to open log file: {}", e))?
}
logs.push(WriteLogger::new(
convert_loglevel(settingsr.logging.file.level),
cb.build(),
logfile,
))
}
if settingsr.logging.client.enabled {
let (clog, clogwriter, clogcloser) = ClientLogChannel::new();
client_log_channel = Some(clog);
client_log_channel_closer = Some(clogcloser);
logs.push(WriteLogger::new(
convert_loglevel(settingsr.logging.client.level),
cb.build(),
clogwriter,
))
}
CombinedLogger::init(logs).map_err(|e| format!("failed to init logs: {}", e))?;
Ok(VeilidLogs {
client_log_channel,
client_log_channel_closer,
})
}
}

View File

@@ -1 +1,71 @@
use crate::settings::*;
use clap::ArgMatches;
use log::*;
use std::ffi::OsString;
use std::time::Duration;
use windows_service::service::{
ServiceControl, ServiceControlAccept, ServiceExitCode, ServiceState, ServiceStatus, ServiceType,
};
use windows_service::service_control_handler::ServiceControlHandlerResult;
use windows_service::*;
// Register generated `ffi_service_main` with the system and start the service, blocking
// this thread until the service is stopped.
pub fn run_service(settings: Settings, matches: ArgMatches) -> windows_service::Result<()> {
eprintln!("Windows Service mode not implemented yet.");
//service_dispatcher::start("veilid-server", ffi_veilid_service_main)?;
//
Ok(())
}
///////////////
define_windows_service!(ffi_veilid_service_main, veilid_service_main);
fn veilid_service_main(arguments: Vec<OsString>) {
if let Err(e) = register_service_handler(arguments) {
error!("{}", e);
}
}
///////////////
fn register_service_handler(arguments: Vec<OsString>) -> windows_service::Result<()> {
let event_handler = move |control_event| -> ServiceControlHandlerResult {
match control_event {
ServiceControl::Stop => {
// Handle stop event and return control back to the system.
ServiceControlHandlerResult::NoError
}
// All services must accept Interrogate even if it's a no-op.
ServiceControl::Interrogate => ServiceControlHandlerResult::NoError,
_ => ServiceControlHandlerResult::NotImplemented,
}
};
// Register system service event handler
let status_handle = service_control_handler::register("veilid-server", event_handler)?;
let next_status = ServiceStatus {
// Should match the one from system service registry
service_type: ServiceType::OWN_PROCESS,
// The new state
current_state: ServiceState::Running,
// Accept stop events when running
controls_accepted: ServiceControlAccept::STOP,
// Used to report an error when starting or stopping only, otherwise must be zero
exit_code: ServiceExitCode::Win32(0),
// Only used for pending states, otherwise must be zero
checkpoint: 0,
// Only used for pending states, otherwise must be zero
wait_hint: Duration::default(),
// Unused for setting status
process_id: None,
};
// Tell the system that the service is running now
status_handle.set_service_status(next_status)?;
// Do some work
Ok(())
}