Compare commits

...

2 commits

Author SHA1 Message Date
Aubrey 6708cc3e22
proximity server+ 2024-12-17 18:45:01 -06:00
Aubrey 1963549b45
proximity server beginnings 2024-12-17 18:29:22 -06:00
42 changed files with 2318 additions and 249 deletions

2
.gitignore vendored
View file

@ -1,2 +1,4 @@
/target
/.direnv
*.pem
*.p12

3
.vscode/settings.json vendored Normal file
View file

@ -0,0 +1,3 @@
{
"rust-analyzer.showUnlinkedFileNotification": false
}

1459
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -4,11 +4,18 @@ version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0.94"
anyhow = { version = "1.0.94", features = ["backtrace"] }
crc32fast = "1.4.2"
glam = "0.29.2"
heapless = "0.8.0"
newtype-enum = { git = "https://github.com/mgjm/newtype-enum", version = "0.1.0" }
tokio = { version = "1.42.0", features = ["rt", "macros", "net", "sync", "rt-multi-thread", "io-util", "io-std"] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
uuid = { version = "1.11.0", features = ["v4"] }
wtransport = "0.5.0"
xtra = { version = "0.6.0", features = ["macros", "tokio"] }
zerocopy = { version = "0.8.13", features = ["derive"] }
[dev-dependencies]
roead = "1.0.0"

13
examples/extract.rs Normal file
View file

@ -0,0 +1,13 @@
fn main() {
let stages = std::fs::read_dir("game/stages").unwrap();
for stage in stages {
let stage = stage.unwrap();
let bytes: Vec<u8> = std::fs::read(stage.path()).unwrap();
let data = roead::byml::Byml::from_binary(bytes).unwrap();
let map = data.into_map().unwrap();
let mtx: Vec<f32> =
map.get("ProjMatrix").unwrap().as_array().unwrap().into_iter().map(|a| a.as_float().unwrap()).collect();
let mat4 = glam::Mat4::from_cols_slice(&mtx);
println!("{:?}", mat4);
}
}

View file

@ -21,6 +21,7 @@
devShells.default = mkShell {
buildInputs = [
pkgs.fenix.stable.completeToolchain
cmake
pkg-config
openssl
];

BIN
game/StagePosList.byml Normal file

Binary file not shown.

BIN
game/WorldListFromDb.byml Normal file

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

1
p12.nu Normal file
View file

@ -0,0 +1 @@
openssl pkcs12 -export -in cert.pem -inkey key.pem -out server.p12

63
src/faker.rs Normal file
View file

@ -0,0 +1,63 @@
use glam::Vec3;
use xtra::{Actor, Address, Handler};
use crate::{
broadcast_packet,
packet::{
Packet, PacketData,
PacketData_variants::{Connect, Player},
},
protocol::String,
};
pub struct Faker {
pub address: Address<Faker>,
}
impl Actor for Faker {
type Stop = ();
async fn stopped(self) -> Self::Stop {}
}
impl Handler<Packet> for Faker {
type Return = ();
async fn handle(&mut self, message: Packet, _: &mut xtra::Context<Self>) -> Self::Return {
// trace!("got packet {message:?}");
match message.data {
PacketData::Connect(connect) => {
broadcast_packet(Packet {
user_id: 1,
udp: message.udp,
data: PacketData::Connect(Connect {
client_name: String::try_from("Bot").unwrap(),
..connect
}),
})
.await;
}
PacketData::Player(player) => {
let pos = Vec3::from_array(player.position);
broadcast_packet(Packet {
user_id: 1,
udp: message.udp,
data: PacketData::Player(Player {
position: (pos + Vec3::new(0., -150., 0.)).into(),
..player
}),
})
.await;
}
data => {
broadcast_packet(Packet {
user_id: 1,
udp: message.udp,
data,
})
.await;
}
}
}
}

0
src/game/mod.rs Normal file
View file

View file

@ -1,38 +1,104 @@
pub mod faker;
pub mod game;
pub mod packet;
pub mod player;
pub mod protocol;
pub mod server;
use std::{collections::HashMap, sync::LazyLock};
use std::{
collections::HashMap,
io::Cursor,
net::{SocketAddr, ToSocketAddrs},
sync::{Arc, LazyLock},
};
use faker::Faker;
use packet::{rw::read_packet, Packet};
use player::PlayerActor;
use tokio::{net::TcpListener, sync::RwLock};
use tracing::{error, info};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use xtra::Address;
use server::web_main;
use tokio::{
net::{TcpListener, UdpSocket},
sync::RwLock,
};
use tracing::{error, info, Level};
use tracing_subscriber::{filter::FilterFn, layer::SubscriberExt, util::SubscriberInitExt, Layer};
use xtra::{prelude::MessageChannel, Mailbox};
pub fn clients() -> &'static RwLock<HashMap<u128, Address<PlayerActor>>> {
static CLIENTS: LazyLock<RwLock<HashMap<u128, Address<PlayerActor>>>> = LazyLock::new(|| RwLock::default());
type PacketChannel = MessageChannel<Packet, (), xtra::refcount::Strong>;
pub fn clients() -> &'static RwLock<HashMap<u128, (PacketChannel, SocketAddr)>> {
static CLIENTS: LazyLock<RwLock<HashMap<u128, (PacketChannel, SocketAddr)>>> = LazyLock::new(|| RwLock::default());
&CLIENTS
}
pub async fn broadcast_packet(packet: Packet) {
for (id, (address, _)) in clients().read().await.iter() {
if *id != packet.user_id {
let _ = address.send(packet.clone()).detach().await.unwrap();
}
}
}
#[tokio::main]
async fn main() {
//.with(EnvFilter::from_default_env())
tracing_subscriber::registry().with(tracing_subscriber::fmt::layer()).init();
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer().with_filter(FilterFn::new(|meta| {
*meta.level() < Level::INFO || meta.module_path().unwrap().contains("smo_server")
})))
.init();
let tcp = TcpListener::bind("0.0.0.0:1027").await.unwrap();
let manager = web_main();
{
let (address, mailbox) = Mailbox::unbounded();
let address = xtra::spawn_tokio(
Faker {
address: address.clone(),
},
(address, mailbox),
);
clients().write().await.insert(
1,
(
MessageChannel::new(address),
"0.0.0.0:1027".to_socket_addrs().unwrap().next().unwrap(),
),
);
}
let socket = Arc::new(UdpSocket::bind("0.0.0.0:1027").await.unwrap());
tokio::spawn({
let socket = socket.clone();
async move {
loop {
let mut packet = [0u8; 256];
let (size, _addr) = socket.recv_from(&mut packet).await.unwrap();
// trace!("got packet from {_addr:?}");
match read_packet(&mut Cursor::new(&mut packet[..size]), true).await {
Ok(packet) => {
// info!("packet from udp {packet:?}");
if let Some(client) = clients().read().await.get(&packet.user_id) {
let _ = client.0.send(packet).detach().await;
}
}
Err(error) => {
error!("udp packet error: {error:?}");
}
};
}
}
});
info!("listening on port 1027");
loop {
match tcp.accept().await {
Ok((stream, addr)) => {
PlayerActor::new_connection(stream, addr);
PlayerActor::new_connection(stream, addr, socket.clone(), manager.clone());
}
Err(error) => {
error!("failed to handle connection: {error:?}");
}
}
}
// let socket = Arc::new(UdpSocket::bind("0.0.0.0:1027").await.unwrap());
// let mut packet = [0u8; 1024];
}

