Stabilize control surface and external bridge v1
This commit is contained in:
@@ -1,19 +1,21 @@
|
||||
use crate::dto::{
|
||||
ApiCatalogResponse, ApiCommandRequest, ApiCommandResponse, ApiErrorResponse,
|
||||
ApiCatalogResponse, ApiCommandRequest, ApiCommandResponse, ApiDiscoveredNodeType,
|
||||
ApiDiscoveryResult, ApiDiscoveryScanRequest, ApiDiscoveryScanResponse, ApiErrorResponse,
|
||||
ApiGroupListResponse, ApiPresetListResponse, ApiPreviewResponse, ApiSnapshotResponse,
|
||||
ApiStateResponse, ApiStateSnapshot, ApiStreamEnvelope, ApiStreamMessage, API_VERSION,
|
||||
};
|
||||
use crate::websocket::{websocket_accept_value, write_text_frame};
|
||||
use infinity_host::HostApiPort;
|
||||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
use std::io::{self, Read, Write};
|
||||
use std::net::{SocketAddr, TcpListener, TcpStream};
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream};
|
||||
use std::sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
mpsc, Arc, Mutex,
|
||||
};
|
||||
use std::thread::{self, JoinHandle};
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
pub struct HostApiServer {
|
||||
local_addr: SocketAddr,
|
||||
@@ -141,6 +143,12 @@ fn handle_connection(mut stream: TcpStream, service: Arc<dyn HostApiPort>) -> io
|
||||
Ok(()) => Ok(()),
|
||||
Err(error) => respond_error(&mut stream, error.status, error.code, error.message),
|
||||
},
|
||||
("POST", "/api/v1/discovery/scan") => {
|
||||
match handle_discovery_scan_post(&mut stream, request) {
|
||||
Ok(()) => Ok(()),
|
||||
Err(error) => respond_error(&mut stream, error.status, error.code, error.message),
|
||||
}
|
||||
}
|
||||
("GET", "/") => respond_text(
|
||||
&mut stream,
|
||||
200,
|
||||
@@ -153,12 +161,30 @@ fn handle_connection(mut stream: TcpStream, service: Arc<dyn HostApiPort>) -> io
|
||||
"text/html; charset=utf-8",
|
||||
include_str!("../../../web/v1/index.html"),
|
||||
),
|
||||
("GET", "/technical") => respond_text(
|
||||
&mut stream,
|
||||
200,
|
||||
"text/html; charset=utf-8",
|
||||
include_str!("../../../web/v1/technical.html"),
|
||||
),
|
||||
("GET", "/technical.html") => respond_text(
|
||||
&mut stream,
|
||||
200,
|
||||
"text/html; charset=utf-8",
|
||||
include_str!("../../../web/v1/technical.html"),
|
||||
),
|
||||
("GET", "/app.js") => respond_text(
|
||||
&mut stream,
|
||||
200,
|
||||
"application/javascript; charset=utf-8",
|
||||
include_str!("../../../web/v1/app.js"),
|
||||
),
|
||||
("GET", "/technical.js") => respond_text(
|
||||
&mut stream,
|
||||
200,
|
||||
"application/javascript; charset=utf-8",
|
||||
include_str!("../../../web/v1/technical.js"),
|
||||
),
|
||||
("GET", "/styles.css") => respond_text(
|
||||
&mut stream,
|
||||
200,
|
||||
@@ -307,6 +333,275 @@ fn handle_websocket(
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_discovery_scan_post(
|
||||
stream: &mut TcpStream,
|
||||
request: HttpRequest,
|
||||
) -> Result<(), ApiRequestError> {
|
||||
let parsed =
|
||||
serde_json::from_slice::<ApiDiscoveryScanRequest>(&request.body).map_err(|error| {
|
||||
ApiRequestError {
|
||||
status: 400,
|
||||
code: "invalid_request_json".to_string(),
|
||||
message: format!("discovery request body could not be parsed: {error}"),
|
||||
}
|
||||
})?;
|
||||
let targets = parse_subnet_targets(&parsed.subnet).map_err(|message| ApiRequestError {
|
||||
status: 400,
|
||||
code: "invalid_subnet_cidr".to_string(),
|
||||
message,
|
||||
})?;
|
||||
|
||||
let started_at = Instant::now();
|
||||
let mut results = scan_subnet_targets(&targets);
|
||||
results.sort_by_key(|result| {
|
||||
result
|
||||
.ip
|
||||
.parse::<Ipv4Addr>()
|
||||
.map(u32::from)
|
||||
.unwrap_or_default()
|
||||
});
|
||||
let reachable_hosts = results.iter().filter(|result| result.reachable).count();
|
||||
|
||||
respond_json(
|
||||
stream,
|
||||
200,
|
||||
&ApiDiscoveryScanResponse {
|
||||
api_version: API_VERSION,
|
||||
subnet: parsed.subnet.trim().to_string(),
|
||||
scanned_hosts: targets.len(),
|
||||
reachable_hosts,
|
||||
results,
|
||||
},
|
||||
)
|
||||
.map_err(|error| ApiRequestError {
|
||||
status: 500,
|
||||
code: "response_write_failed".to_string(),
|
||||
message: format!(
|
||||
"discovery response could not be written after {} ms: {error}",
|
||||
started_at.elapsed().as_millis()
|
||||
),
|
||||
})
|
||||
}
|
||||
|
||||
fn parse_subnet_targets(raw_subnet: &str) -> Result<Vec<Ipv4Addr>, String> {
|
||||
const MAX_SCAN_HOSTS: u64 = 1024;
|
||||
|
||||
let subnet = raw_subnet.trim();
|
||||
let (address, prefix) = subnet
|
||||
.split_once('/')
|
||||
.ok_or_else(|| format!("subnet '{subnet}' must be in CIDR form, e.g. 192.168.40.0/24"))?;
|
||||
let ip = address
|
||||
.trim()
|
||||
.parse::<Ipv4Addr>()
|
||||
.map_err(|_| format!("subnet '{subnet}' contains an invalid IPv4 address"))?;
|
||||
let prefix = prefix
|
||||
.trim()
|
||||
.parse::<u8>()
|
||||
.map_err(|_| format!("subnet '{subnet}' contains an invalid CIDR prefix"))?;
|
||||
if prefix > 32 {
|
||||
return Err(format!(
|
||||
"subnet '{subnet}' has prefix {prefix}, expected 0..=32"
|
||||
));
|
||||
}
|
||||
|
||||
let host_span = 1u64 << (32u8.saturating_sub(prefix));
|
||||
if host_span > MAX_SCAN_HOSTS {
|
||||
return Err(format!(
|
||||
"subnet '{subnet}' spans {host_span} addresses, limit is {MAX_SCAN_HOSTS}"
|
||||
));
|
||||
}
|
||||
|
||||
let ip_u32 = u32::from(ip);
|
||||
let mask = if prefix == 0 {
|
||||
0
|
||||
} else {
|
||||
u32::MAX << (32 - u32::from(prefix))
|
||||
};
|
||||
let network = ip_u32 & mask;
|
||||
let broadcast = network | !mask;
|
||||
let (start, end) = if prefix >= 31 {
|
||||
(network, broadcast)
|
||||
} else {
|
||||
(network.saturating_add(1), broadcast.saturating_sub(1))
|
||||
};
|
||||
|
||||
if start > end {
|
||||
return Err(format!("subnet '{subnet}' has no scanable host addresses"));
|
||||
}
|
||||
|
||||
Ok((start..=end).map(Ipv4Addr::from).collect())
|
||||
}
|
||||
|
||||
fn scan_subnet_targets(targets: &[Ipv4Addr]) -> Vec<ApiDiscoveryResult> {
|
||||
if targets.is_empty() {
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
let worker_count = usize::min(32, targets.len().max(1));
|
||||
let (job_sender, job_receiver) = mpsc::channel::<Ipv4Addr>();
|
||||
let job_receiver = Arc::new(Mutex::new(job_receiver));
|
||||
let (result_sender, result_receiver) = mpsc::channel::<ApiDiscoveryResult>();
|
||||
let mut handles = Vec::with_capacity(worker_count);
|
||||
|
||||
for _ in 0..worker_count {
|
||||
let receiver = Arc::clone(&job_receiver);
|
||||
let sender = result_sender.clone();
|
||||
handles.push(thread::spawn(move || loop {
|
||||
let next_job = {
|
||||
let guard = receiver.lock();
|
||||
match guard {
|
||||
Ok(receiver) => receiver.recv().ok(),
|
||||
Err(_) => None,
|
||||
}
|
||||
};
|
||||
let Some(ip) = next_job else {
|
||||
break;
|
||||
};
|
||||
let _ = sender.send(probe_ip(ip));
|
||||
}));
|
||||
}
|
||||
drop(result_sender);
|
||||
|
||||
for ip in targets {
|
||||
let _ = job_sender.send(*ip);
|
||||
}
|
||||
drop(job_sender);
|
||||
|
||||
let mut results = Vec::with_capacity(targets.len());
|
||||
for _ in 0..targets.len() {
|
||||
if let Ok(result) = result_receiver.recv() {
|
||||
results.push(result);
|
||||
}
|
||||
}
|
||||
|
||||
for handle in handles {
|
||||
let _ = handle.join();
|
||||
}
|
||||
results
|
||||
}
|
||||
|
||||
fn probe_ip(ip: Ipv4Addr) -> ApiDiscoveryResult {
|
||||
let mut reachable = false;
|
||||
let mut detected_type = ApiDiscoveredNodeType::Unknown;
|
||||
let mut hostname = None;
|
||||
|
||||
if let Some(info_probe) = probe_http_endpoint(ip, 80, "/json/info") {
|
||||
reachable = true;
|
||||
detected_type = detect_node_type(&info_probe.body, detected_type);
|
||||
hostname = extract_probe_hostname(&info_probe);
|
||||
} else if can_connect(ip, 80) {
|
||||
reachable = true;
|
||||
}
|
||||
|
||||
if !reachable && can_connect(ip, 81) {
|
||||
reachable = true;
|
||||
}
|
||||
|
||||
if detected_type == ApiDiscoveredNodeType::Unknown {
|
||||
if let Some(node_probe) = probe_http_endpoint(ip, 80, "/api/v1/node/info") {
|
||||
reachable = true;
|
||||
detected_type = detect_node_type(&node_probe.body, detected_type);
|
||||
if hostname.is_none() {
|
||||
hostname = extract_probe_hostname(&node_probe);
|
||||
}
|
||||
} else if let Some(state_probe) = probe_http_endpoint(ip, 80, "/api/v1/state") {
|
||||
reachable = true;
|
||||
detected_type = detect_node_type(&state_probe.body, detected_type);
|
||||
if hostname.is_none() {
|
||||
hostname = extract_probe_hostname(&state_probe);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ApiDiscoveryResult {
|
||||
ip: ip.to_string(),
|
||||
reachable,
|
||||
detected_type,
|
||||
hostname,
|
||||
}
|
||||
}
|
||||
|
||||
fn can_connect(ip: Ipv4Addr, port: u16) -> bool {
|
||||
let address = SocketAddr::new(IpAddr::V4(ip), port);
|
||||
TcpStream::connect_timeout(&address, Duration::from_millis(120)).is_ok()
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct HttpProbe {
|
||||
headers: HashMap<String, String>,
|
||||
body: String,
|
||||
}
|
||||
|
||||
fn probe_http_endpoint(ip: Ipv4Addr, port: u16, path: &str) -> Option<HttpProbe> {
|
||||
let address = SocketAddr::new(IpAddr::V4(ip), port);
|
||||
let mut stream = TcpStream::connect_timeout(&address, Duration::from_millis(120)).ok()?;
|
||||
let _ = stream.set_read_timeout(Some(Duration::from_millis(180)));
|
||||
let _ = stream.set_write_timeout(Some(Duration::from_millis(120)));
|
||||
let request = format!(
|
||||
"GET {path} HTTP/1.1\r\nHost: {ip}\r\nConnection: close\r\nAccept: application/json\r\n\r\n"
|
||||
);
|
||||
stream.write_all(request.as_bytes()).ok()?;
|
||||
|
||||
let mut raw = Vec::new();
|
||||
stream.read_to_end(&mut raw).ok()?;
|
||||
let header_end = find_header_end(&raw)?;
|
||||
let header_text = String::from_utf8_lossy(&raw[..header_end]);
|
||||
let headers = header_text
|
||||
.lines()
|
||||
.skip(1)
|
||||
.filter_map(|line| line.split_once(':'))
|
||||
.map(|(key, value)| (key.trim().to_ascii_lowercase(), value.trim().to_string()))
|
||||
.collect::<HashMap<_, _>>();
|
||||
let body = String::from_utf8_lossy(raw.get(header_end + 4..).unwrap_or_default()).to_string();
|
||||
Some(HttpProbe { headers, body })
|
||||
}
|
||||
|
||||
fn detect_node_type(body: &str, fallback: ApiDiscoveredNodeType) -> ApiDiscoveredNodeType {
|
||||
let lowered = body.to_ascii_lowercase();
|
||||
if lowered.contains("\"wled\"")
|
||||
|| lowered.contains("\"brand\":\"wled\"")
|
||||
|| lowered.contains("\"product\":\"wled\"")
|
||||
{
|
||||
return ApiDiscoveredNodeType::Wled;
|
||||
}
|
||||
if lowered.contains("\"native_node\"")
|
||||
|| lowered.contains("\"node_kind\":\"native\"")
|
||||
|| lowered.contains("\"infinity_node\"")
|
||||
{
|
||||
return ApiDiscoveredNodeType::NativeNode;
|
||||
}
|
||||
fallback
|
||||
}
|
||||
|
||||
fn extract_probe_hostname(probe: &HttpProbe) -> Option<String> {
|
||||
if let Ok(json) = serde_json::from_str::<Value>(&probe.body) {
|
||||
let name = json
|
||||
.get("name")
|
||||
.and_then(Value::as_str)
|
||||
.or_else(|| {
|
||||
json.get("info")
|
||||
.and_then(|value| value.get("name"))
|
||||
.and_then(Value::as_str)
|
||||
})
|
||||
.or_else(|| json.get("mdns").and_then(Value::as_str));
|
||||
if let Some(name) = name {
|
||||
let trimmed = name.trim();
|
||||
if !trimmed.is_empty() {
|
||||
return Some(trimmed.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
probe.headers.get("server").and_then(|value| {
|
||||
let trimmed = value.trim();
|
||||
if trimmed.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(trimmed.to_string())
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn send_stream_message(
|
||||
stream: &mut TcpStream,
|
||||
sequence: u64,
|
||||
|
||||
Reference in New Issue
Block a user