Clear log_buffer after every cycle, because logs are getting smaler the more cycles. (D)
1037 lines
43 KiB
Python
1037 lines
43 KiB
Python
# -*- coding: utf-8 -*-
|
|
import os
|
|
import time
|
|
import csv
|
|
import threading
|
|
from datetime import datetime
|
|
import tkinter as tk
|
|
from tkinter import ttk, messagebox
|
|
import pysmu
|
|
import numpy as np
|
|
import matplotlib
|
|
matplotlib.use('TkAgg')
|
|
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
|
|
from matplotlib.figure import Figure
|
|
from collections import deque
|
|
|
|
class DeviceDisconnectedError(Exception):
|
|
pass
|
|
|
|
class BatteryTester:
|
|
def __init__(self, root):
|
|
self._after_ids = set() # Track scheduled callbacks
|
|
self._after_lock = threading.Lock()
|
|
# Color scheme
|
|
self.bg_color = "#2E3440"
|
|
self.fg_color = "#D8DEE9"
|
|
self.accent_color = "#5E81AC"
|
|
self.warning_color = "#BF616A"
|
|
self.success_color = "#A3BE8C"
|
|
|
|
# Main window configuration
|
|
self.root = root
|
|
self.root.title("ADALM1000 - Battery Capacity Tester (CC Test)")
|
|
self.root.geometry("1000x800")
|
|
self.root.minsize(800, 700)
|
|
self.root.configure(bg=self.bg_color)
|
|
|
|
# Device and measurement state
|
|
self.session_active = False
|
|
self.measuring = False
|
|
self.test_running = False
|
|
self.continuous_mode = False
|
|
self.request_stop = False
|
|
self.interval = 0.1
|
|
self.log_dir = os.path.expanduser("~/adalm1000/logs")
|
|
os.makedirs(self.log_dir, exist_ok=True)
|
|
|
|
# Battery test parameters
|
|
self.capacity = tk.DoubleVar(value=0.2) # Battery capacity in Ah
|
|
self.charge_cutoff = tk.DoubleVar(value=1.43) # Charge cutoff voltage
|
|
self.discharge_cutoff = tk.DoubleVar(value=0.9) # Discharge cutoff voltage
|
|
self.rest_time = tk.DoubleVar(value=0.25) # Rest time in hours
|
|
self.c_rate = tk.DoubleVar(value=0.1) # C-rate for test (default C/5 = 0.2)
|
|
|
|
# Test progress tracking
|
|
self.test_phase = tk.StringVar(value="Idle")
|
|
self.capacity_ah = tk.DoubleVar(value=0.0)
|
|
self.charge_capacity = tk.DoubleVar(value=0.0) # capacity tracking
|
|
self.coulomb_efficiency = tk.DoubleVar(value=0.0) # efficiency calculation
|
|
self.cycle_count = tk.IntVar(value=0) # cycle counting
|
|
|
|
# Data buffers
|
|
self.time_data = deque()
|
|
self.voltage_data = deque()
|
|
self.current_data = deque()
|
|
self.phase_data = deque()
|
|
|
|
# Initialize UI and device
|
|
self.setup_ui()
|
|
self.init_device()
|
|
|
|
# Ensure proper cleanup
|
|
self.root.protocol("WM_DELETE_WINDOW", self.on_close)
|
|
|
|
def setup_ui(self):
|
|
"""Configure the user interface"""
|
|
self.style = ttk.Style()
|
|
self.style.theme_use('clam')
|
|
|
|
# Configure styles
|
|
self.style.configure('.', background=self.bg_color, foreground=self.fg_color)
|
|
self.style.configure('TFrame', background=self.bg_color)
|
|
self.style.configure('TLabel', background=self.bg_color, foreground=self.fg_color)
|
|
self.style.configure('TButton', background=self.accent_color, foreground=self.fg_color,
|
|
padding=6, font=('Helvetica', 10, 'bold'))
|
|
self.style.map('TButton',
|
|
background=[('active', self.accent_color), ('disabled', '#4C566A')],
|
|
foreground=[('active', self.fg_color), ('disabled', '#D8DEE9')])
|
|
self.style.configure('TEntry', fieldbackground="#3B4252", foreground=self.fg_color)
|
|
self.style.configure('Header.TLabel', font=('Helvetica', 14, 'bold'), foreground=self.accent_color)
|
|
self.style.configure('Value.TLabel', font=('Helvetica', 12, 'bold'))
|
|
self.style.configure('Status.TLabel', font=('Helvetica', 10))
|
|
self.style.configure('Warning.TButton', background=self.warning_color)
|
|
self.style.configure('Success.TButton', background=self.success_color)
|
|
|
|
# Main layout
|
|
self.content_frame = ttk.Frame(self.root)
|
|
self.content_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
|
|
|
|
# Header area
|
|
header_frame = ttk.Frame(self.content_frame)
|
|
header_frame.pack(fill=tk.X, pady=(0, 20))
|
|
|
|
ttk.Label(header_frame, text="ADALM1000 Battery Capacity Tester (CC Test)", style='Header.TLabel').pack(side=tk.LEFT)
|
|
|
|
# Status indicator
|
|
self.status_light = tk.Canvas(header_frame, width=20, height=20, bg=self.bg_color, bd=0, highlightthickness=0)
|
|
self.status_light.pack(side=tk.RIGHT, padx=10)
|
|
self.status_indicator = self.status_light.create_oval(2, 2, 18, 18, fill='red')
|
|
self.connection_label = ttk.Label(header_frame, text="Disconnected")
|
|
self.connection_label.pack(side=tk.RIGHT)
|
|
|
|
# Reconnect button
|
|
self.reconnect_btn = ttk.Button(header_frame, text="Reconnect", command=self.reconnect_device)
|
|
self.reconnect_btn.pack(side=tk.RIGHT, padx=10)
|
|
|
|
# Measurement display
|
|
display_frame = ttk.LabelFrame(self.content_frame, text=" Live Measurements ", padding=15)
|
|
display_frame.pack(fill=tk.BOTH, expand=False)
|
|
|
|
# Measurement values
|
|
measurement_labels = [
|
|
("Voltage (V)", "V"),
|
|
("Current (A)", "A"),
|
|
("Test Phase", ""),
|
|
("Elapsed Time", "s"),
|
|
("Discharge Capacity", "Ah"),
|
|
("Charge Capacity", "Ah"),
|
|
("Coulomb Eff.", "%"),
|
|
("Cycle Count", ""),
|
|
]
|
|
|
|
for i, (label, unit) in enumerate(measurement_labels):
|
|
ttk.Label(display_frame, text=f"{label}:", font=('Helvetica', 11)).grid(row=i//2, column=(i%2)*2, sticky=tk.W, pady=5)
|
|
value_label = ttk.Label(display_frame, text="0.000", style='Value.TLabel')
|
|
value_label.grid(row=i//2, column=(i%2)*2+1, sticky=tk.W, padx=10)
|
|
if unit:
|
|
ttk.Label(display_frame, text=unit).grid(row=i//2, column=(i%2)*2+2, sticky=tk.W)
|
|
|
|
if i == 0:
|
|
self.voltage_label = value_label
|
|
elif i == 1:
|
|
self.current_label = value_label
|
|
elif i == 2:
|
|
self.phase_label = value_label
|
|
elif i == 3:
|
|
self.time_label = value_label
|
|
elif i == 4:
|
|
self.capacity_label = value_label
|
|
elif i == 5:
|
|
self.charge_capacity_label = value_label
|
|
elif i == 6:
|
|
self.efficiency_label = value_label
|
|
elif i == 7:
|
|
self.cycle_label = value_label
|
|
|
|
# Control area
|
|
controls_frame = ttk.Frame(self.content_frame)
|
|
controls_frame.pack(fill=tk.X, pady=(10, 10), padx=0)
|
|
|
|
# Parameters frame
|
|
params_frame = ttk.LabelFrame(controls_frame, text="Test Parameters", padding=10)
|
|
params_frame.pack(side=tk.LEFT, fill=tk.X, expand=True)
|
|
|
|
# Battery capacity
|
|
ttk.Label(params_frame, text="Battery Capacity (Ah):").grid(row=0, column=0, sticky=tk.W)
|
|
ttk.Entry(params_frame, textvariable=self.capacity, width=6).grid(row=0, column=1, padx=5, sticky=tk.W)
|
|
|
|
# Charge cutoff
|
|
ttk.Label(params_frame, text="Charge Cutoff (V):").grid(row=1, column=0, sticky=tk.W)
|
|
ttk.Entry(params_frame, textvariable=self.charge_cutoff, width=6).grid(row=1, column=1, padx=5, sticky=tk.W)
|
|
|
|
# Discharge cutoff
|
|
ttk.Label(params_frame, text="Discharge Cutoff (V):").grid(row=2, column=0, sticky=tk.W)
|
|
ttk.Entry(params_frame, textvariable=self.discharge_cutoff, width=6).grid(row=2, column=1, padx=5, sticky=tk.W)
|
|
|
|
# Rest time
|
|
ttk.Label(params_frame, text="Rest Time (hours):").grid(row=3, column=0, sticky=tk.W)
|
|
ttk.Entry(params_frame, textvariable=self.rest_time, width=6).grid(row=3, column=1, padx=5, sticky=tk.W)
|
|
|
|
# C-rate for test (C/5 by default)
|
|
ttk.Label(params_frame, text="Test C-rate:").grid(row=0, column=2, sticky=tk.W, padx=(10,0))
|
|
ttk.Entry(params_frame, textvariable=self.c_rate, width=4).grid(row=0, column=3, padx=5, sticky=tk.W)
|
|
ttk.Label(params_frame, text="(e.g., 0.2 for C/5)").grid(row=0, column=4, sticky=tk.W)
|
|
|
|
# Start/Stop buttons
|
|
button_frame = ttk.Frame(controls_frame)
|
|
button_frame.pack(side=tk.RIGHT, padx=10)
|
|
|
|
self.start_button = ttk.Button(button_frame, text="START TEST", command=self.start_test, style='TButton')
|
|
self.start_button.pack(side=tk.TOP, pady=5)
|
|
|
|
self.stop_button = ttk.Button(button_frame, text="STOP TEST", command=self.stop_test, style='Warning.TButton', state=tk.DISABLED)
|
|
self.stop_button.pack(side=tk.TOP, pady=5)
|
|
|
|
# Continuous mode checkbox
|
|
self.continuous_var = tk.BooleanVar(value=True)
|
|
ttk.Checkbutton(button_frame, text="Continuous Mode", variable=self.continuous_var).pack(side=tk.TOP, pady=5)
|
|
|
|
# Plot area
|
|
self.plot_frame = ttk.Frame(self.content_frame)
|
|
self.plot_frame.pack(fill=tk.BOTH, expand=True, padx=20, pady=(0, 5))
|
|
self.setup_plot()
|
|
|
|
# Status bar
|
|
self.status_var = tk.StringVar()
|
|
self.status_var.set("Ready")
|
|
self.status_label = ttk.Label(self.root, textvariable=self.status_var, style='Status.TLabel', padding=(0, 5), anchor=tk.W)
|
|
self.status_label.place(x=20, relx=0, rely=1.0, anchor='sw', relwidth=0.96, height=28)
|
|
|
|
def setup_plot(self):
|
|
"""Configure the matplotlib plot"""
|
|
self.fig = Figure(figsize=(8, 5), dpi=100, facecolor='#2E3440')
|
|
# Adjust margins to give more space on right side
|
|
self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15)
|
|
self.ax = self.fig.add_subplot(111)
|
|
self.ax.set_facecolor('#3B4252')
|
|
|
|
# Set initial voltage range based on charge/discharge cutoffs ±0.2V
|
|
voltage_padding = 0.2
|
|
min_voltage = max(0, self.discharge_cutoff.get() - voltage_padding)
|
|
max_voltage = self.charge_cutoff.get() + voltage_padding
|
|
self.ax.set_ylim(min_voltage, max_voltage)
|
|
|
|
# Voltage plot
|
|
self.line_voltage, = self.ax.plot([], [], color='#00BFFF', label='Voltage (V)', linewidth=2)
|
|
self.ax.set_ylabel("Voltage (V)", color='#00BFFF')
|
|
self.ax.tick_params(axis='y', labelcolor='#00BFFF')
|
|
|
|
# Current plot (right axis) - set initial range based on test current
|
|
self.ax2 = self.ax.twinx()
|
|
current_padding = 0.05
|
|
test_current = self.c_rate.get() * self.capacity.get()
|
|
max_current = test_current * 1.5 # Add 50% padding
|
|
self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding)
|
|
|
|
self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2)
|
|
self.ax2.set_ylabel("Current (A)", color='r')
|
|
self.ax2.tick_params(axis='y', labelcolor='r')
|
|
|
|
self.ax.set_xlabel('Time (s)', color=self.fg_color)
|
|
self.ax.set_title('Battery Test (CC)', color=self.fg_color)
|
|
self.ax.tick_params(axis='x', colors=self.fg_color)
|
|
self.ax.grid(True, color='#4C566A')
|
|
|
|
# Position legends to avoid overlap
|
|
self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99))
|
|
self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99))
|
|
|
|
# Embed plot
|
|
self.canvas = FigureCanvasTkAgg(self.fig, master=self.plot_frame)
|
|
self.canvas.draw()
|
|
canvas_widget = self.canvas.get_tk_widget()
|
|
canvas_widget.configure(bg='#2E3440', bd=0, highlightthickness=0)
|
|
canvas_widget.pack(side=tk.TOP, fill=tk.BOTH, expand=True, pady=(10, 0))
|
|
|
|
def init_device(self):
|
|
"""Initialize the ADALM1000 device with continuous measurement"""
|
|
try:
|
|
# First try to clean up any existing session
|
|
if hasattr(self, 'session'):
|
|
try:
|
|
self.session.end()
|
|
del self.session
|
|
except:
|
|
pass
|
|
|
|
# Add small delay to allow device to reset
|
|
time.sleep(1)
|
|
|
|
self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000)
|
|
if not self.session.devices:
|
|
raise Exception("No ADALM1000 detected - check connections")
|
|
|
|
self.dev = self.session.devices[0]
|
|
# Reset channels
|
|
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
|
|
self.dev.channels['B'].mode = pysmu.Mode.HI_Z
|
|
self.dev.channels['A'].constant(0)
|
|
self.dev.channels['B'].constant(0)
|
|
|
|
self.session.start(0)
|
|
|
|
self.status_light.itemconfig(self.status_indicator, fill='green')
|
|
self.connection_label.config(text="Connected")
|
|
self.status_var.set("Device connected | Ready to measure")
|
|
self.session_active = True
|
|
self.start_button.config(state=tk.NORMAL)
|
|
|
|
# Start continuous measurement thread
|
|
self.measurement_event = threading.Event()
|
|
self.measurement_event.set()
|
|
self.measurement_thread = threading.Thread(
|
|
target=self.continuous_measurement,
|
|
daemon=True
|
|
)
|
|
self.measurement_thread.start()
|
|
|
|
except Exception as e:
|
|
self.handle_device_error(e)
|
|
|
|
def continuous_measurement(self):
|
|
"""Continuous measurement with moving average filtering and optimized I/O"""
|
|
filter_window_size = 10
|
|
voltage_window = []
|
|
current_window = []
|
|
last_plot_update = 0
|
|
last_ui_update = 0
|
|
log_buffer = []
|
|
update_interval = 1.0 # Update UI at 1Hz max
|
|
|
|
# Initialize start_time for measurements
|
|
if not hasattr(self, 'start_time'):
|
|
self.start_time = time.time()
|
|
|
|
while self.measurement_event.is_set() and self.root.winfo_exists():
|
|
try:
|
|
# Read multiple samples for better accuracy
|
|
samples = self.dev.read(filter_window_size, 500, True)
|
|
if not samples:
|
|
raise DeviceDisconnectedError("No samples received")
|
|
|
|
# Get voltage from Channel B (HI_Z mode) and current from Channel A
|
|
raw_voltage = np.mean([s[1][0] for s in samples]) # Channel B voltage
|
|
raw_current = np.mean([s[0][1] for s in samples]) # Channel A current
|
|
current_time = time.time() - self.start_time
|
|
|
|
# Apply moving average filter
|
|
voltage_window.append(raw_voltage)
|
|
current_window.append(raw_current)
|
|
|
|
if len(voltage_window) > filter_window_size:
|
|
voltage_window.pop(0)
|
|
current_window.pop(0)
|
|
|
|
voltage = np.mean(voltage_window)
|
|
current = np.mean(current_window)
|
|
|
|
# Store filtered data
|
|
self.time_data.append(current_time)
|
|
self.voltage_data.append(voltage)
|
|
self.current_data.append(current)
|
|
|
|
# Throttle UI updates to prevent lag
|
|
now = time.time()
|
|
if now - last_ui_update > update_interval:
|
|
self.safe_after(0, lambda: self.update_measurement_display(
|
|
voltage, current, current_time
|
|
))
|
|
last_ui_update = now
|
|
|
|
# Throttle plot updates even more (1Hz max)
|
|
if now - last_plot_update > 1.0:
|
|
self.safe_after(0, self.update_plot)
|
|
last_plot_update = now
|
|
|
|
# Buffered logging
|
|
if self.test_running and hasattr(self, 'current_cycle_file'):
|
|
log_buffer.append([
|
|
f"{current_time:.3f}",
|
|
f"{voltage:.6f}",
|
|
f"{current:.6f}",
|
|
self.test_phase.get(),
|
|
f"{self.capacity_ah.get():.4f}",
|
|
f"{self.charge_capacity.get():.4f}",
|
|
f"{self.coulomb_efficiency.get():.1f}",
|
|
f"{self.cycle_count.get()}"
|
|
])
|
|
|
|
# Write in chunks of 10 samples
|
|
if len(log_buffer) >= 10:
|
|
with open(self.filename, 'a', newline='') as f:
|
|
writer = csv.writer(f)
|
|
writer.writerows(log_buffer)
|
|
log_buffer.clear()
|
|
|
|
time.sleep(max(0.05, self.interval))
|
|
|
|
except Exception as e:
|
|
error_msg = str(e)
|
|
if self.root.winfo_exists():
|
|
self.safe_after(0, lambda msg=error_msg:
|
|
self.handle_device_error(f"Measurement error: {msg}") if self.root.winfo_exists() else None)
|
|
break
|
|
|
|
# Flush remaining buffer on exit
|
|
if log_buffer and hasattr(self, 'current_cycle_file'):
|
|
with open(self.filename, 'a', newline='') as f:
|
|
writer = csv.writer(f)
|
|
writer.writerows(log_buffer)
|
|
|
|
def start_test(self):
|
|
"""Start the full battery test cycle"""
|
|
if not self.test_running:
|
|
try:
|
|
# Validate inputs
|
|
if self.capacity.get() <= 0:
|
|
raise ValueError("Battery capacity must be positive")
|
|
if self.charge_cutoff.get() <= self.discharge_cutoff.get():
|
|
raise ValueError("Charge cutoff must be higher than discharge cutoff")
|
|
if self.c_rate.get() <= 0:
|
|
raise ValueError("C-rate must be positive")
|
|
|
|
# Set continuous mode based on checkbox
|
|
self.continuous_mode = self.continuous_var.get()
|
|
|
|
# Reset timing for new test
|
|
self.measurement_start_time = time.time()
|
|
self.test_start_time = time.time()
|
|
|
|
# Calculate target current
|
|
test_current = self.c_rate.get() * self.capacity.get()
|
|
|
|
if test_current > 0.2:
|
|
raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")
|
|
|
|
# Clear previous data
|
|
self.time_data.clear()
|
|
self.voltage_data.clear()
|
|
self.current_data.clear()
|
|
self.phase_data.clear()
|
|
self.capacity_ah.set(0.0)
|
|
self.charge_capacity.set(0.0)
|
|
self.coulomb_efficiency.set(0.0)
|
|
self.cycle_count.set(0)
|
|
|
|
# Generate base filename without cycle number
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
self.base_filename = os.path.join(self.log_dir, f"battery_test_{timestamp}")
|
|
self.current_cycle_file = None
|
|
|
|
# Start test thread
|
|
self.test_running = True
|
|
self.start_time = time.time()
|
|
self.last_update_time = time.time()
|
|
self.test_phase.set("Initial Discharge")
|
|
|
|
self.start_button.config(state=tk.DISABLED)
|
|
self.stop_button.config(state=tk.NORMAL)
|
|
self.status_var.set(f"Test started | Discharging to {self.discharge_cutoff.get()}V @ {test_current:.3f}A")
|
|
|
|
# Start test sequence in a new thread
|
|
self.test_thread = threading.Thread(target=self.run_test_sequence, daemon=True)
|
|
self.test_thread.start()
|
|
|
|
except Exception as e:
|
|
messagebox.showerror("Error", str(e))
|
|
|
|
def create_cycle_log_file(self):
|
|
"""Create a new log file for the current cycle"""
|
|
# Close previous file if exists
|
|
if hasattr(self, 'current_cycle_file') and self.current_cycle_file:
|
|
try:
|
|
self.current_cycle_file.close()
|
|
except Exception as e:
|
|
print(f"Error closing previous log file: {e}")
|
|
|
|
# Check write permissions
|
|
if not os.access(self.log_dir, os.W_OK):
|
|
messagebox.showerror("Error", f"No write permissions in {self.log_dir}")
|
|
return False
|
|
|
|
# Create new log file with sequential suffix
|
|
suffix = 1
|
|
while True:
|
|
self.filename = f"{self.base_filename}_{suffix}.csv"
|
|
if not os.path.exists(self.filename):
|
|
break
|
|
suffix += 1
|
|
|
|
try:
|
|
self.current_cycle_file = open(self.filename, 'w', newline='')
|
|
self.log_writer = csv.writer(self.current_cycle_file)
|
|
self.log_writer.writerow(["Time(s)", "Voltage(V)", "Current(A)", "Phase",
|
|
"Discharge_Capacity(Ah)", "Charge_Capacity(Ah)",
|
|
"Coulomb_Eff(%)", "Cycle"])
|
|
self.log_buffer = []
|
|
return True
|
|
except Exception as e:
|
|
messagebox.showerror("Error", f"Failed to create log file: {e}")
|
|
return False
|
|
|
|
@staticmethod
|
|
def format_time(seconds):
|
|
"""Convert seconds to hh:mm:ss format"""
|
|
hours = int(seconds // 3600)
|
|
minutes = int((seconds % 3600) // 60)
|
|
seconds = int(seconds % 60)
|
|
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
|
|
|
|
def stop_test(self):
|
|
"""Request immediate stop of the test and clean up all test data"""
|
|
if not self.test_running:
|
|
return
|
|
|
|
self.request_stop = True
|
|
self.test_running = False
|
|
self.measuring = False
|
|
|
|
# Immediately set device to safe state
|
|
if hasattr(self, 'dev'):
|
|
try:
|
|
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
|
|
self.dev.channels['A'].constant(0)
|
|
except Exception as e:
|
|
print(f"Error resetting device: {e}")
|
|
|
|
# Clear all data buffers
|
|
self.time_data.clear()
|
|
self.voltage_data.clear()
|
|
self.current_data.clear()
|
|
self.phase_data.clear()
|
|
|
|
# Reset test values
|
|
self.test_phase.set("Idle")
|
|
self.capacity_ah.set(0.0)
|
|
self.charge_capacity.set(0.0)
|
|
self.coulomb_efficiency.set(0.0)
|
|
|
|
# Reset plot if it exists
|
|
if hasattr(self, 'line_voltage') and hasattr(self, 'line_current'):
|
|
self.line_voltage.set_data([], [])
|
|
self.line_current.set_data([], [])
|
|
# Reset to reasonable default ranges
|
|
self.ax.set_xlim(0, 10) # Small range instead of 0-1
|
|
self.ax.set_ylim(0, 2) # Typical battery voltage range
|
|
self.ax2.set_ylim(-0.2, 0.2) # Typical current range
|
|
self.canvas.draw()
|
|
|
|
# Update UI
|
|
self.status_var.set("Test stopped - Ready for new test")
|
|
self.stop_button.config(state=tk.DISABLED)
|
|
self.start_button.config(state=tk.NORMAL)
|
|
|
|
# Finalize test data (logs, etc.)
|
|
self.safe_after(100, self.finalize_test)
|
|
|
|
def center_window(self, window):
|
|
"""Center a window on screen"""
|
|
window.update_idletasks()
|
|
width = window.winfo_width()
|
|
height = window.winfo_height()
|
|
x = (window.winfo_screenwidth() // 2) - (width // 2)
|
|
y = (window.winfo_screenheight() // 2) - (height // 2)
|
|
window.geometry(f'{width}x{height}+{x}+{y}')
|
|
|
|
def run_test_sequence(self):
|
|
try:
|
|
test_current = self.c_rate.get() * self.capacity.get()
|
|
|
|
while self.test_running and (self.continuous_mode or self.cycle_count.get() == 0):
|
|
# Reset stop request at start of each cycle
|
|
self.request_stop = False
|
|
self.cycle_count.set(self.cycle_count.get() + 1)
|
|
|
|
# Create new log file for this cycle
|
|
self.create_cycle_log_file()
|
|
|
|
# 1. Charge phase (constant current)
|
|
self.test_phase.set("Charge")
|
|
self.status_var.set(f"Charging to {self.charge_cutoff.get()}V @ {test_current:.3f}A")
|
|
self.root.update() # Force UI update
|
|
|
|
self.measuring = True
|
|
self.dev.channels['B'].mode = pysmu.Mode.HI_Z
|
|
self.dev.channels['A'].mode = pysmu.Mode.SIMV
|
|
self.dev.channels['A'].constant(test_current)
|
|
self.charge_capacity.set(0.0)
|
|
target_voltage = self.charge_cutoff.get()
|
|
self.last_update_time = time.time()
|
|
|
|
while self.test_running and not self.request_stop:
|
|
if not self.voltage_data:
|
|
time.sleep(0.1)
|
|
continue
|
|
|
|
current_voltage = self.voltage_data[-1]
|
|
measured_current = abs(self.current_data[-1])
|
|
|
|
# Update charge capacity
|
|
now = time.time()
|
|
delta_t = now - self.last_update_time
|
|
self.last_update_time = now
|
|
self.charge_capacity.set(self.charge_capacity.get() + measured_current * delta_t / 3600)
|
|
|
|
self.status_var.set(
|
|
f"Charging: {current_voltage:.3f}V / {target_voltage}V | "
|
|
f"Current: {measured_current:.3f}A | "
|
|
f"Capacity: {self.charge_capacity.get():.4f}Ah"
|
|
)
|
|
self.root.update() # Force UI update
|
|
|
|
if current_voltage >= target_voltage or self.request_stop:
|
|
break
|
|
|
|
time.sleep(0.1) # More frequent checks
|
|
|
|
if self.request_stop or not self.test_running:
|
|
break
|
|
|
|
# 2. Rest period after charge
|
|
self.test_phase.set("Resting (Post-Charge)")
|
|
self.measuring = False
|
|
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
|
|
self.dev.channels['A'].constant(0)
|
|
|
|
rest_end_time = time.time() + (self.rest_time.get() * 3600)
|
|
while time.time() < rest_end_time and self.test_running and not self.request_stop:
|
|
time_left = max(0, rest_end_time - time.time())
|
|
self.status_var.set(
|
|
f"Resting after charge | "
|
|
f"Time left: {time_left/60:.1f} min"
|
|
)
|
|
self.root.update()
|
|
time.sleep(1) # Check every second for stop request
|
|
|
|
if self.request_stop or not self.test_running:
|
|
break
|
|
|
|
# 3. Discharge phase (capacity measurement)
|
|
self.test_phase.set("Discharge")
|
|
self.status_var.set(f"Discharging to {self.discharge_cutoff.get()}V @ {test_current:.3f}A")
|
|
self.root.update()
|
|
|
|
self.measuring = True
|
|
self.dev.channels['A'].mode = pysmu.Mode.SIMV
|
|
self.dev.channels['A'].constant(-test_current)
|
|
self.capacity_ah.set(0.0)
|
|
self.last_update_time = time.time()
|
|
|
|
while self.test_running and not self.request_stop:
|
|
if not self.current_data:
|
|
time.sleep(0.1)
|
|
continue
|
|
|
|
current_voltage = self.voltage_data[-1]
|
|
current_current = abs(self.current_data[-1])
|
|
|
|
# Capacity calculation
|
|
now = time.time()
|
|
delta_t = now - self.last_update_time
|
|
self.last_update_time = now
|
|
self.capacity_ah.set(self.capacity_ah.get() + current_current * delta_t / 3600)
|
|
|
|
if not self.continuous_var.get() and self.continuous_mode: # Only update if it was previously enabled
|
|
self.continuous_mode = False # Ensure we don't start a new cycle
|
|
self.status_var.set(
|
|
f"Continuous Mode disabled | "
|
|
f"Discharging to {self.discharge_cutoff.get()}V (will stop after this cycle) | "
|
|
f"Current: {current_current:.3f}A | "
|
|
f"Capacity: {self.capacity_ah.get():.4f}Ah"
|
|
)
|
|
self.root.update() # Force UI update
|
|
|
|
else:
|
|
# Default status message
|
|
self.status_var.set(
|
|
f"Discharging: {current_voltage:.3f}V / {self.discharge_cutoff.get()}V | "
|
|
f"Current: {current_current:.3f}A | "
|
|
f"Capacity: {self.capacity_ah.get():.4f}Ah"
|
|
)
|
|
|
|
self.root.update()
|
|
|
|
if current_voltage <= self.discharge_cutoff.get() or self.request_stop:
|
|
break
|
|
|
|
if not self.continuous_var.get():
|
|
self.test_running = False
|
|
break # Exit the main test loop
|
|
|
|
# 4. Rest period after discharge (only if not stopping)
|
|
if self.test_running and not self.request_stop:
|
|
self.test_phase.set("Resting (Post-Discharge)")
|
|
self.measuring = False
|
|
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
|
|
self.dev.channels['A'].constant(0)
|
|
|
|
rest_end_time = time.time() + (self.rest_time.get() * 3600)
|
|
while time.time() < rest_end_time and self.test_running and not self.request_stop:
|
|
time_left = max(0, rest_end_time - time.time())
|
|
self.status_var.set(
|
|
f"Resting after discharge | "
|
|
f"Time left: {time_left/60:.1f} min"
|
|
)
|
|
self.root.update()
|
|
time.sleep(1)
|
|
|
|
# Calculate Coulomb efficiency if not stopping
|
|
if not self.request_stop and self.charge_capacity.get() > 0:
|
|
efficiency = (self.capacity_ah.get() / self.charge_capacity.get()) * 100
|
|
self.coulomb_efficiency.set(efficiency)
|
|
|
|
# Update cycle info
|
|
self.status_var.set(
|
|
f"Cycle {self.cycle_count.get()} complete | "
|
|
f"Discharge: {self.capacity_ah.get():.3f}Ah | "
|
|
f"Charge: {self.charge_capacity.get():.3f}Ah | "
|
|
f"Efficiency: {self.coulomb_efficiency.get():.1f}%"
|
|
)
|
|
self.root.update()
|
|
|
|
# Write cycle summary to log file
|
|
self.write_cycle_summary()
|
|
|
|
# Flush remaining buffer data
|
|
if hasattr(self, 'log_buffer') and self.log_buffer:
|
|
with open(self.filename, 'a', newline='') as f:
|
|
writer = csv.writer(f)
|
|
writer.writerows(self.log_buffer)
|
|
self.log_buffer.clear()
|
|
|
|
# Finalize test if stopped or completed
|
|
self.safe_after(0, self.finalize_test)
|
|
|
|
except Exception as e:
|
|
error_msg = str(e)
|
|
if self.root.winfo_exists():
|
|
self.safe_after(0, lambda msg=error_msg: messagebox.showerror("Test Error", msg))
|
|
self.safe_after(0, self.finalize_test)
|
|
|
|
def finalize_test(self):
|
|
"""Final cleanup after test completes or is stopped"""
|
|
self.measuring = False
|
|
if hasattr(self, 'dev'):
|
|
try:
|
|
self.dev.channels['A'].constant(0)
|
|
except Exception as e:
|
|
print(f"Error resetting device: {e}")
|
|
|
|
# Flush and close current log file
|
|
if hasattr(self, 'log_buffer') and self.log_buffer and hasattr(self, 'log_writer'):
|
|
try:
|
|
self.log_writer.writerows(self.log_buffer)
|
|
self.log_buffer.clear()
|
|
except Exception as e:
|
|
print(f"Error flushing log buffer: {e}")
|
|
|
|
if hasattr(self, 'current_cycle_file'):
|
|
try:
|
|
self.current_cycle_file.close()
|
|
except Exception as e:
|
|
print(f"Error closing log file: {e}")
|
|
|
|
self.start_button.config(state=tk.NORMAL)
|
|
self.stop_button.config(state=tk.DISABLED)
|
|
self.request_stop = False
|
|
|
|
message = (
|
|
f"Test safely stopped after discharge phase | "
|
|
f"Cycle {self.cycle_count.get()} completed | "
|
|
f"Final capacity: {self.capacity_ah.get():.3f}Ah"
|
|
)
|
|
self.status_var.set(message)
|
|
|
|
self.safe_after(0, lambda: messagebox.showinfo(
|
|
"Test Completed",
|
|
f"Test was safely stopped after discharge phase.\n\n"
|
|
f"Final discharge capacity: {self.capacity_ah.get():.3f}Ah\n"
|
|
f"Total cycles completed: {self.cycle_count.get()}"
|
|
))
|
|
|
|
def update_measurement_display(self, voltage, current, current_time):
|
|
"""Update all measurement displays at once (throttled to 1Hz)"""
|
|
try:
|
|
# Update all values regardless of change
|
|
self.voltage_label.config(text=f"{voltage:.4f}")
|
|
self.current_label.config(text=f"{current:.4f}")
|
|
self.capacity_label.config(text=f"{self.capacity_ah.get():.4f}")
|
|
self.charge_capacity_label.config(text=f"{self.charge_capacity.get():.4f}")
|
|
self.efficiency_label.config(text=f"{self.coulomb_efficiency.get():.1f}")
|
|
self.cycle_label.config(text=f"{self.cycle_count.get()}")
|
|
self.phase_label.config(text=self.test_phase.get())
|
|
self.time_label.config(text=self.format_time(current_time))
|
|
|
|
except Exception as e:
|
|
print(f"GUI update error: {e}")
|
|
|
|
def write_cycle_summary(self):
|
|
"""Write cycle summary to the current cycle's log file"""
|
|
if not hasattr(self, 'current_cycle_file') or not self.current_cycle_file:
|
|
return
|
|
|
|
summary_line = (
|
|
f"Cycle {self.cycle_count.get()} Summary - "
|
|
f"Discharge={self.capacity_ah.get():.4f}Ah, "
|
|
f"Charge={self.charge_capacity.get():.4f}Ah, "
|
|
f"Efficiency={self.coulomb_efficiency.get():.1f}%"
|
|
)
|
|
|
|
# Ensure file is open and write summary
|
|
try:
|
|
if self.log_buffer:
|
|
self.log_writer.writerows(self.log_buffer)
|
|
self.log_buffer.clear()
|
|
self.current_cycle_file.write(summary_line + "\n")
|
|
self.current_cycle_file.flush()
|
|
except Exception as e:
|
|
print(f"Error writing cycle summary: {e}")
|
|
|
|
def update_plot(self):
|
|
"""Optimized plot update with change detection"""
|
|
if not self.time_data or len(self.time_data) < 5: # Wait for at least 5 samples
|
|
return
|
|
|
|
# Only update if there's significant new data
|
|
if hasattr(self, '_last_plot_time') and (self.time_data[-1] - self._last_plot_time < 1.0):
|
|
return
|
|
|
|
self._last_plot_time = self.time_data[-1]
|
|
|
|
# Update plot data
|
|
self.line_voltage.set_data(self.time_data, self.voltage_data)
|
|
self.line_current.set_data(self.time_data, self.current_data)
|
|
|
|
# Auto-scale axes if needed
|
|
self.auto_scale_axes()
|
|
|
|
# Only redraw if needed
|
|
self.canvas.draw_idle()
|
|
|
|
def auto_scale_axes(self):
|
|
"""Auto-scale plot axes with appropriate padding"""
|
|
if not self.time_data:
|
|
return
|
|
|
|
# X-axis scaling
|
|
min_time = 0
|
|
max_time = self.time_data[-1] * 1.05 # 5% padding
|
|
current_xlim = self.ax.get_xlim()
|
|
if abs(current_xlim[1] - max_time) > (max_time * 0.1): # 10% change threshold
|
|
self.ax.set_xlim(min_time, max_time)
|
|
self.ax2.set_xlim(min_time, max_time)
|
|
|
|
# Voltage axis scaling
|
|
voltage_padding = 0.2
|
|
if self.voltage_data:
|
|
min_voltage = max(0, min(self.voltage_data) - voltage_padding)
|
|
max_voltage = max(self.voltage_data) + voltage_padding
|
|
current_ylim = self.ax.get_ylim()
|
|
if (abs(current_ylim[0] - min_voltage) > 0.1 or
|
|
abs(current_ylim[1] - max_voltage) > 0.1):
|
|
self.ax.set_ylim(min_voltage, max_voltage)
|
|
|
|
# Current axis scaling
|
|
current_padding = 0.05
|
|
if self.current_data:
|
|
min_current = min(self.current_data) - current_padding
|
|
max_current = max(self.current_data) + current_padding
|
|
current_ylim2 = self.ax2.get_ylim()
|
|
if (abs(current_ylim2[0] - min_current) > 0.02 or
|
|
abs(current_ylim2[1] - max_current) > 0.02):
|
|
self.ax2.set_ylim(min_current, max_current)
|
|
|
|
def handle_device_error(self, error):
|
|
"""Handle device connection errors"""
|
|
if not self.root.winfo_exists(): # Check if window still exists
|
|
return
|
|
|
|
error_msg = str(error)
|
|
print(f"Device error: {error_msg}")
|
|
|
|
# Clean up session first
|
|
if hasattr(self, 'session'):
|
|
try:
|
|
if self.session_active:
|
|
self.session.end()
|
|
del self.session
|
|
except Exception as e:
|
|
print(f"Error cleaning up session: {e}")
|
|
|
|
# Update UI
|
|
self.status_light.itemconfig(self.status_indicator, fill='red')
|
|
self.connection_label.config(text="Disconnected")
|
|
self.status_var.set(f"Device error: {error_msg}")
|
|
|
|
self.session_active = False
|
|
self.test_running = False
|
|
self.continuous_mode = False
|
|
self.measuring = False
|
|
|
|
if hasattr(self, 'start_button'):
|
|
self.start_button.config(state=tk.DISABLED)
|
|
if hasattr(self, 'stop_button'):
|
|
self.stop_button.config(state=tk.DISABLED)
|
|
|
|
# Clear plot + buffers
|
|
self.time_data.clear()
|
|
self.voltage_data.clear()
|
|
self.current_data.clear()
|
|
if hasattr(self, 'line_voltage') and hasattr(self, 'line_current'):
|
|
self.line_voltage.set_data([], [])
|
|
self.line_current.set_data([], [])
|
|
self.ax.set_xlim(0, 1)
|
|
self.ax2.set_xlim(0, 1)
|
|
self.canvas.draw()
|
|
|
|
# Show error message and attempt reconnect automatically
|
|
if self.root.winfo_exists():
|
|
self.safe_after(100, self.attempt_reconnect)
|
|
|
|
def safe_after(self, delay_ms, callback, *args):
|
|
"""Safely schedule a callback with window existence check"""
|
|
if not self.root.winfo_exists():
|
|
return None
|
|
|
|
def wrapped_callback():
|
|
if self.root.winfo_exists():
|
|
try:
|
|
callback(*args)
|
|
except Exception as e:
|
|
print(f"Callback error: {e}")
|
|
|
|
after_id = self.root.after(delay_ms, wrapped_callback)
|
|
self._after_ids.add(after_id)
|
|
return after_id
|
|
|
|
def attempt_reconnect(self):
|
|
"""Attempt to reconnect automatically"""
|
|
if not self.root.winfo_exists():
|
|
return
|
|
|
|
try:
|
|
# Show error message first
|
|
messagebox.showerror(
|
|
"Device Connection Error",
|
|
"Could not connect to ADALM1000\n\n"
|
|
"1. Check USB cable connection\n"
|
|
"2. The device will attempt to reconnect automatically"
|
|
)
|
|
except Exception as e:
|
|
print(f"Error showing message: {e}")
|
|
return
|
|
|
|
# Schedule reconnect attempt
|
|
self.safe_after(1000, self.reconnect_device)
|
|
|
|
def reconnect_device(self):
|
|
"""Reconnect the device with proper cleanup"""
|
|
if not self.root.winfo_exists():
|
|
return
|
|
|
|
self.status_var.set("Attempting to reconnect...")
|
|
self.root.update()
|
|
|
|
# Clear any existing session
|
|
if hasattr(self, 'session'):
|
|
try:
|
|
if self.session_active:
|
|
self.session.end()
|
|
del self.session
|
|
except:
|
|
pass
|
|
|
|
# Stop any running threads
|
|
self.test_running = False
|
|
self.continuous_mode = False
|
|
self.measuring = False
|
|
|
|
if hasattr(self, 'measurement_event'):
|
|
self.measurement_event.clear()
|
|
|
|
# Wait for threads to finish
|
|
if hasattr(self, 'measurement_thread'):
|
|
self.measurement_thread.join(timeout=1.0)
|
|
if hasattr(self, 'test_thread'):
|
|
self.test_thread.join(timeout=1.0)
|
|
|
|
# Add small delay to allow device to reset
|
|
time.sleep(1.5)
|
|
|
|
# Try to initialize device
|
|
try:
|
|
self.init_device()
|
|
if self.session_active:
|
|
self.status_var.set("Reconnected successfully")
|
|
return
|
|
except Exception as e:
|
|
print(f"Reconnect failed: {e}")
|
|
|
|
# If we get here, reconnection failed
|
|
self.status_var.set("Reconnect failed - will retry...")
|
|
self.safe_after(2000, self.reconnect_device) # Retry after 2 seconds
|
|
|
|
def on_close(self):
|
|
"""Clean up on window close"""
|
|
# Set flags to stop all threads
|
|
self.test_running = False
|
|
self.measuring = False
|
|
self.session_active = False
|
|
|
|
if hasattr(self, 'measurement_event'):
|
|
self.measurement_event.clear()
|
|
|
|
# Cancel all pending callbacks
|
|
with self._after_lock:
|
|
for after_id in self._after_ids:
|
|
try:
|
|
self.root.after_cancel(after_id)
|
|
except:
|
|
pass
|
|
self._after_ids.clear()
|
|
|
|
# Give threads time to clean up
|
|
timeout = 2.0
|
|
if hasattr(self, 'measurement_thread'):
|
|
self.measurement_thread.join(timeout=timeout)
|
|
if hasattr(self, 'test_thread'):
|
|
self.test_thread.join(timeout=timeout)
|
|
|
|
# Clean up device session
|
|
if hasattr(self, 'session') and self.session:
|
|
try:
|
|
self.session.end()
|
|
except Exception as e:
|
|
print(f"Error ending session: {e}")
|
|
|
|
# Finally destroy window
|
|
try:
|
|
self.root.destroy()
|
|
except:
|
|
pass
|
|
|
|
if __name__ == "__main__":
|
|
root = tk.Tk()
|
|
try:
|
|
app = BatteryTester(root)
|
|
root.mainloop()
|
|
except Exception as e:
|
|
if root.winfo_exists():
|
|
messagebox.showerror("Fatal Error", f"Application failed: {str(e)}")
|
|
else:
|
|
print(f"Fatal Error: {e}")
|
|
try:
|
|
root.destroy()
|
|
except:
|
|
pass |