View file

@ -1,14 +1,9 @@
use core::str;
use std::{
ffi::CStr,
fmt::{Debug, Display},
};
pub mod rw;
use anyhow::{bail, Context};
use newtype_enum::newtype_enum;
use zerocopy::{
FromBytes, FromZeros, Immutable, IntoBytes, KnownLayout, Unaligned,
};
use zerocopy::{FromZeros, Immutable, IntoBytes, KnownLayout};
use crate::protocol::{Bool, String};
#[derive(Debug, Clone, Copy, FromZeros, IntoBytes, KnownLayout, Immutable)]
#[repr(C, packed)]
@ -35,24 +30,25 @@ pub enum PacketKind {
ChangeStage = 11,
Command = 12,
UdpInit = 13,
HolePunch = 14
HolePunch = 14,
}
const COSTUME_NAME_SIZE: usize = 0x20;
const CAP_ANIM_SIZE: usize = 0x30;
const STAGE_GAME_NAME_SIZE: usize = 0x40;
pub const STAGE_GAME_NAME_SIZE: usize = 0x40;
const STAGE_CHANGE_NAME_SIZE: usize = 0x30;
const STAGE_ID_SIZE: usize = 0x10;
const CLIENT_NAME_SIZE: usize = COSTUME_NAME_SIZE;
pub const CLIENT_NAME_SIZE: usize = COSTUME_NAME_SIZE;
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct Packet {
pub user_id: u128,
pub udp: bool,
pub data: PacketData,
}
#[newtype_enum]
#[derive(Debug)]
#[derive(Clone, Debug)]
pub enum PacketData {
Unknown(Vec<u8>),
#[derive(FromZeros, IntoBytes, KnownLayout, Immutable)]
@ -129,9 +125,9 @@ pub enum PacketData {
#[derive(FromZeros, IntoBytes, KnownLayout, Immutable)]
#[repr(C, packed)]
UdpInit {
port: u16
port: u16,
},
HolePunch
HolePunch,
}
pub enum TagUpdateBit {
@ -145,69 +141,3 @@ pub enum ConnectionKind {
New = 0,
Old = 1,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, FromBytes, IntoBytes, KnownLayout, Immutable, Unaligned)]
#[repr(C)]
pub struct Bool(u8);
impl Bool {
pub fn new(value: bool) -> Bool {
Bool(if value { 1 } else { 0 })
}
pub fn get(&self) -> bool {
self.0 != 0
}
}
impl From<bool> for Bool {
fn from(value: bool) -> Self {
Bool::new(value)
}
}
impl From<Bool> for bool {
fn from(value: Bool) -> Self {
value.get()
}
}
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, FromBytes, IntoBytes, KnownLayout, Immutable, Unaligned)]
#[repr(C)]
pub struct String<const N: usize>([u8; N]);
impl<const N: usize> String<N> {
pub fn to_str(&self) -> anyhow::Result<&str> {
let cstr = CStr::from_bytes_until_nul(&self.0).context("interpreting bytes as c-string")?;
cstr.to_str().context("verifying string has utf-8")
// let str = str::from_utf8(&self.0).context("verifying string has utf-8")?;
// Ok(str.trim_end_matches('\0'))
}
pub fn assert_valid(&self) -> anyhow::Result<()> {
self.to_str().map(drop)
}
}
impl<const N: usize> TryFrom<&str> for String<N> {
type Error = anyhow::Error;
fn try_from(value: &str) -> Result<Self, Self::Error> {
let mut buf = [0; N];
if value.len() > N {
bail!("seggs")
}
value.write_to_prefix(&mut buf).unwrap();
Ok(Self(buf))
}
}
impl<const N: usize> Display for String<N> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Display::fmt(self.to_str().expect("failed to parse string"), f)
}
}
impl<const N: usize> Debug for String<N> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Debug::fmt(self.to_str().expect("failed to parse string"), f)
}
}

