From b75eec70e0ecd0fbad788fcab238d43ce25e72fb Mon Sep 17 00:00:00 2001 From: Tamipes Date: Wed, 26 Nov 2025 00:37:32 +0100 Subject: [PATCH] feat: move the packets library over to use async --- Cargo.lock | 13 +++++ Cargo.toml | 2 +- src/main.rs | 48 +++++++++++++++--- src/packets/clientbound/login.rs | 17 +++---- src/packets/clientbound/status.rs | 20 ++++---- src/packets/mod.rs | 31 ++++++------ src/packets/serverbound/handshake.rs | 22 ++++----- src/packets/serverbound/status.rs | 8 +-- src/types/mod.rs | 73 ++++++++++++++++++++++------ 9 files changed, 163 insertions(+), 71 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 605a7c1..7d4f571 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -915,6 +915,15 @@ version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + [[package]] name = "memchr" version = "2.7.6" @@ -1731,10 +1740,14 @@ version = "0.3.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex-automata", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] diff --git a/Cargo.toml b/Cargo.toml index dd58ffa..5d6c72f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ tokio-stream = { version = "0.1.9", features = ["net"] } # Error tracing? tracing = "0.1.36" -tracing-subscriber = "0.3.17" +tracing-subscriber = {version = "0.3.17", features = ["env-filter"]} # Needed to parse data to yaml in kubectl serde = "1.0.130" diff --git a/src/main.rs b/src/main.rs index 92a6d33..beb8c83 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,13 @@ //! This is a simple imitation of the basic functionality of kubectl: //! kubectl {get, delete, apply, watch, edit} [name] //! with labels and namespace selectors supported. -use std::sync::Arc; +use std::{net::SocketAddr, sync::Arc}; use tokio::net::{TcpListener, TcpStream}; +use tracing::instrument; +use tracing_subscriber::{prelude::*, EnvFilter}; + +use crate::packets::Packet; mod KubeCache; mod packets; @@ -11,31 +15,59 @@ mod types; #[tokio::main] async fn main() { - tracing_subscriber::fmt::init(); + // tracing_subscriber::fmt::init(); + let fmt_layer = tracing_subscriber::fmt::layer() + .with_target(false) + .with_level(true); + let filter_layer = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); // default to INFO + tracing_subscriber::registry() + .with(fmt_layer) + .with(filter_layer) + .init(); let commit_hash: &'static str = env!("COMMIT_HASH"); tracing::info!("COMMIT_HASH: {}", commit_hash); let cache = KubeCache::Cache::create().unwrap(); - let arcCache = Arc::new(cache); + let arc_cache = Arc::new(cache); tracing::info!("kube api initialized"); - let mut listener = TcpListener::bind("0.0.0.0:25565").await.unwrap(); + let listener = TcpListener::bind("0.0.0.0:25565").await.unwrap(); tracing::info!("tcp server started"); loop { - let (socket, _) = listener.accept().await.unwrap(); - let acc = arcCache.clone(); + let (socket, addr) = listener.accept().await.unwrap(); + let acc = arc_cache.clone(); tokio::spawn(async move { - if let Err(e) = process_socket(socket, acc).await { + if let Err(e) = process_socket(socket, addr, acc).await { tracing::error!("socket error: {e:?}"); } }); } } -async fn process_socket(stream: TcpStream, cache: Arc) -> Result<(), ()> { +#[instrument(level = "trace", skip(cache, stream))] +async fn process_socket( + mut stream: TcpStream, + addr: SocketAddr, + cache: Arc, +) -> Result<(), ()> { + tracing::info!( + addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + "Client connected" + ); + let client_packet = match Packet::parse(&mut stream).await { + Some(x) => x, + None => { + tracing::trace!("Client HANDSHAKE -> bad packet; Disconnecting..."); + return Ok(()); + } + }; + tracing::info!( + addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + "Client disconnected" + ); todo!() } // ----- Debug tools ----- diff --git a/src/packets/clientbound/login.rs b/src/packets/clientbound/login.rs index 9a01a97..5dc0608 100644 --- a/src/packets/clientbound/login.rs +++ b/src/packets/clientbound/login.rs @@ -1,4 +1,4 @@ -use std::io::Write; +use tokio::{io::AsyncWriteExt, net::TcpStream}; use crate::{ packets::{Packet, SendPacket}, @@ -13,19 +13,18 @@ pub struct Disconnect { } impl Disconnect { - pub fn parse(packet: Packet) -> Option { - let mut reader = packet.data.into_iter(); + pub async fn parse(packet: Packet) -> Option { Some(Disconnect { all: packet.all, - reason: VarString::parse(&mut reader)?, + reason: VarString::parse(&mut packet.data.clone()).await?, }) } pub fn get_string(&self) -> String { self.reason.get_value() } - pub fn set_reason(reason: String) -> Option { + pub async fn set_reason(reason: String) -> Option { let vec = VarString::from(reason).move_data()?; - Disconnect::parse(Packet::from_bytes(0, vec)?) + Disconnect::parse(Packet::from_bytes(0, vec)?).await } pub fn get_all(&self) -> Vec { self.all.clone() @@ -33,9 +32,9 @@ impl Disconnect { } impl SendPacket for Disconnect { - fn send_packet(&self, stream: &mut std::net::TcpStream) -> std::io::Result<()> { - stream.write_all(&self.all)?; - stream.flush()?; + async fn send_packet(&self, stream: &mut TcpStream) -> std::io::Result<()> { + stream.write_all(&self.all).await?; + stream.flush().await?; Ok(()) } } diff --git a/src/packets/clientbound/status.rs b/src/packets/clientbound/status.rs index 2ac6435..2b0cfc5 100644 --- a/src/packets/clientbound/status.rs +++ b/src/packets/clientbound/status.rs @@ -1,7 +1,8 @@ -use std::{collections::HashMap, io::Write}; +use std::collections::HashMap; use serde_derive::{Deserialize, Serialize}; use serde_json::Value; +use tokio::{io::AsyncWriteExt, net::TcpStream}; use crate::{ packets::{Packet, SendPacket}, @@ -117,11 +118,10 @@ pub struct StatusResponse { } impl StatusResponse { - pub fn parse(packet: Packet) -> Option { - let mut reader = packet.data.into_iter(); + pub async fn parse(packet: Packet) -> Option { Some(StatusResponse { all: packet.all, - json: VarString::parse(&mut reader)?, + json: VarString::parse(&mut packet.data.clone()).await?, }) } pub fn get_string(&self) -> String { @@ -137,9 +137,11 @@ impl StatusResponse { } None } - pub fn set_json(json: Box) -> StatusResponse { + pub async fn set_json(json: Box) -> StatusResponse { let vec = VarString::from(json.get_string()).move_data().unwrap(); - StatusResponse::parse(Packet::from_bytes(0, vec).unwrap()).unwrap() + StatusResponse::parse(Packet::from_bytes(0, vec).unwrap()) + .await + .unwrap() } pub fn get_all(&self) -> Vec { self.all.clone() @@ -147,9 +149,9 @@ impl StatusResponse { } impl SendPacket for StatusResponse { - fn send_packet(&self, stream: &mut std::net::TcpStream) -> std::io::Result<()> { - stream.write_all(&self.all)?; - stream.flush()?; + async fn send_packet(&self, stream: &mut TcpStream) -> std::io::Result<()> { + stream.write_all(&self.all).await?; + stream.flush().await?; Ok(()) } } diff --git a/src/packets/mod.rs b/src/packets/mod.rs index eadcdef..83f2803 100644 --- a/src/packets/mod.rs +++ b/src/packets/mod.rs @@ -1,7 +1,10 @@ -use std::{ - io::{self, Read, Write}, - net::TcpStream, -}; +// use std::io::{self, Read, Write}; + +use tokio::io; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; +use tokio::net::TcpStream; +use tracing::instrument; use crate::types::VarInt; pub mod clientbound; @@ -15,12 +18,12 @@ pub struct Packet { pub all: Vec, } pub trait SendPacket { - fn send_packet(&self, stream: &mut TcpStream) -> io::Result<()>; + async fn send_packet(&self, stream: &mut TcpStream) -> io::Result<()>; } impl SendPacket for Packet { - fn send_packet(&self, stream: &mut TcpStream) -> io::Result<()> { - stream.write_all(&self.all)?; + async fn send_packet(&self, stream: &mut TcpStream) -> io::Result<()> { + stream.write_all(&self.all).await?; Ok(()) } } @@ -53,14 +56,14 @@ impl Packet { all, }) } - pub fn parse(buf: &mut TcpStream) -> Option { - let bytes_iter = &mut buf.bytes().into_iter().map(|x| x.unwrap()); - let length = VarInt::parse(bytes_iter)?; - // println!("---length: {length}"); - let id = match VarInt::parse(bytes_iter) { + #[instrument(level = "trace")] + pub async fn parse(buf: &mut TcpStream) -> Option { + let length = VarInt::parse(buf).await?; + tracing::trace!(length = length.get_int()); + let id = match VarInt::parse(buf).await { Some(x) => x, None => { - println!("Packet id problem(it was None)! REEEEEEEEEEEEEEEEEEEE"); + tracing::error!("Packet id problem(it was None)! REEEEEEEEEEEEEEEEEEEE"); panic!(); // return None; } @@ -71,7 +74,7 @@ impl Packet { } let mut data: Vec = vec![0; length.get_int() as usize - id.get_data().len()]; - match buf.read_exact(&mut data) { + match buf.read_exact(&mut data).await { Ok(_) => { // data_id.append(&mut data.clone()); // data_length.append(&mut data_id); diff --git a/src/packets/serverbound/handshake.rs b/src/packets/serverbound/handshake.rs index ce8e4e5..659f3f2 100644 --- a/src/packets/serverbound/handshake.rs +++ b/src/packets/serverbound/handshake.rs @@ -1,6 +1,4 @@ -use std::io::Write; - -use nix::NixPath; +use tokio::{io::AsyncWriteExt, net::TcpStream}; use crate::{ packets::{Packet, SendPacket}, @@ -17,12 +15,12 @@ pub struct Handshake { } impl Handshake { - pub fn parse(packet: Packet) -> Option { - let mut reader = packet.data.clone().into_iter(); - let protocol_version = VarInt::parse(&mut reader)?; - let server_address = VarString::parse(&mut reader)?; - let server_port = UShort::parse(&mut reader)?; - let next_state = VarInt::parse(&mut reader)?; + pub async fn parse(packet: Packet) -> Option { + let mut reader = packet.data.clone(); + let protocol_version = VarInt::parse(&mut reader).await?; + let server_address = VarString::parse(&mut reader).await?; + let server_port = UShort::parse(&mut reader).await?; + let next_state = VarInt::parse(&mut reader).await?; Some(Handshake { protocol_version, server_address, @@ -62,9 +60,9 @@ impl Handshake { } impl SendPacket for Handshake { - fn send_packet(&self, stream: &mut std::net::TcpStream) -> std::io::Result<()> { - stream.write_all(&self.all)?; - stream.flush()?; + async fn send_packet(&self, stream: &mut TcpStream) -> std::io::Result<()> { + stream.write_all(&self.all).await?; + stream.flush().await?; Ok(()) } } diff --git a/src/packets/serverbound/status.rs b/src/packets/serverbound/status.rs index 0edf0cf..da7eb3d 100644 --- a/src/packets/serverbound/status.rs +++ b/src/packets/serverbound/status.rs @@ -1,5 +1,7 @@ use std::io::Write; +use tokio::{io::AsyncWriteExt, net::TcpStream}; + use crate::packets::{Packet, SendPacket}; /// id: 0x00 @@ -14,9 +16,9 @@ impl StatusRequest { } impl SendPacket for StatusRequest { - fn send_packet(&self, stream: &mut std::net::TcpStream) -> std::io::Result<()> { - stream.write_all(&self.all)?; - stream.flush()?; + async fn send_packet(&self, stream: &mut TcpStream) -> std::io::Result<()> { + stream.write_all(&self.all).await?; + stream.flush().await?; Ok(()) } } diff --git a/src/types/mod.rs b/src/types/mod.rs index 6cca7e1..6e8d3b4 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -1,8 +1,38 @@ use std::fmt::Display; +use tokio::{ + io::{self, AsyncRead, AsyncReadExt}, + net::TcpStream, +}; + const SEGMENT_BITS: u8 = 0x7F; const CONTINUE_BIT: u8 = 0x80; +pub trait TamiReader { + async fn next_byte(&mut self) -> io::Result>; +} + +impl TamiReader for TcpStream { + async fn next_byte(&mut self) -> io::Result> { + let mut buf: [u8; 1] = [0]; + let n = self.read(&mut buf).await?; + if n == 0 { + return Ok(None); + } + Ok(Some(buf[0])) + } +} + +impl TamiReader for Vec { + async fn next_byte(&mut self) -> io::Result> { + if self.is_empty() { + return Ok(None); + } else { + return Ok(Some(self.remove(0))); + } + } +} + #[derive(Debug)] pub struct VarInt { value: i32, @@ -28,14 +58,18 @@ impl VarInt { pub fn move_data(self) -> Vec { self.data } - pub fn read(data: &mut I) -> Option + pub async fn read(data: &mut I) -> Option where - I: Iterator, + I: TamiReader, { let mut value: i32 = 0; let mut position = 0; - for current_byte in data { + loop { + let current_byte = match data.next_byte().await { + Ok(x) => x?, + Err(e) => return None, + }; value |= ((current_byte & SEGMENT_BITS) as i32) << position; if current_byte & CONTINUE_BIT == 0 { @@ -49,16 +83,16 @@ impl VarInt { } Some(value) } - pub fn parse(reader: &mut I) -> Option + pub async fn parse(reader: &mut I) -> Option where - I: Iterator, + I: TamiReader, { let mut value: i32 = 0; let mut position = 0; let mut vec = Vec::new(); - for current_byte in reader { - let current_byte = current_byte; + loop { + let current_byte = reader.next_byte().await.ok()??; vec.push(current_byte); value |= ((current_byte & SEGMENT_BITS) as i32) << position; @@ -121,14 +155,17 @@ impl VarString { pub fn from(string: String) -> VarString { VarString { value: string } } - pub fn parse(data: &mut I) -> Option + pub async fn parse(data: &mut I) -> Option where - I: Iterator, + I: TamiReader, { - let length = VarInt::read(data)?; + let length = VarInt::read(data).await?; let mut vec = Vec::new(); for _ in 0..length { - vec.push(data.next()?); + vec.push(match data.next_byte().await { + Ok(x) => x?, + Err(e) => return None, + }); } Some(VarString { value: String::from_utf8(vec).ok()?, @@ -147,14 +184,20 @@ impl UShort { pub fn get_data(&self) -> Vec { self.data.clone() } - pub fn parse(data: &mut I) -> Option + pub async fn parse(data: &mut I) -> Option where - I: Iterator, + I: TamiReader, { - let mut vec = vec![data.next()?]; + let mut vec = vec![match data.next_byte().await { + Ok(x) => x?, + Err(e) => return None, + }]; let mut int: u16 = vec[0] as u16; int = int << 8; - vec.push(data.next()?); + vec.push(match data.next_byte().await { + Ok(x) => x?, + Err(_) => return None, + }); int |= vec[1] as u16; Some(UShort { value: int,