# device_controller.py import threading, queue, time, math, os, csv SAMPLE_RATE = 10_000.0 # 10 kS/s (Dummy) DT = 1.0 / SAMPLE_RATE CHUNK = 500 # 50 ms pro Chunk OUTDIR = "./logs" def _norm_channel(ch): return {0:"A", 1:"B"}.get(ch, str(ch).upper()) class DeviceController: def __init__(self, session_manager, dev): self.sm = session_manager # <— SessionManager (teilt die Session) self.dev = dev self.serial = getattr(dev, "serial", "UNKNOWN") self.cmdq = queue.Queue() self.writer_q = queue.Queue(maxsize=200) self.stop_evt = threading.Event() self.running = False self.reader_t = threading.Thread(target=self.reader_loop, daemon=True) self.writer_t = threading.Thread(target=self.writer_loop, daemon=True) # ==== Public API (nicht blockierend) ==== def start(self): if not self.reader_t.is_alive(): self.reader_t.start() if not self.writer_t.is_alive(): self.writer_t.start() self.cmdq.put(("start", None)) # nur anstoßen def stop(self): self.cmdq.put(("stop", None)) # nur anstoßen def shutdown(self): # Für Programmende self.stop() self.stop_evt.set() def set_mode(self, ch, mode): key = _norm_channel(ch) self.dev.channels[key].mode = mode # ==== Threads ==== def reader_loop(self): print(f"[{self.serial}] Reader gestartet") while not self.stop_evt.is_set(): # Kommandos (Start/Stop) abarbeiten try: cmd, arg = self.cmdq.get_nowait() if cmd == "start" and not self.running: n = self.sm.start() # evtl. startet hier die Session self.running = True print(f"[{self.serial}] RUNNING (Session-Clients={n})") elif cmd == "stop" and self.running: n = self.sm.stop() # evtl. endet hier die Session self.running = False print(f"[{self.serial}] STOPPED (Session-Clients={n})") except queue.Empty: pass if not self.running: time.sleep(0.02) continue # ——— Daten lesen (blockierend, aber im Worker-Thread) ——— try: # pysmu: read liefert Liste von [VA, IA, VB, IB] samples = self.dev.read(CHUNK, -1) # je nach Version evtl. self.dev.read(CHUNK) if not samples: time.sleep(0.001) continue va = [row[0] for row in samples] vb = [row[2] for row in samples] # in Writer-Queue schieben try: self.writer_q.put((time.time(), va, vb), timeout=0.5) except queue.Full: print(f"[{self.serial}] WARN: writer_q voll – dropping chunk") except Exception as e: # NICHT schlucken – sichtbar loggen! print(f"[{self.serial}] Read-Fehler: {e}") time.sleep(0.01) print(f"[{self.serial}] Reader beendet") def writer_loop(self): os.makedirs(OUTDIR, exist_ok=True) fn = os.path.join(OUTDIR, f"{time.strftime('%Y%m%d_%H%M%S')}_{self.serial}.csv") sample_idx = 0 written_rows = 0 print(f"[{self.serial}] Writer schreibt nach: {fn}") try: with open(fn, "w", newline="") as f: w = csv.writer(f) w.writerow(["t_rel_s", "A", "B"]) # Wide-Format while not (self.stop_evt.is_set() and self.writer_q.empty()): try: ts, va, vb = self.writer_q.get(timeout=0.5) except queue.Empty: continue n = min(len(va), len(vb)) for i in range(n): t_rel = (sample_idx + i) * DT w.writerow([t_rel, va[i], vb[i]]) sample_idx += n written_rows += n if written_rows and written_rows % (10*CHUNK) == 0: f.flush() print(f"[{self.serial}] rows={written_rows}") except Exception as e: print(f"[{self.serial}] Writer-Fehler: {e}") finally: print(f"[{self.serial}] Writer beendet: {fn}")