MainCode/adalm1000_logger.py aktualisiert

Alles außer capacity sollte funzen
(D)
This commit is contained in:
Jan 2025-07-29 12:30:34 +02:00
parent 44baefa581
commit c97256395d

View File

@ -38,39 +38,21 @@ class MeasurementThread(QThread):
self.current_direction = 1 # 1 for source, -1 for sink self.current_direction = 1 # 1 for source, -1 for sink
def run(self): def run(self):
"""Continuous measurement with proper validation""" """Continuous measurement loop"""
self._running = True self._running = True
consecutive_errors = 0
while self._running: while self._running:
try: try:
samples = self.device.read(self.filter_window_size, 500, True) samples = self.device.read(self.filter_window_size, 500, True)
# Check for device disconnection
if not samples: if not samples:
consecutive_errors += 1 raise DeviceDisconnectedError("No samples received")
if consecutive_errors > 3:
raise DeviceDisconnectedError("Keine Messwerte empfangen")
time.sleep(0.1)
continue
consecutive_errors = 0 # Reset counter on successful read
current_time = time.time() - self.start_time current_time = time.time() - self.start_time
raw_voltage = np.mean([s[1][0] for s in samples]) # Channel B
# Strict voltage validation (0-5V range) # Get voltage from Channel B (HI_Z mode) and current from Channel A
if raw_voltage < 0 or raw_voltage > 5.0: raw_voltage = np.mean([s[1][0] for s in samples]) # Channel B voltage
raise ValueError(f"Ungültige Spannung: {raw_voltage}V (außerhalb 0-5V Bereich)") raw_current = np.mean([s[0][1] for s in samples]) * self.current_direction # Channel A current with direction
# Current measurement with direction # Update filter windows
raw_current = np.mean([s[0][1] for s in samples]) * self.current_direction
# Current validation (-200mA to +200mA)
if not (-0.25 <= raw_current <= 0.25):
raise ValueError(f"Ungültiger Strom: {raw_current}A (außerhalb ±200mA Bereich)")
# Apply filtering
self.voltage_window.append(raw_voltage) self.voltage_window.append(raw_voltage)
self.current_window.append(raw_current) self.current_window.append(raw_current)
@ -81,24 +63,26 @@ class MeasurementThread(QThread):
voltage = np.mean(self.voltage_window) voltage = np.mean(self.voltage_window)
current = np.mean(self.current_window) current = np.mean(self.current_window)
# Emit measurements # Validate measurements
if voltage is None or not (-1.0 <= voltage <= 6.0):
raise ValueError(f"Invalid voltage: {voltage}V")
if not (-0.25 <= current <= 0.25):
raise ValueError(f"Invalid current: {current}A")
# Emit update
self.update_signal.emit(voltage, current, current_time) self.update_signal.emit(voltage, current, current_time)
# Store in queue # Store measurement
try: try:
self.measurement_queue.put_nowait((voltage, current)) self.measurement_queue.put_nowait((voltage, current))
except Full: except Full:
pass pass
time.sleep(max(0.05, self.interval)) time.sleep(max(0.05, self.interval))
except DeviceDisconnectedError as e:
self.error_signal.emit(f"Gerätefehler: {str(e)}")
time.sleep(1)
continue
except Exception as e: except Exception as e:
self.error_signal.emit(f"Messfehler: {str(e)}") self.error_signal.emit(f"Read error: {str(e)}")
time.sleep(0.5) time.sleep(1)
continue continue
def set_direction(self, direction): def set_direction(self, direction):
@ -448,10 +432,21 @@ class BatteryTester(QMainWindow):
os.makedirs(self.log_dir, exist_ok=True) os.makedirs(self.log_dir, exist_ok=True)
# Data buffers # Data buffers
self.time_data = deque() max_data_points = 36000 # Define this first
self.voltage_data = deque() self.time_data = deque(maxlen=max_data_points)
self.current_data = deque() self.voltage_data = deque(maxlen=max_data_points)
self.current_data = deque(maxlen=max_data_points)
self.max_points_to_keep = 10000
self.display_time_data = deque(maxlen=self.max_points_to_keep)
self.display_voltage_data = deque(maxlen=self.max_points_to_keep)
self.display_current_data = deque(maxlen=self.max_points_to_keep)
self.aggregation_buffer = {
'time': [], 'voltage': [], 'current': [],
'count': 0, 'last_plot_time': 0
}
self.phase_data = deque() self.phase_data = deque()
self.downsample_factor = 1 # Initial kein Downsampling
self.downsample_counter = 0
# Initialize UI and device # Initialize UI and device
self.setup_ui() self.setup_ui()
@ -464,8 +459,8 @@ class BatteryTester(QMainWindow):
# Status update timer # Status update timer
self.status_timer = QTimer() self.status_timer = QTimer()
self.status_timer.timeout.connect(self.update_status) self.status_timer.timeout.connect(self.update_status_and_plot)
self.status_timer.start(1000) # Update every second self.status_timer.start(1000) #every second
def setup_ui(self): def setup_ui(self):
"""Configure the user interface""" """Configure the user interface"""
@ -785,23 +780,33 @@ class BatteryTester(QMainWindow):
self.rest_time_input.show() self.rest_time_input.show()
self.continuous_mode_check.show() self.continuous_mode_check.show()
self.start_button.setText("START CYCLE TEST") self.start_button.setText("START CYCLE TEST")
self.start_button.setEnabled(True) # Explicitly enable
elif mode_name == "Discharge Test": elif mode_name == "Discharge Test":
self.discharge_cutoff_label.show() self.discharge_cutoff_label.show()
self.discharge_cutoff_input.show() self.discharge_cutoff_input.show()
self.start_button.setText("START DISCHARGE") self.start_button.setText("START DISCHARGE")
self.start_button.setEnabled(True) # Explicitly enable
elif mode_name == "Charge Test": elif mode_name == "Charge Test":
self.charge_cutoff_label.show() self.charge_cutoff_label.show()
self.charge_cutoff_input.show() self.charge_cutoff_input.show()
self.start_button.setText("START CHARGE") self.start_button.setText("START CHARGE")
self.start_button.setEnabled(True) # Explicitly enable
elif mode_name == "Live Monitoring": elif mode_name == "Live Monitoring":
self.record_button.show() self.record_button.show()
self.start_button.setText("START MONITORING") self.start_button.setText("START MONITORING")
self.start_button.setEnabled(False) # Only enable start button if device is connected
self.start_button.setEnabled(self.session_active)
# Reset measurement state
self.reset_test()
self.status_bar.showMessage(f"Mode changed to {mode_name}") self.status_bar.showMessage(f"Mode changed to {mode_name}")
def reset_test(self): def reset_test(self):
"""Reset test state without stopping measurement""" """Reset test state without stopping measurement"""
# Reset Downsampling
self.downsample_factor = 1
self.downsample_counter = 0
# Clear data buffers # Clear data buffers
with self.plot_mutex: with self.plot_mutex:
self.time_data.clear() self.time_data.clear()
@ -908,55 +913,33 @@ class BatteryTester(QMainWindow):
self.main_layout.addWidget(self.canvas, 1) self.main_layout.addWidget(self.canvas, 1)
def init_device(self): def init_device(self):
"""Initialize the ADALM1000 with proper connection checks""" """Initialize the ADALM1000 device with continuous measurement"""
try: try:
# Cleanup previous session # Clean up any existing session
if hasattr(self, 'session'): if hasattr(self, 'session'):
try: try:
self.session.end() self.session.end()
del self.session del self.session
except: except:
pass pass
# Hardware reset delay
time.sleep(1) time.sleep(1)
try: self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000)
self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000) if not self.session.devices:
if not self.session.devices: raise Exception("No ADALM1000 detected - check connections")
raise Exception("Kein ADALM1000 erkannt - Verbindung prüfen")
except Exception as e:
if "resource busy" in str(e).lower():
raise Exception("ADALM1000 wird bereits von einem anderen Programm verwendet")
raise
self.dev = self.session.devices[0] self.dev = self.session.devices[0]
# Set safe defaults
self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].mode = pysmu.Mode.HI_Z
self.dev.channels['B'].mode = pysmu.Mode.HI_Z self.dev.channels['B'].mode = pysmu.Mode.HI_Z
self.dev.channels['A'].constant(0) self.dev.channels['A'].constant(0)
self.dev.channels['B'].constant(0) self.dev.channels['B'].constant(0)
# Connection test with actual measurement
try:
samples = self.dev.read(5, 500, True) # 5 samples for stability
if not samples:
raise DeviceDisconnectedError("Keine Messwerte empfangen")
# Check if we're getting valid data (not just noise)
voltages = [s[1][0] for s in samples] # Channel B voltages
if all(abs(v) < 0.0001 for v in voltages): # 100µV threshold
self.status_bar.showMessage("Gerät verbunden, aber keine Batterie angeschlossen")
except Exception as e:
raise DeviceDisconnectedError(f"Verbindungstest fehlgeschlagen: {str(e)}")
self.session.start(0) self.session.start(0)
# Update UI self.status_light.setStyleSheet(f"background-color: green; border-radius: 10px;")
self.status_light.setStyleSheet("background-color: green; border-radius: 10px;") self.connection_label.setText("Connected")
self.connection_label.setText("Verbunden") self.status_bar.showMessage("Device connected | Ready to measure")
self.status_bar.showMessage("Gerät bereit - Batterie anschließen um zu messen")
self.session_active = True self.session_active = True
self.start_button.setEnabled(True) self.start_button.setEnabled(True)
@ -975,13 +958,52 @@ class BatteryTester(QMainWindow):
# Only store data if in a test or recording # Only store data if in a test or recording
if not (self.test_running or self.record_button.isChecked()): if not (self.test_running or self.record_button.isChecked()):
return return
# 1. Originale Daten immer vollständig speichern (für Berechnungen und Logging)
with self.plot_mutex: with self.plot_mutex:
self.time_data.append(current_time) self.time_data.append(current_time)
self.voltage_data.append(voltage) self.voltage_data.append(voltage)
self.current_data.append(current) self.current_data.append(current)
# Update display labels # 2. Downsampling für die Anzeige
if not hasattr(self, 'aggregation_buffer'):
self.aggregation_buffer = {
'time': [], 'voltage': [], 'current': [],
'count': 0, 'last_plot_time': 0
}
self.aggregation_buffer['time'].append(current_time)
self.aggregation_buffer['voltage'].append(voltage)
self.aggregation_buffer['current'].append(current)
self.aggregation_buffer['count'] += 1
# Nur aggregieren wenn genug Daten oder Zeit vergangen
now = time.time()
if (self.aggregation_buffer['count'] >= self.downsample_factor or
now - self.aggregation_buffer['last_plot_time'] >= 1.0):
# Berechne aggregierte Werte (Mittelwert)
agg_time = np.mean(self.aggregation_buffer['time'])
agg_voltage = np.mean(self.aggregation_buffer['voltage'])
agg_current = np.mean(self.aggregation_buffer['current'])
# Für die Anzeige verwenden
if not hasattr(self, 'display_time_data'):
self.display_time_data = deque(maxlen=self.max_points_to_keep)
self.display_voltage_data = deque(maxlen=self.max_points_to_keep)
self.display_current_data = deque(maxlen=self.max_points_to_keep)
self.display_time_data.append(agg_time)
self.display_voltage_data.append(agg_voltage)
self.display_current_data.append(agg_current)
# Reset Buffer
self.aggregation_buffer = {
'time': [], 'voltage': [], 'current': [],
'count': 0, 'last_plot_time': now
}
# 3. Originale Funktionalität für Berechnungen beibehalten
self.voltage_label.setText(f"{voltage:.4f}") self.voltage_label.setText(f"{voltage:.4f}")
self.current_label.setText(f"{abs(current):.4f}") self.current_label.setText(f"{abs(current):.4f}")
self.time_label.setText(self.format_time(current_time)) self.time_label.setText(self.format_time(current_time))
@ -995,8 +1017,11 @@ class BatteryTester(QMainWindow):
self.energy += power * delta_t / 3600 # Convert to Wh self.energy += power * delta_t / 3600 # Convert to Wh
self.energy_label.setText(f"{self.energy:.4f}") self.energy_label.setText(f"{self.energy:.4f}")
# Plot updates throttled to 10Hz # 4. Auto-Skalierung anpassen
now = time.time() if len(self.time_data) > self.max_points_to_keep * 1.5:
self.adjust_downsampling()
# 5. Plot updates throttled to 10Hz
if not hasattr(self, '_last_plot_update'): if not hasattr(self, '_last_plot_update'):
self._last_plot_update = 0 self._last_plot_update = 0
@ -1006,6 +1031,48 @@ class BatteryTester(QMainWindow):
except Exception as e: except Exception as e:
print(f"Error in update_measurements: {e}") print(f"Error in update_measurements: {e}")
import traceback
traceback.print_exc()
# Versuche den Aggregationsbuffer zu retten
if hasattr(self, 'aggregation_buffer'):
agg_buffer = self.aggregation_buffer
if agg_buffer['count'] > 0:
try:
with self.plot_mutex:
if not hasattr(self, 'display_time_data'):
self.display_time_data = deque(maxlen=self.max_points_to_keep)
self.display_voltage_data = deque(maxlen=self.max_points_to_keep)
self.display_current_data = deque(maxlen=self.max_points_to_keep)
self.display_time_data.append(np.mean(agg_buffer['time']))
self.display_voltage_data.append(np.mean(agg_buffer['voltage']))
self.display_current_data.append(np.mean(agg_buffer['current']))
except:
pass
self.aggregation_buffer = {
'time': [], 'voltage': [], 'current': [],
'count': 0, 'last_plot_time': time.time()
}
def adjust_downsampling(self):
current_length = len(self.time_data)
if current_length > self.max_points_to_keep * 1.5:
# Exponentiell erhöhen, aber max. 64
new_factor = min(64, max(1, self.downsample_factor * 2))
elif current_length < self.max_points_to_keep // 2:
# Halbieren, aber min. 1
new_factor = max(1, self.downsample_factor // 2)
else:
return
if new_factor != self.downsample_factor:
self.downsample_factor = new_factor
self.status_bar.showMessage(
f"Downsampling: Factor {self.downsample_factor}", 2000)
def update_status_and_plot(self):
"""Combined status and plot update"""
self.update_status()
self.update_plot()
def update_status(self): def update_status(self):
"""Update status information periodically""" """Update status information periodically"""
@ -1818,45 +1885,38 @@ class BatteryTester(QMainWindow):
# 5. Force immediate redraw # 5. Force immediate redraw
self.canvas.draw() self.canvas.draw()
def update_status_and_plot(self):
"""Combined status and plot update"""
self.update_status()
self.update_plot()
def update_plot(self): def update_plot(self):
"""More reliable plotting with better error handling""" """More robust plotting with error handling"""
try: try:
# Create local copies safely # Create local copies of data safely
with self.plot_mutex: with self.plot_mutex:
if not self.time_data or not self.voltage_data or not self.current_data: if not self.display_time_data:
return return
if len(self.time_data) != len(self.voltage_data) or len(self.time_data) != len(self.current_data): x_data = np.array(self.display_time_data)
# Find the minimum length to avoid mismatch y1_data = np.array(self.display_voltage_data)
min_len = min(len(self.time_data), len(self.voltage_data), len(self.current_data)) y2_data = np.array(self.display_current_data)
x_data = np.array(self.time_data[-min_len:])
y1_data = np.array(self.voltage_data[-min_len:])
y2_data = np.array(self.current_data[-min_len:])
else:
x_data = np.array(self.time_data)
y1_data = np.array(self.voltage_data)
y2_data = np.array(self.current_data)
# Update plot data # Update plot data
self.line_voltage.set_data(x_data, y1_data) self.line_voltage.set_data(x_data, y1_data)
self.line_current.set_data(x_data, y2_data) self.line_current.set_data(x_data, y2_data)
# Auto-scale when needed # Auto-scale when needed
if len(x_data) > 0 and x_data[-1] > self.ax.get_xlim()[1] * 0.8: if len(x_data) > 1:
self.auto_scale_axes() self.auto_scale_axes()
# Force redraw # Force redraw
self.canvas.draw_idle() self.canvas.draw_idle()
except Exception as e: except Exception as e:
print(f"Plot update error: {e}") print(f"Plot error: {e}")
import traceback # Attempt to recover
traceback.print_exc() self.reset_plot()
# Reset plot on error
with self.plot_mutex:
self.line_voltage.set_data([], [])
self.line_current.set_data([], [])
self.canvas.draw_idle()
def auto_scale_axes(self): def auto_scale_axes(self):
"""Auto-scale plot axes with appropriate padding and strict boundaries""" """Auto-scale plot axes with appropriate padding and strict boundaries"""
@ -1890,39 +1950,33 @@ class BatteryTester(QMainWindow):
@pyqtSlot(str) @pyqtSlot(str)
def handle_device_error(self, error): def handle_device_error(self, error):
"""Handle errors with proper state management""" """Handle device connection errors"""
error_msg = str(error) error_msg = str(error)
print(f"Fehler: {error_msg}") print(f"Device error: {error_msg}")
# Special cases
if "resource busy" in error_msg.lower():
error_msg = "ADALM1000 is already in use"
elif "no samples" in error_msg.lower():
error_msg = "No measurments - Check connection"
# Cleanup
if hasattr(self, 'session'): if hasattr(self, 'session'):
try: try:
if self.session_active: if self.session_active:
self.session.end() self.session.end()
del self.session del self.session
except: except Exception as e:
pass print(f"Error cleaning up session: {e}")
# Update UI self.status_light.setStyleSheet(f"background-color: red; border-radius: 10px;")
self.status_light.setStyleSheet("background-color: red; border-radius: 10px;")
self.connection_label.setText("Disconnected") self.connection_label.setText("Disconnected")
self.status_bar.showMessage(f"Error: {error_msg}") self.status_bar.showMessage(f"Device error: {error_msg}")
# Disable controls
self.session_active = False self.session_active = False
self.test_running = False self.test_running = False
self.continuous_mode = False
self.measuring = False
self.start_button.setEnabled(False) self.start_button.setEnabled(False)
self.stop_button.setEnabled(False) self.stop_button.setEnabled(False)
# Attempt recovery for certain errors self.time_data.clear()
if "no samples" in error_msg.lower() or "resource busy" in error_msg.lower(): self.voltage_data.clear()
QTimer.singleShot(3000, self.reconnect_device) self.current_data.clear()
@pyqtSlot(str) @pyqtSlot(str)
def update_test_phase(self, phase_text): def update_test_phase(self, phase_text):