MainCode/adalm1000_logger.py aktualisiert

(D)
This commit is contained in:
Jan 2025-06-28 01:19:18 +02:00
parent a5ad053a0c
commit f779d97397

View File

@ -27,57 +27,83 @@ class DeviceDisconnectedError(Exception):
pass
class MeasurementThread(QThread):
update_signal = pyqtSignal(float, float, float)
update_signal = pyqtSignal(float, float, float) # voltage, current, timestamp
error_signal = pyqtSignal(str)
status_signal = pyqtSignal(str) # New: for status updates
def __init__(self, device, interval=0.1):
super().__init__()
self.device = device
self.interval = interval
self.interval = max(0.05, interval) # Ensure minimum interval
self._running = False
self._lock = threading.Lock() # Thread safety
self.filter_window_size = 10
self.start_time = time.time()
self.last_update_time = self.start_time
def run(self):
"""Main measurement loop with enhanced error handling"""
self._running = True
voltage_window = []
current_window = []
voltage_window = deque(maxlen=self.filter_window_size)
current_window = deque(maxlen=self.filter_window_size)
self.status_signal.emit("Measurement started")
while self._running:
try:
samples = self.device.read(self.filter_window_size, 500, True)
# 1. Read samples with timeout
samples = self.device.read(
self.filter_window_size,
timeout=500,
)
if not samples:
raise DeviceDisconnectedError("Keine Samples empfangen")
raw_voltage = np.mean([s[1][0] for s in samples])
raw_current = np.mean([s[0][1] for s in samples])
current_time = time.time() - self.start_time
# Gleitender Mittelwertfilter
voltage_window.append(raw_voltage)
current_window.append(raw_current)
if len(voltage_window) > self.filter_window_size:
voltage_window.pop(0)
current_window.pop(0)
voltage = np.mean(voltage_window)
current = np.mean(current_window)
raise DeviceDisconnectedError("No samples received - device may be disconnected")
# 2. Process samples (thread-safe)
with self._lock:
raw_voltage = np.mean([s[1][0] for s in samples])
raw_current = np.mean([s[0][1] for s in samples])
voltage_window.append(raw_voltage)
current_window.append(raw_current)
voltage = np.mean(voltage_window)
current = np.mean(current_window)
current_time = time.time() - self.start_time
# 3. Emit updates
self.update_signal.emit(voltage, current, current_time)
time.sleep(max(0.05, self.interval))
self.last_update_time = time.time()
# 4. Dynamic sleep adjustment
elapsed = time.time() - self.last_update_time
sleep_time = max(0.01, self.interval - elapsed)
time.sleep(sleep_time)
except DeviceDisconnectedError as e:
self.error_signal.emit(f"Device error: {str(e)}")
break
except Exception as e:
self.error_signal.emit(str(e))
self.error_signal.emit(f"Measurement error: {str(e)}")
self.status_signal.emit(f"Error: {str(e)}")
break
self.status_signal.emit("Measurement stopped")
self._running = False
def stop(self):
"""Safe thread termination with timeout"""
self._running = False
if self.isRunning():
self.quit()
if not self.wait(500): # Wait up to 500ms for clean exit
print("Warning: Thread didn't exit cleanly, terminating")
self.terminate()
if not self.wait(300): # 300ms grace period
self.terminate() # Forceful termination if needed
def is_active(self):
"""Check if thread is running and updating"""
with self._lock:
return self._running and (time.time() - self.last_update_time < 2.0)
class BatteryTester(QMainWindow):
def __init__(self):
@ -720,30 +746,59 @@ class BatteryTester(QMainWindow):
QTimer.singleShot(100, self.finalize_test)
def finalize_test(self):
"""Finale Bereinigung nach Testende"""
# Protokolldaten schreiben
if hasattr(self, 'log_buffer') and self.log_buffer:
try:
self.log_writer.writerows(self.log_buffer)
self.log_buffer.clear()
except Exception as e:
print(f"Fehler beim Schreiben des Protokollpuffers: {e}")
# Protokolldatei schließen
if hasattr(self, 'current_cycle_file'):
try:
self.current_cycle_file.close()
except Exception as e:
print(f"Fehler beim Schließen der Protokolldatei: {e}")
# Benachrichtigung anzeigen
QMessageBox.information(
self,
"Test abgeschlossen",
f"Test wurde sicher beendet.\n\n"
f"Entladekapazität: {self.capacity_ah:.3f}Ah\n"
f"Abgeschlossene Zyklen: {self.cycle_count}"
)
"""Finale Bereinigung nach Testende mit verbessertem Fenster-Handling"""
try:
# 1. Protokolldaten schreiben (mit zusätzlichem Lock)
if hasattr(self, 'log_buffer') and self.log_buffer:
try:
with threading.Lock(): # Thread-sicherer Zugriff
self.log_writer.writerows(self.log_buffer)
self.log_buffer.clear()
except Exception as e:
print(f"Fehler beim Schreiben des Protokollpuffers: {e}")
# 2. Protokolldatei schließen (mit Fehlertoleranz)
if hasattr(self, 'current_cycle_file') and self.current_cycle_file:
try:
self.current_cycle_file.flush() # Daten sicher schreiben
os.fsync(self.current_cycle_file.fileno()) # Betriebssystem-Puffer leeren
self.current_cycle_file.close()
except Exception as e:
print(f"Fehler beim Schließen der Protokolldatei: {e}")
# 3. Benachrichtigung anzeigen (mit Fokus-Sicherung)
msg_box = QMessageBox(self) # Expliziter Parent
msg_box.setWindowFlags(msg_box.windowFlags() |
Qt.WindowStaysOnTopHint | # Immer im Vordergrund
Qt.MSWindowsFixedSizeDialogHint) # Besseres Verhalten unter Windows
msg_box.setIcon(QMessageBox.Information)
msg_box.setWindowTitle("Test abgeschlossen")
msg_box.setText(
f"Test wurde sicher beendet.\n\n"
f"Entladekapazität: {self.capacity_ah:.3f}Ah\n"
f"Abgeschlossene Zyklen: {self.cycle_count}"
)
# Sicherstellen, dass das Fenster sichtbar wird
msg_box.raise_()
msg_box.activateWindow()
# Non-blocking anzeigen und Fokus erzwingen
QTimer.singleShot(100, lambda: (
msg_box.show(),
msg_box.raise_(),
msg_box.activateWindow()
))
msg_box.exec_()
except Exception as e:
print(f"Kritischer Fehler in finalize_test: {e}")
finally:
# 4. Gerätestatus zurücksetzen
self.test_running = False
self.request_stop = True
self.measuring = False
def update_measurements(self, voltage, current, current_time):
"""Aktualisiert die Messwerte im UI"""
@ -833,7 +888,7 @@ class BatteryTester(QMainWindow):
voltage_padding = 0.2
min_voltage = max(0, float(self.discharge_cutoff_input.text()) - voltage_padding)
max_voltage = float(self.charge_cutoff_input.text()) + voltage_padding
self.ax.set_ylim(min_voltage, max_volume)
self.ax.set_ylim(min_voltage, max_voltage)
self.canvas.draw()