From 46c3a994c6b2db66406a7e433d4c77e255bfa43e Mon Sep 17 00:00:00 2001 From: Tamipes Date: Sun, 7 Jun 2026 16:03:44 +0200 Subject: [PATCH] feat: impement kube manifest caching in `kube_cache`, also fix `Server` to request data from the cache, instead of saving it --- src/kube_cache.rs | 250 +++++++++++++++++++++------------------------- src/main.rs | 102 ++++++++++--------- src/mc_server.rs | 40 +++++++- 3 files changed, 207 insertions(+), 185 deletions(-) diff --git a/src/kube_cache.rs b/src/kube_cache.rs index 27ae892..9a762ea 100644 --- a/src/kube_cache.rs +++ b/src/kube_cache.rs @@ -1,21 +1,17 @@ use std::{collections::HashMap, fmt, sync::Arc, time::Duration}; -use k8s_openapi::{ - api::{apps::v1::Deployment, core::v1::Service}, - apimachinery::pkg::util::intstr::IntOrString, -}; +use futures::StreamExt; +use k8s_openapi::api::{apps::v1::Deployment, core::v1::Service}; use kube::{ api::{ListParams, ObjectList, Patch, PatchParams}, - runtime::reflector::Lookup, + runtime::{reflector::Lookup, WatchStreamExt}, Api, Client, ResourceExt, }; use serde_json::json; use tokio::task::JoinHandle; -use tracing::Instrument; use crate::{ mc_server::{sanitize_addr, MinecraftAPI, MinecraftServerHandle, ServerDeploymentStatus}, - packets::{clientbound::status::StatusTrait, SendPacket}, OpaqueError, }; const MAIN_LABEL: &str = "tami.moe/minecraft"; @@ -28,69 +24,94 @@ const PORT_LABEL: &str = "tami.moe/minecraft-port"; /// the underlying async data access. #[derive(Debug, Clone)] pub struct KubeCache { - deployments: Api, - services: Api, + // When the store gets copied, it points to the same backing store + // In other words, this satisfies my restriction for cloning and + // there is need no to wrap it in Arc<> + dep_cache: kube::runtime::reflector::Store, + dep_api: Api, + srv_api: Api, in_cluster: bool, } impl KubeCache { /// This initializes the creation of a "kubernetes client" /// and if it is not possible returns a None. - pub async fn create() -> Option { + /// + /// It also returns a JoinHandle which is tied to the api watcher + /// and if it returns that means that the kubecache is not responsive now. + pub async fn create() -> Option<(KubeCache, JoinHandle<()>)> { let in_cluster = match std::env::var("KUBERNETES_SERVICE_HOST") { - Ok(x) => true, - Err(e) => false, + Ok(_) => true, + Err(_) => false, }; let client = Client::try_default().await.unwrap(); - let deployments: Api = Api::default_namespaced(client.clone()); - let services: Api = Api::default_namespaced(client); + let dep_api: Api = Api::default_namespaced(client.clone()); + let lp = kube::runtime::watcher::Config::default().labels(MAIN_LABEL); + let (dep_reader, dep_writer) = kube::runtime::reflector::store(); + let dep_watcher_rf = + kube::runtime::reflector(dep_writer, kube::runtime::watcher(dep_api.clone(), lp)); - return Some(KubeCache { - deployments, - services, - in_cluster, + let handle = tokio::spawn(async move { + // Source: https://docs.rs/kube/latest/kube/runtime/fn.reflector.html + let infinite_watch = dep_watcher_rf + .applied_objects() + .for_each(|_o| std::future::ready(())); + let _res = infinite_watch.await; + tracing::error!( + "Deployments watcher ended. This should not happen. (Program should exit now)" + ); }); + + let srv_api: Api = Api::default_namespaced(client); + + return Some(( + KubeCache { + dep_cache: dep_reader, + dep_api: dep_api, + srv_api, + in_cluster, + }, + handle, + )); } - async fn get_dep(&self, name: &str) -> Result { - self.deployments.get(name).await + fn get_dep(&self, name: &str) -> Option> { + self.dep_cache.find(|x| x.name_any() == name) } async fn get_srv(&self, name: &str) -> Result { - self.services.get(name).await + self.srv_api.get(name).await } - async fn get_deploys(&self) -> ObjectList { + async fn list_deploys(&self) -> Vec> { + self.dep_cache.state() + } + async fn list_srvs(&self) -> ObjectList { // let lp: ListParams = ListParams::default(); let lp: ListParams = ListParams::default().labels(MAIN_LABEL); - self.deployments.list(&lp).await.unwrap() - } - async fn get_srvs(&self) -> ObjectList { - // let lp: ListParams = ListParams::default(); - let lp: ListParams = ListParams::default().labels(MAIN_LABEL); - self.services.list(&lp).await.unwrap() + self.srv_api.list(&lp).await.unwrap() } - pub async fn query_dep_addr(&self, addr: &str, port: &str) -> Option { - let deploys = self.get_deploys().await; - let result = deploys.iter().find(|x| filter_label_value(x, addr, port))?; - Some(result.name()?.to_string()) + pub async fn query_dep(&self, addr: &str, port: &str) -> Option> { + let deploys = self.list_deploys().await; + deploys + .into_iter() + .find(|x| filter_label_value(x.as_ref(), addr, port)) } pub async fn query_srv(&self, addr: &str, port: &str) -> Option { - let deploys = self.get_srvs().await; - let result = deploys.into_iter().find(|x| { + let deploys = self.list_srvs().await; + deploys.into_iter().find(|x| { let in_cluster = match x.spec.as_ref().unwrap().type_.as_ref() { Some(t) => t == "ClusterIP", None => false, }; let incorrect_type = in_cluster ^ self.in_cluster; - !incorrect_type && filter_label_value(&x, addr, port) - })?; - Some(result) + !incorrect_type && filter_label_value(x, addr, port) + }) } async fn set_dep_scale(&self, name: &str, num: i32) -> Result { let patch = Patch::Merge(json!({"spec":{"replicas": num}})); let pp = PatchParams::default(); - self.deployments.patch(name, &pp, &patch).await + self.dep_api.patch(name, &pp, &patch).await } } @@ -109,7 +130,7 @@ impl MinecraftAPI for McApi { async fn query_server(&self, addr: &str, port: &str) -> Result { let addr = sanitize_addr(&addr); - let dep_name = match self.cache.query_dep_addr(&addr, &port).await { + let deployment = match self.cache.query_dep(&addr, &port).await { Some(x) => x, None => { return Err(OpaqueError::create_with_kind( @@ -128,51 +149,50 @@ impl MinecraftAPI for McApi { } }; - let deployment = self.cache.get_dep(&dep_name).await.map_err(|x| { - format!( - "Failed to query cache for deployment with dep_name err:{}", - x.to_string() - ) - })?; + let internal_id = match deployment.metadata.name.clone() { + Some(x) => x, + None => { + return Err(OpaqueError::create_with_kind( + "Could not query deployment metadata for name", + "InternalIdQueryFailed", + )); + } + }; + tracing::debug!("found kubernetes deployment & service"); let service_port_spec = service.clone().spec.unwrap().ports.unwrap(); - let port = service_port_spec + let port_srv = service_port_spec .iter() .find(|x| x.name.clone().unwrap() == "mc-router") .ok_or(OpaqueError::create( "Could not find \"mc-router\" nodePort for server", ))?; - let port_string; let inter_addr = match self.cache.in_cluster { false => { - let node_port = port + let node_port = port_srv .node_port .map(|x| x.to_string()) .ok_or(OpaqueError::create("Could not map nodePort to port string"))?; - port_string = node_port.clone(); format!("localhost:{}", node_port) } true => { - let target_port = port.port; - let a = format!( + let target_port = port_srv.port; + format!( "{}.default.svc.cluster.local:{}", service .name() .ok_or("Could not get name of ClusterIP service")?, target_port - ); - port_string = a.clone(); - a + ) } }; return Ok(Server { - dep: deployment, - srv: service, server_addr: addr.to_string(), - server_port: port_string, + server_port: port.to_string(), + internal_address: inter_addr, + internal_id, cache: self.cache.clone(), - inter_addr: inter_addr, }); } @@ -182,43 +202,33 @@ impl MinecraftAPI for McApi { } impl McApi { - pub async fn create() -> Option { - Some(Self { - cache: KubeCache::create().await?, - map: Arc::new(tokio::sync::Mutex::new(HashMap::new())), - }) + pub async fn create() -> Option<(Self, JoinHandle<()>)> { + let (kube_cache, kube_task) = KubeCache::create().await?; + Some(( + Self { + cache: kube_cache, + map: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + }, + kube_task, + )) } } #[derive(Clone)] pub struct Server { - dep: Deployment, - srv: Service, server_addr: String, server_port: String, + internal_address: String, + internal_id: String, cache: KubeCache, - inter_addr: String, } impl fmt::Debug for Server { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("KubeServer") + .field("internal_id", &self.internal_id) .field( - "dep", - &self - .dep - .metadata - .clone() - .name - .unwrap_or("#error#".to_string()), - ) - .field( - "srv", - &self - .srv - .metadata - .clone() - .name - .unwrap_or("#error#".to_string()), + "join_addr", + &format!("{}:{}", self.server_addr, self.server_port), ) .finish() } @@ -239,7 +249,12 @@ impl MinecraftServerHandle for Server { #[tracing::instrument(level = "info")] async fn query_status(&self) -> Result { - let mut status = match self.dep.clone().status { + let dep = self + .cache + .get_dep(&self.internal_id) + .ok_or_else(|| OpaqueError::create("Failed to get deployment from cache"))?; + + let status = match &dep.status { Some(x) => x, None => { return Err(OpaqueError::create( @@ -249,21 +264,21 @@ impl MinecraftServerHandle for Server { }; let total_replicas = status .replicas - .get_or_insert_with(|| { + .unwrap_or_else(|| { tracing::trace!("total_replicas failed to get"); -1 }) .clone(); let available_replicas = status .available_replicas - .get_or_insert_with(|| { + .unwrap_or_else(|| { tracing::trace!("available_replicas failed to get"); -1 }) .clone(); let ready_replicas = status .ready_replicas - .get_or_insert_with(|| { + .unwrap_or_else(|| { tracing::trace!("ready_replicas failed to get"); -1 }) @@ -284,60 +299,21 @@ impl MinecraftServerHandle for Server { } fn get_internal_address(&self) -> &str { - self.inter_addr.as_str() + self.internal_address.as_str() } fn get_addr(&self) -> String { self.server_addr.clone() } - async fn query_description(&self) -> Result, OpaqueError> { - let status = self.query_status().await?; - match status { - ServerDeploymentStatus::Connectable(mut tcp_stream) => { - let handshake = crate::packets::serverbound::handshake::Handshake::create( - crate::types::VarInt::from(746).ok_or("could not create VarInt WTF?")?, - crate::types::VarString::from(self.get_addr()), - crate::types::UShort::from(1234), - crate::types::VarInt::from(1).ok_or("could not create VarInt WTF?")?, - ) - .ok_or("failed to create handshake packet from scratch... WTF?")?; - handshake - .send_packet(&mut tcp_stream) - .await - .map_err(|_e| "failed to send handshake packet to server")?; - let status_rq = crate::packets::Packet::from_bytes(0, Vec::new()) - .ok_or("Failed to create status request packet from scratch")?; - status_rq - .send_packet(&mut tcp_stream) - .await - .map_err(|_e| "failed to send status request packet to server")?; - let return_packet = crate::packets::Packet::parse(&mut tcp_stream).await?; - let status_response = - crate::packets::clientbound::status::StatusResponse::parse(return_packet) - .await - .unwrap(); - - return status_response.get_json().ok_or(OpaqueError::create( - "failed to parse status response from server", - )); - } - _ => { - return Err(OpaqueError::create(&format!( - "server is not running; status={:?}", - status - ))) - } - } - } - fn get_port(&self) -> String { self.server_port.clone() } fn get_motd(&self) -> Option { - let all_container_motds = self - .dep + let dep = self.cache.get_dep(&self.internal_id)?; + + let all_container_motds = dep .spec .clone()? .template @@ -361,19 +337,17 @@ impl MinecraftServerHandle for Server { impl Server { async fn set_scale(&self, num: i32) -> Result<(), kube::Error> { - let name = self - .dep - .metadata - .clone() - .name - .unwrap_or("#error#".to_string()); - let _res = self.cache.set_dep_scale(&name, num).await?; - tracing::info!("scaled replicas of {} to {num}", self.server_addr); + let _res = self.cache.set_dep_scale(&self.internal_id, num).await?; + tracing::info!( + "scaled replicas of {}:{} to {num}", + self.server_addr, + self.server_port + ); Ok(()) } } -fn filter_label_value(res: &&R, addr: &str, port: &str) -> bool +fn filter_label_value(res: &R, addr: &str, port: &str) -> bool where R: ResourceExt, { diff --git a/src/main.rs b/src/main.rs index 89c421e..87bcb9e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,6 +21,7 @@ mod packets; mod types; static BYE_MESSAGE: &str = concat!("§dTami§r with §d<3§r §8(rev: ", env!("COMMIT_HASH"), ")§r"); +static OFFLINE_TIMER: std::time::Duration = Duration::from_secs(600); #[tokio::main] async fn main() { @@ -39,7 +40,7 @@ async fn main() { let revision: &'static str = env!("COMMIT_HASH"); tracing::info!(revision); - let api = kube_cache::McApi::create().await.unwrap(); + let (api, api_task) = kube_cache::McApi::create().await.unwrap(); tracing::info!("initialized kube api"); let config: Config = Default::default(); @@ -47,53 +48,64 @@ async fn main() { let listener = TcpListener::bind(config.bind_addr.clone()).await.unwrap(); tracing::info!(bind_addr = config.bind_addr, "started tcp server"); - loop { - let (socket, addr) = listener.accept().await.unwrap(); - let api = api.clone(); + let conn_task = tokio::spawn(async move { + loop { + let (socket, addr) = listener.accept().await.unwrap(); + let api = api.clone(); - let config = config.clone(); - tokio::spawn(async move { - tracing::debug!( - addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - "Client connected" - ); - if let Err(e) = process_connection(socket, addr, api, config).await { - match e.level { - tracing::Level::ERROR => tracing::error!( - // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - trace = %e.print_span_trace(), - err = format!("{}", e.context), - "Client disconnected" - ), - tracing::Level::WARN => tracing::warn!( - // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - trace = %e.print_span_trace(), - err = format!("{}", e.context), - "Client disconnected" - ), - tracing::Level::INFO => tracing::info!( - // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - trace = %e.print_span_trace(), - err = format!("{}", e.context), - "Client disconnected" - ), - _ => { - tracing::error!( + let config = config.clone(); + tokio::spawn(async move { + tracing::debug!( + addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + "Client connected" + ); + if let Err(e) = process_connection(socket, addr, api, config).await { + match e.level { + tracing::Level::ERROR => tracing::error!( // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), trace = %e.print_span_trace(), err = format!("{}", e.context), - actual_level = ?e.level, - "Client disconnected (bad level)" - ) + "Client disconnected" + ), + tracing::Level::WARN => tracing::warn!( + // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + trace = %e.print_span_trace(), + err = format!("{}", e.context), + "Client disconnected" + ), + tracing::Level::INFO => tracing::info!( + // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + trace = %e.print_span_trace(), + err = format!("{}", e.context), + "Client disconnected" + ), + _ => { + tracing::error!( + // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + trace = %e.print_span_trace(), + err = format!("{}", e.context), + actual_level = ?e.level, + "Client disconnected (bad level)" + ) + } } + } else { + tracing::debug!( + addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + "Client disconnected" + ); } - } else { - tracing::debug!( - addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - "Client disconnected" - ); - } - }); + }); + } + }); + + tokio::select! { + result = api_task => { + tracing::error!("The api tokio:spawn'ed task run to completion, which should not happen!"); + } + result = conn_task => { + tracing::error!("The connection handling tokio:spawn'ed task run to completion, which should not happen!"); + } } } @@ -299,8 +311,7 @@ where tracing::debug!(msg = "server status", status = ?status); match status { ServerDeploymentStatus::Connectable(mut server_stream) => { - api.start_watch(server.clone(), Duration::from_secs(600)) - .await?; + api.start_watch(server.clone(), OFFLINE_TIMER).await?; // referenced from: // https://github.com/hanyu-dev/tokio-splice2/blob/fc47199fffde8946b0acf867d1fa0b2222267a34/examples/proxy.rs @@ -350,8 +361,7 @@ where } ServerDeploymentStatus::Offline => { server.start().await?; - api.start_watch(server.clone(), Duration::from_secs(600)) - .await?; + api.start_watch(server.clone(), OFFLINE_TIMER).await?; mc_server::send_disconnect(client_stream, format!("[\"\",{{\"text\":\"Okayy, §2starting§r the server!\n\n\"}},{{\"text\":\"{BYE_MESSAGE}\"}}]").as_str()).await?; } ServerDeploymentStatus::Unavailable(_) => unreachable!(), diff --git a/src/mc_server.rs b/src/mc_server.rs index a4b097d..102523d 100644 --- a/src/mc_server.rs +++ b/src/mc_server.rs @@ -115,7 +115,45 @@ pub trait MinecraftServerHandle: Clone { tracing::trace!("data exchanged while proxying status: {:?}", data_amount); Ok(()) } - async fn query_description(&self) -> Result, OpaqueError>; + async fn query_description(&self) -> Result, OpaqueError> { + let status = self.query_status().await?; + match status { + ServerDeploymentStatus::Connectable(mut tcp_stream) => { + let handshake = crate::packets::serverbound::handshake::Handshake::create( + crate::types::VarInt::from(746).ok_or("could not create VarInt WTF?")?, + crate::types::VarString::from(self.get_addr()), + crate::types::UShort::from(1234), + crate::types::VarInt::from(1).ok_or("could not create VarInt WTF?")?, + ) + .ok_or("failed to create handshake packet from scratch... WTF?")?; + handshake + .send_packet(&mut tcp_stream) + .await + .map_err(|_e| "failed to send handshake packet to server")?; + let status_rq = crate::packets::Packet::from_bytes(0, Vec::new()) + .ok_or("Failed to create status request packet from scratch")?; + status_rq + .send_packet(&mut tcp_stream) + .await + .map_err(|_e| "failed to send status request packet to server")?; + let return_packet = crate::packets::Packet::parse(&mut tcp_stream).await?; + let status_response = + crate::packets::clientbound::status::StatusResponse::parse(return_packet) + .await + .unwrap(); + + return status_response.get_json().ok_or(OpaqueError::create( + "failed to parse status response from server", + )); + } + _ => { + return Err(OpaqueError::create(&format!( + "server is not running; status={:?}", + status + ))) + } + } + } } pub trait MinecraftAPI {