From fdb2f88c4ad250d7c52365c75843ba1292918dae Mon Sep 17 00:00:00 2001 From: Tamipes Date: Wed, 3 Dec 2025 18:29:00 +0100 Subject: [PATCH] chore: incremental cleanups --- src/kube_cache.rs | 36 +++++++---------- src/main.rs | 99 ++++++++++++++++------------------------------ src/mc_server.rs | 40 +++++++++++++++---- src/packets/mod.rs | 28 +++++++------ 4 files changed, 96 insertions(+), 107 deletions(-) diff --git a/src/kube_cache.rs b/src/kube_cache.rs index e26c147..e508f37 100644 --- a/src/kube_cache.rs +++ b/src/kube_cache.rs @@ -100,7 +100,7 @@ impl fmt::Debug for KubeServer { } } impl KubeServer { - #[tracing::instrument(name = "KubeServer::create", level = "info", skip(cache, server_addr))] + #[tracing::instrument(name = "KubeServer::create", level = "info", skip(cache))] pub async fn create( cache: Arc>, server_addr: &str, @@ -153,6 +153,7 @@ impl KubeServer { pub fn get_server_addr(&self) -> String { self.server_addr.clone() } + #[tracing::instrument(level = "info")] pub async fn get_server_status(&self) -> Result { let mut status = match self.dep.clone().status { Some(x) => x, @@ -187,9 +188,8 @@ impl KubeServer { if total_replicas > 0 { if ready_replicas > 0 { - //TODO: add Connectable check return match self.query_server_connectable().await { - Ok(()) => Ok(ServerDeploymentStatus::Connectable), + Ok(x) => Ok(ServerDeploymentStatus::Connectable(x)), Err(_) => Ok(ServerDeploymentStatus::PodOk), }; } @@ -198,51 +198,45 @@ impl KubeServer { return Ok(ServerDeploymentStatus::Offline); } } - async fn query_server_connectable(&self) -> Result<(), OpaqueError> { + async fn query_server_connectable(&self) -> Result { let port = self .get_port() .ok_or_else(|| "failed to get port from service")?; let server_stream = TcpStream::connect(format!("localhost:{port}")) .await - .map_err(|_| "Failed to connect to minecraft server")?; + .map_err(|_| "failed to connect to minecraft server")?; tracing::trace!( "successfully connected to backend server; (connectibility check) {:?}", server_stream.peer_addr() ); - Ok(()) + Ok(server_stream) } pub async fn proxy_status( &self, handshake: &Handshake, status_request: &Packet, client_stream: &mut TcpStream, + server_stream: &mut TcpStream, ) -> Result<(), OpaqueError> { - let port = self - .get_port() - .ok_or_else(|| "failed to get port from service")?; - let mut server_stream = TcpStream::connect(format!("localhost:{port}")) - .await - .map_err(|_| "Failed to connect to minecraft server")?; - handshake - .send_packet(&mut server_stream) + .send_packet(server_stream) .await - .map_err(|_| "Failed to forward handshake packet to minecraft server")?; + .map_err(|_| "failed to forward handshake packet to minecraft server")?; status_request - .send_packet(&mut server_stream) + .send_packet(server_stream) .await - .map_err(|_| "Failed to forward status request packet to minecraft server")?; + .map_err(|_| "failed to forward status request packet to minecraft server")?; - let data_amount = tokio::io::copy_bidirectional(client_stream, &mut server_stream) + let data_amount = tokio::io::copy_bidirectional(client_stream, server_stream) .await .map_err(|e| { format!( - "Error during bidirectional copy between server and client; err={:?}", + "error during bidirectional copy between server and client; err={:?}", e ) })?; - tracing::debug!("data exchanged while proxing: {:?}", data_amount); + tracing::trace!("data exchanged while proxying status: {:?}", data_amount); Ok(()) } pub async fn set_scale( @@ -273,7 +267,7 @@ where #[derive(Debug)] pub enum ServerDeploymentStatus { - Connectable, + Connectable(TcpStream), Starting, PodOk, Offline, diff --git a/src/main.rs b/src/main.rs index e863f3f..7895658 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,7 +24,6 @@ mod types; #[tokio::main] async fn main() { // ---- Tracing setup ---- - // tracing_subscriber::fmt::init(); let fmt_layer = tracing_subscriber::fmt::layer() .with_target(false) .with_level(true); @@ -36,21 +35,21 @@ async fn main() { .init(); let commit_hash: &'static str = env!("COMMIT_HASH"); - tracing::info!("COMMIT_HASH: {}", commit_hash); + tracing::info!("revision: {}", commit_hash); let cache = kube_cache::Cache::create().unwrap(); let arc_cache = Arc::new(Mutex::new(cache)); - tracing::info!("kube api initialized"); + tracing::info!("initialized kube api"); let listener = TcpListener::bind("0.0.0.0:25565").await.unwrap(); - tracing::info!("tcp server started"); + tracing::info!("started tcp server"); loop { let (socket, addr) = listener.accept().await.unwrap(); let acc = arc_cache.clone(); tokio::spawn(async move { - tracing::info!( + tracing::debug!( addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), "Client connected" ); @@ -62,7 +61,7 @@ async fn main() { err = format!("{}", e.context) ); } else { - tracing::info!( + tracing::debug!( addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), "Client disconnected" ); @@ -80,7 +79,8 @@ async fn process_connection( let client_packet = match Packet::parse(&mut client_stream).await { Some(x) => x, None => { - tracing::trace!("Client HANDSHAKE -> bad packet; Disconnecting..."); + // This is debug, because Packet::parse has all error cases logged with tracing::error + tracing::debug!("Client HANDSHAKE -> malformed packet; Disconnecting..."); return Ok(()); } }; @@ -95,7 +95,7 @@ async fn process_connection( } handshake = packets::serverbound::handshake::Handshake::parse(client_packet) .await - .ok_or_else(|| "Handshake request from client failed to parse".to_string())?; + .ok_or_else(|| "handshake request from client failed to parse".to_string())?; next_server_state = handshake.get_next_state(); @@ -113,8 +113,11 @@ async fn process_connection( handle_login(&mut client_stream, &handshake, kube_server, cache.clone()).await? } packets::ProtocolState::Transfer => { - return Err(OpaqueError::create("Transfer; Not yet implemented!")) + return Err(OpaqueError::create( + "next state is transfer; Not yet implemented!", + )) } + // This is used becuase Handshake::parse returns none if is something else _ => unreachable!(), }; Ok(()) @@ -129,24 +132,23 @@ async fn handle_status( tracing::debug!(handshake = ?handshake); let client_packet = Packet::parse(client_stream) .await - .ok_or_else(|| "Could not parse client_packet".to_string())?; - match client_packet.id.get_int() { - 0 => tracing::info!("status request"), - _ => { - return Err(OpaqueError::create(&format!( - "Client STATUS: {:#x} Unknown Id -> Shutdown", - client_packet.id.get_int(), - ))) - } + .ok_or_else(|| "could not parse client_packet".to_string())?; + if client_packet.id.get_int() != 0 { + return Err(OpaqueError::create(&format!( + "Client STATUS: {:#x} Unknown Id -> Shutdown", + client_packet.id.get_int(), + ))); }; let commit_hash: &'static str = env!("COMMIT_HASH"); let mut status_struct = StatusStructNew::create(); status_struct.version.protocol = handshake.protocol_version.get_int(); - match kube_server.get_server_status().await? { - ServerDeploymentStatus::Connectable => { + let status = kube_server.get_server_status().await?; + tracing::info!(status = ?status, "status request"); + match status { + ServerDeploymentStatus::Connectable(mut server_stream) => { return kube_server - .proxy_status(handshake, &client_packet, client_stream) + .proxy_status(handshake, &client_packet, client_stream, &mut server_stream) .await } ServerDeploymentStatus::Starting | ServerDeploymentStatus::PodOk => { @@ -178,10 +180,8 @@ async fn handle_login( kube_server: KubeServer, cache: Arc>, ) -> Result<(), OpaqueError> { - // let client_packet = Packet::parse(client_stream).await.unwrap(); - tracing::info!("login request"); match kube_server.get_server_status().await? { - ServerDeploymentStatus::Connectable => { + ServerDeploymentStatus::Connectable(mut server_stream) => { // referenced from: // https://github.com/hanyu-dev/tokio-splice2/blob/fc47199fffde8946b0acf867d1fa0b2222267a34/examples/proxy.rs let io_sl2sr = tokio_splice2::context::SpliceIoCtx::prepare() @@ -192,64 +192,33 @@ async fn handle_login( .map_err(|e| format!("tokio_splice2::context::SpliceIoCtx err={}", e.to_string()))? .into_io(); - let port = kube_server - .get_port() - .ok_or_else(|| "failed to get port from service")?; - let mut server_stream = TcpStream::connect(format!("localhost:{}", port)) - .await - .map_err(|_| "Failed to connect to minecraft server")?; - handshake .send_packet(&mut server_stream) .await - .map_err(|_| "Failed to forward handshake packet to minecraft server")?; + .map_err(|_| "failed to forward handshake packet to minecraft server")?; tracing::info!("proxying with splice"); let traffic = tokio_splice2::io::SpliceBidiIo { io_sl2sr, io_sr2sl } .execute(client_stream, &mut server_stream) .await; + tracing::debug!("data exchanged: tx: {} rx: {}", traffic.tx, traffic.rx); + if let Some(e) = traffic.error { + return Err(OpaqueError::create(&format!( + "failed to splice; err = {}", + e + ))); + } } ServerDeploymentStatus::PodOk | ServerDeploymentStatus::Starting => { - let _client_packet = Packet::parse(client_stream).await; - if _client_packet.is_none() { - return Err(OpaqueError::create( - "Client LOGIN START -> bad packet; Disconnecting...", - )); - } - - let disconnect_packet = - packets::clientbound::login::Disconnect::set_reason("Starting...§d<3§r".to_owned()) - .await - .ok_or_else(|| "failed to *create* disconnect packet")?; - disconnect_packet - .send_packet(client_stream) - .await - .map_err(|_| "failed to *send* disconnect packet")?; - client_stream.flush().await.map_err(|e| e.to_string())?; + mc_server::send_disconnect(client_stream, "Starting...§d<3§r").await?; } ServerDeploymentStatus::Offline => { - let _client_packet = Packet::parse(client_stream).await; - if _client_packet.is_none() { - return Err(OpaqueError::create( - "Client LOGIN START -> bad packet; Disconnecting...", - )); - } - kube_server .set_scale(cache, 1) .map_err(|e| format!("Failed to set depoloyment scale: err = {:?}", e)) .await?; - let disconnect_packet = packets::clientbound::login::Disconnect::set_reason( - "Okayy_starting_it...§d<3§r".to_owned(), - ) - .await - .ok_or_else(|| "failed to *create* disconnect packet")?; - disconnect_packet - .send_packet(client_stream) - .await - .map_err(|_| "failed to *send* disconnect packet")?; - client_stream.flush().await.map_err(|e| e.to_string())?; + mc_server::send_disconnect(client_stream, "Okayy_starting_it...§d<3§r").await?; } } Ok(()) diff --git a/src/mc_server.rs b/src/mc_server.rs index 07c5c7d..9d32e85 100644 --- a/src/mc_server.rs +++ b/src/mc_server.rs @@ -1,4 +1,4 @@ -use tokio::net::TcpStream; +use tokio::{io::AsyncWriteExt, net::TcpStream}; use crate::{ packets::{Packet, SendPacket}, @@ -16,13 +16,37 @@ pub async fn handle_ping(client_stream: &mut TcpStream) -> Result<(), OpaqueErro .send_packet(client_stream) .await .map_err(|_| "Failed to send ping")?), - _ => { - return Err(OpaqueError::create(&format!( - "Expected ping packet, got: {}", - ping_packet.id.get_int() - ))); - } + _ => Err(OpaqueError::create(&format!( + "Expected ping packet, got: {}", + ping_packet.id.get_int() + ))), } } -// pub async fn handle_redirect(client_stream: &mut TcpStream) -> Result<(), OpaqueError> {} +/// Disconnects the client. +/// +/// It works if the client is in the login state, and it +/// has *already* and *only* sent the handshake packet. +#[tracing::instrument(skip(client_stream))] +pub async fn send_disconnect( + client_stream: &mut TcpStream, + reason: &str, +) -> Result<(), OpaqueError> { + let _client_packet = Packet::parse(client_stream).await; + if _client_packet.is_none() { + return Err(OpaqueError::create( + "Client LOGIN START -> malformed packet; Disconnecting...", + )); + } + + let disconnect_packet = + crate::packets::clientbound::login::Disconnect::set_reason(reason.to_owned()) + .await + .ok_or_else(|| "failed to *create* disconnect packet")?; + disconnect_packet + .send_packet(client_stream) + .await + .map_err(|_| "failed to *send* disconnect packet")?; + client_stream.flush().await.map_err(|e| e.to_string())?; + todo!() +} diff --git a/src/packets/mod.rs b/src/packets/mod.rs index 7e00d08..15f4a32 100644 --- a/src/packets/mod.rs +++ b/src/packets/mod.rs @@ -56,20 +56,27 @@ impl Packet { all, }) } - #[instrument(level = "trace")] + #[instrument(level = "info",skip(buf),fields(addr = buf.peer_addr().map(|x| x.to_string()).unwrap_or("unknown".to_string())))] pub async fn parse(buf: &mut TcpStream) -> Option { let length = VarInt::parse(buf).await?; + tracing::trace!(length = length.get_int()); let id = match VarInt::parse(buf).await { Some(x) => x, None => { - tracing::error!("Packet id problem(it was None)! REEEEEEEEEEEEEEEEEEEE"); - panic!(); - // return None; + tracing::error!("could not parse packet id"); + return None; } }; - // println!("---id: {id}"); if id.get_int() == 122 { + tracing::warn!("weird packet id encountered: 122"); + return None; + } + + // TODO: investigate this, becuase it is just a hunch + // but if it is too big, the vec![] macro panics + if length.get_int() > u16::MAX.into() { + tracing::error!(len = length.get_int(), "packet length is too big"); return None; } @@ -89,14 +96,9 @@ impl Packet { all, }) } - Err(x) => { - if id.get_int() == 122 { - return None; - } else { - println!("len = {}: {:?}", length.get_int(), length.get_data()); - println!("Buffer read error: {x}"); - return None; - } + Err(e) => { + tracing::error!(length = length.get_int(), data = ?length.get_data(),error = e.to_string(),"buffer read error"); + return None; } } }