MainCode/adalm1000_logger.py aktualisiert

Separate Log Files: You've successfully implemented separate log files for each cycle without cycle numbers in filenames.

    1Hz UI Updates: The measurement display updates at 1Hz as requested.

    Throttled Plot Updates: Plot updates are properly throttled to prevent lag.

    Error Handling: Good error handling throughout the code.

    Thread Safety: Proper use of threading for measurements and test sequences.
(Deepseek)
This commit is contained in:
Jan 2025-05-24 13:23:14 +02:00
parent 1c928e22fc
commit 24cc224138

View File

@ -303,7 +303,9 @@ class BatteryTester:
voltage_window = []
current_window = []
last_plot_update = 0
last_ui_update = 0
log_buffer = []
update_interval = 1.0 # Update UI at 1Hz max
# Initialize start_time for measurements
if not hasattr(self, 'start_time'):
@ -337,13 +339,21 @@ class BatteryTester:
self.voltage_data.append(voltage)
self.current_data.append(current)
# Update UI with filtered values (throttled)
if current_time - last_plot_update > 0.5: # Update at 2Hz max
self.root.after(0, lambda: self.update_measurement_display(voltage, current, current_time))
last_plot_update = current_time
# Throttle UI updates to prevent lag
now = time.time()
if now - last_ui_update > update_interval:
self.root.after(0, lambda: self.update_measurement_display(
voltage, current, current_time
))
last_ui_update = now
# Throttle plot updates even more (1Hz max)
if now - last_plot_update > 1.0:
self.root.after(0, self.update_plot)
last_plot_update = now
# Buffered logging
if self.test_running and hasattr(self, 'filename'):
if self.test_running and hasattr(self, 'current_cycle_file'):
log_buffer.append([
f"{current_time:.3f}",
f"{voltage:.6f}",
@ -372,7 +382,7 @@ class BatteryTester:
break
# Flush remaining buffer on exit
if log_buffer and hasattr(self, 'filename'):
if log_buffer and hasattr(self, 'current_cycle_file'):
with open(self.filename, 'a', newline='') as f:
writer = csv.writer(f)
writer.writerows(log_buffer)
@ -412,14 +422,11 @@ class BatteryTester:
self.coulomb_efficiency.set(0.0)
self.cycle_count.set(0)
# Setup new log file with buffered writer
# Generate base filename without cycle number
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.filename = os.path.join(self.log_dir, f"battery_test_{timestamp}.csv")
self.log_file = open(self.filename, 'w', newline='')
self.log_writer = csv.writer(self.log_file)
self.log_writer.writerow(["Time(s)", "Voltage(V)", "Current(A)", "Phase", "Discharge_Capacity(Ah)", "Charge_Capacity(Ah)", "Coulomb_Eff(%)", "Cycle"])
self.log_buffer = []
self.base_filename = os.path.join(self.log_dir, f"battery_test_{timestamp}")
self.current_cycle_file = None
# Start test thread
self.test_running = True
self.start_time = time.time()
@ -436,6 +443,40 @@ class BatteryTester:
except Exception as e:
messagebox.showerror("Error", str(e))
def create_cycle_log_file(self):
"""Create a new log file for the current cycle"""
# Close previous file if exists
if hasattr(self, 'current_cycle_file') and self.current_cycle_file:
try:
self.current_cycle_file.close()
except Exception as e:
print(f"Error closing previous log file: {e}")
# Check write permissions
if not os.access(self.log_dir, os.W_OK):
messagebox.showerror("Error", f"No write permissions in {self.log_dir}")
return False
# Create new log file with sequential suffix
suffix = 1
while True:
self.filename = f"{self.base_filename}_{suffix}.csv"
if not os.path.exists(self.filename):
break
suffix += 1
try:
self.current_cycle_file = open(self.filename, 'w', newline='')
self.log_writer = csv.writer(self.current_cycle_file)
self.log_writer.writerow(["Time(s)", "Voltage(V)", "Current(A)", "Phase",
"Discharge_Capacity(Ah)", "Charge_Capacity(Ah)",
"Coulomb_Eff(%)", "Cycle"])
self.log_buffer = []
return True
except Exception as e:
messagebox.showerror("Error", f"Failed to create log file: {e}")
return False
@staticmethod
def format_time(seconds):
@ -484,6 +525,9 @@ class BatteryTester:
self.request_stop = False
self.cycle_count.set(self.cycle_count.get() + 1)
# Create new log file for this cycle
self.create_cycle_log_file()
# 1. Charge phase (constant current)
self.test_phase.set("Charge")
self.status_var.set(f"Charging to {self.charge_cutoff.get()}V @ {test_current:.3f}A")
@ -642,23 +686,29 @@ class BatteryTester:
"""Final cleanup after test completes or is stopped"""
self.measuring = False
if hasattr(self, 'dev'):
self.dev.channels['A'].constant(0)
try:
self.dev.channels['A'].constant(0)
except Exception as e:
print(f"Error resetting device: {e}")
# Flush and close log file
# Flush and close current log file
if hasattr(self, 'log_buffer') and self.log_buffer and hasattr(self, 'log_writer'):
self.log_writer.writerows(self.log_buffer)
self.log_buffer.clear()
if hasattr(self, 'log_file'):
self.log_file.close()
try:
self.log_writer.writerows(self.log_buffer)
self.log_buffer.clear()
except Exception as e:
print(f"Error flushing log buffer: {e}")
if hasattr(self, 'filename'):
self.write_cycle_summary()
if hasattr(self, 'current_cycle_file'):
try:
self.current_cycle_file.close()
except Exception as e:
print(f"Error closing log file: {e}")
self.start_button.config(state=tk.NORMAL)
self.stop_button.config(state=tk.DISABLED)
self.request_stop = False
# Erfolgsmeldung mit mehr Details
message = (
f"Test safely stopped after discharge phase | "
f"Cycle {self.cycle_count.get()} completed | "
@ -666,7 +716,6 @@ class BatteryTester:
)
self.status_var.set(message)
# Optional: Messagebox anzeigen
self.root.after(0, lambda: messagebox.showinfo(
"Test Completed",
f"Test was safely stopped after discharge phase.\n\n"
@ -675,127 +724,96 @@ class BatteryTester:
))
def update_measurement_display(self, voltage, current, current_time):
"""Update display with current measurements (optimized to only update changed values)"""
"""Update all measurement displays at once (throttled to 1Hz)"""
try:
# Only update changed values
voltage_text = f"{voltage:.4f}"
if not hasattr(self, '_last_voltage_text') or self._last_voltage_text != voltage_text:
self.voltage_label.config(text=voltage_text)
self._last_voltage_text = voltage_text
capacity_text = f"{self.capacity_ah.get():.4f}"
if not hasattr(self, '_last_capacity_text') or self._last_capacity_text != capacity_text:
self.capacity_label.config(text=capacity_text)
self._last_capacity_text = capacity_text
charge_capacity_text = f"{self.charge_capacity.get():.4f}"
if not hasattr(self, '_last_charge_capacity_text') or self._last_charge_capacity_text != charge_capacity_text:
self.charge_capacity_label.config(text=charge_capacity_text)
self._last_charge_capacity_text = charge_capacity_text
efficiency_text = f"{self.coulomb_efficiency.get():.1f}"
if not hasattr(self, '_last_efficiency_text') or self._last_efficiency_text != efficiency_text:
self.efficiency_label.config(text=efficiency_text)
self._last_efficiency_text = efficiency_text
cycle_text = f"{self.cycle_count.get()}"
if not hasattr(self, '_last_cycle_text') or self._last_cycle_text != cycle_text:
self.cycle_label.config(text=cycle_text)
self._last_cycle_text = cycle_text
current_text = f"{current:.4f}"
if not hasattr(self, '_last_current_text') or self._last_current_text != current_text:
self.current_label.config(text=current_text)
self._last_current_text = current_text
phase_text = self.test_phase.get()
if not hasattr(self, '_last_phase_text') or self._last_phase_text != phase_text:
self.phase_label.config(text=phase_text)
self._last_phase_text = phase_text
time_text = self.format_time(current_time)
if not hasattr(self, '_last_time_text') or self._last_time_text != time_text:
self.time_label.config(text=time_text)
self._last_time_text = time_text
# Update plot with proper scaling (throttled)
if not hasattr(self, '_last_plot_update') or (time.time() - self._last_plot_update > 1.0):
self.update_plot()
self._last_plot_update = time.time()
# Update all values regardless of change
self.voltage_label.config(text=f"{voltage:.4f}")
self.current_label.config(text=f"{current:.4f}")
self.capacity_label.config(text=f"{self.capacity_ah.get():.4f}")
self.charge_capacity_label.config(text=f"{self.charge_capacity.get():.4f}")
self.efficiency_label.config(text=f"{self.coulomb_efficiency.get():.1f}")
self.cycle_label.config(text=f"{self.cycle_count.get()}")
self.phase_label.config(text=self.test_phase.get())
self.time_label.config(text=self.format_time(current_time))
except Exception as e:
print(f"GUI update error: {e}")
def write_cycle_summary(self):
"""Write cycle summary to the log file"""
if not hasattr(self, 'filename'):
"""Write cycle summary to the current cycle's log file"""
if not hasattr(self, 'current_cycle_file') or not self.current_cycle_file:
return
summary_line = (
f"Cycle {self.cycle_count.get()} - "
f"Cycle {self.cycle_count.get()} Summary - "
f"Discharge={self.capacity_ah.get():.4f}Ah, "
f"Charge={self.charge_capacity.get():.4f}Ah, "
f"Efficiency={self.coulomb_efficiency.get():.1f}%"
)
with open(self.filename, 'a', newline='') as f:
f.write(summary_line + "\n")
# Ensure file is open and write summary
try:
if self.log_buffer:
self.log_writer.writerows(self.log_buffer)
self.log_buffer.clear()
self.current_cycle_file.write(summary_line + "\n")
self.current_cycle_file.flush()
except Exception as e:
print(f"Error writing cycle summary: {e}")
def update_plot(self):
"""Update plot with proper scaling and limits (optimized)"""
if not self.time_data:
"""Optimized plot update with change detection"""
if not self.time_data or len(self.time_data) < 10: # Wait for at least 10 samples
return
# Only update if there's significant new data
if hasattr(self, '_last_plot_len') and len(self.time_data) - self._last_plot_len < 10:
if hasattr(self, '_last_plot_time') and (self.time_data[-1] - self._last_plot_time < 1.0):
return
self._last_plot_len = len(self.time_data)
self._last_plot_time = self.time_data[-1]
# Update plot data
self.line_voltage.set_data(self.time_data, self.voltage_data)
self.line_current.set_data(self.time_data, self.current_data)
# Set x-axis to always show from 0 to current max time
min_time = 0 # Always start from 0
max_time = self.time_data[-1] + 1 # Add 1 second padding
# Auto-scale axes if needed
self.auto_scale_axes()
# Only adjust limits if needed
# Only redraw if needed
self.canvas.draw_idle()
def auto_scale_axes(self):
"""Auto-scale plot axes with appropriate padding"""
if not self.time_data:
return
# X-axis scaling
min_time = 0
max_time = self.time_data[-1] * 1.05 # 5% padding
current_xlim = self.ax.get_xlim()
if abs(current_xlim[1] - max_time) > 5: # Only adjust if significant change
if abs(current_xlim[1] - max_time) > (max_time * 0.1): # 10% change threshold
self.ax.set_xlim(min_time, max_time)
self.ax2.set_xlim(min_time, max_time)
# Auto-scale y-axes but respect initial boundaries
# Voltage axis scaling
voltage_padding = 0.2
if self.voltage_data:
min_voltage = max(0, min(self.voltage_data) - voltage_padding)
max_voltage = max(self.voltage_data) + voltage_padding
# Ensure we don't go below discharge cutoff - padding or above charge cutoff + padding
min_voltage = max(min_voltage, max(0, self.discharge_cutoff.get() - voltage_padding*2))
max_voltage = min(max_voltage, self.charge_cutoff.get() + voltage_padding*2)
current_ylim = self.ax.get_ylim()
if abs(current_ylim[0] - min_voltage) > 0.1 or abs(current_ylim[1] - max_voltage) > 0.1:
if (abs(current_ylim[0] - min_voltage) > 0.1 or
abs(current_ylim[1] - max_voltage) > 0.1):
self.ax.set_ylim(min_voltage, max_voltage)
# Current axis scaling
current_padding = 0.05
if self.current_data:
test_current = self.c_rate.get() * self.capacity.get()
min_current = min(self.current_data) - current_padding
max_current = max(self.current_data) + current_padding
# Ensure we don't go too far beyond test current
min_current = max(min_current, -test_current*1.5)
max_current = min(max_current, test_current*1.5)
current_ylim2 = self.ax2.get_ylim()
if abs(current_ylim2[0] - min_current) > 0.02 or abs(current_ylim2[1] - max_current) > 0.02:
if (abs(current_ylim2[0] - min_current) > 0.02 or
abs(current_ylim2[1] - max_current) > 0.02):
self.ax2.set_ylim(min_current, max_current)
# Only redraw if needed
self.canvas.draw_idle()
def handle_device_error(self, error):
"""Handle device connection errors"""
@ -912,17 +930,24 @@ class BatteryTester:
if hasattr(self, 'measurement_event'):
self.measurement_event.clear()
# Give threads time to clean up
timeout = 2.0 # seconds
if hasattr(self, 'measurement_thread'):
self.measurement_thread.join(timeout=1.0)
self.measurement_thread.join(timeout=timeout)
if self.measurement_thread.is_alive():
print("Warning: Measurement thread did not terminate cleanly")
if hasattr(self, 'test_thread'):
self.test_thread.join(timeout=1.0)
self.test_thread.join(timeout=timeout)
if self.test_thread.is_alive():
print("Warning: Test thread did not terminate cleanly")
if hasattr(self, 'session') and self.session:
try:
if self.session_active:
self.session.end()
except:
pass
except Exception as e:
print(f"Error ending session: {e}")
self.root.destroy()