diff --git a/Cargo.lock b/Cargo.lock index 7d4f571..26c44ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1147,6 +1147,7 @@ dependencies = [ "tokio", "tokio-stream", "tracing", + "tracing-error", "tracing-subscriber", ] @@ -1723,6 +1724,16 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-error" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b1581020d7a273442f5b45074a6a57d5757ad0a47dac0e9f0bd57b81936f3db" +dependencies = [ + "tracing", + "tracing-subscriber", +] + [[package]] name = "tracing-log" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 5d6c72f..81379d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ tokio-stream = { version = "0.1.9", features = ["net"] } # Error tracing? tracing = "0.1.36" tracing-subscriber = {version = "0.3.17", features = ["env-filter"]} +tracing-error = "0.2.1" # Needed to parse data to yaml in kubectl serde = "1.0.130" diff --git a/src/KubeCache.rs b/src/KubeCache.rs deleted file mode 100644 index 27999df..0000000 --- a/src/KubeCache.rs +++ /dev/null @@ -1,41 +0,0 @@ -use k8s_openapi::api::apps::v1::Deployment; -use kube::{ - api::{ListParams, ObjectList}, - runtime::reflector::Lookup, - Api, Client, ResourceExt, -}; - -pub struct Cache { - deployments: Api, -} -impl Cache { - pub fn create() -> Option { - let kubeconfig = kube::config::Kubeconfig::read().unwrap(); - let client = Client::try_from(kubeconfig).unwrap(); - - let deployments: Api = Api::default_namespaced(client); - - return Some(Cache { deployments }); - } - pub async fn get_deploys(&self) -> ObjectList { - // let lp: ListParams = ListParams::default(); - let lp: ListParams = ListParams::default().labels("tami.moe/minecraft"); - self.deployments.list(&lp).await.unwrap() - } - - pub async fn query_addr(&self, addr: String) -> Option { - let deploys = self.get_deploys().await; - let result = deploys - .iter() - .find(|x| filter_label_value(x, addr.clone()))?; - Some(result.name()?.to_string()) - } -} - -fn filter_label_value(dep: &Deployment, str: String) -> bool { - dep.labels() - .values() - .filter(|x| x.as_str() == str.as_str()) - .count() - > 0 -} diff --git a/src/kube_cache.rs b/src/kube_cache.rs new file mode 100644 index 0000000..1654d4c --- /dev/null +++ b/src/kube_cache.rs @@ -0,0 +1,259 @@ +use std::{fmt, sync::Arc}; + +use k8s_openapi::api::{apps::v1::Deployment, core::v1::Service}; +use kube::{ + api::{ListParams, ObjectList}, + runtime::reflector::Lookup, + Api, Client, ResourceExt, +}; +use tokio::{net::TcpStream, sync::Mutex}; + +use crate::{ + kube_cache, + packets::{serverbound::handshake::Handshake, Packet, SendPacket}, + OpaqueError, +}; + +#[derive(Debug)] +pub struct Cache { + deployments: Api, + services: Api, +} +impl Cache { + pub fn create() -> Option { + let kubeconfig = kube::config::Kubeconfig::read().unwrap(); + let client = Client::try_from(kubeconfig).unwrap(); + + let deployments: Api = Api::default_namespaced(client.clone()); + let services: Api = Api::default_namespaced(client); + + return Some(Cache { + deployments, + services, + }); + } + pub async fn get_dep(&self, name: &str) -> Result { + self.deployments.get(name).await + } + pub async fn get_srv(&self, name: &str) -> Result { + self.services.get(name).await + } + pub async fn get_deploys(&self) -> ObjectList { + // let lp: ListParams = ListParams::default(); + let lp: ListParams = ListParams::default().labels("tami.moe/minecraft"); + self.deployments.list(&lp).await.unwrap() + } + pub async fn get_srvs(&self) -> ObjectList { + // let lp: ListParams = ListParams::default(); + let lp: ListParams = ListParams::default().labels("tami.moe/minecraft"); + self.services.list(&lp).await.unwrap() + } + + pub async fn query_dep_addr(&self, addr: &str) -> Option { + let deploys = self.get_deploys().await; + let result = deploys.iter().find(|x| filter_label_value(x, addr))?; + Some(result.name()?.to_string()) + } + + pub async fn query_srv_addr(&self, addr: &str) -> Option { + let deploys = self.get_srvs().await; + let result = deploys.iter().find(|x| filter_label_value(x, addr))?; + Some(result.name()?.to_string()) + } + + // pub fn set_dep( + // &self, + // name: &str, + // pp: &PatchParams, + // ) -> impl std::future::Future> { + // let patch = Patch; + // self.deployments.patch(name, pp, patch) + // } +} + +pub struct KubeServer { + dep: Deployment, + srv: Service, +} +impl fmt::Debug for KubeServer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("KubeServer") + .field( + "dep", + &self + .dep + .metadata + .clone() + .name + .unwrap_or("#error#".to_string()), + ) + .field( + "srv", + &self + .srv + .metadata + .clone() + .name + .unwrap_or("#error#".to_string()), + ) + .finish() + } +} +impl KubeServer { + #[tracing::instrument(name = "KubeServer::create", level = "info", skip(cache, server_addr))] + pub async fn create( + cache: Arc>, + server_addr: &str, + ) -> Result { + let cache_guard = cache.lock().await; + let dep_name = match cache_guard.query_dep_addr(server_addr).await { + Some(x) => x, + None => { + return Err(OpaqueError::create(&format!( + "Failed to find deployment name by addr" + ))) + } + }; + let srv_name = match cache_guard.query_srv_addr(server_addr).await { + Some(x) => x, + None => { + return Err(OpaqueError::create(&format!( + "Failed to find service name by addr" + ))) + } + }; + + let deployment = cache_guard.get_dep(&dep_name).await.map_err(|x| { + format!( + "Failed to query cache for deployment with dep_name err:{}", + x.to_string() + ) + })?; + let service = cache_guard.get_srv(&srv_name).await.map_err(|x| { + format!( + "Failed to query cache for service with dep_name err:{}", + x.to_string() + ) + })?; + drop(cache_guard); + tracing::info!("found kubernetes deployment & service"); + + return Ok(Self { + dep: deployment, + srv: service, + }); + } + fn get_port(&self) -> Option { + let a = self.srv.clone().spec.unwrap().ports.unwrap(); + let port = a.iter().find(|x| x.name.clone().unwrap() == "mc-router")?; + port.node_port + } + #[tracing::instrument(level = "info")] + pub async fn get_server_status(&self) -> Result { + let mut status = match self.dep.clone().status { + Some(x) => x, + None => { + return Err(OpaqueError::create( + "failed to get status of deployment for checking replicas", + )) + } + }; + let total_replicas = status + .replicas + .get_or_insert_with(|| { + tracing::trace!("total_replicas failed to get"); + -1 + }) + .clone(); + let available_replicas = status + .available_replicas + .get_or_insert_with(|| { + tracing::trace!("available_replicas failed to get"); + -1 + }) + .clone(); + let ready_replicas = status + .ready_replicas + .get_or_insert_with(|| { + tracing::trace!("ready_replicas failed to get"); + -1 + }) + .clone(); + tracing::debug!("total_replicas: {total_replicas} available_replicas: {available_replicas} ready_replicas : {ready_replicas }"); + + if total_replicas > 0 { + if ready_replicas > 0 { + //TODO: add Connectable check + return match self.query_server_connectable().await { + Ok(()) => Ok(ServerDeploymentStatus::Connectable), + Err(_) => Ok(ServerDeploymentStatus::PodOk), + }; + } + return Ok(ServerDeploymentStatus::Starting); + } else { + return Ok(ServerDeploymentStatus::Offline); + } + } + async fn query_server_connectable(&self) -> Result<(), OpaqueError> { + let port = self + .get_port() + .ok_or_else(|| "failed to get port from service")?; + let server_stream = TcpStream::connect(format!("localhost:{port}")) + .await + .map_err(|_| "Failed to connect to minecraft server")?; + + tracing::trace!( + "successfully connected to backend server; (connectibility check) {:?}", + server_stream.peer_addr() + ); + Ok(()) + } + pub async fn proxy_status( + &self, + handshake: &Handshake, + status_request: &Packet, + client_stream: &mut TcpStream, + ) -> Result<(), OpaqueError> { + let port = self + .get_port() + .ok_or_else(|| "failed to get port from service")?; + let mut server_stream = TcpStream::connect(format!("localhost:{port}")) + .await + .map_err(|_| "Failed to connect to minecraft server")?; + + handshake + .send_packet(&mut server_stream) + .await + .map_err(|_| "Failed to forward handshake packet to minecraft server")?; + status_request + .send_packet(&mut server_stream) + .await + .map_err(|_| "Failed to forward status request packet to minecraft server")?; + + let data_amount = tokio::io::copy_bidirectional(client_stream, &mut server_stream) + .await + .map_err(|e| { + format!( + "Error during bidirectional copy between server and client; err={:?}", + e + ) + })?; + tracing::info!("data exchanged while proxing: {:?}", data_amount); + Ok(()) + } +} + +fn filter_label_value(dep: &&R, str: &str) -> bool +where + R: ResourceExt, +{ + dep.labels().values().filter(|x| x.as_str() == str).count() > 0 +} + +#[derive(Debug)] +pub enum ServerDeploymentStatus { + Connectable, + Starting, + PodOk, + Offline, +} diff --git a/src/main.rs b/src/main.rs index beb8c83..d2bc56d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,17 +4,24 @@ use std::{net::SocketAddr, sync::Arc}; use tokio::net::{TcpListener, TcpStream}; -use tracing::instrument; +use tokio::sync::Mutex; use tracing_subscriber::{prelude::*, EnvFilter}; -use crate::packets::Packet; +use crate::kube_cache::{KubeServer, ServerDeploymentStatus}; +use crate::opaque_error::OpaqueError; +use crate::packets::clientbound::status::StatusStructNew; +use crate::packets::serverbound::handshake::Handshake; +use crate::packets::{Packet, SendPacket}; -mod KubeCache; +mod kube_cache; +mod mc_server; +mod opaque_error; mod packets; mod types; #[tokio::main] async fn main() { + // ---- Tracing setup ---- // tracing_subscriber::fmt::init(); let fmt_layer = tracing_subscriber::fmt::layer() .with_target(false) @@ -23,13 +30,14 @@ async fn main() { tracing_subscriber::registry() .with(fmt_layer) .with(filter_layer) + .with(tracing_error::ErrorLayer::default()) .init(); let commit_hash: &'static str = env!("COMMIT_HASH"); tracing::info!("COMMIT_HASH: {}", commit_hash); - let cache = KubeCache::Cache::create().unwrap(); - let arc_cache = Arc::new(cache); + let cache = kube_cache::Cache::create().unwrap(); + let arc_cache = Arc::new(Mutex::new(cache)); tracing::info!("kube api initialized"); let listener = TcpListener::bind("0.0.0.0:25565").await.unwrap(); @@ -40,45 +48,125 @@ async fn main() { let acc = arc_cache.clone(); tokio::spawn(async move { - if let Err(e) = process_socket(socket, addr, acc).await { - tracing::error!("socket error: {e:?}"); + tracing::info!( + addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + "Client connected" + ); + if let Err(e) = process_connection(socket, addr, acc).await { + tracing::error!( + message = format!("Client disconnected"), + addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + trace = format!("{}", e.get_span_trace()), + err = format!("{}", e.context) + ); + } else { + tracing::info!( + addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + "Client disconnected" + ); } }); } } -#[instrument(level = "trace", skip(cache, stream))] -async fn process_socket( - mut stream: TcpStream, +#[tracing::instrument(level = "info", skip(cache, client_stream))] +async fn process_connection( + mut client_stream: TcpStream, addr: SocketAddr, - cache: Arc, -) -> Result<(), ()> { - tracing::info!( - addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - "Client connected" - ); - let client_packet = match Packet::parse(&mut stream).await { + cache: Arc>, +) -> Result<(), OpaqueError> { + let client_packet = match Packet::parse(&mut client_stream).await { Some(x) => x, None => { tracing::trace!("Client HANDSHAKE -> bad packet; Disconnecting..."); return Ok(()); } }; - tracing::info!( - addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - "Client disconnected" - ); - todo!() -} -// ----- Debug tools ----- -// let mc_deployments = cache.get_deploys().await; -// for dep in mc_deployments.iter() { -// println!("{:?}", dep.labels().values()) -// } -// // println!("{:?}", mc_deployments); -// println!("count: {}", mc_deployments.iter().count()); -// println!( -// "{:?}", -// cache.query_addr("ferret.tami.moe".to_string()).await -// ); + // --- Handshake --- + let handshake; + let server_state; + if client_packet.id.get_int() != 0 { + return Err(OpaqueError::create( + "Client HANDSHAKE -> bad packet; Disconnecting...", + )); + } + handshake = packets::serverbound::handshake::Handshake::parse(client_packet) + .await + .ok_or_else(|| "Handshake request from client failed to parse".to_string())?; + + server_state = handshake.get_next_state(); + + match server_state { + packets::ProtocolState::Status => { + handle_status( + &mut client_stream, + cache, + handshake.get_server_address(), + &handshake, + ) + .await?; + } + packets::ProtocolState::Login => todo!(), + packets::ProtocolState::Transfer => todo!(), + _ => todo!(), + }; + Ok(()) +} + +#[tracing::instrument(level = "info", skip(client_stream, cache, handshake))] +async fn handle_status( + client_stream: &mut TcpStream, + cache: Arc>, + server_addr: String, + handshake: &Handshake, +) -> Result<(), OpaqueError> { + tracing::debug!(handshake = ?handshake); + let client_packet = Packet::parse(client_stream) + .await + .ok_or_else(|| "Could not parse client_packet".to_string())?; + match client_packet.id.get_int() { + 0 => tracing::info!("Client STATUS: {:#x} Status Request", 0), + _ => { + return Err(OpaqueError::create(&format!( + "Client STATUS: {:#x} Unknown Id -> Shutdown", + client_packet.id.get_int(), + ))) + } + }; + + let kube_server = KubeServer::create(cache, &server_addr).await?; + + let status: ServerDeploymentStatus = kube_server.get_server_status().await?; + tracing::info!("kube server status: {:?}", status); + + let commit_hash: &'static str = env!("COMMIT_HASH"); + let mut status_struct = StatusStructNew::create(); + status_struct.version.protocol = handshake.protocol_version.get_int(); + match status { + ServerDeploymentStatus::Connectable => { + return kube_server + .proxy_status(handshake, &client_packet, client_stream) + .await + } + ServerDeploymentStatus::Starting | ServerDeploymentStatus::PodOk => { + status_struct.players.max = 1; + status_struct.players.online = 1; + status_struct.description.text = format!("§aServer is starting...§r please wait\n - §dTami§r with §d<3§r §8(rev: {commit_hash})§r"); + } + ServerDeploymentStatus::Offline => { + status_struct.players.max = 1; + status_struct.description.text = format!("Server is currently §onot§r running. \n§aJoin to start it!§r - §dTami§r with §d<3§r §8(rev: {commit_hash})§r"); + } + }; + let status_res = + packets::clientbound::status::StatusResponse::set_json(Box::new(status_struct)).await; + status_res + .send_packet(client_stream) + .await + .map_err(|_| "Failed to send status packet")?; + + mc_server::handle_ping(client_stream).await?; + + Ok(()) +} diff --git a/src/mc_server.rs b/src/mc_server.rs new file mode 100644 index 0000000..07c5c7d --- /dev/null +++ b/src/mc_server.rs @@ -0,0 +1,28 @@ +use tokio::net::TcpStream; + +use crate::{ + packets::{Packet, SendPacket}, + OpaqueError, +}; + +#[tracing::instrument(skip(client_stream))] +pub async fn handle_ping(client_stream: &mut TcpStream) -> Result<(), OpaqueError> { + // --- Respond to ping packet --- + let ping_packet = Packet::parse(client_stream) + .await + .ok_or("Ping packett failed to parse")?; + match ping_packet.id.get_int() { + 1 => Ok(ping_packet + .send_packet(client_stream) + .await + .map_err(|_| "Failed to send ping")?), + _ => { + return Err(OpaqueError::create(&format!( + "Expected ping packet, got: {}", + ping_packet.id.get_int() + ))); + } + } +} + +// pub async fn handle_redirect(client_stream: &mut TcpStream) -> Result<(), OpaqueError> {} diff --git a/src/opaque_error.rs b/src/opaque_error.rs new file mode 100644 index 0000000..7d602b3 --- /dev/null +++ b/src/opaque_error.rs @@ -0,0 +1,73 @@ +use std::{error::Error, fmt}; +use tracing_error::SpanTrace; + +#[derive(Debug)] +pub struct OpaqueError { + span_trace: SpanTrace, + pub context: String, +} + +impl fmt::Display for OpaqueError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // optional: only print span trace if requested "{:#}" + if f.alternate() { + let mut vec = Vec::new(); + + self.span_trace.with_spans(|metadata, _fields| { + vec.push(metadata.name()); + true + }); + if vec.len() > 1 { + write!(f, "trace = {}", vec.pop().unwrap())?; + } + vec.reverse(); + for s in vec { + write!(f, "::{}", s)?; + } + }; + write!(f, " {}", self.context)?; + Ok(()) + } +} +impl Error for OpaqueError {} +impl OpaqueError { + pub fn create(str: &str) -> OpaqueError { + Self { + span_trace: SpanTrace::capture(), + context: str.to_string(), + } + } + pub fn get_span_trace(&self) -> String { + let mut vec = Vec::new(); + + self.span_trace.with_spans(|metadata, _fields| { + vec.push(metadata.name()); + true + }); + vec.iter().rfold(String::new(), |mut acc, x| { + if acc.len() != 0 { + acc.push_str("::"); + } + acc.push_str(x); + acc + }) + } +} + +impl From for OpaqueError { + fn from(value: String) -> Self { + Self { + span_trace: SpanTrace::capture(), + context: value, + } + } +} + +impl From<&str> for OpaqueError { + fn from(value: &str) -> Self { + Self { + span_trace: SpanTrace::capture(), + context: value.to_string(), + } + } +} diff --git a/src/packets/clientbound/status.rs b/src/packets/clientbound/status.rs index 2b0cfc5..d991ac5 100644 --- a/src/packets/clientbound/status.rs +++ b/src/packets/clientbound/status.rs @@ -137,7 +137,7 @@ impl StatusResponse { } None } - pub async fn set_json(json: Box) -> StatusResponse { + pub async fn set_json(json: Box) -> StatusResponse { let vec = VarString::from(json.get_string()).move_data().unwrap(); StatusResponse::parse(Packet::from_bytes(0, vec).unwrap()) .await diff --git a/src/packets/mod.rs b/src/packets/mod.rs index 83f2803..7e00d08 100644 --- a/src/packets/mod.rs +++ b/src/packets/mod.rs @@ -122,3 +122,29 @@ impl Packet { // } // } } + +#[derive(Copy, Clone, PartialEq)] +pub enum ProtocolState { + Handshaking, + Status, + Login, + Transfer, + Configuration, + Play, + ShutDown, +} + +impl ToString for ProtocolState { + fn to_string(&self) -> String { + match self { + ProtocolState::Handshaking => "Hanshake", + ProtocolState::Status => "Status", + ProtocolState::Login => "Login", + ProtocolState::Configuration => "Configuration ", + ProtocolState::Play => "Play", + ProtocolState::ShutDown => "Shutdown", + ProtocolState::Transfer => "Transfer", + } + .to_string() + } +} diff --git a/src/packets/serverbound/handshake.rs b/src/packets/serverbound/handshake.rs index 659f3f2..40f6a6a 100644 --- a/src/packets/serverbound/handshake.rs +++ b/src/packets/serverbound/handshake.rs @@ -1,7 +1,7 @@ use tokio::{io::AsyncWriteExt, net::TcpStream}; use crate::{ - packets::{Packet, SendPacket}, + packets::{Packet, ProtocolState, SendPacket}, types::{UShort, VarInt, VarString}, }; @@ -13,6 +13,16 @@ pub struct Handshake { pub next_state: VarInt, all: Vec, } +impl std::fmt::Debug for Handshake { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Handshake") + .field("protocol_version", &self.protocol_version.get_int()) + .field("server_address", &self.server_address.get_value()) + .field("server_port", &self.server_port.get_value()) + .field("next_state", &self.next_state.get_int()) + .finish() + } +} impl Handshake { pub async fn parse(packet: Packet) -> Option { @@ -21,6 +31,11 @@ impl Handshake { let server_address = VarString::parse(&mut reader).await?; let server_port = UShort::parse(&mut reader).await?; let next_state = VarInt::parse(&mut reader).await?; + + // If you remove this, also fix get_next_state() to return an Option<> + if next_state.get_int() > 3 || next_state.get_int() < 0 { + return None; + } Some(Handshake { protocol_version, server_address, @@ -32,8 +47,13 @@ impl Handshake { pub fn get_server_address(&self) -> String { self.server_address.get_value() } - pub fn get_next_state(&self) -> i32 { - self.next_state.get_int() + pub fn get_next_state(&self) -> ProtocolState { + match self.next_state.get_int() { + 1 => ProtocolState::Status, + 2 => ProtocolState::Login, + 3 => ProtocolState::Transfer, + _ => unreachable!(), + } } pub fn create(