Der nächste Layer ist jetzt als echte gemeinsame Außenkante umgesetzt. Das neue API-Crate in [server.rs](</C:/Users/janni/Documents/RFP/Infinity_Vis _Rust/crates/infinity_host_api/src/server.rs>), [dto.rs](</C:/Users/janni/Documents/RFP/Infinity_Vis _Rust/crates/infinity_host_api/src/dto.rs>) und [main.rs](</C:/Users/janni/Documents/RFP/Infinity_Vis _Rust/crates/infinity_host_api/src/main.rs>) stellt die geforderten versionierten Endpunkte bereit: GET /api/v1/snapshot, GET /api/v1/catalog, GET /api/v1/presets, GET /api/v1/groups, POST /api/v1/command und WS /api/v1/stream. Die API trennt jetzt sauber zwischen Command-, State-, Preview- und Event-Modell, inklusive stabiler Außen-Enums und dokumentierten Fehlerobjekten statt eines unreflektierten 1:1-Core-Leaks.

Darauf sitzt die erste Web-UI V1 direkt gegen diese API in [index.html](</C:/Users/janni/Documents/RFP/Infinity_Vis _Rust/web/v1/index.html>), [app.js](</C:/Users/janni/Documents/RFP/Infinity_Vis _Rust/web/v1/app.js>) und [styles.css](</C:/Users/janni/Documents/RFP/Infinity_Vis _Rust/web/v1/styles.css>). Sie enthält Pattern-/Preset-Auswahl, globale Regler, Gruppenauswahl, Übergänge, Preview, Snapshot-Anzeige und Event-Feed. Die technische Desktop-GUI bleibt bewusst die Engineering-/Diagnoseoberfläche; die kreative Web-UI baut nicht an einer Parallelarchitektur vorbei.

Contract-Tests liegen in [contract.rs](</C:/Users/janni/Documents/RFP/Infinity_Vis _Rust/crates/infinity_host_api/tests/contract.rs>), und die API ist mit Beispiel-Responses in [docs/host_api.md](</C:/Users/janni/Documents/RFP/Infinity_Vis _Rust/docs/host_api.md>) dokumentiert. Zusätzlich habe ich [README.md](</C:/Users/janni/Documents/RFP/Infinity_Vis _Rust/README.md>), [docs/build_and_deploy.md](</C:/Users/janni/Documents/RFP/Infinity_Vis _Rust/docs/build_and_deploy.md>) und [docs/architecture.md](</C:/Users/janni/Documents/RFP/Infinity_Vis _Rust/docs/architecture.md>) auf den neuen Stand gebracht.

Verifiziert habe ich:
- `cargo check -p infinity_host_api`
- `cargo test -p infinity_host_api`
- `cargo test -q`

Nicht verifiziert habe ich eine separate JS-Syntaxprüfung mit `node --check`, weil `node` in dieser Umgebung nicht installiert ist.
This commit is contained in:
2026-04-17 11:58:07 +02:00
parent 9457666fd6
commit a37a3c5cbe
16 changed files with 3166 additions and 55 deletions

View File

@@ -0,0 +1,14 @@
[package]
name = "infinity_host_api"
version.workspace = true
edition.workspace = true
license.workspace = true
authors.workspace = true
[dependencies]
clap.workspace = true
serde.workspace = true
serde_json.workspace = true
infinity_config = { path = "../infinity_config" }
infinity_host = { path = "../infinity_host" }

View File

