From 4cf3d5aea0aa1922f6c8adbc4af4d41c3631d760 Mon Sep 17 00:00:00 2001 From: Tamipes Date: Wed, 3 Dec 2025 22:58:21 +0100 Subject: [PATCH] feat: add an Error type to `Packet::parse` instead of error logging manually inside `parse`, return a Result with a custom enum --- src/kube_cache.rs | 11 +++++- src/main.rs | 14 ++------ src/mc_server.rs | 11 ++---- src/opaque_error.rs | 9 +++++ src/packets/mod.rs | 84 +++++++++++++++++++++++++-------------------- 5 files changed, 69 insertions(+), 60 deletions(-) diff --git a/src/kube_cache.rs b/src/kube_cache.rs index e508f37..8b7f4a2 100644 --- a/src/kube_cache.rs +++ b/src/kube_cache.rs @@ -265,10 +265,19 @@ where dep.labels().values().filter(|x| x.as_str() == str).count() > 0 } -#[derive(Debug)] pub enum ServerDeploymentStatus { Connectable(TcpStream), Starting, PodOk, Offline, } +impl fmt::Debug for ServerDeploymentStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Connectable(_) => write!(f, "Connectable"), + Self::Starting => write!(f, "Starting"), + Self::PodOk => write!(f, "PodOk"), + Self::Offline => write!(f, "Offline"), + } + } +} diff --git a/src/main.rs b/src/main.rs index 3e101d8..24023f0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,6 @@ use std::{net::SocketAddr, sync::Arc}; use futures::TryFutureExt; -use tokio::io::AsyncWriteExt; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::Mutex; use tracing_subscriber::{prelude::*, EnvFilter}; @@ -73,14 +72,7 @@ async fn process_connection( addr: SocketAddr, cache: Arc>, ) -> Result<(), OpaqueError> { - let client_packet = match Packet::parse(&mut client_stream).await { - Some(x) => x, - None => { - // This is debug, because Packet::parse has all error cases logged with tracing::error - tracing::debug!("Client HANDSHAKE -> malformed packet; Disconnecting..."); - return Ok(()); - } - }; + let client_packet = Packet::parse(&mut client_stream).await?; // --- Handshake --- let handshake; @@ -127,9 +119,7 @@ async fn handle_status( kube_server: KubeServer, ) -> Result<(), OpaqueError> { tracing::debug!(handshake = ?handshake); - let client_packet = Packet::parse(client_stream) - .await - .ok_or_else(|| "could not parse client_packet".to_string())?; + let client_packet = Packet::parse(client_stream).await?; if client_packet.id.get_int() != 0 { return Err(OpaqueError::create(&format!( "Client STATUS: {:#x} Unknown Id -> Shutdown", diff --git a/src/mc_server.rs b/src/mc_server.rs index 9d32e85..f4c5a36 100644 --- a/src/mc_server.rs +++ b/src/mc_server.rs @@ -8,9 +8,7 @@ use crate::{ #[tracing::instrument(skip(client_stream))] pub async fn handle_ping(client_stream: &mut TcpStream) -> Result<(), OpaqueError> { // --- Respond to ping packet --- - let ping_packet = Packet::parse(client_stream) - .await - .ok_or("Ping packett failed to parse")?; + let ping_packet = Packet::parse(client_stream).await?; match ping_packet.id.get_int() { 1 => Ok(ping_packet .send_packet(client_stream) @@ -32,12 +30,7 @@ pub async fn send_disconnect( client_stream: &mut TcpStream, reason: &str, ) -> Result<(), OpaqueError> { - let _client_packet = Packet::parse(client_stream).await; - if _client_packet.is_none() { - return Err(OpaqueError::create( - "Client LOGIN START -> malformed packet; Disconnecting...", - )); - } + let _client_packet = Packet::parse(client_stream).await?; let disconnect_packet = crate::packets::clientbound::login::Disconnect::set_reason(reason.to_owned()) diff --git a/src/opaque_error.rs b/src/opaque_error.rs index 1876bdd..88a5b49 100644 --- a/src/opaque_error.rs +++ b/src/opaque_error.rs @@ -81,3 +81,12 @@ impl From<&str> for OpaqueError { } } } + +impl From for OpaqueError { + fn from(value: crate::packets::ParseError) -> Self { + Self { + span_trace: SpanTrace::capture(), + context: format!("{:?}", value), + } + } +} diff --git a/src/packets/mod.rs b/src/packets/mod.rs index e255ec5..7e9638f 100644 --- a/src/packets/mod.rs +++ b/src/packets/mod.rs @@ -58,28 +58,27 @@ impl Packet { all, }) } - #[instrument(level = "info",skip(buf),fields(addr = buf.peer_addr().map(|x| x.to_string()).unwrap_or("unknown".to_string())))] - pub async fn parse(buf: &mut TcpStream) -> Option { - let length = VarInt::parse(buf).await?; + #[tracing::instrument(level = "info",skip(stream),fields(addr = stream.peer_addr().map(|x| x.to_string()).unwrap_or("unknown".to_string())))] + pub async fn parse(stream: &mut TcpStream) -> Result { + let length = VarInt::parse(stream) + .await + .ok_or(ParseError::IDParseError)?; tracing::trace!(length = length.get_int()); - let id = match VarInt::parse(buf).await { + let id = match VarInt::parse(stream).await { Some(x) => x, None => { - tracing::error!("could not parse packet id"); - return None; + return Err(ParseError::IDParseError); } }; if id.get_int() == 122 { - tracing::warn!("weird packet id encountered: 122"); - return None; + return Err(ParseError::WeirdID); } // TODO: investigate this, becuase it is just a hunch // but if it is too big, the vec![] macro panics if length.get_int() > u16::MAX.into() { - tracing::error!(len = length.get_int(), "packet length is too big"); - return None; + return Err(ParseError::LengthIsTooBig(length.get_int())); } // TODO: this is a bandaid fix; the above check *should* make sure the // next line does not run into "capacity overflow", but it doesn't work @@ -88,26 +87,23 @@ impl Packet { }) { Ok(x) => x, Err(e) => { - tracing::error!( - len_int = length.get_int(), - usize = length.get_int() as usize - id.get_data().len(), - len_data = id.get_data().len(), - error = ?e, - "panic while allocating with vec![] macro" - ); - return None; + return Err(ParseError::BufferAllocationPanic(format!( + "panic while allocating with vec![] macro len_int={} usize={} len_data={} error={:?}", + id.get_data().len(), + length.get_int(), + length.get_int() as usize - id.get_data().len(), + e + ))); } }; - match buf.read_exact(&mut data).await { + match stream.read_exact(&mut data).await { Ok(_) => { - // data_id.append(&mut data.clone()); - // data_length.append(&mut data_id); let mut vec = id.get_data(); vec.append(&mut data.clone()); let mut all = length.get_data(); all.append(&mut vec); - Some(Packet { + Ok(Packet { id, length, data, @@ -115,8 +111,12 @@ impl Packet { }) } Err(e) => { - tracing::error!(length = length.get_int(), data = ?length.get_data(),error = e.to_string(),"buffer read error"); - return None; + return Err(ParseError::StreamReadError(format!( + "length={}; data={:?}; error={:?}; ", + length.get_int(), + length.get_data(), + e + ))); } } } @@ -127,20 +127,28 @@ impl Packet { all.append(&mut vec); return Some(all); } - // pub fn proto_name(&self, state: &Type) -> String { - // match state { - // ProtocolState::Handshaking => match self.id.get_int() { - // 0 => "Handshake".to_owned(), - // _ => "error".to_owned(), - // }, - // ProtocolState::Status => match self.id.get_int() { - // 0 => "StatusRequest".to_owned(), - // 1 => "PingRequest".to_owned(), - // _ => "error".to_owned(), - // }, - // _ => "Dont care state".to_owned(), - // } - // } +} + +pub enum ParseError { + IDParseError, + WeirdID, + LengthIsTooBig(i32), + StreamReadError(String), + BufferAllocationPanic(String), +} + +impl fmt::Debug for ParseError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::IDParseError => write!(f, "IDParseError: could not parse packet id"), + Self::WeirdID => write!(f, "WeirdID: weird packet id encountered: 122"), + Self::LengthIsTooBig(x) => { + write!(f, "LengthIsTooBig: packet length is too big; len={x}") + } + Self::StreamReadError(str) => write!(f, "StreamReadError: {str}"), + Self::BufferAllocationPanic(str) => write!(f, "BufferAllocationPanic: {str}"), + } + } } #[derive(Copy, Clone, PartialEq)]