diff --git a/Cargo.lock b/Cargo.lock index fa7ecbf..599e7b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -983,6 +983,7 @@ dependencies = [ "tokio", "tokio-splice2", "tokio-stream", + "tokio-util", "tracing", "tracing-error", "tracing-subscriber", @@ -1706,13 +1707,14 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.17" +version = "0.7.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594" +checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098" dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", "pin-project-lite", "slab", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 6b03c20..85040c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,3 +42,4 @@ tokio-splice2 = "0.3.2" strip-ansi-escapes = "0.2.1" evalexpr = { version = "13.1.0", features = ["regex"] } async-trait = "0.1.89" +tokio-util = { version = "0.7.18", features = ["rt"] } diff --git a/kube/deployment.yaml b/kube/deployment.yaml index 5736290..1d5d7c8 100644 --- a/kube/deployment.yaml +++ b/kube/deployment.yaml @@ -15,7 +15,7 @@ spec: app: minecraft-ingress spec: serviceAccountName: minecraft-ingress - terminationGracePeriodSeconds: 5 + terminationGracePeriodSeconds: 28800 # This is 8 hours containers: - name: minecraft-ingress image: git.tami.moe/tamipes/minecraft-ingress:latest diff --git a/src/kube_cache.rs b/src/kube_cache.rs index b48ef26..199109d 100644 --- a/src/kube_cache.rs +++ b/src/kube_cache.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, fmt, sync::Arc, time::Duration}; +use async_trait::async_trait; use futures::StreamExt; use k8s_openapi::api::{apps::v1::Deployment, core::v1::Service}; use kube::{ @@ -115,6 +116,7 @@ pub struct McApi { map: Arc>>>, } +#[async_trait] impl MinecraftAPI for McApi { #[tracing::instrument( name = "MinecraftAPI::query_server", diff --git a/src/main.rs b/src/main.rs index 6e7f9bc..0a5bf94 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,28 +1,15 @@ -use evalexpr::*; -use std::env; -use std::net::SocketAddr; -use std::time::Duration; - -use tokio::net::{TcpListener, TcpStream}; -use tracing::Instrument; -use tracing_subscriber::{prelude::*, EnvFilter}; - -use crate::mc_server::{MinecraftAPI, MinecraftServerHandle, ServerDeploymentStatus}; use crate::opaque_error::OpaqueError; -use crate::packets::clientbound::status::StatusStructNew; -use crate::packets::serverbound::handshake::Handshake; -use crate::packets::serverbound::login::LoginStart; -use crate::packets::{Packet, SendPacket}; +use std::env; +use tokio::net::TcpListener; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; mod kube_cache; mod mc_server; mod opaque_error; mod packets; +mod proxy; mod types; -static BYE_MESSAGE: &str = concat!("§dTami§r with §d<3§r §8(rev: ", env!("COMMIT_HASH"), ")§r"); -static OFFLINE_TIMER: std::time::Duration = Duration::from_secs(600); - #[tokio::main] async fn main() { // ---- Tracing setup ---- @@ -48,332 +35,32 @@ async fn main() { let listener = TcpListener::bind(config.bind_addr.clone()).await.unwrap(); tracing::info!(bind_addr = config.bind_addr, "started tcp server"); - let conn_task = tokio::spawn(async move { - loop { - let (socket, addr) = listener.accept().await.unwrap(); - let api = api.clone(); - - let config = config.clone(); - tokio::spawn(async move { - tracing::debug!( - addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - "Client connected" - ); - if let Err(e) = process_connection(socket, addr, api, config).await { - match e.level { - tracing::Level::ERROR => tracing::error!( - // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - trace = %e.print_span_trace(), - err = format!("{}", e.context), - "Client disconnected" - ), - tracing::Level::WARN => tracing::warn!( - // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - trace = %e.print_span_trace(), - err = format!("{}", e.context), - "Client disconnected" - ), - tracing::Level::INFO => tracing::info!( - // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - trace = %e.print_span_trace(), - err = format!("{}", e.context), - "Client disconnected" - ), - _ => { - tracing::error!( - // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - trace = %e.print_span_trace(), - err = format!("{}", e.context), - actual_level = ?e.level, - "Client disconnected (bad level)" - ) - } - } - } else { - tracing::debug!( - addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - "Client disconnected" - ); - } - }); - } - }); - - tokio::select! { - result = api_task => { - tracing::error!("The api tokio:spawn'ed task ran to completion, which should not happen!"); - } - result = conn_task => { - tracing::error!("The connection handling tokio:spawn'ed task ran to completion, which should not happen!"); - } - } -} - -async fn process_connection( - mut client_stream: TcpStream, - addr: SocketAddr, - api: impl MinecraftAPI + Send + Sync + 'static + Clone, - config: Config, -) -> Result<(), OpaqueError> -where - T: Send + Sync + 'static, -{ - // this is wrapper so that async doesnt mess up the span, and - // to make sure this doesn't propagate to later `handle_*` - #[tracing::instrument(level = "info", skip(client_stream, config))] - async fn first_packet( - client_stream: &mut TcpStream, - config: Config, - ) -> Result, OpaqueError> { - let client_packet = Packet::parse(client_stream).await?; - - // --- Handshake --- - let handshake; - let packet_id = client_packet.id.get_int(); - if packet_id != 0 { - return Err(OpaqueError::create(&format!( - "Client HANDSHAKE -> bad packet; id={packet_id} Disconnecting..." - ))); - } - handshake = packets::serverbound::handshake::Handshake::parse(client_packet) - .await - .ok_or_else(|| "Client HANDSHAKE -> malformed packet; Disconnecting...".to_string())?; - - let filter = eval_boolean(&format!( - "addr=\"{}\";{}", - handshake.get_server_address(), - config.filter_conn - )) - .map_err(|e| format!("filter error! err={:?}", e))?; - if filter { - // TODO: if the server just returns here, the client does not know it - // and sends a packet with the 122 WeirdID - tracing::trace!("filtered out the connection"); - return Ok(None); - } - - Ok(Some(handshake)) - } - let handshake = match first_packet(&mut client_stream, config).await? { - Some(x) => x, - // this is needed because of the filter - None => return Ok(()), - }; - - let next_server_state = handshake.get_next_state(); - match next_server_state { - packets::ProtocolState::Status => { - handle_status(&mut client_stream, &handshake, api).await?; - } - packets::ProtocolState::Login => { - // This block of packet parsing is needed here, so the span with the - // username is correctly propagated due to the async nature of things - let span = tracing::span!( - tracing::Level::INFO, - "login_username_extract", - join_addr = handshake.get_server_address(), - join_port = handshake.server_port.get_value() - ); - - let packet = Packet::parse(&mut client_stream) - .instrument(span.clone()) - .await?; - let login_packet = packets::serverbound::login::LoginStart::parse(packet) - .instrument(span.clone()) - .await - .ok_or("Failed to parse login start packet".to_string())?; - handle_login(&mut client_stream, &handshake, login_packet, api).await? - } - packets::ProtocolState::Transfer => { - return Err(OpaqueError::create( - "next state is transfer; Not yet implemented!", - )) - } - // This is used becuase Handshake::parse returns none if is something else - _ => unreachable!(), - }; - Ok(()) -} - -#[tracing::instrument(level = "info", fields(join_addr = handshake.get_server_address(),join_port = handshake.server_port.get_value()),skip(client_stream, handshake, api))] -async fn handle_status( - client_stream: &mut TcpStream, - handshake: &Handshake, - api: impl MinecraftAPI, -) -> Result<(), OpaqueError> -where - T: Send + Sync + 'static, -{ - let client_packet = Packet::parse(client_stream).await?; - if client_packet.id.get_int() != 0 { - return Err(OpaqueError::create(&format!( - "Client STATUS: {:#x} Unknown Id -> Shutdown", - client_packet.id.get_int(), - ))); - }; - - let join_addr = handshake.get_server_address(); - let mut status_struct = StatusStructNew::create(); - status_struct.version.protocol = handshake.protocol_version.get_int(); - - let server = match api - .query_server( - &handshake.get_server_address(), - &handshake.server_port.get_value().to_string(), - ) - .await + let mut sigterm = match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) { Ok(x) => x, Err(e) => { - let span = tracing::span!(tracing::Level::INFO, "unavailable", err = e.get_kind()); - status_struct.players.max = 0; - status_struct.players.online = 0; - status_struct.description.text = format!( - "Could not find §kserver§r: §f§o{join_addr}§r\nMinecraft Ingress - {BYE_MESSAGE}" - ); - - mc_server::complete_status_request(client_stream, status_struct) - .instrument(span.clone()) - .await?; - - // Recieve the ping packet, so the client does not send it again - let _ping = Packet::parse(client_stream) - .instrument(span.clone()) - .await - .map_err(|err| { - tracing::debug!( - err = OpaqueError::from(err).context, - "failed to parse ping packet" - ) - }); - // Send a bad ping packet back, so the client shows *searching* icon - let _pong = Packet::new(9, vec![0; 8]) - .ok_or("failed to create empty pong packet?")? - .send_packet(client_stream) - .instrument(span.clone()) - .await - .map_err(|_| tracing::debug!("failed to send pong packet")); - - let status = ServerDeploymentStatus::Unavailable(e.get_kind().to_string()); - tracing::info!(status = ?status, "status request"); - return Ok(()); + tracing::error!(error = ?e, "could not initilaize SIGTERM handling channel!"); + return; } }; - let status = server.query_status().await?; - tracing::info!(status = ?status, "status request"); - let motd = server - .get_motd() - .unwrap_or("A minecraft server (proxy motd)".to_string()); + let cancel_token = tokio_util::sync::CancellationToken::new(); - match status { - ServerDeploymentStatus::Connectable(mut server_stream) => { - return server - .proxy_status(handshake, &client_packet, client_stream, &mut server_stream) - .await + let mut conn_task = proxy::start_proxy(listener, api, config, cancel_token.clone()); + + tokio::select! { + _ = api_task => { + tracing::error!("the MinecraftApi tokio:spawn'ed task ran to completion, which should not happen!"); } - ServerDeploymentStatus::Starting | ServerDeploymentStatus::PodOk => { - status_struct.players.max = 1; - status_struct.players.online = 1; - status_struct.description.text = - format!("{motd}\n§2Starting!§r §b§oWait a bit§r§b ^^§r - {BYE_MESSAGE}"); + _ = &mut conn_task => { + tracing::error!("the connection handling tokio:spawn'ed task ran to completion, which should not happen!"); } - ServerDeploymentStatus::Offline => { - status_struct.players.max = 1; - status_struct.description.text = - format!("{motd}\n§4Offline§r §oJoin to start!§r - {BYE_MESSAGE}"); - } - ServerDeploymentStatus::Unavailable(_) => unreachable!(), - }; - - mc_server::complete_status_request(client_stream, status_struct).await?; - return mc_server::handle_ping(client_stream).await; -} - -#[tracing::instrument(level = "info", fields(join_addr = handshake.get_server_address(),join_port = handshake.server_port.get_value(),username = login_start.name.get_value()),skip(client_stream, handshake, api, login_start))] -async fn handle_login( - client_stream: &mut TcpStream, - handshake: &Handshake, - login_start: LoginStart, - api: impl MinecraftAPI + Send + Sync + 'static + Clone, -) -> Result<(), OpaqueError> -where - T: Send + Sync + 'static, -{ - let server = api - .query_server( - &handshake.get_server_address(), - &handshake.server_port.get_value().to_string(), - ) - .await; - - let status = match server.as_ref() { - Ok(x) => x.query_status().await?, - Err(e) => ServerDeploymentStatus::Unavailable(e.get_kind().to_string()), - }; - tracing::debug!(msg = "server status", status = ?status); - match status { - ServerDeploymentStatus::Connectable(mut server_stream) => { - let server = server?; - api.start_watch(server.clone(), OFFLINE_TIMER).await?; - - // referenced from: - // https://github.com/hanyu-dev/tokio-splice2/blob/fc47199fffde8946b0acf867d1fa0b2222267a34/examples/proxy.rs - let io_sl2sr = tokio_splice2::context::SpliceIoCtx::prepare() - .map_err(|e| format!("tokio_splice2::context::SpliceIoCtx err={}", e.to_string()))? - .into_io(); - - let io_sr2sl = tokio_splice2::context::SpliceIoCtx::prepare() - .map_err(|e| format!("tokio_splice2::context::SpliceIoCtx err={}", e.to_string()))? - .into_io(); - - handshake - .send_packet(&mut server_stream) - .await - .map_err(|_| "failed to forward handshake packet to minecraft server")?; - - login_start - .send_packet(&mut server_stream) - .await - .map_err(|e| { - "failed to forward login packet to server before splicing".to_string() - })?; - - tracing::info!("proxying with splice"); - let traffic = tokio_splice2::io::SpliceBidiIo { io_sl2sr, io_sr2sl } - .execute(client_stream, &mut server_stream) - .await; - - match traffic.error { - Some(e) => { - tracing::warn!( - tx = traffic.tx, - rx = traffic.rx, - err = ?e, - "connection splicing ended with error", - ) - } - None => tracing::info!( - tx = traffic.tx, - rx = traffic.rx, - "connection splicing ended", - ), - } - } - ServerDeploymentStatus::PodOk | ServerDeploymentStatus::Starting => { - tracing::info!(?status, "server is starting... disconnecting client"); - mc_server::send_disconnect(client_stream, format!("[\"\",{{\"text\":\"The server is still starting up...\n wait a bit more please ^^\n\n\"}},{{\"text\":\"{BYE_MESSAGE}\"}}]").as_str()).await?; - } - ServerDeploymentStatus::Offline => { - let server = server?; - server.start().await?; - api.start_watch(server.clone(), OFFLINE_TIMER).await?; - mc_server::send_disconnect(client_stream, format!("[\"\",{{\"text\":\"Okayy, §2starting§r the server!\n\n\"}},{{\"text\":\"{BYE_MESSAGE}\"}}]").as_str()).await?; - } - ServerDeploymentStatus::Unavailable(_) => { - tracing::info!(?status, "tried connecting, droppping connection"); + result = sigterm.recv() => { + tracing::info!(sigterm_signal = ?result,"SIGTERM received"); + cancel_token.cancel(); + let res = conn_task.await; + tracing::info!(api_task_result = ?res, "shutdown complete"); } } - Ok(()) } #[derive(Clone)] diff --git a/src/mc_server.rs b/src/mc_server.rs index 102523d..3794acf 100644 --- a/src/mc_server.rs +++ b/src/mc_server.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, sync::Arc}; +use async_trait::async_trait; use tokio::{io::AsyncWriteExt, net::TcpStream}; use tracing::Instrument; @@ -64,7 +65,7 @@ pub async fn send_disconnect( } #[async_trait::async_trait] -pub trait MinecraftServerHandle: Clone { +pub trait MinecraftServerHandle: Send + Sync + 'static + Clone { async fn start(&self) -> Result<(), OpaqueError>; async fn stop(&self) -> Result<(), OpaqueError>; async fn query_status(&self) -> Result; @@ -156,7 +157,8 @@ pub trait MinecraftServerHandle: Clone { } } -pub trait MinecraftAPI { +#[async_trait] +pub trait MinecraftAPI: Send + Sync + 'static { async fn query_server(&self, addr: &str, port: &str) -> Result; fn get_map(&self) -> Arc>>>; diff --git a/src/proxy.rs b/src/proxy.rs new file mode 100644 index 0000000..8a23bc2 --- /dev/null +++ b/src/proxy.rs @@ -0,0 +1,354 @@ +use evalexpr::*; +use std::collections::HashMap; +use std::env; +use std::net::SocketAddr; +use std::time::Duration; +use tokio::task::JoinHandle; + +use tokio::net::{TcpListener, TcpStream}; +use tracing::Instrument; + +use crate::mc_server::{self, MinecraftAPI, MinecraftServerHandle, ServerDeploymentStatus}; +use crate::opaque_error::OpaqueError; +use crate::packets::clientbound::status::StatusStructNew; +use crate::packets::serverbound::handshake::Handshake; +use crate::packets::serverbound::login::LoginStart; +use crate::packets::{self, Packet, SendPacket}; +use crate::Config; + +static BYE_MESSAGE: &str = concat!("§dTami§r with §d<3§r §8(rev: ", env!("COMMIT_HASH"), ")§r"); +static OFFLINE_TIMER: std::time::Duration = Duration::from_secs(600); + +pub fn start_proxy( + listener: TcpListener, + api: impl MinecraftAPI + Clone, + config: Config, + token: tokio_util::sync::CancellationToken, +) -> JoinHandle<()> { + tokio::spawn(async move { + let tracker = tokio_util::task::TaskTracker::new(); + loop { + tokio::select! { + Ok((socket, addr)) = listener.accept() => { + let api = api.clone(); + + let config = config.clone(); + tracker.spawn(async move { + tracing::debug!( + addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + "Client connected" + ); + if let Err(e) = process_connection(socket, addr, api, config).await { + trace_opaque(&e, "Client disconnected"); + } else { + tracing::debug!( + addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + "Client disconnected" + ); + } + }); + } + _ = token.cancelled() => { + tracker.close(); + let open_connections = tracker.len(); + tracing::info!(open_connections,"stopped handling new connections"); + tracker.wait().await; + break; + } + } + } + }) +} + +fn trace_opaque(e: &OpaqueError, str: &str) { + match e.level { + tracing::Level::ERROR => tracing::error!( + // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + trace = %e.print_span_trace(), + err = format!("{}", e.context), + message = str + ), + tracing::Level::WARN => tracing::warn!( + // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + trace = %e.print_span_trace(), + err = format!("{}", e.context), + message = str + ), + tracing::Level::INFO => tracing::info!( + // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + trace = %e.print_span_trace(), + err = format!("{}", e.context), + message = str + ), + _ => { + tracing::error!( + // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + trace = %e.print_span_trace(), + err = format!("{}", e.context), + actual_level = ?e.level, + "Client disconnected (bad level)" + ) + } + } +} + +async fn process_connection( + mut client_stream: TcpStream, + addr: SocketAddr, + api: impl MinecraftAPI + Clone, + config: Config, +) -> Result<(), OpaqueError> { + // this is wrapper so that async doesnt mess up the span, and + // to make sure this doesn't propagate to later `handle_*` + #[tracing::instrument(level = "info", skip(client_stream, config))] + async fn first_packet( + client_stream: &mut TcpStream, + config: Config, + ) -> Result, OpaqueError> { + let client_packet = Packet::parse(client_stream).await?; + + // --- Handshake --- + let handshake; + let packet_id = client_packet.id.get_int(); + if packet_id != 0 { + return Err(OpaqueError::create(&format!( + "Client HANDSHAKE -> bad packet; id={packet_id} Disconnecting..." + ))); + } + handshake = packets::serverbound::handshake::Handshake::parse(client_packet) + .await + .ok_or_else(|| "Client HANDSHAKE -> malformed packet; Disconnecting...".to_string())?; + + let filter = eval_boolean(&format!( + "addr=\"{}\";{}", + handshake.get_server_address(), + config.filter_conn + )) + .map_err(|e| format!("filter error! err={:?}", e))?; + if filter { + // TODO: if the server just returns here, the client does not know it + // and sends a packet with the 122 WeirdID + tracing::trace!("filtered out the connection"); + return Ok(None); + } + + Ok(Some(handshake)) + } + let handshake = match first_packet(&mut client_stream, config).await? { + Some(x) => x, + // this is needed because of the filter + None => return Ok(()), + }; + + let next_server_state = handshake.get_next_state(); + match next_server_state { + packets::ProtocolState::Status => { + handle_status(&mut client_stream, &handshake, api).await?; + } + packets::ProtocolState::Login => { + // This block of packet parsing is needed here, so the span with the + // username is correctly propagated due to the async nature of things + let span = tracing::span!( + tracing::Level::INFO, + "login_username_extract", + join_addr = handshake.get_server_address(), + join_port = handshake.server_port.get_value() + ); + + let packet = Packet::parse(&mut client_stream) + .instrument(span.clone()) + .await?; + let login_packet = packets::serverbound::login::LoginStart::parse(packet) + .instrument(span.clone()) + .await + .ok_or("Failed to parse login start packet".to_string())?; + handle_login(&mut client_stream, &handshake, login_packet, api).await? + } + packets::ProtocolState::Transfer => { + return Err(OpaqueError::create( + "next state is transfer; Not yet implemented!", + )) + } + // This is used becuase Handshake::parse returns none if is something else + _ => unreachable!(), + }; + Ok(()) +} + +#[tracing::instrument(level = "info", fields(join_addr = handshake.get_server_address(),join_port = handshake.server_port.get_value()),skip(client_stream, handshake, api))] +async fn handle_status( + client_stream: &mut TcpStream, + handshake: &Handshake, + api: impl MinecraftAPI, +) -> Result<(), OpaqueError> { + let client_packet = Packet::parse(client_stream).await?; + if client_packet.id.get_int() != 0 { + return Err(OpaqueError::create(&format!( + "Client STATUS: {:#x} Unknown Id -> Shutdown", + client_packet.id.get_int(), + ))); + }; + + let join_addr = handshake.get_server_address(); + let mut status_struct = StatusStructNew::create(); + status_struct.version.protocol = handshake.protocol_version.get_int(); + + let server = match api + .query_server( + &handshake.get_server_address(), + &handshake.server_port.get_value().to_string(), + ) + .await + { + Ok(x) => x, + Err(e) => { + let span = tracing::span!(tracing::Level::INFO, "unavailable", err = e.get_kind()); + status_struct.players.max = 0; + status_struct.players.online = 0; + status_struct.description.text = format!( + "Could not find §kserver§r: §f§o{join_addr}§r\nMinecraft Ingress - {BYE_MESSAGE}" + ); + + mc_server::complete_status_request(client_stream, status_struct) + .instrument(span.clone()) + .await?; + + // Recieve the ping packet, so the client does not send it again + let _ping = Packet::parse(client_stream) + .instrument(span.clone()) + .await + .map_err(|err| { + tracing::debug!( + err = OpaqueError::from(err).context, + "failed to parse ping packet" + ) + }); + // Send a bad ping packet back, so the client shows *searching* icon + let _pong = Packet::new(9, vec![0; 8]) + .ok_or("failed to create empty pong packet?")? + .send_packet(client_stream) + .instrument(span.clone()) + .await + .map_err(|_| tracing::debug!("failed to send pong packet")); + + let status = ServerDeploymentStatus::Unavailable(e.get_kind().to_string()); + tracing::info!(status = ?status, "status request"); + return Ok(()); + } + }; + let status = server.query_status().await?; + tracing::info!(status = ?status, "status request"); + let motd = server + .get_motd() + .unwrap_or("A minecraft server (proxy motd)".to_string()); + + match status { + ServerDeploymentStatus::Connectable(mut server_stream) => { + return server + .proxy_status(handshake, &client_packet, client_stream, &mut server_stream) + .await + } + ServerDeploymentStatus::Starting | ServerDeploymentStatus::PodOk => { + status_struct.players.max = 1; + status_struct.players.online = 1; + status_struct.description.text = + format!("{motd}\n§2Starting!§r §b§oWait a bit§r§b ^^§r - {BYE_MESSAGE}"); + } + ServerDeploymentStatus::Offline => { + status_struct.players.max = 1; + status_struct.description.text = + format!("{motd}\n§4Offline§r §oJoin to start!§r - {BYE_MESSAGE}"); + } + ServerDeploymentStatus::Unavailable(_) => unreachable!(), + }; + + mc_server::complete_status_request(client_stream, status_struct).await?; + return mc_server::handle_ping(client_stream).await; +} + +#[tracing::instrument(level = "info", fields(join_addr = handshake.get_server_address(),join_port = handshake.server_port.get_value(),username = login_start.name.get_value()),skip(client_stream, handshake, api, login_start))] +async fn handle_login( + client_stream: &mut TcpStream, + handshake: &Handshake, + login_start: LoginStart, + api: impl MinecraftAPI + Send + Sync + 'static + Clone, +) -> Result<(), OpaqueError> +where + T: Send + Sync + 'static, +{ + let server = api + .query_server( + &handshake.get_server_address(), + &handshake.server_port.get_value().to_string(), + ) + .await; + + let status = match server.as_ref() { + Ok(x) => x.query_status().await?, + Err(e) => ServerDeploymentStatus::Unavailable(e.get_kind().to_string()), + }; + tracing::debug!(msg = "server status", status = ?status); + match status { + ServerDeploymentStatus::Connectable(mut server_stream) => { + let server = server?; + api.start_watch(server.clone(), OFFLINE_TIMER).await?; + + // referenced from: + // https://github.com/hanyu-dev/tokio-splice2/blob/fc47199fffde8946b0acf867d1fa0b2222267a34/examples/proxy.rs + let io_sl2sr = tokio_splice2::context::SpliceIoCtx::prepare() + .map_err(|e| format!("tokio_splice2::context::SpliceIoCtx err={}", e.to_string()))? + .into_io(); + + let io_sr2sl = tokio_splice2::context::SpliceIoCtx::prepare() + .map_err(|e| format!("tokio_splice2::context::SpliceIoCtx err={}", e.to_string()))? + .into_io(); + + handshake + .send_packet(&mut server_stream) + .await + .map_err(|_| "failed to forward handshake packet to minecraft server")?; + + login_start + .send_packet(&mut server_stream) + .await + .map_err(|e| { + "failed to forward login packet to server before splicing".to_string() + })?; + + tracing::info!("proxying with splice"); + let traffic = tokio_splice2::io::SpliceBidiIo { io_sl2sr, io_sr2sl } + .execute(client_stream, &mut server_stream) + .await; + + match traffic.error { + Some(e) => { + tracing::warn!( + tx = traffic.tx, + rx = traffic.rx, + err = ?e, + "connection splicing ended with error", + ) + } + None => tracing::info!( + tx = traffic.tx, + rx = traffic.rx, + "connection splicing ended", + ), + } + } + ServerDeploymentStatus::PodOk | ServerDeploymentStatus::Starting => { + tracing::info!(?status, "server is starting... disconnecting client"); + mc_server::send_disconnect(client_stream, format!("[\"\",{{\"text\":\"The server is still starting up...\n wait a bit more please ^^\n\n\"}},{{\"text\":\"{BYE_MESSAGE}\"}}]").as_str()).await?; + } + ServerDeploymentStatus::Offline => { + let server = server?; + server.start().await?; + api.start_watch(server.clone(), OFFLINE_TIMER).await?; + mc_server::send_disconnect(client_stream, format!("[\"\",{{\"text\":\"Okayy, §2starting§r the server!\n\n\"}},{{\"text\":\"{BYE_MESSAGE}\"}}]").as_str()).await?; + } + ServerDeploymentStatus::Unavailable(_) => { + tracing::info!(?status, "tried connecting, droppping connection"); + } + } + Ok(()) +}