MainCode/adalm1000_logger.py aktualisiert

plot and tata works
description overlapping test untested
This commit is contained in:
Jan 2025-07-03 17:40:03 +02:00
parent df69d0e832
commit b5380e5a33

View File

@ -10,6 +10,7 @@ matplotlib.use('Qt5Agg')
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.figure import Figure from matplotlib.figure import Figure
from collections import deque from collections import deque
from queue import Queue, Full, Empty
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QLabel, from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QLabel,
QPushButton, QLineEdit, QCheckBox, QFrame, QMessageBox, QFileDialog) QPushButton, QLineEdit, QCheckBox, QFrame, QMessageBox, QFileDialog)
@ -27,15 +28,16 @@ class MeasurementThread(QThread):
super().__init__() super().__init__()
self.device = device self.device = device
self.interval = interval self.interval = interval
self.running = False self._running = False
self.filter_window_size = 10 self.filter_window_size = 10
self.voltage_window = [] self.voltage_window = []
self.current_window = [] self.current_window = []
self.start_time = time.time() self.start_time = time.time()
self.measurement_queue = Queue(maxsize=1)
def run(self): def run(self):
self.running = True self._running = True
while self.running: while self._running:
try: try:
samples = self.device.read(self.filter_window_size, 500, True) samples = self.device.read(self.filter_window_size, 500, True)
if not samples: if not samples:
@ -56,14 +58,122 @@ class MeasurementThread(QThread):
current = np.mean(self.current_window) current = np.mean(self.current_window)
self.update_signal.emit(voltage, current, current_time) self.update_signal.emit(voltage, current, current_time)
# Store the latest measurement in the queue
try:
self.measurement_queue.put_nowait((voltage, current))
except Full:
pass
time.sleep(max(0.05, self.interval)) time.sleep(max(0.05, self.interval))
except Exception as e: except Exception as e:
self.error_signal.emit(str(e)) self.error_signal.emit(f"Read error: {str(e)}")
break time.sleep(1)
continue
def stop(self): def stop(self):
self.running = False self._running = False
self.wait(500)
class TestSequenceWorker(QObject):
finished = pyqtSignal()
update_phase = pyqtSignal(str)
update_status = pyqtSignal(str)
test_completed = pyqtSignal()
error_occurred = pyqtSignal(str)
def __init__(self, device, test_current, charge_cutoff, discharge_cutoff, rest_time, continuous_mode, parent):
super().__init__()
self.device = device
self.test_current = test_current
self.charge_cutoff = charge_cutoff
self.discharge_cutoff = discharge_cutoff
self.rest_time = rest_time * 3600 # Convert hours to seconds
self.continuous_mode = continuous_mode
self.parent = parent
self._running = True
self.voltage_timeout = 0.5 # seconds
def get_latest_measurement(self):
"""Thread-safe measurement reading with timeout"""
try:
return self.parent.measurement_thread.measurement_queue.get(
timeout=self.voltage_timeout
)
except Empty:
return (None, None) # Return tuple for unpacking
def charge_phase(self):
"""Handle the battery charging phase"""
self.update_phase.emit("Charge")
self.update_status.emit(f"Charging to {self.charge_cutoff}V @ {self.test_current:.3f}A")
self.device.channels['B'].mode = pysmu.Mode.HI_Z
self.device.channels['A'].mode = pysmu.Mode.SIMV
self.device.channels['A'].constant(self.test_current)
while self._running:
voltage, current = self.get_latest_measurement()
if voltage is None:
continue
# Update parent's data for logging/display
with self.parent.plot_mutex:
if len(self.parent.voltage_data) > 0:
self.parent.voltage_data[-1] = voltage
self.parent.current_data[-1] = current
if voltage >= self.charge_cutoff:
break
time.sleep(0.1)
self.device.channels['A'].mode = pysmu.Mode.HI_Z
self.device.channels['A'].constant(0)
def discharge_phase(self):
"""Handle the battery discharging phase"""
self.update_phase.emit("Discharge")
self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.3f}A")
self.device.channels['A'].mode = pysmu.Mode.SIMV
self.device.channels['A'].constant(-self.test_current)
while self._running:
voltage, current = self.get_latest_measurement()
if voltage is None:
continue
# Update parent's data for logging/display
with self.parent.plot_mutex:
if len(self.parent.voltage_data) > 0:
self.parent.voltage_data[-1] = voltage
self.parent.current_data[-1] = current
if voltage <= self.discharge_cutoff:
break
time.sleep(0.1)
self.device.channels['A'].mode = pysmu.Mode.HI_Z
self.device.channels['A'].constant(0)
def rest_phase(self, phase_name):
"""Handle rest period between phases"""
self.update_phase.emit(f"Resting ({phase_name})")
rest_end = time.time() + self.rest_time
while time.time() < rest_end and self._running:
time_left = max(0, rest_end - time.time())
self.update_status.emit(f"Resting | Time left: {time_left/60:.1f} min")
time.sleep(1)
def stop(self):
"""Request the thread to stop"""
self._running = False
self.device.channels['A'].mode = pysmu.Mode.HI_Z
self.device.channels['A'].constant(0)
class BatteryTester(QMainWindow): class BatteryTester(QMainWindow):
def __init__(self): def __init__(self):
@ -437,9 +547,14 @@ class BatteryTester(QMainWindow):
self.current_label.setText(f"{current:.4f}") self.current_label.setText(f"{current:.4f}")
self.time_label.setText(self.format_time(current_time)) self.time_label.setText(self.format_time(current_time))
# Update plot periodically # Throttle plot updates to avoid recursive repaint
if len(self.time_data) % 10 == 0: # Update plot every 10 samples now = time.time()
self.update_plot() if not hasattr(self, '_last_plot_update'):
self._last_plot_update = 0
if now - self._last_plot_update > 0.1: # Update plot max 10 times per second
self._last_plot_update = now
QTimer.singleShot(0, self.update_plot)
def update_status(self): def update_status(self):
"""Update status information periodically""" """Update status information periodically"""
@ -478,11 +593,7 @@ class BatteryTester(QMainWindow):
if self.c_rate <= 0: if self.c_rate <= 0:
raise ValueError("C-rate must be positive") raise ValueError("C-rate must be positive")
self.continuous_mode = self.continuous_mode_check.isChecked()
self.measurement_start_time = time.time()
self.test_start_time = time.time()
test_current = self.c_rate * self.capacity test_current = self.c_rate * self.capacity
if test_current > 0.2: if test_current > 0.2:
raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")
@ -515,8 +626,28 @@ class BatteryTester(QMainWindow):
self.stop_button.setEnabled(True) self.stop_button.setEnabled(True)
self.status_bar.showMessage(f"Test started | Discharging to {self.discharge_cutoff}V @ {test_current:.3f}A") self.status_bar.showMessage(f"Test started | Discharging to {self.discharge_cutoff}V @ {test_current:.3f}A")
# Start test sequence in a new thread # Start test sequence in a QThread
self.test_sequence_thread = threading.Thread(target=self.run_test_sequence, daemon=True) self.test_sequence_thread = QThread()
self.test_sequence_worker = TestSequenceWorker(
self.dev,
test_current,
self.charge_cutoff,
self.discharge_cutoff,
self.rest_time,
self.continuous_mode_check.isChecked(),
self # Pass reference to main window for callbacks
)
self.test_sequence_worker.moveToThread(self.test_sequence_thread)
# Connect signals
self.test_sequence_worker.update_phase.connect(self.update_test_phase)
self.test_sequence_worker.update_status.connect(self.status_bar.showMessage)
self.test_sequence_worker.test_completed.connect(self.finalize_test)
self.test_sequence_worker.error_occurred.connect(self.handle_test_error)
self.test_sequence_worker.finished.connect(self.test_sequence_thread.quit)
self.test_sequence_worker.finished.connect(self.test_sequence_worker.deleteLater)
self.test_sequence_thread.finished.connect(self.test_sequence_thread.deleteLater)
self.test_sequence_thread.start() self.test_sequence_thread.start()
except Exception as e: except Exception as e:
@ -595,165 +726,6 @@ class BatteryTester(QMainWindow):
self.finalize_test() self.finalize_test()
def run_test_sequence(self):
try:
test_current = self.c_rate * self.capacity
while self.test_running and (self.continuous_mode or self.cycle_count == 0):
self.request_stop = False
self.cycle_count += 1
self.cycle_label.setText(str(self.cycle_count))
self.create_cycle_log_file()
# 1. Charge phase
self.test_phase = "Charge"
self.phase_label.setText(self.test_phase)
self.status_bar.showMessage(f"Charging to {self.charge_cutoff}V @ {test_current:.3f}A")
self.measuring = True
self.dev.channels['B'].mode = pysmu.Mode.HI_Z
self.dev.channels['A'].mode = pysmu.Mode.SIMV
self.dev.channels['A'].constant(test_current)
self.charge_capacity = 0.0
self.charge_capacity_label.setText(f"{self.charge_capacity:.4f}")
target_voltage = self.charge_cutoff
self.last_update_time = time.time()
while self.test_running and not self.request_stop:
if not self.voltage_data:
time.sleep(0.1)
continue
current_voltage = self.voltage_data[-1]
measured_current = abs(self.current_data[-1])
# Log data
if hasattr(self, 'current_cycle_file'):
self.log_buffer.append([
f"{time.time() - self.start_time:.3f}",
f"{current_voltage:.6f}",
f"{measured_current:.6f}",
self.test_phase,
f"{self.capacity_ah:.4f}",
f"{self.charge_capacity:.4f}",
f"{self.coulomb_efficiency:.1f}",
f"{self.cycle_count}"
])
if len(self.log_buffer) >= 10:
self.log_writer.writerows(self.log_buffer)
self.log_buffer.clear()
if current_voltage >= target_voltage or self.request_stop:
break
time.sleep(0.1)
if self.request_stop or not self.test_running:
break
# 2. Rest period after charge
self.test_phase = "Resting (Post-Charge)"
self.phase_label.setText(self.test_phase)
self.measuring = False
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
self.dev.channels['A'].constant(0)
rest_end_time = time.time() + (self.rest_time * 3600)
while time.time() < rest_end_time and self.test_running and not self.request_stop:
time_left = max(0, rest_end_time - time.time())
self.status_bar.showMessage(f"Resting after charge | Time left: {time_left/60:.1f} min")
time.sleep(1)
if self.request_stop or not self.test_running:
break
# 3. Discharge phase
self.test_phase = "Discharge"
self.phase_label.setText(self.test_phase)
self.status_bar.showMessage(f"Discharging to {self.discharge_cutoff}V @ {test_current:.3f}A")
self.measuring = True
self.dev.channels['A'].mode = pysmu.Mode.SIMV
self.dev.channels['A'].constant(-test_current)
self.capacity_ah = 0.0
self.capacity_label.setText(f"{self.capacity_ah:.4f}")
self.last_update_time = time.time()
while self.test_running and not self.request_stop:
if not self.current_data:
time.sleep(0.1)
continue
current_voltage = self.voltage_data[-1]
current_current = abs(self.current_data[-1])
# Log data
if hasattr(self, 'current_cycle_file'):
self.log_buffer.append([
f"{time.time() - self.start_time:.3f}",
f"{current_voltage:.6f}",
f"{current_current:.6f}",
self.test_phase,
f"{self.capacity_ah:.4f}",
f"{self.charge_capacity:.4f}",
f"{self.coulomb_efficiency:.1f}",
f"{self.cycle_count}"
])
if len(self.log_buffer) >= 10:
self.log_writer.writerows(self.log_buffer)
self.log_buffer.clear()
if current_voltage <= self.discharge_cutoff or self.request_stop:
break
if not self.continuous_mode_check.isChecked():
self.test_running = False
self.test_phase = "Idle"
self.phase_label.setText(self.test_phase)
break
# 4. Rest period after discharge
if self.test_running and not self.request_stop:
self.test_phase = "Resting (Post-Discharge)"
self.phase_label.setText(self.test_phase)
self.measuring = False
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
self.dev.channels['A'].constant(0)
rest_end_time = time.time() + (self.rest_time * 3600)
while time.time() < rest_end_time and self.test_running and not self.request_stop:
time_left = max(0, rest_end_time - time.time())
self.status_bar.showMessage(f"Resting after discharge | Time left: {time_left/60:.1f} min")
time.sleep(1)
# Calculate Coulomb efficiency
if not self.request_stop and self.charge_capacity > 0:
efficiency = (self.capacity_ah / self.charge_capacity) * 100
self.coulomb_efficiency = efficiency
self.efficiency_label.setText(f"{efficiency:.1f}")
self.status_bar.showMessage(
f"Cycle {self.cycle_count} complete | "
f"Discharge: {self.capacity_ah:.3f}Ah | "
f"Charge: {self.charge_capacity:.3f}Ah | "
f"Efficiency: {efficiency:.1f}%"
)
self.write_cycle_summary()
if self.log_buffer:
self.log_writer.writerows(self.log_buffer)
self.log_buffer.clear()
self.finalize_test()
except Exception as e:
self.status_bar.showMessage(f"Test error: {str(e)}")
self.finalize_test()
def finalize_test(self): def finalize_test(self):
"""Final cleanup after test completes or is stopped""" """Final cleanup after test completes or is stopped"""
self.measuring = False self.measuring = False
@ -839,14 +811,34 @@ class BatteryTester(QMainWindow):
print(f"Error writing cycle summary: {e}") print(f"Error writing cycle summary: {e}")
def update_plot(self): def update_plot(self):
"""Optimized plot update with change detection""" """More reliable plotting with better error handling"""
if not self.time_data: if not self.time_data or len(self.time_data) != len(self.voltage_data):
print("Plot: No data or mismatched lengths") # Debug
return return
with self.plot_mutex: try:
self.line_voltage.set_data(self.time_data, self.voltage_data) # Create local copies quickly
self.line_current.set_data(self.time_data, self.current_data) with self.plot_mutex:
self.auto_scale_axes() x_data = np.array(self.time_data)
y1_data = np.array(self.voltage_data)
y2_data = np.array(self.current_data)
# Update plot data
self.line_voltage.set_data(x_data, y1_data)
self.line_current.set_data(x_data, y2_data)
# Only auto-scale when needed
if x_data[-1] > self.ax.get_xlim()[1] * 0.8:
self.auto_scale_axes()
# Force redraw
self.canvas.draw_idle()
except Exception as e:
print(f"Plot error: {e}")
# Reset plot on error
self.line_voltage.set_data([], [])
self.line_current.set_data([], [])
self.canvas.draw_idle() self.canvas.draw_idle()
def auto_scale_axes(self): def auto_scale_axes(self):
@ -909,14 +901,61 @@ class BatteryTester(QMainWindow):
self.voltage_data.clear() self.voltage_data.clear()
self.current_data.clear() self.current_data.clear()
if hasattr(self, 'line_voltage') and hasattr(self, 'line_current'): @pyqtSlot(str)
self.line_voltage.set_data([], []) def update_test_phase(self, phase_text):
self.line_current.set_data([], []) """Update the test phase display"""
self.ax.set_xlim(0, 1) self.test_phase = phase_text
self.ax2.set_xlim(0, 1) self.phase_label.setText(phase_text)
self.canvas.draw()
self.attempt_reconnect() # Update log if available
if hasattr(self, 'log_buffer'):
current_time = time.time() - self.start_time
self.log_buffer.append([
f"{current_time:.3f}",
"",
"",
phase_text,
f"{self.capacity_ah:.4f}",
f"{self.charge_capacity:.4f}",
f"{self.coulomb_efficiency:.1f}" if hasattr(self, 'coulomb_efficiency') else "0.0",
f"{self.cycle_count}"
])
@pyqtSlot(str)
def handle_test_error(self, error_msg):
"""Handle errors from the test sequence with complete cleanup"""
try:
# 1. Notify user
QMessageBox.critical(self, "Test Error",
f"An error occurred:\n{error_msg}\n\nAttempting to recover...")
# 2. Stop all operations
self.stop_test()
# 3. Reset UI elements
if hasattr(self, 'line_voltage'):
try:
self.line_voltage.set_data([], [])
self.line_current.set_data([], [])
self.ax.set_xlim(0, 1)
self.ax2.set_xlim(0, 1)
self.canvas.draw()
except Exception as plot_error:
print(f"Plot reset error: {plot_error}")
# 4. Update status
self.status_bar.showMessage(f"Error: {error_msg} - Reconnecting...")
self.status_light.setStyleSheet("background-color: orange; border-radius: 10px;")
# 5. Attempt recovery
QTimer.singleShot(1000, self.attempt_reconnect) # Delay before reconnect
except Exception as e:
print(f"Error in error handler: {e}")
# Fallback - restart application?
QMessageBox.critical(self, "Fatal Error",
"The application needs to restart due to an unrecoverable error")
QTimer.singleShot(1000, self.close)
def attempt_reconnect(self): def attempt_reconnect(self):
"""Attempt to reconnect automatically""" """Attempt to reconnect automatically"""
@ -968,11 +1007,18 @@ class BatteryTester(QMainWindow):
self.measuring = False self.measuring = False
self.session_active = False self.session_active = False
# Stop measurement thread
if hasattr(self, 'measurement_thread'): if hasattr(self, 'measurement_thread'):
self.measurement_thread.stop() self.measurement_thread.stop()
self.measurement_thread.quit()
self.measurement_thread.wait(1000) # Wait up to 1 second
# Stop test sequence thread
if hasattr(self, 'test_sequence_thread'):
if hasattr(self, 'test_sequence_worker'):
self.test_sequence_worker.stop()
self.test_sequence_thread.quit()
self.test_sequence_thread.wait(500)
# Clean up device session
if hasattr(self, 'session') and self.session: if hasattr(self, 'session') and self.session:
try: try:
self.session.end() self.session.end()