remove warnings and proximity-client

This commit is contained in:
Aubrey 2024-12-20 04:32:26 -06:00
parent 6678e908b8
commit c3f6a120df
No known key found for this signature in database
9 changed files with 20 additions and 1899 deletions

1435
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -19,6 +19,3 @@ zerocopy = { version = "0.8.13", features = ["derive"] }
[dev-dependencies] [dev-dependencies]
roead = "1.0.0" roead = "1.0.0"
[workspace]
members = ["proximity-client"]

BIN
out.opus

Binary file not shown.

View file

@ -1,15 +0,0 @@
[package]
name = "proximity-client"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0.94"
cpal = "0.15.3"
tokio = { version = "1.42.0", features = ["rt", "rt-multi-thread", "macros", "sync", "io-util", "fs"] }
tracing = "0.1.41"
tracing-subscriber = "0.3.19"
webrtc = { path = "../../webrtc/webrtc" }
webtransport = "0.0.0"
xtra = { version = "0.6.0", features = ["macros"] }

View file

@ -1,3 +0,0 @@
fn main() {
}

View file

@ -1,445 +0,0 @@
use std::{
env::args,
fs::{read, File},
io::Cursor,
sync::{Arc, LazyLock},
time::Duration,
};
use tokio::{
io::BufReader,
sync::{mpsc, oneshot, OnceCell},
};
use tracing::{error, info, info_span, trace, warn, Instrument, Span};
use tracing_subscriber::{filter::FilterFn, layer::SubscriberExt, util::SubscriberInitExt, Layer};
use webrtc::{
api::{
interceptor_registry::register_default_interceptors,
media_engine::{MediaEngine, MIME_TYPE_OPUS},
setting_engine::SettingEngine,
APIBuilder, API,
},
data_channel::data_channel_init::RTCDataChannelInit,
ice_transport::{
ice_candidate::{RTCIceCandidate, RTCIceCandidateInit},
ice_connection_state::RTCIceConnectionState,
ice_server::RTCIceServer,
},
interceptor::registry::Registry,
media::{
io::{ogg_reader::OggReader, ogg_writer::OggWriter, Writer},
Sample,
},
peer_connection::{
configuration::RTCConfiguration,
offer_answer_options::{RTCAnswerOptions, RTCOfferOptions},
peer_connection_state::RTCPeerConnectionState,
sdp::session_description::RTCSessionDescription,
RTCPeerConnection,
},
rtp_transceiver::{
rtp_codec::{RTCRtpCodecCapability, RTCRtpCodecParameters, RTPCodecType},
rtp_transceiver_direction::RTCRtpTransceiverDirection,
RTCRtpTransceiverInit,
},
track::track_local::{track_local_static_sample::TrackLocalStaticSample, TrackLocal},
};
use xtra::{Actor, Address, Handler, Mailbox};
const CODEC: LazyLock<RTCRtpCodecParameters> = LazyLock::new(|| RTCRtpCodecParameters {
capability: RTCRtpCodecCapability {
mime_type: MIME_TYPE_OPUS.to_owned(),
clock_rate: 48000,
channels: 2,
sdp_fmtp_line: "".to_owned(),
rtcp_feedback: vec![],
},
payload_type: 111,
..Default::default()
});
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer().with_filter(FilterFn::new(|m| {
m.module_path().map(|path| !path.contains("webrtc_ice") && !path.contains("webrtc_mdns")).unwrap_or_default()
// m.module_path().map(|path| path.contains("proximity_client")).unwrap_or_default()
// true
})))
.init();
let mut m = MediaEngine::default();
m.register_codec(CODEC.clone(), RTPCodecType::Audio)?;
let mut registry = Registry::new();
registry = register_default_interceptors(registry, &mut m)?;
let api = Arc::new(
APIBuilder::new()
.with_media_engine(m)
// .with_setting_engine({
// let mut engine = SettingEngine::default();
// // engine.enable_sender_rtx(is_enabled);
// engine
// })
.with_interceptor_registry(registry)
.build(),
);
let config = RTCConfiguration {
ice_servers: vec![RTCIceServer {
urls: vec!["stun:stun.l.google.com:19302".to_owned()],
..Default::default()
}],
..Default::default()
};
let sender = Sender::spawn(api.clone(), config.clone());
Receiver::run(api, config, sender).instrument(info_span!("receiver")).await;
Ok(())
}
#[derive(Actor)]
struct Sender {
peer_connection: Arc<RTCPeerConnection>,
receiver: OnceCell<Address<Receiver>>,
audio_track: Arc<TrackLocalStaticSample>,
}
#[derive(Actor)]
struct Receiver {
peer_connection: Arc<RTCPeerConnection>,
sender: Address<Sender>,
}
impl Sender {
pub fn spawn(api: Arc<API>, config: RTCConfiguration) -> Address<Sender> {
let (address, mailbox) = Mailbox::unbounded();
let span = info_span!("sender");
tokio::spawn(
async move {
let address = mailbox.address();
let peer_connection = Arc::new(api.new_peer_connection(config).await.expect("rraaahaa"));
// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
peer_connection.on_ice_connection_state_change({
let address = address.clone();
Box::new(move |connection_state: RTCIceConnectionState| {
info!("Connection State has changed {connection_state}");
if connection_state == RTCIceConnectionState::Connected {
let address = address.clone();
tokio::spawn(async move {
address.send(Connected).await.unwrap();
});
}
Box::pin(async {})
})
});
// Set the handler for Peer connection state
// This will notify you when the peer has connected/disconnected
peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
info!("Peer Connection State has changed: {s}");
if s == RTCPeerConnectionState::Failed {
// Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
// Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
// Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
info!("Peer Connection has gone to failed exiting");
}
Box::pin(async {})
}));
peer_connection.on_ice_candidate({
let address = address.clone();
let span = Span::current();
Box::new(move |candidate: Option<RTCIceCandidate>| {
let sender = address.clone();
let span = span.clone();
Box::pin(
async move {
if let Some(candidate) = candidate {
let candidate = candidate.to_json().unwrap();
warn!("candidate: {:?}", candidate);
sender.send((IceNotify, candidate)).await.unwrap();
} else {
warn!("candidates drained");
}
}
.instrument(span),
)
})
});
peer_connection.on_track(Box::new(|track, _, _| {
warn!("aaaaaa");
panic!()
}));
let audio_track = Arc::new(TrackLocalStaticSample::new(
CODEC.capability.clone(),
"audio".to_owned(),
"webrtc-rs".to_owned(),
));
let rtp_sender = peer_connection
.add_transceiver_from_track(
Arc::clone(&audio_track) as Arc<dyn TrackLocal + Send + Sync>,
Some(RTCRtpTransceiverInit {
direction: RTCRtpTransceiverDirection::Sendonly,
send_encodings: Vec::new(),
}),
)
.await
.expect("oops");
let rtp_sender = rtp_sender.sender().await;
// Read incoming RTCP packets
// Before these packets are returned they are processed by interceptors. For things
// like NACK this needs to be called.
tokio::spawn(async move {
let mut rtcp_buf = vec![0u8; 1500];
while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
error!("rtcp...");
});
xtra::run(
mailbox,
Sender {
peer_connection,
receiver: OnceCell::const_new(),
audio_track,
},
)
.await;
info!("exited");
}
.instrument(span),
);
address
}
}
impl Receiver {
pub async fn run(api: Arc<API>, config: RTCConfiguration, sender: Address<Sender>) {
let (address, mailbox) = Mailbox::unbounded();
let peer_connection = Arc::new(api.new_peer_connection(config).await.expect("rraaahaa"));
// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
peer_connection.on_ice_connection_state_change({
let address = address.clone();
Box::new(move |connection_state: RTCIceConnectionState| {
info!("Connection State has changed {connection_state}");
let address = address.clone();
Box::pin(async move {
if connection_state == RTCIceConnectionState::Connected {
let _ = address.send(Connected).await.unwrap();
}
})
})
});
// Set the handler for Peer connection state
// This will notify you when the peer has connected/disconnected
peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
info!("Peer Connection State has changed: {s}");
if s == RTCPeerConnectionState::Failed {
// Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
// Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
// Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
error!("Peer Connection has gone to failed exiting");
}
Box::pin(async {})
}));
peer_connection.on_track(Box::new(move |track, track_receiver, transceiver| {
info!("holy track");
Box::pin(async move {
let mut file = File::create("out.opus").unwrap();
let mut samplero = OggWriter::new(&mut file, 48000, 2).unwrap();
while let Ok((packet, _)) = track.read_rtp().await {
samplero.write_rtp(&packet).unwrap();
}
println!("done");
std::process::exit(1);
})
}));
peer_connection.on_ice_candidate({
let sender = sender.clone();
let span = Span::current();
Box::new(move |candidate| {
let sender = sender.clone();
let span = span.clone();
Box::pin(
async move {
if let Some(candidate) = candidate {
let candidate = candidate.to_json().unwrap();
// warn!("candidate: {:?}", candidate);
sender.send(candidate).await.unwrap();
} else {
// warn!("candidates drained");
}
}
.instrument(span),
)
})
});
let transceiver = peer_connection
.add_transceiver_from_kind(
RTPCodecType::Audio,
Some(RTCRtpTransceiverInit {
direction: RTCRtpTransceiverDirection::Recvonly,
send_encodings: vec![],
}),
)
.await
.unwrap();
let rtp_sender = transceiver.sender().await;
tokio::spawn(async move {
let mut rtcp_buf = vec![0u8; 1500];
while let Ok((packet, _)) = rtp_sender.read(&mut rtcp_buf).await {}
error!("rtcp...");
});
let offer = sender.send(address).await.unwrap();
error!("offer {offer:?}");
peer_connection.set_remote_description(offer).await.unwrap();
let answer = peer_connection.create_answer(None).await.unwrap();
peer_connection.set_local_description(answer.clone()).await.unwrap();
sender.send(answer).await.unwrap();
xtra::run(
mailbox,
Receiver {
peer_connection,
sender,
},
)
.await;
}
}
impl Handler<Address<Receiver>> for Sender {
type Return = RTCSessionDescription;
async fn handle(&mut self, address: Address<Receiver>, ctx: &mut xtra::Context<Self>) -> Self::Return {
self.receiver.set(address.clone()).unwrap();
self.peer_connection.on_ice_candidate(Box::new(move |candidate| {
let address = address.clone();
Box::pin(async move {
if let Some(candidate) = candidate {
address.send(candidate.to_json().unwrap()).await.unwrap();
}
})
}));
let offer = self.peer_connection.create_offer(None).await.unwrap();
self.peer_connection.set_local_description(offer.clone()).await.unwrap();
offer
}
}
impl Handler<RTCSessionDescription> for Sender {
type Return = ();
async fn handle(&mut self, message: RTCSessionDescription, ctx: &mut xtra::Context<Self>) -> Self::Return {
self.peer_connection.set_remote_description(message).await.unwrap();
}
}
struct IceNotify;
impl Handler<(IceNotify, RTCIceCandidateInit)> for Sender {
type Return = ();
async fn handle(
&mut self,
(_, message): (IceNotify, RTCIceCandidateInit),
_: &mut xtra::Context<Self>,
) -> Self::Return {
self.receiver.get().unwrap().send(message).await.unwrap()
}
}
impl Handler<RTCIceCandidateInit> for Sender {
type Return = ();
async fn handle(&mut self, message: RTCIceCandidateInit, _: &mut xtra::Context<Self>) -> Self::Return {
// trace!("adding ice candidate {message:?}");
self.peer_connection.add_ice_candidate(message).await.unwrap();
}
}
impl Handler<RTCIceCandidateInit> for Receiver {
type Return = ();
async fn handle(&mut self, message: RTCIceCandidateInit, _: &mut xtra::Context<Self>) -> Self::Return {
// trace!("adding ice candidate {message:?}");
self.peer_connection.add_ice_candidate(message).await.unwrap();
}
}
pub struct Connected;
impl Handler<Connected> for Sender {
type Return = ();
async fn handle(&mut self, _: Connected, ctx: &mut xtra::Context<Self>) -> Self::Return {
info!("connected");
let address = ctx.mailbox().address();
tokio::time::sleep(Duration::from_secs(1)).await;
let file = read(args().nth(1).unwrap()).unwrap();
let (mut ogg, _) = OggReader::new(Cursor::new(file), true).unwrap();
const OGG_PAGE_DURATION: Duration = Duration::from_millis(20);
let mut ticker = tokio::time::interval(OGG_PAGE_DURATION);
let audio_track = self.audio_track.clone();
tokio::spawn(
async move {
// Keep track of last granule, the difference is the amount of samples in the buffer
let mut last_granule: u64 = 0;
while let Ok((page_data, page_header)) = ogg.parse_next_page() {
// The amount of samples is the difference between the last and current timestamp
let sample_count = page_header.granule_position - last_granule;
last_granule = page_header.granule_position;
let sample_duration = Duration::from_millis(sample_count * 1000 / 48000);
info!("writing sample");
audio_track
.write_sample(&Sample {
data: page_data.freeze(),
duration: sample_duration,
..Default::default()
})
.await
.unwrap();
let _ = ticker.tick().await;
}
info!("done streaming audio");
address.send(Stop).await.unwrap();
}
.in_current_span(),
);
}
}
struct Stop;
impl Handler<Stop> for Sender {
type Return = ();
async fn handle(&mut self, _: Stop, ctx: &mut xtra::Context<Self>) -> Self::Return {
self.peer_connection.close().await.unwrap();
ctx.stop_self();
}
}
impl Handler<Connected> for Receiver {
type Return = ();
async fn handle(&mut self, _: Connected, _: &mut xtra::Context<Self>) -> Self::Return {
info!("connected!");
}
}

