MainCode/adalm1000_logger.py aktualisiert

still not ready
D
This commit is contained in:
Jan 2025-08-07 23:08:41 +02:00
parent 411528cdcd
commit 15e6856e76

View File

@ -7,6 +7,7 @@ import traceback
from datetime import datetime
import numpy as np
import matplotlib
import subprocess
matplotlib.use('Qt5Agg')
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.figure import Figure
@ -34,6 +35,7 @@ class DeviceManager:
self._last_log_time = 0
self.log_dir = os.path.expanduser("~/adalm1000/logs")
os.makedirs(self.log_dir, exist_ok=True)
self.session = None
# Datenpuffer
max_data_points = 36000
@ -52,6 +54,7 @@ class DeviceManager:
self.cycle_count = 0
self.test_phase = "Idle"
self.start_time = time.time()
self.plot_mutex = threading.Lock()
# Logging
self.current_cycle_file = None
@ -78,40 +81,110 @@ class DeviceManager:
if self.consecutive_read_errors >= self.max_consecutive_errors:
try:
print("Attempting device recovery...")
print(f"Attempting device recovery (errors: {self.consecutive_read_errors})...")
# 1. First try soft reset
try:
if hasattr(self.dev, 'reset'):
print("Attempting soft reset...")
self.dev.reset()
time.sleep(0.5)
return True
time.sleep(1.5) # Increased delay for reset to complete
# Verify reset worked
try:
samples = self.dev.read(1, 500, True)
if samples:
self.consecutive_read_errors = 0
print("Soft reset successful")
return True
except Exception as e:
print(f"Verification after soft reset failed: {e}")
except Exception as e:
print(f"Soft reset failed: {e}")
# 2. Full reinitialization
global_session = pysmu.Session()
devices = global_session.devices
if not devices:
print("No devices found after rescan")
# 2. Full reinitialization with USB reset
print("Attempting full USB reset...")
try:
# Close existing session cleanly
if hasattr(self, 'session') and self.session:
try:
print("Ending existing session...")
self.session.end()
time.sleep(0.5)
except Exception as e:
print(f"Error ending session: {e}")
# Add delay and attempt USB reset
time.sleep(2.0) # Longer delay for USB to settle
# Try multiple times to scan for devices
for attempt in range(3):
try:
print(f"Scan attempt {attempt + 1}/3...")
new_session = pysmu.Session(ignore_dataflow=True, queue_size=10000)
devices = new_session.scan()
if not devices:
print(f"No devices found on attempt {attempt+1}/3")
time.sleep(1.0)
continue
# Find our device by serial
for new_dev in devices:
if new_dev.serial == self.serial:
print(f"Found device {self.serial} on attempt {attempt+1}")
self.dev = new_dev
self.consecutive_read_errors = 0
self.session = new_session # Update session reference
# Restart measurement thread
if hasattr(self, 'measurement_thread'):
try:
self.measurement_thread.stop()
time.sleep(0.5)
except:
pass
self.start_measurement(self.interval)
print("Device reinitialized successfully")
return True
print(f"Original device not found on attempt {attempt+1}")
time.sleep(1.0)
except Exception as e:
print(f"Scan attempt {attempt+1} failed: {e}")
time.sleep(1.0)
print("Failed to find original device after 3 attempts")
return False
# Find our device by serial
for new_dev in devices:
if new_dev.serial == self.serial:
self.dev = new_dev
self.consecutive_read_errors = 0
print("Device reinitialized successfully")
return True
print("Original device not found after rescan")
return False
except Exception as e:
print(f"Full reinitialization failed: {e}")
return False
# 3. Final fallback - try USB reset command
try:
print("Attempting USB port reset...")
import subprocess
# ADALM1000 USB IDs
result = subprocess.run(['usbreset', '064b:784c'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
if result.returncode == 0:
print("USB reset command executed, waiting 3 seconds...")
time.sleep(3.0)
return self.handle_read_error(0) # Retry with counter reset
else:
print("USB reset failed - command not found or permission denied")
except Exception as e:
print(f"USB reset command failed: {e}")
except Exception as e:
print(f"Device recovery failed: {e}")
return False
return True
return True # Not enough errors yet to trigger recovery
def start_measurement(self, interval=0.1):
self.stop_measurement() # Ensure any existing thread is stopped
@ -226,7 +299,7 @@ class MeasurementThread(QThread):
current = np.mean(self.current_window)
# Validate measurements
if not (0.0 <= voltage <= 5.0):
if not (-0.2 <= voltage <= 5.0):
raise ValueError(f"Voltage out of range: {voltage:.4f}V")
if not (-0.25 <= current <= 0.25):
raise ValueError(f"Current out of range: {current:.4f}A")
@ -470,10 +543,11 @@ class DischargeWorker(QObject):
continue
# Update parent's data for logging/display
with self.parent.plot_mutex:
if len(self.parent.voltage_data) > 0:
self.parent.voltage_data[-1] = voltage
self.parent.current_data[-1] = current
if self.parent.active_device:
with self.parent.active_device.plot_mutex:
if self.parent.active_device.voltage_data:
self.parent.active_device.voltage_data[-1] = voltage
self.parent.active_device.current_data[-1] = current
if voltage <= self.discharge_cutoff:
break
@ -543,10 +617,11 @@ class ChargeWorker(QObject):
continue
# Update parent's data for logging/display
with self.parent.plot_mutex:
if len(self.parent.voltage_data) > 0:
self.parent.voltage_data[-1] = voltage
self.parent.current_data[-1] = current
if self.parent.active_device:
with self.parent.active_device.plot_mutex:
if self.parent.active_device.voltage_data:
self.parent.active_device.voltage_data[-1] = voltage
self.parent.active_device.current_data[-1] = current
if voltage >= self.charge_cutoff:
break
@ -919,9 +994,14 @@ class BatteryTester(QMainWindow):
border-radius: 4px;
min-height: 28px;
background-color: {self.accent_color};
color: {self.fg_color};
}}
QPushButton:checked {{
background-color: {self.warning_color};
background-color: {self.warning_color} !important;
color: {self.fg_color} !important;
}}
QPushButton:disabled {{
background-color: #4C566A;
}}
"""
@ -973,6 +1053,49 @@ class BatteryTester(QMainWindow):
}}
""")
def apply_button_style(self):
"""Apply consistent button styling based on current state"""
if self.toggle_button.isChecked():
# Stop state
self.toggle_button.setStyleSheet(f"""
QPushButton {{
background-color: {self.warning_color};
color: white;
font-weight: bold;
border: none;
border-radius: 4px;
padding: 4px 8px;
min-height: 28px;
}}
QPushButton:hover {{
background-color: #{self.darker_color(self.warning_color)};
}}
""")
else:
# Start state
self.toggle_button.setStyleSheet(f"""
QPushButton {{
background-color: {self.accent_color};
color: {self.fg_color};
font-weight: bold;
border: none;
border-radius: 4px;
padding: 4px 8px;
min-height: 28px;
}}
QPushButton:hover {{
background-color: #{self.darker_color(self.accent_color)};
}}
""")
def darker_color(self, hex_color):
"""Helper to generate a darker shade for hover effects"""
if not hex_color.startswith('#'):
hex_color = '#' + hex_color
rgb = [int(hex_color[i:i+2], 16) for i in (1, 3, 5)]
darker = [max(0, c - 40) for c in rgb]
return ''.join([f"{c:02x}" for c in darker])
def toggle_global_recording(self):
"""Toggle recording for all connected devices simultaneously"""
if not hasattr(self, 'global_recording'):
@ -1006,15 +1129,23 @@ class BatteryTester(QMainWindow):
self.record_button.setStyleSheet(f"background-color: {self.success_color};")
self.status_bar.showMessage("Recording stopped for all devices")
def safe_execute(func):
"""Decorator to catch and log exceptions in Qt event handlers"""
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
print(f"Error in {func.__name__}: {str(e)}")
traceback.print_exc()
return wrapper
@safe_execute
def toggle_test(self):
"""Toggle between start and stop based on button state"""
if self.toggle_button.isChecked():
# Button shows "STOP" - run start logic
self.toggle_button.setText("STOP")
self.apply_button_style() # Add this line
self.start_test()
else:
# Button shows "START" - run stop logic
self.toggle_button.setText("START")
self.apply_button_style() # Add this line
self.stop_test()
def change_mode(self, mode_name):
@ -1056,6 +1187,7 @@ class BatteryTester(QMainWindow):
# Reset button state
self.toggle_button.setChecked(False)
self.toggle_button.setEnabled(True)
self.apply_button_style()
# Reset measurement state and zero the time
if self.active_device:
@ -1071,7 +1203,7 @@ class BatteryTester(QMainWindow):
self.energy_label.setText("0.0000")
self.cycle_label.setText("0")
self.phase_label.setText("Idle")
self.time_label.setText("00:00:00")
#self.time_label.setText("00:00:00")
# Reset plot
self.reset_plot()
@ -1090,6 +1222,9 @@ class BatteryTester(QMainWindow):
else:
self.record_button.setVisible(False)
self.apply_button_style()
self.status_bar.showMessage(f"Mode changed to {mode_name}")
def reset_test(self):
if not self.active_device:
return
@ -1399,6 +1534,7 @@ class BatteryTester(QMainWindow):
# Start measurement only AFTER connecting signals
if not self.measurement_thread.isRunning():
self.active_device.start_measurement(self.interval)
print("Measurement thread running?", self.measurement_thread.isRunning())
except Exception as e:
print(f"Error connecting to new device: {e}")
return
@ -1441,6 +1577,7 @@ class BatteryTester(QMainWindow):
self.cycle_label.setText(str(dev.cycle_count))
self.phase_label.setText(dev.test_phase)
@safe_execute
@pyqtSlot(float, float, float)
def update_measurements(self, voltage, current, current_time):
if not self.active_device:
@ -1551,7 +1688,13 @@ class BatteryTester(QMainWindow):
dev = self.active_device
self.capacity_label.setText(f"{dev.capacity_ah:.4f}")
self.energy_label.setText(f"{dev.energy:.4f}")
# Update elapsed-time display
if self.active_device and self.active_device.time_data:
elapsed = self.active_device.time_data[-1]
self.time_label.setText(self.format_time(elapsed))
@safe_execute
def start_test(self):
"""Start the selected test mode using the active device"""
if not self.active_device:
@ -1574,6 +1717,7 @@ class BatteryTester(QMainWindow):
dev_manager.measurement_thread.current_window.clear()
with dev_manager.measurement_thread.measurement_queue.mutex:
dev_manager.measurement_thread.measurement_queue.queue.clear()
self.time_label.setText("00:00:00")
# Reset data buffers
dev_manager.time_data.clear()
@ -1613,6 +1757,7 @@ class BatteryTester(QMainWindow):
# Update UI
self.phase_label.setText(dev_manager.test_phase)
self.toggle_button.setText("STOP")
self.apply_button_style()
# Get parameters from UI
try:
@ -1801,6 +1946,7 @@ class BatteryTester(QMainWindow):
self.toggle_button.setChecked(True)
self.toggle_button.setText("STOP")
self.apply_button_style()
self.status_bar.showMessage(f"Cycle test started | Current: {test_current:.4f}A")
# Create log file
@ -1837,6 +1983,7 @@ class BatteryTester(QMainWindow):
# Ensure buttons are in correct state if error occurs
self.toggle_button.setChecked(False)
self.toggle_button.setText("START")
self.apply_button_style()
self.toggle_button.setEnabled(True)
def start_discharge_test(self):
@ -1911,6 +2058,7 @@ class BatteryTester(QMainWindow):
self.toggle_button.setChecked(True)
self.toggle_button.setText("STOP")
self.apply_button_style()
self.status_bar.showMessage(f"Discharge started | Current: {test_current:.4f}A")
# Create log file
@ -1943,6 +2091,7 @@ class BatteryTester(QMainWindow):
# Ensure buttons are in correct state if error occurs
self.toggle_button.setChecked(False)
self.toggle_button.setText("START")
self.apply_button_style()
self.toggle_button.setEnabled(True)
def start_charge_test(self):
@ -2017,6 +2166,7 @@ class BatteryTester(QMainWindow):
self.toggle_button.setChecked(True)
self.toggle_button.setText("STOP")
self.apply_button_style()
self.status_bar.showMessage(f"Charge started @ {test_current:.3f}A to {self.charge_cutoff}V")
# Create log file
@ -2048,6 +2198,7 @@ class BatteryTester(QMainWindow):
QMessageBox.critical(self, "Error", str(e))
self.toggle_button.setChecked(False)
self.toggle_button.setText("START")
self.apply_button_style()
self.toggle_button.setEnabled(True)
def start_live_monitoring(self, device):
@ -2164,6 +2315,7 @@ class BatteryTester(QMainWindow):
seconds = int(seconds % 60)
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
@safe_execute
def stop_test(self):
"""Request immediate stop with proper visual feedback"""
# Immediate red button feedback
@ -2243,6 +2395,7 @@ class BatteryTester(QMainWindow):
self.toggle_button.setChecked(False)
self.toggle_button.setText(text)
self.apply_button_style()
self.toggle_button.setStyleSheet(f"""
QPushButton {{
background-color: {self.status_colors["active"]};
@ -2279,12 +2432,23 @@ class BatteryTester(QMainWindow):
""")
def closeEvent(self, event):
"""Ensure all log files are properly closed"""
if hasattr(self, 'global_recording') and self.global_recording:
self.toggle_global_recording() # This will stop all recordings
"""Ensure clean shutdown"""
self.stop_test()
# Additional cleanup if needed
super().closeEvent(event)
# Stop all timers
self.status_timer.stop()
# Close all log files
for device in self.devices.values():
if device.is_recording:
self.finalize_device_log_file(device)
# Clean up threads
if hasattr(self, 'measurement_thread') and self.measurement_thread:
self.measurement_thread.stop()
self.measurement_thread.wait(1000)
event.accept()
def reset_test_data(self):
if not self.active_device:
@ -2311,7 +2475,7 @@ class BatteryTester(QMainWindow):
# Reset UI displays
self.voltage_label.setText("0.000")
self.current_label.setText("0.000")
self.time_label.setText("00:00:00")
#self.time_label.setText("00:00:00")
self.capacity_label.setText("0.0000")
self.energy_label.setText("0.0000")
self.phase_label.setText("Idle")
@ -2435,7 +2599,10 @@ class BatteryTester(QMainWindow):
self.request_stop = False
self.toggle_button.setChecked(False)
self.toggle_button.setText("START")
self.apply_button_style()
self.toggle_button.setEnabled(True)
self.apply_button_style() # Add this line
self.test_running = False
# 8. Show completion message if test wasn't stopped by user
if not self.request_stop:
@ -2495,6 +2662,7 @@ class BatteryTester(QMainWindow):
# Ensure we don't leave the UI in a locked state
self.toggle_button.setChecked(False)
self.toggle_button.setText("START")
self.apply_button_style()
self.toggle_button.setEnabled(True)
self.status_bar.showMessage("Error during test finalization")