implemented proximity chat using the js frontend

sorry webrtc-rs, you were just half baked...
This commit is contained in:
Aubrey 2024-12-20 04:29:34 -06:00
parent a129a11eed
commit 6678e908b8
No known key found for this signature in database
10 changed files with 684 additions and 72 deletions

34
Cargo.lock generated
View file

@ -1188,8 +1188,6 @@ dependencies = [
[[package]] [[package]]
name = "interceptor" name = "interceptor"
version = "0.13.0" version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ab04c530fd82e414e40394cabe5f0ebfe30d119f10fe29d6e3561926af412e"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"bytes", "bytes",
@ -1784,10 +1782,14 @@ dependencies = [
name = "proximity-client" name = "proximity-client"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"cpal", "cpal",
"tokio", "tokio",
"tracing",
"tracing-subscriber",
"webrtc", "webrtc",
"webtransport", "webtransport",
"xtra",
] ]
[[package]] [[package]]
@ -1998,8 +2000,6 @@ dependencies = [
[[package]] [[package]]
name = "rtcp" name = "rtcp"
version = "0.12.0" version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8306430fb118b7834bbee50e744dc34826eca1da2158657a3d6cbc70e24c2096"
dependencies = [ dependencies = [
"bytes", "bytes",
"thiserror 1.0.69", "thiserror 1.0.69",
@ -2009,8 +2009,6 @@ dependencies = [
[[package]] [[package]]
name = "rtp" name = "rtp"
version = "0.12.0" version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e68baca5b6cb4980678713f0d06ef3a432aa642baefcbfd0f4dd2ef9eb5ab550"
dependencies = [ dependencies = [
"bytes", "bytes",
"memchr", "memchr",
@ -2157,8 +2155,6 @@ checksum = "a3cf7c11c38cb994f3d40e8a8cde3bbd1f72a435e4c49e85d6553d8312306152"
[[package]] [[package]]
name = "sdp" name = "sdp"
version = "0.7.0" version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02a526161f474ae94b966ba622379d939a8fe46c930eebbadb73e339622599d5"
dependencies = [ dependencies = [
"rand", "rand",
"substring", "substring",
@ -2398,8 +2394,6 @@ checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]] [[package]]
name = "stun" name = "stun"
version = "0.7.0" version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea256fb46a13f9204e9dee9982997b2c3097db175a9fddaa8350310d03c4d5a3"
dependencies = [ dependencies = [
"base64", "base64",
"crc", "crc",
@ -2700,8 +2694,6 @@ dependencies = [
[[package]] [[package]]
name = "turn" name = "turn"
version = "0.9.0" version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0044fdae001dd8a1e247ea6289abf12f4fcea1331a2364da512f9cd680bbd8cb"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"base64", "base64",
@ -2911,8 +2903,6 @@ dependencies = [
[[package]] [[package]]
name = "webrtc" name = "webrtc"
version = "0.12.0" version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30367074d9f18231d28a74fab0120856b2b665da108d71a12beab7185a36f97b"
dependencies = [ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",
@ -2955,8 +2945,6 @@ dependencies = [
[[package]] [[package]]
name = "webrtc-data" name = "webrtc-data"
version = "0.10.0" version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dec93b991efcd01b73c5b3503fa8adba159d069abe5785c988ebe14fcf8f05d1"
dependencies = [ dependencies = [
"bytes", "bytes",
"log", "log",
@ -2970,8 +2958,6 @@ dependencies = [
[[package]] [[package]]
name = "webrtc-dtls" name = "webrtc-dtls"
version = "0.11.0" version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7c9b89fc909f9da0499283b1112cd98f72fec28e55a54a9e352525ca65cd95c"
dependencies = [ dependencies = [
"aes", "aes",
"aes-gcm", "aes-gcm",
@ -3007,8 +2993,6 @@ dependencies = [
[[package]] [[package]]
name = "webrtc-ice" name = "webrtc-ice"
version = "0.12.0" version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0348b28b593f7709ac98d872beb58c0009523df652c78e01b950ab9c537ff17d"
dependencies = [ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",
@ -3032,8 +3016,6 @@ dependencies = [
[[package]] [[package]]
name = "webrtc-mdns" name = "webrtc-mdns"
version = "0.8.0" version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6dfe9686c6c9c51428da4de415cb6ca2dc0591ce2b63212e23fd9cccf0e316b"
dependencies = [ dependencies = [
"log", "log",
"socket2", "socket2",
@ -3045,8 +3027,6 @@ dependencies = [
[[package]] [[package]]
name = "webrtc-media" name = "webrtc-media"
version = "0.9.0" version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e153be16b8650021ad3e9e49ab6e5fa9fb7f6d1c23c213fd8bbd1a1135a4c704"
dependencies = [ dependencies = [
"byteorder", "byteorder",
"bytes", "bytes",
@ -3058,8 +3038,6 @@ dependencies = [
[[package]] [[package]]
name = "webrtc-sctp" name = "webrtc-sctp"
version = "0.11.0" version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5faf3846ec4b7e64b56338d62cbafe084aa79806b0379dff5cc74a8b7a2b3063"
dependencies = [ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",
@ -3076,8 +3054,6 @@ dependencies = [
[[package]] [[package]]
name = "webrtc-srtp" name = "webrtc-srtp"
version = "0.14.0" version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "771db9993712a8fb3886d5be4613ebf27250ef422bd4071988bf55f1ed1a64fa"
dependencies = [ dependencies = [
"aead", "aead",
"aes", "aes",
@ -3099,8 +3075,6 @@ dependencies = [
[[package]] [[package]]
name = "webrtc-util" name = "webrtc-util"
version = "0.10.0" version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1438a8fd0d69c5775afb4a71470af92242dbd04059c61895163aa3c1ef933375"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"bitflags 1.3.2", "bitflags 1.3.2",

View file

@ -24,6 +24,8 @@
cmake cmake
pkg-config pkg-config
openssl openssl
alsa-lib
ffmpeg
]; ];
}; };
} }

BIN
out.opus Normal file

Binary file not shown.

View file

@ -4,7 +4,12 @@ version = "0.1.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
anyhow = "1.0.94"
cpal = "0.15.3" cpal = "0.15.3"
tokio = { version = "1.42.0", features = ["rt", "rt-multi-thread", "macros", "sync", "io-util", "fs"] } tokio = { version = "1.42.0", features = ["rt", "rt-multi-thread", "macros", "sync", "io-util", "fs"] }
webrtc = "0.12.0" tracing = "0.1.41"
tracing-subscriber = "0.3.19"
webrtc = { path = "../../webrtc/webrtc" }
webtransport = "0.0.0" webtransport = "0.0.0"
xtra = { version = "0.6.0", features = ["macros"] }

View file

@ -1,17 +1,3 @@
use webrtc::{api::media_engine::MediaEngine, interceptor::registry::Registry, peer_connection::configuration::RTCConfiguration};
fn main() { fn main() {
let mut m = MediaEngine::default();
m.register_default_codecs()?;
let mut registry = Registry::new();
registry = register_default_interceptors(registry, &mut m)?;
let api = APIBuilder::new().with_media_engine(m).with_interceptor_registry(registry).build();
let config = RTCConfiguration {
ice_servers: vec![RTCIceServer {
urls: vec!["stun:stun.l.google.com:19302".to_owned()],
..Default::default()
}],
..Default::default()
};
} }

View file

@ -0,0 +1,445 @@
use std::{
env::args,
fs::{read, File},
io::Cursor,
sync::{Arc, LazyLock},
time::Duration,
};
use tokio::{
io::BufReader,
sync::{mpsc, oneshot, OnceCell},
};
use tracing::{error, info, info_span, trace, warn, Instrument, Span};
use tracing_subscriber::{filter::FilterFn, layer::SubscriberExt, util::SubscriberInitExt, Layer};
use webrtc::{
api::{
interceptor_registry::register_default_interceptors,
media_engine::{MediaEngine, MIME_TYPE_OPUS},
setting_engine::SettingEngine,
APIBuilder, API,
},
data_channel::data_channel_init::RTCDataChannelInit,
ice_transport::{
ice_candidate::{RTCIceCandidate, RTCIceCandidateInit},
ice_connection_state::RTCIceConnectionState,
ice_server::RTCIceServer,
},
interceptor::registry::Registry,
media::{
io::{ogg_reader::OggReader, ogg_writer::OggWriter, Writer},
Sample,
},
peer_connection::{
configuration::RTCConfiguration,
offer_answer_options::{RTCAnswerOptions, RTCOfferOptions},
peer_connection_state::RTCPeerConnectionState,
sdp::session_description::RTCSessionDescription,
RTCPeerConnection,
},
rtp_transceiver::{
rtp_codec::{RTCRtpCodecCapability, RTCRtpCodecParameters, RTPCodecType},
rtp_transceiver_direction::RTCRtpTransceiverDirection,
RTCRtpTransceiverInit,
},
track::track_local::{track_local_static_sample::TrackLocalStaticSample, TrackLocal},
};
use xtra::{Actor, Address, Handler, Mailbox};
const CODEC: LazyLock<RTCRtpCodecParameters> = LazyLock::new(|| RTCRtpCodecParameters {
capability: RTCRtpCodecCapability {
mime_type: MIME_TYPE_OPUS.to_owned(),
clock_rate: 48000,
channels: 2,
sdp_fmtp_line: "".to_owned(),
rtcp_feedback: vec![],
},
payload_type: 111,
..Default::default()
});
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer().with_filter(FilterFn::new(|m| {
m.module_path().map(|path| !path.contains("webrtc_ice") && !path.contains("webrtc_mdns")).unwrap_or_default()
// m.module_path().map(|path| path.contains("proximity_client")).unwrap_or_default()
// true
})))
.init();
let mut m = MediaEngine::default();
m.register_codec(CODEC.clone(), RTPCodecType::Audio)?;
let mut registry = Registry::new();
registry = register_default_interceptors(registry, &mut m)?;
let api = Arc::new(
APIBuilder::new()
.with_media_engine(m)
// .with_setting_engine({
// let mut engine = SettingEngine::default();
// // engine.enable_sender_rtx(is_enabled);
// engine
// })
.with_interceptor_registry(registry)
.build(),
);
let config = RTCConfiguration {
ice_servers: vec![RTCIceServer {
urls: vec!["stun:stun.l.google.com:19302".to_owned()],
..Default::default()
}],
..Default::default()
};
let sender = Sender::spawn(api.clone(), config.clone());
Receiver::run(api, config, sender).instrument(info_span!("receiver")).await;
Ok(())
}
#[derive(Actor)]
struct Sender {
peer_connection: Arc<RTCPeerConnection>,
receiver: OnceCell<Address<Receiver>>,
audio_track: Arc<TrackLocalStaticSample>,
}
#[derive(Actor)]
struct Receiver {
peer_connection: Arc<RTCPeerConnection>,
sender: Address<Sender>,
}
impl Sender {
pub fn spawn(api: Arc<API>, config: RTCConfiguration) -> Address<Sender> {
let (address, mailbox) = Mailbox::unbounded();
let span = info_span!("sender");
tokio::spawn(
async move {
let address = mailbox.address();
let peer_connection = Arc::new(api.new_peer_connection(config).await.expect("rraaahaa"));
// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
peer_connection.on_ice_connection_state_change({
let address = address.clone();
Box::new(move |connection_state: RTCIceConnectionState| {
info!("Connection State has changed {connection_state}");
if connection_state == RTCIceConnectionState::Connected {
let address = address.clone();
tokio::spawn(async move {
address.send(Connected).await.unwrap();
});
}
Box::pin(async {})
})
});
// Set the handler for Peer connection state
// This will notify you when the peer has connected/disconnected
peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
info!("Peer Connection State has changed: {s}");
if s == RTCPeerConnectionState::Failed {
// Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
// Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
// Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
info!("Peer Connection has gone to failed exiting");
}
Box::pin(async {})
}));
peer_connection.on_ice_candidate({
let address = address.clone();
let span = Span::current();
Box::new(move |candidate: Option<RTCIceCandidate>| {
let sender = address.clone();
let span = span.clone();
Box::pin(
async move {
if let Some(candidate) = candidate {
let candidate = candidate.to_json().unwrap();
warn!("candidate: {:?}", candidate);
sender.send((IceNotify, candidate)).await.unwrap();
} else {
warn!("candidates drained");
}
}
.instrument(span),
)
})
});
peer_connection.on_track(Box::new(|track, _, _| {
warn!("aaaaaa");
panic!()
}));
let audio_track = Arc::new(TrackLocalStaticSample::new(
CODEC.capability.clone(),
"audio".to_owned(),
"webrtc-rs".to_owned(),
));
let rtp_sender = peer_connection
.add_transceiver_from_track(
Arc::clone(&audio_track) as Arc<dyn TrackLocal + Send + Sync>,
Some(RTCRtpTransceiverInit {
direction: RTCRtpTransceiverDirection::Sendonly,
send_encodings: Vec::new(),
}),
)
.await
.expect("oops");
let rtp_sender = rtp_sender.sender().await;
// Read incoming RTCP packets
// Before these packets are returned they are processed by interceptors. For things
// like NACK this needs to be called.
tokio::spawn(async move {
let mut rtcp_buf = vec![0u8; 1500];
while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
error!("rtcp...");
});
xtra::run(
mailbox,
Sender {
peer_connection,
receiver: OnceCell::const_new(),
audio_track,
},
)
.await;
info!("exited");
}
.instrument(span),
);
address
}
}
impl Receiver {
pub async fn run(api: Arc<API>, config: RTCConfiguration, sender: Address<Sender>) {
let (address, mailbox) = Mailbox::unbounded();
let peer_connection = Arc::new(api.new_peer_connection(config).await.expect("rraaahaa"));
// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
peer_connection.on_ice_connection_state_change({
let address = address.clone();
Box::new(move |connection_state: RTCIceConnectionState| {
info!("Connection State has changed {connection_state}");
let address = address.clone();
Box::pin(async move {
if connection_state == RTCIceConnectionState::Connected {
let _ = address.send(Connected).await.unwrap();
}
})
})
});
// Set the handler for Peer connection state
// This will notify you when the peer has connected/disconnected
peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
info!("Peer Connection State has changed: {s}");
if s == RTCPeerConnectionState::Failed {
// Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
// Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
// Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
error!("Peer Connection has gone to failed exiting");
}
Box::pin(async {})
}));
peer_connection.on_track(Box::new(move |track, track_receiver, transceiver| {
info!("holy track");
Box::pin(async move {
let mut file = File::create("out.opus").unwrap();
let mut samplero = OggWriter::new(&mut file, 48000, 2).unwrap();
while let Ok((packet, _)) = track.read_rtp().await {
samplero.write_rtp(&packet).unwrap();
}
println!("done");
std::process::exit(1);
})
}));
peer_connection.on_ice_candidate({
let sender = sender.clone();
let span = Span::current();
Box::new(move |candidate| {
let sender = sender.clone();
let span = span.clone();
Box::pin(
async move {
if let Some(candidate) = candidate {
let candidate = candidate.to_json().unwrap();
// warn!("candidate: {:?}", candidate);
sender.send(candidate).await.unwrap();
} else {
// warn!("candidates drained");
}
}
.instrument(span),
)
})
});
let transceiver = peer_connection
.add_transceiver_from_kind(
RTPCodecType::Audio,
Some(RTCRtpTransceiverInit {
direction: RTCRtpTransceiverDirection::Recvonly,
send_encodings: vec![],
}),
)
.await
.unwrap();
let rtp_sender = transceiver.sender().await;
tokio::spawn(async move {
let mut rtcp_buf = vec![0u8; 1500];
while let Ok((packet, _)) = rtp_sender.read(&mut rtcp_buf).await {}
error!("rtcp...");
});
let offer = sender.send(address).await.unwrap();
error!("offer {offer:?}");
peer_connection.set_remote_description(offer).await.unwrap();
let answer = peer_connection.create_answer(None).await.unwrap();
peer_connection.set_local_description(answer.clone()).await.unwrap();
sender.send(answer).await.unwrap();
xtra::run(
mailbox,
Receiver {
peer_connection,
sender,
},
)
.await;
}
}
impl Handler<Address<Receiver>> for Sender {
type Return = RTCSessionDescription;
async fn handle(&mut self, address: Address<Receiver>, ctx: &mut xtra::Context<Self>) -> Self::Return {
self.receiver.set(address.clone()).unwrap();
self.peer_connection.on_ice_candidate(Box::new(move |candidate| {
let address = address.clone();
Box::pin(async move {
if let Some(candidate) = candidate {
address.send(candidate.to_json().unwrap()).await.unwrap();
}
})
}));
let offer = self.peer_connection.create_offer(None).await.unwrap();
self.peer_connection.set_local_description(offer.clone()).await.unwrap();
offer
}
}
impl Handler<RTCSessionDescription> for Sender {
type Return = ();
async fn handle(&mut self, message: RTCSessionDescription, ctx: &mut xtra::Context<Self>) -> Self::Return {
self.peer_connection.set_remote_description(message).await.unwrap();
}
}
struct IceNotify;
impl Handler<(IceNotify, RTCIceCandidateInit)> for Sender {
type Return = ();
async fn handle(
&mut self,
(_, message): (IceNotify, RTCIceCandidateInit),
_: &mut xtra::Context<Self>,
) -> Self::Return {
self.receiver.get().unwrap().send(message).await.unwrap()
}
}
impl Handler<RTCIceCandidateInit> for Sender {
type Return = ();
async fn handle(&mut self, message: RTCIceCandidateInit, _: &mut xtra::Context<Self>) -> Self::Return {
// trace!("adding ice candidate {message:?}");
self.peer_connection.add_ice_candidate(message).await.unwrap();
}
}
impl Handler<RTCIceCandidateInit> for Receiver {
type Return = ();
async fn handle(&mut self, message: RTCIceCandidateInit, _: &mut xtra::Context<Self>) -> Self::Return {
// trace!("adding ice candidate {message:?}");
self.peer_connection.add_ice_candidate(message).await.unwrap();
}
}
pub struct Connected;
impl Handler<Connected> for Sender {
type Return = ();
async fn handle(&mut self, _: Connected, ctx: &mut xtra::Context<Self>) -> Self::Return {
info!("connected");
let address = ctx.mailbox().address();
tokio::time::sleep(Duration::from_secs(1)).await;
let file = read(args().nth(1).unwrap()).unwrap();
let (mut ogg, _) = OggReader::new(Cursor::new(file), true).unwrap();
const OGG_PAGE_DURATION: Duration = Duration::from_millis(20);
let mut ticker = tokio::time::interval(OGG_PAGE_DURATION);
let audio_track = self.audio_track.clone();
tokio::spawn(
async move {
// Keep track of last granule, the difference is the amount of samples in the buffer
let mut last_granule: u64 = 0;
while let Ok((page_data, page_header)) = ogg.parse_next_page() {
// The amount of samples is the difference between the last and current timestamp
let sample_count = page_header.granule_position - last_granule;
last_granule = page_header.granule_position;
let sample_duration = Duration::from_millis(sample_count * 1000 / 48000);
info!("writing sample");
audio_track
.write_sample(&Sample {
data: page_data.freeze(),
duration: sample_duration,
..Default::default()
})
.await
.unwrap();
let _ = ticker.tick().await;
}
info!("done streaming audio");
address.send(Stop).await.unwrap();
}
.in_current_span(),
);
}
}
struct Stop;
impl Handler<Stop> for Sender {
type Return = ();
async fn handle(&mut self, _: Stop, ctx: &mut xtra::Context<Self>) -> Self::Return {
self.peer_connection.close().await.unwrap();
ctx.stop_self();
}
}
impl Handler<Connected> for Receiver {
type Return = ();
async fn handle(&mut self, _: Connected, _: &mut xtra::Context<Self>) -> Self::Return {
info!("connected!");
}
}

View file

@ -1,9 +1,11 @@
use core::str;
use std::{ use std::{
ffi::CStr, ffi::CStr,
fmt::{Debug, Display}, fmt::{Debug, Display},
}; };
use anyhow::{bail, Context}; use anyhow::{bail, Context};
use tracing::warn;
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Unaligned}; use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Unaligned};
#[derive(Clone, Copy, Debug, PartialEq, Eq, FromBytes, IntoBytes, KnownLayout, Immutable, Unaligned)] #[derive(Clone, Copy, Debug, PartialEq, Eq, FromBytes, IntoBytes, KnownLayout, Immutable, Unaligned)]
@ -31,7 +33,9 @@ impl From<Bool> for bool {
} }
} }
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, FromBytes, IntoBytes, KnownLayout, Immutable, Unaligned)] #[derive(
Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord, FromBytes, IntoBytes, KnownLayout, Immutable, Unaligned,
)]
#[repr(C)] #[repr(C)]
pub struct String<const N: usize>([u8; N]); pub struct String<const N: usize>([u8; N]);
@ -40,10 +44,12 @@ impl<const N: usize> String<N> {
self.try_as_str().expect("wasn't a string") self.try_as_str().expect("wasn't a string")
} }
pub fn try_as_str(&self) -> anyhow::Result<&str> { pub fn try_as_str(&self) -> anyhow::Result<&str> {
if self.0.contains(&0) {
let cstr = CStr::from_bytes_until_nul(&self.0).context("interpreting bytes as c-string")?; let cstr = CStr::from_bytes_until_nul(&self.0).context("interpreting bytes as c-string")?;
cstr.to_str().context("verifying string has utf-8") cstr.to_str().context("verifying string has utf-8")
// let str = str::from_utf8(&self.0).context("verifying string has utf-8")?; } else {
// Ok(str.trim_end_matches('\0')) std::str::from_utf8(&self.0).context("verifying string has utf8")
}
} }
pub fn assert_valid(&self) -> anyhow::Result<()> { pub fn assert_valid(&self) -> anyhow::Result<()> {
self.try_as_str().map(drop) self.try_as_str().map(drop)

View file

@ -2,10 +2,9 @@ mod prox;
use std::{collections::HashMap, sync::LazyLock}; use std::{collections::HashMap, sync::LazyLock};
use prox::ProximityPlayer; use prox::{packet::HYP_UUID_SIZE, ProximityPlayer};
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tracing::{error, info_span, Instrument}; use tracing::{error, info_span, Instrument};
use uuid::Uuid;
use wtransport::{Endpoint, Identity, ServerConfig}; use wtransport::{Endpoint, Identity, ServerConfig};
use xtra::{Actor, Address, Handler, Mailbox}; use xtra::{Actor, Address, Handler, Mailbox};
use zerocopy::{FromZeros, Immutable, IntoBytes}; use zerocopy::{FromZeros, Immutable, IntoBytes};
@ -15,8 +14,10 @@ use crate::{
protocol::String, protocol::String,
}; };
fn listeners() -> &'static RwLock<HashMap<Uuid, Address<ProximityPlayer>>> { type UuidString = String<HYP_UUID_SIZE>;
static LISTENERS: LazyLock<RwLock<HashMap<Uuid, Address<ProximityPlayer>>>> = LazyLock::new(Default::default);
fn listeners() -> &'static RwLock<HashMap<UuidString, Address<ProximityPlayer>>> {
static LISTENERS: LazyLock<RwLock<HashMap<UuidString, Address<ProximityPlayer>>>> = LazyLock::new(Default::default);
&LISTENERS &LISTENERS
} }

View file

@ -1,19 +1,26 @@
use core::str; use core::str;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::AsyncWriteExt; use packet::{
Event,
Event_variants::{Answer, Candidate, Offer},
Packet, HYP_UUID_SIZE,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::{error, info, info_span, trace, warn, Instrument}; use tracing::{error, info, info_span, trace, warn, Instrument};
use uuid::Uuid; use uuid::Uuid;
use wtransport::endpoint::IncomingSession; use wtransport::{endpoint::IncomingSession, RecvStream};
use xtra::{Actor, Address, Handler, Mailbox}; use xtra::{Actor, Address, Handler, Mailbox};
use zerocopy::{FromZeros, IntoBytes}; use zerocopy::{FromZeros, IntoBytes};
use crate::protocol::String; use crate::protocol::String;
use super::{listeners, ChangedStage, Manager, PlayerConnected, PlayerDisconnected, PlayerMoved, RequestState}; use super::{
listeners, ChangedStage, Manager, PlayerConnected, PlayerDisconnected, PlayerMoved, RequestState, UuidString,
};
pub struct ProximityPlayer { pub struct ProximityPlayer {
id: Uuid, id: UuidString,
send: wtransport::SendStream, send: wtransport::SendStream,
connection: Arc<wtransport::Connection>, connection: Arc<wtransport::Connection>,
} }
@ -29,9 +36,8 @@ impl ProximityPlayer {
let (mut send, mut recv) = connection.accept_bi().await.expect("failed to start channel"); let (mut send, mut recv) = connection.accept_bi().await.expect("failed to start channel");
trace!("getting peerjs uuid"); trace!("getting peerjs uuid");
let mut buffer = [0; 36]; let mut id = UuidString::new_zeroed();
recv.read_exact(buffer.as_mut_bytes()).await.expect("failed to read uuid"); recv.read_exact(id.as_mut_bytes()).await.expect("failed to read uuid");
let id = Uuid::parse_str(str::from_utf8(&buffer).expect("expected utf8")).expect("failed to parse uuid");
let span = info_span!("", %id); let span = info_span!("", %id);
span.in_scope(|| trace!("uuid parsed")); span.in_scope(|| trace!("uuid parsed"));
@ -47,9 +53,7 @@ impl ProximityPlayer {
let listeners = listeners().read().await; let listeners = listeners().read().await;
send.write_u32_le(listeners.len() as u32).await.expect("failed to write peer length"); send.write_u32_le(listeners.len() as u32).await.expect("failed to write peer length");
for (id, _) in listeners.iter() { for (id, _) in listeners.iter() {
let mut str = String::<36>::new_zeroed(); send.write_all(id.as_bytes()).await.expect("failed to write peer id")
id.as_hyphenated().encode_lower(str.as_mut_bytes());
send.write_all(str.as_bytes()).await.expect("failed to write peer id")
} }
} }
@ -58,12 +62,31 @@ impl ProximityPlayer {
listeners().write().await.insert(id, address.clone()); listeners().write().await.insert(id, address.clone());
tokio::spawn({ tokio::spawn({
let connection = connection.clone(); let connection = connection.clone();
let address = address.clone();
async move { async move {
connection.closed().await; connection.closed().await;
let _ = address.send(Stop).await; let _ = address.send(Stop).await;
} }
.instrument(span.clone()) .instrument(span.clone())
}); });
tokio::spawn(
async move {
loop {
match packet::Event::deserialize(&mut recv).await {
Ok(event) => {
info!("deserialized event: {event:?}");
let _ = address.send(event).detach().await;
}
Err(error) => {
error!("error while deserializing: {error:?}");
let _ = address.send(Stop).await;
return;
}
};
}
}
.instrument(span.clone()),
);
xtra::run(mailbox, ProximityPlayer { id, send, connection }).instrument(span).await; xtra::run(mailbox, ProximityPlayer { id, send, connection }).instrument(span).await;
} }
.in_current_span(), .in_current_span(),
@ -116,21 +139,93 @@ impl Handler<Stop> for ProximityPlayer {
} }
} }
impl Handler<Event> for ProximityPlayer {
type Return = ();
async fn handle(&mut self, message: Event, _: &mut xtra::Context<Self>) -> Self::Return {
async fn send_event<T: Send + 'static>(id: UuidString, value: T)
where
ProximityPlayer: Handler<T>,
{
if let Some(listener) = listeners().read().await.get(&id) {
if let Err(error) = listener.send(value).detach().await {
warn!("listener {id} is dead: {error}")
}
} else {
warn!("sending offer to dead listener {id}");
}
}
match message {
Event::Offer(offer) => {
send_event(
offer.id,
Offer {
id: self.id,
sdp: offer.sdp,
},
)
.await;
}
Event::Answer(answer) => {
send_event(
answer.id,
Answer {
id: self.id,
sdp: answer.sdp,
},
)
.await;
}
Event::Candidate(candidate) => {
send_event(
candidate.id,
Candidate {
id: self.id,
candidate: candidate.candidate,
},
)
.await;
}
}
}
}
impl Handler<Offer> for ProximityPlayer {
type Return = ();
async fn handle(&mut self, message: Offer, _: &mut xtra::Context<Self>) -> Self::Return {
Event::Offer(message).serialize(&mut self.send).await.expect("failed to write offer!");
}
}
impl Handler<Answer> for ProximityPlayer {
type Return = ();
async fn handle(&mut self, message: Answer, _: &mut xtra::Context<Self>) -> Self::Return {
Event::Answer(message).serialize(&mut self.send).await.expect("failed to write offer!");
}
}
impl Handler<Candidate> for ProximityPlayer {
type Return = ();
async fn handle(&mut self, message: Candidate, _: &mut xtra::Context<Self>) -> Self::Return {
Event::Candidate(message).serialize(&mut self.send).await.expect("failed to write offer!");
}
}
struct PeerConnectionChanged { struct PeerConnectionChanged {
id: Uuid, id: UuidString,
connected: bool, connected: bool,
} }
impl Handler<PeerConnectionChanged> for ProximityPlayer { impl Handler<PeerConnectionChanged> for ProximityPlayer {
type Return = (); type Return = ();
async fn handle(&mut self, message: PeerConnectionChanged, ctx: &mut xtra::Context<Self>) -> Self::Return { async fn handle(&mut self, message: PeerConnectionChanged, ctx: &mut xtra::Context<Self>) -> Self::Return {
let mut id = String::new_zeroed();
message.id.hyphenated().encode_lower(id.as_mut_bytes());
let event = packet::Packet { let event = packet::Packet {
kind: packet::Kind::PeerConnectionChanged, kind: packet::Kind::PeerConnectionChanged,
data: packet::PeerConnectionChanged { data: packet::PeerConnectionChanged {
id, id: self.id,
connected: message.connected, connected: message.connected,
}, },
}; };
@ -217,13 +312,23 @@ impl Handler<ChangedStage> for ProximityPlayer {
} }
pub mod packet { pub mod packet {
use zerocopy::{Immutable, IntoBytes}; use std::{str, string::String as StdString};
use anyhow::bail;
use newtype_enum::newtype_enum;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::info;
use wtransport::{RecvStream, SendStream};
use zerocopy::{FromZeros, Immutable, IntoBytes};
use Event_variants::{Answer, Candidate, Offer};
use crate::{ use crate::{
packet::{CLIENT_NAME_SIZE, STAGE_GAME_NAME_SIZE}, packet::{CLIENT_NAME_SIZE, STAGE_GAME_NAME_SIZE},
protocol::String, protocol::String,
}; };
pub const HYP_UUID_SIZE: usize = 36;
#[derive(IntoBytes, Immutable)] #[derive(IntoBytes, Immutable)]
#[repr(C, packed)] #[repr(C, packed)]
pub struct Packet<T> { pub struct Packet<T> {
@ -239,6 +344,9 @@ pub mod packet {
Moved = 2, Moved = 2,
StageChanged = 3, StageChanged = 3,
PeerConnectionChanged = 4, PeerConnectionChanged = 4,
Offer = 5,
Answer = 6,
Candidate = 7,
} }
#[derive(IntoBytes, Immutable)] #[derive(IntoBytes, Immutable)]
@ -271,7 +379,92 @@ pub mod packet {
#[derive(IntoBytes, Immutable)] #[derive(IntoBytes, Immutable)]
#[repr(C, packed)] #[repr(C, packed)]
pub struct PeerConnectionChanged { pub struct PeerConnectionChanged {
pub id: String<36>, pub id: String<HYP_UUID_SIZE>,
pub connected: bool, pub connected: bool,
} }
#[newtype_enum]
#[derive(Debug)]
pub enum Event {
Offer {
pub id: String<HYP_UUID_SIZE>,
pub sdp: StdString,
},
Answer {
pub id: String<HYP_UUID_SIZE>,
pub sdp: StdString,
},
Candidate {
pub id: String<HYP_UUID_SIZE>,
pub candidate: StdString,
},
}
impl Event {
pub async fn serialize(self, send: &mut SendStream) -> anyhow::Result<()> {
async fn write_string(send: &mut SendStream, string: StdString) -> anyhow::Result<()> {
send.write_u32_le(string.len() as u32).await?;
send.write_all(string.as_bytes()).await?;
Ok(())
}
info!("writing event: {self:?}");
match self {
Event::Offer(offer) => {
send.write_u8(Kind::Offer as u8).await?;
send.write_all(offer.id.as_bytes()).await?;
write_string(send, offer.sdp).await?;
}
Event::Answer(answer) => {
send.write_u8(Kind::Answer as u8).await?;
send.write_all(answer.id.as_bytes()).await?;
write_string(send, answer.sdp).await?;
}
Event::Candidate(candidate) => {
send.write_u8(Kind::Candidate as u8).await?;
send.write_all(candidate.id.as_bytes()).await?;
write_string(send, candidate.candidate).await?;
}
}
Ok(())
}
pub async fn deserialize(recv: &mut RecvStream) -> anyhow::Result<Self> {
async fn read_string(recv: &mut RecvStream) -> anyhow::Result<StdString> {
let size = recv.read_u32_le().await?;
info!("string size: {size}");
let mut data = vec![0; size as usize];
recv.read_exact(&mut data).await?;
Ok(StdString::from_utf8(data)?)
}
async fn read_fixed_string<const N: usize>(recv: &mut RecvStream) -> anyhow::Result<String<N>> {
let mut str = String::new_zeroed();
recv.read_exact(str.as_mut_bytes()).await?;
info!("read id: {str}");
Ok(str)
}
let kind = recv.read_u8().await?;
info!("reading kind: {kind}");
let event = match kind {
0 => Event::Offer(Offer {
id: read_fixed_string(recv).await?,
sdp: read_string(recv).await?,
}),
1 => Event::Answer(Answer {
id: read_fixed_string(recv).await?,
sdp: read_string(recv).await?,
}),
2 => Event::Candidate(Candidate {
id: read_fixed_string(recv).await?,
candidate: read_string(recv).await?,
}),
kind => bail!("invalid kind: {kind}"),
};
Ok(event)
}
}
} }

BIN
wyrmjewelbox.opus Normal file

Binary file not shown.