Adalm1000_Logger/DeviceController.py
Vincent Hanewinkel a317fcb5d1 test
2025-08-14 22:21:22 +02:00

101 lines
3.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# device_controller.py
import threading, queue, time, math, os, csv
# Nominal sample rate of the simulated device, in samples per second.
SAMPLE_RATE = 10_000.0 # 10 kS/s (dummy)
# Time between two consecutive samples, in seconds.
DT = 1.0 / SAMPLE_RATE
# Number of samples the reader generates per chunk handed to the writer.
CHUNK = 500 # 50 ms per chunk
# Directory the CSV log files are written to (created on demand by the writer).
OUTDIR = "./logs"
class DeviceController:
    """Dummy two-channel logger: a reader thread generates samples, a writer
    thread stores them in a CSV file.

    The public methods (`start`, `stop`, `shutdown`) never block; they only
    enqueue commands or set an event, so they are safe to call from a GUI
    thread. All real work happens in the two daemon threads.
    """

    def __init__(self, serial="DUMMY"):
        """Create the controller; no thread is started yet.

        Args:
            serial: Identifier used in log messages and in the CSV file name.
        """
        self.serial = serial
        # Commands ("start"/"stop") from the public API to the reader thread.
        self.cmdq = queue.Queue()
        # Bounded hand-over of sample chunks from the reader to the writer.
        self.writer_q = queue.Queue(maxsize=100)
        # Set by shutdown() to make both worker loops terminate.
        self.stop_evt = threading.Event()
        # True while the reader should generate samples.
        self.running = False
        self.reader_t = threading.Thread(target=self.reader_loop, daemon=True)
        self.writer_t = threading.Thread(target=self.writer_loop, daemon=True)

    # === public API (non-blocking) ===
    def start(self):
        """Make sure both worker threads are running and request acquisition.

        A `threading.Thread` object can only be started once. If a worker has
        already finished (after `shutdown()`, or because the writer stopped
        after an error) a fresh thread is created instead of re-starting the
        old one, which would raise `RuntimeError`.
        """
        # A previous shutdown() must not make the new workers exit at once.
        self.stop_evt.clear()
        self.reader_t = self._ensure_started(self.reader_t, self.reader_loop)
        self.writer_t = self._ensure_started(self.writer_t, self.writer_loop)
        self.cmdq.put(("start", None))

    def stop(self):
        """Request the reader to pause sample generation (threads keep running)."""
        self.cmdq.put(("stop", None))

    def shutdown(self):
        """Request a stop and signal both worker threads to terminate."""
        self.stop()
        self.stop_evt.set()
        # Do not join from the GUI thread; only join at program exit!

    @staticmethod
    def _ensure_started(thread, target):
        """Return a started thread for `target`, reusing `thread` if possible."""
        if thread.is_alive():
            return thread
        if thread.ident is not None:
            # `ident` is set once a thread has been started, so this one has
            # already run and finished; Thread objects are single-use.
            thread = threading.Thread(target=target, daemon=True)
        thread.start()
        return thread

    def _process_commands(self):
        """Apply every pending command so a queued "stop" is never delayed."""
        while True:
            try:
                cmd, _arg = self.cmdq.get_nowait()
            except queue.Empty:
                return
            if cmd == "start":
                self.running = True
                print("[DUMMY] start")
            elif cmd == "stop":
                self.running = False
                print("[DUMMY] stop")

    # === Dummy reader: generates data instead of reading from pysmu ===
    def reader_loop(self):
        """Thread body: handle commands and push chunks of samples to `writer_q`.

        Channel A is a sine of amplitude 1, channel B a sine of amplitude 0.5
        at half of A's phase. Each queue item is `(wall_time, va, vb)`.
        """
        phase = 0.0
        step = 2 * math.pi * 50.0 * DT  # 50 Hz sine
        while not self.stop_evt.is_set():
            self._process_commands()
            if not self.running:
                # Idle; waiting on the event (not sleep) notices shutdown at once.
                self.stop_evt.wait(0.02)
                continue
            # Generate CHUNK samples.
            va, vb = [], []
            for _ in range(CHUNK):
                va.append(math.sin(phase))  # channel A
                vb.append(0.5 * math.sin(phase * 0.5))  # channel B
                phase += step
            try:
                self.writer_q.put((time.time(), va, vb), timeout=0.5)
            except queue.Full:
                # The chunk is dropped; the writer is not keeping up.
                print("[WARN] writer_q voll Datenverlust")
            # Short pause of one chunk duration to spare the CPU.
            self.stop_evt.wait(CHUNK * DT)

    # === Writer: robust, wide format ===
    def writer_loop(self):
        """Thread body: drain `writer_q` into `<OUTDIR>/<timestamp>_<serial>.csv`.

        Rows are `t_rel_s, A, B`, where `t_rel_s` is the running sample index
        times `DT`. The loop ends once `stop_evt` is set and the queue is empty.
        Errors are reported on stdout instead of being raised.
        """
        fn = os.path.join(OUTDIR, f"{time.strftime('%Y%m%d_%H%M%S')}_{self.serial}.csv")
        try:
            os.makedirs(OUTDIR, exist_ok=True)
            f = open(fn, "w", newline="")
        except OSError as e:
            # Nothing was opened, so do not announce a closed file below.
            print(f"[{self.serial}] Writer-Fehler: {e}")
            return

        report_every = 10 * CHUNK
        sample_idx = 0  # rows written so far == index of the next sample
        next_report = report_every
        try:
            with f:
                w = csv.writer(f)
                w.writerow(["t_rel_s", "A", "B"])  # wide format
                while not (self.stop_evt.is_set() and self.writer_q.empty()):
                    try:
                        _ts, va, vb = self.writer_q.get(timeout=0.5)
                    except queue.Empty:
                        continue
                    # zip() stops at the shorter channel, matching n below.
                    n = min(len(va), len(vb))
                    w.writerows(
                        ((sample_idx + i) * DT, a, b)
                        for i, (a, b) in enumerate(zip(va, vb))
                    )
                    sample_idx += n
                    # Flush periodically so progress is visible on disk live;
                    # a threshold also works if a chunk is not CHUNK rows long.
                    if sample_idx >= next_report:
                        f.flush()
                        print(f"[{self.serial}] geschrieben: {sample_idx} Zeilen")
                        next_report = sample_idx + report_every
        except Exception as e:
            # Top-level boundary of the writer thread: report, do not crash.
            print(f"[{self.serial}] Writer-Fehler: {e}")
        finally:
            print(f"[{self.serial}] Datei geschlossen: {fn}")