From 5eb500644cb35a383fdb50324956f5c113c3a8bf Mon Sep 17 00:00:00 2001 From: Vincent Hanewinkel Date: Thu, 14 Aug 2025 22:33:14 +0200 Subject: [PATCH] fix --- DeviceController.py | 104 +++++++++++++++++++++++++------------------- 1 file changed, 59 insertions(+), 45 deletions(-) diff --git a/DeviceController.py b/DeviceController.py index 6e3adec..b35375a 100644 --- a/DeviceController.py +++ b/DeviceController.py @@ -1,61 +1,56 @@ # device_controller.py import threading, queue, time, math, os, csv -SAMPLE_RATE = 10_000.0 # 10 kS/s (Dummy) -DT = 1.0 / SAMPLE_RATE -CHUNK = 500 # 50 ms pro Chunk -OUTDIR = "./logs" - -def _norm_channel(ch): - return {0:"A", 1:"B"}.get(ch, str(ch).upper()) +RAW_SAMPLE_RATE = 100_000.0 # M1K nominal +CHUNK_RAW = 2000 # ~20 ms pro Chunk pro Gerät +OUTDIR = "./logs" class DeviceController: - def __init__(self, session_manager, dev): - self.sm = session_manager # <— SessionManager (teilt die Session) + def __init__(self, session_manager, dev, effective_rate_hz=10_000): + self.sm = session_manager self.dev = dev self.serial = getattr(dev, "serial", "UNKNOWN") + # ---- Decimation berechnen ---- + # z.B. effective_rate_hz=10_000 -> DECIM=10 -> effektiv 10 kS/s + self.decim = max(1, int(round(RAW_SAMPLE_RATE / float(effective_rate_hz)))) + self.eff_rate = RAW_SAMPLE_RATE / self.decim + self.dt = 1.0 / self.eff_rate + self.cmdq = queue.Queue() - self.writer_q = queue.Queue(maxsize=200) + # größere Queue; wenn voll, verwerfen wir älteste Chunks ("Ringpuffer") + self.writer_q = queue.Queue(maxsize=400) self.stop_evt = threading.Event() self.running = False self.reader_t = threading.Thread(target=self.reader_loop, daemon=True) self.writer_t = threading.Thread(target=self.writer_loop, daemon=True) - # ==== Public API (nicht blockierend) ==== def start(self): if not self.reader_t.is_alive(): self.reader_t.start() if not self.writer_t.is_alive(): self.writer_t.start() - self.cmdq.put(("start", None)) # nur anstoßen + self.cmdq.put(("start", None)) def stop(self): - self.cmdq.put(("stop", None)) # nur anstoßen - - def shutdown(self): - # Für Programmende - self.stop() - self.stop_evt.set() + self.cmdq.put(("stop", None)) def set_mode(self, ch, mode): - key = _norm_channel(ch) + key = {0:"A",1:"B"}.get(ch, str(ch).upper()) self.dev.channels[key].mode = mode - # ==== Threads ==== + # -------- Reader: liest roh und decimiert -------- def reader_loop(self): - print(f"[{self.serial}] Reader gestartet") + print(f"[{self.serial}] Reader start (decim={self.decim}, eff_rate={self.eff_rate:.0f} S/s)") while not self.stop_evt.is_set(): - # Kommandos (Start/Stop) abarbeiten + # Kommandos try: - cmd, arg = self.cmdq.get_nowait() + cmd, _ = self.cmdq.get_nowait() if cmd == "start" and not self.running: - n = self.sm.start() # evtl. startet hier die Session + self.sm.start() # Session ggf. starten self.running = True - print(f"[{self.serial}] RUNNING (Session-Clients={n})") elif cmd == "stop" and self.running: - n = self.sm.stop() # evtl. endet hier die Session self.running = False - print(f"[{self.serial}] STOPPED (Session-Clients={n})") + self.sm.stop() # Session ggf. stoppen except queue.Empty: pass @@ -63,55 +58,74 @@ class DeviceController: time.sleep(0.02) continue - # ——— Daten lesen (blockierend, aber im Worker-Thread) ——— try: - # pysmu: read liefert Liste von [VA, IA, VB, IB] - samples = self.dev.read(CHUNK, -1) # je nach Version evtl. self.dev.read(CHUNK) + # pysmu: Liste von [VA, IA, VB, IB] + samples = self.dev.read(CHUNK_RAW, -1) # je nach Version evtl. .read(CHUNK_RAW) if not samples: time.sleep(0.001) continue - va = [row[0] for row in samples] - vb = [row[2] for row in samples] + # Kanäle extrahieren + va_raw = [row[0] for row in samples] + vb_raw = [row[2] for row in samples] - # in Writer-Queue schieben + # -------- Decimation: jedes N-te Sample -------- + if self.decim > 1: + va = va_raw[::self.decim] + vb = vb_raw[::self.decim] + else: + va, vb = va_raw, vb_raw + + # In Writer-Queue; bei voller Queue ältestes verwerfen (Ringpuffer) try: - self.writer_q.put((time.time(), va, vb), timeout=0.5) + self.writer_q.put_nowait((va, vb)) except queue.Full: - print(f"[{self.serial}] WARN: writer_q voll – dropping chunk") + try: + _ = self.writer_q.get_nowait() # drop oldest + except queue.Empty: + pass + finally: + # (erneut versuchen, blockiert jetzt nicht) + try: + self.writer_q.put_nowait((va, vb)) + except queue.Full: + # wenn immer noch voll, dann überspringen wir diesen Chunk + pass + except Exception as e: - # NICHT schlucken – sichtbar loggen! print(f"[{self.serial}] Read-Fehler: {e}") time.sleep(0.01) - print(f"[{self.serial}] Reader beendet") + print(f"[{self.serial}] Reader stop") + # -------- Writer: Wide-Format, effektive Zeitachse -------- def writer_loop(self): os.makedirs(OUTDIR, exist_ok=True) fn = os.path.join(OUTDIR, f"{time.strftime('%Y%m%d_%H%M%S')}_{self.serial}.csv") sample_idx = 0 - written_rows = 0 - print(f"[{self.serial}] Writer schreibt nach: {fn}") + rows_since_flush = 0 + print(f"[{self.serial}] Writer -> {fn} (dt={self.dt:.6e}s)") try: with open(fn, "w", newline="") as f: w = csv.writer(f) - w.writerow(["t_rel_s", "A", "B"]) # Wide-Format + w.writerow(["t_rel_s", "A", "B"]) while not (self.stop_evt.is_set() and self.writer_q.empty()): try: - ts, va, vb = self.writer_q.get(timeout=0.5) + va, vb = self.writer_q.get(timeout=0.5) except queue.Empty: continue n = min(len(va), len(vb)) + # Zeitachse mit effektiver Rate for i in range(n): - t_rel = (sample_idx + i) * DT + t_rel = (sample_idx + i) * self.dt w.writerow([t_rel, va[i], vb[i]]) sample_idx += n - written_rows += n + rows_since_flush += n - if written_rows and written_rows % (10*CHUNK) == 0: + if rows_since_flush >= 10_000: # alle ~10k Samples flushen f.flush() - print(f"[{self.serial}] rows={written_rows}") + rows_since_flush = 0 except Exception as e: print(f"[{self.serial}] Writer-Fehler: {e}") finally: