Software-only show-control readiness baseline

This commit is contained in:
jan
2026-04-17 21:17:23 +02:00
commit a56cecb23d
51 changed files with 16340 additions and 0 deletions

View File

@@ -0,0 +1,467 @@
use crate::dto::{
ApiCatalogResponse, ApiCommandRequest, ApiCommandResponse, ApiErrorResponse,
ApiGroupListResponse, ApiPresetListResponse, ApiPreviewResponse, ApiSnapshotResponse,
ApiStateResponse, ApiStateSnapshot, ApiStreamEnvelope, ApiStreamMessage, API_VERSION,
};
use crate::websocket::{websocket_accept_value, write_text_frame};
use infinity_host::HostApiPort;
use std::collections::HashMap;
use std::io::{self, Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::thread::{self, JoinHandle};
use std::time::Duration;
pub struct HostApiServer {
local_addr: SocketAddr,
shutdown: Arc<AtomicBool>,
accept_thread: Option<JoinHandle<()>>,
}
#[derive(Debug)]
struct ApiRequestError {
status: u16,
code: String,
message: String,
}
impl HostApiServer {
pub fn bind(bind: &str, service: Arc<dyn HostApiPort>) -> io::Result<Self> {
let listener = TcpListener::bind(bind)?;
listener.set_nonblocking(true)?;
let local_addr = listener.local_addr()?;
let shutdown = Arc::new(AtomicBool::new(false));
let thread_shutdown = Arc::clone(&shutdown);
let accept_thread = thread::spawn(move || accept_loop(listener, service, thread_shutdown));
Ok(Self {
local_addr,
shutdown,
accept_thread: Some(accept_thread),
})
}
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}
pub fn shutdown(mut self) {
self.shutdown.store(true, Ordering::SeqCst);
if let Some(handle) = self.accept_thread.take() {
let _ = handle.join();
}
}
}
impl Drop for HostApiServer {
fn drop(&mut self) {
self.shutdown.store(true, Ordering::SeqCst);
if let Some(handle) = self.accept_thread.take() {
let _ = handle.join();
}
}
}
fn accept_loop(listener: TcpListener, service: Arc<dyn HostApiPort>, shutdown: Arc<AtomicBool>) {
while !shutdown.load(Ordering::SeqCst) {
match listener.accept() {
Ok((stream, _)) => {
let service = Arc::clone(&service);
thread::spawn(move || {
let _ = handle_connection(stream, service);
});
}
Err(error) if error.kind() == io::ErrorKind::WouldBlock => {
thread::sleep(Duration::from_millis(25));
}
Err(_) => break,
}
}
}
fn handle_connection(mut stream: TcpStream, service: Arc<dyn HostApiPort>) -> io::Result<()> {
stream.set_read_timeout(Some(Duration::from_secs(2)))?;
let request = read_request(&mut stream)?;
if request.path == "/api/v1/stream" && request.is_websocket() {
return handle_websocket(stream, request, service);
}
match (request.method.as_str(), request.path.as_str()) {
("GET", "/api/v1/snapshot") => {
let snapshot = service.snapshot();
respond_json(
&mut stream,
200,
&ApiSnapshotResponse::from_snapshot(&snapshot),
)
}
("GET", "/api/v1/state") => {
let snapshot = service.snapshot();
respond_json(
&mut stream,
200,
&ApiStateResponse::from_snapshot(&snapshot),
)
}
("GET", "/api/v1/preview") => {
let snapshot = service.snapshot();
respond_json(
&mut stream,
200,
&ApiPreviewResponse::from_snapshot(&snapshot),
)
}
("GET", "/api/v1/catalog") => {
let snapshot = service.snapshot();
respond_json(
&mut stream,
200,
&ApiCatalogResponse::from_snapshot(&snapshot),
)
}
("GET", "/api/v1/presets") => {
let snapshot = service.snapshot();
respond_json(
&mut stream,
200,
&ApiPresetListResponse::from_snapshot(&snapshot),
)
}
("GET", "/api/v1/groups") => {
let snapshot = service.snapshot();
respond_json(
&mut stream,
200,
&ApiGroupListResponse::from_snapshot(&snapshot),
)
}
("POST", "/api/v1/command") => match handle_command_post(&mut stream, request, service) {
Ok(()) => Ok(()),
Err(error) => respond_error(&mut stream, error.status, error.code, error.message),
},
("GET", "/") => respond_text(
&mut stream,
200,
"text/html; charset=utf-8",
include_str!("../../../web/v1/index.html"),
),
("GET", "/index.html") => respond_text(
&mut stream,
200,
"text/html; charset=utf-8",
include_str!("../../../web/v1/index.html"),
),
("GET", "/app.js") => respond_text(
&mut stream,
200,
"application/javascript; charset=utf-8",
include_str!("../../../web/v1/app.js"),
),
("GET", "/styles.css") => respond_text(
&mut stream,
200,
"text/css; charset=utf-8",
include_str!("../../../web/v1/styles.css"),
),
_ => respond_text(
&mut stream,
404,
"application/json; charset=utf-8",
&serde_json::to_string_pretty(&ApiErrorResponse::new(
"not_found",
format!(
"no route registered for {} {}",
request.method, request.path
),
))
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?,
),
}
}
fn handle_command_post(
stream: &mut TcpStream,
request: HttpRequest,
service: Arc<dyn HostApiPort>,
) -> Result<(), ApiRequestError> {
let parsed = serde_json::from_slice::<ApiCommandRequest>(&request.body).map_err(|error| {
ApiRequestError {
status: 400,
code: "invalid_request_json".to_string(),
message: format!("command request body could not be parsed: {error}"),
}
})?;
let request_id = parsed.request_id.clone();
let command_type = parsed.command.kind_label().to_string();
let command = parsed
.into_host_command()
.map_err(|error| ApiRequestError {
status: 400,
code: "invalid_command".to_string(),
message: error,
})?;
let outcome = service
.send_command(command)
.map_err(|error| ApiRequestError {
status: 400,
code: error.code,
message: error.message,
})?;
respond_json(
stream,
200,
&ApiCommandResponse {
api_version: API_VERSION,
accepted: true,
request_id,
generated_at_millis: outcome.generated_at_millis,
command_type,
summary: outcome.summary,
},
)
.map_err(|error| ApiRequestError {
status: 500,
code: "response_write_failed".to_string(),
message: error.to_string(),
})
}
fn handle_websocket(
mut stream: TcpStream,
request: HttpRequest,
service: Arc<dyn HostApiPort>,
) -> io::Result<()> {
let Some(key) = request.header("sec-websocket-key") else {
return respond_error(
&mut stream,
400,
"missing_websocket_key",
"websocket upgrade requires sec-websocket-key",
);
};
let accept = websocket_accept_value(key);
let response = format!(
"HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: {accept}\r\n\r\n"
);
stream.write_all(response.as_bytes())?;
let mut sequence = 1u64;
let mut last_event_millis = None::<u64>;
let mut last_event_signatures = Vec::<(Option<String>, String)>::new();
loop {
let snapshot = service.snapshot();
send_stream_message(
&mut stream,
sequence,
snapshot.generated_at_millis,
ApiStreamMessage::Snapshot(ApiStateSnapshot::from_snapshot(&snapshot)),
)?;
sequence += 1;
send_stream_message(
&mut stream,
sequence,
snapshot.generated_at_millis,
ApiStreamMessage::Preview(crate::dto::ApiPreviewSnapshot::from_snapshot(&snapshot)),
)?;
sequence += 1;
let mut new_events = snapshot
.recent_events
.iter()
.filter(|event| match last_event_millis {
None => true,
Some(last_millis) if event.at_millis > last_millis => true,
Some(last_millis) if event.at_millis == last_millis => !last_event_signatures
.iter()
.any(|signature| signature.0 == event.code && signature.1 == event.message),
Some(_) => false,
})
.cloned()
.collect::<Vec<_>>();
new_events.sort_by_key(|event| event.at_millis);
for event in new_events {
let event_millis = event.at_millis;
let current_signature = (event.code.clone(), event.message.clone());
send_stream_message(
&mut stream,
sequence,
event_millis,
ApiStreamMessage::Event(event.into()),
)?;
sequence += 1;
match last_event_millis {
Some(last_millis) if last_millis == event_millis => {
last_event_signatures.push(current_signature);
}
_ => {
last_event_millis = Some(event_millis);
last_event_signatures = vec![current_signature];
}
}
}
thread::sleep(Duration::from_millis(250));
}
}
fn send_stream_message(
stream: &mut TcpStream,
sequence: u64,
generated_at_millis: u64,
message: ApiStreamMessage,
) -> io::Result<()> {
let payload = serde_json::to_string(&ApiStreamEnvelope {
api_version: API_VERSION,
sequence,
generated_at_millis,
message,
})
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
write_text_frame(stream, &payload)
}
fn respond_json<T: serde::Serialize>(
stream: &mut TcpStream,
status: u16,
body: &T,
) -> io::Result<()> {
let payload = serde_json::to_string_pretty(body)
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
respond_text(stream, status, "application/json; charset=utf-8", &payload)
}
fn respond_error(
stream: &mut TcpStream,
status: u16,
code: impl Into<String>,
message: impl Into<String>,
) -> io::Result<()> {
respond_json(stream, status, &ApiErrorResponse::new(code, message))
}
fn respond_text(
stream: &mut TcpStream,
status: u16,
content_type: &str,
body: &str,
) -> io::Result<()> {
let reason = match status {
200 => "OK",
400 => "Bad Request",
404 => "Not Found",
500 => "Internal Server Error",
_ => "OK",
};
let response = format!(
"HTTP/1.1 {status} {reason}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
body.as_bytes().len(),
body
);
stream.write_all(response.as_bytes())
}
#[derive(Debug)]
struct HttpRequest {
method: String,
path: String,
headers: HashMap<String, String>,
body: Vec<u8>,
}
impl HttpRequest {
fn header(&self, key: &str) -> Option<&str> {
self.headers
.get(&key.to_ascii_lowercase())
.map(|value| value.as_str())
}
fn is_websocket(&self) -> bool {
self.header("upgrade")
.map(|value| value.eq_ignore_ascii_case("websocket"))
.unwrap_or(false)
}
}
fn read_request(stream: &mut TcpStream) -> io::Result<HttpRequest> {
let mut buffer = Vec::new();
let mut temp = [0u8; 4096];
let mut header_end = None;
let mut expected_len = None;
loop {
let read = stream.read(&mut temp)?;
if read == 0 {
break;
}
buffer.extend_from_slice(&temp[..read]);
if header_end.is_none() {
header_end = find_header_end(&buffer);
if let Some(end) = header_end {
let header_text = String::from_utf8_lossy(&buffer[..end]);
expected_len = parse_content_length(&header_text);
if expected_len == Some(0) || expected_len.is_none() {
break;
}
}
}
if let (Some(end), Some(content_len)) = (header_end, expected_len) {
if buffer.len() >= end + 4 + content_len {
break;
}
}
}
let header_end = header_end
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing header end"))?;
let header_text = String::from_utf8_lossy(&buffer[..header_end]);
let mut lines = header_text.lines();
let request_line = lines
.next()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing request line"))?;
let mut request_parts = request_line.split_whitespace();
let method = request_parts
.next()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing method"))?
.to_string();
let path = request_parts
.next()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing path"))?
.split('?')
.next()
.unwrap_or("/")
.to_string();
let mut headers = HashMap::new();
for line in lines {
if let Some((key, value)) = line.split_once(':') {
headers.insert(key.trim().to_ascii_lowercase(), value.trim().to_string());
}
}
let body_start = header_end + 4;
let body = buffer.get(body_start..).unwrap_or_default().to_vec();
Ok(HttpRequest {
method,
path,
headers,
body,
})
}
fn parse_content_length(header_text: &str) -> Option<usize> {
header_text.lines().find_map(|line| {
line.split_once(':').and_then(|(key, value)| {
if key.trim().eq_ignore_ascii_case("content-length") {
value.trim().parse::<usize>().ok()
} else {
None
}
})
})
}
fn find_header_end(buffer: &[u8]) -> Option<usize> {
buffer.windows(4).position(|window| window == b"\r\n\r\n")
}