This makes the stop operation more thorough and provides better visual feedback to the user. The plot will now clearly show that the test has been stopped and reset. (D)
1028 lines
42 KiB
Python
1028 lines
42 KiB
Python
# -*- coding: utf-8 -*-
import csv
import os
import threading
import time
from collections import deque
from contextlib import suppress
from datetime import datetime
import tkinter as tk
from tkinter import ttk, messagebox

import matplotlib
import numpy as np
import pysmu

matplotlib.use('TkAgg')  # select the Tk backend before the backend module is imported
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
|
|
|
|
class DeviceDisconnectedError(Exception):
    """Raised when a device read returns no samples."""
|
|
|
|
class BatteryTester:
    """Tkinter application that cycles a battery with an ADALM1000 at constant current.

    The main thread owns the GUI.  A daemon measurement thread
    (``continuous_measurement``) samples the device and fills the data
    buffers; a second daemon thread (``run_test_sequence``) drives the
    charge / rest / discharge phases while a test is active.
    """

    def __init__(self, root):
        """Build the window, initialise state and connect to the device.

        Args:
            root: The ``tk.Tk`` root window hosting the application.
        """
        self._after_ids = set()  # IDs of scheduled, not yet executed callbacks
        self._after_lock = threading.Lock()
        # Guards the sample deques, which are filled by the measurement thread
        # and read/cleared from the GUI and test threads.
        self._data_lock = threading.Lock()
        self._reconnect_pending = False
        self._test_finalized = True

        # Color scheme
        self.bg_color = "#2E3440"
        self.fg_color = "#D8DEE9"
        self.accent_color = "#5E81AC"
        self.warning_color = "#BF616A"
        self.success_color = "#A3BE8C"

        # Main window configuration
        self.root = root
        self.root.title("ADALM1000 - Battery Capacity Tester (CC Test)")
        self.root.geometry("1000x800")
        self.root.minsize(800, 700)
        self.root.configure(bg=self.bg_color)

        # Device and measurement state
        self.session_active = False
        self.measuring = False
        self.test_running = False
        self.continuous_mode = False
        self.request_stop = False
        self.interval = 0.1
        self.log_dir = os.path.expanduser("~/adalm1000/logs")
        os.makedirs(self.log_dir, exist_ok=True)

        # Battery test parameters
        self.capacity = tk.DoubleVar(value=0.2)  # Battery capacity in Ah
        self.charge_cutoff = tk.DoubleVar(value=1.45)  # Charge cutoff voltage
        self.discharge_cutoff = tk.DoubleVar(value=0.9)  # Discharge cutoff voltage
        self.rest_time = tk.DoubleVar(value=0.1)  # Rest time in hours
        self.c_rate = tk.DoubleVar(value=0.1)  # C-rate for test (default 0.1, i.e. C/10)

        # Test progress tracking
        self.test_phase = tk.StringVar(value="Idle")
        self.capacity_ah = tk.DoubleVar(value=0.0)
        self.charge_capacity = tk.DoubleVar(value=0.0)  # capacity tracking
        self.coulomb_efficiency = tk.DoubleVar(value=0.0)  # efficiency calculation
        self.cycle_count = tk.IntVar(value=0)  # cycle counting

        # Data buffers
        self.time_data = deque()
        self.voltage_data = deque()
        self.current_data = deque()
        self.phase_data = deque()

        # Initialize UI and device
        self.setup_ui()
        self.init_device()

        # Ensure proper cleanup
        self.root.protocol("WM_DELETE_WINDOW", self.on_close)

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------
    def _window_alive(self):
        """Return True while the root window exists (never raises)."""
        try:
            return bool(self.root.winfo_exists())
        except (tk.TclError, RuntimeError):
            # winfo_exists() raises once the application has been destroyed.
            return False

    def _clear_data_buffers(self):
        """Empty all sample buffers atomically with respect to the sampler."""
        with self._data_lock:
            self.time_data.clear()
            self.voltage_data.clear()
            self.current_data.clear()
            self.phase_data.clear()

    def _latest_sample(self):
        """Return the newest ``(voltage, current)`` pair, or None if no data yet."""
        with self._data_lock:
            if not self.voltage_data or not self.current_data:
                return None
            return self.voltage_data[-1], self.current_data[-1]

    def _set_safe_output(self):
        """Best effort: switch channel A to high impedance with zero output."""
        dev = getattr(self, 'dev', None)
        if dev is None:
            return
        try:
            dev.channels['A'].mode = pysmu.Mode.HI_Z
            dev.channels['A'].constant(0)
        except Exception as e:
            print(f"Error resetting device: {e}")

    def _append_log_rows(self, rows):
        """Append measurement rows to the current cycle's CSV file, if any."""
        filename = getattr(self, 'filename', None)
        if not rows or not filename:
            return
        with open(filename, 'a', newline='') as f:
            csv.writer(f).writerows(rows)

    def _schedule_reconnect(self, delay_ms, callback):
        """Queue one reconnect step unless another one is already pending."""
        if getattr(self, '_reconnect_pending', False):
            return
        self._reconnect_pending = self.safe_after(delay_ms, callback) is not None

    # ------------------------------------------------------------------
    # UI construction
    # ------------------------------------------------------------------
    def setup_ui(self):
        """Configure the user interface"""
        self.style = ttk.Style()
        self.style.theme_use('clam')

        # Configure styles
        self.style.configure('.', background=self.bg_color, foreground=self.fg_color)
        self.style.configure('TFrame', background=self.bg_color)
        self.style.configure('TLabel', background=self.bg_color, foreground=self.fg_color)
        self.style.configure('TButton', background=self.accent_color, foreground=self.fg_color,
                             padding=6, font=('Helvetica', 10, 'bold'))
        self.style.map('TButton',
                       background=[('active', self.accent_color), ('disabled', '#4C566A')],
                       foreground=[('active', self.fg_color), ('disabled', '#D8DEE9')])
        self.style.configure('TEntry', fieldbackground="#3B4252", foreground=self.fg_color)
        self.style.configure('Header.TLabel', font=('Helvetica', 14, 'bold'), foreground=self.accent_color)
        self.style.configure('Value.TLabel', font=('Helvetica', 12, 'bold'))
        self.style.configure('Status.TLabel', font=('Helvetica', 10))
        self.style.configure('Warning.TButton', background=self.warning_color)
        self.style.configure('Success.TButton', background=self.success_color)

        # Main layout
        self.content_frame = ttk.Frame(self.root)
        self.content_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)

        # Header area
        header_frame = ttk.Frame(self.content_frame)
        header_frame.pack(fill=tk.X, pady=(0, 20))

        ttk.Label(header_frame, text="ADALM1000 Battery Capacity Tester (CC Test)",
                  style='Header.TLabel').pack(side=tk.LEFT)

        # Status indicator
        self.status_light = tk.Canvas(header_frame, width=20, height=20, bg=self.bg_color,
                                      bd=0, highlightthickness=0)
        self.status_light.pack(side=tk.RIGHT, padx=10)
        self.status_indicator = self.status_light.create_oval(2, 2, 18, 18, fill='red')
        self.connection_label = ttk.Label(header_frame, text="Disconnected")
        self.connection_label.pack(side=tk.RIGHT)

        # Reconnect button
        self.reconnect_btn = ttk.Button(header_frame, text="Reconnect", command=self.reconnect_device)
        self.reconnect_btn.pack(side=tk.RIGHT, padx=10)

        # Measurement display
        display_frame = ttk.LabelFrame(self.content_frame, text=" Live Measurements ", padding=15)
        display_frame.pack(fill=tk.BOTH, expand=False)

        # Measurement values: (caption, unit, attribute that keeps the value label)
        measurement_labels = [
            ("Voltage (V)", "V", "voltage_label"),
            ("Current (A)", "A", "current_label"),
            ("Test Phase", "", "phase_label"),
            ("Elapsed Time", "s", "time_label"),
            ("Discharge Capacity", "Ah", "capacity_label"),
            ("Charge Capacity", "Ah", "charge_capacity_label"),
            ("Coulomb Eff.", "%", "efficiency_label"),
            ("Cycle Count", "", "cycle_label"),
        ]

        for i, (label, unit, attr) in enumerate(measurement_labels):
            row = i // 2
            # Three grid columns per entry (caption, value, unit) so the unit of
            # the left entry does not share a cell with the right caption.
            col = (i % 2) * 3
            ttk.Label(display_frame, text=f"{label}:", font=('Helvetica', 11)).grid(
                row=row, column=col, sticky=tk.W, pady=5)
            value_label = ttk.Label(display_frame, text="0.000", style='Value.TLabel')
            value_label.grid(row=row, column=col + 1, sticky=tk.W, padx=10)
            if unit:
                ttk.Label(display_frame, text=unit).grid(row=row, column=col + 2, sticky=tk.W)
            setattr(self, attr, value_label)

        # Control area
        controls_frame = ttk.Frame(self.content_frame)
        controls_frame.pack(fill=tk.X, pady=(10, 10), padx=0)

        # Parameters frame
        params_frame = ttk.LabelFrame(controls_frame, text="Test Parameters", padding=10)
        params_frame.pack(side=tk.LEFT, fill=tk.X, expand=True)

        # Battery capacity
        ttk.Label(params_frame, text="Battery Capacity (Ah):").grid(row=0, column=0, sticky=tk.W)
        ttk.Entry(params_frame, textvariable=self.capacity, width=6).grid(row=0, column=1, padx=5, sticky=tk.W)

        # Charge cutoff
        ttk.Label(params_frame, text="Charge Cutoff (V):").grid(row=1, column=0, sticky=tk.W)
        ttk.Entry(params_frame, textvariable=self.charge_cutoff, width=6).grid(row=1, column=1, padx=5, sticky=tk.W)

        # Discharge cutoff
        ttk.Label(params_frame, text="Discharge Cutoff (V):").grid(row=2, column=0, sticky=tk.W)
        ttk.Entry(params_frame, textvariable=self.discharge_cutoff, width=6).grid(row=2, column=1, padx=5, sticky=tk.W)

        # Rest time
        ttk.Label(params_frame, text="Rest Time (hours):").grid(row=3, column=0, sticky=tk.W)
        ttk.Entry(params_frame, textvariable=self.rest_time, width=6).grid(row=3, column=1, padx=5, sticky=tk.W)

        # C-rate for the test
        ttk.Label(params_frame, text="Test C-rate:").grid(row=0, column=2, sticky=tk.W, padx=(10, 0))
        ttk.Entry(params_frame, textvariable=self.c_rate, width=4).grid(row=0, column=3, padx=5, sticky=tk.W)
        ttk.Label(params_frame, text="(e.g., 0.2 for C/5)").grid(row=0, column=4, sticky=tk.W)

        # Start/Stop buttons
        button_frame = ttk.Frame(controls_frame)
        button_frame.pack(side=tk.RIGHT, padx=10)

        self.start_button = ttk.Button(button_frame, text="START TEST", command=self.start_test, style='TButton')
        self.start_button.pack(side=tk.TOP, pady=5)

        self.stop_button = ttk.Button(button_frame, text="STOP TEST", command=self.stop_test,
                                      style='Warning.TButton', state=tk.DISABLED)
        self.stop_button.pack(side=tk.TOP, pady=5)

        # Continuous mode checkbox
        self.continuous_var = tk.BooleanVar(value=True)
        ttk.Checkbutton(button_frame, text="Continuous Mode", variable=self.continuous_var).pack(side=tk.TOP, pady=5)

        # Plot area
        self.plot_frame = ttk.Frame(self.content_frame)
        self.plot_frame.pack(fill=tk.BOTH, expand=True, padx=20, pady=(0, 5))
        self.setup_plot()

        # Status bar
        self.status_var = tk.StringVar()
        self.status_var.set("Ready")
        self.status_label = ttk.Label(self.root, textvariable=self.status_var, style='Status.TLabel',
                                      padding=(0, 5), anchor=tk.W)
        self.status_label.place(x=20, relx=0, rely=1.0, anchor='sw', relwidth=0.96, height=28)

    def setup_plot(self):
        """Configure the matplotlib plot"""
        self.fig = Figure(figsize=(8, 5), dpi=100, facecolor='#2E3440')
        # Adjust margins to give more space on right side
        self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15)
        self.ax = self.fig.add_subplot(111)
        self.ax.set_facecolor('#3B4252')

        # Initial voltage range: the charge/discharge cutoffs +/- 0.2 V
        voltage_padding = 0.2
        min_voltage = max(0, self.discharge_cutoff.get() - voltage_padding)
        max_voltage = self.charge_cutoff.get() + voltage_padding
        self.ax.set_ylim(min_voltage, max_voltage)

        # Voltage plot
        self.line_voltage, = self.ax.plot([], [], color='#00BFFF', label='Voltage (V)', linewidth=2)
        self.ax.set_ylabel("Voltage (V)", color='#00BFFF')
        self.ax.tick_params(axis='y', labelcolor='#00BFFF')

        # Current plot (right axis) - initial range derived from the test current
        self.ax2 = self.ax.twinx()
        current_padding = 0.05
        test_current = self.c_rate.get() * self.capacity.get()
        max_current = test_current * 1.5  # Add 50% padding
        self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding)

        self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2)
        self.ax2.set_ylabel("Current (A)", color='r')
        self.ax2.tick_params(axis='y', labelcolor='r')

        self.ax.set_xlabel('Time (s)', color=self.fg_color)
        self.ax.set_title('Battery Test (CC)', color=self.fg_color)
        self.ax.tick_params(axis='x', colors=self.fg_color)
        self.ax.grid(True, color='#4C566A')

        # Position legends to avoid overlap
        self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99))
        self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99))

        # Embed plot
        self.canvas = FigureCanvasTkAgg(self.fig, master=self.plot_frame)
        self.canvas.draw()
        canvas_widget = self.canvas.get_tk_widget()
        canvas_widget.configure(bg='#2E3440', bd=0, highlightthickness=0)
        canvas_widget.pack(side=tk.TOP, fill=tk.BOTH, expand=True, pady=(10, 0))

    # ------------------------------------------------------------------
    # Device handling and sampling
    # ------------------------------------------------------------------
    def init_device(self):
        """Initialize the ADALM1000 device with continuous measurement.

        Any failure is routed to ``handle_device_error``; this method does not
        raise.
        """
        try:
            # First try to clean up any existing session
            if hasattr(self, 'session'):
                try:
                    self.session.end()
                    del self.session
                except Exception as e:
                    print(f"Error cleaning up session: {e}")

            # Add small delay to allow device to reset
            time.sleep(1)

            self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000)
            if not self.session.devices:
                raise Exception("No ADALM1000 detected - check connections")

            self.dev = self.session.devices[0]
            # Reset channels
            self.dev.channels['A'].mode = pysmu.Mode.HI_Z
            self.dev.channels['B'].mode = pysmu.Mode.HI_Z
            self.dev.channels['A'].constant(0)
            self.dev.channels['B'].constant(0)

            self.session.start(0)

            self.status_light.itemconfig(self.status_indicator, fill='green')
            self.connection_label.config(text="Connected")
            self.status_var.set("Device connected | Ready to measure")
            self.session_active = True
            self.start_button.config(state=tk.NORMAL)

            # Start continuous measurement thread
            self.measurement_event = threading.Event()
            self.measurement_event.set()
            self.measurement_thread = threading.Thread(
                target=self.continuous_measurement,
                daemon=True
            )
            self.measurement_thread.start()

        except Exception as e:
            self.handle_device_error(e)

    def continuous_measurement(self):
        """Sample the device in a loop, filter, buffer, display and log the data.

        Runs in the measurement thread until ``measurement_event`` is cleared,
        the window is gone, or an error occurs (reported through
        ``handle_device_error`` on the GUI thread).
        """
        filter_window_size = 10
        # deque(maxlen=...) drops the oldest value itself: a moving-average window.
        voltage_window = deque(maxlen=filter_window_size)
        current_window = deque(maxlen=filter_window_size)
        last_plot_update = 0
        last_ui_update = 0
        log_buffer = []
        update_interval = 1.0  # Update UI at 1Hz max

        # Initialize start_time for measurements
        if not hasattr(self, 'start_time'):
            self.start_time = time.time()

        while self.measurement_event.is_set() and self._window_alive():
            try:
                # Read multiple samples for better accuracy
                samples = self.dev.read(filter_window_size, 500, True)
                if not samples:
                    raise DeviceDisconnectedError("No samples received")

                # Voltage from channel B, current from channel A
                raw_voltage = np.mean([s[1][0] for s in samples])
                raw_current = np.mean([s[0][1] for s in samples])
                current_time = time.time() - self.start_time

                # Apply moving average filter
                voltage_window.append(raw_voltage)
                current_window.append(raw_current)
                voltage = np.mean(list(voltage_window))
                current = np.mean(list(current_window))

                # Store filtered data (one atomic step so the buffers stay aligned)
                with self._data_lock:
                    self.time_data.append(current_time)
                    self.voltage_data.append(voltage)
                    self.current_data.append(current)

                # Throttle UI updates to prevent lag
                now = time.time()
                if now - last_ui_update > update_interval:
                    # Values are passed as arguments, not captured by a lambda,
                    # so the callback shows exactly this sample.
                    self.safe_after(0, self.update_measurement_display,
                                    voltage, current, current_time)
                    last_ui_update = now

                # Throttle plot updates as well (1Hz max)
                if now - last_plot_update > 1.0:
                    self.safe_after(0, self.update_plot)
                    last_plot_update = now

                # Buffered logging - only once a cycle log file really exists
                if self.test_running and getattr(self, 'current_cycle_file', None):
                    log_buffer.append([
                        f"{current_time:.3f}",
                        f"{voltage:.6f}",
                        f"{current:.6f}",
                        self.test_phase.get(),
                        f"{self.capacity_ah.get():.4f}",
                        f"{self.charge_capacity.get():.4f}",
                        f"{self.coulomb_efficiency.get():.1f}",
                        f"{self.cycle_count.get()}"
                    ])

                    # Write in chunks of 10 samples
                    if len(log_buffer) >= 10:
                        self._append_log_rows(log_buffer)
                        log_buffer.clear()
                elif log_buffer:
                    # Test ended: do not carry stale rows into the next test's file.
                    self._append_log_rows(log_buffer)
                    log_buffer.clear()

                time.sleep(max(0.05, self.interval))

            except Exception as e:
                self.safe_after(0, self.handle_device_error, f"Measurement error: {e}")
                break

        # Flush remaining buffer on exit
        try:
            self._append_log_rows(log_buffer)
        except OSError as e:
            print(f"Error flushing log buffer: {e}")

    # ------------------------------------------------------------------
    # Test control
    # ------------------------------------------------------------------
    def start_test(self):
        """Start the full battery test cycle"""
        if self.test_running:
            return

        try:
            # A stopped test thread may still be leaving its last sleep; starting
            # now would leave two sequences driving the same device.
            previous = getattr(self, 'test_thread', None)
            if previous is not None and previous.is_alive():
                raise RuntimeError("Previous test is still shutting down - please try again")

            # Validate inputs
            if self.capacity.get() <= 0:
                raise ValueError("Battery capacity must be positive")
            if self.charge_cutoff.get() <= self.discharge_cutoff.get():
                raise ValueError("Charge cutoff must be higher than discharge cutoff")
            if self.c_rate.get() <= 0:
                raise ValueError("C-rate must be positive")

            # Calculate target current
            test_current = self.c_rate.get() * self.capacity.get()
            if test_current > 0.2:
                raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")

            # Set continuous mode based on checkbox
            self.continuous_mode = self.continuous_var.get()

            # Reset timing for new test
            self.measurement_start_time = time.time()
            self.test_start_time = time.time()

            # Clear previous data
            self._clear_data_buffers()
            self.capacity_ah.set(0.0)
            self.charge_capacity.set(0.0)
            self.coulomb_efficiency.set(0.0)
            self.cycle_count.set(0)

            # Generate base filename without cycle number
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            self.base_filename = os.path.join(self.log_dir, f"battery_test_{timestamp}")
            self.current_cycle_file = None

            # Start test thread
            self._test_finalized = False
            self.test_running = True
            self.start_time = time.time()
            self.last_update_time = time.time()
            # NOTE(review): phase/status announce a discharge, but
            # run_test_sequence begins with the charge phase - confirm wording.
            self.test_phase.set("Initial Discharge")

            self.start_button.config(state=tk.DISABLED)
            self.stop_button.config(state=tk.NORMAL)
            self.status_var.set(f"Test started | Discharging to {self.discharge_cutoff.get()}V @ {test_current:.3f}A")

            # Start test sequence in a new thread
            self.test_thread = threading.Thread(target=self.run_test_sequence, daemon=True)
            self.test_thread.start()

        except Exception as e:
            messagebox.showerror("Error", str(e))

    def create_cycle_log_file(self):
        """Create a new log file for the current cycle.

        Returns:
            True when the file was created and its header written, else False.
        """
        # Close previous file if exists
        previous = getattr(self, 'current_cycle_file', None)
        if previous:
            try:
                previous.close()
            except Exception as e:
                print(f"Error closing previous log file: {e}")

        # Check write permissions
        if not os.access(self.log_dir, os.W_OK):
            messagebox.showerror("Error", f"No write permissions in {self.log_dir}")
            return False

        # Find the first unused sequential suffix
        suffix = 1
        while True:
            filename = f"{self.base_filename}_{suffix}.csv"
            if not os.path.exists(filename):
                break
            suffix += 1

        try:
            # Write the header and close, then reopen in append mode.  The
            # measurement thread appends rows to the same file by name; an open
            # 'w' handle would later flush its buffer over those rows.
            with open(filename, 'w', newline='') as header_file:
                csv.writer(header_file).writerow(
                    ["Time(s)", "Voltage(V)", "Current(A)", "Phase",
                     "Discharge_Capacity(Ah)", "Charge_Capacity(Ah)",
                     "Coulomb_Eff(%)", "Cycle"])
            # Publish the name only once the file is ready for appending.
            self.filename = filename
            self.current_cycle_file = open(filename, 'a', newline='')
            self.log_writer = csv.writer(self.current_cycle_file)
            self.log_buffer = []
            return True
        except Exception as e:
            messagebox.showerror("Error", f"Failed to create log file: {e}")
            return False

    @staticmethod
    def format_time(seconds):
        """Convert seconds to hh:mm:ss format"""
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        seconds = int(seconds % 60)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d}"

    def stop_test(self):
        """Request immediate stop of the test and clean up all test data"""
        if not self.test_running:
            return

        self.request_stop = True
        self.test_running = False
        self.measuring = False

        # Immediately set device to safe state
        self._set_safe_output()

        # Clear all data buffers
        self._clear_data_buffers()

        # Reset test values
        self.test_phase.set("Idle")
        self.capacity_ah.set(0.0)
        self.charge_capacity.set(0.0)
        self.coulomb_efficiency.set(0.0)

        # Reset plot if it exists
        if hasattr(self, 'line_voltage') and hasattr(self, 'line_current'):
            self.line_voltage.set_data([], [])
            self.line_current.set_data([], [])
            # Reset to reasonable default ranges
            self.ax.set_xlim(0, 10)  # Small range instead of 0-1
            self.ax.set_ylim(0, 2)  # Typical battery voltage range
            self.ax2.set_ylim(-0.2, 0.2)  # Typical current range
            self.canvas.draw()

        # Update UI
        self.status_var.set("Test stopped - Ready for new test")
        self.stop_button.config(state=tk.DISABLED)
        self.start_button.config(state=tk.NORMAL)

        # Finalize test data (logs, etc.)
        self.safe_after(100, self.finalize_test)

    def center_window(self, window):
        """Center a window on screen"""
        window.update_idletasks()
        width = window.winfo_width()
        height = window.winfo_height()
        x = (window.winfo_screenwidth() // 2) - (width // 2)
        y = (window.winfo_screenheight() // 2) - (height // 2)
        window.geometry(f'{width}x{height}+{x}+{y}')

    def _run_current_phase(self, label, get_cutoff, capacity_var, cutoff_reached):
        """Integrate capacity until the cutoff voltage is reached or a stop is requested.

        Args:
            label: Status prefix, "Charging" or "Discharging".
            get_cutoff: Callable returning the cutoff voltage shown and tested.
            capacity_var: Tk variable accumulating the capacity in Ah.
            cutoff_reached: Callable ``(voltage, cutoff) -> bool``.
        """
        # No root.update() here: the Tk main loop in the main thread repaints
        # by itself, and this method runs in the test thread.
        self.last_update_time = time.time()
        while self.test_running and not self.request_stop:
            sample = self._latest_sample()
            if sample is None:
                time.sleep(0.1)
                continue

            voltage = sample[0]
            current = abs(sample[1])

            # Capacity calculation (A * s -> Ah)
            now = time.time()
            delta_t = now - self.last_update_time
            self.last_update_time = now
            capacity_var.set(capacity_var.get() + current * delta_t / 3600)

            cutoff = get_cutoff()
            self.status_var.set(
                f"{label}: {voltage:.3f}V / {cutoff}V | "
                f"Current: {current:.3f}A | "
                f"Capacity: {capacity_var.get():.4f}Ah"
            )

            if cutoff_reached(voltage, cutoff) or self.request_stop:
                break

            time.sleep(0.1)  # More frequent checks

    def _rest(self, message, phase=None):
        """Wait ``rest_time`` hours, polling once per second for a stop request.

        Args:
            message: Status prefix shown with the remaining time.
            phase: Optional phase name to publish while resting.
        """
        rest_end_time = time.time() + (self.rest_time.get() * 3600)
        while time.time() < rest_end_time and self.test_running and not self.request_stop:
            time_left = max(0, rest_end_time - time.time())
            if phase is not None:
                self.test_phase.set(phase)
            self.status_var.set(f"{message} | Time left: {time_left/60:.1f} min")
            time.sleep(1)  # Check every second for stop request

    def run_test_sequence(self):
        """Run charge / rest / discharge / rest cycles (test thread entry point).

        Repeats while the test is running in continuous mode, or exactly once
        otherwise, then schedules ``finalize_test`` on the GUI thread.
        """
        try:
            test_current = self.c_rate.get() * self.capacity.get()

            while self.test_running and (self.continuous_mode or self.cycle_count.get() == 0):
                # Reset stop request at start of each cycle
                self.request_stop = False
                self.cycle_count.set(self.cycle_count.get() + 1)

                # Create new log file for this cycle; without one there is
                # nothing to record, so end the test.
                if not self.create_cycle_log_file():
                    break

                # 1. Charge phase (constant current)
                self.test_phase.set("Charge")
                self.status_var.set(f"Charging to {self.charge_cutoff.get()}V @ {test_current:.3f}A")

                self.measuring = True
                self.dev.channels['B'].mode = pysmu.Mode.HI_Z
                self.dev.channels['A'].mode = pysmu.Mode.SIMV
                self.dev.channels['A'].constant(test_current)
                self.charge_capacity.set(0.0)
                target_voltage = self.charge_cutoff.get()  # fixed for this phase
                self._run_current_phase(
                    "Charging", lambda: target_voltage, self.charge_capacity,
                    lambda voltage, cutoff: voltage >= cutoff)

                if self.request_stop or not self.test_running:
                    break

                # 2. Rest period after charge
                self.test_phase.set("Resting (Post-Charge)")
                self.measuring = False
                self.dev.channels['A'].mode = pysmu.Mode.HI_Z
                self.dev.channels['A'].constant(0)
                self._rest("Resting after charge")

                if self.request_stop or not self.test_running:
                    break

                # 3. Discharge phase (capacity measurement)
                self.test_phase.set("Discharge")
                self.status_var.set(f"Discharging to {self.discharge_cutoff.get()}V @ {test_current:.3f}A")

                self.measuring = True
                self.dev.channels['A'].mode = pysmu.Mode.SIMV
                self.dev.channels['A'].constant(-test_current)
                self.capacity_ah.set(0.0)
                self._run_current_phase(
                    "Discharging", self.discharge_cutoff.get, self.capacity_ah,
                    lambda voltage, cutoff: voltage <= cutoff)

                # 4. Rest period after discharge (only if not stopping)
                if self.test_running and not self.request_stop:
                    self.test_phase.set("Resting (Post-Discharge)")
                    self.measuring = False
                    self.dev.channels['A'].mode = pysmu.Mode.HI_Z
                    self.dev.channels['A'].constant(0)
                    self._rest("Resting after discharge")

                # Calculate Coulomb efficiency if not stopping
                if not self.request_stop and self.charge_capacity.get() > 0:
                    efficiency = (self.capacity_ah.get() / self.charge_capacity.get()) * 100
                    self.coulomb_efficiency.set(efficiency)

                # Update cycle info
                self.status_var.set(
                    f"Cycle {self.cycle_count.get()} complete | "
                    f"Discharge: {self.capacity_ah.get():.3f}Ah | "
                    f"Charge: {self.charge_capacity.get():.3f}Ah | "
                    f"Efficiency: {self.coulomb_efficiency.get():.1f}%"
                )

                # Write cycle summary to log file
                self.write_cycle_summary()

                # Short rest between cycles (only in continuous mode)
                if self.continuous_mode and self.test_running and not self.request_stop:
                    self._rest("Resting between cycles", phase="Resting Between Cycles")

            # Finalize test if stopped or completed
            self.safe_after(0, self.finalize_test)

        except Exception as e:
            self.safe_after(0, messagebox.showerror, "Test Error", str(e))
            self.safe_after(0, self.finalize_test)

    def finalize_test(self):
        """Final cleanup after test completes or is stopped.

        Runs at most once per test: it is scheduled both by ``stop_test`` and by
        the test thread when it exits.
        """
        if getattr(self, '_test_finalized', False):
            return
        self._test_finalized = True

        # Without this a test that completed on its own could never be
        # restarted, because start_test ignores clicks while test_running is set.
        self.test_running = False
        self.measuring = False
        self._set_safe_output()

        # Flush and close current log file
        if getattr(self, 'log_buffer', None) and hasattr(self, 'log_writer'):
            try:
                self.log_writer.writerows(self.log_buffer)
                self.log_buffer.clear()
            except Exception as e:
                print(f"Error flushing log buffer: {e}")

        log_file = getattr(self, 'current_cycle_file', None)
        if log_file:
            try:
                log_file.close()
            except Exception as e:
                print(f"Error closing log file: {e}")

        # Only offer a new test while a device session is available.
        self.start_button.config(state=tk.NORMAL if self.session_active else tk.DISABLED)
        self.stop_button.config(state=tk.DISABLED)
        self.request_stop = False

        message = (
            f"Test safely stopped after discharge phase | "
            f"Cycle {self.cycle_count.get()} completed | "
            f"Final capacity: {self.capacity_ah.get():.3f}Ah"
        )
        self.status_var.set(message)

        self.safe_after(0, lambda: messagebox.showinfo(
            "Test Completed",
            f"Test was safely stopped after discharge phase.\n\n"
            f"Final discharge capacity: {self.capacity_ah.get():.3f}Ah\n"
            f"Total cycles completed: {self.cycle_count.get()}"
        ))

    # ------------------------------------------------------------------
    # Display and logging
    # ------------------------------------------------------------------
    def update_measurement_display(self, voltage, current, current_time):
        """Update all measurement displays at once (throttled to 1Hz)"""
        try:
            # Update all values regardless of change
            self.voltage_label.config(text=f"{voltage:.4f}")
            self.current_label.config(text=f"{current:.4f}")
            self.capacity_label.config(text=f"{self.capacity_ah.get():.4f}")
            self.charge_capacity_label.config(text=f"{self.charge_capacity.get():.4f}")
            self.efficiency_label.config(text=f"{self.coulomb_efficiency.get():.1f}")
            self.cycle_label.config(text=f"{self.cycle_count.get()}")
            self.phase_label.config(text=self.test_phase.get())
            self.time_label.config(text=self.format_time(current_time))
        except Exception as e:
            print(f"GUI update error: {e}")

    def write_cycle_summary(self):
        """Write cycle summary to the current cycle's log file"""
        if not getattr(self, 'current_cycle_file', None):
            return

        summary_line = (
            f"Cycle {self.cycle_count.get()} Summary - "
            f"Discharge={self.capacity_ah.get():.4f}Ah, "
            f"Charge={self.charge_capacity.get():.4f}Ah, "
            f"Efficiency={self.coulomb_efficiency.get():.1f}%"
        )

        try:
            if self.log_buffer:
                self.log_writer.writerows(self.log_buffer)
                self.log_buffer.clear()
            self.current_cycle_file.write(summary_line + "\n")
            self.current_cycle_file.flush()
        except Exception as e:
            print(f"Error writing cycle summary: {e}")

    def update_plot(self):
        """Redraw the voltage/current traces when at least 1 s of new data exists."""
        with self._data_lock:
            times = list(self.time_data)
            volts = list(self.voltage_data)
            amps = list(self.current_data)

        # Keep the newest samples all three series have in common.
        count = min(len(times), len(volts), len(amps))
        if count < 5:  # Wait for at least 5 samples
            return
        times, volts, amps = times[-count:], volts[-count:], amps[-count:]

        # Only update if there's significant new data.  A negative difference
        # means the time base restarted (new test), so the plot must refresh.
        latest = times[-1]
        last = getattr(self, '_last_plot_time', None)
        if last is not None and 0 <= latest - last < 1.0:
            return
        self._last_plot_time = latest

        # Update plot data
        self.line_voltage.set_data(times, volts)
        self.line_current.set_data(times, amps)

        # Auto-scale axes if needed
        self._rescale_axes(times, volts, amps)

        self.canvas.draw_idle()

    def auto_scale_axes(self):
        """Auto-scale plot axes with appropriate padding"""
        with self._data_lock:
            times = list(self.time_data)
            volts = list(self.voltage_data)
            amps = list(self.current_data)
        self._rescale_axes(times, volts, amps)

    def _rescale_axes(self, times, volts, amps):
        """Adjust axis limits to the given data, skipping insignificant changes."""
        if not times:
            return

        # X-axis scaling
        min_time = 0
        max_time = times[-1] * 1.05  # 5% padding
        current_xlim = self.ax.get_xlim()
        if abs(current_xlim[1] - max_time) > (max_time * 0.1):  # 10% change threshold
            self.ax.set_xlim(min_time, max_time)
            self.ax2.set_xlim(min_time, max_time)

        # Voltage axis scaling
        voltage_padding = 0.2
        if volts:
            min_voltage = max(0, min(volts) - voltage_padding)
            max_voltage = max(volts) + voltage_padding
            current_ylim = self.ax.get_ylim()
            if (abs(current_ylim[0] - min_voltage) > 0.1 or
                    abs(current_ylim[1] - max_voltage) > 0.1):
                self.ax.set_ylim(min_voltage, max_voltage)

        # Current axis scaling
        current_padding = 0.05
        if amps:
            min_current = min(amps) - current_padding
            max_current = max(amps) + current_padding
            current_ylim2 = self.ax2.get_ylim()
            if (abs(current_ylim2[0] - min_current) > 0.02 or
                    abs(current_ylim2[1] - max_current) > 0.02):
                self.ax2.set_ylim(min_current, max_current)

    # ------------------------------------------------------------------
    # Error handling, scheduling and shutdown
    # ------------------------------------------------------------------
    def handle_device_error(self, error):
        """Handle device connection errors.

        Args:
            error: Exception or message describing the failure.
        """
        if not self._window_alive():  # Check if window still exists
            return

        error_msg = str(error)
        print(f"Device error: {error_msg}")

        # Clean up session first
        if hasattr(self, 'session'):
            try:
                if self.session_active:
                    self.session.end()
                del self.session
            except Exception as e:
                print(f"Error cleaning up session: {e}")

        # Update UI
        self.status_light.itemconfig(self.status_indicator, fill='red')
        self.connection_label.config(text="Disconnected")
        self.status_var.set(f"Device error: {error_msg}")

        self.session_active = False
        self.test_running = False
        self.continuous_mode = False
        self.measuring = False

        if hasattr(self, 'start_button'):
            self.start_button.config(state=tk.DISABLED)
        if hasattr(self, 'stop_button'):
            self.stop_button.config(state=tk.DISABLED)

        # Clear plot + buffers
        self._clear_data_buffers()
        if hasattr(self, 'line_voltage') and hasattr(self, 'line_current'):
            self.line_voltage.set_data([], [])
            self.line_current.set_data([], [])
            self.ax.set_xlim(0, 1)
            self.ax2.set_xlim(0, 1)
            self.canvas.draw()

        # Show error message and attempt reconnect automatically
        self._schedule_reconnect(100, self.attempt_reconnect)

    def safe_after(self, delay_ms, callback, *args):
        """Safely schedule a callback with window existence check.

        Args:
            delay_ms: Delay in milliseconds.
            callback: Callable to run on the Tk event loop.
            *args: Positional arguments passed to ``callback``.

        Returns:
            The Tk ``after`` id, or None when nothing could be scheduled.
        """
        if not self._window_alive():
            return None

        state = {'id': None, 'fired': False}

        def wrapped_callback():
            # Forget the id once it fires so the set of pending ids stays small.
            with self._after_lock:
                state['fired'] = True
                self._after_ids.discard(state['id'])
            if self._window_alive():
                try:
                    callback(*args)
                except Exception as e:
                    print(f"Callback error: {e}")

        try:
            after_id = self.root.after(delay_ms, wrapped_callback)
        except (tk.TclError, RuntimeError) as e:
            # Window destroyed or main loop gone while a worker was scheduling.
            print(f"Callback error: {e}")
            return None

        # The lock is not held across root.after(): a worker-thread call needs
        # the main thread, which may itself be waiting for this lock in on_close.
        with self._after_lock:
            if not state['fired']:
                state['id'] = after_id
                self._after_ids.add(after_id)
        return after_id

    def attempt_reconnect(self):
        """Attempt to reconnect automatically"""
        if not self._window_alive():
            self._reconnect_pending = False
            return

        try:
            # Show error message first
            messagebox.showerror(
                "Device Connection Error",
                "Could not connect to ADALM1000\n\n"
                "1. Check USB cable connection\n"
                "2. The device will attempt to reconnect automatically"
            )
        except Exception as e:
            print(f"Error showing message: {e}")
            self._reconnect_pending = False
            return

        # Schedule reconnect attempt
        self._reconnect_pending = False
        self._schedule_reconnect(1000, self.reconnect_device)

    def reconnect_device(self):
        """Reconnect the device with proper cleanup"""
        # This step is now running; a failure below may queue the next one.
        self._reconnect_pending = False
        if not self._window_alive():
            return

        self.status_var.set("Attempting to reconnect...")
        self.root.update()

        # Clear any existing session
        if hasattr(self, 'session'):
            try:
                if self.session_active:
                    self.session.end()
                del self.session
            except Exception as e:
                print(f"Error cleaning up session: {e}")

        # Stop any running threads
        self.test_running = False
        self.continuous_mode = False
        self.measuring = False

        if hasattr(self, 'measurement_event'):
            self.measurement_event.clear()

        # Wait for threads to finish
        if hasattr(self, 'measurement_thread'):
            self.measurement_thread.join(timeout=1.0)
        if hasattr(self, 'test_thread'):
            self.test_thread.join(timeout=1.0)

        # Add small delay to allow device to reset
        time.sleep(1.5)

        # Try to initialize device
        try:
            self.init_device()
            if self.session_active:
                self.status_var.set("Reconnected successfully")
                return
        except Exception as e:
            print(f"Reconnect failed: {e}")

        # If we get here, reconnection failed.  init_device normally reports the
        # failure through handle_device_error, which already queued a retry; the
        # helper then ignores this request, so retries do not multiply.
        self.status_var.set("Reconnect failed - will retry...")
        self._schedule_reconnect(2000, self.reconnect_device)  # Retry after 2 seconds

    def on_close(self):
        """Clean up on window close"""
        # Set flags to stop all threads
        self.test_running = False
        self.measuring = False
        self.session_active = False

        if hasattr(self, 'measurement_event'):
            self.measurement_event.clear()

        # Do not leave the output stage driving the battery.
        self._set_safe_output()

        # Cancel all pending callbacks (Tk is called outside the lock)
        with self._after_lock:
            pending = list(self._after_ids)
            self._after_ids.clear()
        for after_id in pending:
            with suppress(tk.TclError, ValueError):
                self.root.after_cancel(after_id)

        # Give threads time to clean up
        timeout = 2.0
        if hasattr(self, 'measurement_thread'):
            self.measurement_thread.join(timeout=timeout)
        if hasattr(self, 'test_thread'):
            self.test_thread.join(timeout=timeout)

        # Clean up device session
        if hasattr(self, 'session') and self.session:
            try:
                self.session.end()
            except Exception as e:
                print(f"Error ending session: {e}")

        # Finally destroy window
        with suppress(tk.TclError):
            self.root.destroy()
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the GUI and run the Tk event loop.
    root = tk.Tk()
    try:
        app = BatteryTester(root)
        root.mainloop()
    except Exception as e:
        # winfo_exists() itself raises once the application is destroyed.
        try:
            window_alive = bool(root.winfo_exists())
        except tk.TclError:
            window_alive = False

        if window_alive:
            messagebox.showerror("Fatal Error", f"Application failed: {str(e)}")
            try:
                root.destroy()
            except tk.TclError:
                pass
        else:
            print(f"Fatal Error: {e}")