Skip to content

Commit

Permalink
Video daemon
Browse files Browse the repository at this point in the history
  • Loading branch information
griffobeid committed Dec 13, 2023
1 parent e28c516 commit 568f002
Show file tree
Hide file tree
Showing 40 changed files with 3,850 additions and 3,128 deletions.
4,117 changes: 1,089 additions & 3,028 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
[workspace]

resolver = "2"
members = [
"actix-api",
"bot",
"src-tauri",
"types",
"videocall-client",
"video-daemon",
]
exclude = [
"src-tauri",
"yew-ui"
]
2 changes: 1 addition & 1 deletion Dockerfile.actix
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM securityunion/rustlemania-api-base:push-images-to-dockerhub-304d6434 as build
FROM securityunion/rustlemania-api-base:1.72-slim as build

ENV CARGO_TARGET_DIR=/app/actix-api/target
COPY . /app
Expand Down
38 changes: 38 additions & 0 deletions Dockerfile.video-daemon
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
FROM debian:bookworm-slim as build

# Install rust
RUN apt-get update && apt-get install -y curl
RUN curl https://sh.rustup.rs -sSf | sh -s -- -y
ENV PATH="/root/.cargo/bin:${PATH}"

# Install dependencies
RUN apt-get install -y libssl-dev pkg-config libvpx-dev build-essential libglib2.0-dev libgtk-3-dev libsoup2.4 libjavascriptcoregtk-4.0-dev libclang-dev clang libwebkit2gtk-4.0-dev

# Copy source code
WORKDIR /app
COPY . .

ENV LOGIN_URL ""
ENV ACTIX_UI_BACKEND_URL ""
ENV WEBTRANSPORT_HOST ""

# Build
RUN rustup default nightly-2023-10-05
RUN cargo build --release

FROM debian:bookworm-slim

ENV RUST_LOG=info
ENV RUST_BACKTRACE=0
ENV QUIC_HOST=https://transport.rustlemania.com
ENV MEETING_ID=1234
ENV USER_ID=1234

RUN apt-get update && \
apt-get install -y pkg-config libvpx-dev && \
apt-get clean

COPY --from=build /app/target/release/video-daemon /usr/bin/

CMD ["video-daemon"]

2 changes: 1 addition & 1 deletion Dockerfile.yew
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM securityunion/yew:fix-image-no-cigar-c4770e96 as build
FROM securityunion/yew:1.72-slim as build

