This commit is contained in:
Vincent Hanewinkel 2025-08-14 22:33:14 +02:00
parent 8ea1532650
commit 5eb500644c

View File

@ -1,61 +1,56 @@
# device_controller.py
import threading, queue, time, math, os, csv
SAMPLE_RATE = 10_000.0 # 10 kS/s (Dummy)
DT = 1.0 / SAMPLE_RATE
CHUNK = 500 # 50 ms pro Chunk
RAW_SAMPLE_RATE = 100_000.0 # M1K nominal
CHUNK_RAW = 2000 # ~20 ms pro Chunk pro Gerät
OUTDIR = "./logs"
def _norm_channel(ch):
return {0:"A", 1:"B"}.get(ch, str(ch).upper())
class DeviceController:
def __init__(self, session_manager, dev):
self.sm = session_manager # <— SessionManager (teilt die Session)
def __init__(self, session_manager, dev, effective_rate_hz=10_000):
self.sm = session_manager
self.dev = dev
self.serial = getattr(dev, "serial", "UNKNOWN")
# ---- Decimation berechnen ----
# z.B. effective_rate_hz=10_000 -> DECIM=10 -> effektiv 10 kS/s
self.decim = max(1, int(round(RAW_SAMPLE_RATE / float(effective_rate_hz))))
self.eff_rate = RAW_SAMPLE_RATE / self.decim
self.dt = 1.0 / self.eff_rate
self.cmdq = queue.Queue()
self.writer_q = queue.Queue(maxsize=200)
# größere Queue; wenn voll, verwerfen wir älteste Chunks ("Ringpuffer")
self.writer_q = queue.Queue(maxsize=400)
self.stop_evt = threading.Event()
self.running = False
self.reader_t = threading.Thread(target=self.reader_loop, daemon=True)
self.writer_t = threading.Thread(target=self.writer_loop, daemon=True)
# ==== Public API (nicht blockierend) ====
def start(self):
if not self.reader_t.is_alive(): self.reader_t.start()
if not self.writer_t.is_alive(): self.writer_t.start()
self.cmdq.put(("start", None)) # nur anstoßen
self.cmdq.put(("start", None))
def stop(self):
self.cmdq.put(("stop", None)) # nur anstoßen
def shutdown(self):
# Für Programmende
self.stop()
self.stop_evt.set()
self.cmdq.put(("stop", None))
def set_mode(self, ch, mode):
key = _norm_channel(ch)
key = {0:"A",1:"B"}.get(ch, str(ch).upper())
self.dev.channels[key].mode = mode
# ==== Threads ====
# -------- Reader: liest roh und decimiert --------
def reader_loop(self):
print(f"[{self.serial}] Reader gestartet")
print(f"[{self.serial}] Reader start (decim={self.decim}, eff_rate={self.eff_rate:.0f} S/s)")
while not self.stop_evt.is_set():
# Kommandos (Start/Stop) abarbeiten
# Kommandos
try:
cmd, arg = self.cmdq.get_nowait()
cmd, _ = self.cmdq.get_nowait()
if cmd == "start" and not self.running:
n = self.sm.start() # evtl. startet hier die Session
self.sm.start() # Session ggf. starten
self.running = True
print(f"[{self.serial}] RUNNING (Session-Clients={n})")
elif cmd == "stop" and self.running:
n = self.sm.stop() # evtl. endet hier die Session
self.running = False
print(f"[{self.serial}] STOPPED (Session-Clients={n})")
self.sm.stop() # Session ggf. stoppen
except queue.Empty:
pass
@ -63,55 +58,74 @@ class DeviceController:
time.sleep(0.02)
continue
# ——— Daten lesen (blockierend, aber im Worker-Thread) ———
try:
# pysmu: read liefert Liste von [VA, IA, VB, IB]
samples = self.dev.read(CHUNK, -1) # je nach Version evtl. self.dev.read(CHUNK)
# pysmu: Liste von [VA, IA, VB, IB]
samples = self.dev.read(CHUNK_RAW, -1) # je nach Version evtl. .read(CHUNK_RAW)
if not samples:
time.sleep(0.001)
continue
va = [row[0] for row in samples]
vb = [row[2] for row in samples]
# Kanäle extrahieren
va_raw = [row[0] for row in samples]
vb_raw = [row[2] for row in samples]
# in Writer-Queue schieben
# -------- Decimation: jedes N-te Sample --------
if self.decim > 1:
va = va_raw[::self.decim]
vb = vb_raw[::self.decim]
else:
va, vb = va_raw, vb_raw
# In Writer-Queue; bei voller Queue ältestes verwerfen (Ringpuffer)
try:
self.writer_q.put((time.time(), va, vb), timeout=0.5)
self.writer_q.put_nowait((va, vb))
except queue.Full:
print(f"[{self.serial}] WARN: writer_q voll dropping chunk")
try:
_ = self.writer_q.get_nowait() # drop oldest
except queue.Empty:
pass
finally:
# (erneut versuchen, blockiert jetzt nicht)
try:
self.writer_q.put_nowait((va, vb))
except queue.Full:
# wenn immer noch voll, dann überspringen wir diesen Chunk
pass
except Exception as e:
# NICHT schlucken sichtbar loggen!
print(f"[{self.serial}] Read-Fehler: {e}")
time.sleep(0.01)
print(f"[{self.serial}] Reader beendet")
print(f"[{self.serial}] Reader stop")
# -------- Writer: Wide-Format, effektive Zeitachse --------
def writer_loop(self):
os.makedirs(OUTDIR, exist_ok=True)
fn = os.path.join(OUTDIR, f"{time.strftime('%Y%m%d_%H%M%S')}_{self.serial}.csv")
sample_idx = 0
written_rows = 0
print(f"[{self.serial}] Writer schreibt nach: {fn}")
rows_since_flush = 0
print(f"[{self.serial}] Writer -> {fn} (dt={self.dt:.6e}s)")
try:
with open(fn, "w", newline="") as f:
w = csv.writer(f)
w.writerow(["t_rel_s", "A", "B"]) # Wide-Format
w.writerow(["t_rel_s", "A", "B"])
while not (self.stop_evt.is_set() and self.writer_q.empty()):
try:
ts, va, vb = self.writer_q.get(timeout=0.5)
va, vb = self.writer_q.get(timeout=0.5)
except queue.Empty:
continue
n = min(len(va), len(vb))
# Zeitachse mit effektiver Rate
for i in range(n):
t_rel = (sample_idx + i) * DT
t_rel = (sample_idx + i) * self.dt
w.writerow([t_rel, va[i], vb[i]])
sample_idx += n
written_rows += n
rows_since_flush += n
if written_rows and written_rows % (10*CHUNK) == 0:
if rows_since_flush >= 10_000: # alle ~10k Samples flushen
f.flush()
print(f"[{self.serial}] rows={written_rows}")
rows_since_flush = 0
except Exception as e:
print(f"[{self.serial}] Writer-Fehler: {e}")
finally: