use std::sync::Arc; use packet::{ Event, Event_variants::{Answer, Candidate, Offer}, }; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tracing::{error, info, info_span, trace, warn, Instrument}; use wtransport::endpoint::IncomingSession; use xtra::{Actor, Address, Handler, Mailbox}; use zerocopy::{FromZeros, IntoBytes}; use crate::{packet::CLIENT_NAME_SIZE, protocol::String}; use super::{ listeners, ChangedStage, Manager, PlayerConnected, PlayerDisconnected, PlayerMoved, RequestState, UuidString, }; pub struct ProximityPlayer { id: UuidString, name: String, global_speak: bool, send: wtransport::SendStream, connection: Arc, } impl ProximityPlayer { pub fn spawn(session: IncomingSession, manager: Address) { tokio::spawn( async move { trace!("proximity chat client connected"); let connection = Arc::new( session.await.expect("failed to acknowledge session").accept().await.expect("failed to accept session"), ); let (mut send, mut recv) = connection.accept_bi().await.expect("failed to start channel"); let mut id = UuidString::new_zeroed(); recv.read_exact(id.as_mut_bytes()).await.expect("failed to read uuid"); let mut name = String::new_zeroed(); recv.read_exact(name.as_mut_bytes()).await.expect("failed to read name"); let global_speak = recv.read_u8().await.expect("failed to read global speak state") != 0; let span = info_span!("", %id); span.in_scope(|| trace!("uuid parsed")); info!(parent: &span, "connected as {name}"); let state = manager.send(RequestState).await.unwrap(); send.write_u8(state.len() as u8).await.expect("failed to write length"); for player in state.values() { trace!(parent: &span, "sending player {player:?}"); send.write_all(player.as_bytes()).await.expect("failed to write player"); } { let listeners = listeners().read().await; send.write_u32_le(listeners.len() as u32).await.expect("failed to write peer length"); for (id, listener) in listeners.iter() { if let Ok((name, global_speak)) = listener.send(GetStartInfo).await { send.write_all(id.as_bytes()).await.expect("failed to write peer id"); send.write_all(name.as_bytes()).await.expect("failed to write peer name"); send.write_all(&[if global_speak { 1 } else { 0 }]).await.expect("failed to write peer global speaking status"); } } } let (address, mailbox) = Mailbox::unbounded(); listeners().write().await.insert(id, address.clone()); tokio::spawn({ let connection = connection.clone(); let address = address.clone(); async move { connection.closed().await; let _ = address.send(Stop).await; } .instrument(span.clone()) }); tokio::spawn( async move { loop { match packet::Event::deserialize(&mut recv).await { Ok(event) => { let _ = address.send(event).detach().await; } Err(error) => { error!("error while deserializing: {error:?}"); let _ = address.send(Stop).await; return; } }; } } .instrument(span.clone()), ); xtra::run( mailbox, ProximityPlayer { id, name, global_speak, send, connection, }, ) .instrument(span) .await; } .in_current_span(), ); } } impl Actor for ProximityPlayer { type Stop = (); async fn started(&mut self, _: &Mailbox) -> Result<(), Self::Stop> { let changed_event = packet::Packet { kind: packet::Kind::PeerConnectionChanged, data: packet::PeerConnectionChanged { id: FromZeros::new_zeroed(), connected: true, name: FromZeros::new_zeroed(), global_speak: false }, }; for (id, listener) in listeners().write().await.iter() { if *id != self.id { if let Ok(Ok((name, global_speak))) = listener .send(PeerConnectionChanged { id: self.id, connected: true, name: self.name, global_speak: self.global_speak, }) .await { let personalized = packet::Packet { data: packet::PeerConnectionChanged { id: *id, name, global_speak, ..changed_event.data }, ..changed_event }; if let Err(error) = self.send.write_all(personalized.as_bytes()).await { error!("error while sending peer info {error}"); return Err(()); } } } } Ok(()) } async fn stopped(self) -> Self::Stop { listeners().write().await.remove(&self.id); for listener in listeners().write().await.iter() { if *listener.0 != self.id { let _ = listener .1 .send(PeerConnectionChanged { id: self.id, connected: false, name: self.name, global_speak: false, }) .detach() .await; } } } } struct Stop; impl Handler for ProximityPlayer { type Return = (); async fn handle(&mut self, _: Stop, ctx: &mut xtra::Context) -> Self::Return { info!("connection closed (stopped)"); ctx.stop_self(); } } struct GetStartInfo; impl Handler for ProximityPlayer { type Return = (String, bool); async fn handle(&mut self, _: GetStartInfo, _: &mut xtra::Context) -> Self::Return { (self.name, self.global_speak) } } impl Handler for ProximityPlayer { type Return = (); async fn handle(&mut self, message: Event, _: &mut xtra::Context) -> Self::Return { async fn send_event(id: UuidString, value: T) where ProximityPlayer: Handler, { if let Some(listener) = listeners().read().await.get(&id) { if let Err(error) = listener.send(value).detach().await { warn!("listener {id} is dead: {error}") } } else { warn!("sending offer to dead listener {id}"); } } match message { Event::Offer(offer) => { send_event( offer.id, Offer { id: self.id, sdp: offer.sdp, }, ) .await; } Event::Answer(answer) => { send_event( answer.id, Answer { id: self.id, sdp: answer.sdp, }, ) .await; } Event::Candidate(candidate) => { send_event( candidate.id, Candidate { id: self.id, candidate: candidate.candidate, }, ) .await; } Event::TargetChanged(changed) => { self.name = changed.name; } } } } impl Handler for ProximityPlayer { type Return = (); async fn handle(&mut self, message: Offer, ctx: &mut xtra::Context) -> Self::Return { if let Err(error) = Event::Offer(message).serialize(&mut self.send).await { error!("error while sending offer {error}"); ctx.stop_self(); } } } impl Handler for ProximityPlayer { type Return = (); async fn handle(&mut self, message: Answer, ctx: &mut xtra::Context) -> Self::Return { if let Err(error) = Event::Answer(message).serialize(&mut self.send).await { error!("error while sending player move {error}"); ctx.stop_self(); } } } impl Handler for ProximityPlayer { type Return = (); async fn handle(&mut self, message: Candidate, ctx: &mut xtra::Context) -> Self::Return { if let Err(error) = Event::Candidate(message).serialize(&mut self.send).await { error!("error while sending candidate {error}"); ctx.stop_self(); } } } struct PeerConnectionChanged { id: UuidString, connected: bool, name: String, global_speak: bool, } impl Handler for ProximityPlayer { type Return = Result<(String, bool), ()>; async fn handle(&mut self, message: PeerConnectionChanged, ctx: &mut xtra::Context) -> Self::Return { let event = packet::Packet { kind: packet::Kind::PeerConnectionChanged, data: packet::PeerConnectionChanged { id: message.id, connected: message.connected, name: message.name, global_speak: message.global_speak }, }; if let Err(error) = self.send.write_all(event.as_bytes()).await { error!("error while sending player move {error}"); ctx.stop_self(); return Err(()); } Ok((self.name, self.global_speak)) } } impl Handler for ProximityPlayer { type Return = (); async fn handle(&mut self, message: PlayerConnected, ctx: &mut xtra::Context) -> Self::Return { let event = packet::Packet { kind: packet::Kind::Connected, data: packet::Connected { id: message.id as u32, name: message.name, }, }; if let Err(error) = self.send.write_all(event.as_bytes()).await { error!("error while sending player move {error}"); ctx.stop_self(); } } } impl Handler for ProximityPlayer { type Return = (); async fn handle(&mut self, message: PlayerDisconnected, ctx: &mut xtra::Context) -> Self::Return { warn!("todo: implement player disconnected"); let event = packet::Packet { kind: packet::Kind::Disconnected, data: packet::Disconnected { id: message.id as u32 }, }; if let Err(error) = self.send.write_all(event.as_bytes()).await { error!("error while sending player move {error}"); ctx.stop_self(); } } } impl Handler for ProximityPlayer { type Return = (); async fn handle(&mut self, message: PlayerMoved, ctx: &mut xtra::Context) -> Self::Return { let event = packet::Packet { kind: packet::Kind::Moved, data: packet::Moved { id: message.id as u32, position: message.position, }, }; if let Err(error) = self.connection.send_datagram(event.as_bytes()) { error!("error while sending player move {error}"); ctx.stop_self(); } } } impl Handler for ProximityPlayer { type Return = (); async fn handle(&mut self, message: ChangedStage, ctx: &mut xtra::Context) -> Self::Return { let event = packet::Packet { kind: packet::Kind::StageChanged, data: packet::StageChanged { id: message.id as u32, stage_name: message.stage, }, }; if let Err(error) = self.send.write_all(event.as_bytes()).await { error!("error while sending player move {error}"); ctx.stop_self(); } } } pub mod packet { use std::string::String as StdString; use anyhow::bail; use newtype_enum::newtype_enum; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use wtransport::{RecvStream, SendStream}; use zerocopy::{FromZeros, Immutable, IntoBytes}; use Event_variants::{Answer, Candidate, Offer, TargetChanged}; use crate::{ packet::{CLIENT_NAME_SIZE, STAGE_GAME_NAME_SIZE}, protocol::String, server::UuidString, }; pub const HYP_UUID_SIZE: usize = 36; #[derive(IntoBytes, Immutable)] #[repr(C, packed)] pub struct Packet { pub kind: Kind, pub data: T, } #[derive(Clone, Copy, IntoBytes, Immutable)] #[repr(u8)] pub enum Kind { Connected = 0, Disconnected = 1, Moved = 2, StageChanged = 3, PeerConnectionChanged = 4, Offer = 5, Answer = 6, Candidate = 7, } #[derive(IntoBytes, Immutable)] #[repr(C, packed)] pub struct Connected { pub id: u32, pub name: String, } #[derive(IntoBytes, Immutable)] #[repr(C, packed)] pub struct Disconnected { pub id: u32, } #[derive(IntoBytes, Immutable)] #[repr(C, packed)] pub struct Moved { pub id: u32, pub position: [f32; 3], } #[derive(IntoBytes, Immutable)] #[repr(C, packed)] pub struct StageChanged { pub id: u32, pub stage_name: String, } #[derive(Clone, Copy, IntoBytes, Immutable)] #[repr(C, packed)] pub struct PeerConnectionChanged { pub id: UuidString, pub connected: bool, pub name: String, pub global_speak: bool, } #[newtype_enum] #[derive(Debug)] pub enum Event { Offer { pub id: UuidString, pub sdp: StdString, }, Answer { pub id: UuidString, pub sdp: StdString, }, Candidate { pub id: UuidString, pub candidate: StdString, }, TargetChanged { pub name: String, pub global_speak: bool, }, } impl Event { pub async fn serialize(self, send: &mut SendStream) -> anyhow::Result<()> { async fn write_string(send: &mut SendStream, string: StdString) -> anyhow::Result<()> { send.write_u32_le(string.len() as u32).await?; send.write_all(string.as_bytes()).await?; Ok(()) } match self { Self::Offer(offer) => { send.write_u8(Kind::Offer as u8).await?; send.write_all(offer.id.as_bytes()).await?; write_string(send, offer.sdp).await?; } Self::Answer(answer) => { send.write_u8(Kind::Answer as u8).await?; send.write_all(answer.id.as_bytes()).await?; write_string(send, answer.sdp).await?; } Self::Candidate(candidate) => { send.write_u8(Kind::Candidate as u8).await?; send.write_all(candidate.id.as_bytes()).await?; write_string(send, candidate.candidate).await?; } Self::TargetChanged(_) => unimplemented!(), } Ok(()) } pub async fn deserialize(recv: &mut RecvStream) -> anyhow::Result { async fn read_string(recv: &mut RecvStream) -> anyhow::Result { let size = recv.read_u32_le().await?; let mut data = vec![0; size as usize]; recv.read_exact(&mut data).await?; Ok(StdString::from_utf8(data)?) } async fn read_fixed_string(recv: &mut RecvStream) -> anyhow::Result> { let mut str = String::new_zeroed(); recv.read_exact(str.as_mut_bytes()).await?; Ok(str) } let kind = recv.read_u8().await?; let event = match kind { 0 => Self::Offer(Offer { id: read_fixed_string(recv).await?, sdp: read_string(recv).await?, }), 1 => Self::Answer(Answer { id: read_fixed_string(recv).await?, sdp: read_string(recv).await?, }), 2 => Self::Candidate(Candidate { id: read_fixed_string(recv).await?, candidate: read_string(recv).await?, }), 3 => Self::TargetChanged(TargetChanged { name: read_fixed_string(recv).await?, global_speak: recv.read_u8().await? != 0 }), kind => bail!("invalid kind: {kind}"), }; Ok(event) } } }