feat: handle SIGTREM
All checks were successful
/ build (push) Successful in 2m56s

This commit is contained in:
Tamipes 2026-06-08 17:02:34 +02:00
parent ac8812be2f
commit 29ff12f115
7 changed files with 111 additions and 69 deletions

6
Cargo.lock generated
View file

@ -983,6 +983,7 @@ dependencies = [
"tokio", "tokio",
"tokio-splice2", "tokio-splice2",
"tokio-stream", "tokio-stream",
"tokio-util",
"tracing", "tracing",
"tracing-error", "tracing-error",
"tracing-subscriber", "tracing-subscriber",
@ -1706,13 +1707,14 @@ dependencies = [
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.17" version = "0.7.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594" checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures-core", "futures-core",
"futures-sink", "futures-sink",
"futures-util",
"pin-project-lite", "pin-project-lite",
"slab", "slab",
"tokio", "tokio",

View file

@ -42,3 +42,4 @@ tokio-splice2 = "0.3.2"
strip-ansi-escapes = "0.2.1" strip-ansi-escapes = "0.2.1"
evalexpr = { version = "13.1.0", features = ["regex"] } evalexpr = { version = "13.1.0", features = ["regex"] }
async-trait = "0.1.89" async-trait = "0.1.89"
tokio-util = { version = "0.7.18", features = ["rt"] }

View file

@ -15,7 +15,7 @@ spec:
app: minecraft-ingress app: minecraft-ingress
spec: spec:
serviceAccountName: minecraft-ingress serviceAccountName: minecraft-ingress
terminationGracePeriodSeconds: 5 terminationGracePeriodSeconds: 28800 # This is 8 hours
containers: containers:
- name: minecraft-ingress - name: minecraft-ingress
image: git.tami.moe/tamipes/minecraft-ingress:latest image: git.tami.moe/tamipes/minecraft-ingress:latest

View file

@ -1,5 +1,6 @@
use std::{collections::HashMap, fmt, sync::Arc, time::Duration}; use std::{collections::HashMap, fmt, sync::Arc, time::Duration};
use async_trait::async_trait;
use futures::StreamExt; use futures::StreamExt;
use k8s_openapi::api::{apps::v1::Deployment, core::v1::Service}; use k8s_openapi::api::{apps::v1::Deployment, core::v1::Service};
use kube::{ use kube::{
@ -115,6 +116,7 @@ pub struct McApi {
map: Arc<tokio::sync::Mutex<HashMap<String, JoinHandle<()>>>>, map: Arc<tokio::sync::Mutex<HashMap<String, JoinHandle<()>>>>,
} }
#[async_trait]
impl MinecraftAPI<Server> for McApi { impl MinecraftAPI<Server> for McApi {
#[tracing::instrument( #[tracing::instrument(
name = "MinecraftAPI::query_server", name = "MinecraftAPI::query_server",

View file

@ -35,63 +35,30 @@ async fn main() {
let listener = TcpListener::bind(config.bind_addr.clone()).await.unwrap(); let listener = TcpListener::bind(config.bind_addr.clone()).await.unwrap();
tracing::info!(bind_addr = config.bind_addr, "started tcp server"); tracing::info!(bind_addr = config.bind_addr, "started tcp server");
let conn_task = tokio::spawn(async move { let mut sigterm = match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
loop { {
let (socket, addr) = listener.accept().await.unwrap(); Ok(x) => x,
let api = api.clone(); Err(e) => {
tracing::error!(error = ?e, "could not initilaize SIGTERM handling channel!");
return;
}
};
let cancel_token = tokio_util::sync::CancellationToken::new();
let config = config.clone(); let mut conn_task = proxy::start_proxy(listener, api, config, cancel_token.clone());
tokio::spawn(async move {
tracing::debug!(
addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
"Client connected"
);
if let Err(e) = proxy::process_connection(socket, addr, api, config).await {
match e.level {
tracing::Level::ERROR => tracing::error!(
// addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
trace = %e.print_span_trace(),
err = format!("{}", e.context),
"Client disconnected"
),
tracing::Level::WARN => tracing::warn!(
// addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
trace = %e.print_span_trace(),
err = format!("{}", e.context),
"Client disconnected"
),
tracing::Level::INFO => tracing::info!(
// addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
trace = %e.print_span_trace(),
err = format!("{}", e.context),
"Client disconnected"
),
_ => {
tracing::error!(
// addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
trace = %e.print_span_trace(),
err = format!("{}", e.context),
actual_level = ?e.level,
"Client disconnected (bad level)"
)
}
}
} else {
tracing::debug!(
addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
"Client disconnected"
);
}
});
}
});
tokio::select! { tokio::select! {
result = api_task => { _ = api_task => {
tracing::error!("The api tokio:spawn'ed task ran to completion, which should not happen!"); tracing::error!("the MinecraftApi tokio:spawn'ed task ran to completion, which should not happen!");
} }
result = conn_task => { _ = &mut conn_task => {
tracing::error!("The connection handling tokio:spawn'ed task ran to completion, which should not happen!"); tracing::error!("the connection handling tokio:spawn'ed task ran to completion, which should not happen!");
}
result = sigterm.recv() => {
tracing::info!(sigterm_signal = ?result,"SIGTERM received");
cancel_token.cancel();
let res = conn_task.await;
tracing::info!(api_task_result = ?res, "shutdown complete");
} }
} }
} }

View file

@ -1,5 +1,6 @@
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use tokio::{io::AsyncWriteExt, net::TcpStream}; use tokio::{io::AsyncWriteExt, net::TcpStream};
use tracing::Instrument; use tracing::Instrument;
@ -64,7 +65,7 @@ pub async fn send_disconnect(
} }
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait MinecraftServerHandle: Clone { pub trait MinecraftServerHandle: Send + Sync + 'static + Clone {
async fn start(&self) -> Result<(), OpaqueError>; async fn start(&self) -> Result<(), OpaqueError>;
async fn stop(&self) -> Result<(), OpaqueError>; async fn stop(&self) -> Result<(), OpaqueError>;
async fn query_status(&self) -> Result<ServerDeploymentStatus, OpaqueError>; async fn query_status(&self) -> Result<ServerDeploymentStatus, OpaqueError>;
@ -156,7 +157,8 @@ pub trait MinecraftServerHandle: Clone {
} }
} }
pub trait MinecraftAPI<T> { #[async_trait]
pub trait MinecraftAPI<T>: Send + Sync + 'static {
async fn query_server(&self, addr: &str, port: &str) -> Result<T, OpaqueError>; async fn query_server(&self, addr: &str, port: &str) -> Result<T, OpaqueError>;
fn get_map(&self) -> Arc<tokio::sync::Mutex<HashMap<String, tokio::task::JoinHandle<()>>>>; fn get_map(&self) -> Arc<tokio::sync::Mutex<HashMap<String, tokio::task::JoinHandle<()>>>>;

View file

@ -1,11 +1,12 @@
use evalexpr::*; use evalexpr::*;
use std::collections::HashMap;
use std::env; use std::env;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::time::Duration; use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tracing::Instrument; use tracing::Instrument;
use tracing_subscriber::{prelude::*, EnvFilter};
use crate::mc_server::{self, MinecraftAPI, MinecraftServerHandle, ServerDeploymentStatus}; use crate::mc_server::{self, MinecraftAPI, MinecraftServerHandle, ServerDeploymentStatus};
use crate::opaque_error::OpaqueError; use crate::opaque_error::OpaqueError;
@ -18,15 +19,85 @@ use crate::Config;
static BYE_MESSAGE: &str = concat!("§dTami§r with §d<3§r §8(rev: ", env!("COMMIT_HASH"), ")§r"); static BYE_MESSAGE: &str = concat!("§dTami§r with §d<3§r §8(rev: ", env!("COMMIT_HASH"), ")§r");
static OFFLINE_TIMER: std::time::Duration = Duration::from_secs(600); static OFFLINE_TIMER: std::time::Duration = Duration::from_secs(600);
pub async fn process_connection<T: MinecraftServerHandle>( pub fn start_proxy<T: MinecraftServerHandle>(
listener: TcpListener,
api: impl MinecraftAPI<T> + Clone,
config: Config,
token: tokio_util::sync::CancellationToken,
) -> JoinHandle<()> {
tokio::spawn(async move {
let tracker = tokio_util::task::TaskTracker::new();
loop {
tokio::select! {
Ok((socket, addr)) = listener.accept() => {
let api = api.clone();
let config = config.clone();
tracker.spawn(async move {
tracing::debug!(
addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
"Client connected"
);
if let Err(e) = process_connection(socket, addr, api, config).await {
trace_opaque(&e, "Client disconnected");
} else {
tracing::debug!(
addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
"Client disconnected"
);
}
});
}
_ = token.cancelled() => {
tracker.close();
let open_connections = tracker.len();
tracing::info!(open_connections,"stopped handling new connections");
tracker.wait().await;
break;
}
}
}
})
}
fn trace_opaque(e: &OpaqueError, str: &str) {
match e.level {
tracing::Level::ERROR => tracing::error!(
// addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
trace = %e.print_span_trace(),
err = format!("{}", e.context),
message = str
),
tracing::Level::WARN => tracing::warn!(
// addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
trace = %e.print_span_trace(),
err = format!("{}", e.context),
message = str
),
tracing::Level::INFO => tracing::info!(
// addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
trace = %e.print_span_trace(),
err = format!("{}", e.context),
message = str
),
_ => {
tracing::error!(
// addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
trace = %e.print_span_trace(),
err = format!("{}", e.context),
actual_level = ?e.level,
"Client disconnected (bad level)"
)
}
}
}
async fn process_connection<T: MinecraftServerHandle>(
mut client_stream: TcpStream, mut client_stream: TcpStream,
addr: SocketAddr, addr: SocketAddr,
api: impl MinecraftAPI<T> + Send + Sync + 'static + Clone, api: impl MinecraftAPI<T> + Clone,
config: Config, config: Config,
) -> Result<(), OpaqueError> ) -> Result<(), OpaqueError> {
where
T: Send + Sync + 'static,
{
// this is wrapper so that async doesnt mess up the span, and // this is wrapper so that async doesnt mess up the span, and
// to make sure this doesn't propagate to later `handle_*` // to make sure this doesn't propagate to later `handle_*`
#[tracing::instrument(level = "info", skip(client_stream, config))] #[tracing::instrument(level = "info", skip(client_stream, config))]
@ -109,10 +180,7 @@ async fn handle_status<T: MinecraftServerHandle>(
client_stream: &mut TcpStream, client_stream: &mut TcpStream,
handshake: &Handshake, handshake: &Handshake,
api: impl MinecraftAPI<T>, api: impl MinecraftAPI<T>,
) -> Result<(), OpaqueError> ) -> Result<(), OpaqueError> {
where
T: Send + Sync + 'static,
{
let client_packet = Packet::parse(client_stream).await?; let client_packet = Packet::parse(client_stream).await?;
if client_packet.id.get_int() != 0 { if client_packet.id.get_int() != 0 {
return Err(OpaqueError::create(&format!( return Err(OpaqueError::create(&format!(