diff --git a/Cargo.lock b/Cargo.lock index fa7ecbf..599e7b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -983,6 +983,7 @@ dependencies = [ "tokio", "tokio-splice2", "tokio-stream", + "tokio-util", "tracing", "tracing-error", "tracing-subscriber", @@ -1706,13 +1707,14 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.17" +version = "0.7.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594" +checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098" dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", "pin-project-lite", "slab", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 6b03c20..85040c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,3 +42,4 @@ tokio-splice2 = "0.3.2" strip-ansi-escapes = "0.2.1" evalexpr = { version = "13.1.0", features = ["regex"] } async-trait = "0.1.89" +tokio-util = { version = "0.7.18", features = ["rt"] } diff --git a/kube/deployment.yaml b/kube/deployment.yaml index 5736290..1d5d7c8 100644 --- a/kube/deployment.yaml +++ b/kube/deployment.yaml @@ -15,7 +15,7 @@ spec: app: minecraft-ingress spec: serviceAccountName: minecraft-ingress - terminationGracePeriodSeconds: 5 + terminationGracePeriodSeconds: 28800 # This is 8 hours containers: - name: minecraft-ingress image: git.tami.moe/tamipes/minecraft-ingress:latest diff --git a/src/kube_cache.rs b/src/kube_cache.rs index b48ef26..199109d 100644 --- a/src/kube_cache.rs +++ b/src/kube_cache.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, fmt, sync::Arc, time::Duration}; +use async_trait::async_trait; use futures::StreamExt; use k8s_openapi::api::{apps::v1::Deployment, core::v1::Service}; use kube::{ @@ -115,6 +116,7 @@ pub struct McApi { map: Arc>>>, } +#[async_trait] impl MinecraftAPI for McApi { #[tracing::instrument( name = "MinecraftAPI::query_server", diff --git a/src/main.rs b/src/main.rs index e393480..0a5bf94 100644 --- a/src/main.rs +++ b/src/main.rs @@ -35,63 +35,30 @@ async fn main() { let listener = TcpListener::bind(config.bind_addr.clone()).await.unwrap(); tracing::info!(bind_addr = config.bind_addr, "started tcp server"); - let conn_task = tokio::spawn(async move { - loop { - let (socket, addr) = listener.accept().await.unwrap(); - let api = api.clone(); - - let config = config.clone(); - tokio::spawn(async move { - tracing::debug!( - addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - "Client connected" - ); - if let Err(e) = proxy::process_connection(socket, addr, api, config).await { - match e.level { - tracing::Level::ERROR => tracing::error!( - // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - trace = %e.print_span_trace(), - err = format!("{}", e.context), - "Client disconnected" - ), - tracing::Level::WARN => tracing::warn!( - // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - trace = %e.print_span_trace(), - err = format!("{}", e.context), - "Client disconnected" - ), - tracing::Level::INFO => tracing::info!( - // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - trace = %e.print_span_trace(), - err = format!("{}", e.context), - "Client disconnected" - ), - _ => { - tracing::error!( - // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - trace = %e.print_span_trace(), - err = format!("{}", e.context), - actual_level = ?e.level, - "Client disconnected (bad level)" - ) - } - } - } else { - tracing::debug!( - addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - "Client disconnected" - ); - } - }); + let mut sigterm = match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + { + Ok(x) => x, + Err(e) => { + tracing::error!(error = ?e, "could not initilaize SIGTERM handling channel!"); + return; } - }); + }; + let cancel_token = tokio_util::sync::CancellationToken::new(); + + let mut conn_task = proxy::start_proxy(listener, api, config, cancel_token.clone()); tokio::select! { - result = api_task => { - tracing::error!("The api tokio:spawn'ed task ran to completion, which should not happen!"); + _ = api_task => { + tracing::error!("the MinecraftApi tokio:spawn'ed task ran to completion, which should not happen!"); } - result = conn_task => { - tracing::error!("The connection handling tokio:spawn'ed task ran to completion, which should not happen!"); + _ = &mut conn_task => { + tracing::error!("the connection handling tokio:spawn'ed task ran to completion, which should not happen!"); + } + result = sigterm.recv() => { + tracing::info!(sigterm_signal = ?result,"SIGTERM received"); + cancel_token.cancel(); + let res = conn_task.await; + tracing::info!(api_task_result = ?res, "shutdown complete"); } } } diff --git a/src/mc_server.rs b/src/mc_server.rs index 102523d..3794acf 100644 --- a/src/mc_server.rs +++ b/src/mc_server.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, sync::Arc}; +use async_trait::async_trait; use tokio::{io::AsyncWriteExt, net::TcpStream}; use tracing::Instrument; @@ -64,7 +65,7 @@ pub async fn send_disconnect( } #[async_trait::async_trait] -pub trait MinecraftServerHandle: Clone { +pub trait MinecraftServerHandle: Send + Sync + 'static + Clone { async fn start(&self) -> Result<(), OpaqueError>; async fn stop(&self) -> Result<(), OpaqueError>; async fn query_status(&self) -> Result; @@ -156,7 +157,8 @@ pub trait MinecraftServerHandle: Clone { } } -pub trait MinecraftAPI { +#[async_trait] +pub trait MinecraftAPI: Send + Sync + 'static { async fn query_server(&self, addr: &str, port: &str) -> Result; fn get_map(&self) -> Arc>>>; diff --git a/src/proxy.rs b/src/proxy.rs index cc343b1..8a23bc2 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -1,11 +1,12 @@ use evalexpr::*; +use std::collections::HashMap; use std::env; use std::net::SocketAddr; use std::time::Duration; +use tokio::task::JoinHandle; use tokio::net::{TcpListener, TcpStream}; use tracing::Instrument; -use tracing_subscriber::{prelude::*, EnvFilter}; use crate::mc_server::{self, MinecraftAPI, MinecraftServerHandle, ServerDeploymentStatus}; use crate::opaque_error::OpaqueError; @@ -18,15 +19,85 @@ use crate::Config; static BYE_MESSAGE: &str = concat!("§dTami§r with §d<3§r §8(rev: ", env!("COMMIT_HASH"), ")§r"); static OFFLINE_TIMER: std::time::Duration = Duration::from_secs(600); -pub async fn process_connection( +pub fn start_proxy( + listener: TcpListener, + api: impl MinecraftAPI + Clone, + config: Config, + token: tokio_util::sync::CancellationToken, +) -> JoinHandle<()> { + tokio::spawn(async move { + let tracker = tokio_util::task::TaskTracker::new(); + loop { + tokio::select! { + Ok((socket, addr)) = listener.accept() => { + let api = api.clone(); + + let config = config.clone(); + tracker.spawn(async move { + tracing::debug!( + addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + "Client connected" + ); + if let Err(e) = process_connection(socket, addr, api, config).await { + trace_opaque(&e, "Client disconnected"); + } else { + tracing::debug!( + addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + "Client disconnected" + ); + } + }); + } + _ = token.cancelled() => { + tracker.close(); + let open_connections = tracker.len(); + tracing::info!(open_connections,"stopped handling new connections"); + tracker.wait().await; + break; + } + } + } + }) +} + +fn trace_opaque(e: &OpaqueError, str: &str) { + match e.level { + tracing::Level::ERROR => tracing::error!( + // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + trace = %e.print_span_trace(), + err = format!("{}", e.context), + message = str + ), + tracing::Level::WARN => tracing::warn!( + // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + trace = %e.print_span_trace(), + err = format!("{}", e.context), + message = str + ), + tracing::Level::INFO => tracing::info!( + // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + trace = %e.print_span_trace(), + err = format!("{}", e.context), + message = str + ), + _ => { + tracing::error!( + // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + trace = %e.print_span_trace(), + err = format!("{}", e.context), + actual_level = ?e.level, + "Client disconnected (bad level)" + ) + } + } +} + +async fn process_connection( mut client_stream: TcpStream, addr: SocketAddr, - api: impl MinecraftAPI + Send + Sync + 'static + Clone, + api: impl MinecraftAPI + Clone, config: Config, -) -> Result<(), OpaqueError> -where - T: Send + Sync + 'static, -{ +) -> Result<(), OpaqueError> { // this is wrapper so that async doesnt mess up the span, and // to make sure this doesn't propagate to later `handle_*` #[tracing::instrument(level = "info", skip(client_stream, config))] @@ -109,10 +180,7 @@ async fn handle_status( client_stream: &mut TcpStream, handshake: &Handshake, api: impl MinecraftAPI, -) -> Result<(), OpaqueError> -where - T: Send + Sync + 'static, -{ +) -> Result<(), OpaqueError> { let client_packet = Packet::parse(client_stream).await?; if client_packet.id.get_int() != 0 { return Err(OpaqueError::create(&format!(