Files
RFP_Infinity-Vis/tests/test_ddp_output.py

57 lines
2.1 KiB
Python

from __future__ import annotations
import copy
from pathlib import Path
import unittest
from app.config.xml_mapping import load_config
from app.core.pattern_engine import PatternEngine
from app.core.types import PatternParameters, RGBColor
from app.output.ddp import DDP_UNCHANGED_HOST_KEEPALIVE_S, DdpOutputBackend
ROOT = Path(__file__).resolve().parents[1]
SAMPLE_MAPPING = ROOT / "sample_data" / "infinity_mirror_mapping_clean.xml"
class DdpOutputBackendTests(unittest.TestCase):
def test_identical_frames_are_suppressed_until_keepalive_is_due(self) -> None:
config = load_config(SAMPLE_MAPPING)
frame = PatternEngine().render_frame(config, "solid", PatternParameters(color_mode="mono"), timestamp=0.0)
backend = DdpOutputBackend()
first_packets = backend._build_packets(config, frame)
second_packets = backend._build_packets(config, frame)
self.assertEqual(len(first_packets), 18)
self.assertEqual(len(second_packets), 0)
for host in list(backend._last_payload_sent_at):
backend._last_payload_sent_at[host] -= DDP_UNCHANGED_HOST_KEEPALIVE_S + 0.01
keepalive_packets = backend._build_packets(config, frame)
self.assertEqual(len(keepalive_packets), 18)
def test_only_changed_host_is_resent_before_keepalive(self) -> None:
config = load_config(SAMPLE_MAPPING)
backend = DdpOutputBackend()
frame = PatternEngine().render_frame(config, "solid", PatternParameters(color_mode="mono"), timestamp=0.0)
backend._build_packets(config, frame)
changed_frame = copy.deepcopy(frame)
black = RGBColor.black()
tile = changed_frame.tiles["r1c1"]
tile.led_pixels = {
segment_name: [black for _ in colors]
for segment_name, colors in tile.led_pixels.items()
}
changed_packets = backend._build_packets(config, changed_frame)
self.assertEqual(len(changed_packets), 1)
self.assertEqual(changed_packets[0][0], config.tile_lookup()["r1c1"].controller_ip)
if __name__ == "__main__":
unittest.main()