@@ -0,0 +1,745 @@
use infinity_config::{ColorOrder, LedDirection, PanelPosition, ValidationState};
use infinity_host::{
HostCommand, HostSnapshot, NodeConnectionState, PreviewSource, SceneParameterKind,
SceneParameterValue, SceneTransitionStyle, TestPatternKind,
};
use serde::{Deserialize, Serialize};
pub const API_VERSION: &str = "v1";
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiSnapshotResponse {
pub api_version: &'static str,
pub generated_at_millis: u64,
pub state: ApiStateSnapshot,
pub preview: ApiPreviewSnapshot,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiCatalogResponse {
pub api_version: &'static str,
pub patterns: Vec<ApiPatternCatalogEntry>,
pub presets: Vec<ApiPresetSummary>,
pub groups: Vec<ApiGroupSummary>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiPresetListResponse {
pub api_version: &'static str,
pub presets: Vec<ApiPresetSummary>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiGroupListResponse {
pub api_version: &'static str,
pub groups: Vec<ApiGroupSummary>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiCommandRequest {
#[serde(default)]
pub request_id: Option<String>,
pub command: ApiCommand,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiCommandResponse {
pub api_version: &'static str,
pub accepted: bool,
pub request_id: Option<String>,
pub generated_at_millis: u64,
pub summary: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiErrorResponse {
pub api_version: &'static str,
pub error: ApiErrorBody,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiErrorBody {
pub code: String,
pub message: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiStateSnapshot {
pub system: ApiSystemInfo,
pub global: ApiGlobalState,
pub engine: ApiEngineState,
pub active_scene: ApiActiveScene,
pub nodes: Vec<ApiNodeStatus>,
pub panels: Vec<ApiPanelStatus>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiPreviewSnapshot {
pub generated_at_millis: u64,
pub panels: Vec<ApiPreviewPanel>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiStreamEnvelope {
pub api_version: &'static str,
pub sequence: u64,
pub generated_at_millis: u64,
pub message: ApiStreamMessage,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case", tag = "type", content = "payload")]
pub enum ApiStreamMessage {
Snapshot(ApiStateSnapshot),
Preview(ApiPreviewSnapshot),
Event(ApiEventNotice),
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiEventNotice {
pub kind: ApiEventKind,
pub message: String,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ApiEventKind {
Info,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiSystemInfo {
pub project_name: String,
pub schema_version: u32,
pub topology_label: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiGlobalState {
pub blackout: bool,
pub master_brightness: f32,
pub selected_pattern: String,
pub selected_group: Option<String>,
pub transition_duration_ms: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiEngineState {
pub logic_hz: u16,
pub frame_hz: u16,
pub preview_hz: u16,
pub uptime_ms: u64,
pub frame_index: u64,
pub dropped_frames: u64,
pub active_transition: Option<ApiTransitionState>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiTransitionState {
pub style: ApiTransitionStyle,
pub from_pattern_id: String,
pub to_pattern_id: String,
pub duration_ms: u32,
pub progress: f32,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ApiTransitionStyle {
Snap,
Crossfade,
Chase,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiActiveScene {
pub preset_id: Option<String>,
pub pattern_id: String,
pub seed: u64,
pub palette: Vec<String>,
pub parameters: Vec<ApiSceneParameter>,
pub target_group: Option<String>,
pub blackout: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiSceneParameter {
pub key: String,
pub label: String,
pub kind: ApiParameterKind,
pub value: ApiParameterValue,
pub min_scalar: Option<f32>,
pub max_scalar: Option<f32>,
pub step: Option<f32>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiPatternCatalogEntry {
pub pattern_id: String,
pub display_name: String,
pub description: String,
pub parameters: Vec<ApiPatternParameter>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiPatternParameter {
pub key: String,
pub label: String,
pub kind: ApiParameterKind,
pub min_scalar: Option<f32>,
pub max_scalar: Option<f32>,
pub step: Option<f32>,
pub default_value: ApiParameterValue,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiPresetSummary {
pub preset_id: String,
pub pattern_id: String,
pub target_group: Option<String>,
pub transition_duration_ms: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiGroupSummary {
pub group_id: String,
pub member_count: usize,
pub tags: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiNodeStatus {
pub node_id: String,
pub display_name: String,
pub reserved_ip: Option<String>,
pub connection: ApiConnectionState,
pub last_contact_ms: u64,
pub error_status: Option<String>,
pub panel_count: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiPanelStatus {
pub node_id: String,
pub panel_position: ApiPanelPosition,
pub physical_output_name: String,
pub driver_reference: String,
pub led_count: u16,
pub direction: ApiLedDirection,
pub color_order: ApiColorOrder,
pub enabled: bool,
pub validation_state: ApiValidationState,
pub connection: ApiConnectionState,
pub last_test_trigger_ms: Option<u64>,
pub error_status: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ApiPreviewPanel {
pub node_id: String,
pub panel_position: ApiPanelPosition,
pub representative_color_hex: String,
pub sample_led_hex: Vec<String>,
pub energy_percent: u8,
pub source: ApiPreviewSource,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ApiPanelPosition {
Top,
Middle,
Bottom,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ApiConnectionState {
Online,
Degraded,
Offline,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ApiPreviewSource {
Scene,
Transition,
PanelTest,
Blackout,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ApiParameterKind {
Scalar,
Toggle,
Text,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ApiLedDirection {
Forward,
Reverse,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ApiColorOrder {
Rgb,
Rbg,
Grb,
Gbr,
Brg,
Bgr,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ApiValidationState {
PendingHardwareValidation,
Validated,
Retired,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case", tag = "kind", content = "value")]
pub enum ApiParameterValue {
Scalar(f32),
Toggle(bool),
Text(String),
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case", tag = "type", content = "payload")]
pub enum ApiCommand {
SetBlackout {
enabled: bool,
},
SetMasterBrightness {
value: f32,
},
SelectPattern {
pattern_id: String,
},
RecallPreset {
preset_id: String,
},
SelectGroup {
group_id: Option<String>,
},
SetSceneParameter {
key: String,
value: ApiParameterValue,
},
SetTransitionDurationMs {
duration_ms: u32,
},
TriggerPanelTest {
node_id: String,
panel_position: ApiPanelPosition,
pattern: ApiTestPattern,
},
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ApiTestPattern {
WalkingPixel106,
}
impl ApiSnapshotResponse {
pub fn from_snapshot(snapshot: &HostSnapshot) -> Self {
let state = ApiStateSnapshot::from_snapshot(snapshot);
let preview = ApiPreviewSnapshot::from_snapshot(snapshot);
Self {
api_version: API_VERSION,
generated_at_millis: snapshot.generated_at_millis,
state,
preview,
}
}
}
impl ApiCatalogResponse {
pub fn from_snapshot(snapshot: &HostSnapshot) -> Self {
Self {
api_version: API_VERSION,
patterns: snapshot
.catalog
.patterns
.iter()
.map(|pattern| ApiPatternCatalogEntry {
pattern_id: pattern.pattern_id.clone(),
display_name: pattern.display_name.clone(),
description: pattern.description.clone(),
parameters: pattern
.parameters
.iter()
.map(|parameter| ApiPatternParameter {
key: parameter.key.clone(),
label: parameter.label.clone(),
kind: map_parameter_kind(parameter.kind),
min_scalar: parameter.min_scalar,
max_scalar: parameter.max_scalar,
step: parameter.step,
default_value: map_parameter_value(&parameter.default_value),
})
.collect(),
})
.collect(),
presets: snapshot
.catalog
.presets
.iter()
.map(|preset| ApiPresetSummary {
preset_id: preset.preset_id.clone(),
pattern_id: preset.pattern_id.clone(),
target_group: preset.target_group.clone(),
transition_duration_ms: preset.transition_duration_ms,
})
.collect(),
groups: snapshot
.catalog
.groups
.iter()
.map(|group| ApiGroupSummary {
group_id: group.group_id.clone(),
member_count: group.member_count,
tags: group.tags.clone(),
})
.collect(),
}
}
}
impl ApiPresetListResponse {
pub fn from_snapshot(snapshot: &HostSnapshot) -> Self {
Self {
api_version: API_VERSION,
presets: ApiCatalogResponse::from_snapshot(snapshot).presets,
}
}
}
impl ApiGroupListResponse {
pub fn from_snapshot(snapshot: &HostSnapshot) -> Self {
Self {
api_version: API_VERSION,
groups: ApiCatalogResponse::from_snapshot(snapshot).groups,
}
}
}
impl ApiStateSnapshot {
pub fn from_snapshot(snapshot: &HostSnapshot) -> Self {
Self {
system: ApiSystemInfo {
project_name: snapshot.system.project_name.clone(),
schema_version: snapshot.system.schema_version,
topology_label: snapshot.system.topology_label.clone(),
},
global: ApiGlobalState {
blackout: snapshot.global.blackout,
master_brightness: snapshot.global.master_brightness,
selected_pattern: snapshot.global.selected_pattern.clone(),
selected_group: snapshot.global.selected_group.clone(),
transition_duration_ms: snapshot.global.transition_duration_ms,
},
engine: ApiEngineState {
logic_hz: snapshot.engine.logic_hz,
frame_hz: snapshot.engine.frame_hz,
preview_hz: snapshot.engine.preview_hz,
uptime_ms: snapshot.engine.uptime_ms,
frame_index: snapshot.engine.frame_index,
dropped_frames: snapshot.engine.dropped_frames,
active_transition: snapshot.engine.active_transition.as_ref().map(|transition| {
ApiTransitionState {
style: map_transition_style(transition.style),
from_pattern_id: transition.from_pattern_id.clone(),
to_pattern_id: transition.to_pattern_id.clone(),
duration_ms: transition.duration_ms,
progress: transition.progress,
}
}),
},
active_scene: ApiActiveScene {
preset_id: snapshot.active_scene.preset_id.clone(),
pattern_id: snapshot.active_scene.pattern_id.clone(),
seed: snapshot.active_scene.seed,
palette: snapshot.active_scene.palette.clone(),
parameters: snapshot
.active_scene
.parameters
.iter()
.map(|parameter| ApiSceneParameter {
key: parameter.key.clone(),
label: parameter.label.clone(),
kind: map_parameter_kind(parameter.kind),
value: map_parameter_value(&parameter.value),
min_scalar: parameter.min_scalar,
max_scalar: parameter.max_scalar,
step: parameter.step,
})
.collect(),
target_group: snapshot.active_scene.target_group.clone(),
blackout: snapshot.active_scene.blackout,
},
nodes: snapshot
.nodes
.iter()
.map(|node| ApiNodeStatus {
node_id: node.node_id.clone(),
display_name: node.display_name.clone(),
reserved_ip: node.reserved_ip.clone(),
connection: map_connection_state(node.connection),
last_contact_ms: node.last_contact_ms,
error_status: node.error_status.clone(),
panel_count: node.panel_count,
})
.collect(),
panels: snapshot
.panels
.iter()
.map(|panel| ApiPanelStatus {
node_id: panel.target.node_id.clone(),
panel_position: map_panel_position(&panel.target.panel_position),
physical_output_name: panel.physical_output_name.clone(),
driver_reference: panel.driver_reference.clone(),
led_count: panel.led_count,
direction: map_led_direction(panel.direction.clone()),
color_order: map_color_order(panel.color_order.clone()),
enabled: panel.enabled,
validation_state: map_validation_state(panel.validation_state.clone()),
connection: map_connection_state(panel.connection),
last_test_trigger_ms: panel.last_test_trigger_ms,
error_status: panel.error_status.clone(),
})
.collect(),
}
}
}
impl ApiPreviewSnapshot {
pub fn from_snapshot(snapshot: &HostSnapshot) -> Self {
Self {
generated_at_millis: snapshot.generated_at_millis,
panels: snapshot
.preview
.panels
.iter()
.map(|panel| ApiPreviewPanel {
node_id: panel.target.node_id.clone(),
panel_position: map_panel_position(&panel.target.panel_position),
representative_color_hex: panel.representative_color_hex.clone(),
sample_led_hex: panel.sample_led_hex.clone(),
energy_percent: panel.energy_percent,
source: map_preview_source(panel.preview_source),
})
.collect(),
}
}
}
impl ApiCommandRequest {
pub fn into_host_command(self) -> Result<HostCommand, String> {
match self.command {
ApiCommand::SetBlackout { enabled } => Ok(HostCommand::SetBlackout(enabled)),
ApiCommand::SetMasterBrightness { value } => {
Ok(HostCommand::SetMasterBrightness(value))
}
ApiCommand::SelectPattern { pattern_id } => {
Ok(HostCommand::SelectPattern(pattern_id))
}
ApiCommand::RecallPreset { preset_id } => {
Ok(HostCommand::RecallPreset { preset_id })
}
ApiCommand::SelectGroup { group_id } => {
Ok(HostCommand::SelectGroup { group_id })
}
ApiCommand::SetSceneParameter { key, value } => Ok(HostCommand::SetSceneParameter {
key,
value: map_command_parameter_value(value),
}),
ApiCommand::SetTransitionDurationMs { duration_ms } => {
Ok(HostCommand::SetTransitionDurationMs(duration_ms))
}
ApiCommand::TriggerPanelTest {
node_id,
panel_position,
pattern,
} => Ok(HostCommand::TriggerPanelTest {
target: infinity_host::PanelTarget {
node_id,
panel_position: map_command_panel_position(panel_position),
},
pattern: match pattern {
ApiTestPattern::WalkingPixel106 => TestPatternKind::WalkingPixel106,
},
}),
}
}
pub fn summary(&self) -> String {
self.command.summary()
}
}
fn map_panel_position(position: &PanelPosition) -> ApiPanelPosition {
match position {
PanelPosition::Top => ApiPanelPosition::Top,
PanelPosition::Middle => ApiPanelPosition::Middle,
PanelPosition::Bottom => ApiPanelPosition::Bottom,
}
}
fn map_command_panel_position(position: ApiPanelPosition) -> PanelPosition {
match position {
ApiPanelPosition::Top => PanelPosition::Top,
ApiPanelPosition::Middle => PanelPosition::Middle,
ApiPanelPosition::Bottom => PanelPosition::Bottom,
}
}
fn map_connection_state(state: NodeConnectionState) -> ApiConnectionState {
match state {
NodeConnectionState::Online => ApiConnectionState::Online,
NodeConnectionState::Degraded => ApiConnectionState::Degraded,
NodeConnectionState::Offline => ApiConnectionState::Offline,
}
}
fn map_led_direction(direction: LedDirection) -> ApiLedDirection {
match direction {
LedDirection::Forward => ApiLedDirection::Forward,
LedDirection::Reverse => ApiLedDirection::Reverse,
}
}
fn map_color_order(color_order: ColorOrder) -> ApiColorOrder {
match color_order {
ColorOrder::Rgb => ApiColorOrder::Rgb,
ColorOrder::Rbg => ApiColorOrder::Rbg,
ColorOrder::Grb => ApiColorOrder::Grb,
ColorOrder::Gbr => ApiColorOrder::Gbr,
ColorOrder::Brg => ApiColorOrder::Brg,
ColorOrder::Bgr => ApiColorOrder::Bgr,
}
}
fn map_validation_state(state: ValidationState) -> ApiValidationState {
match state {
ValidationState::PendingHardwareValidation => ApiValidationState::PendingHardwareValidation,
ValidationState::Validated => ApiValidationState::Validated,
ValidationState::Retired => ApiValidationState::Retired,
}
}
fn map_preview_source(source: PreviewSource) -> ApiPreviewSource {
match source {
PreviewSource::Scene => ApiPreviewSource::Scene,
PreviewSource::Transition => ApiPreviewSource::Transition,
PreviewSource::PanelTest => ApiPreviewSource::PanelTest,
PreviewSource::Blackout => ApiPreviewSource::Blackout,
}
}
fn map_transition_style(style: SceneTransitionStyle) -> ApiTransitionStyle {
match style {
SceneTransitionStyle::Snap => ApiTransitionStyle::Snap,
SceneTransitionStyle::Crossfade => ApiTransitionStyle::Crossfade,
SceneTransitionStyle::Chase => ApiTransitionStyle::Chase,
}
}
fn map_parameter_kind(kind: SceneParameterKind) -> ApiParameterKind {
match kind {
SceneParameterKind::Scalar => ApiParameterKind::Scalar,
SceneParameterKind::Toggle => ApiParameterKind::Toggle,
SceneParameterKind::Text => ApiParameterKind::Text,
}
}
fn map_parameter_value(value: &SceneParameterValue) -> ApiParameterValue {
match value {
SceneParameterValue::Scalar(value) => ApiParameterValue::Scalar(*value),
SceneParameterValue::Toggle(value) => ApiParameterValue::Toggle(*value),
SceneParameterValue::Text(value) => ApiParameterValue::Text(value.clone()),
}
}
fn map_command_parameter_value(value: ApiParameterValue) -> SceneParameterValue {
match value {
ApiParameterValue::Scalar(value) => SceneParameterValue::Scalar(value),
ApiParameterValue::Toggle(value) => SceneParameterValue::Toggle(value),
ApiParameterValue::Text(value) => SceneParameterValue::Text(value),
}
}
impl ApiCommand {
pub fn summary(&self) -> String {
match self {
Self::SetBlackout { enabled } => {
if *enabled {
"blackout enabled".to_string()
} else {
"blackout released".to_string()
}
}
Self::SetMasterBrightness { value } => {
format!("master brightness set to {:.0}%", value.clamp(0.0, 1.0) * 100.0)
}
Self::SelectPattern { pattern_id } => format!("pattern selected: {pattern_id}"),
Self::RecallPreset { preset_id } => format!("preset recalled: {preset_id}"),
Self::SelectGroup { group_id } => format!(
"group selected: {}",
group_id.as_deref().unwrap_or("all_panels")
),
Self::SetSceneParameter { key, .. } => format!("scene parameter updated: {key}"),
Self::SetTransitionDurationMs { duration_ms } => {
format!("transition duration set to {duration_ms} ms")
}
Self::TriggerPanelTest {
node_id,
panel_position,
pattern,
} => format!(
"panel test {} on {}:{}",
pattern.label(),
node_id,
panel_position.label()
),
}
}
}
impl ApiPanelPosition {
pub fn label(self) -> &'static str {
match self {
Self::Top => "top",
Self::Middle => "middle",
Self::Bottom => "bottom",
}
}
}
impl ApiTestPattern {
pub fn label(self) -> &'static str {
match self {
Self::WalkingPixel106 => "walking_pixel_106",
}
}
}
impl ApiErrorResponse {
pub fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
Self {
api_version: API_VERSION,
error: ApiErrorBody {
code: code.into(),
message: message.into(),
},
}
}
}

View File

@@ -0,0 +1,6 @@
mod dto;
mod server;
mod websocket;
pub use dto::*;
pub use server::*;

View File

@@ -0,0 +1,32 @@
use clap::Parser;
use infinity_config::{load_project_from_path, ProjectConfig};
use infinity_host::{HostApiPort, SimulationHostService};
use infinity_host_api::HostApiServer;
use std::{path::PathBuf, sync::Arc, thread, time::Duration};
#[derive(Debug, Parser)]
#[command(author, version, about = "Infinity Vis host API server")]
struct Cli {
#[arg(long, default_value = "config/project.example.toml")]
config: PathBuf,
#[arg(long, default_value = "127.0.0.1:9001")]
bind: String,
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
let cli = Cli::parse();
let project = load_project(&cli.config)?;
let service: Arc<dyn HostApiPort> = SimulationHostService::spawn_shared(project);
let server = HostApiServer::bind(&cli.bind, service)?;
println!("Infinity Vis host API listening on http://{}", server.local_addr());
println!("Web UI available at http://{}/", server.local_addr());
loop {
thread::sleep(Duration::from_secs(60));
}
}
fn load_project(path: &std::path::Path) -> Result<ProjectConfig, Box<dyn std::error::Error>> {
Ok(load_project_from_path(path)?)
}

View File

@@ -0,0 +1,396 @@
use crate::dto::{
ApiCatalogResponse, ApiCommandRequest, ApiCommandResponse, ApiErrorResponse, ApiEventKind,
ApiEventNotice, ApiGroupListResponse, ApiPresetListResponse, ApiSnapshotResponse,
ApiStateSnapshot, ApiStreamEnvelope, ApiStreamMessage, API_VERSION,
};
use crate::websocket::{websocket_accept_value, write_text_frame};
use infinity_host::HostApiPort;
use std::collections::HashMap;
use std::io::{self, Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::thread::{self, JoinHandle};
use std::time::Duration;
pub struct HostApiServer {
local_addr: SocketAddr,
shutdown: Arc<AtomicBool>,
accept_thread: Option<JoinHandle<()>>,
}
impl HostApiServer {
pub fn bind(bind: &str, service: Arc<dyn HostApiPort>) -> io::Result<Self> {
let listener = TcpListener::bind(bind)?;
listener.set_nonblocking(true)?;
let local_addr = listener.local_addr()?;
let shutdown = Arc::new(AtomicBool::new(false));
let thread_shutdown = Arc::clone(&shutdown);
let accept_thread = thread::spawn(move || accept_loop(listener, service, thread_shutdown));
Ok(Self {
local_addr,
shutdown,
accept_thread: Some(accept_thread),
})
}
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}
pub fn shutdown(mut self) {
self.shutdown.store(true, Ordering::SeqCst);
if let Some(handle) = self.accept_thread.take() {
let _ = handle.join();
}
}
}
impl Drop for HostApiServer {
fn drop(&mut self) {
self.shutdown.store(true, Ordering::SeqCst);
if let Some(handle) = self.accept_thread.take() {
let _ = handle.join();
}
}
}
fn accept_loop(listener: TcpListener, service: Arc<dyn HostApiPort>, shutdown: Arc<AtomicBool>) {
while !shutdown.load(Ordering::SeqCst) {
match listener.accept() {
Ok((stream, _)) => {
let service = Arc::clone(&service);
thread::spawn(move || {
let _ = handle_connection(stream, service);
});
}
Err(error) if error.kind() == io::ErrorKind::WouldBlock => {
thread::sleep(Duration::from_millis(25));
}
Err(_) => break,
}
}
}
fn handle_connection(mut stream: TcpStream, service: Arc<dyn HostApiPort>) -> io::Result<()> {
stream.set_read_timeout(Some(Duration::from_secs(2)))?;
let request = read_request(&mut stream)?;
if request.path == "/api/v1/stream" && request.is_websocket() {
return handle_websocket(stream, request, service);
}
match (request.method.as_str(), request.path.as_str()) {
("GET", "/api/v1/snapshot") => {
let snapshot = service.snapshot();
respond_json(&mut stream, 200, &ApiSnapshotResponse::from_snapshot(&snapshot))
}
("GET", "/api/v1/catalog") => {
let snapshot = service.snapshot();
respond_json(&mut stream, 200, &ApiCatalogResponse::from_snapshot(&snapshot))
}
("GET", "/api/v1/presets") => {
let snapshot = service.snapshot();
respond_json(&mut stream, 200, &ApiPresetListResponse::from_snapshot(&snapshot))
}
("GET", "/api/v1/groups") => {
let snapshot = service.snapshot();
respond_json(&mut stream, 200, &ApiGroupListResponse::from_snapshot(&snapshot))
}
("POST", "/api/v1/command") => match handle_command_post(&mut stream, request, service) {
Ok(()) => Ok(()),
Err(error) => respond_error(
&mut stream,
400,
"invalid_command",
format!("command request was rejected: {error}"),
),
},
("GET", "/") => respond_text(
&mut stream,
200,
"text/html; charset=utf-8",
include_str!("../../../web/v1/index.html"),
),
("GET", "/index.html") => respond_text(
&mut stream,
200,
"text/html; charset=utf-8",
include_str!("../../../web/v1/index.html"),
),
("GET", "/app.js") => respond_text(
&mut stream,
200,
"application/javascript; charset=utf-8",
include_str!("../../../web/v1/app.js"),
),
("GET", "/styles.css") => respond_text(
&mut stream,
200,
"text/css; charset=utf-8",
include_str!("../../../web/v1/styles.css"),
),
_ => respond_text(
&mut stream,
404,
"application/json; charset=utf-8",
&serde_json::to_string_pretty(&ApiErrorResponse::new(
"not_found",
format!("no route registered for {} {}", request.method, request.path),
))
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?,
),
}
}
fn handle_command_post(
stream: &mut TcpStream,
request: HttpRequest,
service: Arc<dyn HostApiPort>,
) -> io::Result<()> {
let parsed = serde_json::from_slice::<ApiCommandRequest>(&request.body)
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
let request_id = parsed.request_id.clone();
let summary = parsed.summary();
let command = parsed
.into_host_command()
.map_err(|error| io::Error::new(io::ErrorKind::InvalidInput, error))?;
service.send_command(command);
let snapshot = service.snapshot();
respond_json(
stream,
200,
&ApiCommandResponse {
api_version: API_VERSION,
accepted: true,
request_id,
generated_at_millis: snapshot.generated_at_millis,
summary,
},
)
}
fn handle_websocket(
mut stream: TcpStream,
request: HttpRequest,
service: Arc<dyn HostApiPort>,
) -> io::Result<()> {
let Some(key) = request.header("sec-websocket-key") else {
return respond_error(
&mut stream,
400,
"missing_websocket_key",
"websocket upgrade requires sec-websocket-key",
);
};
let accept = websocket_accept_value(key);
let response = format!(
"HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: {accept}\r\n\r\n"
);
stream.write_all(response.as_bytes())?;
let mut sequence = 1u64;
let mut last_event_millis = 0u64;
loop {
let snapshot = service.snapshot();
send_stream_message(
&mut stream,
sequence,
snapshot.generated_at_millis,
ApiStreamMessage::Snapshot(ApiStateSnapshot::from_snapshot(&snapshot)),
)?;
sequence += 1;
send_stream_message(
&mut stream,
sequence,
snapshot.generated_at_millis,
ApiStreamMessage::Preview(crate::dto::ApiPreviewSnapshot::from_snapshot(&snapshot)),
)?;
sequence += 1;
let mut new_events = snapshot
.recent_events
.iter()
.filter(|event| event.at_millis > last_event_millis)
.cloned()
.collect::<Vec<_>>();
new_events.sort_by_key(|event| event.at_millis);
for event in new_events {
last_event_millis = event.at_millis;
send_stream_message(
&mut stream,
sequence,
event.at_millis,
ApiStreamMessage::Event(ApiEventNotice {
kind: ApiEventKind::Info,
message: event.message,
}),
)?;
sequence += 1;
}
thread::sleep(Duration::from_millis(250));
}
}
fn send_stream_message(
stream: &mut TcpStream,
sequence: u64,
generated_at_millis: u64,
message: ApiStreamMessage,
) -> io::Result<()> {
let payload = serde_json::to_string(&ApiStreamEnvelope {
api_version: API_VERSION,
sequence,
generated_at_millis,
message,
})
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
write_text_frame(stream, &payload)
}
fn respond_json<T: serde::Serialize>(
stream: &mut TcpStream,
status: u16,
body: &T,
) -> io::Result<()> {
let payload = serde_json::to_string_pretty(body)
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
respond_text(stream, status, "application/json; charset=utf-8", &payload)
}
fn respond_error(
stream: &mut TcpStream,
status: u16,
code: impl Into<String>,
message: impl Into<String>,
) -> io::Result<()> {
respond_json(stream, status, &ApiErrorResponse::new(code, message))
}
fn respond_text(
stream: &mut TcpStream,
status: u16,
content_type: &str,
body: &str,
) -> io::Result<()> {
let reason = match status {
200 => "OK",
400 => "Bad Request",
404 => "Not Found",
_ => "OK",
};
let response = format!(
"HTTP/1.1 {status} {reason}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
body.as_bytes().len(),
body
);
stream.write_all(response.as_bytes())
}
#[derive(Debug)]
struct HttpRequest {
method: String,
path: String,
headers: HashMap<String, String>,
body: Vec<u8>,
}
impl HttpRequest {
fn header(&self, key: &str) -> Option<&str> {
self.headers.get(&key.to_ascii_lowercase()).map(|value| value.as_str())
}
fn is_websocket(&self) -> bool {
self.header("upgrade")
.map(|value| value.eq_ignore_ascii_case("websocket"))
.unwrap_or(false)
}
}
fn read_request(stream: &mut TcpStream) -> io::Result<HttpRequest> {
let mut buffer = Vec::new();
let mut temp = [0u8; 4096];
let mut header_end = None;
let mut expected_len = None;
loop {
let read = stream.read(&mut temp)?;
if read == 0 {
break;
}
buffer.extend_from_slice(&temp[..read]);
if header_end.is_none() {
header_end = find_header_end(&buffer);
if let Some(end) = header_end {
let header_text = String::from_utf8_lossy(&buffer[..end]);
expected_len = parse_content_length(&header_text);
if expected_len == Some(0) || expected_len.is_none() {
break;
}
}
}
if let (Some(end), Some(content_len)) = (header_end, expected_len) {
if buffer.len() >= end + 4 + content_len {
break;
}
}
}
let header_end = header_end.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing header end"))?;
let header_text = String::from_utf8_lossy(&buffer[..header_end]);
let mut lines = header_text.lines();
let request_line = lines
.next()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing request line"))?;
let mut request_parts = request_line.split_whitespace();
let method = request_parts
.next()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing method"))?
.to_string();
let path = request_parts
.next()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "missing path"))?
.split('?')
.next()
.unwrap_or("/")
.to_string();
let mut headers = HashMap::new();
for line in lines {
if let Some((key, value)) = line.split_once(':') {
headers.insert(
key.trim().to_ascii_lowercase(),
value.trim().to_string(),
);
}
}
let body_start = header_end + 4;
let body = buffer.get(body_start..).unwrap_or_default().to_vec();
Ok(HttpRequest {
method,
path,
headers,
body,
})
}
fn parse_content_length(header_text: &str) -> Option<usize> {
header_text.lines().find_map(|line| {
line.split_once(':').and_then(|(key, value)| {
if key.trim().eq_ignore_ascii_case("content-length") {
value.trim().parse::<usize>().ok()
} else {
None
}
})
})
}
fn find_header_end(buffer: &[u8]) -> Option<usize> {
buffer
.windows(4)
.position(|window| window == b"\r\n\r\n")
}

View File

@@ -0,0 +1,141 @@
use std::io::{self, Write};
use std::net::TcpStream;
const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
pub fn websocket_accept_value(key: &str) -> String {
let digest = sha1(format!("{key}{WEBSOCKET_GUID}").as_bytes());
base64_encode(&digest)
}
pub fn write_text_frame(stream: &mut TcpStream, payload: &str) -> io::Result<()> {
let payload = payload.as_bytes();
let mut frame = Vec::with_capacity(payload.len() + 10);
frame.push(0x81);
match payload.len() {
0..=125 => frame.push(payload.len() as u8),
126..=65535 => {
frame.push(126);
frame.extend_from_slice(&(payload.len() as u16).to_be_bytes());
}
_ => {
frame.push(127);
frame.extend_from_slice(&(payload.len() as u64).to_be_bytes());
}
}
frame.extend_from_slice(payload);
stream.write_all(&frame)
}
fn base64_encode(bytes: &[u8]) -> String {
const TABLE: &[u8; 64] =
b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
let mut encoded = String::new();
let mut index = 0;
while index < bytes.len() {
let first = bytes[index];
let second = if index + 1 < bytes.len() { bytes[index + 1] } else { 0 };
let third = if index + 2 < bytes.len() { bytes[index + 2] } else { 0 };
encoded.push(TABLE[(first >> 2) as usize] as char);
encoded.push(TABLE[((first & 0b0000_0011) << 4 | (second >> 4)) as usize] as char);
if index + 1 < bytes.len() {
encoded.push(TABLE[((second & 0b0000_1111) << 2 | (third >> 6)) as usize] as char);
} else {
encoded.push('=');
}
if index + 2 < bytes.len() {
encoded.push(TABLE[(third & 0b0011_1111) as usize] as char);
} else {
encoded.push('=');
}
index += 3;
}
encoded
}
fn sha1(bytes: &[u8]) -> [u8; 20] {
let mut h0: u32 = 0x67452301;
let mut h1: u32 = 0xEFCDAB89;
let mut h2: u32 = 0x98BADCFE;
let mut h3: u32 = 0x10325476;
let mut h4: u32 = 0xC3D2E1F0;
let mut message = bytes.to_vec();
let bit_len = (message.len() as u64) * 8;
message.push(0x80);
while (message.len() % 64) != 56 {
message.push(0x00);
}
message.extend_from_slice(&bit_len.to_be_bytes());
for chunk in message.chunks(64) {
let mut words = [0u32; 80];
for index in 0..16 {
let start = index * 4;
words[index] = u32::from_be_bytes([
chunk[start],
chunk[start + 1],
chunk[start + 2],
chunk[start + 3],
]);
}
for index in 16..80 {
words[index] =
(words[index - 3] ^ words[index - 8] ^ words[index - 14] ^ words[index - 16])
.rotate_left(1);
}
let mut a = h0;
let mut b = h1;
let mut c = h2;
let mut d = h3;
let mut e = h4;
for index in 0..80 {
let (f, k) = match index {
0..=19 => (((b & c) | ((!b) & d)), 0x5A827999),
20..=39 => ((b ^ c ^ d), 0x6ED9EBA1),
40..=59 => (((b & c) | (b & d) | (c & d)), 0x8F1BBCDC),
_ => ((b ^ c ^ d), 0xCA62C1D6),
};
let temp = a
.rotate_left(5)
.wrapping_add(f)
.wrapping_add(e)
.wrapping_add(k)
.wrapping_add(words[index]);
e = d;
d = c;
c = b.rotate_left(30);
b = a;
a = temp;
}
h0 = h0.wrapping_add(a);
h1 = h1.wrapping_add(b);
h2 = h2.wrapping_add(c);
h3 = h3.wrapping_add(d);
h4 = h4.wrapping_add(e);
}
let mut digest = [0u8; 20];
digest[0..4].copy_from_slice(&h0.to_be_bytes());
digest[4..8].copy_from_slice(&h1.to_be_bytes());
digest[8..12].copy_from_slice(&h2.to_be_bytes());
digest[12..16].copy_from_slice(&h3.to_be_bytes());
digest[16..20].copy_from_slice(&h4.to_be_bytes());
digest
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn websocket_accept_matches_rfc_example() {
let accept = websocket_accept_value("dGhlIHNhbXBsZSBub25jZQ==");
assert_eq!(accept, "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=");
}
}

View File

@@ -0,0 +1,295 @@
use infinity_config::ProjectConfig;
use infinity_host::{HostApiPort, SimulationHostService};
use infinity_host_api::HostApiServer;
use serde_json::Value;
use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::{Shutdown, SocketAddr, TcpStream};
use std::sync::Arc;
use std::time::Duration;
fn sample_project() -> ProjectConfig {
ProjectConfig::from_toml_str(include_str!("../../../config/project.example.toml"))
.expect("project config must parse")
}
fn start_server() -> HostApiServer {
let service: Arc<dyn HostApiPort> = SimulationHostService::spawn_shared(sample_project());
HostApiServer::bind("127.0.0.1:0", service).expect("server must bind")
}
struct HttpResponse {
status_code: u16,
headers: HashMap<String, String>,
body: String,
}
#[test]
fn root_serves_creative_console_shell() {
let server = start_server();
let response = send_http_request(server.local_addr(), "GET", "/", None);
assert_eq!(response.status_code, 200);
assert!(response
.headers
.get("content-type")
.expect("content-type header")
.starts_with("text/html"));
assert!(response.body.contains("Infinity Vis / Creative Surface"));
assert!(response.body.contains("/app.js"));
server.shutdown();
}
#[test]
fn snapshot_endpoint_is_versioned_and_separates_state_and_preview() {
let server = start_server();
let response = send_http_request(server.local_addr(), "GET", "/api/v1/snapshot", None);
let body: Value = serde_json::from_str(&response.body).expect("snapshot json must parse");
assert_eq!(response.status_code, 200);
assert_eq!(body["api_version"], "v1");
assert_eq!(body["state"]["system"]["project_name"], "Infinity Vis");
assert_eq!(body["state"]["nodes"].as_array().map(Vec::len), Some(6));
assert_eq!(body["preview"]["panels"].as_array().map(Vec::len), Some(18));
assert!(body["state"]["active_scene"]["pattern_id"].is_string());
server.shutdown();
}
#[test]
fn catalog_presets_and_groups_endpoints_return_expected_lists() {
let server = start_server();
let catalog = send_http_request(server.local_addr(), "GET", "/api/v1/catalog", None);
let presets = send_http_request(server.local_addr(), "GET", "/api/v1/presets", None);
let groups = send_http_request(server.local_addr(), "GET", "/api/v1/groups", None);
let catalog_body: Value = serde_json::from_str(&catalog.body).expect("catalog json");
let preset_body: Value = serde_json::from_str(&presets.body).expect("preset json");
let group_body: Value = serde_json::from_str(&groups.body).expect("group json");
assert_eq!(catalog.status_code, 200);
assert!(catalog_body["patterns"]
.as_array()
.expect("patterns array")
.iter()
.any(|pattern| pattern["pattern_id"] == "walking_pixel"));
assert!(preset_body["presets"]
.as_array()
.expect("presets array")
.iter()
.any(|preset| preset["preset_id"] == "ocean_gradient"));
assert!(group_body["groups"]
.as_array()
.expect("groups array")
.iter()
.any(|group| group["group_id"] == "top_panels"));
server.shutdown();
}
#[test]
fn command_endpoint_applies_state_changes_and_rejects_invalid_payload() {
let server = start_server();
let response = send_http_request(
server.local_addr(),
"POST",
"/api/v1/command",
Some(
r#"{
"request_id": "contract-blackout",
"command": {
"type": "set_blackout",
"payload": {
"enabled": true
}
}
}"#,
),
);
let response_body: Value =
serde_json::from_str(&response.body).expect("command response must parse");
let snapshot = send_http_request(server.local_addr(), "GET", "/api/v1/snapshot", None);
let snapshot_body: Value = serde_json::from_str(&snapshot.body).expect("snapshot json");
assert_eq!(response.status_code, 200);
assert_eq!(response_body["accepted"], true);
assert_eq!(response_body["request_id"], "contract-blackout");
assert_eq!(snapshot_body["state"]["global"]["blackout"], true);
let invalid = send_http_request(
server.local_addr(),
"POST",
"/api/v1/command",
Some(r#"{"command":{"type":"set_blackout","payload":{}}}"#),
);
let invalid_body: Value =
serde_json::from_str(&invalid.body).expect("invalid response must parse");
assert_eq!(invalid.status_code, 400);
assert_eq!(invalid_body["api_version"], "v1");
assert_eq!(invalid_body["error"]["code"], "invalid_command");
server.shutdown();
}
#[test]
fn websocket_stream_emits_snapshot_preview_and_event_messages() {
let server = start_server();
let mut stream = open_websocket(server.local_addr());
let first_frame = read_websocket_text_frame(&mut stream);
let first_payload: Value = serde_json::from_str(&first_frame).expect("first ws frame");
assert_eq!(first_payload["message"]["type"], "snapshot");
let second_frame = read_websocket_text_frame(&mut stream);
let second_payload: Value = serde_json::from_str(&second_frame).expect("second ws frame");
assert_eq!(second_payload["message"]["type"], "preview");
let _ = send_http_request(
server.local_addr(),
"POST",
"/api/v1/command",
Some(
r#"{
"request_id": "contract-event",
"command": {
"type": "set_blackout",
"payload": {
"enabled": true
}
}
}"#,
),
);
let mut saw_event = false;
for _ in 0..8 {
let frame = read_websocket_text_frame(&mut stream);
let payload: Value = serde_json::from_str(&frame).expect("ws event frame");
if payload["message"]["type"] == "event" {
saw_event = true;
assert!(payload["message"]["payload"]["message"]
.as_str()
.expect("event message")
.contains("blackout"));
break;
}
}
assert!(saw_event, "expected websocket event after command");
let _ = stream.shutdown(Shutdown::Both);
server.shutdown();
}
fn send_http_request(addr: SocketAddr, method: &str, path: &str, body: Option<&str>) -> HttpResponse {
let body = body.unwrap_or("");
let request = format!(
"{method} {path} HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n{}",
body.as_bytes().len(),
body,
host = addr
);
let mut stream = TcpStream::connect(addr).expect("http connection");
stream
.set_read_timeout(Some(Duration::from_secs(3)))
.expect("read timeout");
stream.write_all(request.as_bytes()).expect("write request");
stream.shutdown(Shutdown::Write).expect("shutdown write");
let mut raw = Vec::new();
stream.read_to_end(&mut raw).expect("read response");
parse_http_response(&raw)
}
fn parse_http_response(raw: &[u8]) -> HttpResponse {
let delimiter = raw
.windows(4)
.position(|window| window == b"\r\n\r\n")
.expect("http header delimiter");
let header_text = String::from_utf8(raw[..delimiter].to_vec()).expect("header utf8");
let body = String::from_utf8(raw[delimiter + 4..].to_vec()).expect("body utf8");
let mut lines = header_text.lines();
let status_line = lines.next().expect("status line");
let status_code = status_line
.split_whitespace()
.nth(1)
.expect("status code")
.parse::<u16>()
.expect("valid status code");
let headers = lines
.filter_map(|line| line.split_once(':'))
.map(|(key, value)| (key.trim().to_ascii_lowercase(), value.trim().to_string()))
.collect::<HashMap<_, _>>();
HttpResponse {
status_code,
headers,
body,
}
}
fn open_websocket(addr: SocketAddr) -> TcpStream {
let mut stream = TcpStream::connect(addr).expect("websocket connection");
stream
.set_read_timeout(Some(Duration::from_secs(3)))
.expect("read timeout");
let request = format!(
"GET /api/v1/stream HTTP/1.1\r\nHost: {host}\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\nSec-WebSocket-Version: 13\r\n\r\n",
host = addr
);
stream.write_all(request.as_bytes()).expect("write handshake");
let header = read_until_header_end(&mut stream);
let header_text = String::from_utf8(header).expect("handshake utf8");
assert!(header_text.starts_with("HTTP/1.1 101"));
assert!(header_text.contains("Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo="));
stream
}
fn read_until_header_end(stream: &mut TcpStream) -> Vec<u8> {
let mut buffer = Vec::new();
loop {
let mut byte = [0u8; 1];
let read = stream.read(&mut byte).expect("read handshake");
assert!(read > 0, "unexpected eof while reading handshake");
buffer.push(byte[0]);
if buffer.windows(4).any(|window| window == b"\r\n\r\n") {
let end = buffer
.windows(4)
.position(|window| window == b"\r\n\r\n")
.expect("header end")
+ 4;
return buffer[..end].to_vec();
}
}
}
fn read_websocket_text_frame(stream: &mut TcpStream) -> String {
let mut header = [0u8; 2];
stream.read_exact(&mut header).expect("frame header");
assert_eq!(header[0] & 0x0f, 0x1, "expected text frame");
let payload_len = match header[1] & 0x7f {
len @ 0..=125 => len as usize,
126 => {
let mut extended = [0u8; 2];
stream.read_exact(&mut extended).expect("extended payload");
u16::from_be_bytes(extended) as usize
}
127 => {
let mut extended = [0u8; 8];
stream.read_exact(&mut extended).expect("extended payload");
u64::from_be_bytes(extended) as usize
}
_ => unreachable!("masked length bit should already be stripped"),
};
let mut payload = vec![0u8; payload_len];
stream.read_exact(&mut payload).expect("frame payload");
String::from_utf8(payload).expect("frame utf8")
}