use std::sync::Arc; use packet::{ Event, Event_variants::{Answer, Candidate, Offer}, }; use tokio::io::AsyncWriteExt; use tracing::{error, info, info_span, trace, warn, Instrument}; use wtransport::endpoint::IncomingSession; use xtra::{Actor, Address, Handler, Mailbox}; use zerocopy::{FromZeros, IntoBytes}; use super::{ listeners, ChangedStage, Manager, PlayerConnected, PlayerDisconnected, PlayerMoved, RequestState, UuidString, }; pub struct ProximityPlayer { id: UuidString, send: wtransport::SendStream, connection: Arc, } impl ProximityPlayer { pub fn spawn(session: IncomingSession, manager: Address) { tokio::spawn( async move { trace!("proximity chat client connected"); let connection = Arc::new( session.await.expect("failed to acknowledge session").accept().await.expect("failed to accept session"), ); let (mut send, mut recv) = connection.accept_bi().await.expect("failed to start channel"); trace!("getting peerjs uuid"); let mut id = UuidString::new_zeroed(); recv.read_exact(id.as_mut_bytes()).await.expect("failed to read uuid"); let span = info_span!("", %id); span.in_scope(|| trace!("uuid parsed")); let state = manager.send(RequestState).await.unwrap(); send.write_u8(state.len() as u8).await.expect("failed to write length"); for player in state.values() { trace!("sending player {player:?}"); send.write_all(player.as_bytes()).await.expect("failed to write player"); } { let listeners = listeners().read().await; send.write_u32_le(listeners.len() as u32).await.expect("failed to write peer length"); for (id, _) in listeners.iter() { send.write_all(id.as_bytes()).await.expect("failed to write peer id") } } let (address, mailbox) = Mailbox::unbounded(); listeners().write().await.insert(id, address.clone()); tokio::spawn({ let connection = connection.clone(); let address = address.clone(); async move { connection.closed().await; let _ = address.send(Stop).await; } .instrument(span.clone()) }); tokio::spawn( async move { loop { match packet::Event::deserialize(&mut recv).await { Ok(event) => { info!("deserialized event: {event:?}"); let _ = address.send(event).detach().await; } Err(error) => { error!("error while deserializing: {error:?}"); let _ = address.send(Stop).await; return; } }; } } .instrument(span.clone()), ); xtra::run(mailbox, ProximityPlayer { id, send, connection }).instrument(span).await; } .in_current_span(), ); } } impl Actor for ProximityPlayer { type Stop = (); async fn started(&mut self, _: &Mailbox) -> Result<(), Self::Stop> { for listener in listeners().write().await.iter() { if *listener.0 != self.id { let _ = listener .1 .send(PeerConnectionChanged { id: self.id, connected: true, }) .detach() .await; } } Ok(()) } async fn stopped(self) -> Self::Stop { listeners().write().await.remove(&self.id); for listener in listeners().write().await.iter() { if *listener.0 != self.id { let _ = listener .1 .send(PeerConnectionChanged { id: self.id, connected: false, }) .detach() .await; } } } } struct Stop; impl Handler for ProximityPlayer { type Return = (); async fn handle(&mut self, _: Stop, ctx: &mut xtra::Context) -> Self::Return { info!("connection closed (stopped)"); ctx.stop_self(); } } impl Handler for ProximityPlayer { type Return = (); async fn handle(&mut self, message: Event, _: &mut xtra::Context) -> Self::Return { async fn send_event(id: UuidString, value: T) where ProximityPlayer: Handler, { if let Some(listener) = listeners().read().await.get(&id) { if let Err(error) = listener.send(value).detach().await { warn!("listener {id} is dead: {error}") } } else { warn!("sending offer to dead listener {id}"); } } match message { Event::Offer(offer) => { send_event( offer.id, Offer { id: self.id, sdp: offer.sdp, }, ) .await; } Event::Answer(answer) => { send_event( answer.id, Answer { id: self.id, sdp: answer.sdp, }, ) .await; } Event::Candidate(candidate) => { send_event( candidate.id, Candidate { id: self.id, candidate: candidate.candidate, }, ) .await; } } } } impl Handler for ProximityPlayer { type Return = (); async fn handle(&mut self, message: Offer, _: &mut xtra::Context) -> Self::Return { Event::Offer(message).serialize(&mut self.send).await.expect("failed to write offer!"); } } impl Handler for ProximityPlayer { type Return = (); async fn handle(&mut self, message: Answer, _: &mut xtra::Context) -> Self::Return { Event::Answer(message).serialize(&mut self.send).await.expect("failed to write offer!"); } } impl Handler for ProximityPlayer { type Return = (); async fn handle(&mut self, message: Candidate, _: &mut xtra::Context) -> Self::Return { Event::Candidate(message).serialize(&mut self.send).await.expect("failed to write offer!"); } } struct PeerConnectionChanged { id: UuidString, connected: bool, } impl Handler for ProximityPlayer { type Return = (); async fn handle(&mut self, message: PeerConnectionChanged, ctx: &mut xtra::Context) -> Self::Return { let event = packet::Packet { kind: packet::Kind::PeerConnectionChanged, data: packet::PeerConnectionChanged { id: message.id, connected: message.connected, }, }; if let Err(error) = self.send.write_all(event.as_bytes()).await { error!("error while sending player move {error}"); ctx.stop_self(); } } } impl Handler for ProximityPlayer { type Return = (); async fn handle(&mut self, message: PlayerConnected, ctx: &mut xtra::Context) -> Self::Return { let event = packet::Packet { kind: packet::Kind::Connected, data: packet::Connected { id: message.id as u32, name: message.name, }, }; if let Err(error) = self.send.write_all(event.as_bytes()).await { error!("error while sending player move {error}"); ctx.stop_self(); } } } impl Handler for ProximityPlayer { type Return = (); async fn handle(&mut self, message: PlayerDisconnected, ctx: &mut xtra::Context) -> Self::Return { warn!("todo: implement player disconnected"); let event = packet::Packet { kind: packet::Kind::Disconnected, data: packet::Disconnected { id: message.id as u32 }, }; if let Err(error) = self.send.write_all(event.as_bytes()).await { error!("error while sending player move {error}"); ctx.stop_self(); } } } impl Handler for ProximityPlayer { type Return = (); async fn handle(&mut self, message: PlayerMoved, ctx: &mut xtra::Context) -> Self::Return { let event = packet::Packet { kind: packet::Kind::Moved, data: packet::Moved { id: message.id as u32, position: message.position, }, }; if let Err(error) = self.connection.send_datagram(event.as_bytes()) { error!("error while sending player move {error}"); ctx.stop_self(); } } } impl Handler for ProximityPlayer { type Return = (); async fn handle(&mut self, message: ChangedStage, ctx: &mut xtra::Context) -> Self::Return { warn!("todo: implement changed stage"); let event = packet::Packet { kind: packet::Kind::StageChanged, data: packet::StageChanged { id: message.id as u32, stage_name: message.stage, }, }; if let Err(error) = self.send.write_all(event.as_bytes()).await { error!("error while sending player move {error}"); ctx.stop_self(); } } } pub mod packet { use std::string::String as StdString; use anyhow::bail; use newtype_enum::newtype_enum; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tracing::info; use wtransport::{RecvStream, SendStream}; use zerocopy::{FromZeros, Immutable, IntoBytes}; use Event_variants::{Answer, Candidate, Offer}; use crate::{ packet::{CLIENT_NAME_SIZE, STAGE_GAME_NAME_SIZE}, protocol::String, }; pub const HYP_UUID_SIZE: usize = 36; #[derive(IntoBytes, Immutable)] #[repr(C, packed)] pub struct Packet { pub kind: Kind, pub data: T, } #[derive(IntoBytes, Immutable)] #[repr(u8)] pub enum Kind { Connected = 0, Disconnected = 1, Moved = 2, StageChanged = 3, PeerConnectionChanged = 4, Offer = 5, Answer = 6, Candidate = 7, } #[derive(IntoBytes, Immutable)] #[repr(C, packed)] pub struct Connected { pub id: u32, pub name: String, } #[derive(IntoBytes, Immutable)] #[repr(C, packed)] pub struct Disconnected { pub id: u32, } #[derive(IntoBytes, Immutable)] #[repr(C, packed)] pub struct Moved { pub id: u32, pub position: [f32; 3], } #[derive(IntoBytes, Immutable)] #[repr(C, packed)] pub struct StageChanged { pub id: u32, pub stage_name: String, } #[derive(IntoBytes, Immutable)] #[repr(C, packed)] pub struct PeerConnectionChanged { pub id: String, pub connected: bool, } #[newtype_enum] #[derive(Debug)] pub enum Event { Offer { pub id: String, pub sdp: StdString, }, Answer { pub id: String, pub sdp: StdString, }, Candidate { pub id: String, pub candidate: StdString, }, } impl Event { pub async fn serialize(self, send: &mut SendStream) -> anyhow::Result<()> { async fn write_string(send: &mut SendStream, string: StdString) -> anyhow::Result<()> { send.write_u32_le(string.len() as u32).await?; send.write_all(string.as_bytes()).await?; Ok(()) } info!("writing event: {self:?}"); match self { Event::Offer(offer) => { send.write_u8(Kind::Offer as u8).await?; send.write_all(offer.id.as_bytes()).await?; write_string(send, offer.sdp).await?; } Event::Answer(answer) => { send.write_u8(Kind::Answer as u8).await?; send.write_all(answer.id.as_bytes()).await?; write_string(send, answer.sdp).await?; } Event::Candidate(candidate) => { send.write_u8(Kind::Candidate as u8).await?; send.write_all(candidate.id.as_bytes()).await?; write_string(send, candidate.candidate).await?; } } Ok(()) } pub async fn deserialize(recv: &mut RecvStream) -> anyhow::Result { async fn read_string(recv: &mut RecvStream) -> anyhow::Result { let size = recv.read_u32_le().await?; info!("string size: {size}"); let mut data = vec![0; size as usize]; recv.read_exact(&mut data).await?; Ok(StdString::from_utf8(data)?) } async fn read_fixed_string(recv: &mut RecvStream) -> anyhow::Result> { let mut str = String::new_zeroed(); recv.read_exact(str.as_mut_bytes()).await?; info!("read id: {str}"); Ok(str) } let kind = recv.read_u8().await?; info!("reading kind: {kind}"); let event = match kind { 0 => Event::Offer(Offer { id: read_fixed_string(recv).await?, sdp: read_string(recv).await?, }), 1 => Event::Answer(Answer { id: read_fixed_string(recv).await?, sdp: read_string(recv).await?, }), 2 => Event::Candidate(Candidate { id: read_fixed_string(recv).await?, candidate: read_string(recv).await?, }), kind => bail!("invalid kind: {kind}"), }; Ok(event) } } }