diff --git a/src/kube_cache.rs b/src/kube_cache.rs index d427e95..ac3aba7 100644 --- a/src/kube_cache.rs +++ b/src/kube_cache.rs @@ -1,4 +1,4 @@ -use std::fmt; +use std::{collections::HashMap, fmt, sync::Arc, time::Duration}; use k8s_openapi::api::{apps::v1::Deployment, core::v1::Service}; use kube::{ @@ -7,9 +7,11 @@ use kube::{ Api, Client, ResourceExt, }; use serde_json::json; +use tracing::Instrument; use crate::{ mc_server::{MinecraftAPI, MinecraftServerHandle, ServerDeploymentStatus}, + packets::{clientbound::status::StatusTrait, SendPacket}, OpaqueError, }; @@ -77,6 +79,7 @@ impl KubeCache { #[derive(Clone)] pub struct McApi { cache: KubeCache, + map: Arc>>, } impl MinecraftAPI for McApi { @@ -120,16 +123,64 @@ impl MinecraftAPI for McApi { cache: self.cache.clone(), }); } + + async fn start_watch( + self, + server: impl MinecraftServerHandle, + frequency: Duration, + ) -> Result<(), OpaqueError> { + let addr = server.get_addr().ok_or("could not get addr of server")?; + if self.map.lock().await.get(&addr).is_none() { + let span = tracing::span!(parent: None,tracing::Level::INFO, "server_watcher", addr); + + tokio::spawn( + async move { + tracing::info!("starting watch"); + tokio::time::sleep(frequency).await; + let server = self.query_server(addr.clone()).await.unwrap(); + let status_json = match server.query_description().await { + Ok(x) => x, + Err(e) => { + tracing::error!( + err = format!("{}", e.context), + "could not query description" + ); + return; + } + }; + if status_json.get_players_online() == 0 { + // With this I don't need to specify that StatusTrait + // should be send as well. + // Otherwise I would need to have it be defined as: + // trait StatusTrait: Send { ... } + drop(status_json); + if let Err(err) = server.stop().await { + tracing::error!( + trace = err.get_span_trace(), + err = err.context, + msg = "failed to stop server" + ); + } + return; + } + } + .instrument(span), + ); + } + Ok(()) + } } impl McApi { pub fn create() -> Option { Some(Self { cache: KubeCache::create()?, + map: Arc::new(tokio::sync::Mutex::new(HashMap::new())), }) } } +#[derive(Clone)] pub struct Server { dep: Deployment, srv: Service, @@ -169,7 +220,7 @@ impl MinecraftServerHandle for Server { } async fn stop(&self) -> Result<(), OpaqueError> { - self.set_scale(1).await.map_err(|e| { + self.set_scale(0).await.map_err(|e| { OpaqueError::create(&format!("failed to set deployment scale: err = {:?}", e)) }) } @@ -229,6 +280,48 @@ impl MinecraftServerHandle for Server { fn get_addr(&self) -> Option { Some(self.server_addr.clone()) } + + async fn query_description(&self) -> Result, OpaqueError> { + let status = self.query_status().await?; + match status { + ServerDeploymentStatus::Connectable(mut tcp_stream) => { + let handshake = crate::packets::serverbound::handshake::Handshake::create( + crate::types::VarInt::from(746).ok_or("could not create VarInt WTF?")?, + crate::types::VarString::from( + self.get_addr().ok_or("failed to get addr of server")?, + ), + crate::types::UShort::from(1234), + crate::types::VarInt::from(1).ok_or("could not create VarInt WTF?")?, + ) + .ok_or("failed to create handshake packet from scratch... WTF?")?; + handshake + .send_packet(&mut tcp_stream) + .await + .map_err(|_e| "failed to send handshake packet to server")?; + let status_rq = crate::packets::Packet::from_bytes(0, Vec::new()) + .ok_or("Failed to create status request packet from scratch")?; + status_rq + .send_packet(&mut tcp_stream) + .await + .map_err(|_e| "failed to send status request packet to server")?; + let return_packet = crate::packets::Packet::parse(&mut tcp_stream).await?; + let status_response = + crate::packets::clientbound::status::StatusResponse::parse(return_packet) + .await + .unwrap(); + + return status_response.get_json().ok_or(OpaqueError::create( + "failed to parse status response from server", + )); + } + _ => { + return Err(OpaqueError::create(&format!( + "server is not running; status={:?}", + status + ))) + } + } + } } impl Server { diff --git a/src/main.rs b/src/main.rs index 2131103..72c5529 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ use std::env; use std::net::SocketAddr; +use std::time::Duration; use tokio::net::{TcpListener, TcpStream}; use tracing_subscriber::{prelude::*, EnvFilter}; @@ -100,7 +101,7 @@ async fn process_connection( handle_status(&mut client_stream, &handshake, server).await?; } packets::ProtocolState::Login => { - handle_login(&mut client_stream, &handshake, server).await? + handle_login(&mut client_stream, &handshake, server, api).await? } packets::ProtocolState::Transfer => { return Err(OpaqueError::create( @@ -161,11 +162,12 @@ async fn handle_status( Ok(()) } -#[tracing::instrument(level = "info", fields(server_addr = server.get_addr()),skip(client_stream, handshake, server))] -async fn handle_login( +#[tracing::instrument(level = "info", fields(server_addr = server.get_addr()),skip(client_stream, handshake, server, api))] +async fn handle_login( client_stream: &mut TcpStream, handshake: &Handshake, server: impl MinecraftServerHandle, + api: impl MinecraftAPI, ) -> Result<(), OpaqueError> { match server.query_status().await? { ServerDeploymentStatus::Connectable(mut server_stream) => { @@ -202,6 +204,8 @@ async fn handle_login( } ServerDeploymentStatus::Offline => { server.start().await?; + api.start_watch(server.clone(), Duration::from_secs(600)) + .await?; mc_server::send_disconnect(client_stream, "Okayy_starting_it...§d<3§r").await?; } } diff --git a/src/mc_server.rs b/src/mc_server.rs index 1a9efac..b864efe 100644 --- a/src/mc_server.rs +++ b/src/mc_server.rs @@ -1,7 +1,13 @@ +use std::fmt; + use tokio::{io::AsyncWriteExt, net::TcpStream}; use crate::{ - packets::{serverbound::handshake::Handshake, Packet, SendPacket}, + packets::{ + clientbound::status::{StatusStructNew, StatusTrait}, + serverbound::handshake::Handshake, + Packet, SendPacket, + }, OpaqueError, }; @@ -44,7 +50,7 @@ pub async fn send_disconnect( Ok(()) } -pub trait MinecraftServerHandle { +pub trait MinecraftServerHandle: Clone { async fn start(&self) -> Result<(), OpaqueError>; async fn stop(&self) -> Result<(), OpaqueError>; async fn query_status(&self) -> Result; @@ -95,10 +101,20 @@ pub trait MinecraftServerHandle { tracing::trace!("data exchanged while proxying status: {:?}", data_amount); Ok(()) } + // TODO: move the implementation to here, but + // the async things are *strange* in rust + fn query_description( + &self, + ) -> impl std::future::Future, OpaqueError>> + Send; } pub trait MinecraftAPI { async fn query_server(&self, addr: String) -> Result; + async fn start_watch( + self, + server: impl MinecraftServerHandle, + frequency: std::time::Duration, + ) -> Result<(), OpaqueError>; } pub enum ServerDeploymentStatus {