fix: server_watcher pool JoinHandles and check them

This commit is contained in:
Tamipes 2025-12-12 02:54:28 +01:00
parent 7f6bd226a7
commit 3dcf2f03a8

View file

@ -7,6 +7,7 @@ use kube::{
Api, Client, ResourceExt, Api, Client, ResourceExt,
}; };
use serde_json::json; use serde_json::json;
use tokio::task::JoinHandle;
use tracing::Instrument; use tracing::Instrument;
use crate::{ use crate::{
@ -83,7 +84,7 @@ impl KubeCache {
#[derive(Clone)] #[derive(Clone)]
pub struct McApi { pub struct McApi {
cache: KubeCache, cache: KubeCache,
map: Arc<tokio::sync::Mutex<HashMap<String, Server>>>, map: Arc<tokio::sync::Mutex<HashMap<String, JoinHandle<()>>>>,
} }
impl MinecraftAPI<Server> for McApi { impl MinecraftAPI<Server> for McApi {
@ -141,14 +142,23 @@ impl MinecraftAPI<Server> for McApi {
) -> Result<(), OpaqueError> { ) -> Result<(), OpaqueError> {
let addr = server.get_addr().ok_or("could not get addr of server")?; let addr = server.get_addr().ok_or("could not get addr of server")?;
let port = server.get_port().ok_or("could not get port of server")?; let port = server.get_port().ok_or("could not get port of server")?;
if self.map.lock().await.get(&addr).is_none() { let full_addr = format!("{addr}:{port}");
let span = tracing::span!(parent: None,tracing::Level::INFO, "server_watcher", addr);
tokio::spawn( if let Some(handle) = self.map.lock().await.get(&full_addr) {
if !handle.is_finished() {
return Ok(());
}
}
let span = tracing::span!(parent: None,tracing::Level::INFO, "server_watcher", addr, port);
let full_addr_clone = full_addr.clone();
let api = self.clone();
let handle = tokio::spawn(
async move { async move {
tracing::info!("starting watch"); tracing::info!("starting watcher");
loop {
tokio::time::sleep(frequency).await; tokio::time::sleep(frequency).await;
let server = self.query_server(&addr, &port).await.unwrap(); let server = api.query_server(&addr, &port).await.unwrap();
let status_json = match server.query_description().await { let status_json = match server.query_description().await {
Ok(x) => x, Ok(x) => x,
Err(e) => { Err(e) => {
@ -166,7 +176,9 @@ impl MinecraftAPI<Server> for McApi {
// Otherwise I would need to have it be defined as: // Otherwise I would need to have it be defined as:
// trait StatusTrait: Send { ... } // trait StatusTrait: Send { ... }
drop(status_json); drop(status_json);
let mut guard = api.map.lock().await;
guard.remove(&full_addr_clone);
drop(guard);
if let Err(err) = server.stop().await { if let Err(err) = server.stop().await {
tracing::error!( tracing::error!(
trace = err.get_span_trace(), trace = err.get_span_trace(),
@ -177,9 +189,13 @@ impl MinecraftAPI<Server> for McApi {
return; return;
} }
} }
}
.instrument(span), .instrument(span),
); );
} let mut guard = self.map.lock().await;
guard.insert(full_addr.clone(), handle);
drop(guard);
Ok(()) Ok(())
} }
} }