# TODO - this is a hack to get around the fact that the yew-ui crate is not bundled with the backend
ENV ENABLE_OAUTH=false
Expand Down
2 changes: 2 additions & 0 deletions actix-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ jsonwebtoken= "8.1.1"
oauth2 = { version = "4" }
octets = "0.2.0"
quinn = { version = "0.10.1", features = ["runtime-tokio", "tls-rustls", "ring"] }
protobuf = "3.3.0"
r2d2 = "0.8.10"
r2d2_postgres = "0.18.1"
rand = "0.8.5"
Expand All @@ -50,5 +51,6 @@ tokio = { version = "1.28.2", features = ["full"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["fmt", "ansi", "env-filter", "time", "tracing-log"] }
tracing-tree = "0.2.3"
types = { path= "../types"}
urlencoding = "2.1.3"
uuid = { version = "0.8", features = ["serde", "v4"] }
40 changes: 23 additions & 17 deletions actix-api/src/bin/websocket_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,23 +187,29 @@ async fn main() -> std::io::Result<()> {
HttpServer::new(move || {
let cors = Cors::permissive();

let pool = get_pool();

App::new()
.app_data(web::Data::new(pool))
.app_data(web::Data::new(AppState { chat: chat.clone() }))
.app_data(web::Data::new(AppConfig {
oauth_client_id: oauth_client_id.clone(),
oauth_auth_url: oauth_auth_url.clone(),
oauth_token_url: oauth_token_url.clone(),
oauth_secret: oauth_secret.clone(),
oauth_redirect_url: oauth_redirect_url.clone(),
after_login_url: after_login_url.clone(),
}))
.wrap(cors)
.service(handle_google_oauth_callback)
.service(login)
.service(ws_connect)
if oauth_client_id.is_empty() {
App::new()
.wrap(cors)
.app_data(web::Data::new(AppState { chat: chat.clone() }))
.service(ws_connect)
} else {
let pool = get_pool();
App::new()
.app_data(web::Data::new(pool))
.app_data(web::Data::new(AppState { chat: chat.clone() }))
.app_data(web::Data::new(AppConfig {
oauth_client_id: oauth_client_id.clone(),
oauth_auth_url: oauth_auth_url.clone(),
oauth_token_url: oauth_token_url.clone(),
oauth_secret: oauth_secret.clone(),
oauth_redirect_url: oauth_redirect_url.clone(),
after_login_url: after_login_url.clone(),
}))
.wrap(cors)
.service(handle_google_oauth_callback)
.service(login)
.service(ws_connect)
}
})
.bind((
"0.0.0.0",
Expand Down
219 changes: 191 additions & 28 deletions actix-api/src/webtransport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use anyhow::{anyhow, Context, Result};
use bytes::Bytes;
use futures::StreamExt;
use http::Method;
use protobuf::Message;
use quinn::crypto::rustls::HandshakeData;
use quinn::VarInt;
use rustls::{Certificate, PrivateKey};
use sec_http3::error::Code;
Expand All @@ -16,8 +18,15 @@ use sec_http3::{
use std::sync::atomic::{AtomicBool, Ordering};
use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::sync::RwLock;
use tokio::sync::{oneshot, watch, RwLock};
use tracing::{error, info, trace_span};
use types::protos::connection_packet::ConnectionPacket;
use types::protos::packet_wrapper::packet_wrapper::PacketType;
use types::protos::packet_wrapper::PacketWrapper;

pub const WEB_TRANSPORT_ALPN: &[&[u8]] = &[b"h3", b"h3-32", b"h3-31", b"h3-30", b"h3-29"];

pub const QUIC_ALPN: &[u8] = b"hq-29";

#[derive(Debug)]
pub struct WebTransportOpt {
Expand Down Expand Up @@ -81,13 +90,12 @@ pub async fn start(opt: WebTransportOpt) -> Result<(), Box<dyn std::error::Error
.with_single_cert(certs, key)?;

tls_config.max_early_data_size = u32::MAX;
let alpn: Vec<Vec<u8>> = vec![
b"h3".to_vec(),
b"h3-32".to_vec(),
b"h3-31".to_vec(),
b"h3-30".to_vec(),
b"h3-29".to_vec(),
];
let mut alpn = vec![];
for proto in WEB_TRANSPORT_ALPN {
alpn.push(proto.to_vec());
}
alpn.push(QUIC_ALPN.to_vec());

tls_config.alpn_protocols = alpn;

// 1. create quinn server endpoint and bind UDP socket
Expand All @@ -109,27 +117,41 @@ pub async fn start(opt: WebTransportOpt) -> Result<(), Box<dyn std::error::Error
while let Some(new_conn) = endpoint.accept().await {
trace_span!("New connection being attempted");
let nc = nc.clone();

tokio::spawn(async move {
match new_conn.await {
Ok(conn) => {
info!("new http3 established");
let h3_conn = sec_http3::server::builder()
.enable_webtransport(true)
.enable_connect(true)
.enable_datagram(true)
.max_webtransport_sessions(1)
.send_grease(true)
.build(h3_quinn::Connection::new(conn))
.await
.unwrap();

// info!("Establishing WebTransport session");
// // 3. TODO: Conditionally, if the client indicated that this is a webtransport session, we should accept it here, else use regular h3.
// // if this is a webtransport session, then h3 needs to stop handing the datagrams, bidirectional streams, and unidirectional streams and give them
// // to the webtransport session.
let nc = nc.clone();
if let Err(err) = handle_connection(h3_conn, nc).await {
error!("Failed to handle connection: {err:?}");
let mut http3 = false;
if let Some(data) = conn.handshake_data() {
if let Some(d) = data.downcast_ref::<HandshakeData>() {
if let Some(alpn) = &d.protocol {
if WEB_TRANSPORT_ALPN.contains(&alpn.as_slice()) {
http3 = true;
}
}
}
};
if http3 {
info!("new http3 established");
let h3_conn = sec_http3::server::builder()
.enable_webtransport(true)
.enable_connect(true)
.enable_datagram(true)
.max_webtransport_sessions(1)
.send_grease(true)
.build(h3_quinn::Connection::new(conn))
.await
.unwrap();
let nc = nc.clone();
if let Err(err) = handle_h3_connection(h3_conn, nc).await {
error!("Failed to handle connection: {err:?}");
}
} else {
info!("new quic established");
let nc = nc.clone();
if let Err(err) = handle_quic_connection(conn, nc).await {
error!("Failed to handle connection: {err:?}");
}
}
}
Err(err) => {
Expand All @@ -146,7 +168,7 @@ pub async fn start(opt: WebTransportOpt) -> Result<(), Box<dyn std::error::Error
Ok(())
}

async fn handle_connection(
async fn handle_h3_connection(
mut conn: Connection<h3_quinn::Connection, Bytes>,
nc: async_nats::client::Client,
) -> Result<()> {
Expand Down Expand Up @@ -257,7 +279,6 @@ where
let session_id = session.session_id();
let session = Arc::new(RwLock::new(session));
let should_run = Arc::new(AtomicBool::new(true));
info!("Connected to NATS");

let subject = format!("room.{}.*", lobby_id).replace(' ', "_");
let specific_subject = format!("room.{}.{}", lobby_id, username).replace(' ', "_");
Expand Down Expand Up @@ -354,3 +375,145 @@ where
info!("Finished handling session");
Ok(())
}

async fn handle_quic_connection(
conn: quinn::Connection,
nc: async_nats::client::Client,
) -> Result<()> {
let session_id = conn.stable_id();
let session = Arc::new(RwLock::new(conn));
let should_run = Arc::new(AtomicBool::new(true));
// let lobby_id = 666;

struct MeetingInfo {
lobby_id: String,
session_id: String,
}
let meeting_info = oneshot::channel::<MeetingInfo>();
let (specific_subject_tx, mut specific_subject_rx) = watch::channel::<Option<String>>(None);

let nats_task = {
let session = session.clone();
let should_run = should_run.clone();
let nc_clone = nc.clone();
let specific_subject_rx_clone = specific_subject_rx.clone();
tokio::spawn(async move {
let mut specific_subject_rx = specific_subject_rx_clone;
let nc = nc_clone;
specific_subject_rx.changed().await.unwrap();
let specific_subject = specific_subject_rx.borrow().clone().unwrap();
let subject = session_subject_to_lobby_subject(&specific_subject);
let mut sub = match nc
.queue_subscribe(subject.clone(), specific_subject.clone())
.await
{
Ok(sub) => {
info!("Subscribed to subject {}", subject);
sub
}
Err(e) => {
let err = format!("error subscribing to subject {}: {}", subject, e);
error!("{}", err);
return;
}
};
while let Some(msg) = sub.next().await {
if !should_run.load(Ordering::SeqCst) {
break;
}
if Some(msg.subject) == specific_subject_rx.borrow().clone() {
continue;
}
let session = session.read().await;
if msg.payload.len() > 400 {
let stream = session.open_uni().await;
tokio::spawn(async move {
match stream {
Ok(mut uni_stream) => {
if let Err(e) = uni_stream.write_all(&msg.payload).await {
error!("Error writing to unidirectional stream: {}", e);
}
}
Err(e) => {
error!("Error opening unidirectional stream: {}", e);
}
}
});
} else if let Err(e) = session.send_datagram(msg.payload) {
error!("Error sending datagram: {}", e);
}
}
})
};

let quic_task = {
let specific_subject_rx_clone = specific_subject_rx.clone();
let session = session.clone();
let nc = nc.clone();
tokio::spawn(async move {
let session = session.read().await;
let specific_subject_tx = Arc::new(specific_subject_tx);
while let Ok(mut uni_stream) = session.accept_uni().await {
let nc = nc.clone();
let specific_subject_tx_clone = specific_subject_tx.clone();
let specific_subject_rx = specific_subject_rx_clone.clone();
tokio::spawn(async move {
if let Ok(d) = uni_stream.read_to_end(32000).await {
if specific_subject_rx.borrow().is_none() {
if let Ok(packet_wrapper) = PacketWrapper::parse_from_bytes(&d) {
if packet_wrapper.packet_type == PacketType::CONNECTION.into() {
let connection_packet =
ConnectionPacket::parse_from_bytes(&packet_wrapper.data)
.unwrap();
let specific_subject = format!(
"room.{}.{}",
connection_packet.meeting_id, packet_wrapper.email
)
.replace(' ', "_");
info!("Specific subject: {}", specific_subject);
specific_subject_tx_clone.send(Some(specific_subject.clone()));
}
}
} else {
let specific_subject = specific_subject_rx.borrow().clone().unwrap();
if let Err(e) = nc.publish(specific_subject.clone(), d.into()).await {
error!("Error publishing to subject {}: {}", &specific_subject, e);
}
}
} else {
error!("Error reading from unidirectional stream");
};
});
}
})
};

let _datagrams_task = {
tokio::spawn(async move {
let session = session.read().await;
if specific_subject_rx.borrow().is_none() {
specific_subject_rx.changed().await.unwrap();
}
let specific_subject = specific_subject_rx.borrow().clone().unwrap();
while let Ok(datagram) = session.read_datagram().await {
let nc = nc.clone();
if let Err(e) = nc.publish(specific_subject.clone(), datagram).await {
error!("Error publishing to subject {}: {}", specific_subject, e);
}
}
})
};
quic_task.await?;
should_run.store(false, Ordering::SeqCst);
nats_task.abort();
info!("Finished handling session");
Ok(())
}

fn session_subject_to_lobby_subject(subject: &str) -> String {
let parts = subject.split('.').collect::<Vec<&str>>();
let mut lobby_subject = String::from("room.");
lobby_subject.push_str(parts[1]);
lobby_subject.push_str(".*");
lobby_subject
}
Loading

0 comments on commit 568f002

Please sign in to comment.