114
src/packet/rw.rs Normal file
View file

@ -0,0 +1,114 @@
use anyhow::{bail, Context};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use zerocopy::{IntoBytes, TryFromBytes};
use crate::packet::{
Packet, PacketData,
PacketData_variants::{Command, Disconnect, HolePunch},
PacketHeader, PacketKind,
};
pub async fn read_packet<R: AsyncReadExt + Unpin>(reader: &mut R, udp: bool) -> anyhow::Result<Packet> {
let mut header = [0; size_of::<PacketHeader>()];
reader.read_exact(&mut header).await.context("reading data")?;
let Ok(header) = PacketHeader::try_read_from_bytes(&header) else {
bail!("parsing packet buffer")
};
macro_rules! read_data {
($read: expr, $size: expr, $ty: ident $(, $field:ident => $assert: expr)*) => {{
async fn read_data<R: AsyncReadExt + Unpin>(reader: &mut R, size: u16) -> anyhow::Result<PacketData> {
type T = crate::packet::PacketData_variants::$ty;
let size = size as usize;
if size < size_of::<T>() {
bail!("buffer too small for packet: expected {}, got {size}", size_of::<T>())
}
let mut data = [0; u16::MAX as usize];
reader.read_exact(&mut data[..size]).await.context("reading data")?;
let packet_data = match T::try_read_from_bytes(&data[..size_of::<T>()]) {
Ok(data) => data,
Err(error) => {
bail!(concat!("interpreting ", stringify!($ty), ": {:?}"), error)
}
};
$(
{
let $field = packet_data.$field;
$assert
}?;
)*
Ok(PacketData::$ty(packet_data))
}
type _FixIntellisense = crate::packet::PacketData_variants::$ty;
read_data($read, $size).await?
}};
}
let data: PacketData = match header.kind {
PacketKind::Unknown => {
let mut data = vec![0; header.size.into()];
reader.read_exact(&mut data).await.context("reading unknown data")?;
PacketData::Unknown(data)
}
PacketKind::Init => read_data!(reader, header.size, Init),
PacketKind::Player => read_data!(reader, header.size, Player),
PacketKind::Cap => read_data!(reader, header.size, Cap, anim => anim.assert_valid()),
PacketKind::Game => read_data!(reader, header.size, Game, stage => stage.assert_valid()),
PacketKind::Tag => read_data!(reader, header.size, Tag),
PacketKind::Connect => read_data!(reader, header.size, Connect),
PacketKind::Disconnect => PacketData::Disconnect(Disconnect),
PacketKind::Costume => read_data!(reader, header.size, Costume),
PacketKind::Shine => read_data!(reader, header.size, Shine),
PacketKind::Capture => read_data!(reader, header.size, Capture),
PacketKind::ChangeStage => read_data!(reader, header.size, ChangeStage),
PacketKind::Command => PacketData::Command(Command),
PacketKind::UdpInit => read_data!(reader, header.size, UdpInit),
PacketKind::HolePunch => PacketData::HolePunch(HolePunch),
};
Ok(Packet {
user_id: header.user_id,
udp,
data,
})
}
pub async fn write_packet<W: AsyncWriteExt + Unpin>(writer: &mut W, id: u128, data: PacketData) -> anyhow::Result<()> {
let (kind, slice) = match &data {
PacketData::Unknown(vec) => {
if vec.len() >= (256 as usize) {
bail!("unknown packet vec too large")
}
(PacketKind::Unknown, vec.as_slice())
}
PacketData::Init(init) => (PacketKind::Init, init.as_bytes()),
PacketData::Player(player) => (PacketKind::Player, player.as_bytes()),
PacketData::Cap(cap) => (PacketKind::Cap, cap.as_bytes()),
PacketData::Game(game) => (PacketKind::Game, game.as_bytes()),
PacketData::Tag(tag) => (PacketKind::Tag, tag.as_bytes()),
PacketData::Connect(connect) => (PacketKind::Connect, connect.as_bytes()),
PacketData::Disconnect(..) => (PacketKind::Disconnect, [].as_slice()),
PacketData::Costume(costume) => (PacketKind::Costume, costume.as_bytes()),
PacketData::Shine(shine) => (PacketKind::Shine, shine.as_bytes()),
PacketData::Capture(capture) => (PacketKind::Capture, capture.as_bytes()),
PacketData::ChangeStage(change_stage) => (PacketKind::ChangeStage, change_stage.as_bytes()),
PacketData::Command(..) => (PacketKind::Command, [].as_slice()),
PacketData::UdpInit(udp_init) => (PacketKind::UdpInit, udp_init.as_bytes()),
PacketData::HolePunch(..) => (PacketKind::HolePunch, [].as_slice()),
};
writer
.write_all(
PacketHeader {
kind,
size: slice.len() as u16,
user_id: id,
}
.as_bytes(),
)
.await
.context("writing header")?;
writer.write_all(slice).await.context("writing data")?;
writer.flush().await.context("flushing writer")
}

