diff --git a/Cargo.lock b/Cargo.lock index fa7ecbf..230c31a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -76,17 +76,6 @@ dependencies = [ "syn", ] -[[package]] -name = "async-trait" -version = "0.1.89" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "atomic-waker" version = "1.1.2" @@ -965,7 +954,6 @@ name = "mc-ingress" version = "0.1.0" dependencies = [ "anyhow", - "async-trait", "clap", "either", "evalexpr", diff --git a/Cargo.toml b/Cargo.toml index 6b03c20..31b909b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,4 +41,3 @@ nix = { version= "0.30.1", features = [ "zerocopy"] } tokio-splice2 = "0.3.2" strip-ansi-escapes = "0.2.1" evalexpr = { version = "13.1.0", features = ["regex"] } -async-trait = "0.1.89" diff --git a/src/kube_cache.rs b/src/kube_cache.rs index 27ae892..9b9e671 100644 --- a/src/kube_cache.rs +++ b/src/kube_cache.rs @@ -18,8 +18,6 @@ use crate::{ packets::{clientbound::status::StatusTrait, SendPacket}, OpaqueError, }; -const MAIN_LABEL: &str = "tami.moe/minecraft"; -const PORT_LABEL: &str = "tami.moe/minecraft-port"; /// This is the layer who is respinsible for caching requests. /// @@ -59,12 +57,12 @@ impl KubeCache { } async fn get_deploys(&self) -> ObjectList { // let lp: ListParams = ListParams::default(); - let lp: ListParams = ListParams::default().labels(MAIN_LABEL); + let lp: ListParams = ListParams::default().labels("tami.moe/minecraft"); self.deployments.list(&lp).await.unwrap() } async fn get_srvs(&self) -> ObjectList { // let lp: ListParams = ListParams::default(); - let lp: ListParams = ListParams::default().labels(MAIN_LABEL); + let lp: ListParams = ListParams::default().labels("tami.moe/minecraft"); self.services.list(&lp).await.unwrap() } @@ -166,6 +164,7 @@ impl MinecraftAPI for McApi { a } }; + tracing::info!(inter_addr = inter_addr); return Ok(Server { dep: deployment, srv: service, @@ -176,8 +175,77 @@ impl MinecraftAPI for McApi { }); } - fn get_map(&self) -> Arc>>> { - self.map.clone() + async fn start_watch( + self, + server: impl MinecraftServerHandle, + frequency: Duration, + ) -> Result<(), OpaqueError> { + let addr = server.get_addr().ok_or("could not get addr of server")?; + let port = server.get_port().ok_or("could not get port of server")?; + let full_addr = format!("{addr}:{port}"); + + if let Some(handle) = self.map.lock().await.get(&full_addr) { + if !handle.is_finished() { + return Ok(()); + } + } + let span = tracing::span!(parent: None,tracing::Level::INFO, "server_watcher", addr, port); + + let full_addr_clone = full_addr.clone(); + let api = self.clone(); + let handle = tokio::spawn( + async move { + tracing::info!("starting watcher"); + loop { + tokio::time::sleep(frequency).await; + let server = match api.query_server(&addr, &port).await { + Ok(x) => x, + Err(e) => { + tracing::error!( + err = format!("{}", e.context), + "could not query server" + ); + return; + } + }; + let status_json = match server.query_description().await { + Ok(x) => x, + Err(e) => { + tracing::error!( + err = format!("{}", e.context), + "could not query description" + ); + return; + } + }; + if status_json.get_players_online() == 0 { + // With this I don't need to specify that StatusTrait + // should be send as well. + + // Otherwise I would need to have it be defined as: + // trait StatusTrait: Send { ... } + drop(status_json); + let mut guard = api.map.lock().await; + guard.remove(&full_addr_clone); + drop(guard); + if let Err(err) = server.stop().await { + tracing::error!( + trace = %err.print_span_trace(), + err = err.context, + msg = "failed to stop server" + ); + } + return; + } + } + } + .instrument(span), + ); + let mut guard = self.map.lock().await; + guard.insert(full_addr.clone(), handle); + drop(guard); + + Ok(()) } } @@ -220,10 +288,10 @@ impl fmt::Debug for Server { .name .unwrap_or("#error#".to_string()), ) + .field("server_addr", &self.server_addr) .finish() } } -#[async_trait::async_trait] impl MinecraftServerHandle for Server { async fn start(&self) -> Result<(), OpaqueError> { self.set_scale(1).await.map_err(|e| { @@ -287,8 +355,8 @@ impl MinecraftServerHandle for Server { self.inter_addr.as_str() } - fn get_addr(&self) -> String { - self.server_addr.clone() + fn get_addr(&self) -> Option { + Some(self.server_addr.clone()) } async fn query_description(&self) -> Result, OpaqueError> { @@ -297,7 +365,9 @@ impl MinecraftServerHandle for Server { ServerDeploymentStatus::Connectable(mut tcp_stream) => { let handshake = crate::packets::serverbound::handshake::Handshake::create( crate::types::VarInt::from(746).ok_or("could not create VarInt WTF?")?, - crate::types::VarString::from(self.get_addr()), + crate::types::VarString::from( + self.get_addr().ok_or("failed to get addr of server")?, + ), crate::types::UShort::from(1234), crate::types::VarInt::from(1).ok_or("could not create VarInt WTF?")?, ) @@ -331,8 +401,8 @@ impl MinecraftServerHandle for Server { } } - fn get_port(&self) -> String { - self.server_port.clone() + fn get_port(&self) -> Option { + Some(self.server_port.clone()) } fn get_motd(&self) -> Option { @@ -362,13 +432,15 @@ impl MinecraftServerHandle for Server { impl Server { async fn set_scale(&self, num: i32) -> Result<(), kube::Error> { let name = self - .dep + .srv .metadata .clone() .name .unwrap_or("#error#".to_string()); - let _res = self.cache.set_dep_scale(&name, num).await?; - tracing::info!("scaled replicas of {} to {num}", self.server_addr); + let res = self.cache.set_dep_scale(&name, num).await; + if res.is_ok() { + tracing::info!("scaled replicas of {} to {num}", self.server_addr); + } Ok(()) } } @@ -381,8 +453,8 @@ where res.labels() .iter() .filter(|(key, value)| match key.as_str() { - MAIN_LABEL => value.as_str() == addr, - PORT_LABEL => { + "tami.moe/minecraft" => value.as_str() == addr, + "tami.moe/minecraft-port" => { found_port = true; value.as_str() == port } diff --git a/src/main.rs b/src/main.rs index 0c0a9e8..044cbc9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -100,12 +100,9 @@ async fn main() { async fn process_connection( mut client_stream: TcpStream, addr: SocketAddr, - api: impl MinecraftAPI + Send + Sync + 'static + Clone, + api: impl MinecraftAPI, config: Config, -) -> Result<(), OpaqueError> -where - T: Send + Sync + 'static, -{ +) -> Result<(), OpaqueError> { // this is wrapper so that async doesnt mess up the span, and // to make sure this doesn't propagate to later `handle_*` #[tracing::instrument(level = "info", skip(client_stream, config))] @@ -188,10 +185,7 @@ async fn handle_status( client_stream: &mut TcpStream, handshake: &Handshake, api: impl MinecraftAPI, -) -> Result<(), OpaqueError> -where - T: Send + Sync + 'static, -{ +) -> Result<(), OpaqueError> { let client_packet = Packet::parse(client_stream).await?; if client_packet.id.get_int() != 0 { return Err(OpaqueError::create(&format!( @@ -282,11 +276,8 @@ async fn handle_login( client_stream: &mut TcpStream, handshake: &Handshake, login_start: LoginStart, - api: impl MinecraftAPI + Send + Sync + 'static + Clone, -) -> Result<(), OpaqueError> -where - T: Send + Sync + 'static, -{ + api: impl MinecraftAPI, +) -> Result<(), OpaqueError> { let server = api .query_server( &handshake.get_server_address(), @@ -299,7 +290,7 @@ where tracing::debug!(msg = "server status", status = ?status); match status { ServerDeploymentStatus::Connectable(mut server_stream) => { - api.start_watch(server.clone(), Duration::from_secs(60)) + api.start_watch(server.clone(), Duration::from_secs(600)) .await?; // referenced from: @@ -350,7 +341,7 @@ where } ServerDeploymentStatus::Offline => { server.start().await?; - api.start_watch(server.clone(), Duration::from_secs(60)) + api.start_watch(server.clone(), Duration::from_secs(600)) .await?; mc_server::send_disconnect(client_stream, format!("[\"\",{{\"text\":\"Okayy, §2starting§r the server!\n\n\"}},{{\"text\":\"{BYE_MESSAGE}\"}}]").as_str()).await?; } diff --git a/src/mc_server.rs b/src/mc_server.rs index a4b097d..88e6e8b 100644 --- a/src/mc_server.rs +++ b/src/mc_server.rs @@ -1,7 +1,4 @@ -use std::{collections::HashMap, sync::Arc}; - use tokio::{io::AsyncWriteExt, net::TcpStream}; -use tracing::Instrument; use crate::{ packets::{ @@ -63,14 +60,13 @@ pub async fn send_disconnect( Ok(()) } -#[async_trait::async_trait] pub trait MinecraftServerHandle: Clone { async fn start(&self) -> Result<(), OpaqueError>; async fn stop(&self) -> Result<(), OpaqueError>; async fn query_status(&self) -> Result; fn get_internal_address(&self) -> &str; - fn get_addr(&self) -> String; - fn get_port(&self) -> String; + fn get_addr(&self) -> Option; + fn get_port(&self) -> Option; fn get_motd(&self) -> Option; async fn query_server_connectable(&self) -> Result { @@ -115,82 +111,24 @@ pub trait MinecraftServerHandle: Clone { tracing::trace!("data exchanged while proxying status: {:?}", data_amount); Ok(()) } - async fn query_description(&self) -> Result, OpaqueError>; + // TODO: move the implementation to here, but + // the async things are *strange* in rust + fn query_description( + &self, + ) -> impl std::future::Future, OpaqueError>> + Send; } pub trait MinecraftAPI { async fn query_server(&self, addr: &str, port: &str) -> Result; - fn get_map(&self) -> Arc>>>; // TODO: move the implementation to here, but /// This should be callable even if there is already a watcher, /// and it should handle the collision itself while returning OK(). async fn start_watch( self, - server: impl MinecraftServerHandle + Send + Sync + 'static, + server: impl MinecraftServerHandle, frequency: std::time::Duration, - ) -> Result<(), OpaqueError> - where - Self: Send + Sync + 'static + Clone, - { - let inter_addr = server.get_internal_address().to_string(); - - if let Some(handle) = self.get_map().lock().await.get(&inter_addr) { - if !handle.is_finished() { - return Ok(()); - } - } - let span = tracing::span!(parent: None,tracing::Level::INFO, "server_watcher", inter_addr, join_addr = server.get_addr(), join_port = server.get_port()); - - let full_addr_clone = inter_addr.clone(); - let api = self.clone(); - let handle = tokio::spawn( - async move { - tracing::info!("starting watcher"); - loop { - tokio::time::sleep(frequency).await; - let status_json = match server.query_description().await { - Ok(x) => x, - Err(e) => { - tracing::error!( - err = format!("{}", e.context), - "could not query description" - ); - return; - } - }; - if status_json.get_players_online() == 0 { - // With this I don't need to specify that StatusTrait - // should be send as well. - - // Otherwise I would need to have it be defined as: - // trait StatusTrait: Send { ... } - drop(status_json); - let map = api.get_map(); - let mut guard = map.lock().await; - guard.remove(&full_addr_clone); - drop(guard); - drop(map); - if let Err(err) = server.stop().await { - tracing::error!( - trace = %err.print_span_trace(), - err = err.context, - msg = "failed to stop server" - ); - } - return; - } - } - } - .instrument(span), - ); - let map = self.get_map(); - let mut guard = map.lock().await; - guard.insert(inter_addr.clone(), handle); - drop(guard); - - Ok(()) - } + ) -> Result<(), OpaqueError>; } pub enum ServerDeploymentStatus {