View file

@ -5,7 +5,6 @@ use std::{
}; };
use anyhow::{bail, Context}; use anyhow::{bail, Context};
use tracing::warn;
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Unaligned}; use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Unaligned};
#[derive(Clone, Copy, Debug, PartialEq, Eq, FromBytes, IntoBytes, KnownLayout, Immutable, Unaligned)] #[derive(Clone, Copy, Debug, PartialEq, Eq, FromBytes, IntoBytes, KnownLayout, Immutable, Unaligned)]
@ -61,7 +60,7 @@ impl<const N: usize> TryFrom<&str> for String<N> {
fn try_from(value: &str) -> Result<Self, Self::Error> { fn try_from(value: &str) -> Result<Self, Self::Error> {
let mut buf = [0; N]; let mut buf = [0; N];
if value.len() > N { if value.len() > N {
bail!("seggs") bail!("string large than {N}")
} }
value.write_to_prefix(&mut buf).unwrap(); value.write_to_prefix(&mut buf).unwrap();

View file

@ -1,20 +1,15 @@
use core::str;
use std::sync::Arc; use std::sync::Arc;
use packet::{ use packet::{
Event, Event,
Event_variants::{Answer, Candidate, Offer}, Event_variants::{Answer, Candidate, Offer},
Packet, HYP_UUID_SIZE,
}; };
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::AsyncWriteExt;
use tracing::{error, info, info_span, trace, warn, Instrument}; use tracing::{error, info, info_span, trace, warn, Instrument};
use uuid::Uuid; use wtransport::endpoint::IncomingSession;
use wtransport::{endpoint::IncomingSession, RecvStream};
use xtra::{Actor, Address, Handler, Mailbox}; use xtra::{Actor, Address, Handler, Mailbox};
use zerocopy::{FromZeros, IntoBytes}; use zerocopy::{FromZeros, IntoBytes};
use crate::protocol::String;
use super::{ use super::{
listeners, ChangedStage, Manager, PlayerConnected, PlayerDisconnected, PlayerMoved, RequestState, UuidString, listeners, ChangedStage, Manager, PlayerConnected, PlayerDisconnected, PlayerMoved, RequestState, UuidString,
}; };
@ -225,7 +220,7 @@ impl Handler<PeerConnectionChanged> for ProximityPlayer {
let event = packet::Packet { let event = packet::Packet {
kind: packet::Kind::PeerConnectionChanged, kind: packet::Kind::PeerConnectionChanged,
data: packet::PeerConnectionChanged { data: packet::PeerConnectionChanged {
id: self.id, id: message.id,
connected: message.connected, connected: message.connected,
}, },
}; };
@ -312,7 +307,7 @@ impl Handler<ChangedStage> for ProximityPlayer {
} }
pub mod packet { pub mod packet {
use std::{str, string::String as StdString}; use std::string::String as StdString;
use anyhow::bail; use anyhow::bail;
use newtype_enum::newtype_enum; use newtype_enum::newtype_enum;

Binary file not shown.