MainCode/adalm1000_logger.py aktualisiert

Reducing unnecessary GUI updates

    Implementing buffered file I/O

    Throttling plot updates

    Only updating display elements when values change

    Using more efficient drawing methods for the plot
(Deepseek)
This commit is contained in:
Jan 2025-05-23 23:41:21 +02:00
parent 06c99bae38
commit 13148a64de

View File

@ -285,10 +285,12 @@ class BatteryTester:
self.handle_device_error(e)
def continuous_measurement(self):
"""Continuous measurement with moving average filtering"""
"""Continuous measurement with moving average filtering and optimized I/O"""
filter_window_size = 10
voltage_window = []
current_window = []
last_plot_update = 0
log_buffer = []
# Initialize start_time for measurements
if not hasattr(self, 'start_time'):
@ -304,7 +306,7 @@ class BatteryTester:
# Get voltage from Channel B (HI_Z mode) and current from Channel A
raw_voltage = np.mean([s[1][0] for s in samples]) # Channel B voltage
raw_current = np.mean([s[0][1] for s in samples]) # Channel A current
current_time = time.time() - (self.start_time if hasattr(self, 'start_time') else time.time())
current_time = time.time() - self.start_time
# Apply moving average filter
voltage_window.append(raw_voltage)
@ -322,24 +324,30 @@ class BatteryTester:
self.voltage_data.append(voltage)
self.current_data.append(current)
# Update UI with filtered values
if self.root.winfo_exists():
# Update UI with filtered values (throttled)
if current_time - last_plot_update > 0.5: # Update at 2Hz max
self.root.after(0, lambda: self.update_measurement_display(voltage, current, current_time))
last_plot_update = current_time
# Save data if in active test
# Buffered logging
if self.test_running and hasattr(self, 'filename'):
with open(self.filename, 'a', newline='') as f:
writer = csv.writer(f)
writer.writerow([
f"{current_time:.3f}",
f"{voltage:.6f}",
f"{current:.6f}",
self.test_phase.get(),
f"{self.capacity_ah.get():.4f}",
f"{self.charge_capacity.get():.4f}",
f"{self.coulomb_efficiency.get():.1f}",
f"{self.cycle_count.get()}"
])
log_buffer.append([
f"{current_time:.3f}",
f"{voltage:.6f}",
f"{current:.6f}",
self.test_phase.get(),
f"{self.capacity_ah.get():.4f}",
f"{self.charge_capacity.get():.4f}",
f"{self.coulomb_efficiency.get():.1f}",
f"{self.cycle_count.get()}"
])
# Write in chunks of 10 samples
if len(log_buffer) >= 10:
with open(self.filename, 'a', newline='') as f:
writer = csv.writer(f)
writer.writerows(log_buffer)
log_buffer.clear()
time.sleep(max(0.05, self.interval))
@ -350,6 +358,12 @@ class BatteryTester:
self.handle_device_error(f"Measurement error: {msg}") if self.root.winfo_exists() else None)
break
# Flush remaining buffer on exit
if log_buffer and hasattr(self, 'filename'):
with open(self.filename, 'a', newline='') as f:
writer = csv.writer(f)
writer.writerows(log_buffer)
def start_test(self):
"""Start the full battery test cycle"""
if not self.test_running:
@ -385,13 +399,13 @@ class BatteryTester:
self.coulomb_efficiency.set(0.0)
self.cycle_count.set(0)
# Setup new log file
# Setup new log file with buffered writer
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.filename = os.path.join(self.log_dir, f"battery_test_{timestamp}.csv")
with open(self.filename, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(["Time(s)", "Voltage(V)", "Current(A)", "Phase", "Discharge_Capacity(Ah)", "Charge_Capacity(Ah)", "Coulomb_Eff(%)", "Cycle"])
self.log_file = open(self.filename, 'w', newline='')
self.log_writer = csv.writer(self.log_file)
self.log_writer.writerow(["Time(s)", "Voltage(V)", "Current(A)", "Phase", "Discharge_Capacity(Ah)", "Charge_Capacity(Ah)", "Coulomb_Eff(%)", "Cycle"])
self.log_buffer = []
# Start test thread
self.test_running = True
@ -629,6 +643,13 @@ class BatteryTester:
if hasattr(self, 'dev'):
self.dev.channels['A'].constant(0)
# Flush and close log file
if hasattr(self, 'log_buffer') and self.log_buffer and hasattr(self, 'log_writer'):
self.log_writer.writerows(self.log_buffer)
self.log_buffer.clear()
if hasattr(self, 'log_file'):
self.log_file.close()
if hasattr(self, 'filename'):
self.write_cycle_summary()
@ -653,19 +674,34 @@ class BatteryTester:
))
def update_measurement_display(self, voltage, current, current_time):
"""Update display with current measurements"""
"""Update display with current measurements (optimized to only update changed values)"""
try:
self.voltage_label.config(text=f"{voltage:.4f}")
self.current_label.config(text=f"{current:.4f}")
self.phase_label.config(text=self.test_phase.get())
self.time_label.config(text=self.format_time(current_time))
self.capacity_label.config(text=f"{self.capacity_ah.get():.4f}")
self.charge_capacity_label.config(text=f"{self.charge_capacity.get():.4f}")
self.efficiency_label.config(text=f"{self.coulomb_efficiency.get():.1f}")
self.cycle_label.config(text=str(self.cycle_count.get()))
# Only update changed values
voltage_text = f"{voltage:.4f}"
if not hasattr(self, '_last_voltage_text') or self._last_voltage_text != voltage_text:
self.voltage_label.config(text=voltage_text)
self._last_voltage_text = voltage_text
current_text = f"{current:.4f}"
if not hasattr(self, '_last_current_text') or self._last_current_text != current_text:
self.current_label.config(text=current_text)
self._last_current_text = current_text
phase_text = self.test_phase.get()
if not hasattr(self, '_last_phase_text') or self._last_phase_text != phase_text:
self.phase_label.config(text=phase_text)
self._last_phase_text = phase_text
time_text = self.format_time(current_time)
if not hasattr(self, '_last_time_text') or self._last_time_text != time_text:
self.time_label.config(text=time_text)
self._last_time_text = time_text
# Update plot with proper scaling (throttled)
if not hasattr(self, '_last_plot_update') or (time.time() - self._last_plot_update > 1.0):
self.update_plot()
self._last_plot_update = time.time()
# Update plot with proper scaling
self.update_plot()
except Exception as e:
print(f"GUI update error: {e}")
@ -685,10 +721,16 @@ class BatteryTester:
f.write(summary_line + "\n")
def update_plot(self):
"""Update plot with proper scaling and limits"""
"""Update plot with proper scaling and limits (optimized)"""
if not self.time_data:
return
# Only update if there's significant new data
if hasattr(self, '_last_plot_len') and len(self.time_data) - self._last_plot_len < 10:
return
self._last_plot_len = len(self.time_data)
# Update plot data
self.line_voltage.set_data(self.time_data, self.voltage_data)
self.line_current.set_data(self.time_data, self.current_data)
@ -697,23 +739,31 @@ class BatteryTester:
min_time = 0 # Always start from 0
max_time = self.time_data[-1] + 1 # Add 1 second padding
self.ax.set_xlim(min_time, max_time)
self.ax2.set_xlim(min_time, max_time)
# Only adjust limits if needed
current_xlim = self.ax.get_xlim()
if abs(current_xlim[1] - max_time) > 5: # Only adjust if significant change
self.ax.set_xlim(min_time, max_time)
self.ax2.set_xlim(min_time, max_time)
# Auto-scale y-axes with some margin
# Auto-scale y-axes with some margin (only if significant change)
if self.voltage_data:
voltage_margin = 0.2
min_voltage = max(0, min(self.voltage_data) - voltage_margin)
max_voltage = max(self.voltage_data) + voltage_margin
self.ax.set_ylim(min_voltage, max_voltage)
current_ylim = self.ax.get_ylim()
if abs(current_ylim[0] - min_voltage) > 0.1 or abs(current_ylim[1] - max_voltage) > 0.1:
self.ax.set_ylim(min_voltage, max_voltage)
if self.current_data:
current_margin = 0.05
min_current = min(self.current_data) - current_margin
max_current = max(self.current_data) + current_margin
self.ax2.set_ylim(min_current, max_current)
current_ylim2 = self.ax2.get_ylim()
if abs(current_ylim2[0] - min_current) > 0.02 or abs(current_ylim2[1] - max_current) > 0.02:
self.ax2.set_ylim(min_current, max_current)
self.canvas.draw()
# Only redraw if needed
self.canvas.draw_idle()
def handle_device_error(self, error):
"""Handle device connection errors"""