feat: move the packets library over to use async

This commit is contained in:
Tamipes 2025-11-26 00:37:32 +01:00
parent 79ffeafc29
commit b75eec70e0
9 changed files with 163 additions and 71 deletions

View file

@ -1,9 +1,13 @@
//! This is a simple imitation of the basic functionality of kubectl:
//! kubectl {get, delete, apply, watch, edit} <resource> [name]
//! with labels and namespace selectors supported.
use std::sync::Arc;
use std::{net::SocketAddr, sync::Arc};
use tokio::net::{TcpListener, TcpStream};
use tracing::instrument;
use tracing_subscriber::{prelude::*, EnvFilter};
use crate::packets::Packet;
mod KubeCache;
mod packets;
@ -11,31 +15,59 @@ mod types;
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
// tracing_subscriber::fmt::init();
let fmt_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.with_level(true);
let filter_layer = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); // default to INFO
tracing_subscriber::registry()
.with(fmt_layer)
.with(filter_layer)
.init();
let commit_hash: &'static str = env!("COMMIT_HASH");
tracing::info!("COMMIT_HASH: {}", commit_hash);
let cache = KubeCache::Cache::create().unwrap();
let arcCache = Arc::new(cache);
let arc_cache = Arc::new(cache);
tracing::info!("kube api initialized");
let mut listener = TcpListener::bind("0.0.0.0:25565").await.unwrap();
let listener = TcpListener::bind("0.0.0.0:25565").await.unwrap();
tracing::info!("tcp server started");
loop {
let (socket, _) = listener.accept().await.unwrap();
let acc = arcCache.clone();
let (socket, addr) = listener.accept().await.unwrap();
let acc = arc_cache.clone();
tokio::spawn(async move {
if let Err(e) = process_socket(socket, acc).await {
if let Err(e) = process_socket(socket, addr, acc).await {
tracing::error!("socket error: {e:?}");
}
});
}
}
async fn process_socket(stream: TcpStream, cache: Arc<KubeCache::Cache>) -> Result<(), ()> {
#[instrument(level = "trace", skip(cache, stream))]
async fn process_socket(
mut stream: TcpStream,
addr: SocketAddr,
cache: Arc<KubeCache::Cache>,
) -> Result<(), ()> {
tracing::info!(
addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
"Client connected"
);
let client_packet = match Packet::parse(&mut stream).await {
Some(x) => x,
None => {
tracing::trace!("Client HANDSHAKE -> bad packet; Disconnecting...");
return Ok(());
}
};
tracing::info!(
addr = format!("{}:{}", addr.ip().to_string(), addr.port().to_string()),
"Client disconnected"
);
todo!()
}
// ----- Debug tools -----

View file

@ -1,4 +1,4 @@
use std::io::Write;
use tokio::{io::AsyncWriteExt, net::TcpStream};
use crate::{
packets::{Packet, SendPacket},
@ -13,19 +13,18 @@ pub struct Disconnect {
}
impl Disconnect {
pub fn parse(packet: Packet) -> Option<Disconnect> {
let mut reader = packet.data.into_iter();
pub async fn parse(packet: Packet) -> Option<Disconnect> {
Some(Disconnect {
all: packet.all,
reason: VarString::parse(&mut reader)?,
reason: VarString::parse(&mut packet.data.clone()).await?,
})
}
pub fn get_string(&self) -> String {
self.reason.get_value()
}
pub fn set_reason(reason: String) -> Option<Disconnect> {
pub async fn set_reason(reason: String) -> Option<Disconnect> {
let vec = VarString::from(reason).move_data()?;
Disconnect::parse(Packet::from_bytes(0, vec)?)
Disconnect::parse(Packet::from_bytes(0, vec)?).await
}
pub fn get_all(&self) -> Vec<u8> {
self.all.clone()
@ -33,9 +32,9 @@ impl Disconnect {
}
impl SendPacket for Disconnect {
fn send_packet(&self, stream: &mut std::net::TcpStream) -> std::io::Result<()> {
stream.write_all(&self.all)?;
stream.flush()?;
async fn send_packet(&self, stream: &mut TcpStream) -> std::io::Result<()> {
stream.write_all(&self.all).await?;
stream.flush().await?;
Ok(())
}
}

View file

@ -1,7 +1,8 @@
use std::{collections::HashMap, io::Write};
use std::collections::HashMap;
use serde_derive::{Deserialize, Serialize};
use serde_json::Value;
use tokio::{io::AsyncWriteExt, net::TcpStream};
use crate::{
packets::{Packet, SendPacket},
@ -117,11 +118,10 @@ pub struct StatusResponse {
}
impl StatusResponse {
pub fn parse(packet: Packet) -> Option<StatusResponse> {
let mut reader = packet.data.into_iter();
pub async fn parse(packet: Packet) -> Option<StatusResponse> {
Some(StatusResponse {
all: packet.all,
json: VarString::parse(&mut reader)?,
json: VarString::parse(&mut packet.data.clone()).await?,
})
}
pub fn get_string(&self) -> String {
@ -137,9 +137,11 @@ impl StatusResponse {
}
None
}
pub fn set_json(json: Box<dyn StatusTrait>) -> StatusResponse {
pub async fn set_json(json: Box<dyn StatusTrait>) -> StatusResponse {
let vec = VarString::from(json.get_string()).move_data().unwrap();
StatusResponse::parse(Packet::from_bytes(0, vec).unwrap()).unwrap()
StatusResponse::parse(Packet::from_bytes(0, vec).unwrap())
.await
.unwrap()
}
pub fn get_all(&self) -> Vec<u8> {
self.all.clone()
@ -147,9 +149,9 @@ impl StatusResponse {
}
impl SendPacket for StatusResponse {
fn send_packet(&self, stream: &mut std::net::TcpStream) -> std::io::Result<()> {
stream.write_all(&self.all)?;
stream.flush()?;
async fn send_packet(&self, stream: &mut TcpStream) -> std::io::Result<()> {
stream.write_all(&self.all).await?;
stream.flush().await?;
Ok(())
}
}

View file

@ -1,7 +1,10 @@
use std::{
io::{self, Read, Write},
net::TcpStream,
};
// use std::io::{self, Read, Write};
use tokio::io;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tracing::instrument;
use crate::types::VarInt;
pub mod clientbound;
@ -15,12 +18,12 @@ pub struct Packet {
pub all: Vec<u8>,
}
pub trait SendPacket {
fn send_packet(&self, stream: &mut TcpStream) -> io::Result<()>;
async fn send_packet(&self, stream: &mut TcpStream) -> io::Result<()>;
}
impl SendPacket for Packet {
fn send_packet(&self, stream: &mut TcpStream) -> io::Result<()> {
stream.write_all(&self.all)?;
async fn send_packet(&self, stream: &mut TcpStream) -> io::Result<()> {
stream.write_all(&self.all).await?;
Ok(())
}
}
@ -53,14 +56,14 @@ impl Packet {
all,
})
}
pub fn parse(buf: &mut TcpStream) -> Option<Packet> {
let bytes_iter = &mut buf.bytes().into_iter().map(|x| x.unwrap());
let length = VarInt::parse(bytes_iter)?;
// println!("---length: {length}");
let id = match VarInt::parse(bytes_iter) {
#[instrument(level = "trace")]
pub async fn parse(buf: &mut TcpStream) -> Option<Packet> {
let length = VarInt::parse(buf).await?;
tracing::trace!(length = length.get_int());
let id = match VarInt::parse(buf).await {
Some(x) => x,
None => {
println!("Packet id problem(it was None)! REEEEEEEEEEEEEEEEEEEE");
tracing::error!("Packet id problem(it was None)! REEEEEEEEEEEEEEEEEEEE");
panic!();
// return None;
}
@ -71,7 +74,7 @@ impl Packet {
}
let mut data: Vec<u8> = vec![0; length.get_int() as usize - id.get_data().len()];
match buf.read_exact(&mut data) {
match buf.read_exact(&mut data).await {
Ok(_) => {
// data_id.append(&mut data.clone());
// data_length.append(&mut data_id);

View file

@ -1,6 +1,4 @@
use std::io::Write;
use nix::NixPath;
use tokio::{io::AsyncWriteExt, net::TcpStream};
use crate::{
packets::{Packet, SendPacket},
@ -17,12 +15,12 @@ pub struct Handshake {
}
impl Handshake {
pub fn parse(packet: Packet) -> Option<Handshake> {
let mut reader = packet.data.clone().into_iter();
let protocol_version = VarInt::parse(&mut reader)?;
let server_address = VarString::parse(&mut reader)?;
let server_port = UShort::parse(&mut reader)?;
let next_state = VarInt::parse(&mut reader)?;
pub async fn parse(packet: Packet) -> Option<Handshake> {
let mut reader = packet.data.clone();
let protocol_version = VarInt::parse(&mut reader).await?;
let server_address = VarString::parse(&mut reader).await?;
let server_port = UShort::parse(&mut reader).await?;
let next_state = VarInt::parse(&mut reader).await?;
Some(Handshake {
protocol_version,
server_address,
@ -62,9 +60,9 @@ impl Handshake {
}
impl SendPacket for Handshake {
fn send_packet(&self, stream: &mut std::net::TcpStream) -> std::io::Result<()> {
stream.write_all(&self.all)?;
stream.flush()?;
async fn send_packet(&self, stream: &mut TcpStream) -> std::io::Result<()> {
stream.write_all(&self.all).await?;
stream.flush().await?;
Ok(())
}
}

View file

@ -1,5 +1,7 @@
use std::io::Write;
use tokio::{io::AsyncWriteExt, net::TcpStream};
use crate::packets::{Packet, SendPacket};
/// id: 0x00
@ -14,9 +16,9 @@ impl StatusRequest {
}
impl SendPacket for StatusRequest {
fn send_packet(&self, stream: &mut std::net::TcpStream) -> std::io::Result<()> {
stream.write_all(&self.all)?;
stream.flush()?;
async fn send_packet(&self, stream: &mut TcpStream) -> std::io::Result<()> {
stream.write_all(&self.all).await?;
stream.flush().await?;
Ok(())
}
}

View file

@ -1,8 +1,38 @@
use std::fmt::Display;
use tokio::{
io::{self, AsyncRead, AsyncReadExt},
net::TcpStream,
};
const SEGMENT_BITS: u8 = 0x7F;
const CONTINUE_BIT: u8 = 0x80;
pub trait TamiReader {
async fn next_byte(&mut self) -> io::Result<Option<u8>>;
}
impl TamiReader for TcpStream {
async fn next_byte(&mut self) -> io::Result<Option<u8>> {
let mut buf: [u8; 1] = [0];
let n = self.read(&mut buf).await?;
if n == 0 {
return Ok(None);
}
Ok(Some(buf[0]))
}
}
impl TamiReader for Vec<u8> {
async fn next_byte(&mut self) -> io::Result<Option<u8>> {
if self.is_empty() {
return Ok(None);
} else {
return Ok(Some(self.remove(0)));
}
}
}
#[derive(Debug)]
pub struct VarInt {
value: i32,
@ -28,14 +58,18 @@ impl VarInt {
pub fn move_data(self) -> Vec<u8> {
self.data
}
pub fn read<I>(data: &mut I) -> Option<i32>
pub async fn read<I>(data: &mut I) -> Option<i32>
where
I: Iterator<Item = u8>,
I: TamiReader,
{
let mut value: i32 = 0;
let mut position = 0;
for current_byte in data {
loop {
let current_byte = match data.next_byte().await {
Ok(x) => x?,
Err(e) => return None,
};
value |= ((current_byte & SEGMENT_BITS) as i32) << position;
if current_byte & CONTINUE_BIT == 0 {
@ -49,16 +83,16 @@ impl VarInt {
}
Some(value)
}
pub fn parse<I>(reader: &mut I) -> Option<VarInt>
pub async fn parse<I>(reader: &mut I) -> Option<VarInt>
where
I: Iterator<Item = u8>,
I: TamiReader,
{
let mut value: i32 = 0;
let mut position = 0;
let mut vec = Vec::new();
for current_byte in reader {
let current_byte = current_byte;
loop {
let current_byte = reader.next_byte().await.ok()??;
vec.push(current_byte);
value |= ((current_byte & SEGMENT_BITS) as i32) << position;
@ -121,14 +155,17 @@ impl VarString {
pub fn from(string: String) -> VarString {
VarString { value: string }
}
pub fn parse<I>(data: &mut I) -> Option<VarString>
pub async fn parse<I>(data: &mut I) -> Option<VarString>
where
I: Iterator<Item = u8>,
I: TamiReader,
{
let length = VarInt::read(data)?;
let length = VarInt::read(data).await?;
let mut vec = Vec::new();
for _ in 0..length {
vec.push(data.next()?);
vec.push(match data.next_byte().await {
Ok(x) => x?,
Err(e) => return None,
});
}
Some(VarString {
value: String::from_utf8(vec).ok()?,
@ -147,14 +184,20 @@ impl UShort {
pub fn get_data(&self) -> Vec<u8> {
self.data.clone()
}
pub fn parse<I>(data: &mut I) -> Option<UShort>
pub async fn parse<I>(data: &mut I) -> Option<UShort>
where
I: Iterator<Item = u8>,
I: TamiReader,
{
let mut vec = vec![data.next()?];
let mut vec = vec![match data.next_byte().await {
Ok(x) => x?,
Err(e) => return None,
}];
let mut int: u16 = vec[0] as u16;
int = int << 8;
vec.push(data.next()?);
vec.push(match data.next_byte().await {
Ok(x) => x?,
Err(_) => return None,
});
int |= vec[1] as u16;
Some(UShort {
value: int,