View file

@ -1,183 +1,231 @@
use std::{net::SocketAddr, time::Duration};
use std::{io::Cursor, net::SocketAddr, sync::Arc};
use anyhow::{bail, Context};
use glam::Vec3;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter},
net::{
tcp::{OwnedReadHalf, OwnedWriteHalf},
TcpStream,
}, sync::mpsc, time::Instant,
io::{BufReader, BufWriter},
net::{TcpStream, UdpSocket},
sync::mpsc,
};
use tracing::{info, info_span, trace, Instrument};
use zerocopy::{IntoBytes, TryFromBytes};
use tracing::{error, info, info_span, trace, Instrument};
use xtra::{prelude::MessageChannel, scoped, Actor, Address, Handler, Mailbox};
use crate::packet::{
Packet, PacketData,
PacketData_variants::{Cap, Command, Connect, Disconnect, HolePunch, Init, Player},
PacketHeader, PacketKind, String,
use crate::{
broadcast_packet, clients,
packet::{
rw::{read_packet, write_packet},
ConnectionKind, Packet, PacketData,
PacketData_variants::{Connect, HolePunch, Init, UdpInit},
CLIENT_NAME_SIZE,
},
protocol::String,
server::{ChangedStage, Manager, PlayerConnected, PlayerMoved},
};
// #[derive(Actor)]
pub struct PlayerActor {
write_half: BufWriter<OwnedWriteHalf>,
id: u128,
connection_kind: ConnectionKind,
name: String<CLIENT_NAME_SIZE>,
write_sender: mpsc::UnboundedSender<WriteMessage>,
manager: Address<Manager>,
}
enum WriteMessage {
Data(Packet),
SetUdp(u16),
}
impl PlayerActor {
pub async fn read_packet(reader: &mut BufReader<OwnedReadHalf>) -> anyhow::Result<Packet> {
let mut header = [0; size_of::<PacketHeader>()];
reader.read_exact(&mut header).await.context("reading data")?;
let Ok(header) = PacketHeader::try_read_from_bytes(&header) else {
bail!("parsing packet buffer")
};
macro_rules! read_data {
($read: expr, $size: expr, $ty: ident $(, $field:ident => $assert: expr)*) => {{
async fn read_data(reader: &mut BufReader<OwnedReadHalf>, size: u16) -> anyhow::Result<PacketData> {
type T = crate::packet::PacketData_variants::$ty;
let size = size as usize;
if size < size_of::<T>() {
bail!("buffer too small for packet: expected {}, got {size}", size_of::<T>())
}
let mut data = [0; u16::MAX as usize];
reader.read_exact(&mut data[..size]).await.context("reading data")?;
let packet_data = match T::try_read_from_bytes(&data[..size_of::<T>()]) {
Ok(data) => data,
Err(error) => {
bail!(concat!("interpreting ", stringify!($ty), ": {:?}"), error)
}
};
$(
{
let $field = packet_data.$field;
$assert
}?;
)*
Ok(PacketData::$ty(packet_data))
}
type _FixIntellisense = crate::packet::PacketData_variants::$ty;
read_data($read, $size).await?
}};
}
let data: PacketData = match header.kind {
PacketKind::Unknown => {
let mut data = vec![0; header.size.into()];
reader.read_exact(&mut data).await.context("reading unknown data")?;
PacketData::Unknown(data)
}
PacketKind::Init => read_data!(reader, header.size, Init),
PacketKind::Player => read_data!(reader, header.size, Player),
PacketKind::Cap => read_data!(reader, header.size, Cap, anim => anim.assert_valid()),
PacketKind::Game => read_data!(reader, header.size, Game, stage => stage.assert_valid()),
PacketKind::Tag => read_data!(reader, header.size, Tag),
PacketKind::Connect => read_data!(reader, header.size, Connect),
PacketKind::Disconnect => PacketData::Disconnect(Disconnect),
PacketKind::Costume => read_data!(reader, header.size, Costume),
PacketKind::Shine => read_data!(reader, header.size, Shine),
PacketKind::Capture => read_data!(reader, header.size, Capture),
PacketKind::ChangeStage => read_data!(reader, header.size, ChangeStage),
PacketKind::Command => PacketData::Command(Command),
PacketKind::UdpInit => read_data!(reader, header.size, UdpInit),
PacketKind::HolePunch => PacketData::HolePunch(HolePunch),
};
Ok(Packet {
user_id: header.user_id,
data,
})
}
pub async fn write_packet(writer: &mut BufWriter<OwnedWriteHalf>, id: u128, data: PacketData) -> anyhow::Result<()> {
let (kind, slice) = match &data {
PacketData::Unknown(vec) => {
if vec.len() >= (u16::MAX as usize) {
bail!("unknown packet vec too large")
}
(PacketKind::Unknown, vec.as_slice())
}
PacketData::Init(init) => (PacketKind::Init, init.as_bytes()),
PacketData::Player(player) => (PacketKind::Player, player.as_bytes()),
PacketData::Cap(cap) => (PacketKind::Cap, cap.as_bytes()),
PacketData::Game(game) => (PacketKind::Game, game.as_bytes()),
PacketData::Tag(tag) => (PacketKind::Tag, tag.as_bytes()),
PacketData::Connect(connect) => (PacketKind::Connect, connect.as_bytes()),
PacketData::Disconnect(..) => (PacketKind::Disconnect, [].as_slice()),
PacketData::Costume(costume) => (PacketKind::Costume, costume.as_bytes()),
PacketData::Shine(shine) => (PacketKind::Shine, shine.as_bytes()),
PacketData::Capture(capture) => (PacketKind::Capture, capture.as_bytes()),
PacketData::ChangeStage(change_stage) => (PacketKind::ChangeStage, change_stage.as_bytes()),
PacketData::Command(..) => (PacketKind::Command, [].as_slice()),
PacketData::UdpInit(udp_init) => (PacketKind::UdpInit, udp_init.as_bytes()),
PacketData::HolePunch(..) => (PacketKind::HolePunch, [].as_slice()),
};
writer
.write_all(
PacketHeader {
kind,
size: slice.len() as u16,
user_id: id,
}
.as_bytes(),
)
.await
.context("writing header")?;
writer.write_all(slice).await.context("writing data")?;
writer.flush().await.context("flushing writer")
}
pub fn new_connection(stream: TcpStream, addr: SocketAddr) {
pub fn new_connection(stream: TcpStream, addr: SocketAddr, socket: Arc<UdpSocket>, manager: Address<Manager>) {
tokio::spawn(async move {
info!("accepted connection from {addr}");
let (reader, writer) = stream.into_split();
let mut reader = BufReader::new(reader);
let mut writer = BufWriter::new(writer);
let packet = Self::read_packet(&mut reader).await.unwrap();
let packet = read_packet(&mut reader, false).await.unwrap();
let Packet {
user_id,
data: PacketData::Connect(connect),
..
} = packet
else {
tracing::error!("not a valid player!");
return;
};
let bot_id = user_id + 1;
Self::write_packet(&mut writer, user_id, PacketData::Init(Init { max_players: 8 }))
.await
.expect("msfrarausfhsdagsdgkog");
Self::write_packet(&mut writer, bot_id, PacketData::Connect(Connect { kind: crate::packet::ConnectionKind::New, client_name: String::try_from("Bot").unwrap(), max_player: 8 }))
write_packet(&mut writer, user_id, PacketData::Init(Init { max_players: 8 }))
.await
.expect("msfrarausfhsdagsdgkog");
let span = info_span!("", player = connect.client_name.to_string());
let span = info_span!("Client", player = connect.client_name.to_string());
let (address, mailbox) = Mailbox::bounded(512);
let (sender, mut receiver) = mpsc::unbounded_channel();
tokio::spawn(async move {
while let Some((instant, packet)) = receiver.recv().await {
tokio::time::sleep_until(instant).await;
Self::write_packet(&mut writer, bot_id, packet).await.unwrap();
}
}.instrument(span.clone()));
async move {
info!("connected {user_id}");
tokio::spawn(scoped(&address.downgrade(), {
let address = address.downgrade();
async move {
let ip = addr.ip();
let mut addr = None;
while let Some(message) = receiver.recv().await {
match message {
WriteMessage::Data(Packet { user_id, udp, data }) => {
// trace!("writing packet {udp:?}, {data:?}");
let res = if udp {
async {
let Some(addr) = addr else {
trace!("no address set yet");
return Ok(());
};
// trace!("sending udp packet: {data:?}");
loop {
let packet = Self::read_packet(&mut reader).await.unwrap();
trace!("got packet {packet:?}");
let sender = sender.clone();
tokio::spawn(async move {
let _ = sender.send((Instant::now().checked_add(Duration::from_millis(200)).unwrap(), match packet.data {
PacketData::Player(player) => PacketData::Player(Player {position: [player.position[0], -20., player.position[2]], ..player}),
PacketData::Cap(cap) => PacketData::Cap(Cap {position: [cap.position[0], -20., cap.position[2]], ..cap}),
pass => pass,
}));
});
let mut buf = [0; 256];
let mut writer = Cursor::new(buf.as_mut_slice());
write_packet(&mut writer, user_id, data).await?;
socket.send_to(&buf, &addr).await.expect("kys");
Ok(())
}
.await
} else {
write_packet(&mut writer, user_id, data).await
};
if let Err(error) = res {
error!("error while writing packet: {error:?}");
let _ = address.send(StopError).detach().await;
return;
}
}
WriteMessage::SetUdp(port) => {
trace!("set udp port, connected on udp!");
addr = Some(SocketAddr::new(ip, port))
}
}
}
}
}
.instrument(span.clone())
}));
tokio::spawn(scoped(
&address.downgrade(),
{
let address = address.downgrade();
async move {
info!("connected {user_id}");
loop {
match read_packet(&mut reader, false).await {
Ok(packet) => {
let _ = address.send(packet).detach().await;
}
Err(error) => {
error!("error while reading packet: {error:?}");
let _ = address.send(StopError).detach().await;
break;
}
}
}
}
}
.instrument(span.clone()),
));
clients().write().await.insert(user_id, (MessageChannel::new(address), addr));
xtra::run(
mailbox,
PlayerActor {
id: user_id,
connection_kind: connect.kind,
name: connect.client_name,
write_sender: sender,
manager,
},
)
.instrument(span)
.await;
});
}
}
impl Actor for PlayerActor {
type Stop = ();
async fn started(&mut self, _: &Mailbox<Self>) -> Result<(), Self::Stop> {
broadcast_packet(Packet {
user_id: self.id,
udp: false,
data: PacketData::Connect(Connect {
kind: self.connection_kind,
max_player: 8,
client_name: self.name,
}),
})
.await;
self
.write_sender
.send(WriteMessage::Data(Packet {
user_id: 0,
udp: false,
data: PacketData::UdpInit(UdpInit { port: 1027 }),
}))
.map_err(drop)?;
self
.manager
.send(PlayerConnected {
id: self.id,
name: self.name,
})
.await
.unwrap();
Ok(())
}
async fn stopped(self) -> Self::Stop {}
}
struct StopError;
impl Handler<StopError> for PlayerActor {
type Return = ();
async fn handle(&mut self, _: StopError, ctx: &mut xtra::Context<Self>) -> Self::Return {
ctx.stop_all();
}
}
impl Handler<Packet> for PlayerActor {
type Return = ();
async fn handle(&mut self, packet: Packet, _: &mut xtra::Context<Self>) {
if matches!(packet.data, PacketData::HolePunch(..)) {
trace!("hole puncher");
return;
}
if packet.user_id == self.id {
match packet.data {
PacketData::UdpInit(UdpInit { port }) => {
trace!("hole puncher");
let _ = self.write_sender.send(WriteMessage::SetUdp(port));
let _ = self.write_sender.send(WriteMessage::Data(Packet {
user_id: 0,
udp: true,
data: PacketData::HolePunch(HolePunch),
}));
return;
}
PacketData::Connect(..) | PacketData::Init(..) => {
trace!("suspicious packet sent: {packet:?}");
return;
}
PacketData::Player(ref player) => {
let _ = self.manager.send(PlayerMoved { id: self.id, position: Vec3::from_array(player.position)}).detach().await.unwrap();
}
PacketData::Game(ref game) => {
let _ = self.manager.send(ChangedStage { id: self.id, stage: game.stage}).detach().await.unwrap();
}
_ => {}
}
broadcast_packet(packet).await;
} else {
let _ = self.write_sender.send(WriteMessage::Data(packet));
}
}
}

76
src/protocol.rs Normal file
View file

@ -0,0 +1,76 @@
use std::{
ffi::CStr,
fmt::{Debug, Display},
};
use anyhow::{bail, Context};
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Unaligned};
#[derive(Clone, Copy, Debug, PartialEq, Eq, FromBytes, IntoBytes, KnownLayout, Immutable, Unaligned)]
#[repr(C)]
pub struct Bool(u8);
impl Bool {
pub fn new(value: bool) -> Bool {
Bool(if value { 1 } else { 0 })
}
pub fn get(&self) -> bool {
self.0 != 0
}
}
impl From<bool> for Bool {
fn from(value: bool) -> Self {
Bool::new(value)
}
}
impl From<Bool> for bool {
fn from(value: Bool) -> Self {
value.get()
}
}
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, FromBytes, IntoBytes, KnownLayout, Immutable, Unaligned)]
#[repr(C)]
pub struct String<const N: usize>([u8; N]);
impl<const N: usize> String<N> {
pub fn as_str(&self) -> &str {
self.try_as_str().expect("wasn't a string")
}
pub fn try_as_str(&self) -> anyhow::Result<&str> {
let cstr = CStr::from_bytes_until_nul(&self.0).context("interpreting bytes as c-string")?;
cstr.to_str().context("verifying string has utf-8")
// let str = str::from_utf8(&self.0).context("verifying string has utf-8")?;
// Ok(str.trim_end_matches('\0'))
}
pub fn assert_valid(&self) -> anyhow::Result<()> {
self.try_as_str().map(drop)
}
}
impl<const N: usize> TryFrom<&str> for String<N> {
type Error = anyhow::Error;
fn try_from(value: &str) -> Result<Self, Self::Error> {
let mut buf = [0; N];
if value.len() > N {
bail!("seggs")
}
value.write_to_prefix(&mut buf).unwrap();
Ok(Self(buf))
}
}
impl<const N: usize> Display for String<N> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Display::fmt(self.try_as_str().expect("failed to parse string"), f)
}
}
impl<const N: usize> Debug for String<N> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Debug::fmt(self.try_as_str().expect("failed to parse string"), f)
}
}

