# -*- coding: utf-8 -*- import os import time import csv import threading from datetime import datetime import tkinter as tk from tkinter import ttk, messagebox import pysmu import numpy as np import matplotlib matplotlib.use('TkAgg') from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg from matplotlib.figure import Figure from collections import deque class DeviceDisconnectedError(Exception): pass class BatteryTester: def __init__(self, root): self._after_ids = set() # Track scheduled callbacks self._after_lock = threading.Lock() # Color scheme self.bg_color = "#2E3440" self.fg_color = "#D8DEE9" self.accent_color = "#5E81AC" self.warning_color = "#BF616A" self.success_color = "#A3BE8C" # Main window configuration self.root = root self.root.title("ADALM1000 - Battery Capacity Tester (CC Test)") self.root.geometry("1000x800") self.root.minsize(800, 700) self.root.configure(bg=self.bg_color) # Device and measurement state self.session_active = False self.measuring = False self.test_running = False self.continuous_mode = False self.request_stop = False self.interval = 0.1 self.log_dir = os.path.expanduser("~/adalm1000/logs") os.makedirs(self.log_dir, exist_ok=True) # Battery test parameters self.capacity = tk.DoubleVar(value=0.2) # Battery capacity in Ah self.charge_cutoff = tk.DoubleVar(value=1.45) # Charge cutoff voltage self.discharge_cutoff = tk.DoubleVar(value=0.9) # Discharge cutoff voltage self.rest_time = tk.DoubleVar(value=0.1) # Rest time in hours self.c_rate = tk.DoubleVar(value=0.1) # C-rate for test (default C/5 = 0.2) # Test progress tracking self.test_phase = tk.StringVar(value="Idle") self.capacity_ah = tk.DoubleVar(value=0.0) self.charge_capacity = tk.DoubleVar(value=0.0) # capacity tracking self.coulomb_efficiency = tk.DoubleVar(value=0.0) # efficiency calculation self.cycle_count = tk.IntVar(value=0) # cycle counting # Data buffers self.time_data = deque() self.voltage_data = deque() self.current_data = deque() self.phase_data = deque() # Initialize UI and device self.setup_ui() self.init_device() # Ensure proper cleanup self.root.protocol("WM_DELETE_WINDOW", self.on_close) def setup_ui(self): """Configure the user interface""" self.style = ttk.Style() self.style.theme_use('clam') # Configure styles self.style.configure('.', background=self.bg_color, foreground=self.fg_color) self.style.configure('TFrame', background=self.bg_color) self.style.configure('TLabel', background=self.bg_color, foreground=self.fg_color) self.style.configure('TButton', background=self.accent_color, foreground=self.fg_color, padding=6, font=('Helvetica', 10, 'bold')) self.style.map('TButton', background=[('active', self.accent_color), ('disabled', '#4C566A')], foreground=[('active', self.fg_color), ('disabled', '#D8DEE9')]) self.style.configure('TEntry', fieldbackground="#3B4252", foreground=self.fg_color) self.style.configure('Header.TLabel', font=('Helvetica', 14, 'bold'), foreground=self.accent_color) self.style.configure('Value.TLabel', font=('Helvetica', 12, 'bold')) self.style.configure('Status.TLabel', font=('Helvetica', 10)) self.style.configure('Warning.TButton', background=self.warning_color) self.style.configure('Success.TButton', background=self.success_color) # Main layout self.content_frame = ttk.Frame(self.root) self.content_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) # Header area header_frame = ttk.Frame(self.content_frame) header_frame.pack(fill=tk.X, pady=(0, 20)) ttk.Label(header_frame, text="ADALM1000 Battery Capacity Tester (CC Test)", style='Header.TLabel').pack(side=tk.LEFT) # Status indicator self.status_light = tk.Canvas(header_frame, width=20, height=20, bg=self.bg_color, bd=0, highlightthickness=0) self.status_light.pack(side=tk.RIGHT, padx=10) self.status_indicator = self.status_light.create_oval(2, 2, 18, 18, fill='red') self.connection_label = ttk.Label(header_frame, text="Disconnected") self.connection_label.pack(side=tk.RIGHT) # Reconnect button self.reconnect_btn = ttk.Button(header_frame, text="Reconnect", command=self.reconnect_device) self.reconnect_btn.pack(side=tk.RIGHT, padx=10) # Measurement display display_frame = ttk.LabelFrame(self.content_frame, text=" Live Measurements ", padding=15) display_frame.pack(fill=tk.BOTH, expand=False) # Measurement values measurement_labels = [ ("Voltage (V)", "V"), ("Current (A)", "A"), ("Test Phase", ""), ("Elapsed Time", "s"), ("Discharge Capacity", "Ah"), ("Charge Capacity", "Ah"), ("Coulomb Eff.", "%"), ("Cycle Count", ""), ] for i, (label, unit) in enumerate(measurement_labels): ttk.Label(display_frame, text=f"{label}:", font=('Helvetica', 11)).grid(row=i//2, column=(i%2)*2, sticky=tk.W, pady=5) value_label = ttk.Label(display_frame, text="0.000", style='Value.TLabel') value_label.grid(row=i//2, column=(i%2)*2+1, sticky=tk.W, padx=10) if unit: ttk.Label(display_frame, text=unit).grid(row=i//2, column=(i%2)*2+2, sticky=tk.W) if i == 0: self.voltage_label = value_label elif i == 1: self.current_label = value_label elif i == 2: self.phase_label = value_label elif i == 3: self.time_label = value_label elif i == 4: self.capacity_label = value_label elif i == 5: self.charge_capacity_label = value_label elif i == 6: self.efficiency_label = value_label elif i == 7: self.cycle_label = value_label # Control area controls_frame = ttk.Frame(self.content_frame) controls_frame.pack(fill=tk.X, pady=(10, 10), padx=0) # Parameters frame params_frame = ttk.LabelFrame(controls_frame, text="Test Parameters", padding=10) params_frame.pack(side=tk.LEFT, fill=tk.X, expand=True) # Battery capacity ttk.Label(params_frame, text="Battery Capacity (Ah):").grid(row=0, column=0, sticky=tk.W) ttk.Entry(params_frame, textvariable=self.capacity, width=6).grid(row=0, column=1, padx=5, sticky=tk.W) # Charge cutoff ttk.Label(params_frame, text="Charge Cutoff (V):").grid(row=1, column=0, sticky=tk.W) ttk.Entry(params_frame, textvariable=self.charge_cutoff, width=6).grid(row=1, column=1, padx=5, sticky=tk.W) # Discharge cutoff ttk.Label(params_frame, text="Discharge Cutoff (V):").grid(row=2, column=0, sticky=tk.W) ttk.Entry(params_frame, textvariable=self.discharge_cutoff, width=6).grid(row=2, column=1, padx=5, sticky=tk.W) # Rest time ttk.Label(params_frame, text="Rest Time (hours):").grid(row=3, column=0, sticky=tk.W) ttk.Entry(params_frame, textvariable=self.rest_time, width=6).grid(row=3, column=1, padx=5, sticky=tk.W) # C-rate for test (C/5 by default) ttk.Label(params_frame, text="Test C-rate:").grid(row=0, column=2, sticky=tk.W, padx=(10,0)) ttk.Entry(params_frame, textvariable=self.c_rate, width=4).grid(row=0, column=3, padx=5, sticky=tk.W) ttk.Label(params_frame, text="(e.g., 0.2 for C/5)").grid(row=0, column=4, sticky=tk.W) # Start/Stop buttons button_frame = ttk.Frame(controls_frame) button_frame.pack(side=tk.RIGHT, padx=10) self.start_button = ttk.Button(button_frame, text="START TEST", command=self.start_test, style='TButton') self.start_button.pack(side=tk.TOP, pady=5) self.stop_button = ttk.Button(button_frame, text="STOP TEST", command=self.stop_test, style='Warning.TButton', state=tk.DISABLED) self.stop_button.pack(side=tk.TOP, pady=5) # Continuous mode checkbox self.continuous_var = tk.BooleanVar(value=True) ttk.Checkbutton(button_frame, text="Continuous Mode", variable=self.continuous_var).pack(side=tk.TOP, pady=5) # Plot area self.plot_frame = ttk.Frame(self.content_frame) self.plot_frame.pack(fill=tk.BOTH, expand=True, padx=20, pady=(0, 5)) self.setup_plot() # Status bar self.status_var = tk.StringVar() self.status_var.set("Ready") self.status_label = ttk.Label(self.root, textvariable=self.status_var, style='Status.TLabel', padding=(0, 5), anchor=tk.W) self.status_label.place(x=20, relx=0, rely=1.0, anchor='sw', relwidth=0.96, height=28) def setup_plot(self): """Configure the matplotlib plot""" self.fig = Figure(figsize=(8, 5), dpi=100, facecolor='#2E3440') # Adjust margins to give more space on right side self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15) self.ax = self.fig.add_subplot(111) self.ax.set_facecolor('#3B4252') # Set initial voltage range based on charge/discharge cutoffs ±0.2V voltage_padding = 0.2 min_voltage = max(0, self.discharge_cutoff.get() - voltage_padding) max_voltage = self.charge_cutoff.get() + voltage_padding self.ax.set_ylim(min_voltage, max_voltage) # Voltage plot self.line_voltage, = self.ax.plot([], [], color='#00BFFF', label='Voltage (V)', linewidth=2) self.ax.set_ylabel("Voltage (V)", color='#00BFFF') self.ax.tick_params(axis='y', labelcolor='#00BFFF') # Current plot (right axis) - set initial range based on test current self.ax2 = self.ax.twinx() current_padding = 0.05 test_current = self.c_rate.get() * self.capacity.get() max_current = test_current * 1.5 # Add 50% padding self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding) self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2) self.ax2.set_ylabel("Current (A)", color='r') self.ax2.tick_params(axis='y', labelcolor='r') self.ax.set_xlabel('Time (s)', color=self.fg_color) self.ax.set_title('Battery Test (CC)', color=self.fg_color) self.ax.tick_params(axis='x', colors=self.fg_color) self.ax.grid(True, color='#4C566A') # Position legends to avoid overlap self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99)) self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99)) # Embed plot self.canvas = FigureCanvasTkAgg(self.fig, master=self.plot_frame) self.canvas.draw() canvas_widget = self.canvas.get_tk_widget() canvas_widget.configure(bg='#2E3440', bd=0, highlightthickness=0) canvas_widget.pack(side=tk.TOP, fill=tk.BOTH, expand=True, pady=(10, 0)) def init_device(self): """Initialize the ADALM1000 device with continuous measurement""" try: # First try to clean up any existing session if hasattr(self, 'session'): try: self.session.end() del self.session except: pass # Add small delay to allow device to reset time.sleep(1) self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000) if not self.session.devices: raise Exception("No ADALM1000 detected - check connections") self.dev = self.session.devices[0] # Reset channels self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['B'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) self.dev.channels['B'].constant(0) self.session.start(0) self.status_light.itemconfig(self.status_indicator, fill='green') self.connection_label.config(text="Connected") self.status_var.set("Device connected | Ready to measure") self.session_active = True self.start_button.config(state=tk.NORMAL) # Start continuous measurement thread self.measurement_event = threading.Event() self.measurement_event.set() self.measurement_thread = threading.Thread( target=self.continuous_measurement, daemon=True ) self.measurement_thread.start() except Exception as e: self.handle_device_error(e) def continuous_measurement(self): """Continuous measurement with moving average filtering and optimized I/O""" filter_window_size = 10 voltage_window = [] current_window = [] last_plot_update = 0 last_ui_update = 0 log_buffer = [] update_interval = 1.0 # Update UI at 1Hz max # Initialize start_time for measurements if not hasattr(self, 'start_time'): self.start_time = time.time() while self.measurement_event.is_set() and self.root.winfo_exists(): try: # Read multiple samples for better accuracy samples = self.dev.read(filter_window_size, 500, True) if not samples: raise DeviceDisconnectedError("No samples received") # Get voltage from Channel B (HI_Z mode) and current from Channel A raw_voltage = np.mean([s[1][0] for s in samples]) # Channel B voltage raw_current = np.mean([s[0][1] for s in samples]) # Channel A current current_time = time.time() - self.start_time # Apply moving average filter voltage_window.append(raw_voltage) current_window.append(raw_current) if len(voltage_window) > filter_window_size: voltage_window.pop(0) current_window.pop(0) voltage = np.mean(voltage_window) current = np.mean(current_window) # Store filtered data self.time_data.append(current_time) self.voltage_data.append(voltage) self.current_data.append(current) # Throttle UI updates to prevent lag now = time.time() if now - last_ui_update > update_interval: self.safe_after(0, lambda: self.update_measurement_display( voltage, current, current_time )) last_ui_update = now # Throttle plot updates even more (1Hz max) if now - last_plot_update > 1.0: self.safe_after(0, self.update_plot) last_plot_update = now # Buffered logging if self.test_running and hasattr(self, 'current_cycle_file'): log_buffer.append([ f"{current_time:.3f}", f"{voltage:.6f}", f"{current:.6f}", self.test_phase.get(), f"{self.capacity_ah.get():.4f}", f"{self.charge_capacity.get():.4f}", f"{self.coulomb_efficiency.get():.1f}", f"{self.cycle_count.get()}" ]) # Write in chunks of 10 samples if len(log_buffer) >= 10: with open(self.filename, 'a', newline='') as f: writer = csv.writer(f) writer.writerows(log_buffer) log_buffer.clear() time.sleep(max(0.05, self.interval)) except Exception as e: error_msg = str(e) if self.root.winfo_exists(): self.safe_after(0, lambda msg=error_msg: self.handle_device_error(f"Measurement error: {msg}") if self.root.winfo_exists() else None) break # Flush remaining buffer on exit if log_buffer and hasattr(self, 'current_cycle_file'): with open(self.filename, 'a', newline='') as f: writer = csv.writer(f) writer.writerows(log_buffer) def start_test(self): """Start the full battery test cycle""" if not self.test_running: try: # Validate inputs if self.capacity.get() <= 0: raise ValueError("Battery capacity must be positive") if self.charge_cutoff.get() <= self.discharge_cutoff.get(): raise ValueError("Charge cutoff must be higher than discharge cutoff") if self.c_rate.get() <= 0: raise ValueError("C-rate must be positive") # Set continuous mode based on checkbox self.continuous_mode = self.continuous_var.get() # Reset timing for new test self.measurement_start_time = time.time() self.test_start_time = time.time() # Calculate target current test_current = self.c_rate.get() * self.capacity.get() if test_current > 0.2: raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000") # Clear previous data self.time_data.clear() self.voltage_data.clear() self.current_data.clear() self.phase_data.clear() self.capacity_ah.set(0.0) self.charge_capacity.set(0.0) self.coulomb_efficiency.set(0.0) self.cycle_count.set(0) # Generate base filename without cycle number timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") self.base_filename = os.path.join(self.log_dir, f"battery_test_{timestamp}") self.current_cycle_file = None # Start test thread self.test_running = True self.start_time = time.time() self.last_update_time = time.time() self.test_phase.set("Initial Discharge") self.start_button.config(state=tk.DISABLED) self.stop_button.config(state=tk.NORMAL) self.status_var.set(f"Test started | Discharging to {self.discharge_cutoff.get()}V @ {test_current:.3f}A") # Start test sequence in a new thread self.test_thread = threading.Thread(target=self.run_test_sequence, daemon=True) self.test_thread.start() except Exception as e: messagebox.showerror("Error", str(e)) def create_cycle_log_file(self): """Create a new log file for the current cycle""" # Close previous file if exists if hasattr(self, 'current_cycle_file') and self.current_cycle_file: try: self.current_cycle_file.close() except Exception as e: print(f"Error closing previous log file: {e}") # Check write permissions if not os.access(self.log_dir, os.W_OK): messagebox.showerror("Error", f"No write permissions in {self.log_dir}") return False # Create new log file with sequential suffix suffix = 1 while True: self.filename = f"{self.base_filename}_{suffix}.csv" if not os.path.exists(self.filename): break suffix += 1 try: self.current_cycle_file = open(self.filename, 'w', newline='') self.log_writer = csv.writer(self.current_cycle_file) self.log_writer.writerow(["Time(s)", "Voltage(V)", "Current(A)", "Phase", "Discharge_Capacity(Ah)", "Charge_Capacity(Ah)", "Coulomb_Eff(%)", "Cycle"]) self.log_buffer = [] return True except Exception as e: messagebox.showerror("Error", f"Failed to create log file: {e}") return False @staticmethod def format_time(seconds): """Convert seconds to hh:mm:ss format""" hours = int(seconds // 3600) minutes = int((seconds % 3600) // 60) seconds = int(seconds % 60) return f"{hours:02d}:{minutes:02d}:{seconds:02d}" def stop_test(self): """Request immediate stop of the test""" if not self.test_running: return self.request_stop = True self.test_running = False # This will break out of all test loops self.measuring = False # Immediately set device to safe state if hasattr(self, 'dev'): self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) # Clear plot data self.time_data.clear() self.voltage_data.clear() self.current_data.clear() if hasattr(self, 'line_voltage') and hasattr(self, 'line_current'): self.line_voltage.set_data([], []) self.line_current.set_data([], []) self.ax.set_xlim(0, 1) self.ax2.set_xlim(0, 1) self.canvas.draw() self.status_var.set("Test stopped immediately") self.stop_button.config(state=tk.DISABLED) self.start_button.config(state=tk.NORMAL) # Finalize test data self.safe_after(100, self.finalize_test) def center_window(self, window): """Center a window on screen""" window.update_idletasks() width = window.winfo_width() height = window.winfo_height() x = (window.winfo_screenwidth() // 2) - (width // 2) y = (window.winfo_screenheight() // 2) - (height // 2) window.geometry(f'{width}x{height}+{x}+{y}') def run_test_sequence(self): try: test_current = self.c_rate.get() * self.capacity.get() while self.test_running and (self.continuous_mode or self.cycle_count.get() == 0): # Reset stop request at start of each cycle self.request_stop = False self.cycle_count.set(self.cycle_count.get() + 1) # Create new log file for this cycle self.create_cycle_log_file() # 1. Charge phase (constant current) self.test_phase.set("Charge") self.status_var.set(f"Charging to {self.charge_cutoff.get()}V @ {test_current:.3f}A") self.root.update() # Force UI update self.measuring = True self.dev.channels['B'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].mode = pysmu.Mode.SIMV self.dev.channels['A'].constant(test_current) self.charge_capacity.set(0.0) target_voltage = self.charge_cutoff.get() self.last_update_time = time.time() while self.test_running and not self.request_stop: if not self.voltage_data: time.sleep(0.1) continue current_voltage = self.voltage_data[-1] measured_current = abs(self.current_data[-1]) # Update charge capacity now = time.time() delta_t = now - self.last_update_time self.last_update_time = now self.charge_capacity.set(self.charge_capacity.get() + measured_current * delta_t / 3600) self.status_var.set( f"Charging: {current_voltage:.3f}V / {target_voltage}V | " f"Current: {measured_current:.3f}A | " f"Capacity: {self.charge_capacity.get():.4f}Ah" ) self.root.update() # Force UI update if current_voltage >= target_voltage or self.request_stop: break time.sleep(0.1) # More frequent checks if self.request_stop or not self.test_running: break # 2. Rest period after charge self.test_phase.set("Resting (Post-Charge)") self.measuring = False self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) rest_end_time = time.time() + (self.rest_time.get() * 3600) while time.time() < rest_end_time and self.test_running and not self.request_stop: time_left = max(0, rest_end_time - time.time()) self.status_var.set( f"Resting after charge | " f"Time left: {time_left/60:.1f} min" ) self.root.update() time.sleep(1) # Check every second for stop request if self.request_stop or not self.test_running: break # 3. Discharge phase (capacity measurement) self.test_phase.set("Discharge") self.status_var.set(f"Discharging to {self.discharge_cutoff.get()}V @ {test_current:.3f}A") self.root.update() self.measuring = True self.dev.channels['A'].mode = pysmu.Mode.SIMV self.dev.channels['A'].constant(-test_current) self.capacity_ah.set(0.0) self.last_update_time = time.time() while self.test_running and not self.request_stop: if not self.current_data: time.sleep(0.1) continue current_voltage = self.voltage_data[-1] current_current = abs(self.current_data[-1]) # Capacity calculation now = time.time() delta_t = now - self.last_update_time self.last_update_time = now self.capacity_ah.set(self.capacity_ah.get() + current_current * delta_t / 3600) self.status_var.set( f"Discharging: {current_voltage:.3f}V / {self.discharge_cutoff.get()}V | " f"Current: {current_current:.3f}A | " f"Capacity: {self.capacity_ah.get():.4f}Ah" ) self.root.update() if current_voltage <= self.discharge_cutoff.get() or self.request_stop: break time.sleep(0.1) # More frequent checks # 4. Rest period after discharge (only if not stopping) if self.test_running and not self.request_stop: self.test_phase.set("Resting (Post-Discharge)") self.measuring = False self.dev.channels['A'].mode = pysmu.Mode.HI_Z self.dev.channels['A'].constant(0) rest_end_time = time.time() + (self.rest_time.get() * 3600) while time.time() < rest_end_time and self.test_running and not self.request_stop: time_left = max(0, rest_end_time - time.time()) self.status_var.set( f"Resting after discharge | " f"Time left: {time_left/60:.1f} min" ) self.root.update() time.sleep(1) # Calculate Coulomb efficiency if not stopping if not self.request_stop and self.charge_capacity.get() > 0: efficiency = (self.capacity_ah.get() / self.charge_capacity.get()) * 100 self.coulomb_efficiency.set(efficiency) # Update cycle info self.status_var.set( f"Cycle {self.cycle_count.get()} complete | " f"Discharge: {self.capacity_ah.get():.3f}Ah | " f"Charge: {self.charge_capacity.get():.3f}Ah | " f"Efficiency: {self.coulomb_efficiency.get():.1f}%" ) self.root.update() # Write cycle summary to log file self.write_cycle_summary() # Short rest between cycles (only in continuous mode) if self.continuous_mode and self.test_running and not self.request_stop: rest_end_time = time.time() + (self.rest_time.get() * 3600) while time.time() < rest_end_time and self.test_running and not self.request_stop: time_left = max(0, rest_end_time - time.time()) self.test_phase.set("Resting Between Cycles") self.status_var.set( f"Resting between cycles | " f"Time left: {time_left/60:.1f} min" ) self.root.update() time.sleep(1) # Finalize test if stopped or completed self.safe_after(0, self.finalize_test) except Exception as e: error_msg = str(e) if self.root.winfo_exists(): self.safe_after(0, lambda msg=error_msg: messagebox.showerror("Test Error", msg)) self.safe_after(0, self.finalize_test) def finalize_test(self): """Final cleanup after test completes or is stopped""" self.measuring = False if hasattr(self, 'dev'): try: self.dev.channels['A'].constant(0) except Exception as e: print(f"Error resetting device: {e}") # Flush and close current log file if hasattr(self, 'log_buffer') and self.log_buffer and hasattr(self, 'log_writer'): try: self.log_writer.writerows(self.log_buffer) self.log_buffer.clear() except Exception as e: print(f"Error flushing log buffer: {e}") if hasattr(self, 'current_cycle_file'): try: self.current_cycle_file.close() except Exception as e: print(f"Error closing log file: {e}") self.start_button.config(state=tk.NORMAL) self.stop_button.config(state=tk.DISABLED) self.request_stop = False message = ( f"Test safely stopped after discharge phase | " f"Cycle {self.cycle_count.get()} completed | " f"Final capacity: {self.capacity_ah.get():.3f}Ah" ) self.status_var.set(message) self.safe_after(0, lambda: messagebox.showinfo( "Test Completed", f"Test was safely stopped after discharge phase.\n\n" f"Final discharge capacity: {self.capacity_ah.get():.3f}Ah\n" f"Total cycles completed: {self.cycle_count.get()}" )) def update_measurement_display(self, voltage, current, current_time): """Update all measurement displays at once (throttled to 1Hz)""" try: # Update all values regardless of change self.voltage_label.config(text=f"{voltage:.4f}") self.current_label.config(text=f"{current:.4f}") self.capacity_label.config(text=f"{self.capacity_ah.get():.4f}") self.charge_capacity_label.config(text=f"{self.charge_capacity.get():.4f}") self.efficiency_label.config(text=f"{self.coulomb_efficiency.get():.1f}") self.cycle_label.config(text=f"{self.cycle_count.get()}") self.phase_label.config(text=self.test_phase.get()) self.time_label.config(text=self.format_time(current_time)) except Exception as e: print(f"GUI update error: {e}") def write_cycle_summary(self): """Write cycle summary to the current cycle's log file""" if not hasattr(self, 'current_cycle_file') or not self.current_cycle_file: return summary_line = ( f"Cycle {self.cycle_count.get()} Summary - " f"Discharge={self.capacity_ah.get():.4f}Ah, " f"Charge={self.charge_capacity.get():.4f}Ah, " f"Efficiency={self.coulomb_efficiency.get():.1f}%" ) # Ensure file is open and write summary try: if self.log_buffer: self.log_writer.writerows(self.log_buffer) self.log_buffer.clear() self.current_cycle_file.write(summary_line + "\n") self.current_cycle_file.flush() except Exception as e: print(f"Error writing cycle summary: {e}") def update_plot(self): """Optimized plot update with change detection""" if not self.time_data or len(self.time_data) < 5: # Wait for at least 5 samples return # Only update if there's significant new data if hasattr(self, '_last_plot_time') and (self.time_data[-1] - self._last_plot_time < 1.0): return self._last_plot_time = self.time_data[-1] # Update plot data self.line_voltage.set_data(self.time_data, self.voltage_data) self.line_current.set_data(self.time_data, self.current_data) # Auto-scale axes if needed self.auto_scale_axes() # Only redraw if needed self.canvas.draw_idle() def auto_scale_axes(self): """Auto-scale plot axes with appropriate padding""" if not self.time_data: return # X-axis scaling min_time = 0 max_time = self.time_data[-1] * 1.05 # 5% padding current_xlim = self.ax.get_xlim() if abs(current_xlim[1] - max_time) > (max_time * 0.1): # 10% change threshold self.ax.set_xlim(min_time, max_time) self.ax2.set_xlim(min_time, max_time) # Voltage axis scaling voltage_padding = 0.2 if self.voltage_data: min_voltage = max(0, min(self.voltage_data) - voltage_padding) max_voltage = max(self.voltage_data) + voltage_padding current_ylim = self.ax.get_ylim() if (abs(current_ylim[0] - min_voltage) > 0.1 or abs(current_ylim[1] - max_voltage) > 0.1): self.ax.set_ylim(min_voltage, max_voltage) # Current axis scaling current_padding = 0.05 if self.current_data: min_current = min(self.current_data) - current_padding max_current = max(self.current_data) + current_padding current_ylim2 = self.ax2.get_ylim() if (abs(current_ylim2[0] - min_current) > 0.02 or abs(current_ylim2[1] - max_current) > 0.02): self.ax2.set_ylim(min_current, max_current) def handle_device_error(self, error): """Handle device connection errors""" if not self.root.winfo_exists(): # Check if window still exists return error_msg = str(error) print(f"Device error: {error_msg}") # Clean up session first if hasattr(self, 'session'): try: if self.session_active: self.session.end() del self.session except Exception as e: print(f"Error cleaning up session: {e}") # Update UI self.status_light.itemconfig(self.status_indicator, fill='red') self.connection_label.config(text="Disconnected") self.status_var.set(f"Device error: {error_msg}") self.session_active = False self.test_running = False self.continuous_mode = False self.measuring = False if hasattr(self, 'start_button'): self.start_button.config(state=tk.DISABLED) if hasattr(self, 'stop_button'): self.stop_button.config(state=tk.DISABLED) # Clear plot + buffers self.time_data.clear() self.voltage_data.clear() self.current_data.clear() if hasattr(self, 'line_voltage') and hasattr(self, 'line_current'): self.line_voltage.set_data([], []) self.line_current.set_data([], []) self.ax.set_xlim(0, 1) self.ax2.set_xlim(0, 1) self.canvas.draw() # Show error message and attempt reconnect automatically if self.root.winfo_exists(): self.safe_after(100, self.attempt_reconnect) def safe_after(self, delay_ms, callback, *args): """Safely schedule a callback with window existence check""" if not self.root.winfo_exists(): return None def wrapped_callback(): if self.root.winfo_exists(): try: callback(*args) except Exception as e: print(f"Callback error: {e}") after_id = self.root.after(delay_ms, wrapped_callback) self._after_ids.add(after_id) return after_id def attempt_reconnect(self): """Attempt to reconnect automatically""" if not self.root.winfo_exists(): return try: # Show error message first messagebox.showerror( "Device Connection Error", "Could not connect to ADALM1000\n\n" "1. Check USB cable connection\n" "2. The device will attempt to reconnect automatically" ) except Exception as e: print(f"Error showing message: {e}") return # Schedule reconnect attempt self.safe_after(1000, self.reconnect_device) def reconnect_device(self): """Reconnect the device with proper cleanup""" if not self.root.winfo_exists(): return self.status_var.set("Attempting to reconnect...") self.root.update() # Clear any existing session if hasattr(self, 'session'): try: if self.session_active: self.session.end() del self.session except: pass # Stop any running threads self.test_running = False self.continuous_mode = False self.measuring = False if hasattr(self, 'measurement_event'): self.measurement_event.clear() # Wait for threads to finish if hasattr(self, 'measurement_thread'): self.measurement_thread.join(timeout=1.0) if hasattr(self, 'test_thread'): self.test_thread.join(timeout=1.0) # Add small delay to allow device to reset time.sleep(1.5) # Try to initialize device try: self.init_device() if self.session_active: self.status_var.set("Reconnected successfully") return except Exception as e: print(f"Reconnect failed: {e}") # If we get here, reconnection failed self.status_var.set("Reconnect failed - will retry...") self.safe_after(2000, self.reconnect_device) # Retry after 2 seconds def on_close(self): """Clean up on window close""" # Set flags to stop all threads self.test_running = False self.measuring = False self.session_active = False if hasattr(self, 'measurement_event'): self.measurement_event.clear() # Cancel all pending callbacks with self._after_lock: for after_id in self._after_ids: try: self.root.after_cancel(after_id) except: pass self._after_ids.clear() # Give threads time to clean up timeout = 2.0 if hasattr(self, 'measurement_thread'): self.measurement_thread.join(timeout=timeout) if hasattr(self, 'test_thread'): self.test_thread.join(timeout=timeout) # Clean up device session if hasattr(self, 'session') and self.session: try: self.session.end() except Exception as e: print(f"Error ending session: {e}") # Finally destroy window try: self.root.destroy() except: pass if __name__ == "__main__": root = tk.Tk() try: app = BatteryTester(root) root.mainloop() except Exception as e: if root.winfo_exists(): messagebox.showerror("Fatal Error", f"Application failed: {str(e)}") else: print(f"Fatal Error: {e}") try: root.destroy() except: pass