From ac8812be2f53577acfd8d21f81dc5e33013fce93 Mon Sep 17 00:00:00 2001 From: Tamipes Date: Mon, 8 Jun 2026 13:47:03 +0200 Subject: [PATCH] chore: move the proxy logic out of main --- src/main.rs | 290 +-------------------------------------------------- src/proxy.rs | 286 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 291 insertions(+), 285 deletions(-) create mode 100644 src/proxy.rs diff --git a/src/main.rs b/src/main.rs index 6e7f9bc..e393480 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,28 +1,15 @@ -use evalexpr::*; -use std::env; -use std::net::SocketAddr; -use std::time::Duration; - -use tokio::net::{TcpListener, TcpStream}; -use tracing::Instrument; -use tracing_subscriber::{prelude::*, EnvFilter}; - -use crate::mc_server::{MinecraftAPI, MinecraftServerHandle, ServerDeploymentStatus}; use crate::opaque_error::OpaqueError; -use crate::packets::clientbound::status::StatusStructNew; -use crate::packets::serverbound::handshake::Handshake; -use crate::packets::serverbound::login::LoginStart; -use crate::packets::{Packet, SendPacket}; +use std::env; +use tokio::net::TcpListener; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; mod kube_cache; mod mc_server; mod opaque_error; mod packets; +mod proxy; mod types; -static BYE_MESSAGE: &str = concat!("§dTami§r with §d<3§r §8(rev: ", env!("COMMIT_HASH"), ")§r"); -static OFFLINE_TIMER: std::time::Duration = Duration::from_secs(600); - #[tokio::main] async fn main() { // ---- Tracing setup ---- @@ -59,7 +46,7 @@ async fn main() { addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), "Client connected" ); - if let Err(e) = process_connection(socket, addr, api, config).await { + if let Err(e) = proxy::process_connection(socket, addr, api, config).await { match e.level { tracing::Level::ERROR => tracing::error!( // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), @@ -109,273 +96,6 @@ async fn main() { } } -async fn process_connection( - mut client_stream: TcpStream, - addr: SocketAddr, - api: impl MinecraftAPI + Send + Sync + 'static + Clone, - config: Config, -) -> Result<(), OpaqueError> -where - T: Send + Sync + 'static, -{ - // this is wrapper so that async doesnt mess up the span, and - // to make sure this doesn't propagate to later `handle_*` - #[tracing::instrument(level = "info", skip(client_stream, config))] - async fn first_packet( - client_stream: &mut TcpStream, - config: Config, - ) -> Result, OpaqueError> { - let client_packet = Packet::parse(client_stream).await?; - - // --- Handshake --- - let handshake; - let packet_id = client_packet.id.get_int(); - if packet_id != 0 { - return Err(OpaqueError::create(&format!( - "Client HANDSHAKE -> bad packet; id={packet_id} Disconnecting..." - ))); - } - handshake = packets::serverbound::handshake::Handshake::parse(client_packet) - .await - .ok_or_else(|| "Client HANDSHAKE -> malformed packet; Disconnecting...".to_string())?; - - let filter = eval_boolean(&format!( - "addr=\"{}\";{}", - handshake.get_server_address(), - config.filter_conn - )) - .map_err(|e| format!("filter error! err={:?}", e))?; - if filter { - // TODO: if the server just returns here, the client does not know it - // and sends a packet with the 122 WeirdID - tracing::trace!("filtered out the connection"); - return Ok(None); - } - - Ok(Some(handshake)) - } - let handshake = match first_packet(&mut client_stream, config).await? { - Some(x) => x, - // this is needed because of the filter - None => return Ok(()), - }; - - let next_server_state = handshake.get_next_state(); - match next_server_state { - packets::ProtocolState::Status => { - handle_status(&mut client_stream, &handshake, api).await?; - } - packets::ProtocolState::Login => { - // This block of packet parsing is needed here, so the span with the - // username is correctly propagated due to the async nature of things - let span = tracing::span!( - tracing::Level::INFO, - "login_username_extract", - join_addr = handshake.get_server_address(), - join_port = handshake.server_port.get_value() - ); - - let packet = Packet::parse(&mut client_stream) - .instrument(span.clone()) - .await?; - let login_packet = packets::serverbound::login::LoginStart::parse(packet) - .instrument(span.clone()) - .await - .ok_or("Failed to parse login start packet".to_string())?; - handle_login(&mut client_stream, &handshake, login_packet, api).await? - } - packets::ProtocolState::Transfer => { - return Err(OpaqueError::create( - "next state is transfer; Not yet implemented!", - )) - } - // This is used becuase Handshake::parse returns none if is something else - _ => unreachable!(), - }; - Ok(()) -} - -#[tracing::instrument(level = "info", fields(join_addr = handshake.get_server_address(),join_port = handshake.server_port.get_value()),skip(client_stream, handshake, api))] -async fn handle_status( - client_stream: &mut TcpStream, - handshake: &Handshake, - api: impl MinecraftAPI, -) -> Result<(), OpaqueError> -where - T: Send + Sync + 'static, -{ - let client_packet = Packet::parse(client_stream).await?; - if client_packet.id.get_int() != 0 { - return Err(OpaqueError::create(&format!( - "Client STATUS: {:#x} Unknown Id -> Shutdown", - client_packet.id.get_int(), - ))); - }; - - let join_addr = handshake.get_server_address(); - let mut status_struct = StatusStructNew::create(); - status_struct.version.protocol = handshake.protocol_version.get_int(); - - let server = match api - .query_server( - &handshake.get_server_address(), - &handshake.server_port.get_value().to_string(), - ) - .await - { - Ok(x) => x, - Err(e) => { - let span = tracing::span!(tracing::Level::INFO, "unavailable", err = e.get_kind()); - status_struct.players.max = 0; - status_struct.players.online = 0; - status_struct.description.text = format!( - "Could not find §kserver§r: §f§o{join_addr}§r\nMinecraft Ingress - {BYE_MESSAGE}" - ); - - mc_server::complete_status_request(client_stream, status_struct) - .instrument(span.clone()) - .await?; - - // Recieve the ping packet, so the client does not send it again - let _ping = Packet::parse(client_stream) - .instrument(span.clone()) - .await - .map_err(|err| { - tracing::debug!( - err = OpaqueError::from(err).context, - "failed to parse ping packet" - ) - }); - // Send a bad ping packet back, so the client shows *searching* icon - let _pong = Packet::new(9, vec![0; 8]) - .ok_or("failed to create empty pong packet?")? - .send_packet(client_stream) - .instrument(span.clone()) - .await - .map_err(|_| tracing::debug!("failed to send pong packet")); - - let status = ServerDeploymentStatus::Unavailable(e.get_kind().to_string()); - tracing::info!(status = ?status, "status request"); - return Ok(()); - } - }; - let status = server.query_status().await?; - tracing::info!(status = ?status, "status request"); - let motd = server - .get_motd() - .unwrap_or("A minecraft server (proxy motd)".to_string()); - - match status { - ServerDeploymentStatus::Connectable(mut server_stream) => { - return server - .proxy_status(handshake, &client_packet, client_stream, &mut server_stream) - .await - } - ServerDeploymentStatus::Starting | ServerDeploymentStatus::PodOk => { - status_struct.players.max = 1; - status_struct.players.online = 1; - status_struct.description.text = - format!("{motd}\n§2Starting!§r §b§oWait a bit§r§b ^^§r - {BYE_MESSAGE}"); - } - ServerDeploymentStatus::Offline => { - status_struct.players.max = 1; - status_struct.description.text = - format!("{motd}\n§4Offline§r §oJoin to start!§r - {BYE_MESSAGE}"); - } - ServerDeploymentStatus::Unavailable(_) => unreachable!(), - }; - - mc_server::complete_status_request(client_stream, status_struct).await?; - return mc_server::handle_ping(client_stream).await; -} - -#[tracing::instrument(level = "info", fields(join_addr = handshake.get_server_address(),join_port = handshake.server_port.get_value(),username = login_start.name.get_value()),skip(client_stream, handshake, api, login_start))] -async fn handle_login( - client_stream: &mut TcpStream, - handshake: &Handshake, - login_start: LoginStart, - api: impl MinecraftAPI + Send + Sync + 'static + Clone, -) -> Result<(), OpaqueError> -where - T: Send + Sync + 'static, -{ - let server = api - .query_server( - &handshake.get_server_address(), - &handshake.server_port.get_value().to_string(), - ) - .await; - - let status = match server.as_ref() { - Ok(x) => x.query_status().await?, - Err(e) => ServerDeploymentStatus::Unavailable(e.get_kind().to_string()), - }; - tracing::debug!(msg = "server status", status = ?status); - match status { - ServerDeploymentStatus::Connectable(mut server_stream) => { - let server = server?; - api.start_watch(server.clone(), OFFLINE_TIMER).await?; - - // referenced from: - // https://github.com/hanyu-dev/tokio-splice2/blob/fc47199fffde8946b0acf867d1fa0b2222267a34/examples/proxy.rs - let io_sl2sr = tokio_splice2::context::SpliceIoCtx::prepare() - .map_err(|e| format!("tokio_splice2::context::SpliceIoCtx err={}", e.to_string()))? - .into_io(); - - let io_sr2sl = tokio_splice2::context::SpliceIoCtx::prepare() - .map_err(|e| format!("tokio_splice2::context::SpliceIoCtx err={}", e.to_string()))? - .into_io(); - - handshake - .send_packet(&mut server_stream) - .await - .map_err(|_| "failed to forward handshake packet to minecraft server")?; - - login_start - .send_packet(&mut server_stream) - .await - .map_err(|e| { - "failed to forward login packet to server before splicing".to_string() - })?; - - tracing::info!("proxying with splice"); - let traffic = tokio_splice2::io::SpliceBidiIo { io_sl2sr, io_sr2sl } - .execute(client_stream, &mut server_stream) - .await; - - match traffic.error { - Some(e) => { - tracing::warn!( - tx = traffic.tx, - rx = traffic.rx, - err = ?e, - "connection splicing ended with error", - ) - } - None => tracing::info!( - tx = traffic.tx, - rx = traffic.rx, - "connection splicing ended", - ), - } - } - ServerDeploymentStatus::PodOk | ServerDeploymentStatus::Starting => { - tracing::info!(?status, "server is starting... disconnecting client"); - mc_server::send_disconnect(client_stream, format!("[\"\",{{\"text\":\"The server is still starting up...\n wait a bit more please ^^\n\n\"}},{{\"text\":\"{BYE_MESSAGE}\"}}]").as_str()).await?; - } - ServerDeploymentStatus::Offline => { - let server = server?; - server.start().await?; - api.start_watch(server.clone(), OFFLINE_TIMER).await?; - mc_server::send_disconnect(client_stream, format!("[\"\",{{\"text\":\"Okayy, §2starting§r the server!\n\n\"}},{{\"text\":\"{BYE_MESSAGE}\"}}]").as_str()).await?; - } - ServerDeploymentStatus::Unavailable(_) => { - tracing::info!(?status, "tried connecting, droppping connection"); - } - } - Ok(()) -} - #[derive(Clone)] struct Config { pub filter_conn: String, diff --git a/src/proxy.rs b/src/proxy.rs new file mode 100644 index 0000000..cc343b1 --- /dev/null +++ b/src/proxy.rs @@ -0,0 +1,286 @@ +use evalexpr::*; +use std::env; +use std::net::SocketAddr; +use std::time::Duration; + +use tokio::net::{TcpListener, TcpStream}; +use tracing::Instrument; +use tracing_subscriber::{prelude::*, EnvFilter}; + +use crate::mc_server::{self, MinecraftAPI, MinecraftServerHandle, ServerDeploymentStatus}; +use crate::opaque_error::OpaqueError; +use crate::packets::clientbound::status::StatusStructNew; +use crate::packets::serverbound::handshake::Handshake; +use crate::packets::serverbound::login::LoginStart; +use crate::packets::{self, Packet, SendPacket}; +use crate::Config; + +static BYE_MESSAGE: &str = concat!("§dTami§r with §d<3§r §8(rev: ", env!("COMMIT_HASH"), ")§r"); +static OFFLINE_TIMER: std::time::Duration = Duration::from_secs(600); + +pub async fn process_connection( + mut client_stream: TcpStream, + addr: SocketAddr, + api: impl MinecraftAPI + Send + Sync + 'static + Clone, + config: Config, +) -> Result<(), OpaqueError> +where + T: Send + Sync + 'static, +{ + // this is wrapper so that async doesnt mess up the span, and + // to make sure this doesn't propagate to later `handle_*` + #[tracing::instrument(level = "info", skip(client_stream, config))] + async fn first_packet( + client_stream: &mut TcpStream, + config: Config, + ) -> Result, OpaqueError> { + let client_packet = Packet::parse(client_stream).await?; + + // --- Handshake --- + let handshake; + let packet_id = client_packet.id.get_int(); + if packet_id != 0 { + return Err(OpaqueError::create(&format!( + "Client HANDSHAKE -> bad packet; id={packet_id} Disconnecting..." + ))); + } + handshake = packets::serverbound::handshake::Handshake::parse(client_packet) + .await + .ok_or_else(|| "Client HANDSHAKE -> malformed packet; Disconnecting...".to_string())?; + + let filter = eval_boolean(&format!( + "addr=\"{}\";{}", + handshake.get_server_address(), + config.filter_conn + )) + .map_err(|e| format!("filter error! err={:?}", e))?; + if filter { + // TODO: if the server just returns here, the client does not know it + // and sends a packet with the 122 WeirdID + tracing::trace!("filtered out the connection"); + return Ok(None); + } + + Ok(Some(handshake)) + } + let handshake = match first_packet(&mut client_stream, config).await? { + Some(x) => x, + // this is needed because of the filter + None => return Ok(()), + }; + + let next_server_state = handshake.get_next_state(); + match next_server_state { + packets::ProtocolState::Status => { + handle_status(&mut client_stream, &handshake, api).await?; + } + packets::ProtocolState::Login => { + // This block of packet parsing is needed here, so the span with the + // username is correctly propagated due to the async nature of things + let span = tracing::span!( + tracing::Level::INFO, + "login_username_extract", + join_addr = handshake.get_server_address(), + join_port = handshake.server_port.get_value() + ); + + let packet = Packet::parse(&mut client_stream) + .instrument(span.clone()) + .await?; + let login_packet = packets::serverbound::login::LoginStart::parse(packet) + .instrument(span.clone()) + .await + .ok_or("Failed to parse login start packet".to_string())?; + handle_login(&mut client_stream, &handshake, login_packet, api).await? + } + packets::ProtocolState::Transfer => { + return Err(OpaqueError::create( + "next state is transfer; Not yet implemented!", + )) + } + // This is used becuase Handshake::parse returns none if is something else + _ => unreachable!(), + }; + Ok(()) +} + +#[tracing::instrument(level = "info", fields(join_addr = handshake.get_server_address(),join_port = handshake.server_port.get_value()),skip(client_stream, handshake, api))] +async fn handle_status( + client_stream: &mut TcpStream, + handshake: &Handshake, + api: impl MinecraftAPI, +) -> Result<(), OpaqueError> +where + T: Send + Sync + 'static, +{ + let client_packet = Packet::parse(client_stream).await?; + if client_packet.id.get_int() != 0 { + return Err(OpaqueError::create(&format!( + "Client STATUS: {:#x} Unknown Id -> Shutdown", + client_packet.id.get_int(), + ))); + }; + + let join_addr = handshake.get_server_address(); + let mut status_struct = StatusStructNew::create(); + status_struct.version.protocol = handshake.protocol_version.get_int(); + + let server = match api + .query_server( + &handshake.get_server_address(), + &handshake.server_port.get_value().to_string(), + ) + .await + { + Ok(x) => x, + Err(e) => { + let span = tracing::span!(tracing::Level::INFO, "unavailable", err = e.get_kind()); + status_struct.players.max = 0; + status_struct.players.online = 0; + status_struct.description.text = format!( + "Could not find §kserver§r: §f§o{join_addr}§r\nMinecraft Ingress - {BYE_MESSAGE}" + ); + + mc_server::complete_status_request(client_stream, status_struct) + .instrument(span.clone()) + .await?; + + // Recieve the ping packet, so the client does not send it again + let _ping = Packet::parse(client_stream) + .instrument(span.clone()) + .await + .map_err(|err| { + tracing::debug!( + err = OpaqueError::from(err).context, + "failed to parse ping packet" + ) + }); + // Send a bad ping packet back, so the client shows *searching* icon + let _pong = Packet::new(9, vec![0; 8]) + .ok_or("failed to create empty pong packet?")? + .send_packet(client_stream) + .instrument(span.clone()) + .await + .map_err(|_| tracing::debug!("failed to send pong packet")); + + let status = ServerDeploymentStatus::Unavailable(e.get_kind().to_string()); + tracing::info!(status = ?status, "status request"); + return Ok(()); + } + }; + let status = server.query_status().await?; + tracing::info!(status = ?status, "status request"); + let motd = server + .get_motd() + .unwrap_or("A minecraft server (proxy motd)".to_string()); + + match status { + ServerDeploymentStatus::Connectable(mut server_stream) => { + return server + .proxy_status(handshake, &client_packet, client_stream, &mut server_stream) + .await + } + ServerDeploymentStatus::Starting | ServerDeploymentStatus::PodOk => { + status_struct.players.max = 1; + status_struct.players.online = 1; + status_struct.description.text = + format!("{motd}\n§2Starting!§r §b§oWait a bit§r§b ^^§r - {BYE_MESSAGE}"); + } + ServerDeploymentStatus::Offline => { + status_struct.players.max = 1; + status_struct.description.text = + format!("{motd}\n§4Offline§r §oJoin to start!§r - {BYE_MESSAGE}"); + } + ServerDeploymentStatus::Unavailable(_) => unreachable!(), + }; + + mc_server::complete_status_request(client_stream, status_struct).await?; + return mc_server::handle_ping(client_stream).await; +} + +#[tracing::instrument(level = "info", fields(join_addr = handshake.get_server_address(),join_port = handshake.server_port.get_value(),username = login_start.name.get_value()),skip(client_stream, handshake, api, login_start))] +async fn handle_login( + client_stream: &mut TcpStream, + handshake: &Handshake, + login_start: LoginStart, + api: impl MinecraftAPI + Send + Sync + 'static + Clone, +) -> Result<(), OpaqueError> +where + T: Send + Sync + 'static, +{ + let server = api + .query_server( + &handshake.get_server_address(), + &handshake.server_port.get_value().to_string(), + ) + .await; + + let status = match server.as_ref() { + Ok(x) => x.query_status().await?, + Err(e) => ServerDeploymentStatus::Unavailable(e.get_kind().to_string()), + }; + tracing::debug!(msg = "server status", status = ?status); + match status { + ServerDeploymentStatus::Connectable(mut server_stream) => { + let server = server?; + api.start_watch(server.clone(), OFFLINE_TIMER).await?; + + // referenced from: + // https://github.com/hanyu-dev/tokio-splice2/blob/fc47199fffde8946b0acf867d1fa0b2222267a34/examples/proxy.rs + let io_sl2sr = tokio_splice2::context::SpliceIoCtx::prepare() + .map_err(|e| format!("tokio_splice2::context::SpliceIoCtx err={}", e.to_string()))? + .into_io(); + + let io_sr2sl = tokio_splice2::context::SpliceIoCtx::prepare() + .map_err(|e| format!("tokio_splice2::context::SpliceIoCtx err={}", e.to_string()))? + .into_io(); + + handshake + .send_packet(&mut server_stream) + .await + .map_err(|_| "failed to forward handshake packet to minecraft server")?; + + login_start + .send_packet(&mut server_stream) + .await + .map_err(|e| { + "failed to forward login packet to server before splicing".to_string() + })?; + + tracing::info!("proxying with splice"); + let traffic = tokio_splice2::io::SpliceBidiIo { io_sl2sr, io_sr2sl } + .execute(client_stream, &mut server_stream) + .await; + + match traffic.error { + Some(e) => { + tracing::warn!( + tx = traffic.tx, + rx = traffic.rx, + err = ?e, + "connection splicing ended with error", + ) + } + None => tracing::info!( + tx = traffic.tx, + rx = traffic.rx, + "connection splicing ended", + ), + } + } + ServerDeploymentStatus::PodOk | ServerDeploymentStatus::Starting => { + tracing::info!(?status, "server is starting... disconnecting client"); + mc_server::send_disconnect(client_stream, format!("[\"\",{{\"text\":\"The server is still starting up...\n wait a bit more please ^^\n\n\"}},{{\"text\":\"{BYE_MESSAGE}\"}}]").as_str()).await?; + } + ServerDeploymentStatus::Offline => { + let server = server?; + server.start().await?; + api.start_watch(server.clone(), OFFLINE_TIMER).await?; + mc_server::send_disconnect(client_stream, format!("[\"\",{{\"text\":\"Okayy, §2starting§r the server!\n\n\"}},{{\"text\":\"{BYE_MESSAGE}\"}}]").as_str()).await?; + } + ServerDeploymentStatus::Unavailable(_) => { + tracing::info!(?status, "tried connecting, droppping connection"); + } + } + Ok(()) +}