170
src/server/mod.rs Normal file
View file

@ -0,0 +1,170 @@
mod packet;
mod prox;
use std::{collections::HashMap, sync::LazyLock};
use glam::Vec3;
use prox::ProximityPlayer;
use tokio::sync::RwLock;
use tracing::{error, info_span, Instrument};
use uuid::Uuid;
use wtransport::{Endpoint, Identity, ServerConfig};
use xtra::{Actor, Address, Handler, Mailbox};
use zerocopy::FromZeros;
use crate::{
packet::{CLIENT_NAME_SIZE, STAGE_GAME_NAME_SIZE},
protocol::String,
};
fn listeners() -> &'static RwLock<HashMap<Uuid, Address<ProximityPlayer>>> {
static LISTENERS: LazyLock<RwLock<HashMap<Uuid, Address<ProximityPlayer>>>> = LazyLock::new(Default::default);
&LISTENERS
}
pub fn web_main() -> Address<Manager> {
let span = info_span!("wt");
let manager = xtra::spawn_tokio(
Manager {
players: HashMap::new(),
next_id: 0,
},
Mailbox::bounded(8),
);
tokio::spawn({
let manager = manager.clone();
async move {
if let Err(result) = webtransport_server(manager).await {
error!("{:?}", result);
}
}
.instrument(span)
});
manager
}
async fn webtransport_server(manager: Address<Manager>) -> anyhow::Result<()> {
// let identity = Identity::self_signed(["localhost", "127.0.0.1", "::1"]).unwrap();
let identity = Identity::load_pemfiles("./cert.pem", "./key.pem").await.unwrap();
let config = ServerConfig::builder().with_bind_default(4433).with_identity(identity).build();
let endpoint = Endpoint::server(config)?;
loop {
let connection = endpoint.accept().await;
ProximityPlayer::spawn(connection, manager.clone());
}
}
#[derive(Actor)]
pub struct Manager {
players: HashMap<u128, PlayerInstance>,
next_id: u32,
}
impl Manager {
pub async fn broadcast<M: Copy + Send + 'static>(&self, message: M)
where
ProximityPlayer: Handler<M>,
{
for player in listeners().read().await.values() {
let _ = player.send(message).detach().await;
}
}
}
#[derive(Debug, Clone, Copy)]
struct PlayerInstance {
id: u32,
name: String<CLIENT_NAME_SIZE>,
position: Vec3,
stage: String<STAGE_GAME_NAME_SIZE>,
}
struct RequestState;
impl Handler<RequestState> for Manager {
type Return = HashMap<u128, PlayerInstance>;
async fn handle(&mut self, _: RequestState, _: &mut xtra::Context<Self>) -> Self::Return {
self.players.clone()
}
}
pub struct PlayerConnected {
pub id: u128,
pub name: String<CLIENT_NAME_SIZE>,
}
impl Handler<PlayerConnected> for Manager {
type Return = ();
async fn handle(&mut self, message: PlayerConnected, _: &mut xtra::Context<Self>) -> Self::Return {
self.players.insert(
message.id,
PlayerInstance {
id: {
let id = self.next_id;
self.next_id += 1;
id
},
name: message.name,
position: Vec3::ZERO,
stage: String::new_zeroed(),
},
);
}
}
#[derive(Clone, Copy)]
pub struct PlayerDisconnected {
pub id: u128,
}
impl Handler<PlayerDisconnected> for Manager {
type Return = ();
async fn handle(&mut self, message: PlayerDisconnected, _: &mut xtra::Context<Self>) -> Self::Return {
if self.players.remove(&message.id).is_some() {
self.broadcast(message).await;
}
}
}
#[derive(Clone, Copy)]
pub struct PlayerMoved {
pub id: u128,
pub position: Vec3,
}
impl Handler<PlayerMoved> for Manager {
type Return = ();
async fn handle(&mut self, message: PlayerMoved, _: &mut xtra::Context<Self>) -> Self::Return {
if let Some(player) = self.players.get_mut(&message.id) {
player.position = message.position;
self.broadcast(message).await;
}
}
}
#[derive(Clone, Copy)]
pub struct ChangedStage {
pub id: u128,
pub stage: String<STAGE_GAME_NAME_SIZE>,
}
impl Handler<ChangedStage> for Manager {
type Return = ();
async fn handle(&mut self, message: ChangedStage, _: &mut xtra::Context<Self>) -> Self::Return {
if let Some(player) = self.players.get_mut(&message.id) {
player.stage = message.stage;
self.broadcast(message).await;
}
}
}

