Compare commits
2 commits
a9a4e912f4
...
46c3a994c6
| Author | SHA1 | Date | |
|---|---|---|---|
| 46c3a994c6 | |||
| 6db804cdc1 |
6 changed files with 262 additions and 216 deletions
|
|
@ -1,33 +1,3 @@
|
||||||
apiVersion: v1
|
|
||||||
kind: ServiceAccount
|
|
||||||
metadata:
|
|
||||||
name: minecraft-ingress
|
|
||||||
namespace: default
|
|
||||||
---
|
|
||||||
apiVersion: rbac.authorization.k8s.io/v1
|
|
||||||
kind: Role
|
|
||||||
metadata:
|
|
||||||
namespace: default
|
|
||||||
name: minecraft-ingress
|
|
||||||
rules:
|
|
||||||
- apiGroups: ["apps", ""] # "" indicates the core API group
|
|
||||||
resources: ["pods","deployments","services"]
|
|
||||||
verbs: ["get", "list", "patch", "watch"]
|
|
||||||
---
|
|
||||||
apiVersion: rbac.authorization.k8s.io/v1
|
|
||||||
kind: RoleBinding
|
|
||||||
metadata:
|
|
||||||
name: minecraft-ingress
|
|
||||||
namespace: default
|
|
||||||
subjects:
|
|
||||||
- kind: ServiceAccount
|
|
||||||
name: minecraft-ingress
|
|
||||||
namespace: default
|
|
||||||
roleRef:
|
|
||||||
kind: Role
|
|
||||||
name: minecraft-ingress
|
|
||||||
apiGroup: rbac.authorization.k8s.io
|
|
||||||
---
|
|
||||||
apiVersion: apps/v1
|
apiVersion: apps/v1
|
||||||
kind: Deployment
|
kind: Deployment
|
||||||
metadata:
|
metadata:
|
||||||
|
|
@ -48,7 +18,7 @@ spec:
|
||||||
terminationGracePeriodSeconds: 5
|
terminationGracePeriodSeconds: 5
|
||||||
containers:
|
containers:
|
||||||
- name: minecraft-ingress
|
- name: minecraft-ingress
|
||||||
image: git.tami.moe/tamipes/minecraft-ingress:latest
|
image: git.tami.moe/tamipes/minecraft-ingress:testing
|
||||||
env:
|
env:
|
||||||
- name: FILTER_CONN
|
- name: FILTER_CONN
|
||||||
value: '(addr == "87.229.85.222") || (addr == "") || (addr == "ogmur.xyz") || (addr == "@mat:matdoes.dev (hi honeypots) ") || (addr == "@mat:matdoes.dev ") || (addr == "slowstack.tv")'
|
value: '(addr == "87.229.85.222") || (addr == "") || (addr == "ogmur.xyz") || (addr == "@mat:matdoes.dev (hi honeypots) ") || (addr == "@mat:matdoes.dev ") || (addr == "slowstack.tv")'
|
||||||
29
kube/rbac.yaml
Normal file
29
kube/rbac.yaml
Normal file
|
|
@ -0,0 +1,29 @@
|
||||||
|
apiVersion: v1
|
||||||
|
kind: ServiceAccount
|
||||||
|
metadata:
|
||||||
|
name: minecraft-ingress
|
||||||
|
namespace: default
|
||||||
|
---
|
||||||
|
apiVersion: rbac.authorization.k8s.io/v1
|
||||||
|
kind: Role
|
||||||
|
metadata:
|
||||||
|
namespace: default
|
||||||
|
name: minecraft-ingress
|
||||||
|
rules:
|
||||||
|
- apiGroups: ["apps", ""] # "" indicates the core API group
|
||||||
|
resources: ["pods","deployments","services"]
|
||||||
|
verbs: ["get", "list", "patch", "watch"]
|
||||||
|
---
|
||||||
|
apiVersion: rbac.authorization.k8s.io/v1
|
||||||
|
kind: RoleBinding
|
||||||
|
metadata:
|
||||||
|
name: minecraft-ingress
|
||||||
|
namespace: default
|
||||||
|
subjects:
|
||||||
|
- kind: ServiceAccount
|
||||||
|
name: minecraft-ingress
|
||||||
|
namespace: default
|
||||||
|
roleRef:
|
||||||
|
kind: Role
|
||||||
|
name: minecraft-ingress
|
||||||
|
apiGroup: rbac.authorization.k8s.io
|
||||||
25
kube/test-deployment.yaml
Normal file
25
kube/test-deployment.yaml
Normal file
|
|
@ -0,0 +1,25 @@
|
||||||
|
apiVersion: apps/v1
|
||||||
|
kind: Deployment
|
||||||
|
metadata:
|
||||||
|
name: minecraft-ingress
|
||||||
|
labels:
|
||||||
|
app: minecraft-ingress
|
||||||
|
spec:
|
||||||
|
replicas: 1
|
||||||
|
selector:
|
||||||
|
matchLabels:
|
||||||
|
app: minecraft-ingress
|
||||||
|
template:
|
||||||
|
metadata:
|
||||||
|
labels:
|
||||||
|
app: minecraft-ingress
|
||||||
|
spec:
|
||||||
|
serviceAccountName: minecraft-ingress
|
||||||
|
terminationGracePeriodSeconds: 5
|
||||||
|
containers:
|
||||||
|
- name: minecraft-ingress
|
||||||
|
image: git.tami.moe/tamipes/minecraft-ingress:testing
|
||||||
|
env:
|
||||||
|
- name: FILTER_CONN
|
||||||
|
value: '(addr == "87.229.85.222") || (addr == "") || (addr == "ogmur.xyz") || (addr == "@mat:matdoes.dev (hi honeypots) ") || (addr == "@mat:matdoes.dev ") || (addr == "slowstack.tv")'
|
||||||
|
|
||||||
|
|
@ -1,21 +1,17 @@
|
||||||
use std::{collections::HashMap, fmt, sync::Arc, time::Duration};
|
use std::{collections::HashMap, fmt, sync::Arc, time::Duration};
|
||||||
|
|
||||||
use k8s_openapi::{
|
use futures::StreamExt;
|
||||||
api::{apps::v1::Deployment, core::v1::Service},
|
use k8s_openapi::api::{apps::v1::Deployment, core::v1::Service};
|
||||||
apimachinery::pkg::util::intstr::IntOrString,
|
|
||||||
};
|
|
||||||
use kube::{
|
use kube::{
|
||||||
api::{ListParams, ObjectList, Patch, PatchParams},
|
api::{ListParams, ObjectList, Patch, PatchParams},
|
||||||
runtime::reflector::Lookup,
|
runtime::{reflector::Lookup, WatchStreamExt},
|
||||||
Api, Client, ResourceExt,
|
Api, Client, ResourceExt,
|
||||||
};
|
};
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
use tracing::Instrument;
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
mc_server::{sanitize_addr, MinecraftAPI, MinecraftServerHandle, ServerDeploymentStatus},
|
mc_server::{sanitize_addr, MinecraftAPI, MinecraftServerHandle, ServerDeploymentStatus},
|
||||||
packets::{clientbound::status::StatusTrait, SendPacket},
|
|
||||||
OpaqueError,
|
OpaqueError,
|
||||||
};
|
};
|
||||||
const MAIN_LABEL: &str = "tami.moe/minecraft";
|
const MAIN_LABEL: &str = "tami.moe/minecraft";
|
||||||
|
|
@ -28,69 +24,94 @@ const PORT_LABEL: &str = "tami.moe/minecraft-port";
|
||||||
/// the underlying async data access.
|
/// the underlying async data access.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct KubeCache {
|
pub struct KubeCache {
|
||||||
deployments: Api<Deployment>,
|
// When the store gets copied, it points to the same backing store
|
||||||
services: Api<Service>,
|
// In other words, this satisfies my restriction for cloning and
|
||||||
|
// there is need no to wrap it in Arc<>
|
||||||
|
dep_cache: kube::runtime::reflector::Store<Deployment>,
|
||||||
|
dep_api: Api<Deployment>,
|
||||||
|
srv_api: Api<Service>,
|
||||||
in_cluster: bool,
|
in_cluster: bool,
|
||||||
}
|
}
|
||||||
impl KubeCache {
|
impl KubeCache {
|
||||||
/// This initializes the creation of a "kubernetes client"
|
/// This initializes the creation of a "kubernetes client"
|
||||||
/// and if it is not possible returns a None.
|
/// and if it is not possible returns a None.
|
||||||
pub async fn create() -> Option<KubeCache> {
|
///
|
||||||
|
/// It also returns a JoinHandle which is tied to the api watcher
|
||||||
|
/// and if it returns that means that the kubecache is not responsive now.
|
||||||
|
pub async fn create() -> Option<(KubeCache, JoinHandle<()>)> {
|
||||||
let in_cluster = match std::env::var("KUBERNETES_SERVICE_HOST") {
|
let in_cluster = match std::env::var("KUBERNETES_SERVICE_HOST") {
|
||||||
Ok(x) => true,
|
Ok(_) => true,
|
||||||
Err(e) => false,
|
Err(_) => false,
|
||||||
};
|
};
|
||||||
let client = Client::try_default().await.unwrap();
|
let client = Client::try_default().await.unwrap();
|
||||||
|
|
||||||
let deployments: Api<Deployment> = Api::default_namespaced(client.clone());
|
let dep_api: Api<Deployment> = Api::default_namespaced(client.clone());
|
||||||
let services: Api<Service> = Api::default_namespaced(client);
|
let lp = kube::runtime::watcher::Config::default().labels(MAIN_LABEL);
|
||||||
|
let (dep_reader, dep_writer) = kube::runtime::reflector::store();
|
||||||
|
let dep_watcher_rf =
|
||||||
|
kube::runtime::reflector(dep_writer, kube::runtime::watcher(dep_api.clone(), lp));
|
||||||
|
|
||||||
return Some(KubeCache {
|
let handle = tokio::spawn(async move {
|
||||||
deployments,
|
// Source: https://docs.rs/kube/latest/kube/runtime/fn.reflector.html
|
||||||
services,
|
let infinite_watch = dep_watcher_rf
|
||||||
in_cluster,
|
.applied_objects()
|
||||||
|
.for_each(|_o| std::future::ready(()));
|
||||||
|
let _res = infinite_watch.await;
|
||||||
|
tracing::error!(
|
||||||
|
"Deployments watcher ended. This should not happen. (Program should exit now)"
|
||||||
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let srv_api: Api<Service> = Api::default_namespaced(client);
|
||||||
|
|
||||||
|
return Some((
|
||||||
|
KubeCache {
|
||||||
|
dep_cache: dep_reader,
|
||||||
|
dep_api: dep_api,
|
||||||
|
srv_api,
|
||||||
|
in_cluster,
|
||||||
|
},
|
||||||
|
handle,
|
||||||
|
));
|
||||||
}
|
}
|
||||||
async fn get_dep(&self, name: &str) -> Result<Deployment, kube::Error> {
|
fn get_dep(&self, name: &str) -> Option<Arc<Deployment>> {
|
||||||
self.deployments.get(name).await
|
self.dep_cache.find(|x| x.name_any() == name)
|
||||||
}
|
}
|
||||||
async fn get_srv(&self, name: &str) -> Result<Service, kube::Error> {
|
async fn get_srv(&self, name: &str) -> Result<Service, kube::Error> {
|
||||||
self.services.get(name).await
|
self.srv_api.get(name).await
|
||||||
}
|
}
|
||||||
async fn get_deploys(&self) -> ObjectList<Deployment> {
|
async fn list_deploys(&self) -> Vec<Arc<Deployment>> {
|
||||||
|
self.dep_cache.state()
|
||||||
|
}
|
||||||
|
async fn list_srvs(&self) -> ObjectList<Service> {
|
||||||
// let lp: ListParams = ListParams::default();
|
// let lp: ListParams = ListParams::default();
|
||||||
let lp: ListParams = ListParams::default().labels(MAIN_LABEL);
|
let lp: ListParams = ListParams::default().labels(MAIN_LABEL);
|
||||||
self.deployments.list(&lp).await.unwrap()
|
self.srv_api.list(&lp).await.unwrap()
|
||||||
}
|
|
||||||
async fn get_srvs(&self) -> ObjectList<Service> {
|
|
||||||
// let lp: ListParams = ListParams::default();
|
|
||||||
let lp: ListParams = ListParams::default().labels(MAIN_LABEL);
|
|
||||||
self.services.list(&lp).await.unwrap()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn query_dep_addr(&self, addr: &str, port: &str) -> Option<String> {
|
pub async fn query_dep(&self, addr: &str, port: &str) -> Option<Arc<Deployment>> {
|
||||||
let deploys = self.get_deploys().await;
|
let deploys = self.list_deploys().await;
|
||||||
let result = deploys.iter().find(|x| filter_label_value(x, addr, port))?;
|
deploys
|
||||||
Some(result.name()?.to_string())
|
.into_iter()
|
||||||
|
.find(|x| filter_label_value(x.as_ref(), addr, port))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn query_srv(&self, addr: &str, port: &str) -> Option<Service> {
|
pub async fn query_srv(&self, addr: &str, port: &str) -> Option<Service> {
|
||||||
let deploys = self.get_srvs().await;
|
let deploys = self.list_srvs().await;
|
||||||
let result = deploys.into_iter().find(|x| {
|
deploys.into_iter().find(|x| {
|
||||||
let in_cluster = match x.spec.as_ref().unwrap().type_.as_ref() {
|
let in_cluster = match x.spec.as_ref().unwrap().type_.as_ref() {
|
||||||
Some(t) => t == "ClusterIP",
|
Some(t) => t == "ClusterIP",
|
||||||
None => false,
|
None => false,
|
||||||
};
|
};
|
||||||
let incorrect_type = in_cluster ^ self.in_cluster;
|
let incorrect_type = in_cluster ^ self.in_cluster;
|
||||||
!incorrect_type && filter_label_value(&x, addr, port)
|
!incorrect_type && filter_label_value(x, addr, port)
|
||||||
})?;
|
})
|
||||||
Some(result)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn set_dep_scale(&self, name: &str, num: i32) -> Result<Deployment, kube::Error> {
|
async fn set_dep_scale(&self, name: &str, num: i32) -> Result<Deployment, kube::Error> {
|
||||||
let patch = Patch::Merge(json!({"spec":{"replicas": num}}));
|
let patch = Patch::Merge(json!({"spec":{"replicas": num}}));
|
||||||
let pp = PatchParams::default();
|
let pp = PatchParams::default();
|
||||||
self.deployments.patch(name, &pp, &patch).await
|
self.dep_api.patch(name, &pp, &patch).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -109,7 +130,7 @@ impl MinecraftAPI<Server> for McApi {
|
||||||
async fn query_server(&self, addr: &str, port: &str) -> Result<Server, OpaqueError> {
|
async fn query_server(&self, addr: &str, port: &str) -> Result<Server, OpaqueError> {
|
||||||
let addr = sanitize_addr(&addr);
|
let addr = sanitize_addr(&addr);
|
||||||
|
|
||||||
let dep_name = match self.cache.query_dep_addr(&addr, &port).await {
|
let deployment = match self.cache.query_dep(&addr, &port).await {
|
||||||
Some(x) => x,
|
Some(x) => x,
|
||||||
None => {
|
None => {
|
||||||
return Err(OpaqueError::create_with_kind(
|
return Err(OpaqueError::create_with_kind(
|
||||||
|
|
@ -128,51 +149,50 @@ impl MinecraftAPI<Server> for McApi {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let deployment = self.cache.get_dep(&dep_name).await.map_err(|x| {
|
let internal_id = match deployment.metadata.name.clone() {
|
||||||
format!(
|
Some(x) => x,
|
||||||
"Failed to query cache for deployment with dep_name err:{}",
|
None => {
|
||||||
x.to_string()
|
return Err(OpaqueError::create_with_kind(
|
||||||
)
|
"Could not query deployment metadata for name",
|
||||||
})?;
|
"InternalIdQueryFailed",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
tracing::debug!("found kubernetes deployment & service");
|
tracing::debug!("found kubernetes deployment & service");
|
||||||
|
|
||||||
let service_port_spec = service.clone().spec.unwrap().ports.unwrap();
|
let service_port_spec = service.clone().spec.unwrap().ports.unwrap();
|
||||||
let port = service_port_spec
|
let port_srv = service_port_spec
|
||||||
.iter()
|
.iter()
|
||||||
.find(|x| x.name.clone().unwrap() == "mc-router")
|
.find(|x| x.name.clone().unwrap() == "mc-router")
|
||||||
.ok_or(OpaqueError::create(
|
.ok_or(OpaqueError::create(
|
||||||
"Could not find \"mc-router\" nodePort for server",
|
"Could not find \"mc-router\" nodePort for server",
|
||||||
))?;
|
))?;
|
||||||
let port_string;
|
|
||||||
let inter_addr = match self.cache.in_cluster {
|
let inter_addr = match self.cache.in_cluster {
|
||||||
false => {
|
false => {
|
||||||
let node_port = port
|
let node_port = port_srv
|
||||||
.node_port
|
.node_port
|
||||||
.map(|x| x.to_string())
|
.map(|x| x.to_string())
|
||||||
.ok_or(OpaqueError::create("Could not map nodePort to port string"))?;
|
.ok_or(OpaqueError::create("Could not map nodePort to port string"))?;
|
||||||
port_string = node_port.clone();
|
|
||||||
format!("localhost:{}", node_port)
|
format!("localhost:{}", node_port)
|
||||||
}
|
}
|
||||||
true => {
|
true => {
|
||||||
let target_port = port.port;
|
let target_port = port_srv.port;
|
||||||
let a = format!(
|
format!(
|
||||||
"{}.default.svc.cluster.local:{}",
|
"{}.default.svc.cluster.local:{}",
|
||||||
service
|
service
|
||||||
.name()
|
.name()
|
||||||
.ok_or("Could not get name of ClusterIP service")?,
|
.ok_or("Could not get name of ClusterIP service")?,
|
||||||
target_port
|
target_port
|
||||||
);
|
)
|
||||||
port_string = a.clone();
|
|
||||||
a
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
return Ok(Server {
|
return Ok(Server {
|
||||||
dep: deployment,
|
|
||||||
srv: service,
|
|
||||||
server_addr: addr.to_string(),
|
server_addr: addr.to_string(),
|
||||||
server_port: port_string,
|
server_port: port.to_string(),
|
||||||
|
internal_address: inter_addr,
|
||||||
|
internal_id,
|
||||||
cache: self.cache.clone(),
|
cache: self.cache.clone(),
|
||||||
inter_addr: inter_addr,
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -182,43 +202,33 @@ impl MinecraftAPI<Server> for McApi {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl McApi {
|
impl McApi {
|
||||||
pub async fn create() -> Option<Self> {
|
pub async fn create() -> Option<(Self, JoinHandle<()>)> {
|
||||||
Some(Self {
|
let (kube_cache, kube_task) = KubeCache::create().await?;
|
||||||
cache: KubeCache::create().await?,
|
Some((
|
||||||
map: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
|
Self {
|
||||||
})
|
cache: kube_cache,
|
||||||
|
map: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
|
||||||
|
},
|
||||||
|
kube_task,
|
||||||
|
))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Server {
|
pub struct Server {
|
||||||
dep: Deployment,
|
|
||||||
srv: Service,
|
|
||||||
server_addr: String,
|
server_addr: String,
|
||||||
server_port: String,
|
server_port: String,
|
||||||
|
internal_address: String,
|
||||||
|
internal_id: String,
|
||||||
cache: KubeCache,
|
cache: KubeCache,
|
||||||
inter_addr: String,
|
|
||||||
}
|
}
|
||||||
impl fmt::Debug for Server {
|
impl fmt::Debug for Server {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
f.debug_struct("KubeServer")
|
f.debug_struct("KubeServer")
|
||||||
|
.field("internal_id", &self.internal_id)
|
||||||
.field(
|
.field(
|
||||||
"dep",
|
"join_addr",
|
||||||
&self
|
&format!("{}:{}", self.server_addr, self.server_port),
|
||||||
.dep
|
|
||||||
.metadata
|
|
||||||
.clone()
|
|
||||||
.name
|
|
||||||
.unwrap_or("#error#".to_string()),
|
|
||||||
)
|
|
||||||
.field(
|
|
||||||
"srv",
|
|
||||||
&self
|
|
||||||
.srv
|
|
||||||
.metadata
|
|
||||||
.clone()
|
|
||||||
.name
|
|
||||||
.unwrap_or("#error#".to_string()),
|
|
||||||
)
|
)
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
|
|
@ -239,7 +249,12 @@ impl MinecraftServerHandle for Server {
|
||||||
|
|
||||||
#[tracing::instrument(level = "info")]
|
#[tracing::instrument(level = "info")]
|
||||||
async fn query_status(&self) -> Result<crate::mc_server::ServerDeploymentStatus, OpaqueError> {
|
async fn query_status(&self) -> Result<crate::mc_server::ServerDeploymentStatus, OpaqueError> {
|
||||||
let mut status = match self.dep.clone().status {
|
let dep = self
|
||||||
|
.cache
|
||||||
|
.get_dep(&self.internal_id)
|
||||||
|
.ok_or_else(|| OpaqueError::create("Failed to get deployment from cache"))?;
|
||||||
|
|
||||||
|
let status = match &dep.status {
|
||||||
Some(x) => x,
|
Some(x) => x,
|
||||||
None => {
|
None => {
|
||||||
return Err(OpaqueError::create(
|
return Err(OpaqueError::create(
|
||||||
|
|
@ -249,21 +264,21 @@ impl MinecraftServerHandle for Server {
|
||||||
};
|
};
|
||||||
let total_replicas = status
|
let total_replicas = status
|
||||||
.replicas
|
.replicas
|
||||||
.get_or_insert_with(|| {
|
.unwrap_or_else(|| {
|
||||||
tracing::trace!("total_replicas failed to get");
|
tracing::trace!("total_replicas failed to get");
|
||||||
-1
|
-1
|
||||||
})
|
})
|
||||||
.clone();
|
.clone();
|
||||||
let available_replicas = status
|
let available_replicas = status
|
||||||
.available_replicas
|
.available_replicas
|
||||||
.get_or_insert_with(|| {
|
.unwrap_or_else(|| {
|
||||||
tracing::trace!("available_replicas failed to get");
|
tracing::trace!("available_replicas failed to get");
|
||||||
-1
|
-1
|
||||||
})
|
})
|
||||||
.clone();
|
.clone();
|
||||||
let ready_replicas = status
|
let ready_replicas = status
|
||||||
.ready_replicas
|
.ready_replicas
|
||||||
.get_or_insert_with(|| {
|
.unwrap_or_else(|| {
|
||||||
tracing::trace!("ready_replicas failed to get");
|
tracing::trace!("ready_replicas failed to get");
|
||||||
-1
|
-1
|
||||||
})
|
})
|
||||||
|
|
@ -284,60 +299,21 @@ impl MinecraftServerHandle for Server {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_internal_address(&self) -> &str {
|
fn get_internal_address(&self) -> &str {
|
||||||
self.inter_addr.as_str()
|
self.internal_address.as_str()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_addr(&self) -> String {
|
fn get_addr(&self) -> String {
|
||||||
self.server_addr.clone()
|
self.server_addr.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn query_description(&self) -> Result<Box<dyn StatusTrait>, OpaqueError> {
|
|
||||||
let status = self.query_status().await?;
|
|
||||||
match status {
|
|
||||||
ServerDeploymentStatus::Connectable(mut tcp_stream) => {
|
|
||||||
let handshake = crate::packets::serverbound::handshake::Handshake::create(
|
|
||||||
crate::types::VarInt::from(746).ok_or("could not create VarInt WTF?")?,
|
|
||||||
crate::types::VarString::from(self.get_addr()),
|
|
||||||
crate::types::UShort::from(1234),
|
|
||||||
crate::types::VarInt::from(1).ok_or("could not create VarInt WTF?")?,
|
|
||||||
)
|
|
||||||
.ok_or("failed to create handshake packet from scratch... WTF?")?;
|
|
||||||
handshake
|
|
||||||
.send_packet(&mut tcp_stream)
|
|
||||||
.await
|
|
||||||
.map_err(|_e| "failed to send handshake packet to server")?;
|
|
||||||
let status_rq = crate::packets::Packet::from_bytes(0, Vec::new())
|
|
||||||
.ok_or("Failed to create status request packet from scratch")?;
|
|
||||||
status_rq
|
|
||||||
.send_packet(&mut tcp_stream)
|
|
||||||
.await
|
|
||||||
.map_err(|_e| "failed to send status request packet to server")?;
|
|
||||||
let return_packet = crate::packets::Packet::parse(&mut tcp_stream).await?;
|
|
||||||
let status_response =
|
|
||||||
crate::packets::clientbound::status::StatusResponse::parse(return_packet)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
return status_response.get_json().ok_or(OpaqueError::create(
|
|
||||||
"failed to parse status response from server",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
return Err(OpaqueError::create(&format!(
|
|
||||||
"server is not running; status={:?}",
|
|
||||||
status
|
|
||||||
)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_port(&self) -> String {
|
fn get_port(&self) -> String {
|
||||||
self.server_port.clone()
|
self.server_port.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_motd(&self) -> Option<String> {
|
fn get_motd(&self) -> Option<String> {
|
||||||
let all_container_motds = self
|
let dep = self.cache.get_dep(&self.internal_id)?;
|
||||||
.dep
|
|
||||||
|
let all_container_motds = dep
|
||||||
.spec
|
.spec
|
||||||
.clone()?
|
.clone()?
|
||||||
.template
|
.template
|
||||||
|
|
@ -361,19 +337,17 @@ impl MinecraftServerHandle for Server {
|
||||||
|
|
||||||
impl Server {
|
impl Server {
|
||||||
async fn set_scale(&self, num: i32) -> Result<(), kube::Error> {
|
async fn set_scale(&self, num: i32) -> Result<(), kube::Error> {
|
||||||
let name = self
|
let _res = self.cache.set_dep_scale(&self.internal_id, num).await?;
|
||||||
.dep
|
tracing::info!(
|
||||||
.metadata
|
"scaled replicas of {}:{} to {num}",
|
||||||
.clone()
|
self.server_addr,
|
||||||
.name
|
self.server_port
|
||||||
.unwrap_or("#error#".to_string());
|
);
|
||||||
let _res = self.cache.set_dep_scale(&name, num).await?;
|
|
||||||
tracing::info!("scaled replicas of {} to {num}", self.server_addr);
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn filter_label_value<R>(res: &&R, addr: &str, port: &str) -> bool
|
fn filter_label_value<R>(res: &R, addr: &str, port: &str) -> bool
|
||||||
where
|
where
|
||||||
R: ResourceExt,
|
R: ResourceExt,
|
||||||
{
|
{
|
||||||
|
|
|
||||||
102
src/main.rs
102
src/main.rs
|
|
@ -21,6 +21,7 @@ mod packets;
|
||||||
mod types;
|
mod types;
|
||||||
|
|
||||||
static BYE_MESSAGE: &str = concat!("§dTami§r with §d<3§r §8(rev: ", env!("COMMIT_HASH"), ")§r");
|
static BYE_MESSAGE: &str = concat!("§dTami§r with §d<3§r §8(rev: ", env!("COMMIT_HASH"), ")§r");
|
||||||
|
static OFFLINE_TIMER: std::time::Duration = Duration::from_secs(600);
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
|
|
@ -39,7 +40,7 @@ async fn main() {
|
||||||
let revision: &'static str = env!("COMMIT_HASH");
|
let revision: &'static str = env!("COMMIT_HASH");
|
||||||
tracing::info!(revision);
|
tracing::info!(revision);
|
||||||
|
|
||||||
let api = kube_cache::McApi::create().await.unwrap();
|
let (api, api_task) = kube_cache::McApi::create().await.unwrap();
|
||||||
tracing::info!("initialized kube api");
|
tracing::info!("initialized kube api");
|
||||||
|
|
||||||
let config: Config = Default::default();
|
let config: Config = Default::default();
|
||||||
|
|
@ -47,53 +48,64 @@ async fn main() {
|
||||||
let listener = TcpListener::bind(config.bind_addr.clone()).await.unwrap();
|
let listener = TcpListener::bind(config.bind_addr.clone()).await.unwrap();
|
||||||
tracing::info!(bind_addr = config.bind_addr, "started tcp server");
|
tracing::info!(bind_addr = config.bind_addr, "started tcp server");
|
||||||
|
|
||||||
loop {
|
let conn_task = tokio::spawn(async move {
|
||||||
let (socket, addr) = listener.accept().await.unwrap();
|
loop {
|
||||||
let api = api.clone();
|
let (socket, addr) = listener.accept().await.unwrap();
|
||||||
|
let api = api.clone();
|
||||||
|
|
||||||
let config = config.clone();
|
let config = config.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
|
addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
|
||||||
"Client connected"
|
"Client connected"
|
||||||
);
|
);
|
||||||
if let Err(e) = process_connection(socket, addr, api, config).await {
|
if let Err(e) = process_connection(socket, addr, api, config).await {
|
||||||
match e.level {
|
match e.level {
|
||||||
tracing::Level::ERROR => tracing::error!(
|
tracing::Level::ERROR => tracing::error!(
|
||||||
// addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
|
|
||||||
trace = %e.print_span_trace(),
|
|
||||||
err = format!("{}", e.context),
|
|
||||||
"Client disconnected"
|
|
||||||
),
|
|
||||||
tracing::Level::WARN => tracing::warn!(
|
|
||||||
// addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
|
|
||||||
trace = %e.print_span_trace(),
|
|
||||||
err = format!("{}", e.context),
|
|
||||||
"Client disconnected"
|
|
||||||
),
|
|
||||||
tracing::Level::INFO => tracing::info!(
|
|
||||||
// addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
|
|
||||||
trace = %e.print_span_trace(),
|
|
||||||
err = format!("{}", e.context),
|
|
||||||
"Client disconnected"
|
|
||||||
),
|
|
||||||
_ => {
|
|
||||||
tracing::error!(
|
|
||||||
// addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
|
// addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
|
||||||
trace = %e.print_span_trace(),
|
trace = %e.print_span_trace(),
|
||||||
err = format!("{}", e.context),
|
err = format!("{}", e.context),
|
||||||
actual_level = ?e.level,
|
"Client disconnected"
|
||||||
"Client disconnected (bad level)"
|
),
|
||||||
)
|
tracing::Level::WARN => tracing::warn!(
|
||||||
|
// addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
|
||||||
|
trace = %e.print_span_trace(),
|
||||||
|
err = format!("{}", e.context),
|
||||||
|
"Client disconnected"
|
||||||
|
),
|
||||||
|
tracing::Level::INFO => tracing::info!(
|
||||||
|
// addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
|
||||||
|
trace = %e.print_span_trace(),
|
||||||
|
err = format!("{}", e.context),
|
||||||
|
"Client disconnected"
|
||||||
|
),
|
||||||
|
_ => {
|
||||||
|
tracing::error!(
|
||||||
|
// addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
|
||||||
|
trace = %e.print_span_trace(),
|
||||||
|
err = format!("{}", e.context),
|
||||||
|
actual_level = ?e.level,
|
||||||
|
"Client disconnected (bad level)"
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
tracing::debug!(
|
||||||
|
addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
|
||||||
|
"Client disconnected"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
} else {
|
});
|
||||||
tracing::debug!(
|
}
|
||||||
addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
|
});
|
||||||
"Client disconnected"
|
|
||||||
);
|
tokio::select! {
|
||||||
}
|
result = api_task => {
|
||||||
});
|
tracing::error!("The api tokio:spawn'ed task run to completion, which should not happen!");
|
||||||
|
}
|
||||||
|
result = conn_task => {
|
||||||
|
tracing::error!("The connection handling tokio:spawn'ed task run to completion, which should not happen!");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -299,8 +311,7 @@ where
|
||||||
tracing::debug!(msg = "server status", status = ?status);
|
tracing::debug!(msg = "server status", status = ?status);
|
||||||
match status {
|
match status {
|
||||||
ServerDeploymentStatus::Connectable(mut server_stream) => {
|
ServerDeploymentStatus::Connectable(mut server_stream) => {
|
||||||
api.start_watch(server.clone(), Duration::from_secs(600))
|
api.start_watch(server.clone(), OFFLINE_TIMER).await?;
|
||||||
.await?;
|
|
||||||
|
|
||||||
// referenced from:
|
// referenced from:
|
||||||
// https://github.com/hanyu-dev/tokio-splice2/blob/fc47199fffde8946b0acf867d1fa0b2222267a34/examples/proxy.rs
|
// https://github.com/hanyu-dev/tokio-splice2/blob/fc47199fffde8946b0acf867d1fa0b2222267a34/examples/proxy.rs
|
||||||
|
|
@ -350,8 +361,7 @@ where
|
||||||
}
|
}
|
||||||
ServerDeploymentStatus::Offline => {
|
ServerDeploymentStatus::Offline => {
|
||||||
server.start().await?;
|
server.start().await?;
|
||||||
api.start_watch(server.clone(), Duration::from_secs(600))
|
api.start_watch(server.clone(), OFFLINE_TIMER).await?;
|
||||||
.await?;
|
|
||||||
mc_server::send_disconnect(client_stream, format!("[\"\",{{\"text\":\"Okayy, §2starting§r the server!\n\n\"}},{{\"text\":\"{BYE_MESSAGE}\"}}]").as_str()).await?;
|
mc_server::send_disconnect(client_stream, format!("[\"\",{{\"text\":\"Okayy, §2starting§r the server!\n\n\"}},{{\"text\":\"{BYE_MESSAGE}\"}}]").as_str()).await?;
|
||||||
}
|
}
|
||||||
ServerDeploymentStatus::Unavailable(_) => unreachable!(),
|
ServerDeploymentStatus::Unavailable(_) => unreachable!(),
|
||||||
|
|
|
||||||
|
|
@ -115,7 +115,45 @@ pub trait MinecraftServerHandle: Clone {
|
||||||
tracing::trace!("data exchanged while proxying status: {:?}", data_amount);
|
tracing::trace!("data exchanged while proxying status: {:?}", data_amount);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
async fn query_description(&self) -> Result<Box<dyn StatusTrait>, OpaqueError>;
|
async fn query_description(&self) -> Result<Box<dyn StatusTrait>, OpaqueError> {
|
||||||
|
let status = self.query_status().await?;
|
||||||
|
match status {
|
||||||
|
ServerDeploymentStatus::Connectable(mut tcp_stream) => {
|
||||||
|
let handshake = crate::packets::serverbound::handshake::Handshake::create(
|
||||||
|
crate::types::VarInt::from(746).ok_or("could not create VarInt WTF?")?,
|
||||||
|
crate::types::VarString::from(self.get_addr()),
|
||||||
|
crate::types::UShort::from(1234),
|
||||||
|
crate::types::VarInt::from(1).ok_or("could not create VarInt WTF?")?,
|
||||||
|
)
|
||||||
|
.ok_or("failed to create handshake packet from scratch... WTF?")?;
|
||||||
|
handshake
|
||||||
|
.send_packet(&mut tcp_stream)
|
||||||
|
.await
|
||||||
|
.map_err(|_e| "failed to send handshake packet to server")?;
|
||||||
|
let status_rq = crate::packets::Packet::from_bytes(0, Vec::new())
|
||||||
|
.ok_or("Failed to create status request packet from scratch")?;
|
||||||
|
status_rq
|
||||||
|
.send_packet(&mut tcp_stream)
|
||||||
|
.await
|
||||||
|
.map_err(|_e| "failed to send status request packet to server")?;
|
||||||
|
let return_packet = crate::packets::Packet::parse(&mut tcp_stream).await?;
|
||||||
|
let status_response =
|
||||||
|
crate::packets::clientbound::status::StatusResponse::parse(return_packet)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
return status_response.get_json().ok_or(OpaqueError::create(
|
||||||
|
"failed to parse status response from server",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
return Err(OpaqueError::create(&format!(
|
||||||
|
"server is not running; status={:?}",
|
||||||
|
status
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait MinecraftAPI<T> {
|
pub trait MinecraftAPI<T> {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue