diff --git a/kube/rbac.yaml b/kube/rbac.yaml deleted file mode 100644 index 954af26..0000000 --- a/kube/rbac.yaml +++ /dev/null @@ -1,29 +0,0 @@ -apiVersion: v1 -kind: ServiceAccount -metadata: - name: minecraft-ingress - namespace: default ---- -apiVersion: rbac.authorization.k8s.io/v1 -kind: Role -metadata: - namespace: default - name: minecraft-ingress -rules: -- apiGroups: ["apps", ""] # "" indicates the core API group - resources: ["pods","deployments","services"] - verbs: ["get", "list", "patch", "watch"] ---- -apiVersion: rbac.authorization.k8s.io/v1 -kind: RoleBinding -metadata: - name: minecraft-ingress - namespace: default -subjects: -- kind: ServiceAccount - name: minecraft-ingress - namespace: default -roleRef: - kind: Role - name: minecraft-ingress - apiGroup: rbac.authorization.k8s.io diff --git a/kube/deployment.yaml b/kube/roles.yaml similarity index 55% rename from kube/deployment.yaml rename to kube/roles.yaml index e19de76..216972c 100644 --- a/kube/deployment.yaml +++ b/kube/roles.yaml @@ -1,3 +1,33 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: minecraft-ingress + namespace: default +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + namespace: default + name: minecraft-ingress +rules: +- apiGroups: ["apps", ""] # "" indicates the core API group + resources: ["pods","deployments","services"] + verbs: ["get", "list", "patch", "watch"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: minecraft-ingress + namespace: default +subjects: +- kind: ServiceAccount + name: minecraft-ingress + namespace: default +roleRef: + kind: Role + name: minecraft-ingress + apiGroup: rbac.authorization.k8s.io +--- apiVersion: apps/v1 kind: Deployment metadata: @@ -18,7 +48,7 @@ spec: terminationGracePeriodSeconds: 5 containers: - name: minecraft-ingress - image: git.tami.moe/tamipes/minecraft-ingress:testing + image: git.tami.moe/tamipes/minecraft-ingress:latest env: - name: FILTER_CONN value: '(addr == "87.229.85.222") || (addr == "") || (addr == "ogmur.xyz") || (addr == "@mat:matdoes.dev (hi honeypots) ") || (addr == "@mat:matdoes.dev ") || (addr == "slowstack.tv")' diff --git a/kube/test-deployment.yaml b/kube/test-deployment.yaml deleted file mode 100644 index 0d5089a..0000000 --- a/kube/test-deployment.yaml +++ /dev/null @@ -1,25 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - name: minecraft-ingress - labels: - app: minecraft-ingress -spec: - replicas: 1 - selector: - matchLabels: - app: minecraft-ingress - template: - metadata: - labels: - app: minecraft-ingress - spec: - serviceAccountName: minecraft-ingress - terminationGracePeriodSeconds: 5 - containers: - - name: minecraft-ingress - image: git.tami.moe/tamipes/minecraft-ingress:testing - env: - - name: FILTER_CONN - value: '(addr == "87.229.85.222") || (addr == "") || (addr == "ogmur.xyz") || (addr == "@mat:matdoes.dev (hi honeypots) ") || (addr == "@mat:matdoes.dev ") || (addr == "slowstack.tv")' - diff --git a/src/kube_cache.rs b/src/kube_cache.rs index 9a762ea..27ae892 100644 --- a/src/kube_cache.rs +++ b/src/kube_cache.rs @@ -1,17 +1,21 @@ use std::{collections::HashMap, fmt, sync::Arc, time::Duration}; -use futures::StreamExt; -use k8s_openapi::api::{apps::v1::Deployment, core::v1::Service}; +use k8s_openapi::{ + api::{apps::v1::Deployment, core::v1::Service}, + apimachinery::pkg::util::intstr::IntOrString, +}; use kube::{ api::{ListParams, ObjectList, Patch, PatchParams}, - runtime::{reflector::Lookup, WatchStreamExt}, + runtime::reflector::Lookup, Api, Client, ResourceExt, }; use serde_json::json; use tokio::task::JoinHandle; +use tracing::Instrument; use crate::{ mc_server::{sanitize_addr, MinecraftAPI, MinecraftServerHandle, ServerDeploymentStatus}, + packets::{clientbound::status::StatusTrait, SendPacket}, OpaqueError, }; const MAIN_LABEL: &str = "tami.moe/minecraft"; @@ -24,94 +28,69 @@ const PORT_LABEL: &str = "tami.moe/minecraft-port"; /// the underlying async data access. #[derive(Debug, Clone)] pub struct KubeCache { - // When the store gets copied, it points to the same backing store - // In other words, this satisfies my restriction for cloning and - // there is need no to wrap it in Arc<> - dep_cache: kube::runtime::reflector::Store, - dep_api: Api, - srv_api: Api, + deployments: Api, + services: Api, in_cluster: bool, } impl KubeCache { /// This initializes the creation of a "kubernetes client" /// and if it is not possible returns a None. - /// - /// It also returns a JoinHandle which is tied to the api watcher - /// and if it returns that means that the kubecache is not responsive now. - pub async fn create() -> Option<(KubeCache, JoinHandle<()>)> { + pub async fn create() -> Option { let in_cluster = match std::env::var("KUBERNETES_SERVICE_HOST") { - Ok(_) => true, - Err(_) => false, + Ok(x) => true, + Err(e) => false, }; let client = Client::try_default().await.unwrap(); - let dep_api: Api = Api::default_namespaced(client.clone()); - let lp = kube::runtime::watcher::Config::default().labels(MAIN_LABEL); - let (dep_reader, dep_writer) = kube::runtime::reflector::store(); - let dep_watcher_rf = - kube::runtime::reflector(dep_writer, kube::runtime::watcher(dep_api.clone(), lp)); + let deployments: Api = Api::default_namespaced(client.clone()); + let services: Api = Api::default_namespaced(client); - let handle = tokio::spawn(async move { - // Source: https://docs.rs/kube/latest/kube/runtime/fn.reflector.html - let infinite_watch = dep_watcher_rf - .applied_objects() - .for_each(|_o| std::future::ready(())); - let _res = infinite_watch.await; - tracing::error!( - "Deployments watcher ended. This should not happen. (Program should exit now)" - ); + return Some(KubeCache { + deployments, + services, + in_cluster, }); - - let srv_api: Api = Api::default_namespaced(client); - - return Some(( - KubeCache { - dep_cache: dep_reader, - dep_api: dep_api, - srv_api, - in_cluster, - }, - handle, - )); } - fn get_dep(&self, name: &str) -> Option> { - self.dep_cache.find(|x| x.name_any() == name) + async fn get_dep(&self, name: &str) -> Result { + self.deployments.get(name).await } async fn get_srv(&self, name: &str) -> Result { - self.srv_api.get(name).await + self.services.get(name).await } - async fn list_deploys(&self) -> Vec> { - self.dep_cache.state() - } - async fn list_srvs(&self) -> ObjectList { + async fn get_deploys(&self) -> ObjectList { // let lp: ListParams = ListParams::default(); let lp: ListParams = ListParams::default().labels(MAIN_LABEL); - self.srv_api.list(&lp).await.unwrap() + self.deployments.list(&lp).await.unwrap() + } + async fn get_srvs(&self) -> ObjectList { + // let lp: ListParams = ListParams::default(); + let lp: ListParams = ListParams::default().labels(MAIN_LABEL); + self.services.list(&lp).await.unwrap() } - pub async fn query_dep(&self, addr: &str, port: &str) -> Option> { - let deploys = self.list_deploys().await; - deploys - .into_iter() - .find(|x| filter_label_value(x.as_ref(), addr, port)) + pub async fn query_dep_addr(&self, addr: &str, port: &str) -> Option { + let deploys = self.get_deploys().await; + let result = deploys.iter().find(|x| filter_label_value(x, addr, port))?; + Some(result.name()?.to_string()) } pub async fn query_srv(&self, addr: &str, port: &str) -> Option { - let deploys = self.list_srvs().await; - deploys.into_iter().find(|x| { + let deploys = self.get_srvs().await; + let result = deploys.into_iter().find(|x| { let in_cluster = match x.spec.as_ref().unwrap().type_.as_ref() { Some(t) => t == "ClusterIP", None => false, }; let incorrect_type = in_cluster ^ self.in_cluster; - !incorrect_type && filter_label_value(x, addr, port) - }) + !incorrect_type && filter_label_value(&x, addr, port) + })?; + Some(result) } async fn set_dep_scale(&self, name: &str, num: i32) -> Result { let patch = Patch::Merge(json!({"spec":{"replicas": num}})); let pp = PatchParams::default(); - self.dep_api.patch(name, &pp, &patch).await + self.deployments.patch(name, &pp, &patch).await } } @@ -130,7 +109,7 @@ impl MinecraftAPI for McApi { async fn query_server(&self, addr: &str, port: &str) -> Result { let addr = sanitize_addr(&addr); - let deployment = match self.cache.query_dep(&addr, &port).await { + let dep_name = match self.cache.query_dep_addr(&addr, &port).await { Some(x) => x, None => { return Err(OpaqueError::create_with_kind( @@ -149,50 +128,51 @@ impl MinecraftAPI for McApi { } }; - let internal_id = match deployment.metadata.name.clone() { - Some(x) => x, - None => { - return Err(OpaqueError::create_with_kind( - "Could not query deployment metadata for name", - "InternalIdQueryFailed", - )); - } - }; - + let deployment = self.cache.get_dep(&dep_name).await.map_err(|x| { + format!( + "Failed to query cache for deployment with dep_name err:{}", + x.to_string() + ) + })?; tracing::debug!("found kubernetes deployment & service"); let service_port_spec = service.clone().spec.unwrap().ports.unwrap(); - let port_srv = service_port_spec + let port = service_port_spec .iter() .find(|x| x.name.clone().unwrap() == "mc-router") .ok_or(OpaqueError::create( "Could not find \"mc-router\" nodePort for server", ))?; + let port_string; let inter_addr = match self.cache.in_cluster { false => { - let node_port = port_srv + let node_port = port .node_port .map(|x| x.to_string()) .ok_or(OpaqueError::create("Could not map nodePort to port string"))?; + port_string = node_port.clone(); format!("localhost:{}", node_port) } true => { - let target_port = port_srv.port; - format!( + let target_port = port.port; + let a = format!( "{}.default.svc.cluster.local:{}", service .name() .ok_or("Could not get name of ClusterIP service")?, target_port - ) + ); + port_string = a.clone(); + a } }; return Ok(Server { + dep: deployment, + srv: service, server_addr: addr.to_string(), - server_port: port.to_string(), - internal_address: inter_addr, - internal_id, + server_port: port_string, cache: self.cache.clone(), + inter_addr: inter_addr, }); } @@ -202,33 +182,43 @@ impl MinecraftAPI for McApi { } impl McApi { - pub async fn create() -> Option<(Self, JoinHandle<()>)> { - let (kube_cache, kube_task) = KubeCache::create().await?; - Some(( - Self { - cache: kube_cache, - map: Arc::new(tokio::sync::Mutex::new(HashMap::new())), - }, - kube_task, - )) + pub async fn create() -> Option { + Some(Self { + cache: KubeCache::create().await?, + map: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + }) } } #[derive(Clone)] pub struct Server { + dep: Deployment, + srv: Service, server_addr: String, server_port: String, - internal_address: String, - internal_id: String, cache: KubeCache, + inter_addr: String, } impl fmt::Debug for Server { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("KubeServer") - .field("internal_id", &self.internal_id) .field( - "join_addr", - &format!("{}:{}", self.server_addr, self.server_port), + "dep", + &self + .dep + .metadata + .clone() + .name + .unwrap_or("#error#".to_string()), + ) + .field( + "srv", + &self + .srv + .metadata + .clone() + .name + .unwrap_or("#error#".to_string()), ) .finish() } @@ -249,12 +239,7 @@ impl MinecraftServerHandle for Server { #[tracing::instrument(level = "info")] async fn query_status(&self) -> Result { - let dep = self - .cache - .get_dep(&self.internal_id) - .ok_or_else(|| OpaqueError::create("Failed to get deployment from cache"))?; - - let status = match &dep.status { + let mut status = match self.dep.clone().status { Some(x) => x, None => { return Err(OpaqueError::create( @@ -264,21 +249,21 @@ impl MinecraftServerHandle for Server { }; let total_replicas = status .replicas - .unwrap_or_else(|| { + .get_or_insert_with(|| { tracing::trace!("total_replicas failed to get"); -1 }) .clone(); let available_replicas = status .available_replicas - .unwrap_or_else(|| { + .get_or_insert_with(|| { tracing::trace!("available_replicas failed to get"); -1 }) .clone(); let ready_replicas = status .ready_replicas - .unwrap_or_else(|| { + .get_or_insert_with(|| { tracing::trace!("ready_replicas failed to get"); -1 }) @@ -299,21 +284,60 @@ impl MinecraftServerHandle for Server { } fn get_internal_address(&self) -> &str { - self.internal_address.as_str() + self.inter_addr.as_str() } fn get_addr(&self) -> String { self.server_addr.clone() } + async fn query_description(&self) -> Result, OpaqueError> { + let status = self.query_status().await?; + match status { + ServerDeploymentStatus::Connectable(mut tcp_stream) => { + let handshake = crate::packets::serverbound::handshake::Handshake::create( + crate::types::VarInt::from(746).ok_or("could not create VarInt WTF?")?, + crate::types::VarString::from(self.get_addr()), + crate::types::UShort::from(1234), + crate::types::VarInt::from(1).ok_or("could not create VarInt WTF?")?, + ) + .ok_or("failed to create handshake packet from scratch... WTF?")?; + handshake + .send_packet(&mut tcp_stream) + .await + .map_err(|_e| "failed to send handshake packet to server")?; + let status_rq = crate::packets::Packet::from_bytes(0, Vec::new()) + .ok_or("Failed to create status request packet from scratch")?; + status_rq + .send_packet(&mut tcp_stream) + .await + .map_err(|_e| "failed to send status request packet to server")?; + let return_packet = crate::packets::Packet::parse(&mut tcp_stream).await?; + let status_response = + crate::packets::clientbound::status::StatusResponse::parse(return_packet) + .await + .unwrap(); + + return status_response.get_json().ok_or(OpaqueError::create( + "failed to parse status response from server", + )); + } + _ => { + return Err(OpaqueError::create(&format!( + "server is not running; status={:?}", + status + ))) + } + } + } + fn get_port(&self) -> String { self.server_port.clone() } fn get_motd(&self) -> Option { - let dep = self.cache.get_dep(&self.internal_id)?; - - let all_container_motds = dep + let all_container_motds = self + .dep .spec .clone()? .template @@ -337,17 +361,19 @@ impl MinecraftServerHandle for Server { impl Server { async fn set_scale(&self, num: i32) -> Result<(), kube::Error> { - let _res = self.cache.set_dep_scale(&self.internal_id, num).await?; - tracing::info!( - "scaled replicas of {}:{} to {num}", - self.server_addr, - self.server_port - ); + let name = self + .dep + .metadata + .clone() + .name + .unwrap_or("#error#".to_string()); + let _res = self.cache.set_dep_scale(&name, num).await?; + tracing::info!("scaled replicas of {} to {num}", self.server_addr); Ok(()) } } -fn filter_label_value(res: &R, addr: &str, port: &str) -> bool +fn filter_label_value(res: &&R, addr: &str, port: &str) -> bool where R: ResourceExt, { diff --git a/src/main.rs b/src/main.rs index 87bcb9e..89c421e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -21,7 +21,6 @@ mod packets; mod types; static BYE_MESSAGE: &str = concat!("§dTami§r with §d<3§r §8(rev: ", env!("COMMIT_HASH"), ")§r"); -static OFFLINE_TIMER: std::time::Duration = Duration::from_secs(600); #[tokio::main] async fn main() { @@ -40,7 +39,7 @@ async fn main() { let revision: &'static str = env!("COMMIT_HASH"); tracing::info!(revision); - let (api, api_task) = kube_cache::McApi::create().await.unwrap(); + let api = kube_cache::McApi::create().await.unwrap(); tracing::info!("initialized kube api"); let config: Config = Default::default(); @@ -48,64 +47,53 @@ async fn main() { let listener = TcpListener::bind(config.bind_addr.clone()).await.unwrap(); tracing::info!(bind_addr = config.bind_addr, "started tcp server"); - let conn_task = tokio::spawn(async move { - loop { - let (socket, addr) = listener.accept().await.unwrap(); - let api = api.clone(); + loop { + let (socket, addr) = listener.accept().await.unwrap(); + let api = api.clone(); - let config = config.clone(); - tokio::spawn(async move { + let config = config.clone(); + tokio::spawn(async move { + tracing::debug!( + addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + "Client connected" + ); + if let Err(e) = process_connection(socket, addr, api, config).await { + match e.level { + tracing::Level::ERROR => tracing::error!( + // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + trace = %e.print_span_trace(), + err = format!("{}", e.context), + "Client disconnected" + ), + tracing::Level::WARN => tracing::warn!( + // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + trace = %e.print_span_trace(), + err = format!("{}", e.context), + "Client disconnected" + ), + tracing::Level::INFO => tracing::info!( + // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + trace = %e.print_span_trace(), + err = format!("{}", e.context), + "Client disconnected" + ), + _ => { + tracing::error!( + // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), + trace = %e.print_span_trace(), + err = format!("{}", e.context), + actual_level = ?e.level, + "Client disconnected (bad level)" + ) + } + } + } else { tracing::debug!( addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - "Client connected" + "Client disconnected" ); - if let Err(e) = process_connection(socket, addr, api, config).await { - match e.level { - tracing::Level::ERROR => tracing::error!( - // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - trace = %e.print_span_trace(), - err = format!("{}", e.context), - "Client disconnected" - ), - tracing::Level::WARN => tracing::warn!( - // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - trace = %e.print_span_trace(), - err = format!("{}", e.context), - "Client disconnected" - ), - tracing::Level::INFO => tracing::info!( - // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - trace = %e.print_span_trace(), - err = format!("{}", e.context), - "Client disconnected" - ), - _ => { - tracing::error!( - // addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - trace = %e.print_span_trace(), - err = format!("{}", e.context), - actual_level = ?e.level, - "Client disconnected (bad level)" - ) - } - } - } else { - tracing::debug!( - addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()), - "Client disconnected" - ); - } - }); - } - }); - - tokio::select! { - result = api_task => { - tracing::error!("The api tokio:spawn'ed task run to completion, which should not happen!"); - } - result = conn_task => { - tracing::error!("The connection handling tokio:spawn'ed task run to completion, which should not happen!"); - } + } + }); } } @@ -311,7 +299,8 @@ where tracing::debug!(msg = "server status", status = ?status); match status { ServerDeploymentStatus::Connectable(mut server_stream) => { - api.start_watch(server.clone(), OFFLINE_TIMER).await?; + api.start_watch(server.clone(), Duration::from_secs(600)) + .await?; // referenced from: // https://github.com/hanyu-dev/tokio-splice2/blob/fc47199fffde8946b0acf867d1fa0b2222267a34/examples/proxy.rs @@ -361,7 +350,8 @@ where } ServerDeploymentStatus::Offline => { server.start().await?; - api.start_watch(server.clone(), OFFLINE_TIMER).await?; + api.start_watch(server.clone(), Duration::from_secs(600)) + .await?; mc_server::send_disconnect(client_stream, format!("[\"\",{{\"text\":\"Okayy, §2starting§r the server!\n\n\"}},{{\"text\":\"{BYE_MESSAGE}\"}}]").as_str()).await?; } ServerDeploymentStatus::Unavailable(_) => unreachable!(), diff --git a/src/mc_server.rs b/src/mc_server.rs index 102523d..a4b097d 100644 --- a/src/mc_server.rs +++ b/src/mc_server.rs @@ -115,45 +115,7 @@ pub trait MinecraftServerHandle: Clone { tracing::trace!("data exchanged while proxying status: {:?}", data_amount); Ok(()) } - async fn query_description(&self) -> Result, OpaqueError> { - let status = self.query_status().await?; - match status { - ServerDeploymentStatus::Connectable(mut tcp_stream) => { - let handshake = crate::packets::serverbound::handshake::Handshake::create( - crate::types::VarInt::from(746).ok_or("could not create VarInt WTF?")?, - crate::types::VarString::from(self.get_addr()), - crate::types::UShort::from(1234), - crate::types::VarInt::from(1).ok_or("could not create VarInt WTF?")?, - ) - .ok_or("failed to create handshake packet from scratch... WTF?")?; - handshake - .send_packet(&mut tcp_stream) - .await - .map_err(|_e| "failed to send handshake packet to server")?; - let status_rq = crate::packets::Packet::from_bytes(0, Vec::new()) - .ok_or("Failed to create status request packet from scratch")?; - status_rq - .send_packet(&mut tcp_stream) - .await - .map_err(|_e| "failed to send status request packet to server")?; - let return_packet = crate::packets::Packet::parse(&mut tcp_stream).await?; - let status_response = - crate::packets::clientbound::status::StatusResponse::parse(return_packet) - .await - .unwrap(); - - return status_response.get_json().ok_or(OpaqueError::create( - "failed to parse status response from server", - )); - } - _ => { - return Err(OpaqueError::create(&format!( - "server is not running; status={:?}", - status - ))) - } - } - } + async fn query_description(&self) -> Result, OpaqueError>; } pub trait MinecraftAPI {