30
src/server/packet.rs Normal file
View file

@ -0,0 +1,30 @@
use zerocopy::{Immutable, IntoBytes};
use crate::{
packet::{CLIENT_NAME_SIZE, STAGE_GAME_NAME_SIZE},
protocol::String,
};
use super::PlayerInstance;
#[allow(unused)]
#[derive(Debug, IntoBytes, Immutable)]
#[repr(C, packed)]
pub struct HelloPlayer {
name: String<CLIENT_NAME_SIZE>,
id: u32,
position: [f32; 3],
stage: String<STAGE_GAME_NAME_SIZE>,
}
impl From<PlayerInstance> for HelloPlayer {
fn from(value: PlayerInstance) -> Self {
Self {
id: value.id,
name: value.name,
position: value.position.to_array(),
stage: value.stage,
}
}
}

91
src/server/prox.rs Normal file
View file

@ -0,0 +1,91 @@
use core::str;
use tokio::io::AsyncWriteExt;
use tracing::{info_span, trace, warn, Instrument};
use uuid::Uuid;
use wtransport::endpoint::IncomingSession;
use xtra::{Actor, Address, Handler, Mailbox};
use zerocopy::IntoBytes;
use super::{
listeners, packet::HelloPlayer, ChangedStage, Manager, PlayerConnected, PlayerDisconnected, PlayerMoved, RequestState,
};
pub struct ProximityPlayer {
id: Uuid,
_send: wtransport::SendStream,
_connection: wtransport::Connection,
}
impl ProximityPlayer {
pub fn spawn(session: IncomingSession, manager: Address<Manager>) {
tokio::spawn(
async move {
trace!("proximity chat client connected");
let connection =
session.await.expect("failed to acknowledge session").accept().await.expect("failed to accept session");
let (mut send, mut recv) = connection.accept_bi().await.expect("failed to start channel");
trace!("getting peerjs uuid");
let mut buffer = [0; 36];
recv.read_exact(buffer.as_mut_bytes()).await.expect("failed to read uuid");
let id = Uuid::parse_str(str::from_utf8(&buffer).expect("expected utf8")).expect("failed to parse uuid");
let span = info_span!("", %id);
span.in_scope(||trace!( "uuid parsed"));
let state = manager.send(RequestState).await.unwrap();
send.write_u8(state.len() as u8).await.expect("failed to write length");
for player in state.values() {
trace!("sending player {player:?}");
send.write_all(HelloPlayer::from(*player).as_bytes()).await.expect("failed to write player");
}
let (address, mailbox) = Mailbox::unbounded();
listeners().write().await.insert(id, address);
xtra::run(mailbox, ProximityPlayer { id, _send: send, _connection: connection }).instrument(span).await;
}
.in_current_span(),
);
}
}
impl Actor for ProximityPlayer {
type Stop = ();
async fn stopped(self) -> Self::Stop {
listeners().write().await.remove(&self.id);
}
}
impl Handler<PlayerConnected> for ProximityPlayer {
type Return = ();
async fn handle(&mut self, _message: PlayerConnected, _: &mut xtra::Context<Self>) -> Self::Return {
warn!("todo: implement player connected")
}
}
impl Handler<PlayerDisconnected> for ProximityPlayer {
type Return = ();
async fn handle(&mut self, _message: PlayerDisconnected, _: &mut xtra::Context<Self>) -> Self::Return {
warn!("todo: implement player disconnected")
}
}
impl Handler<PlayerMoved> for ProximityPlayer {
type Return = ();
async fn handle(&mut self, _message: PlayerMoved, _: &mut xtra::Context<Self>) -> Self::Return {
warn!("todo: implement player moved")
}
}
impl Handler<ChangedStage> for ProximityPlayer {
type Return = ();
async fn handle(&mut self, _message: ChangedStage, _: &mut xtra::Context<Self>) -> Self::Return {
warn!("todo: implement changed stage")
}
}

5
todo.txt Normal file
View file

@ -0,0 +1,5 @@
players -> manager (accumulate player state)
manager -> proximity player (relay state changes)
proximity player -> web (relay state changes, signaling)
web -> proximity player (signaling)