Compare commits
No commits in common. "750e8dcbf01b40bbbf92452bb117e697116fdd5d" and "bb60a21050d8ecc162e03575332f42a7f3a7165b" have entirely different histories.
750e8dcbf0
...
bb60a21050
5 changed files with 105 additions and 117 deletions
12
Cargo.lock
generated
12
Cargo.lock
generated
|
|
@ -76,17 +76,6 @@ dependencies = [
|
||||||
"syn",
|
"syn",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "async-trait"
|
|
||||||
version = "0.1.89"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "atomic-waker"
|
name = "atomic-waker"
|
||||||
version = "1.1.2"
|
version = "1.1.2"
|
||||||
|
|
@ -965,7 +954,6 @@ name = "mc-ingress"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"async-trait",
|
|
||||||
"clap",
|
"clap",
|
||||||
"either",
|
"either",
|
||||||
"evalexpr",
|
"evalexpr",
|
||||||
|
|
|
||||||
|
|
@ -41,4 +41,3 @@ nix = { version= "0.30.1", features = [ "zerocopy"] }
|
||||||
tokio-splice2 = "0.3.2"
|
tokio-splice2 = "0.3.2"
|
||||||
strip-ansi-escapes = "0.2.1"
|
strip-ansi-escapes = "0.2.1"
|
||||||
evalexpr = { version = "13.1.0", features = ["regex"] }
|
evalexpr = { version = "13.1.0", features = ["regex"] }
|
||||||
async-trait = "0.1.89"
|
|
||||||
|
|
|
||||||
|
|
@ -18,8 +18,6 @@ use crate::{
|
||||||
packets::{clientbound::status::StatusTrait, SendPacket},
|
packets::{clientbound::status::StatusTrait, SendPacket},
|
||||||
OpaqueError,
|
OpaqueError,
|
||||||
};
|
};
|
||||||
const MAIN_LABEL: &str = "tami.moe/minecraft";
|
|
||||||
const PORT_LABEL: &str = "tami.moe/minecraft-port";
|
|
||||||
|
|
||||||
/// This is the layer who is respinsible for caching requests.
|
/// This is the layer who is respinsible for caching requests.
|
||||||
///
|
///
|
||||||
|
|
@ -59,12 +57,12 @@ impl KubeCache {
|
||||||
}
|
}
|
||||||
async fn get_deploys(&self) -> ObjectList<Deployment> {
|
async fn get_deploys(&self) -> ObjectList<Deployment> {
|
||||||
// let lp: ListParams = ListParams::default();
|
// let lp: ListParams = ListParams::default();
|
||||||
let lp: ListParams = ListParams::default().labels(MAIN_LABEL);
|
let lp: ListParams = ListParams::default().labels("tami.moe/minecraft");
|
||||||
self.deployments.list(&lp).await.unwrap()
|
self.deployments.list(&lp).await.unwrap()
|
||||||
}
|
}
|
||||||
async fn get_srvs(&self) -> ObjectList<Service> {
|
async fn get_srvs(&self) -> ObjectList<Service> {
|
||||||
// let lp: ListParams = ListParams::default();
|
// let lp: ListParams = ListParams::default();
|
||||||
let lp: ListParams = ListParams::default().labels(MAIN_LABEL);
|
let lp: ListParams = ListParams::default().labels("tami.moe/minecraft");
|
||||||
self.services.list(&lp).await.unwrap()
|
self.services.list(&lp).await.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -166,6 +164,7 @@ impl MinecraftAPI<Server> for McApi {
|
||||||
a
|
a
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
tracing::info!(inter_addr = inter_addr);
|
||||||
return Ok(Server {
|
return Ok(Server {
|
||||||
dep: deployment,
|
dep: deployment,
|
||||||
srv: service,
|
srv: service,
|
||||||
|
|
@ -176,8 +175,77 @@ impl MinecraftAPI<Server> for McApi {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_map(&self) -> Arc<tokio::sync::Mutex<HashMap<String, tokio::task::JoinHandle<()>>>> {
|
async fn start_watch(
|
||||||
self.map.clone()
|
self,
|
||||||
|
server: impl MinecraftServerHandle,
|
||||||
|
frequency: Duration,
|
||||||
|
) -> Result<(), OpaqueError> {
|
||||||
|
let addr = server.get_addr().ok_or("could not get addr of server")?;
|
||||||
|
let port = server.get_port().ok_or("could not get port of server")?;
|
||||||
|
let full_addr = format!("{addr}:{port}");
|
||||||
|
|
||||||
|
if let Some(handle) = self.map.lock().await.get(&full_addr) {
|
||||||
|
if !handle.is_finished() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let span = tracing::span!(parent: None,tracing::Level::INFO, "server_watcher", addr, port);
|
||||||
|
|
||||||
|
let full_addr_clone = full_addr.clone();
|
||||||
|
let api = self.clone();
|
||||||
|
let handle = tokio::spawn(
|
||||||
|
async move {
|
||||||
|
tracing::info!("starting watcher");
|
||||||
|
loop {
|
||||||
|
tokio::time::sleep(frequency).await;
|
||||||
|
let server = match api.query_server(&addr, &port).await {
|
||||||
|
Ok(x) => x,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(
|
||||||
|
err = format!("{}", e.context),
|
||||||
|
"could not query server"
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let status_json = match server.query_description().await {
|
||||||
|
Ok(x) => x,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(
|
||||||
|
err = format!("{}", e.context),
|
||||||
|
"could not query description"
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if status_json.get_players_online() == 0 {
|
||||||
|
// With this I don't need to specify that StatusTrait
|
||||||
|
// should be send as well.
|
||||||
|
|
||||||
|
// Otherwise I would need to have it be defined as:
|
||||||
|
// trait StatusTrait: Send { ... }
|
||||||
|
drop(status_json);
|
||||||
|
let mut guard = api.map.lock().await;
|
||||||
|
guard.remove(&full_addr_clone);
|
||||||
|
drop(guard);
|
||||||
|
if let Err(err) = server.stop().await {
|
||||||
|
tracing::error!(
|
||||||
|
trace = %err.print_span_trace(),
|
||||||
|
err = err.context,
|
||||||
|
msg = "failed to stop server"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
.instrument(span),
|
||||||
|
);
|
||||||
|
let mut guard = self.map.lock().await;
|
||||||
|
guard.insert(full_addr.clone(), handle);
|
||||||
|
drop(guard);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -220,10 +288,10 @@ impl fmt::Debug for Server {
|
||||||
.name
|
.name
|
||||||
.unwrap_or("#error#".to_string()),
|
.unwrap_or("#error#".to_string()),
|
||||||
)
|
)
|
||||||
|
.field("server_addr", &self.server_addr)
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl MinecraftServerHandle for Server {
|
impl MinecraftServerHandle for Server {
|
||||||
async fn start(&self) -> Result<(), OpaqueError> {
|
async fn start(&self) -> Result<(), OpaqueError> {
|
||||||
self.set_scale(1).await.map_err(|e| {
|
self.set_scale(1).await.map_err(|e| {
|
||||||
|
|
@ -287,8 +355,8 @@ impl MinecraftServerHandle for Server {
|
||||||
self.inter_addr.as_str()
|
self.inter_addr.as_str()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_addr(&self) -> String {
|
fn get_addr(&self) -> Option<String> {
|
||||||
self.server_addr.clone()
|
Some(self.server_addr.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn query_description(&self) -> Result<Box<dyn StatusTrait>, OpaqueError> {
|
async fn query_description(&self) -> Result<Box<dyn StatusTrait>, OpaqueError> {
|
||||||
|
|
@ -297,7 +365,9 @@ impl MinecraftServerHandle for Server {
|
||||||
ServerDeploymentStatus::Connectable(mut tcp_stream) => {
|
ServerDeploymentStatus::Connectable(mut tcp_stream) => {
|
||||||
let handshake = crate::packets::serverbound::handshake::Handshake::create(
|
let handshake = crate::packets::serverbound::handshake::Handshake::create(
|
||||||
crate::types::VarInt::from(746).ok_or("could not create VarInt WTF?")?,
|
crate::types::VarInt::from(746).ok_or("could not create VarInt WTF?")?,
|
||||||
crate::types::VarString::from(self.get_addr()),
|
crate::types::VarString::from(
|
||||||
|
self.get_addr().ok_or("failed to get addr of server")?,
|
||||||
|
),
|
||||||
crate::types::UShort::from(1234),
|
crate::types::UShort::from(1234),
|
||||||
crate::types::VarInt::from(1).ok_or("could not create VarInt WTF?")?,
|
crate::types::VarInt::from(1).ok_or("could not create VarInt WTF?")?,
|
||||||
)
|
)
|
||||||
|
|
@ -331,8 +401,8 @@ impl MinecraftServerHandle for Server {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_port(&self) -> String {
|
fn get_port(&self) -> Option<String> {
|
||||||
self.server_port.clone()
|
Some(self.server_port.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_motd(&self) -> Option<String> {
|
fn get_motd(&self) -> Option<String> {
|
||||||
|
|
@ -362,13 +432,15 @@ impl MinecraftServerHandle for Server {
|
||||||
impl Server {
|
impl Server {
|
||||||
async fn set_scale(&self, num: i32) -> Result<(), kube::Error> {
|
async fn set_scale(&self, num: i32) -> Result<(), kube::Error> {
|
||||||
let name = self
|
let name = self
|
||||||
.dep
|
.srv
|
||||||
.metadata
|
.metadata
|
||||||
.clone()
|
.clone()
|
||||||
.name
|
.name
|
||||||
.unwrap_or("#error#".to_string());
|
.unwrap_or("#error#".to_string());
|
||||||
let _res = self.cache.set_dep_scale(&name, num).await?;
|
let res = self.cache.set_dep_scale(&name, num).await;
|
||||||
|
if res.is_ok() {
|
||||||
tracing::info!("scaled replicas of {} to {num}", self.server_addr);
|
tracing::info!("scaled replicas of {} to {num}", self.server_addr);
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -381,8 +453,8 @@ where
|
||||||
res.labels()
|
res.labels()
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|(key, value)| match key.as_str() {
|
.filter(|(key, value)| match key.as_str() {
|
||||||
MAIN_LABEL => value.as_str() == addr,
|
"tami.moe/minecraft" => value.as_str() == addr,
|
||||||
PORT_LABEL => {
|
"tami.moe/minecraft-port" => {
|
||||||
found_port = true;
|
found_port = true;
|
||||||
value.as_str() == port
|
value.as_str() == port
|
||||||
}
|
}
|
||||||
|
|
|
||||||
23
src/main.rs
23
src/main.rs
|
|
@ -100,12 +100,9 @@ async fn main() {
|
||||||
async fn process_connection<T: MinecraftServerHandle>(
|
async fn process_connection<T: MinecraftServerHandle>(
|
||||||
mut client_stream: TcpStream,
|
mut client_stream: TcpStream,
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
api: impl MinecraftAPI<T> + Send + Sync + 'static + Clone,
|
api: impl MinecraftAPI<T>,
|
||||||
config: Config,
|
config: Config,
|
||||||
) -> Result<(), OpaqueError>
|
) -> Result<(), OpaqueError> {
|
||||||
where
|
|
||||||
T: Send + Sync + 'static,
|
|
||||||
{
|
|
||||||
// this is wrapper so that async doesnt mess up the span, and
|
// this is wrapper so that async doesnt mess up the span, and
|
||||||
// to make sure this doesn't propagate to later `handle_*`
|
// to make sure this doesn't propagate to later `handle_*`
|
||||||
#[tracing::instrument(level = "info", skip(client_stream, config))]
|
#[tracing::instrument(level = "info", skip(client_stream, config))]
|
||||||
|
|
@ -188,10 +185,7 @@ async fn handle_status<T: MinecraftServerHandle>(
|
||||||
client_stream: &mut TcpStream,
|
client_stream: &mut TcpStream,
|
||||||
handshake: &Handshake,
|
handshake: &Handshake,
|
||||||
api: impl MinecraftAPI<T>,
|
api: impl MinecraftAPI<T>,
|
||||||
) -> Result<(), OpaqueError>
|
) -> Result<(), OpaqueError> {
|
||||||
where
|
|
||||||
T: Send + Sync + 'static,
|
|
||||||
{
|
|
||||||
let client_packet = Packet::parse(client_stream).await?;
|
let client_packet = Packet::parse(client_stream).await?;
|
||||||
if client_packet.id.get_int() != 0 {
|
if client_packet.id.get_int() != 0 {
|
||||||
return Err(OpaqueError::create(&format!(
|
return Err(OpaqueError::create(&format!(
|
||||||
|
|
@ -282,11 +276,8 @@ async fn handle_login<T: MinecraftServerHandle>(
|
||||||
client_stream: &mut TcpStream,
|
client_stream: &mut TcpStream,
|
||||||
handshake: &Handshake,
|
handshake: &Handshake,
|
||||||
login_start: LoginStart,
|
login_start: LoginStart,
|
||||||
api: impl MinecraftAPI<T> + Send + Sync + 'static + Clone,
|
api: impl MinecraftAPI<T>,
|
||||||
) -> Result<(), OpaqueError>
|
) -> Result<(), OpaqueError> {
|
||||||
where
|
|
||||||
T: Send + Sync + 'static,
|
|
||||||
{
|
|
||||||
let server = api
|
let server = api
|
||||||
.query_server(
|
.query_server(
|
||||||
&handshake.get_server_address(),
|
&handshake.get_server_address(),
|
||||||
|
|
@ -299,7 +290,7 @@ where
|
||||||
tracing::debug!(msg = "server status", status = ?status);
|
tracing::debug!(msg = "server status", status = ?status);
|
||||||
match status {
|
match status {
|
||||||
ServerDeploymentStatus::Connectable(mut server_stream) => {
|
ServerDeploymentStatus::Connectable(mut server_stream) => {
|
||||||
api.start_watch(server.clone(), Duration::from_secs(60))
|
api.start_watch(server.clone(), Duration::from_secs(600))
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// referenced from:
|
// referenced from:
|
||||||
|
|
@ -350,7 +341,7 @@ where
|
||||||
}
|
}
|
||||||
ServerDeploymentStatus::Offline => {
|
ServerDeploymentStatus::Offline => {
|
||||||
server.start().await?;
|
server.start().await?;
|
||||||
api.start_watch(server.clone(), Duration::from_secs(60))
|
api.start_watch(server.clone(), Duration::from_secs(600))
|
||||||
.await?;
|
.await?;
|
||||||
mc_server::send_disconnect(client_stream, format!("[\"\",{{\"text\":\"Okayy, §2starting§r the server!\n\n\"}},{{\"text\":\"{BYE_MESSAGE}\"}}]").as_str()).await?;
|
mc_server::send_disconnect(client_stream, format!("[\"\",{{\"text\":\"Okayy, §2starting§r the server!\n\n\"}},{{\"text\":\"{BYE_MESSAGE}\"}}]").as_str()).await?;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,4 @@
|
||||||
use std::{collections::HashMap, sync::Arc};
|
|
||||||
|
|
||||||
use tokio::{io::AsyncWriteExt, net::TcpStream};
|
use tokio::{io::AsyncWriteExt, net::TcpStream};
|
||||||
use tracing::Instrument;
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
packets::{
|
packets::{
|
||||||
|
|
@ -63,14 +60,13 @@ pub async fn send_disconnect(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
pub trait MinecraftServerHandle: Clone {
|
pub trait MinecraftServerHandle: Clone {
|
||||||
async fn start(&self) -> Result<(), OpaqueError>;
|
async fn start(&self) -> Result<(), OpaqueError>;
|
||||||
async fn stop(&self) -> Result<(), OpaqueError>;
|
async fn stop(&self) -> Result<(), OpaqueError>;
|
||||||
async fn query_status(&self) -> Result<ServerDeploymentStatus, OpaqueError>;
|
async fn query_status(&self) -> Result<ServerDeploymentStatus, OpaqueError>;
|
||||||
fn get_internal_address(&self) -> &str;
|
fn get_internal_address(&self) -> &str;
|
||||||
fn get_addr(&self) -> String;
|
fn get_addr(&self) -> Option<String>;
|
||||||
fn get_port(&self) -> String;
|
fn get_port(&self) -> Option<String>;
|
||||||
fn get_motd(&self) -> Option<String>;
|
fn get_motd(&self) -> Option<String>;
|
||||||
|
|
||||||
async fn query_server_connectable(&self) -> Result<TcpStream, OpaqueError> {
|
async fn query_server_connectable(&self) -> Result<TcpStream, OpaqueError> {
|
||||||
|
|
@ -115,82 +111,24 @@ pub trait MinecraftServerHandle: Clone {
|
||||||
tracing::trace!("data exchanged while proxying status: {:?}", data_amount);
|
tracing::trace!("data exchanged while proxying status: {:?}", data_amount);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
async fn query_description(&self) -> Result<Box<dyn StatusTrait>, OpaqueError>;
|
// TODO: move the implementation to here, but
|
||||||
|
// the async things are *strange* in rust
|
||||||
|
fn query_description(
|
||||||
|
&self,
|
||||||
|
) -> impl std::future::Future<Output = Result<Box<dyn StatusTrait>, OpaqueError>> + Send;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait MinecraftAPI<T> {
|
pub trait MinecraftAPI<T> {
|
||||||
async fn query_server(&self, addr: &str, port: &str) -> Result<T, OpaqueError>;
|
async fn query_server(&self, addr: &str, port: &str) -> Result<T, OpaqueError>;
|
||||||
fn get_map(&self) -> Arc<tokio::sync::Mutex<HashMap<String, tokio::task::JoinHandle<()>>>>;
|
|
||||||
|
|
||||||
// TODO: move the implementation to here, but
|
// TODO: move the implementation to here, but
|
||||||
/// This should be callable even if there is already a watcher,
|
/// This should be callable even if there is already a watcher,
|
||||||
/// and it should handle the collision itself while returning OK().
|
/// and it should handle the collision itself while returning OK().
|
||||||
async fn start_watch(
|
async fn start_watch(
|
||||||
self,
|
self,
|
||||||
server: impl MinecraftServerHandle + Send + Sync + 'static,
|
server: impl MinecraftServerHandle,
|
||||||
frequency: std::time::Duration,
|
frequency: std::time::Duration,
|
||||||
) -> Result<(), OpaqueError>
|
) -> Result<(), OpaqueError>;
|
||||||
where
|
|
||||||
Self: Send + Sync + 'static + Clone,
|
|
||||||
{
|
|
||||||
let inter_addr = server.get_internal_address().to_string();
|
|
||||||
|
|
||||||
if let Some(handle) = self.get_map().lock().await.get(&inter_addr) {
|
|
||||||
if !handle.is_finished() {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let span = tracing::span!(parent: None,tracing::Level::INFO, "server_watcher", inter_addr, join_addr = server.get_addr(), join_port = server.get_port());
|
|
||||||
|
|
||||||
let full_addr_clone = inter_addr.clone();
|
|
||||||
let api = self.clone();
|
|
||||||
let handle = tokio::spawn(
|
|
||||||
async move {
|
|
||||||
tracing::info!("starting watcher");
|
|
||||||
loop {
|
|
||||||
tokio::time::sleep(frequency).await;
|
|
||||||
let status_json = match server.query_description().await {
|
|
||||||
Ok(x) => x,
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!(
|
|
||||||
err = format!("{}", e.context),
|
|
||||||
"could not query description"
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if status_json.get_players_online() == 0 {
|
|
||||||
// With this I don't need to specify that StatusTrait
|
|
||||||
// should be send as well.
|
|
||||||
|
|
||||||
// Otherwise I would need to have it be defined as:
|
|
||||||
// trait StatusTrait: Send { ... }
|
|
||||||
drop(status_json);
|
|
||||||
let map = api.get_map();
|
|
||||||
let mut guard = map.lock().await;
|
|
||||||
guard.remove(&full_addr_clone);
|
|
||||||
drop(guard);
|
|
||||||
drop(map);
|
|
||||||
if let Err(err) = server.stop().await {
|
|
||||||
tracing::error!(
|
|
||||||
trace = %err.print_span_trace(),
|
|
||||||
err = err.context,
|
|
||||||
msg = "failed to stop server"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
.instrument(span),
|
|
||||||
);
|
|
||||||
let map = self.get_map();
|
|
||||||
let mut guard = map.lock().await;
|
|
||||||
guard.insert(inter_addr.clone(), handle);
|
|
||||||
drop(guard);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum ServerDeploymentStatus {
|
pub enum ServerDeploymentStatus {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue