Adalm1000_Battery_SMU/MainCode/adalm1000_logger.py
Jan 25322bc59d MainCode/adalm1000_logger.py aktualisiert
Safer:

    Application shutdown

    Thread cleanup

    Error recovery

    Reconnection scenarios
(Deepseek)
2025-05-25 17:08:26 +02:00

1002 lines
41 KiB
Python

# -*- coding: utf-8 -*-
import os
import time
import csv
import threading
from datetime import datetime
import tkinter as tk
from tkinter import ttk, messagebox
import pysmu
import numpy as np
import matplotlib
matplotlib.use('TkAgg')
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
from collections import deque
class DeviceDisconnectedError(Exception):
pass
class BatteryTester:
def __init__(self, root):
self._after_ids = set() # Track scheduled callbacks
self._after_lock = threading.Lock()
# Color scheme
self.bg_color = "#2E3440"
self.fg_color = "#D8DEE9"
self.accent_color = "#5E81AC"
self.warning_color = "#BF616A"
self.success_color = "#A3BE8C"
# Main window configuration
self.root = root
self.root.title("ADALM1000 - Battery Capacity Tester (CC Test)")
self.root.geometry("1000x800")
self.root.minsize(800, 700)
self.root.configure(bg=self.bg_color)
# Device and measurement state
self.session_active = False
self.measuring = False
self.test_running = False
self.continuous_mode = False
self.request_stop = False
self.interval = 0.1
self.log_dir = os.path.expanduser("~/adalm1000/logs")
os.makedirs(self.log_dir, exist_ok=True)
# Battery test parameters
self.capacity = tk.DoubleVar(value=0.2) # Battery capacity in Ah
self.charge_cutoff = tk.DoubleVar(value=1.45) # Charge cutoff voltage
self.discharge_cutoff = tk.DoubleVar(value=0.9) # Discharge cutoff voltage
self.rest_time = tk.DoubleVar(value=0.1) # Rest time in hours
self.c_rate = tk.DoubleVar(value=0.1) # C-rate for test (default C/5 = 0.2)
# Test progress tracking
self.test_phase = tk.StringVar(value="Idle")
self.capacity_ah = tk.DoubleVar(value=0.0)
self.charge_capacity = tk.DoubleVar(value=0.0) # capacity tracking
self.coulomb_efficiency = tk.DoubleVar(value=0.0) # efficiency calculation
self.cycle_count = tk.IntVar(value=0) # cycle counting
# Data buffers
self.time_data = deque()
self.voltage_data = deque()
self.current_data = deque()
self.phase_data = deque()
# Initialize UI and device
self.setup_ui()
self.init_device()
# Ensure proper cleanup
self.root.protocol("WM_DELETE_WINDOW", self.on_close)
def setup_ui(self):
"""Configure the user interface"""
self.style = ttk.Style()
self.style.theme_use('clam')
# Configure styles
self.style.configure('.', background=self.bg_color, foreground=self.fg_color)
self.style.configure('TFrame', background=self.bg_color)
self.style.configure('TLabel', background=self.bg_color, foreground=self.fg_color)
self.style.configure('TButton', background=self.accent_color, foreground=self.fg_color,
padding=6, font=('Helvetica', 10, 'bold'))
self.style.map('TButton',
background=[('active', self.accent_color), ('disabled', '#4C566A')],
foreground=[('active', self.fg_color), ('disabled', '#D8DEE9')])
self.style.configure('TEntry', fieldbackground="#3B4252", foreground=self.fg_color)
self.style.configure('Header.TLabel', font=('Helvetica', 14, 'bold'), foreground=self.accent_color)
self.style.configure('Value.TLabel', font=('Helvetica', 12, 'bold'))
self.style.configure('Status.TLabel', font=('Helvetica', 10))
self.style.configure('Warning.TButton', background=self.warning_color)
self.style.configure('Success.TButton', background=self.success_color)
# Main layout
self.content_frame = ttk.Frame(self.root)
self.content_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# Header area
header_frame = ttk.Frame(self.content_frame)
header_frame.pack(fill=tk.X, pady=(0, 20))
ttk.Label(header_frame, text="ADALM1000 Battery Capacity Tester (CC Test)", style='Header.TLabel').pack(side=tk.LEFT)
# Status indicator
self.status_light = tk.Canvas(header_frame, width=20, height=20, bg=self.bg_color, bd=0, highlightthickness=0)
self.status_light.pack(side=tk.RIGHT, padx=10)
self.status_indicator = self.status_light.create_oval(2, 2, 18, 18, fill='red')
self.connection_label = ttk.Label(header_frame, text="Disconnected")
self.connection_label.pack(side=tk.RIGHT)
# Reconnect button
self.reconnect_btn = ttk.Button(header_frame, text="Reconnect", command=self.reconnect_device)
self.reconnect_btn.pack(side=tk.RIGHT, padx=10)
# Measurement display
display_frame = ttk.LabelFrame(self.content_frame, text=" Live Measurements ", padding=15)
display_frame.pack(fill=tk.BOTH, expand=False)
# Measurement values
measurement_labels = [
("Voltage (V)", "V"),
("Current (A)", "A"),
("Test Phase", ""),
("Elapsed Time", "s"),
("Discharge Capacity", "Ah"),
("Charge Capacity", "Ah"),
("Coulomb Eff.", "%"),
("Cycle Count", ""),
]
for i, (label, unit) in enumerate(measurement_labels):
ttk.Label(display_frame, text=f"{label}:", font=('Helvetica', 11)).grid(row=i//2, column=(i%2)*2, sticky=tk.W, pady=5)
value_label = ttk.Label(display_frame, text="0.000", style='Value.TLabel')
value_label.grid(row=i//2, column=(i%2)*2+1, sticky=tk.W, padx=10)
if unit:
ttk.Label(display_frame, text=unit).grid(row=i//2, column=(i%2)*2+2, sticky=tk.W)
if i == 0:
self.voltage_label = value_label
elif i == 1:
self.current_label = value_label
elif i == 2:
self.phase_label = value_label
elif i == 3:
self.time_label = value_label
elif i == 4:
self.capacity_label = value_label
elif i == 5:
self.charge_capacity_label = value_label
elif i == 6:
self.efficiency_label = value_label
elif i == 7:
self.cycle_label = value_label
# Control area
controls_frame = ttk.Frame(self.content_frame)
controls_frame.pack(fill=tk.X, pady=(10, 10), padx=0)
# Parameters frame
params_frame = ttk.LabelFrame(controls_frame, text="Test Parameters", padding=10)
params_frame.pack(side=tk.LEFT, fill=tk.X, expand=True)
# Battery capacity
ttk.Label(params_frame, text="Battery Capacity (Ah):").grid(row=0, column=0, sticky=tk.W)
ttk.Entry(params_frame, textvariable=self.capacity, width=6).grid(row=0, column=1, padx=5, sticky=tk.W)
# Charge cutoff
ttk.Label(params_frame, text="Charge Cutoff (V):").grid(row=1, column=0, sticky=tk.W)
ttk.Entry(params_frame, textvariable=self.charge_cutoff, width=6).grid(row=1, column=1, padx=5, sticky=tk.W)
# Discharge cutoff
ttk.Label(params_frame, text="Discharge Cutoff (V):").grid(row=2, column=0, sticky=tk.W)
ttk.Entry(params_frame, textvariable=self.discharge_cutoff, width=6).grid(row=2, column=1, padx=5, sticky=tk.W)
# Rest time
ttk.Label(params_frame, text="Rest Time (hours):").grid(row=3, column=0, sticky=tk.W)
ttk.Entry(params_frame, textvariable=self.rest_time, width=6).grid(row=3, column=1, padx=5, sticky=tk.W)
# C-rate for test (C/5 by default)
ttk.Label(params_frame, text="Test C-rate:").grid(row=0, column=2, sticky=tk.W, padx=(10,0))
ttk.Entry(params_frame, textvariable=self.c_rate, width=4).grid(row=0, column=3, padx=5, sticky=tk.W)
ttk.Label(params_frame, text="(e.g., 0.2 for C/5)").grid(row=0, column=4, sticky=tk.W)
# Start/Stop buttons
button_frame = ttk.Frame(controls_frame)
button_frame.pack(side=tk.RIGHT, padx=10)
self.start_button = ttk.Button(button_frame, text="START TEST", command=self.start_test, style='TButton')
self.start_button.pack(side=tk.TOP, pady=5)
self.stop_button = ttk.Button(button_frame, text="STOP TEST", command=self.stop_test, style='Warning.TButton', state=tk.DISABLED)
self.stop_button.pack(side=tk.TOP, pady=5)
# Continuous mode checkbox
self.continuous_var = tk.BooleanVar(value=True)
ttk.Checkbutton(button_frame, text="Continuous Mode", variable=self.continuous_var).pack(side=tk.TOP, pady=5)
# Plot area
self.plot_frame = ttk.Frame(self.content_frame)
self.plot_frame.pack(fill=tk.BOTH, expand=True, padx=20, pady=(0, 5))
self.setup_plot()
# Status bar
self.status_var = tk.StringVar()
self.status_var.set("Ready")
self.status_label = ttk.Label(self.root, textvariable=self.status_var, style='Status.TLabel', padding=(0, 5), anchor=tk.W)
self.status_label.place(x=20, relx=0, rely=1.0, anchor='sw', relwidth=0.96, height=28)
def setup_plot(self):
"""Configure the matplotlib plot"""
self.fig = Figure(figsize=(8, 5), dpi=100, facecolor='#2E3440')
# Adjust margins to give more space on right side
self.fig.subplots_adjust(left=0.1, right=0.88, top=0.9, bottom=0.15)
self.ax = self.fig.add_subplot(111)
self.ax.set_facecolor('#3B4252')
# Set initial voltage range based on charge/discharge cutoffs ±0.2V
voltage_padding = 0.2
min_voltage = max(0, self.discharge_cutoff.get() - voltage_padding)
max_voltage = self.charge_cutoff.get() + voltage_padding
self.ax.set_ylim(min_voltage, max_voltage)
# Voltage plot
self.line_voltage, = self.ax.plot([], [], color='#00BFFF', label='Voltage (V)', linewidth=2)
self.ax.set_ylabel("Voltage (V)", color='#00BFFF')
self.ax.tick_params(axis='y', labelcolor='#00BFFF')
# Current plot (right axis) - set initial range based on test current
self.ax2 = self.ax.twinx()
current_padding = 0.05
test_current = self.c_rate.get() * self.capacity.get()
max_current = test_current * 1.5 # Add 50% padding
self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding)
self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2)
self.ax2.set_ylabel("Current (A)", color='r')
self.ax2.tick_params(axis='y', labelcolor='r')
self.ax.set_xlabel('Time (s)', color=self.fg_color)
self.ax.set_title('Battery Test (CC)', color=self.fg_color)
self.ax.tick_params(axis='x', colors=self.fg_color)
self.ax.grid(True, color='#4C566A')
# Position legends to avoid overlap
self.ax.legend(loc='upper left', bbox_to_anchor=(0.01, 0.99))
self.ax2.legend(loc='upper right', bbox_to_anchor=(0.99, 0.99))
# Embed plot
self.canvas = FigureCanvasTkAgg(self.fig, master=self.plot_frame)
self.canvas.draw()
canvas_widget = self.canvas.get_tk_widget()
canvas_widget.configure(bg='#2E3440', bd=0, highlightthickness=0)
canvas_widget.pack(side=tk.TOP, fill=tk.BOTH, expand=True, pady=(10, 0))
def init_device(self):
"""Initialize the ADALM1000 device with continuous measurement"""
try:
# First try to clean up any existing session
if hasattr(self, 'session'):
try:
self.session.end()
del self.session
except:
pass
# Add small delay to allow device to reset
time.sleep(1)
self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000)
if not self.session.devices:
raise Exception("No ADALM1000 detected - check connections")
self.dev = self.session.devices[0]
# Reset channels
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
self.dev.channels['B'].mode = pysmu.Mode.HI_Z
self.dev.channels['A'].constant(0)
self.dev.channels['B'].constant(0)
self.session.start(0)
self.status_light.itemconfig(self.status_indicator, fill='green')
self.connection_label.config(text="Connected")
self.status_var.set("Device connected | Ready to measure")
self.session_active = True
self.start_button.config(state=tk.NORMAL)
# Start continuous measurement thread
self.measurement_event = threading.Event()
self.measurement_event.set()
self.measurement_thread = threading.Thread(
target=self.continuous_measurement,
daemon=True
)
self.measurement_thread.start()
except Exception as e:
self.handle_device_error(e)
def continuous_measurement(self):
"""Continuous measurement with moving average filtering and optimized I/O"""
filter_window_size = 10
voltage_window = []
current_window = []
last_plot_update = 0
last_ui_update = 0
log_buffer = []
update_interval = 1.0 # Update UI at 1Hz max
# Initialize start_time for measurements
if not hasattr(self, 'start_time'):
self.start_time = time.time()
while self.measurement_event.is_set() and self.root.winfo_exists():
try:
# Read multiple samples for better accuracy
samples = self.dev.read(filter_window_size, 500, True)
if not samples:
raise DeviceDisconnectedError("No samples received")
# Get voltage from Channel B (HI_Z mode) and current from Channel A
raw_voltage = np.mean([s[1][0] for s in samples]) # Channel B voltage
raw_current = np.mean([s[0][1] for s in samples]) # Channel A current
current_time = time.time() - self.start_time
# Apply moving average filter
voltage_window.append(raw_voltage)
current_window.append(raw_current)
if len(voltage_window) > filter_window_size:
voltage_window.pop(0)
current_window.pop(0)
voltage = np.mean(voltage_window)
current = np.mean(current_window)
# Store filtered data
self.time_data.append(current_time)
self.voltage_data.append(voltage)
self.current_data.append(current)
# Throttle UI updates to prevent lag
now = time.time()
if now - last_ui_update > update_interval:
self.safe_after(0, lambda: self.update_measurement_display(
voltage, current, current_time
))
last_ui_update = now
# Throttle plot updates even more (1Hz max)
if now - last_plot_update > 1.0:
self.safe_after(0, self.update_plot)
last_plot_update = now
# Buffered logging
if self.test_running and hasattr(self, 'current_cycle_file'):
log_buffer.append([
f"{current_time:.3f}",
f"{voltage:.6f}",
f"{current:.6f}",
self.test_phase.get(),
f"{self.capacity_ah.get():.4f}",
f"{self.charge_capacity.get():.4f}",
f"{self.coulomb_efficiency.get():.1f}",
f"{self.cycle_count.get()}"
])
# Write in chunks of 10 samples
if len(log_buffer) >= 10:
with open(self.filename, 'a', newline='') as f:
writer = csv.writer(f)
writer.writerows(log_buffer)
log_buffer.clear()
time.sleep(max(0.05, self.interval))
except Exception as e:
error_msg = str(e)
if self.root.winfo_exists():
self.safe_after(0, lambda msg=error_msg:
self.handle_device_error(f"Measurement error: {msg}") if self.root.winfo_exists() else None)
break
# Flush remaining buffer on exit
if log_buffer and hasattr(self, 'current_cycle_file'):
with open(self.filename, 'a', newline='') as f:
writer = csv.writer(f)
writer.writerows(log_buffer)
def start_test(self):
"""Start the full battery test cycle"""
if not self.test_running:
try:
# Validate inputs
if self.capacity.get() <= 0:
raise ValueError("Battery capacity must be positive")
if self.charge_cutoff.get() <= self.discharge_cutoff.get():
raise ValueError("Charge cutoff must be higher than discharge cutoff")
if self.c_rate.get() <= 0:
raise ValueError("C-rate must be positive")
# Set continuous mode based on checkbox
self.continuous_mode = self.continuous_var.get()
# Reset timing for new test
self.measurement_start_time = time.time()
self.test_start_time = time.time()
# Calculate target current
test_current = self.c_rate.get() * self.capacity.get()
if test_current > 0.2:
raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")
# Clear previous data
self.time_data.clear()
self.voltage_data.clear()
self.current_data.clear()
self.phase_data.clear()
self.capacity_ah.set(0.0)
self.charge_capacity.set(0.0)
self.coulomb_efficiency.set(0.0)
self.cycle_count.set(0)
# Generate base filename without cycle number
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.base_filename = os.path.join(self.log_dir, f"battery_test_{timestamp}")
self.current_cycle_file = None
# Start test thread
self.test_running = True
self.start_time = time.time()
self.last_update_time = time.time()
self.test_phase.set("Initial Discharge")
self.start_button.config(state=tk.DISABLED)
self.stop_button.config(state=tk.NORMAL)
self.status_var.set(f"Test started | Discharging to {self.discharge_cutoff.get()}V @ {test_current:.3f}A")
# Start test sequence in a new thread
self.test_thread = threading.Thread(target=self.run_test_sequence, daemon=True)
self.test_thread.start()
except Exception as e:
messagebox.showerror("Error", str(e))
def create_cycle_log_file(self):
"""Create a new log file for the current cycle"""
# Close previous file if exists
if hasattr(self, 'current_cycle_file') and self.current_cycle_file:
try:
self.current_cycle_file.close()
except Exception as e:
print(f"Error closing previous log file: {e}")
# Check write permissions
if not os.access(self.log_dir, os.W_OK):
messagebox.showerror("Error", f"No write permissions in {self.log_dir}")
return False
# Create new log file with sequential suffix
suffix = 1
while True:
self.filename = f"{self.base_filename}_{suffix}.csv"
if not os.path.exists(self.filename):
break
suffix += 1
try:
self.current_cycle_file = open(self.filename, 'w', newline='')
self.log_writer = csv.writer(self.current_cycle_file)
self.log_writer.writerow(["Time(s)", "Voltage(V)", "Current(A)", "Phase",
"Discharge_Capacity(Ah)", "Charge_Capacity(Ah)",
"Coulomb_Eff(%)", "Cycle"])
self.log_buffer = []
return True
except Exception as e:
messagebox.showerror("Error", f"Failed to create log file: {e}")
return False
@staticmethod
def format_time(seconds):
"""Convert seconds to hh:mm:ss format"""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
seconds = int(seconds % 60)
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
def stop_test(self):
"""Request immediate stop of the test"""
if not self.test_running:
return
self.request_stop = True
self.test_running = False # This will break out of all test loops
self.measuring = False
# Immediately set device to safe state
if hasattr(self, 'dev'):
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
self.dev.channels['A'].constant(0)
self.status_var.set("Test stopped immediately")
self.stop_button.config(state=tk.DISABLED)
self.start_button.config(state=tk.NORMAL)
# Finalize test data
self.safe_after(100, self.finalize_test)
def center_window(self, window):
"""Center a window on screen"""
window.update_idletasks()
width = window.winfo_width()
height = window.winfo_height()
x = (window.winfo_screenwidth() // 2) - (width // 2)
y = (window.winfo_screenheight() // 2) - (height // 2)
window.geometry(f'{width}x{height}+{x}+{y}')
def run_test_sequence(self):
try:
test_current = self.c_rate.get() * self.capacity.get()
while self.test_running and (self.continuous_mode or self.cycle_count.get() == 0):
# Reset stop request at start of each cycle
self.request_stop = False
self.cycle_count.set(self.cycle_count.get() + 1)
# Create new log file for this cycle
self.create_cycle_log_file()
# 1. Charge phase (constant current)
self.test_phase.set("Charge")
self.status_var.set(f"Charging to {self.charge_cutoff.get()}V @ {test_current:.3f}A")
self.root.update() # Force UI update
self.measuring = True
self.dev.channels['B'].mode = pysmu.Mode.HI_Z
self.dev.channels['A'].mode = pysmu.Mode.SIMV
self.dev.channels['A'].constant(test_current)
self.charge_capacity.set(0.0)
target_voltage = self.charge_cutoff.get()
self.last_update_time = time.time()
while self.test_running and not self.request_stop:
if not self.voltage_data:
time.sleep(0.1)
continue
current_voltage = self.voltage_data[-1]
measured_current = abs(self.current_data[-1])
# Update charge capacity
now = time.time()
delta_t = now - self.last_update_time
self.last_update_time = now
self.charge_capacity.set(self.charge_capacity.get() + measured_current * delta_t / 3600)
self.status_var.set(
f"Charging: {current_voltage:.3f}V / {target_voltage}V | "
f"Current: {measured_current:.3f}A | "
f"Capacity: {self.charge_capacity.get():.4f}Ah"
)
self.root.update() # Force UI update
if current_voltage >= target_voltage or self.request_stop:
break
time.sleep(0.1) # More frequent checks
if self.request_stop or not self.test_running:
break
# 2. Rest period after charge
self.test_phase.set("Resting (Post-Charge)")
self.measuring = False
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
self.dev.channels['A'].constant(0)
rest_end_time = time.time() + (self.rest_time.get() * 3600)
while time.time() < rest_end_time and self.test_running and not self.request_stop:
time_left = max(0, rest_end_time - time.time())
self.status_var.set(
f"Resting after charge | "
f"Time left: {time_left/60:.1f} min"
)
self.root.update()
time.sleep(1) # Check every second for stop request
if self.request_stop or not self.test_running:
break
# 3. Discharge phase (capacity measurement)
self.test_phase.set("Discharge")
self.status_var.set(f"Discharging to {self.discharge_cutoff.get()}V @ {test_current:.3f}A")
self.root.update()
self.measuring = True
self.dev.channels['A'].mode = pysmu.Mode.SIMV
self.dev.channels['A'].constant(-test_current)
self.capacity_ah.set(0.0)
self.last_update_time = time.time()
while self.test_running and not self.request_stop:
if not self.current_data:
time.sleep(0.1)
continue
current_voltage = self.voltage_data[-1]
current_current = abs(self.current_data[-1])
# Capacity calculation
now = time.time()
delta_t = now - self.last_update_time
self.last_update_time = now
self.capacity_ah.set(self.capacity_ah.get() + current_current * delta_t / 3600)
self.status_var.set(
f"Discharging: {current_voltage:.3f}V / {self.discharge_cutoff.get()}V | "
f"Current: {current_current:.3f}A | "
f"Capacity: {self.capacity_ah.get():.4f}Ah"
)
self.root.update()
if current_voltage <= self.discharge_cutoff.get() or self.request_stop:
break
time.sleep(0.1) # More frequent checks
# 4. Rest period after discharge (only if not stopping)
if self.test_running and not self.request_stop:
self.test_phase.set("Resting (Post-Discharge)")
self.measuring = False
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
self.dev.channels['A'].constant(0)
rest_end_time = time.time() + (self.rest_time.get() * 3600)
while time.time() < rest_end_time and self.test_running and not self.request_stop:
time_left = max(0, rest_end_time - time.time())
self.status_var.set(
f"Resting after discharge | "
f"Time left: {time_left/60:.1f} min"
)
self.root.update()
time.sleep(1)
# Calculate Coulomb efficiency if not stopping
if not self.request_stop and self.charge_capacity.get() > 0:
efficiency = (self.capacity_ah.get() / self.charge_capacity.get()) * 100
self.coulomb_efficiency.set(efficiency)
# Update cycle info
self.status_var.set(
f"Cycle {self.cycle_count.get()} complete | "
f"Discharge: {self.capacity_ah.get():.3f}Ah | "
f"Charge: {self.charge_capacity.get():.3f}Ah | "
f"Efficiency: {self.coulomb_efficiency.get():.1f}%"
)
self.root.update()
# Write cycle summary to log file
self.write_cycle_summary()
# Short rest between cycles (only in continuous mode)
if self.continuous_mode and self.test_running and not self.request_stop:
rest_end_time = time.time() + (self.rest_time.get() * 3600)
while time.time() < rest_end_time and self.test_running and not self.request_stop:
time_left = max(0, rest_end_time - time.time())
self.test_phase.set("Resting Between Cycles")
self.status_var.set(
f"Resting between cycles | "
f"Time left: {time_left/60:.1f} min"
)
self.root.update()
time.sleep(1)
# Finalize test if stopped or completed
self.safe_after(0, self.finalize_test)
except Exception as e:
error_msg = str(e)
if self.root.winfo_exists():
self.safe_after(0, lambda msg=error_msg: messagebox.showerror("Test Error", msg))
self.safe_after(0, self.finalize_test)
def finalize_test(self):
"""Final cleanup after test completes or is stopped"""
self.measuring = False
if hasattr(self, 'dev'):
try:
self.dev.channels['A'].constant(0)
except Exception as e:
print(f"Error resetting device: {e}")
# Flush and close current log file
if hasattr(self, 'log_buffer') and self.log_buffer and hasattr(self, 'log_writer'):
try:
self.log_writer.writerows(self.log_buffer)
self.log_buffer.clear()
except Exception as e:
print(f"Error flushing log buffer: {e}")
if hasattr(self, 'current_cycle_file'):
try:
self.current_cycle_file.close()
except Exception as e:
print(f"Error closing log file: {e}")
self.start_button.config(state=tk.NORMAL)
self.stop_button.config(state=tk.DISABLED)
self.request_stop = False
message = (
f"Test safely stopped after discharge phase | "
f"Cycle {self.cycle_count.get()} completed | "
f"Final capacity: {self.capacity_ah.get():.3f}Ah"
)
self.status_var.set(message)
self.safe_after(0, lambda: messagebox.showinfo(
"Test Completed",
f"Test was safely stopped after discharge phase.\n\n"
f"Final discharge capacity: {self.capacity_ah.get():.3f}Ah\n"
f"Total cycles completed: {self.cycle_count.get()}"
))
def update_measurement_display(self, voltage, current, current_time):
"""Update all measurement displays at once (throttled to 1Hz)"""
try:
# Update all values regardless of change
self.voltage_label.config(text=f"{voltage:.4f}")
self.current_label.config(text=f"{current:.4f}")
self.capacity_label.config(text=f"{self.capacity_ah.get():.4f}")
self.charge_capacity_label.config(text=f"{self.charge_capacity.get():.4f}")
self.efficiency_label.config(text=f"{self.coulomb_efficiency.get():.1f}")
self.cycle_label.config(text=f"{self.cycle_count.get()}")
self.phase_label.config(text=self.test_phase.get())
self.time_label.config(text=self.format_time(current_time))
except Exception as e:
print(f"GUI update error: {e}")
def write_cycle_summary(self):
"""Write cycle summary to the current cycle's log file"""
if not hasattr(self, 'current_cycle_file') or not self.current_cycle_file:
return
summary_line = (
f"Cycle {self.cycle_count.get()} Summary - "
f"Discharge={self.capacity_ah.get():.4f}Ah, "
f"Charge={self.charge_capacity.get():.4f}Ah, "
f"Efficiency={self.coulomb_efficiency.get():.1f}%"
)
# Ensure file is open and write summary
try:
if self.log_buffer:
self.log_writer.writerows(self.log_buffer)
self.log_buffer.clear()
self.current_cycle_file.write(summary_line + "\n")
self.current_cycle_file.flush()
except Exception as e:
print(f"Error writing cycle summary: {e}")
def update_plot(self):
"""Optimized plot update with change detection"""
if not self.time_data or len(self.time_data) < 5: # Wait for at least 5 samples
return
# Only update if there's significant new data
if hasattr(self, '_last_plot_time') and (self.time_data[-1] - self._last_plot_time < 1.0):
return
self._last_plot_time = self.time_data[-1]
# Update plot data
self.line_voltage.set_data(self.time_data, self.voltage_data)
self.line_current.set_data(self.time_data, self.current_data)
# Auto-scale axes if needed
self.auto_scale_axes()
# Only redraw if needed
self.canvas.draw_idle()
def auto_scale_axes(self):
"""Auto-scale plot axes with appropriate padding"""
if not self.time_data:
return
# X-axis scaling
min_time = 0
max_time = self.time_data[-1] * 1.05 # 5% padding
current_xlim = self.ax.get_xlim()
if abs(current_xlim[1] - max_time) > (max_time * 0.1): # 10% change threshold
self.ax.set_xlim(min_time, max_time)
self.ax2.set_xlim(min_time, max_time)
# Voltage axis scaling
voltage_padding = 0.2
if self.voltage_data:
min_voltage = max(0, min(self.voltage_data) - voltage_padding)
max_voltage = max(self.voltage_data) + voltage_padding
current_ylim = self.ax.get_ylim()
if (abs(current_ylim[0] - min_voltage) > 0.1 or
abs(current_ylim[1] - max_voltage) > 0.1):
self.ax.set_ylim(min_voltage, max_voltage)
# Current axis scaling
current_padding = 0.05
if self.current_data:
min_current = min(self.current_data) - current_padding
max_current = max(self.current_data) + current_padding
current_ylim2 = self.ax2.get_ylim()
if (abs(current_ylim2[0] - min_current) > 0.02 or
abs(current_ylim2[1] - max_current) > 0.02):
self.ax2.set_ylim(min_current, max_current)
def handle_device_error(self, error):
"""Handle device connection errors"""
if not self.root.winfo_exists(): # Check if window still exists
return
error_msg = str(error)
print(f"Device error: {error_msg}")
# Clean up session first
if hasattr(self, 'session'):
try:
if self.session_active:
self.session.end()
del self.session
except Exception as e:
print(f"Error cleaning up session: {e}")
# Update UI
self.status_light.itemconfig(self.status_indicator, fill='red')
self.connection_label.config(text="Disconnected")
self.status_var.set(f"Device error: {error_msg}")
self.session_active = False
self.test_running = False
self.continuous_mode = False
self.measuring = False
if hasattr(self, 'start_button'):
self.start_button.config(state=tk.DISABLED)
if hasattr(self, 'stop_button'):
self.stop_button.config(state=tk.DISABLED)
# Clear plot + buffers
self.time_data.clear()
self.voltage_data.clear()
self.current_data.clear()
if hasattr(self, 'line_voltage') and hasattr(self, 'line_current'):
self.line_voltage.set_data([], [])
self.line_current.set_data([], [])
self.ax.set_xlim(0, 1)
self.ax2.set_xlim(0, 1)
self.canvas.draw()
# Show error message and attempt reconnect automatically
if self.root.winfo_exists():
self.safe_after(100, self.attempt_reconnect)
def safe_after(self, delay_ms, callback, *args):
"""Safely schedule a callback with window existence check"""
if not self.root.winfo_exists():
return None
def wrapped_callback():
if self.root.winfo_exists():
try:
callback(*args)
except Exception as e:
print(f"Callback error: {e}")
after_id = self.root.after(delay_ms, wrapped_callback)
self._after_ids.add(after_id)
return after_id
def attempt_reconnect(self):
"""Attempt to reconnect automatically"""
if not self.root.winfo_exists():
return
try:
# Show error message first
messagebox.showerror(
"Device Connection Error",
"Could not connect to ADALM1000\n\n"
"1. Check USB cable connection\n"
"2. The device will attempt to reconnect automatically"
)
except Exception as e:
print(f"Error showing message: {e}")
return
# Schedule reconnect attempt
self.safe_after(1000, self.reconnect_device)
def reconnect_device(self):
"""Reconnect the device with proper cleanup"""
if not self.root.winfo_exists():
return
self.status_var.set("Attempting to reconnect...")
self.root.update()
# Clear any existing session
if hasattr(self, 'session'):
try:
if self.session_active:
self.session.end()
del self.session
except:
pass
# Stop any running threads
self.test_running = False
self.continuous_mode = False
self.measuring = False
if hasattr(self, 'measurement_event'):
self.measurement_event.clear()
# Wait for threads to finish
if hasattr(self, 'measurement_thread'):
self.measurement_thread.join(timeout=1.0)
if hasattr(self, 'test_thread'):
self.test_thread.join(timeout=1.0)
# Add small delay to allow device to reset
time.sleep(1.5)
# Try to initialize device
try:
self.init_device()
if self.session_active:
self.status_var.set("Reconnected successfully")
return
except Exception as e:
print(f"Reconnect failed: {e}")
# If we get here, reconnection failed
self.status_var.set("Reconnect failed - will retry...")
self.safe_after(2000, self.reconnect_device) # Retry after 2 seconds
def on_close(self):
"""Clean up on window close"""
# Set flags to stop all threads
self.test_running = False
self.measuring = False
self.session_active = False
if hasattr(self, 'measurement_event'):
self.measurement_event.clear()
# Cancel all pending callbacks
with self._after_lock:
for after_id in self._after_ids:
try:
self.root.after_cancel(after_id)
except:
pass
self._after_ids.clear()
# Give threads time to clean up
timeout = 2.0
if hasattr(self, 'measurement_thread'):
self.measurement_thread.join(timeout=timeout)
if hasattr(self, 'test_thread'):
self.test_thread.join(timeout=timeout)
# Clean up device session
if hasattr(self, 'session') and self.session:
try:
self.session.end()
except Exception as e:
print(f"Error ending session: {e}")
# Finally destroy window
try:
self.root.destroy()
except:
pass
if __name__ == "__main__":
root = tk.Tk()
try:
app = BatteryTester(root)
root.mainloop()
except Exception as e:
if root.winfo_exists():
messagebox.showerror("Fatal Error", f"Application failed: {str(e)}")
else:
print(f"Fatal Error: {e}")
try:
root.destroy()
except:
pass