before starting a new client

This commit is contained in:
Aubrey 2024-12-18 15:34:50 -06:00
parent 6708cc3e22
commit a129a11eed
No known key found for this signature in database
10 changed files with 1783 additions and 122 deletions

1461
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -19,3 +19,6 @@ zerocopy = { version = "0.8.13", features = ["derive"] }
[dev-dependencies]
roead = "1.0.0"
[workspace]
members = ["proximity-client"]

View file

@ -0,0 +1,10 @@
[package]
name = "proximity-client"
version = "0.1.0"
edition = "2021"
[dependencies]
cpal = "0.15.3"
tokio = { version = "1.42.0", features = ["rt", "rt-multi-thread", "macros", "sync", "io-util", "fs"] }
webrtc = "0.12.0"
webtransport = "0.0.0"

View file

@ -0,0 +1,17 @@
use webrtc::{api::media_engine::MediaEngine, interceptor::registry::Registry, peer_connection::configuration::RTCConfiguration};
fn main() {
let mut m = MediaEngine::default();
m.register_default_codecs()?;
let mut registry = Registry::new();
registry = register_default_interceptors(registry, &mut m)?;
let api = APIBuilder::new().with_media_engine(m).with_interceptor_registry(registry).build();
let config = RTCConfiguration {
ice_servers: vec![RTCIceServer {
urls: vec!["stun:stun.l.google.com:19302".to_owned()],
..Default::default()
}],
..Default::default()
};
}

View file

