diff --git a/Cargo.lock b/Cargo.lock index 86bf646..ed44fd7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1188,8 +1188,6 @@ dependencies = [ [[package]] name = "interceptor" version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5ab04c530fd82e414e40394cabe5f0ebfe30d119f10fe29d6e3561926af412e" dependencies = [ "async-trait", "bytes", @@ -1784,10 +1782,14 @@ dependencies = [ name = "proximity-client" version = "0.1.0" dependencies = [ + "anyhow", "cpal", "tokio", + "tracing", + "tracing-subscriber", "webrtc", "webtransport", + "xtra", ] [[package]] @@ -1998,8 +2000,6 @@ dependencies = [ [[package]] name = "rtcp" version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8306430fb118b7834bbee50e744dc34826eca1da2158657a3d6cbc70e24c2096" dependencies = [ "bytes", "thiserror 1.0.69", @@ -2009,8 +2009,6 @@ dependencies = [ [[package]] name = "rtp" version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e68baca5b6cb4980678713f0d06ef3a432aa642baefcbfd0f4dd2ef9eb5ab550" dependencies = [ "bytes", "memchr", @@ -2157,8 +2155,6 @@ checksum = "a3cf7c11c38cb994f3d40e8a8cde3bbd1f72a435e4c49e85d6553d8312306152" [[package]] name = "sdp" version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02a526161f474ae94b966ba622379d939a8fe46c930eebbadb73e339622599d5" dependencies = [ "rand", "substring", @@ -2398,8 +2394,6 @@ checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" [[package]] name = "stun" version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea256fb46a13f9204e9dee9982997b2c3097db175a9fddaa8350310d03c4d5a3" dependencies = [ "base64", "crc", @@ -2700,8 +2694,6 @@ dependencies = [ [[package]] name = "turn" version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0044fdae001dd8a1e247ea6289abf12f4fcea1331a2364da512f9cd680bbd8cb" dependencies = [ "async-trait", "base64", @@ -2911,8 +2903,6 @@ dependencies = [ [[package]] name = "webrtc" version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30367074d9f18231d28a74fab0120856b2b665da108d71a12beab7185a36f97b" dependencies = [ "arc-swap", "async-trait", @@ -2955,8 +2945,6 @@ dependencies = [ [[package]] name = "webrtc-data" version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dec93b991efcd01b73c5b3503fa8adba159d069abe5785c988ebe14fcf8f05d1" dependencies = [ "bytes", "log", @@ -2970,8 +2958,6 @@ dependencies = [ [[package]] name = "webrtc-dtls" version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7c9b89fc909f9da0499283b1112cd98f72fec28e55a54a9e352525ca65cd95c" dependencies = [ "aes", "aes-gcm", @@ -3007,8 +2993,6 @@ dependencies = [ [[package]] name = "webrtc-ice" version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0348b28b593f7709ac98d872beb58c0009523df652c78e01b950ab9c537ff17d" dependencies = [ "arc-swap", "async-trait", @@ -3032,8 +3016,6 @@ dependencies = [ [[package]] name = "webrtc-mdns" version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6dfe9686c6c9c51428da4de415cb6ca2dc0591ce2b63212e23fd9cccf0e316b" dependencies = [ "log", "socket2", @@ -3045,8 +3027,6 @@ dependencies = [ [[package]] name = "webrtc-media" version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e153be16b8650021ad3e9e49ab6e5fa9fb7f6d1c23c213fd8bbd1a1135a4c704" dependencies = [ "byteorder", "bytes", @@ -3058,8 +3038,6 @@ dependencies = [ [[package]] name = "webrtc-sctp" version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5faf3846ec4b7e64b56338d62cbafe084aa79806b0379dff5cc74a8b7a2b3063" dependencies = [ "arc-swap", "async-trait", @@ -3076,8 +3054,6 @@ dependencies = [ [[package]] name = "webrtc-srtp" version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "771db9993712a8fb3886d5be4613ebf27250ef422bd4071988bf55f1ed1a64fa" dependencies = [ "aead", "aes", @@ -3099,8 +3075,6 @@ dependencies = [ [[package]] name = "webrtc-util" version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1438a8fd0d69c5775afb4a71470af92242dbd04059c61895163aa3c1ef933375" dependencies = [ "async-trait", "bitflags 1.3.2", diff --git a/flake.nix b/flake.nix index 572eb0a..f82ac7f 100644 --- a/flake.nix +++ b/flake.nix @@ -24,6 +24,8 @@ cmake pkg-config openssl + alsa-lib + ffmpeg ]; }; } diff --git a/out.opus b/out.opus new file mode 100644 index 0000000..d9a9df5 Binary files /dev/null and b/out.opus differ diff --git a/proximity-client/Cargo.toml b/proximity-client/Cargo.toml index 791c41d..ff4625e 100644 --- a/proximity-client/Cargo.toml +++ b/proximity-client/Cargo.toml @@ -4,7 +4,12 @@ version = "0.1.0" edition = "2021" [dependencies] +anyhow = "1.0.94" cpal = "0.15.3" tokio = { version = "1.42.0", features = ["rt", "rt-multi-thread", "macros", "sync", "io-util", "fs"] } -webrtc = "0.12.0" +tracing = "0.1.41" +tracing-subscriber = "0.3.19" +webrtc = { path = "../../webrtc/webrtc" } webtransport = "0.0.0" +xtra = { version = "0.6.0", features = ["macros"] } + diff --git a/proximity-client/src/main.rs b/proximity-client/src/main.rs index 48f5b12..d091d78 100644 --- a/proximity-client/src/main.rs +++ b/proximity-client/src/main.rs @@ -1,17 +1,3 @@ -use webrtc::{api::media_engine::MediaEngine, interceptor::registry::Registry, peer_connection::configuration::RTCConfiguration}; - fn main() { - let mut m = MediaEngine::default(); - - m.register_default_codecs()?; - let mut registry = Registry::new(); - registry = register_default_interceptors(registry, &mut m)?; - let api = APIBuilder::new().with_media_engine(m).with_interceptor_registry(registry).build(); - let config = RTCConfiguration { - ice_servers: vec![RTCIceServer { - urls: vec!["stun:stun.l.google.com:19302".to_owned()], - ..Default::default() - }], - ..Default::default() - }; + } diff --git a/proximity-client/src/webrtc-main.rs b/proximity-client/src/webrtc-main.rs new file mode 100644 index 0000000..322f40d --- /dev/null +++ b/proximity-client/src/webrtc-main.rs @@ -0,0 +1,445 @@ +use std::{ + env::args, + fs::{read, File}, + io::Cursor, + sync::{Arc, LazyLock}, + time::Duration, +}; + +use tokio::{ + io::BufReader, + sync::{mpsc, oneshot, OnceCell}, +}; +use tracing::{error, info, info_span, trace, warn, Instrument, Span}; +use tracing_subscriber::{filter::FilterFn, layer::SubscriberExt, util::SubscriberInitExt, Layer}; +use webrtc::{ + api::{ + interceptor_registry::register_default_interceptors, + media_engine::{MediaEngine, MIME_TYPE_OPUS}, + setting_engine::SettingEngine, + APIBuilder, API, + }, + data_channel::data_channel_init::RTCDataChannelInit, + ice_transport::{ + ice_candidate::{RTCIceCandidate, RTCIceCandidateInit}, + ice_connection_state::RTCIceConnectionState, + ice_server::RTCIceServer, + }, + interceptor::registry::Registry, + media::{ + io::{ogg_reader::OggReader, ogg_writer::OggWriter, Writer}, + Sample, + }, + peer_connection::{ + configuration::RTCConfiguration, + offer_answer_options::{RTCAnswerOptions, RTCOfferOptions}, + peer_connection_state::RTCPeerConnectionState, + sdp::session_description::RTCSessionDescription, + RTCPeerConnection, + }, + rtp_transceiver::{ + rtp_codec::{RTCRtpCodecCapability, RTCRtpCodecParameters, RTPCodecType}, + rtp_transceiver_direction::RTCRtpTransceiverDirection, + RTCRtpTransceiverInit, + }, + track::track_local::{track_local_static_sample::TrackLocalStaticSample, TrackLocal}, +}; +use xtra::{Actor, Address, Handler, Mailbox}; +const CODEC: LazyLock = LazyLock::new(|| RTCRtpCodecParameters { + capability: RTCRtpCodecCapability { + mime_type: MIME_TYPE_OPUS.to_owned(), + clock_rate: 48000, + channels: 2, + sdp_fmtp_line: "".to_owned(), + rtcp_feedback: vec![], + }, + payload_type: 111, + ..Default::default() +}); +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer().with_filter(FilterFn::new(|m| { + m.module_path().map(|path| !path.contains("webrtc_ice") && !path.contains("webrtc_mdns")).unwrap_or_default() + // m.module_path().map(|path| path.contains("proximity_client")).unwrap_or_default() + // true + }))) + .init(); + let mut m = MediaEngine::default(); + + m.register_codec(CODEC.clone(), RTPCodecType::Audio)?; + let mut registry = Registry::new(); + registry = register_default_interceptors(registry, &mut m)?; + let api = Arc::new( + APIBuilder::new() + .with_media_engine(m) + // .with_setting_engine({ + // let mut engine = SettingEngine::default(); + // // engine.enable_sender_rtx(is_enabled); + // engine + // }) + .with_interceptor_registry(registry) + .build(), + ); + let config = RTCConfiguration { + ice_servers: vec![RTCIceServer { + urls: vec!["stun:stun.l.google.com:19302".to_owned()], + ..Default::default() + }], + ..Default::default() + }; + + let sender = Sender::spawn(api.clone(), config.clone()); + + Receiver::run(api, config, sender).instrument(info_span!("receiver")).await; + Ok(()) +} + +#[derive(Actor)] +struct Sender { + peer_connection: Arc, + receiver: OnceCell>, + audio_track: Arc, +} +#[derive(Actor)] +struct Receiver { + peer_connection: Arc, + sender: Address, +} + +impl Sender { + pub fn spawn(api: Arc, config: RTCConfiguration) -> Address { + let (address, mailbox) = Mailbox::unbounded(); + let span = info_span!("sender"); + tokio::spawn( + async move { + let address = mailbox.address(); + let peer_connection = Arc::new(api.new_peer_connection(config).await.expect("rraaahaa")); + // Set the handler for ICE connection state + // This will notify you when the peer has connected/disconnected + peer_connection.on_ice_connection_state_change({ + let address = address.clone(); + Box::new(move |connection_state: RTCIceConnectionState| { + info!("Connection State has changed {connection_state}"); + if connection_state == RTCIceConnectionState::Connected { + let address = address.clone(); + tokio::spawn(async move { + address.send(Connected).await.unwrap(); + }); + } + Box::pin(async {}) + }) + }); + + // Set the handler for Peer connection state + // This will notify you when the peer has connected/disconnected + peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { + info!("Peer Connection State has changed: {s}"); + + if s == RTCPeerConnectionState::Failed { + // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. + // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. + // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. + info!("Peer Connection has gone to failed exiting"); + } + + Box::pin(async {}) + })); + + peer_connection.on_ice_candidate({ + let address = address.clone(); + let span = Span::current(); + Box::new(move |candidate: Option| { + let sender = address.clone(); + let span = span.clone(); + Box::pin( + async move { + if let Some(candidate) = candidate { + let candidate = candidate.to_json().unwrap(); + warn!("candidate: {:?}", candidate); + sender.send((IceNotify, candidate)).await.unwrap(); + } else { + warn!("candidates drained"); + } + } + .instrument(span), + ) + }) + }); + + peer_connection.on_track(Box::new(|track, _, _| { + warn!("aaaaaa"); + + panic!() + })); + + let audio_track = Arc::new(TrackLocalStaticSample::new( + CODEC.capability.clone(), + "audio".to_owned(), + "webrtc-rs".to_owned(), + )); + let rtp_sender = peer_connection + .add_transceiver_from_track( + Arc::clone(&audio_track) as Arc, + Some(RTCRtpTransceiverInit { + direction: RTCRtpTransceiverDirection::Sendonly, + send_encodings: Vec::new(), + }), + ) + .await + .expect("oops"); + let rtp_sender = rtp_sender.sender().await; + + // Read incoming RTCP packets + // Before these packets are returned they are processed by interceptors. For things + // like NACK this needs to be called. + tokio::spawn(async move { + let mut rtcp_buf = vec![0u8; 1500]; + while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {} + error!("rtcp..."); + }); + + xtra::run( + mailbox, + Sender { + peer_connection, + receiver: OnceCell::const_new(), + audio_track, + }, + ) + .await; + + info!("exited"); + } + .instrument(span), + ); + + address + } +} + +impl Receiver { + pub async fn run(api: Arc, config: RTCConfiguration, sender: Address) { + let (address, mailbox) = Mailbox::unbounded(); + let peer_connection = Arc::new(api.new_peer_connection(config).await.expect("rraaahaa")); + // Set the handler for ICE connection state + // This will notify you when the peer has connected/disconnected + peer_connection.on_ice_connection_state_change({ + let address = address.clone(); + Box::new(move |connection_state: RTCIceConnectionState| { + info!("Connection State has changed {connection_state}"); + let address = address.clone(); + Box::pin(async move { + if connection_state == RTCIceConnectionState::Connected { + let _ = address.send(Connected).await.unwrap(); + } + }) + }) + }); + + // Set the handler for Peer connection state + // This will notify you when the peer has connected/disconnected + peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { + info!("Peer Connection State has changed: {s}"); + + if s == RTCPeerConnectionState::Failed { + // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. + // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. + // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. + error!("Peer Connection has gone to failed exiting"); + } + + Box::pin(async {}) + })); + + peer_connection.on_track(Box::new(move |track, track_receiver, transceiver| { + info!("holy track"); + + Box::pin(async move { + let mut file = File::create("out.opus").unwrap(); + let mut samplero = OggWriter::new(&mut file, 48000, 2).unwrap(); + + while let Ok((packet, _)) = track.read_rtp().await { + samplero.write_rtp(&packet).unwrap(); + } + println!("done"); + std::process::exit(1); + }) + })); + peer_connection.on_ice_candidate({ + let sender = sender.clone(); + let span = Span::current(); + Box::new(move |candidate| { + let sender = sender.clone(); + let span = span.clone(); + Box::pin( + async move { + if let Some(candidate) = candidate { + let candidate = candidate.to_json().unwrap(); + // warn!("candidate: {:?}", candidate); + sender.send(candidate).await.unwrap(); + } else { + // warn!("candidates drained"); + } + } + .instrument(span), + ) + }) + }); + + let transceiver = peer_connection + .add_transceiver_from_kind( + RTPCodecType::Audio, + Some(RTCRtpTransceiverInit { + direction: RTCRtpTransceiverDirection::Recvonly, + send_encodings: vec![], + }), + ) + .await + .unwrap(); + let rtp_sender = transceiver.sender().await; + tokio::spawn(async move { + let mut rtcp_buf = vec![0u8; 1500]; + while let Ok((packet, _)) = rtp_sender.read(&mut rtcp_buf).await {} + error!("rtcp..."); + }); + + let offer = sender.send(address).await.unwrap(); + error!("offer {offer:?}"); + peer_connection.set_remote_description(offer).await.unwrap(); + let answer = peer_connection.create_answer(None).await.unwrap(); + peer_connection.set_local_description(answer.clone()).await.unwrap(); + + sender.send(answer).await.unwrap(); + + xtra::run( + mailbox, + Receiver { + peer_connection, + sender, + }, + ) + .await; + } +} + +impl Handler> for Sender { + type Return = RTCSessionDescription; + + async fn handle(&mut self, address: Address, ctx: &mut xtra::Context) -> Self::Return { + self.receiver.set(address.clone()).unwrap(); + self.peer_connection.on_ice_candidate(Box::new(move |candidate| { + let address = address.clone(); + Box::pin(async move { + if let Some(candidate) = candidate { + address.send(candidate.to_json().unwrap()).await.unwrap(); + } + }) + })); + let offer = self.peer_connection.create_offer(None).await.unwrap(); + self.peer_connection.set_local_description(offer.clone()).await.unwrap(); + + offer + } +} + +impl Handler for Sender { + type Return = (); + + async fn handle(&mut self, message: RTCSessionDescription, ctx: &mut xtra::Context) -> Self::Return { + self.peer_connection.set_remote_description(message).await.unwrap(); + } +} + +struct IceNotify; +impl Handler<(IceNotify, RTCIceCandidateInit)> for Sender { + type Return = (); + + async fn handle( + &mut self, + (_, message): (IceNotify, RTCIceCandidateInit), + _: &mut xtra::Context, + ) -> Self::Return { + self.receiver.get().unwrap().send(message).await.unwrap() + } +} +impl Handler for Sender { + type Return = (); + + async fn handle(&mut self, message: RTCIceCandidateInit, _: &mut xtra::Context) -> Self::Return { + // trace!("adding ice candidate {message:?}"); + self.peer_connection.add_ice_candidate(message).await.unwrap(); + } +} + +impl Handler for Receiver { + type Return = (); + + async fn handle(&mut self, message: RTCIceCandidateInit, _: &mut xtra::Context) -> Self::Return { + // trace!("adding ice candidate {message:?}"); + self.peer_connection.add_ice_candidate(message).await.unwrap(); + } +} + +pub struct Connected; +impl Handler for Sender { + type Return = (); + + async fn handle(&mut self, _: Connected, ctx: &mut xtra::Context) -> Self::Return { + info!("connected"); + let address = ctx.mailbox().address(); + + tokio::time::sleep(Duration::from_secs(1)).await; + + let file = read(args().nth(1).unwrap()).unwrap(); + let (mut ogg, _) = OggReader::new(Cursor::new(file), true).unwrap(); + const OGG_PAGE_DURATION: Duration = Duration::from_millis(20); + let mut ticker = tokio::time::interval(OGG_PAGE_DURATION); + let audio_track = self.audio_track.clone(); + + tokio::spawn( + async move { + // Keep track of last granule, the difference is the amount of samples in the buffer + let mut last_granule: u64 = 0; + while let Ok((page_data, page_header)) = ogg.parse_next_page() { + // The amount of samples is the difference between the last and current timestamp + let sample_count = page_header.granule_position - last_granule; + last_granule = page_header.granule_position; + let sample_duration = Duration::from_millis(sample_count * 1000 / 48000); + + info!("writing sample"); + audio_track + .write_sample(&Sample { + data: page_data.freeze(), + duration: sample_duration, + ..Default::default() + }) + .await + .unwrap(); + + let _ = ticker.tick().await; + } + info!("done streaming audio"); + address.send(Stop).await.unwrap(); + } + .in_current_span(), + ); + } +} + +struct Stop; +impl Handler for Sender { + type Return = (); + async fn handle(&mut self, _: Stop, ctx: &mut xtra::Context) -> Self::Return { + self.peer_connection.close().await.unwrap(); + ctx.stop_self(); + } +} + +impl Handler for Receiver { + type Return = (); + + async fn handle(&mut self, _: Connected, _: &mut xtra::Context) -> Self::Return { + info!("connected!"); + } +} diff --git a/src/protocol.rs b/src/protocol.rs index 54aa906..849427a 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -1,9 +1,11 @@ +use core::str; use std::{ ffi::CStr, fmt::{Debug, Display}, }; use anyhow::{bail, Context}; +use tracing::warn; use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Unaligned}; #[derive(Clone, Copy, Debug, PartialEq, Eq, FromBytes, IntoBytes, KnownLayout, Immutable, Unaligned)] @@ -31,7 +33,9 @@ impl From for bool { } } -#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, FromBytes, IntoBytes, KnownLayout, Immutable, Unaligned)] +#[derive( + Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord, FromBytes, IntoBytes, KnownLayout, Immutable, Unaligned, +)] #[repr(C)] pub struct String([u8; N]); @@ -40,10 +44,12 @@ impl String { self.try_as_str().expect("wasn't a string") } pub fn try_as_str(&self) -> anyhow::Result<&str> { - let cstr = CStr::from_bytes_until_nul(&self.0).context("interpreting bytes as c-string")?; - cstr.to_str().context("verifying string has utf-8") - // let str = str::from_utf8(&self.0).context("verifying string has utf-8")?; - // Ok(str.trim_end_matches('\0')) + if self.0.contains(&0) { + let cstr = CStr::from_bytes_until_nul(&self.0).context("interpreting bytes as c-string")?; + cstr.to_str().context("verifying string has utf-8") + } else { + std::str::from_utf8(&self.0).context("verifying string has utf8") + } } pub fn assert_valid(&self) -> anyhow::Result<()> { self.try_as_str().map(drop) diff --git a/src/server/mod.rs b/src/server/mod.rs index cc61ecc..52a09c3 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -2,10 +2,9 @@ mod prox; use std::{collections::HashMap, sync::LazyLock}; -use prox::ProximityPlayer; +use prox::{packet::HYP_UUID_SIZE, ProximityPlayer}; use tokio::sync::RwLock; use tracing::{error, info_span, Instrument}; -use uuid::Uuid; use wtransport::{Endpoint, Identity, ServerConfig}; use xtra::{Actor, Address, Handler, Mailbox}; use zerocopy::{FromZeros, Immutable, IntoBytes}; @@ -15,8 +14,10 @@ use crate::{ protocol::String, }; -fn listeners() -> &'static RwLock>> { - static LISTENERS: LazyLock>>> = LazyLock::new(Default::default); +type UuidString = String; + +fn listeners() -> &'static RwLock>> { + static LISTENERS: LazyLock>>> = LazyLock::new(Default::default); &LISTENERS } diff --git a/src/server/prox.rs b/src/server/prox.rs index 63b947b..49d6ed5 100644 --- a/src/server/prox.rs +++ b/src/server/prox.rs @@ -1,19 +1,26 @@ use core::str; use std::sync::Arc; -use tokio::io::AsyncWriteExt; +use packet::{ + Event, + Event_variants::{Answer, Candidate, Offer}, + Packet, HYP_UUID_SIZE, +}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tracing::{error, info, info_span, trace, warn, Instrument}; use uuid::Uuid; -use wtransport::endpoint::IncomingSession; +use wtransport::{endpoint::IncomingSession, RecvStream}; use xtra::{Actor, Address, Handler, Mailbox}; use zerocopy::{FromZeros, IntoBytes}; use crate::protocol::String; -use super::{listeners, ChangedStage, Manager, PlayerConnected, PlayerDisconnected, PlayerMoved, RequestState}; +use super::{ + listeners, ChangedStage, Manager, PlayerConnected, PlayerDisconnected, PlayerMoved, RequestState, UuidString, +}; pub struct ProximityPlayer { - id: Uuid, + id: UuidString, send: wtransport::SendStream, connection: Arc, } @@ -29,9 +36,8 @@ impl ProximityPlayer { let (mut send, mut recv) = connection.accept_bi().await.expect("failed to start channel"); trace!("getting peerjs uuid"); - let mut buffer = [0; 36]; - recv.read_exact(buffer.as_mut_bytes()).await.expect("failed to read uuid"); - let id = Uuid::parse_str(str::from_utf8(&buffer).expect("expected utf8")).expect("failed to parse uuid"); + let mut id = UuidString::new_zeroed(); + recv.read_exact(id.as_mut_bytes()).await.expect("failed to read uuid"); let span = info_span!("", %id); span.in_scope(|| trace!("uuid parsed")); @@ -47,9 +53,7 @@ impl ProximityPlayer { let listeners = listeners().read().await; send.write_u32_le(listeners.len() as u32).await.expect("failed to write peer length"); for (id, _) in listeners.iter() { - let mut str = String::<36>::new_zeroed(); - id.as_hyphenated().encode_lower(str.as_mut_bytes()); - send.write_all(str.as_bytes()).await.expect("failed to write peer id") + send.write_all(id.as_bytes()).await.expect("failed to write peer id") } } @@ -58,12 +62,31 @@ impl ProximityPlayer { listeners().write().await.insert(id, address.clone()); tokio::spawn({ let connection = connection.clone(); + let address = address.clone(); async move { connection.closed().await; let _ = address.send(Stop).await; } .instrument(span.clone()) }); + tokio::spawn( + async move { + loop { + match packet::Event::deserialize(&mut recv).await { + Ok(event) => { + info!("deserialized event: {event:?}"); + let _ = address.send(event).detach().await; + } + Err(error) => { + error!("error while deserializing: {error:?}"); + let _ = address.send(Stop).await; + return; + } + }; + } + } + .instrument(span.clone()), + ); xtra::run(mailbox, ProximityPlayer { id, send, connection }).instrument(span).await; } .in_current_span(), @@ -116,21 +139,93 @@ impl Handler for ProximityPlayer { } } +impl Handler for ProximityPlayer { + type Return = (); + + async fn handle(&mut self, message: Event, _: &mut xtra::Context) -> Self::Return { + async fn send_event(id: UuidString, value: T) + where + ProximityPlayer: Handler, + { + if let Some(listener) = listeners().read().await.get(&id) { + if let Err(error) = listener.send(value).detach().await { + warn!("listener {id} is dead: {error}") + } + } else { + warn!("sending offer to dead listener {id}"); + } + } + match message { + Event::Offer(offer) => { + send_event( + offer.id, + Offer { + id: self.id, + sdp: offer.sdp, + }, + ) + .await; + } + Event::Answer(answer) => { + send_event( + answer.id, + Answer { + id: self.id, + sdp: answer.sdp, + }, + ) + .await; + } + Event::Candidate(candidate) => { + send_event( + candidate.id, + Candidate { + id: self.id, + candidate: candidate.candidate, + }, + ) + .await; + } + } + } +} + +impl Handler for ProximityPlayer { + type Return = (); + + async fn handle(&mut self, message: Offer, _: &mut xtra::Context) -> Self::Return { + Event::Offer(message).serialize(&mut self.send).await.expect("failed to write offer!"); + } +} + +impl Handler for ProximityPlayer { + type Return = (); + + async fn handle(&mut self, message: Answer, _: &mut xtra::Context) -> Self::Return { + Event::Answer(message).serialize(&mut self.send).await.expect("failed to write offer!"); + } +} + +impl Handler for ProximityPlayer { + type Return = (); + + async fn handle(&mut self, message: Candidate, _: &mut xtra::Context) -> Self::Return { + Event::Candidate(message).serialize(&mut self.send).await.expect("failed to write offer!"); + } +} + struct PeerConnectionChanged { - id: Uuid, + id: UuidString, connected: bool, } impl Handler for ProximityPlayer { type Return = (); async fn handle(&mut self, message: PeerConnectionChanged, ctx: &mut xtra::Context) -> Self::Return { - let mut id = String::new_zeroed(); - - message.id.hyphenated().encode_lower(id.as_mut_bytes()); let event = packet::Packet { kind: packet::Kind::PeerConnectionChanged, data: packet::PeerConnectionChanged { - id, + id: self.id, connected: message.connected, }, }; @@ -217,13 +312,23 @@ impl Handler for ProximityPlayer { } pub mod packet { - use zerocopy::{Immutable, IntoBytes}; + use std::{str, string::String as StdString}; + + use anyhow::bail; + use newtype_enum::newtype_enum; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tracing::info; +use wtransport::{RecvStream, SendStream}; + use zerocopy::{FromZeros, Immutable, IntoBytes}; + use Event_variants::{Answer, Candidate, Offer}; use crate::{ packet::{CLIENT_NAME_SIZE, STAGE_GAME_NAME_SIZE}, protocol::String, }; + pub const HYP_UUID_SIZE: usize = 36; + #[derive(IntoBytes, Immutable)] #[repr(C, packed)] pub struct Packet { @@ -239,6 +344,9 @@ pub mod packet { Moved = 2, StageChanged = 3, PeerConnectionChanged = 4, + Offer = 5, + Answer = 6, + Candidate = 7, } #[derive(IntoBytes, Immutable)] @@ -271,7 +379,92 @@ pub mod packet { #[derive(IntoBytes, Immutable)] #[repr(C, packed)] pub struct PeerConnectionChanged { - pub id: String<36>, + pub id: String, pub connected: bool, } + + #[newtype_enum] + #[derive(Debug)] + pub enum Event { + Offer { + pub id: String, + pub sdp: StdString, + }, + Answer { + pub id: String, + pub sdp: StdString, + }, + Candidate { + pub id: String, + pub candidate: StdString, + }, + } + + impl Event { + pub async fn serialize(self, send: &mut SendStream) -> anyhow::Result<()> { + async fn write_string(send: &mut SendStream, string: StdString) -> anyhow::Result<()> { + send.write_u32_le(string.len() as u32).await?; + send.write_all(string.as_bytes()).await?; + + Ok(()) + } + + info!("writing event: {self:?}"); + + match self { + Event::Offer(offer) => { + send.write_u8(Kind::Offer as u8).await?; + send.write_all(offer.id.as_bytes()).await?; + write_string(send, offer.sdp).await?; + } + Event::Answer(answer) => { + send.write_u8(Kind::Answer as u8).await?; + send.write_all(answer.id.as_bytes()).await?; + write_string(send, answer.sdp).await?; + } + Event::Candidate(candidate) => { + send.write_u8(Kind::Candidate as u8).await?; + send.write_all(candidate.id.as_bytes()).await?; + write_string(send, candidate.candidate).await?; + } + } + Ok(()) + } + + pub async fn deserialize(recv: &mut RecvStream) -> anyhow::Result { + async fn read_string(recv: &mut RecvStream) -> anyhow::Result { + let size = recv.read_u32_le().await?; + info!("string size: {size}"); + let mut data = vec![0; size as usize]; + recv.read_exact(&mut data).await?; + Ok(StdString::from_utf8(data)?) + } + async fn read_fixed_string(recv: &mut RecvStream) -> anyhow::Result> { + let mut str = String::new_zeroed(); + recv.read_exact(str.as_mut_bytes()).await?; + info!("read id: {str}"); + Ok(str) + } + + let kind = recv.read_u8().await?; + info!("reading kind: {kind}"); + let event = match kind { + 0 => Event::Offer(Offer { + id: read_fixed_string(recv).await?, + sdp: read_string(recv).await?, + }), + 1 => Event::Answer(Answer { + id: read_fixed_string(recv).await?, + sdp: read_string(recv).await?, + }), + 2 => Event::Candidate(Candidate { + id: read_fixed_string(recv).await?, + candidate: read_string(recv).await?, + }), + kind => bail!("invalid kind: {kind}"), + }; + + Ok(event) + } + } } diff --git a/wyrmjewelbox.opus b/wyrmjewelbox.opus new file mode 100644 index 0000000..d0a4fe1 Binary files /dev/null and b/wyrmjewelbox.opus differ