feat: add watcher which checks if the joined servers became empty and stops them

This commit is contained in:
Tamipes 2025-12-10 23:10:37 +01:00
parent 138429948a
commit 5bfff0a081
3 changed files with 120 additions and 7 deletions

View file

@ -1,4 +1,4 @@
use std::fmt; use std::{collections::HashMap, fmt, sync::Arc, time::Duration};
use k8s_openapi::api::{apps::v1::Deployment, core::v1::Service}; use k8s_openapi::api::{apps::v1::Deployment, core::v1::Service};
use kube::{ use kube::{
@ -7,9 +7,11 @@ use kube::{
Api, Client, ResourceExt, Api, Client, ResourceExt,
}; };
use serde_json::json; use serde_json::json;
use tracing::Instrument;
use crate::{ use crate::{
mc_server::{MinecraftAPI, MinecraftServerHandle, ServerDeploymentStatus}, mc_server::{MinecraftAPI, MinecraftServerHandle, ServerDeploymentStatus},
packets::{clientbound::status::StatusTrait, SendPacket},
OpaqueError, OpaqueError,
}; };
@ -77,6 +79,7 @@ impl KubeCache {
#[derive(Clone)] #[derive(Clone)]
pub struct McApi { pub struct McApi {
cache: KubeCache, cache: KubeCache,
map: Arc<tokio::sync::Mutex<HashMap<String, Server>>>,
} }
impl MinecraftAPI<Server> for McApi { impl MinecraftAPI<Server> for McApi {
@ -120,16 +123,64 @@ impl MinecraftAPI<Server> for McApi {
cache: self.cache.clone(), cache: self.cache.clone(),
}); });
} }
async fn start_watch(
self,
server: impl MinecraftServerHandle,
frequency: Duration,
) -> Result<(), OpaqueError> {
let addr = server.get_addr().ok_or("could not get addr of server")?;
if self.map.lock().await.get(&addr).is_none() {
let span = tracing::span!(parent: None,tracing::Level::INFO, "server_watcher", addr);
tokio::spawn(
async move {
tracing::info!("starting watch");
tokio::time::sleep(frequency).await;
let server = self.query_server(addr.clone()).await.unwrap();
let status_json = match server.query_description().await {
Ok(x) => x,
Err(e) => {
tracing::error!(
err = format!("{}", e.context),
"could not query description"
);
return;
}
};
if status_json.get_players_online() == 0 {
// With this I don't need to specify that StatusTrait
// should be send as well.
// Otherwise I would need to have it be defined as:
// trait StatusTrait: Send { ... }
drop(status_json);
if let Err(err) = server.stop().await {
tracing::error!(
trace = err.get_span_trace(),
err = err.context,
msg = "failed to stop server"
);
}
return;
}
}
.instrument(span),
);
}
Ok(())
}
} }
impl McApi { impl McApi {
pub fn create() -> Option<Self> { pub fn create() -> Option<Self> {
Some(Self { Some(Self {
cache: KubeCache::create()?, cache: KubeCache::create()?,
map: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
}) })
} }
} }
#[derive(Clone)]
pub struct Server { pub struct Server {
dep: Deployment, dep: Deployment,
srv: Service, srv: Service,
@ -169,7 +220,7 @@ impl MinecraftServerHandle for Server {
} }
async fn stop(&self) -> Result<(), OpaqueError> { async fn stop(&self) -> Result<(), OpaqueError> {
self.set_scale(1).await.map_err(|e| { self.set_scale(0).await.map_err(|e| {
OpaqueError::create(&format!("failed to set deployment scale: err = {:?}", e)) OpaqueError::create(&format!("failed to set deployment scale: err = {:?}", e))
}) })
} }
@ -229,6 +280,48 @@ impl MinecraftServerHandle for Server {
fn get_addr(&self) -> Option<String> { fn get_addr(&self) -> Option<String> {
Some(self.server_addr.clone()) Some(self.server_addr.clone())
} }
async fn query_description(&self) -> Result<Box<dyn StatusTrait>, OpaqueError> {
let status = self.query_status().await?;
match status {
ServerDeploymentStatus::Connectable(mut tcp_stream) => {
let handshake = crate::packets::serverbound::handshake::Handshake::create(
crate::types::VarInt::from(746).ok_or("could not create VarInt WTF?")?,
crate::types::VarString::from(
self.get_addr().ok_or("failed to get addr of server")?,
),
crate::types::UShort::from(1234),
crate::types::VarInt::from(1).ok_or("could not create VarInt WTF?")?,
)
.ok_or("failed to create handshake packet from scratch... WTF?")?;
handshake
.send_packet(&mut tcp_stream)
.await
.map_err(|_e| "failed to send handshake packet to server")?;
let status_rq = crate::packets::Packet::from_bytes(0, Vec::new())
.ok_or("Failed to create status request packet from scratch")?;
status_rq
.send_packet(&mut tcp_stream)
.await
.map_err(|_e| "failed to send status request packet to server")?;
let return_packet = crate::packets::Packet::parse(&mut tcp_stream).await?;
let status_response =
crate::packets::clientbound::status::StatusResponse::parse(return_packet)
.await
.unwrap();
return status_response.get_json().ok_or(OpaqueError::create(
"failed to parse status response from server",
));
}
_ => {
return Err(OpaqueError::create(&format!(
"server is not running; status={:?}",
status
)))
}
}
}
} }
impl Server { impl Server {

View file

@ -1,5 +1,6 @@
use std::env; use std::env;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::time::Duration;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tracing_subscriber::{prelude::*, EnvFilter}; use tracing_subscriber::{prelude::*, EnvFilter};
@ -100,7 +101,7 @@ async fn process_connection<T: MinecraftServerHandle>(
handle_status(&mut client_stream, &handshake, server).await?; handle_status(&mut client_stream, &handshake, server).await?;
} }
packets::ProtocolState::Login => { packets::ProtocolState::Login => {
handle_login(&mut client_stream, &handshake, server).await? handle_login(&mut client_stream, &handshake, server, api).await?
} }
packets::ProtocolState::Transfer => { packets::ProtocolState::Transfer => {
return Err(OpaqueError::create( return Err(OpaqueError::create(
@ -161,11 +162,12 @@ async fn handle_status(
Ok(()) Ok(())
} }
#[tracing::instrument(level = "info", fields(server_addr = server.get_addr()),skip(client_stream, handshake, server))] #[tracing::instrument(level = "info", fields(server_addr = server.get_addr()),skip(client_stream, handshake, server, api))]
async fn handle_login( async fn handle_login<T>(
client_stream: &mut TcpStream, client_stream: &mut TcpStream,
handshake: &Handshake, handshake: &Handshake,
server: impl MinecraftServerHandle, server: impl MinecraftServerHandle,
api: impl MinecraftAPI<T>,
) -> Result<(), OpaqueError> { ) -> Result<(), OpaqueError> {
match server.query_status().await? { match server.query_status().await? {
ServerDeploymentStatus::Connectable(mut server_stream) => { ServerDeploymentStatus::Connectable(mut server_stream) => {
@ -202,6 +204,8 @@ async fn handle_login(
} }
ServerDeploymentStatus::Offline => { ServerDeploymentStatus::Offline => {
server.start().await?; server.start().await?;
api.start_watch(server.clone(), Duration::from_secs(600))
.await?;
mc_server::send_disconnect(client_stream, "Okayy_starting_it...§d<3§r").await?; mc_server::send_disconnect(client_stream, "Okayy_starting_it...§d<3§r").await?;
} }
} }

View file

@ -1,7 +1,13 @@
use std::fmt;
use tokio::{io::AsyncWriteExt, net::TcpStream}; use tokio::{io::AsyncWriteExt, net::TcpStream};
use crate::{ use crate::{
packets::{serverbound::handshake::Handshake, Packet, SendPacket}, packets::{
clientbound::status::{StatusStructNew, StatusTrait},
serverbound::handshake::Handshake,
Packet, SendPacket,
},
OpaqueError, OpaqueError,
}; };
@ -44,7 +50,7 @@ pub async fn send_disconnect(
Ok(()) Ok(())
} }
pub trait MinecraftServerHandle { pub trait MinecraftServerHandle: Clone {
async fn start(&self) -> Result<(), OpaqueError>; async fn start(&self) -> Result<(), OpaqueError>;
async fn stop(&self) -> Result<(), OpaqueError>; async fn stop(&self) -> Result<(), OpaqueError>;
async fn query_status(&self) -> Result<ServerDeploymentStatus, OpaqueError>; async fn query_status(&self) -> Result<ServerDeploymentStatus, OpaqueError>;
@ -95,10 +101,20 @@ pub trait MinecraftServerHandle {
tracing::trace!("data exchanged while proxying status: {:?}", data_amount); tracing::trace!("data exchanged while proxying status: {:?}", data_amount);
Ok(()) Ok(())
} }
// TODO: move the implementation to here, but
// the async things are *strange* in rust
fn query_description(
&self,
) -> impl std::future::Future<Output = Result<Box<dyn StatusTrait>, OpaqueError>> + Send;
} }
pub trait MinecraftAPI<T> { pub trait MinecraftAPI<T> {
async fn query_server(&self, addr: String) -> Result<T, OpaqueError>; async fn query_server(&self, addr: String) -> Result<T, OpaqueError>;
async fn start_watch(
self,
server: impl MinecraftServerHandle,
frequency: std::time::Duration,
) -> Result<(), OpaqueError>;
} }
pub enum ServerDeploymentStatus { pub enum ServerDeploymentStatus {