@ -2,12 +2,10 @@ use glam::Vec3;
use xtra::{Actor, Address, Handler};
use crate::{
broadcast_packet,
packet::{
Packet, PacketData,
PacketData_variants::{Connect, Player},
},
protocol::String,
}, player::broadcast_packet, protocol::String
};
pub struct Faker {

View file

@ -15,14 +15,14 @@ use std::{
use faker::Faker;
use packet::{rw::read_packet, Packet};
use player::PlayerActor;
use server::web_main;
use server::{web_main, Manager};
use tokio::{
net::{TcpListener, UdpSocket},
sync::RwLock,
};
use tracing::{error, info, Level};
use tracing_subscriber::{filter::FilterFn, layer::SubscriberExt, util::SubscriberInitExt, Layer};
use xtra::{prelude::MessageChannel, Mailbox};
use xtra::{prelude::MessageChannel, Address, Mailbox};
type PacketChannel = MessageChannel<Packet, (), xtra::refcount::Strong>;
pub fn clients() -> &'static RwLock<HashMap<u128, (PacketChannel, SocketAddr)>> {
@ -31,14 +31,14 @@ pub fn clients() -> &'static RwLock<HashMap<u128, (PacketChannel, SocketAddr)>>
&CLIENTS
}
pub async fn broadcast_packet(packet: Packet) {
for (id, (address, _)) in clients().read().await.iter() {
if *id != packet.user_id {
let _ = address.send(packet.clone()).detach().await.unwrap();
}
}
pub fn manager() -> &'static Address<Manager> {
static MANAGER: LazyLock<Address<Manager>> = LazyLock::new(web_main);
&MANAGER
}
#[tokio::main]
async fn main() {
tracing_subscriber::registry()
@ -48,8 +48,6 @@ async fn main() {
.init();
let tcp = TcpListener::bind("0.0.0.0:1027").await.unwrap();
let manager = web_main();
{
let (address, mailbox) = Mailbox::unbounded();
let address = xtra::spawn_tokio(
@ -94,7 +92,7 @@ async fn main() {
loop {
match tcp.accept().await {
Ok((stream, addr)) => {
PlayerActor::new_connection(stream, addr, socket.clone(), manager.clone());
PlayerActor::new_connection(stream, addr, socket.clone());
}
Err(error) => {
error!("failed to handle connection: {error:?}");

View file

@ -1,16 +1,15 @@
use std::{io::Cursor, net::SocketAddr, sync::Arc};
use glam::Vec3;
use tokio::{
io::{BufReader, BufWriter},
net::{TcpStream, UdpSocket},
sync::mpsc,
};
use tracing::{error, info, info_span, trace, Instrument};
use xtra::{prelude::MessageChannel, scoped, Actor, Address, Handler, Mailbox};
use xtra::{prelude::MessageChannel, scoped, Actor, Handler, Mailbox};
use crate::{
broadcast_packet, clients,
clients, manager,
packet::{
rw::{read_packet, write_packet},
ConnectionKind, Packet, PacketData,
@ -18,15 +17,58 @@ use crate::{
CLIENT_NAME_SIZE,
},
protocol::String,
server::{ChangedStage, Manager, PlayerConnected, PlayerMoved},
server::{ChangedStage, PlayerConnected, PlayerDisconnected, PlayerMoved},
};
pub async fn broadcast_packet(packet: Packet) {
for (id, (address, _)) in clients().read().await.iter() {
if *id != packet.user_id {
let _ = address.send(packet.clone()).detach().await.unwrap();
}
}
match packet.data {
PacketData::Connect(connect) => {
manager()
.send(PlayerConnected {
id: packet.user_id,
name: connect.client_name,
})
.await
.unwrap();
}
PacketData::Disconnect(..) => {
manager().send(PlayerDisconnected { id: packet.user_id }).await.unwrap();
}
PacketData::Player(player) => {
let _ = manager()
.send(PlayerMoved {
id: packet.user_id,
position: player.position,
})
.detach()
.await
.unwrap();
}
PacketData::Game(game) => {
let _ = manager()
.send(ChangedStage {
id: packet.user_id,
stage: game.stage,
})
.detach()
.await
.unwrap();
}
_ => {}
}
}
pub struct PlayerActor {
id: u128,
connection_kind: ConnectionKind,
name: String<CLIENT_NAME_SIZE>,
write_sender: mpsc::UnboundedSender<WriteMessage>,
manager: Address<Manager>,
}
enum WriteMessage {
@ -35,7 +77,7 @@ enum WriteMessage {
}
impl PlayerActor {
pub fn new_connection(stream: TcpStream, addr: SocketAddr, socket: Arc<UdpSocket>, manager: Address<Manager>) {
pub fn new_connection(stream: TcpStream, addr: SocketAddr, socket: Arc<UdpSocket>) {
tokio::spawn(async move {
info!("accepted connection from {addr}");
let (reader, writer) = stream.into_split();
@ -135,7 +177,6 @@ impl PlayerActor {
connection_kind: connect.kind,
name: connect.client_name,
write_sender: sender,
manager,
},
)
.instrument(span)
@ -167,14 +208,6 @@ impl Actor for PlayerActor {
}))
.map_err(drop)?;
self
.manager
.send(PlayerConnected {
id: self.id,
name: self.name,
})
.await
.unwrap();
Ok(())
}
async fn stopped(self) -> Self::Stop {}
@ -213,12 +246,6 @@ impl Handler<Packet> for PlayerActor {
trace!("suspicious packet sent: {packet:?}");
return;
}
PacketData::Player(ref player) => {
let _ = self.manager.send(PlayerMoved { id: self.id, position: Vec3::from_array(player.position)}).detach().await.unwrap();
}
PacketData::Game(ref game) => {
let _ = self.manager.send(ChangedStage { id: self.id, stage: game.stage}).detach().await.unwrap();
}
_ => {}
}

View file

@ -1,16 +1,14 @@
mod packet;
mod prox;
use std::{collections::HashMap, sync::LazyLock};
use glam::Vec3;
use prox::ProximityPlayer;
use tokio::sync::RwLock;
use tracing::{error, info_span, Instrument};
use uuid::Uuid;
use wtransport::{Endpoint, Identity, ServerConfig};
use xtra::{Actor, Address, Handler, Mailbox};
use zerocopy::FromZeros;
use zerocopy::{FromZeros, Immutable, IntoBytes};
use crate::{
packet::{CLIENT_NAME_SIZE, STAGE_GAME_NAME_SIZE},
@ -77,12 +75,13 @@ impl Manager {
}
}
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, IntoBytes, Immutable)]
#[repr(C, packed)]
struct PlayerInstance {
id: u32,
name: String<CLIENT_NAME_SIZE>,
position: Vec3,
stage: String<STAGE_GAME_NAME_SIZE>,
position: [f32; 3],
}
struct RequestState;
@ -94,6 +93,8 @@ impl Handler<RequestState> for Manager {
}
}
#[derive(Debug, Clone, Copy, IntoBytes, Immutable)]
#[repr(C)]
pub struct PlayerConnected {
pub id: u128,
pub name: String<CLIENT_NAME_SIZE>,
@ -103,23 +104,26 @@ impl Handler<PlayerConnected> for Manager {
type Return = ();
async fn handle(&mut self, message: PlayerConnected, _: &mut xtra::Context<Self>) -> Self::Return {
self.players.insert(
message.id,
PlayerInstance {
id: {
let id = {
let id = self.next_id;
self.next_id += 1;
id
},
};
self.players.insert(
message.id,
PlayerInstance {
id,
name: message.name,
position: Vec3::ZERO,
position: [0.0; 3],
stage: String::new_zeroed(),
},
);
self.broadcast(PlayerConnected { id: id as u128, name: message.name}).await;
}
}
#[derive(Clone, Copy)]
#[derive(Debug, Clone, Copy, IntoBytes, Immutable)]
#[repr(C)]
pub struct PlayerDisconnected {
pub id: u128,
}
@ -128,31 +132,39 @@ impl Handler<PlayerDisconnected> for Manager {
type Return = ();
async fn handle(&mut self, message: PlayerDisconnected, _: &mut xtra::Context<Self>) -> Self::Return {
if self.players.remove(&message.id).is_some() {
self.broadcast(message).await;
if let Some(player) = self.players.remove(&message.id) {
self.broadcast(PlayerDisconnected { id: player.id as u128 }).await;
}
}
}
#[derive(Clone, Copy)]
#[derive(Debug, Clone, Copy, IntoBytes, Immutable)]
#[repr(C, packed)]
pub struct PlayerMoved {
pub id: u128,
pub position: Vec3,
pub position: [f32; 3],
}
impl Handler<PlayerMoved> for Manager {
type Return = ();
async fn handle(&mut self, message: PlayerMoved, _: &mut xtra::Context<Self>) -> Self::Return {
if let Some(player) = self.players.get_mut(&message.id) {
let id = message.id;
if let Some(player) = self.players.get_mut(&id) {
player.position = message.position;
self.broadcast(message).await;
let id = player.id;
self
.broadcast(PlayerMoved {
id: id as u128,
..message
})
.await;
}
}
}
#[derive(Clone, Copy)]
#[derive(Debug, Clone, Copy, IntoBytes, Immutable)]
#[repr(C, packed)]
pub struct ChangedStage {
pub id: u128,
pub stage: String<STAGE_GAME_NAME_SIZE>,
@ -162,9 +174,16 @@ impl Handler<ChangedStage> for Manager {
type Return = ();
async fn handle(&mut self, message: ChangedStage, _: &mut xtra::Context<Self>) -> Self::Return {
if let Some(player) = self.players.get_mut(&message.id) {
let id = message.id;
if let Some(player) = self.players.get_mut(&id) {
player.stage = message.stage;
self.broadcast(message).await;
let id = player.id;
self
.broadcast(ChangedStage {
id: id as u128,
..message
})
.await;
}
}
}

View file

@ -1,30 +0,0 @@
use zerocopy::{Immutable, IntoBytes};
use crate::{
packet::{CLIENT_NAME_SIZE, STAGE_GAME_NAME_SIZE},
protocol::String,
};
use super::PlayerInstance;
#[allow(unused)]
#[derive(Debug, IntoBytes, Immutable)]
#[repr(C, packed)]
pub struct HelloPlayer {
name: String<CLIENT_NAME_SIZE>,
id: u32,
position: [f32; 3],
stage: String<STAGE_GAME_NAME_SIZE>,
}
impl From<PlayerInstance> for HelloPlayer {
fn from(value: PlayerInstance) -> Self {
Self {
id: value.id,
name: value.name,
position: value.position.to_array(),
stage: value.stage,
}
}
}

View file

@ -1,20 +1,21 @@
use core::str;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tracing::{info_span, trace, warn, Instrument};
use tracing::{error, info, info_span, trace, warn, Instrument};
use uuid::Uuid;
use wtransport::endpoint::IncomingSession;
use xtra::{Actor, Address, Handler, Mailbox};
use zerocopy::IntoBytes;
use zerocopy::{FromZeros, IntoBytes};
use super::{
listeners, packet::HelloPlayer, ChangedStage, Manager, PlayerConnected, PlayerDisconnected, PlayerMoved, RequestState,
};
use crate::protocol::String;
use super::{listeners, ChangedStage, Manager, PlayerConnected, PlayerDisconnected, PlayerMoved, RequestState};
pub struct ProximityPlayer {
id: Uuid,
_send: wtransport::SendStream,
_connection: wtransport::Connection,
send: wtransport::SendStream,
connection: Arc<wtransport::Connection>,
}
impl ProximityPlayer {
@ -22,8 +23,9 @@ impl ProximityPlayer {
tokio::spawn(
async move {
trace!("proximity chat client connected");
let connection =
session.await.expect("failed to acknowledge session").accept().await.expect("failed to accept session");
let connection = Arc::new(
session.await.expect("failed to acknowledge session").accept().await.expect("failed to accept session"),
);
let (mut send, mut recv) = connection.accept_bi().await.expect("failed to start channel");
trace!("getting peerjs uuid");
@ -31,19 +33,38 @@ impl ProximityPlayer {
recv.read_exact(buffer.as_mut_bytes()).await.expect("failed to read uuid");
let id = Uuid::parse_str(str::from_utf8(&buffer).expect("expected utf8")).expect("failed to parse uuid");
let span = info_span!("", %id);
span.in_scope(||trace!( "uuid parsed"));
span.in_scope(|| trace!("uuid parsed"));
let state = manager.send(RequestState).await.unwrap();
send.write_u8(state.len() as u8).await.expect("failed to write length");
for player in state.values() {
trace!("sending player {player:?}");
send.write_all(HelloPlayer::from(*player).as_bytes()).await.expect("failed to write player");
send.write_all(player.as_bytes()).await.expect("failed to write player");
}
{
let listeners = listeners().read().await;
send.write_u32_le(listeners.len() as u32).await.expect("failed to write peer length");
for (id, _) in listeners.iter() {
let mut str = String::<36>::new_zeroed();
id.as_hyphenated().encode_lower(str.as_mut_bytes());
send.write_all(str.as_bytes()).await.expect("failed to write peer id")
}
}
let (address, mailbox) = Mailbox::unbounded();
listeners().write().await.insert(id, address);
xtra::run(mailbox, ProximityPlayer { id, _send: send, _connection: connection }).instrument(span).await;
listeners().write().await.insert(id, address.clone());
tokio::spawn({
let connection = connection.clone();
async move {
connection.closed().await;
let _ = address.send(Stop).await;
}
.instrument(span.clone())
});
xtra::run(mailbox, ProximityPlayer { id, send, connection }).instrument(span).await;
}
.in_current_span(),
);
@ -53,39 +74,204 @@ impl ProximityPlayer {
impl Actor for ProximityPlayer {
type Stop = ();
async fn started(&mut self, _: &Mailbox<Self>) -> Result<(), Self::Stop> {
for listener in listeners().write().await.iter() {
if *listener.0 != self.id {
let _ = listener
.1
.send(PeerConnectionChanged {
id: self.id,
connected: true,
})
.detach()
.await;
}
}
Ok(())
}
async fn stopped(self) -> Self::Stop {
listeners().write().await.remove(&self.id);
for listener in listeners().write().await.iter() {
if *listener.0 != self.id {
let _ = listener
.1
.send(PeerConnectionChanged {
id: self.id,
connected: false,
})
.detach()
.await;
}
}
}
}
struct Stop;
impl Handler<Stop> for ProximityPlayer {
type Return = ();
async fn handle(&mut self, _: Stop, ctx: &mut xtra::Context<Self>) -> Self::Return {
info!("connection closed (stopped)");
ctx.stop_self();
}
}
struct PeerConnectionChanged {
id: Uuid,
connected: bool,
}
impl Handler<PeerConnectionChanged> for ProximityPlayer {
type Return = ();
async fn handle(&mut self, message: PeerConnectionChanged, ctx: &mut xtra::Context<Self>) -> Self::Return {
let mut id = String::new_zeroed();
message.id.hyphenated().encode_lower(id.as_mut_bytes());
let event = packet::Packet {
kind: packet::Kind::PeerConnectionChanged,
data: packet::PeerConnectionChanged {
id,
connected: message.connected,
},
};
if let Err(error) = self.send.write_all(event.as_bytes()).await {
error!("error while sending player move {error}");
ctx.stop_self();
}
}
}
impl Handler<PlayerConnected> for ProximityPlayer {
type Return = ();
async fn handle(&mut self, _message: PlayerConnected, _: &mut xtra::Context<Self>) -> Self::Return {
warn!("todo: implement player connected")
async fn handle(&mut self, message: PlayerConnected, ctx: &mut xtra::Context<Self>) -> Self::Return {
let event = packet::Packet {
kind: packet::Kind::Connected,
data: packet::Connected {
id: message.id as u32,
name: message.name,
},
};
if let Err(error) = self.send.write_all(event.as_bytes()).await {
error!("error while sending player move {error}");
ctx.stop_self();
}
}
}
impl Handler<PlayerDisconnected> for ProximityPlayer {
type Return = ();
async fn handle(&mut self, _message: PlayerDisconnected, _: &mut xtra::Context<Self>) -> Self::Return {
warn!("todo: implement player disconnected")
async fn handle(&mut self, message: PlayerDisconnected, ctx: &mut xtra::Context<Self>) -> Self::Return {
warn!("todo: implement player disconnected");
let event = packet::Packet {
kind: packet::Kind::Disconnected,
data: packet::Disconnected { id: message.id as u32 },
};
if let Err(error) = self.send.write_all(event.as_bytes()).await {
error!("error while sending player move {error}");
ctx.stop_self();
}
}
}
impl Handler<PlayerMoved> for ProximityPlayer {
type Return = ();
async fn handle(&mut self, _message: PlayerMoved, _: &mut xtra::Context<Self>) -> Self::Return {
warn!("todo: implement player moved")
async fn handle(&mut self, message: PlayerMoved, ctx: &mut xtra::Context<Self>) -> Self::Return {
let event = packet::Packet {
kind: packet::Kind::Moved,
data: packet::Moved {
id: message.id as u32,
position: message.position,
},
};
if let Err(error) = self.connection.send_datagram(event.as_bytes()) {
error!("error while sending player move {error}");
ctx.stop_self();
}
}
}
impl Handler<ChangedStage> for ProximityPlayer {
type Return = ();
async fn handle(&mut self, _message: ChangedStage, _: &mut xtra::Context<Self>) -> Self::Return {
warn!("todo: implement changed stage")
async fn handle(&mut self, message: ChangedStage, ctx: &mut xtra::Context<Self>) -> Self::Return {
warn!("todo: implement changed stage");
let event = packet::Packet {
kind: packet::Kind::StageChanged,
data: packet::StageChanged {
id: message.id as u32,
stage_name: message.stage,
},
};
if let Err(error) = self.send.write_all(event.as_bytes()).await {
error!("error while sending player move {error}");
ctx.stop_self();
}
}
}
pub mod packet {
use zerocopy::{Immutable, IntoBytes};
use crate::{
packet::{CLIENT_NAME_SIZE, STAGE_GAME_NAME_SIZE},
protocol::String,
};
#[derive(IntoBytes, Immutable)]
#[repr(C, packed)]
pub struct Packet<T> {
pub kind: Kind,
pub data: T,
}
#[derive(IntoBytes, Immutable)]
#[repr(u8)]
pub enum Kind {
Connected = 0,
Disconnected = 1,
Moved = 2,
StageChanged = 3,
PeerConnectionChanged = 4,
}
#[derive(IntoBytes, Immutable)]
#[repr(C, packed)]
pub struct Connected {
pub id: u32,
pub name: String<CLIENT_NAME_SIZE>,
}
#[derive(IntoBytes, Immutable)]
#[repr(C, packed)]
pub struct Disconnected {
pub id: u32,
}
#[derive(IntoBytes, Immutable)]
#[repr(C, packed)]
pub struct Moved {
pub id: u32,
pub position: [f32; 3],
}
#[derive(IntoBytes, Immutable)]
#[repr(C, packed)]
pub struct StageChanged {
pub id: u32,
pub stage_name: String<STAGE_GAME_NAME_SIZE>,
}
#[derive(IntoBytes, Immutable)]
#[repr(C, packed)]
pub struct PeerConnectionChanged {
pub id: String<36>,
pub connected: bool,
}
}