diff --git a/MainCode/adalm1000_logger.py b/MainCode/adalm1000_logger.py index cc26c3e..6b3c3ca 100644 --- a/MainCode/adalm1000_logger.py +++ b/MainCode/adalm1000_logger.py @@ -38,64 +38,60 @@ class MeasurementThread(QThread): def __init__(self, device: object, interval: float = 0.1, start_time: float = None): """Initialize measurement thread.""" super().__init__() + self._mutex = QMutex() + self.data_mutex = QMutex() self.device = device - self.interval = max(0.05, interval) # Minimum interval + self.interval = max(0.05, interval) self._running = False - self._mutex = QMutex() # Thread safety self.filter_window_size = 10 - self.start_time = time.time() + self.start_time = start_time if start_time else time.time() self.last_update_time = self.start_time + + # Configure channels + self.device.channels['A'].mode = pysmu.Mode.SIMV # Channel A for current + self.device.channels['B'].mode = pysmu.Mode.HI_Z # Channel B for voltage def run(self): - """Main measurement loop with enhanced error handling.""" + """Measurement loop for both voltage and current.""" self._running = True voltage_window = deque() current_window = deque() - self.status_signal.emit("Measurement started") + self.last_update_time = time.time() while self._running: try: - # Read samples with timeout - samples = self.device.read( - self.filter_window_size, - timeout=500, - ) - - if not samples: - raise DeviceDisconnectedError("No samples received - device may be disconnected") - - # Process samples (thread-safe) + samples = self.device.read(self.filter_window_size, timeout=500) + if not samples or len(samples) < self.filter_window_size: + continue + with QMutexLocker(self._mutex): - raw_voltage = np.mean([s[1][0] for s in samples]) - raw_current = np.mean([s[0][1] for s in samples]) + # Get voltage from Channel B and current from Channel A + raw_voltage = np.mean([s[1][0] for s in samples]) # Channel B voltage + raw_current = np.mean([s[0][1] for s in samples]) # Channel A current + # Apply moving average filter if len(voltage_window) >= self.filter_window_size: voltage_window.popleft() - current_window.popleft() - voltage_window.append(raw_voltage) - current_window.append(raw_current) - voltage = np.mean(list(voltage_window)) - current = np.mean(list(current_window)) + if len(current_window) >= self.filter_window_size: + current_window.popleft() + current_window.append(raw_current) + + voltage = np.mean(voltage_window) + current = np.mean(current_window) current_time = time.time() - self.start_time - # Emit updates self.update_signal.emit(voltage, current, current_time) - self.last_update_time = time.time() - # Dynamic sleep adjustment elapsed = time.time() - self.last_update_time sleep_time = max(0.01, self.interval - elapsed) time.sleep(sleep_time) + self.last_update_time = time.time() - except DeviceDisconnectedError as e: - self.error_signal.emit(f"Device error: {str(e)}") - break except Exception as e: self.error_signal.emit(f"Measurement error: {str(e)}") - self.status_signal.emit(f"Error: {str(e)}") break self.status_signal.emit("Measurement stopped") @@ -123,24 +119,22 @@ class TestSequenceThread(QThread): error_occurred = pyqtSignal(str) def __init__(self, parent: QObject): - """Initialize test sequence thread. - - Args: - parent: Reference to main BatteryTester instance - """ - super().__init__() + """Initialize test sequence thread.""" + super().__init__(parent) # Pass parent to QThread self.parent = parent - self._mutex = QMutex() + self._mutex = QMutex() # For thread operation control + self.data_mutex = QMutex() # For data protection self._running = False def run(self): - """Execute the complete test sequence.""" + """Execute the complete test sequence with configurable modes.""" self._running = True try: test_current = float(self.parent.c_rate_input.text()) * float(self.parent.capacity_input.text()) charge_cutoff = float(self.parent.charge_cutoff_input.text()) discharge_cutoff = float(self.parent.discharge_cutoff_input.text()) + cv_cutoff_current = float(self.parent.cv_cutoff_input.text()) # Add this input field while self._running and (self.parent.continuous_mode or self.parent.cycle_count == 0): with QMutexLocker(self._mutex): @@ -150,18 +144,26 @@ class TestSequenceThread(QThread): self.parent.cycle_count += 1 cycle = self.parent.cycle_count - # Charge phase - self._execute_phase("charge", test_current, charge_cutoff, discharge_cutoff, charge_cutoff) + # CC Charge phase + self._execute_phase("charge", test_current, charge_cutoff, + discharge_cutoff, charge_cutoff) if not self._running: break + # Optional CV Charge phase + if self.parent.cv_mode_enabled: # Add checkbox in UI + self._execute_cv_charge(charge_cutoff, test_current, cv_cutoff_current) + if not self._running: + break + # Rest after charge self._execute_rest("post-charge") if not self._running: break - # Discharge phase - self._execute_phase("discharge", test_current, discharge_cutoff, discharge_cutoff, charge_cutoff) + # Discharge phase (CC) + self._execute_phase("discharge", test_current, discharge_cutoff, + discharge_cutoff, charge_cutoff) if not self.parent.continuous_mode: break @@ -179,40 +181,16 @@ class TestSequenceThread(QThread): self.error_occurred.emit(str(e)) finally: self._running = False - - def _execute_phase(self, phase: str, current: float, target_voltage: float, discharge_cutoff: float, charge_cutoff: float): + + def _execute_cv_charge(self, target_voltage: float, current_limit: float, cutoff_current: float): + """Execute constant voltage charge phase.""" try: - print(f"\n=== Starting {phase} phase ===") - print(f"Device session active: {self.parent.session_active}") - print(f"Channel A mode before: {self.parent.dev.channels['A'].mode}") + self.progress_updated.emit(0.0, "CV Charge") - # Reset channel first - self.parent.dev.channels['A'].mode = pysmu.Mode.HI_Z - self.parent.dev.channels['A'].constant(0) - time.sleep(0.5) # Increased settling time + # Configure for CV mode + self.parent.dev.channels['A'].mode = pysmu.Mode.SIMV + self.parent.dev.channels['A'].constant(target_voltage) - # Configure for current mode - if phase == "charge": - print(f"Starting CHARGE at {current}A to {target_voltage}V") - self.parent.dev.channels['A'].mode = pysmu.Mode.SIMV - time.sleep(0.1) # Additional delay - self.parent.dev.channels['A'].constant(current) - else: - print(f"Starting DISCHARGE at {-current}A to {target_voltage}V") - self.parent.dev.channels['A'].mode = pysmu.Mode.SIMV - time.sleep(0.1) # Additional delay - self.parent.dev.channels['A'].constant(-current) - - time.sleep(0.5) # Allow more settling time - - # Verify current setting - samples = self.parent.dev.read(10, timeout=1000) # More samples, longer timeout - measured_current = np.mean([s[0][1] for s in samples]) - print(f"Requested {current}A, Measured {measured_current:.6f}A") # More precision - print(f"Channel A mode after: {self.parent.dev.channels['A'].mode}") - - # Rest of your existing loop... - time.sleep(0.1) start_time = time.time() last_update = start_time @@ -221,21 +199,113 @@ class TestSequenceThread(QThread): if self.parent.request_stop: break - if not self.parent.voltage_data: + if not self.parent.test_data['voltage']: time.sleep(0.1) continue - current_voltage = self.parent.voltage_data[-1] + current_voltage = self.parent.test_data['voltage'][-1] + current_current = abs(self.parent.test_data['current'][-1]) + current_time = time.time() + delta_t = current_time - last_update + last_update = current_time + + # Update charge capacity + self.parent.charge_capacity += current_current * delta_t / 3600 + + # Calculate progress based on current + progress = 1 - (current_current / current_limit) + progress = max(0.0, min(1.0, progress)) + self.progress_updated.emit(progress, "CV Charge") + + # Check termination condition + if current_current <= cutoff_current: + break + + time.sleep(0.1) + + except Exception as e: + self.error_occurred.emit(f"CV Charge error: {str(e)}") + raise + finally: + self.parent.dev.channels['A'].mode = pysmu.Mode.HI_Z + + def _execute_phase(self, phase: str, current: float, target_voltage: float, discharge_cutoff: float, charge_cutoff: float): + try: + print(f"\n=== Starting {phase} phase ===") + print(f"Device session active: {self.parent.session_active}") + print(f"Channel A mode before: {self.parent.dev.channels['A'].mode}") + + # Reset channel first with longer delay + self.parent.dev.channels['A'].mode = pysmu.Mode.HI_Z + self.parent.dev.channels['A'].constant(0) + time.sleep(1.0) # Increased settling time + + # Configure for current mode with explicit setup + self.parent.dev.channels['A'].mode = pysmu.Mode.SIMV + time.sleep(0.5) # Additional delay for mode change + + # Set current with proper polarity + if phase == "charge": + print(f"Starting CHARGE at {current}A to {target_voltage}V") + self.parent.dev.channels['A'].constant(current) + else: + print(f"Starting DISCHARGE at {-current}A to {target_voltage}V") + self.parent.dev.channels['A'].constant(-current) + + time.sleep(1.0) # Allow more settling time + + print(f"Set constant current to: {current} A on channel A") + + # Verify current setting with more samples + samples = self.parent.dev.read(50, timeout=2000) # More samples, longer timeout + measured_current = np.mean([s[0][1] for s in samples]) + print(f"Requested {current}A, Measured {measured_current:.6f}A") + + # Additional debug info + print(f"Channel A mode: {self.parent.dev.channels['A'].mode}") + + if abs(measured_current) < 0.001: # If current is still near zero + print("Warning: Current not being applied - checking connection") + # Try resetting the device session + self.parent.dev.channels['A'].mode = pysmu.Mode.HI_Z + time.sleep(0.5) + self.parent.dev.channels['A'].mode = pysmu.Mode.SIMV + time.sleep(0.5) + self.parent.dev.channels['A'].constant(current if phase == "charge" else -current) + time.sleep(1.0) + + # Re-measure + samples = self.parent.dev.read(50, timeout=2000) + measured_current = np.mean([s[0][1] for s in samples]) + print(f"After reset: Requested {current}A, Measured {measured_current:.6f}A") + + if abs(measured_current) < 0.001: # Still no current + raise RuntimeError(f"Failed to apply {current}A - check device connection and battery") + + # Main phase loop + start_time = time.time() + last_update = start_time + + while self._running: + with QMutexLocker(self._mutex): + if self.parent.request_stop: + break + + if not self.parent.test_data['voltage']: + time.sleep(0.1) + continue + + current_voltage = self.parent.test_data['voltage'][-1] current_time = time.time() delta_t = current_time - last_update last_update = current_time # Update capacity if phase == "charge": - self.parent.charge_capacity += abs(self.parent.current_data[-1]) * delta_t / 3600 + self.parent.charge_capacity += abs(self.parent.test_data['current'][-1]) * delta_t / 3600 progress = (current_voltage - discharge_cutoff) / (target_voltage - discharge_cutoff) else: - self.parent.capacity_ah += abs(self.parent.current_data[-1]) * delta_t / 3600 + self.parent.capacity_ah += abs(self.parent.test_data['current'][-1]) * delta_t / 3600 progress = (charge_cutoff - current_voltage) / (charge_cutoff - target_voltage) progress = max(0.0, min(1.0, progress)) @@ -258,40 +328,13 @@ class TestSequenceThread(QThread): except Exception as e: self.error_occurred.emit(f"{phase.capitalize()} error: {str(e)}") raise - - def _execute_rest(self, phase: str): - """Execute rest phase. - - Args: - phase: Description of rest phase - """ - try: - self.progress_updated.emit(0.0, f"Resting ({phase})") - self.parent.dev.channels['A'].mode = pysmu.Mode.HI_Z - self.parent.dev.channels['A'].constant(0) - - rest_time = float(self.parent.rest_time_input.text()) * 3600 - rest_end = time.time() + rest_time - - while time.time() < rest_end and self._running: - with QMutexLocker(self._mutex): - if self.parent.request_stop: - break - - progress = 1 - (rest_end - time.time()) / rest_time - self.progress_updated.emit(progress, f"Resting ({phase})") - time.sleep(1) - - except Exception as e: - self.error_occurred.emit(f"Rest phase error: {str(e)}") - raise - - def stop(self): - """Safely stop the test sequence.""" - with QMutexLocker(self._mutex): - self._running = False - self.wait(500) # Wait up to 500ms for clean exit - + finally: + # Ensure channel is reset even if error occurs + try: + self.parent.dev.channels['A'].mode = pysmu.Mode.HI_Z + self.parent.dev.channels['A'].constant(0) + except: + pass # Best effort cleanup class BatteryTester(QMainWindow): """Main application window for battery capacity testing.""" @@ -299,22 +342,34 @@ class BatteryTester(QMainWindow): error_signal = pyqtSignal(str) def __init__(self): - """Initialize the battery tester application.""" + """Initialize the battery tester application with enhanced features.""" super().__init__() self.error_signal.connect(self.handle_device_error) + + # Initialize thread-safety objects + self._mutex = QMutex() # For general thread safety + self.data_mutex = QMutex() # For test data protection - # Initialize data buffers - self.time_data = deque() - self.voltage_data = deque() - self.current_data = deque() - self.phase_data = deque() + # Enhanced data structure for flexible mode support + self.test_data = { + 'time': deque(), + 'voltage': deque(), + 'current': deque(), + 'mode': deque(), # Tracks operation mode (CC, CV, etc.) + 'phase': deque(), # Tracks charge/discharge/rest + 'capacity': deque(), + 'energy': deque() + } - # Test variables + # Test control variables self.test_phase = "Ready" - self.capacity_ah = 0.0 - self.charge_capacity = 0.0 + self.current_mode = "None" # Tracks current operation mode + self.capacity_ah = 0.0 # Discharge capacity + self.charge_capacity = 0.0 # Charge capacity + self.energy_wh = 0.0 # Energy in watt-hours self.coulomb_efficiency = 0.0 self.cycle_count = 0 + self.cycle_data = [] # Stores cycle statistics # Color scheme self.bg_color = QColor(46, 52, 64) @@ -322,26 +377,32 @@ class BatteryTester(QMainWindow): self.accent_color = QColor(94, 129, 172) self.warning_color = QColor(191, 97, 106) self.success_color = QColor(163, 190, 140) + self.cv_color = QColor(143, 188, 187) # Additional color for CV mode - # Device status + # Device and test status self.session_active = False self.measuring = False self.test_running = False self.continuous_mode = False + self.cv_mode_enabled = False # CV charge mode flag self.request_stop = False - self.interval = 0.1 + self.interval = 0.1 # Measurement interval + + # Logging configuration self.log_dir = os.path.expanduser("~/adalm1000/logs") os.makedirs(self.log_dir, exist_ok=True) self.start_time = time.time() + self.log_buffer = [] + self.current_cycle_file = None # Thread management self.measurement_thread = None self.test_thread = None - # Initialize UI + # Initialize UI with all controls self._setup_ui() - # Initialize device with delay + # Initialize device with delay to avoid USB issues QTimer.singleShot(100, self.safe_init_device) def _setup_ui(self): @@ -477,6 +538,13 @@ class BatteryTester(QMainWindow): params_frame.setFrameShape(QFrame.StyledPanel) params_layout = QGridLayout(params_frame) + # Add CV cutoff current input + params_layout.addWidget(QLabel("CV Cutoff Current (A):"), 4, 0) + self.cv_cutoff_input = QLineEdit("0.02") # 20mA default + self.cv_cutoff_input.setFixedWidth(80) + self.cv_cutoff_input.setToolTip("Current at which CV charging should stop") + params_layout.addWidget(self.cv_cutoff_input, 4, 1) + # Battery capacity params_layout.addWidget(QLabel("Battery Capacity (Ah):"), 0, 0) self.capacity_input = QLineEdit("0.2") @@ -519,6 +587,12 @@ class BatteryTester(QMainWindow): button_frame = QWidget() button_layout = QVBoxLayout(button_frame) button_layout.setContentsMargins(0, 0, 0, 0) + + # Add CV mode checkbox and cutoff current input + self.cv_mode_check = QCheckBox("Enable CV Charge") + self.cv_mode_check.setChecked(False) + self.cv_mode_check.setToolTip("Enable constant voltage charge phase after CC charge") + button_layout.addWidget(self.cv_mode_check) self.start_button = QPushButton("START TEST") self.start_button.clicked.connect(self.start_test) @@ -557,6 +631,7 @@ class BatteryTester(QMainWindow): self.continuous_check.setChecked(True) self.continuous_check.setToolTip("Run multiple charge/discharge cycles until stopped") button_layout.addWidget(self.continuous_check) + button_layout.addWidget(self.cv_mode_check) controls_layout.addWidget(button_frame) self.main_layout.addWidget(controls_frame) @@ -632,7 +707,7 @@ class BatteryTester(QMainWindow): print("Waiting before initializing session...") time.sleep(1.5) # Delay helps avoid "device busy" issues - self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000) + self.session = pysmu.Session() # 🔍 Log detected devices print(f"Devices found: {self.session.devices}") @@ -664,34 +739,44 @@ class BatteryTester(QMainWindow): raise Exception(f"Device initialization failed: {str(e)}") def cleanup_device(self): - """Clean up device resources.""" + """Clean up device resources efficiently with essential safeguards.""" print("Cleaning up device session...") - # Stop measurement thread - if self.measurement_thread is not None: - try: - self.measurement_thread.stop() - if not self.measurement_thread.wait(1000): - print("Warning: Measurement thread didn't stop cleanly") - self.measurement_thread = None - except Exception as e: - print(f"Error stopping measurement thread: {e}") - - # Stop and delete session - if hasattr(self, 'session'): + # Stop threads with timeout + for thread in [self.measurement_thread, getattr(self, 'test_thread', None)]: + if thread and thread.isRunning(): + try: + thread.stop() + if not thread.wait(800): # Reduced timeout + thread.terminate() + print(f"Warning: {thread.__class__.__name__} required termination") + except Exception as e: + print(f"Error stopping {thread.__class__.__name__}: {str(e)}") + + # Clean up session if it exists + if getattr(self, 'session', None): try: if self.session_active: - time.sleep(0.1) + # Quick channel reset if device exists + if hasattr(self, 'dev'): + try: + self.dev.channels['A'].mode = pysmu.Mode.HI_Z + self.dev.channels['B'].mode = pysmu.Mode.HI_Z + except: + pass # Best effort cleanup + self.session.end() - self.session_active = False del self.session - print("Session ended successfully") except Exception as e: - print(f"Error ending session: {e}") + print(f"Session cleanup error: {str(e)}") finally: self.session_active = False - # Reset UI indicators + # Reset all states and UI + self.measuring = False + self.test_running = False + self.request_stop = True + self.status_light.setStyleSheet("background-color: red; border-radius: 10px;") self.connection_label.setText("Disconnected") self.start_button.setEnabled(False) @@ -699,6 +784,10 @@ class BatteryTester(QMainWindow): def start_measurement_thread(self): """Start the measurement thread.""" + if not hasattr(self, 'dev') or self.dev is None: + print("Device not initialized, cannot start measurement thread") + return + if self.measurement_thread is not None: self.measurement_thread.stop() self.measurement_thread.wait(500) @@ -754,64 +843,127 @@ class BatteryTester(QMainWindow): pass def start_test(self): - """Start the complete battery test cycle.""" - if not self.test_running: - try: - # Get and validate input values - capacity = float(self.capacity_input.text()) - charge_cutoff = float(self.charge_cutoff_input.text()) - discharge_cutoff = float(self.discharge_cutoff_input.text()) - c_rate = float(self.c_rate_input.text()) - - if capacity <= 0: - raise ValueError("Battery capacity must be positive") - if charge_cutoff <= discharge_cutoff: - raise ValueError("Charge cutoff must be higher than discharge cutoff") - if c_rate <= 0: - raise ValueError("C-rate must be positive") - - self.continuous_mode = self.continuous_check.isChecked() - test_current = c_rate * capacity - - if test_current > 0.2: - raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") - - # Reset data - self.time_data.clear() - self.voltage_data.clear() - self.current_data.clear() - self.capacity_ah = 0.0 - self.charge_capacity = 0.0 - self.coulomb_efficiency = 0.0 - self.cycle_count = 0 - self.start_time = time.time() - self.time_label.setText("00:00:00") - self.reset_plot() - - # Prepare log file - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - self.base_filename = os.path.join(self.log_dir, f"battery_test_{timestamp}") - - # Start test - self.test_running = True + """Start the complete battery test cycle with proper file initialization.""" + # Check if test is already running + if self.test_running: + return + + # Verify thread safety objects exist + if not hasattr(self, 'data_mutex') or self.data_mutex is None: + QMessageBox.critical(self, "Error", "Thread safety not initialized") + return + + try: + with QMutexLocker(self.data_mutex): + # Get and validate input values with error handling + try: + capacity = float(self.capacity_input.text()) + charge_cutoff = float(self.charge_cutoff_input.text()) + discharge_cutoff = float(self.discharge_cutoff_input.text()) + c_rate = float(self.c_rate_input.text()) + cv_cutoff = float(self.cv_cutoff_input.text()) + + if capacity <= 0: + raise ValueError("Battery capacity must be positive") + if charge_cutoff <= discharge_cutoff: + raise ValueError("Charge cutoff must be higher than discharge cutoff") + if c_rate <= 0: + raise ValueError("C-rate must be positive") + + test_current = c_rate * capacity + if test_current > 0.2: + raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") + + except ValueError as e: + QMessageBox.critical(self, "Input Error", str(e)) + return + + # Reset test state and data structures + try: + for key in self.test_data: + self.test_data[key].clear() + self.capacity_ah = 0.0 + self.charge_capacity = 0.0 + self.coulomb_efficiency = 0.0 + self.cycle_count = 0 + except Exception as e: + QMessageBox.critical(self, "Data Error", f"Couldn't reset test data: {str(e)}") + return + + # Initialize timing self.start_time = time.time() - self.test_phase = "Initial Discharge" - self.phase_label.setText(self.test_phase) - - self.start_button.setEnabled(False) - self.stop_button.setEnabled(True) - self.status_bar.setText(f"Test started | Discharging to {discharge_cutoff}V @ {test_current:.3f}A") - - # Start test sequence in separate thread - self.test_thread = TestSequenceThread(self) - self.test_thread.progress_updated.connect(self.update_test_progress) - self.test_thread.cycle_completed.connect(self.update_cycle_stats) - self.test_thread.error_occurred.connect(self.handle_device_error) - self.test_thread.finished.connect(self.finalize_test) - self.test_thread.start() - - except Exception as e: - QMessageBox.critical(self, "Error", str(e)) + self.time_label.setText("00:00:00") + self.reset_plot() + + # Initialize log file with error handling + try: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + self.filename = os.path.join(self.log_dir, f"battery_test_{timestamp}.csv") + + with open(self.filename, 'w', newline='') as f: + writer = csv.writer(f) + writer.writerow([ + 'Time(s)', 'Voltage(V)', 'Current(A)', 'Mode', + 'Phase', 'Discharge(Ah)', 'Charge(Ah)', 'Efficiency(%)', + 'Cycle' + ]) + + self.current_cycle_file = open(self.filename, 'a', newline='') + self.log_writer = csv.writer(self.current_cycle_file) + self.log_buffer = [] + + except Exception as e: + QMessageBox.critical( + self, + "File Error", + f"Failed to initialize log file:\n{str(e)}\n\n" + f"Check directory permissions:\n{self.log_dir}" + ) + return + + # Start test with thread safety + try: + self.test_running = True + self.request_stop = False + self.test_phase = "Initializing" + self.current_mode = "CC" + self.continuous_mode = self.continuous_check.isChecked() + self.cv_mode_enabled = self.cv_mode_check.isChecked() + + # UI updates + self.start_button.setEnabled(False) + self.stop_button.setEnabled(True) + self.status_bar.setText( + f"Test started | Target: {discharge_cutoff}V @ {test_current:.3f}A | " + f"CV Mode: {'ON' if self.cv_mode_enabled else 'OFF'}" + ) + + # Start test thread with cleanup guard + if self.test_thread and self.test_thread.isRunning(): + self.test_thread.stop() + + self.test_thread = TestSequenceThread(self) + self.test_thread.progress_updated.connect(self.update_test_progress) + self.test_thread.cycle_completed.connect(self.update_cycle_stats) + self.test_thread.error_occurred.connect(self.handle_device_error) + self.test_thread.finished.connect(self.finalize_test) + self.test_thread.start() + + except Exception as e: + self.finalize_test(show_message=False) + QMessageBox.critical( + self, + "Test Error", + f"Failed to start test thread:\n{str(e)}" + ) + + except Exception as e: + self.finalize_test(show_message=False) + QMessageBox.critical( + self, + "Critical Error", + f"Unexpected error in test initialization:\n{str(e)}" + ) def update_test_progress(self, progress: float, phase: str): """Update test progress and phase display.""" @@ -867,84 +1019,159 @@ class BatteryTester(QMainWindow): self.finalize_test(show_message=False) def finalize_test(self, show_message: bool = True): - """Final cleanup after test completion.""" + """Final cleanup after test completion with robust error handling.""" + # Set up error tracking + errors = [] + try: - # Write log data - if hasattr(self, 'log_buffer') and self.log_buffer: + # 1. Handle log buffer writing + if getattr(self, 'log_writer', None) is not None: try: - with open(self.filename, 'a', newline='') as f: - writer = csv.writer(f) - writer.writerows(self.log_buffer) - self.log_buffer.clear() + if getattr(self, 'log_buffer', None): + try: + self.log_writer.writerows(self.log_buffer) + self.log_buffer.clear() + except Exception as e: + errors.append(f"Failed to write log buffer: {str(e)}") except Exception as e: - print(f"Error writing log buffer: {e}") + errors.append(f"Log buffer access error: {str(e)}") - # Close log file - if hasattr(self, 'current_cycle_file') and self.current_cycle_file: + # 2. Handle file closure + if getattr(self, 'current_cycle_file', None) is not None: try: - self.current_cycle_file.flush() - os.fsync(self.current_cycle_file.fileno()) - self.current_cycle_file.close() + file = self.current_cycle_file + if not file.closed: + try: + file.flush() + os.fsync(file.fileno()) + except Exception as e: + errors.append(f"Failed to sync file: {str(e)}") + finally: + file.close() except Exception as e: - print(f"Error closing log file: {e}") + errors.append(f"File closure error: {str(e)}") + finally: + if hasattr(self, 'current_cycle_file'): + del self.current_cycle_file + if hasattr(self, 'log_writer'): + del self.log_writer + + # 3. Clean up thread references + try: + if hasattr(self, 'test_thread') and self.test_thread is not None: + if self.test_thread.isRunning(): + self.test_thread.stop() + if not self.test_thread.wait(500): # 500ms timeout + errors.append("Test thread didn't stop cleanly") + self.test_thread = None + except Exception as e: + errors.append(f"Thread cleanup error: {str(e)}") + + # 4. Reset test state + try: + self.test_running = False + self.request_stop = True + self.measuring = False + self.stop_button.setEnabled(False) + self.start_button.setEnabled(True) + except Exception as e: + errors.append(f"State reset error: {str(e)}") + + # 5. Show completion message if requested + if show_message and self.isVisible(): + try: + msg = "Test completed" + if errors: + msg += f"\n\nMinor issues occurred:\n- " + "\n- ".join(errors) + + QMessageBox.information( + self, + "Test Complete", + f"{msg}\nCycles: {self.cycle_count}", + QMessageBox.Ok + ) + except Exception as e: + print(f"Failed to show completion message: {str(e)}") - # Show notification - if show_message: - msg_box = QMessageBox(self) - msg_box.setWindowFlags(msg_box.windowFlags() | - Qt.WindowStaysOnTopHint) - msg_box.setIcon(QMessageBox.Information) - msg_box.setWindowTitle("Test Complete") - msg_box.setText(f"Test completed\nCycles: {self.cycle_count}") - msg_box.exec_() - except Exception as e: - print(f"Critical error in finalize_test: {e}") - finally: - # Reset test status + # Catastrophic error handling + print(f"Critical error in finalize_test: {str(e)}") + try: + if hasattr(self, 'current_cycle_file'): + try: + self.current_cycle_file.close() + except: + pass + except: + pass + + # Ensure basic state is reset self.test_running = False self.request_stop = True self.measuring = False + finally: + # Final cleanup guarantees + try: + if hasattr(self, 'log_buffer'): + self.log_buffer.clear() + except: + pass + def update_measurements(self, voltage: float, current: float, current_time: float): - """Update measurements in the UI.""" - if len(self.time_data) > 10000: # Limit data points - self.time_data.popleft() - self.voltage_data.popleft() - self.current_data.popleft() + """Update measurements with enhanced data structure.""" + if not hasattr(self, '_mutex'): + print("Critical Error: Mutex not initialized") + return - self.time_data.append(current_time) - self.voltage_data.append(voltage) - self.current_data.append(current) - - # Update UI - self.voltage_label.setText(f"{voltage:.4f}") - self.current_label.setText(f"{current:.4f}") - self.time_label.setText(self.format_time(current_time)) - - # Update plot periodically - if len(self.time_data) % 10 == 0: - self.update_plot() - - # Log data if test is running - if self.test_running and hasattr(self, 'current_cycle_file'): - self.log_buffer.append([ - f"{current_time:.3f}", - f"{voltage:.6f}", - f"{current:.6f}", - self.test_phase, - f"{self.capacity_ah:.4f}", - f"{self.charge_capacity:.4f}", - f"{self.coulomb_efficiency:.1f}", - f"{self.cycle_count}" - ]) - - # Write data in blocks - if len(self.log_buffer) >= 10: - with open(self.filename, 'a', newline='') as f: - writer = csv.writer(f) - writer.writerows(self.log_buffer) - self.log_buffer.clear() + try: + with QMutexLocker(self._mutex): + # Limit data points + max_points = 10000 + for key in self.test_data: + if len(self.test_data[key]) > max_points: + self.test_data[key].popleft() + + # Store data + self.test_data['time'].append(current_time) + self.test_data['voltage'].append(voltage) + self.test_data['current'].append(current) + self.test_data['phase'].append(self.test_phase) + self.test_data['mode'].append(self.current_mode) + + # Update UI + self.voltage_label.setText(f"{voltage:.4f}") + self.current_label.setText(f"{current:.4f}") + self.time_label.setText(self.format_time(current_time)) + + # Update plot periodically + if len(self.test_data['time']) % 10 == 0: + self.update_plot() + + # Log data if test is running + if self.test_running and hasattr(self, 'log_writer'): + self.log_buffer.append([ + f"{current_time:.3f}", + f"{voltage:.6f}", + f"{current:.6f}", + self.test_phase, + f"{self.capacity_ah:.4f}", + f"{self.charge_capacity:.4f}", + f"{self.coulomb_efficiency:.1f}", + f"{self.cycle_count}" + ]) + + # Write data in blocks + if len(self.log_buffer) >= 10: + try: + self.log_writer.writerows(self.log_buffer) + if hasattr(self, 'current_cycle_file') and self.current_cycle_file: + self.current_cycle_file.flush() + self.log_buffer.clear() + except Exception as e: + print(f"Log write error: {e}") + except Exception as e: + print(f"Error in update_measurements: {str(e)}") def write_cycle_summary(self): """Write cycle summary to the current cycle's log file""" @@ -969,22 +1196,28 @@ class BatteryTester(QMainWindow): print(f"Error writing cycle summary: {e}") def update_plot(self): - """Update the plot with new data.""" - if not self.time_data: + """Update the plot with new data using test_data structure.""" + if not self.test_data['time']: return - self.line_voltage.set_data(list(self.time_data), list(self.voltage_data)) - self.line_current.set_data(list(self.time_data), list(self.current_data)) + self.line_voltage.set_data( + list(self.test_data['time']), + list(self.test_data['voltage']) + ) + self.line_current.set_data( + list(self.test_data['time']), + list(self.test_data['current']) + ) self.auto_scale_axes() self.canvas.draw_idle() def auto_scale_axes(self): - """Automatically adjust plot axes.""" - if not self.time_data: + """Automatically adjust plot axes using test_data.""" + if not self.test_data['time']: return # X-axis adjustment - max_time = list(self.time_data)[-1] + max_time = list(self.test_data['time'])[-1] current_xlim = self.ax.get_xlim() if max_time > current_xlim[1] * 0.95: new_xmax = max_time * 1.10 # 10% padding @@ -992,23 +1225,17 @@ class BatteryTester(QMainWindow): self.ax2.set_xlim(0, new_xmax) # Y-axes adjustment - if self.voltage_data: + if self.test_data['voltage']: voltage_padding = 0.2 - min_v = max(0, min(list(self.voltage_data)) - voltage_padding) - max_v = min(5.0, max(list(self.voltage_data)) + voltage_padding) - current_ylim = self.ax.get_ylim() - new_min = current_ylim[0] + (min_v - current_ylim[0]) * 0.1 - new_max = current_ylim[1] + (max_v - current_ylim[1]) * 0.1 - self.ax.set_ylim(new_min, new_max) + min_v = max(0, min(list(self.test_data['voltage'])) - voltage_padding) + max_v = min(5.0, max(list(self.test_data['voltage'])) + voltage_padding) + self.ax.set_ylim(min_v, max_v) - if self.current_data: + if self.test_data['current']: current_padding = 0.05 - min_c = max(-0.25, min(list(self.current_data)) - current_padding) - max_c = min(0.25, max(list(self.current_data)) + current_padding) - current_ylim = self.ax2.get_ylim() - new_min = current_ylim[0] + (min_c - current_ylim[0]) * 0.1 - new_max = current_ylim[1] + (max_c - current_ylim[1]) * 0.1 - self.ax2.set_ylim(new_min, new_max) + min_c = max(-0.25, min(list(self.test_data['current'])) - current_padding) + max_c = min(0.25, max(list(self.test_data['current'])) + current_padding) + self.ax2.set_ylim(min_c, max_c) @staticmethod def format_time(seconds: float) -> str: