use evalexpr::*; use std::env; use std::net::SocketAddr; use std::time::Duration; use tokio::net::{TcpListener, TcpStream}; use tracing::Instrument; use tracing_subscriber::{prelude::*, EnvFilter}; use crate::mc_server::{MinecraftAPI, MinecraftServerHandle, ServerDeploymentStatus}; use crate::opaque_error::OpaqueError; use crate::packets::clientbound::status::StatusStructNew; use crate::packets::serverbound::handshake::Handshake; use crate::packets::serverbound::login::LoginStart; use crate::packets::{Packet, SendPacket}; mod kube_cache; mod mc_server; mod opaque_error; mod packets; mod types; static BYE_MESSAGE: &str = concat!("§dTami§r with §d<3§r §8(rev: ", env!("COMMIT_HASH"), ")§r"); #[tokio::main] async fn main() { // ---- Tracing setup ---- let fmt_layer = tracing_subscriber::fmt::layer() .with_target(false) .with_level(true); let filter_layer = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); tracing_subscriber::registry() .with(fmt_layer) .with(filter_layer) .with(tracing_error::ErrorLayer::default()) .init(); tracing::info!("mc-ingress"); let revision: &'static str = env!("COMMIT_HASH"); tracing::info!(revision); let api = kube_cache::McApi::create().unwrap(); tracing::info!("initialized kube api"); let config: Config = Default::default(); let listener = TcpListener::bind(config.bind_addr.clone()).await.unwrap(); tracing::info!(bind_addr = config.bind_addr, "started tcp server"); loop { let (socket, addr) = listener.accept().await.unwrap(); let api = api.clone(); let config = config.clone(); tokio::spawn(async move { tracing::debug!( addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), "Client connected" ); if let Err(e) = process_connection(socket, addr, api, config).await { match e.level { tracing::Level::ERROR => tracing::error!( // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), trace = %e.print_span_trace(), err = format!("{}", e.context), "Client disconnected" ), tracing::Level::WARN => tracing::warn!( // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), trace = %e.print_span_trace(), err = format!("{}", e.context), "Client disconnected" ), tracing::Level::INFO => tracing::info!( // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), trace = %e.print_span_trace(), err = format!("{}", e.context), "Client disconnected" ), _ => { tracing::error!( // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), trace = %e.print_span_trace(), err = format!("{}", e.context), actual_level = ?e.level, "Client disconnected (bad level)" ) } } } else { tracing::debug!( addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), "Client disconnected" ); } }); } } async fn process_connection( mut client_stream: TcpStream, addr: SocketAddr, api: impl MinecraftAPI, config: Config, ) -> Result<(), OpaqueError> { // this is wrapper so that async doesnt mess up the span, and // to make sure this doesn't propagate to later `handle_*` #[tracing::instrument(level = "info", skip(client_stream, config))] async fn first_packet( client_stream: &mut TcpStream, config: Config, ) -> Result, OpaqueError> { let client_packet = Packet::parse(client_stream).await?; // --- Handshake --- let handshake; let packet_id = client_packet.id.get_int(); if packet_id != 0 { return Err(OpaqueError::create(&format!( "Client HANDSHAKE -> bad packet; id={packet_id} Disconnecting..." ))); } handshake = packets::serverbound::handshake::Handshake::parse(client_packet) .await .ok_or_else(|| "Client HANDSHAKE -> malformed packet; Disconnecting...".to_string())?; let filter = eval_boolean(&format!( "addr=\"{}\";{}", handshake.get_server_address(), config.filter_conn )) .map_err(|e| format!("filter error! err={:?}", e))?; if filter { // TODO: if the server just returns here, the client does not know it // and sends a packet with the 122 WeirdID tracing::trace!("filtered out the connection"); return Ok(None); } Ok(Some(handshake)) } let handshake = match first_packet(&mut client_stream, config).await? { Some(x) => x, // this is needed because of the filter None => return Ok(()), }; let next_server_state = handshake.get_next_state(); match next_server_state { packets::ProtocolState::Status => { handle_status(&mut client_stream, &handshake, api).await?; } packets::ProtocolState::Login => { // This block of packet parsing is needed here, so the span with the // username is correctly propagated due to the async nature of things let span = tracing::span!( tracing::Level::INFO, "login_username_extract", server_addr = handshake.get_server_address(), server_port = handshake.server_port.get_value() ); let packet = Packet::parse(&mut client_stream) .instrument(span.clone()) .await?; let login_packet = packets::serverbound::login::LoginStart::parse(packet) .instrument(span.clone()) .await .ok_or("Failed to parse login start packet".to_string())?; handle_login(&mut client_stream, &handshake, login_packet, api).await? } packets::ProtocolState::Transfer => { return Err(OpaqueError::create( "next state is transfer; Not yet implemented!", )) } // This is used becuase Handshake::parse returns none if is something else _ => unreachable!(), }; Ok(()) } #[tracing::instrument(level = "info", fields(server_addr = handshake.get_server_address(),server_port = handshake.server_port.get_value()),skip(client_stream, handshake, api))] async fn handle_status( client_stream: &mut TcpStream, handshake: &Handshake, api: impl MinecraftAPI, ) -> Result<(), OpaqueError> { let client_packet = Packet::parse(client_stream).await?; if client_packet.id.get_int() != 0 { return Err(OpaqueError::create(&format!( "Client STATUS: {:#x} Unknown Id -> Shutdown", client_packet.id.get_int(), ))); }; let server_addr = handshake.get_server_address(); let mut status_struct = StatusStructNew::create(); status_struct.version.protocol = handshake.protocol_version.get_int(); let server = match api .query_server( &handshake.get_server_address(), &handshake.server_port.get_value().to_string(), ) .await { Ok(x) => x, Err(e) => { let span = tracing::span!(tracing::Level::INFO, "unavailable", err = e.get_kind()); status_struct.players.max = 0; status_struct.players.online = 0; status_struct.description.text = format!( "Could not find §kserver§r: §f§o{server_addr}§r\nMinecraft Ingress - {BYE_MESSAGE}" ); mc_server::complete_status_request(client_stream, status_struct) .instrument(span.clone()) .await?; // Recieve the ping packet, so the client does not send it again let _ping = Packet::parse(client_stream) .instrument(span.clone()) .await .map_err(|err| { tracing::debug!( err = OpaqueError::from(err).context, "failed to parse ping packet" ) }); // Send a bad ping packet back, so the client shows *searching* icon let _pong = Packet::new(9, vec![0; 8]) .ok_or("failed to create empty pong packet?")? .send_packet(client_stream) .instrument(span.clone()) .await .map_err(|_| tracing::debug!("failed to send pong packet")); let status = ServerDeploymentStatus::Unavailable(e.get_kind().to_string()); tracing::info!(status = ?status, "status request"); return Ok(()); } }; let status = server.query_status().await?; tracing::info!(status = ?status, "status request"); let motd = server .get_motd() .unwrap_or("A minecraft server (proxy motd)".to_string()); match status { ServerDeploymentStatus::Connectable(mut server_stream) => { return server .proxy_status(handshake, &client_packet, client_stream, &mut server_stream) .await } ServerDeploymentStatus::Starting | ServerDeploymentStatus::PodOk => { status_struct.players.max = 1; status_struct.players.online = 1; status_struct.description.text = format!("{motd}\n§2Starting!§r §b§oWait a bit§r§b ^^§r - {BYE_MESSAGE}"); } ServerDeploymentStatus::Offline => { status_struct.players.max = 1; status_struct.description.text = format!("{motd}\n§4Offline§r §oJoin to start!§r - {BYE_MESSAGE}"); } ServerDeploymentStatus::Unavailable(_) => unreachable!(), }; mc_server::complete_status_request(client_stream, status_struct).await?; return mc_server::handle_ping(client_stream).await; } #[tracing::instrument(level = "info", fields(server_addr = handshake.get_server_address(),server_port = handshake.server_port.get_value(),username = login_start.name.get_value()),skip(client_stream, handshake, api, login_start))] async fn handle_login( client_stream: &mut TcpStream, handshake: &Handshake, login_start: LoginStart, api: impl MinecraftAPI, ) -> Result<(), OpaqueError> { let server = api .query_server( &handshake.get_server_address(), &handshake.server_port.get_value().to_string(), ) .await .map_err(|e| e.set_level(tracing::Level::WARN))?; let status = server.query_status().await?; tracing::debug!(msg = "server status", status = ?status); match status { ServerDeploymentStatus::Connectable(mut server_stream) => { api.start_watch(server.clone(), Duration::from_secs(600)) .await?; // referenced from: // https://github.com/hanyu-dev/tokio-splice2/blob/fc47199fffde8946b0acf867d1fa0b2222267a34/examples/proxy.rs let io_sl2sr = tokio_splice2::context::SpliceIoCtx::prepare() .map_err(|e| format!("tokio_splice2::context::SpliceIoCtx err={}", e.to_string()))? .into_io(); let io_sr2sl = tokio_splice2::context::SpliceIoCtx::prepare() .map_err(|e| format!("tokio_splice2::context::SpliceIoCtx err={}", e.to_string()))? .into_io(); handshake .send_packet(&mut server_stream) .await .map_err(|_| "failed to forward handshake packet to minecraft server")?; login_start .send_packet(&mut server_stream) .await .map_err(|e| { "failed to forward login packet to server before splicing".to_string() })?; tracing::info!("proxying with splice"); let traffic = tokio_splice2::io::SpliceBidiIo { io_sl2sr, io_sr2sl } .execute(client_stream, &mut server_stream) .await; match traffic.error { Some(e) => { tracing::warn!( tx = traffic.tx, rx = traffic.rx, err = ?e, "connection splicing ended with error", ) } None => tracing::info!( tx = traffic.tx, rx = traffic.rx, "connection splicing ended", ), } } ServerDeploymentStatus::PodOk | ServerDeploymentStatus::Starting => { mc_server::send_disconnect(client_stream, format!("[\"\",{{\"text\":\"The server is still starting up...\n wait a bit more please ^^\n\n\"}},{{\"text\":\"{BYE_MESSAGE}\"}}]").as_str()).await?; } ServerDeploymentStatus::Offline => { server.start().await?; api.start_watch(server.clone(), Duration::from_secs(600)) .await?; mc_server::send_disconnect(client_stream, format!("[\"\",{{\"text\":\"Okayy, §2starting§r the server!\n\n\"}},{{\"text\":\"{BYE_MESSAGE}\"}}]").as_str()).await?; } ServerDeploymentStatus::Unavailable(_) => unreachable!(), } Ok(()) } #[derive(Clone)] struct Config { pub filter_conn: String, pub bind_addr: String, } impl Default for Config { fn default() -> Self { let filter_conn = match env::var("FILTER_CONN") { Ok(x) => x, Err(_) => "(addr == \"10.100.0.1\")".to_string(), }; let bind_addr = match env::var("BIND_ADDR") { Ok(x) => x, Err(_) => "0.0.0.0:25565".to_string(), }; Self { filter_conn, bind_addr, } } }