Compare commits

..

6 Commits

Author SHA1 Message Date
Jan
bd76230e40 MainCode/adalm1000_logger.py aktualisiert
Neue datei mit suffix per cycle
kurve wird nur während test gezeichnet
(D)
2025-07-10 19:45:56 +02:00
Jan
803c7086d4 MainCode/adalm1000_logger.py aktualisiert
Neue datei pro cyclus
(C)
2025-07-10 19:31:16 +02:00
Jan
f4dc4506b3 MainCode/adalm1000_logger.py aktualisiert
Logging funktioniert wieder zuverlässig wie früher 

Kein Datenverlust bei Crash, da flush() sofort schreibt 

Bei langen Tests bleibt die Datei schlank, falls du das Intervall auf 1s limitierst 
(C)
2025-07-10 19:15:14 +02:00
Jan
401f19d237 MainCode/adalm1000_logger.py aktualisiert
Alles sollte funktionieren.
(D)
2025-07-09 18:45:36 +02:00
Jan
d368cec550 MainCode/adalm1000_logger.py aktualisiert
everything workes except coninious mode checkbox
2025-07-09 18:01:02 +02:00
Jan
982d6c46b2 MainCode/adalm1000_logger.py aktualisiert
These changes should:

    Make the plot start at zero correctly

    Prevent crashes when stopping tests

    Allow multiple test cycles to run properly

    Maintain stable operation during start/stop sequences
C&D
2025-07-09 14:38:08 +02:00

View File

@ -15,6 +15,7 @@ from queue import Queue, Full, Empty
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QLabel,
QPushButton, QLineEdit, QCheckBox, QFrame, QMessageBox, QFileDialog)
from PyQt5.QtCore import Qt, QTimer, pyqtSignal, pyqtSlot, QObject, QThread
from PyQt5 import sip
import pysmu
class DeviceDisconnectedError(Exception):
@ -36,6 +37,7 @@ class MeasurementThread(QThread):
self.measurement_queue = Queue(maxsize=1)
def run(self):
"""Continuous measurement loop"""
self._running = True
while self._running:
try:
@ -49,6 +51,13 @@ class MeasurementThread(QThread):
raw_voltage = np.mean([s[1][0] for s in samples]) # Channel B voltage
raw_current = np.mean([s[0][1] for s in samples]) # Channel A current
# Apply sign correction based on test phase if available
if hasattr(self, 'parent') and hasattr(self.parent, 'test_phase'):
if self.parent.test_phase == "Discharge":
raw_current = -abs(raw_current)
elif self.parent.test_phase == "Charge":
raw_current = abs(raw_current)
# Update filter windows
self.voltage_window.append(raw_voltage)
self.current_window.append(raw_current)
@ -60,6 +69,12 @@ class MeasurementThread(QThread):
voltage = np.mean(self.voltage_window)
current = np.mean(self.current_window)
# Validate measurements
if not (0 <= voltage <= 5.0):
raise ValueError(f"Invalid voltage: {voltage}V")
if not (-0.25 <= current <= 0.25):
raise ValueError(f"Invalid current: {current}A")
# Emit update
self.update_signal.emit(voltage, current, current_time)
@ -144,6 +159,10 @@ class TestSequenceWorker(QObject):
def discharge_phase(self):
"""Handle the battery discharging phase"""
voltage, _ = self.get_latest_measurement()
if voltage is not None and voltage <= self.discharge_cutoff:
self.update_status.emit(f"Already below discharge cutoff ({voltage:.3f}V ≤ {self.discharge_cutoff}V)")
return
self.update_phase.emit("Discharge")
self.update_status.emit(f"Discharging to {self.discharge_cutoff}V @ {self.test_current:.3f}A")
@ -189,17 +208,26 @@ class TestSequenceWorker(QObject):
def stop(self):
"""Request the thread to stop"""
self._running = False
try:
self.device.channels['A'].mode = pysmu.Mode.HI_Z
self.device.channels['A'].constant(0)
self.device.channels['B'].mode = pysmu.Mode.HI_Z
except Exception as e:
print(f"Error stopping device: {e}")
def run(self):
"""Main test sequence loop"""
try:
while self._running and (self.continuous_mode or self.parent.cycle_count == 0):
# Reset stop request at start of each cycle
first_cycle = True # Ensure at least one cycle runs
# Modified while condition to also check parent's continuous_mode state
while (self._running and
(self.parent.continuous_mode_check.isChecked() or first_cycle)):
self.parent.request_stop = False
self.parent.cycle_count += 1
first_cycle = False # Only True for the first cycle
# Existing test phases...
# 1. Charge phase (constant current)
self.charge_phase()
if not self._running or self.parent.request_stop:
@ -221,7 +249,9 @@ class TestSequenceWorker(QObject):
# Calculate Coulomb efficiency if not stopping
if not self.parent.request_stop and self.parent.charge_capacity > 0:
self.parent.coulomb_efficiency = (self.parent.capacity_ah / self.parent.charge_capacity) * 100
self.parent.coulomb_efficiency = (
self.parent.capacity_ah / self.parent.charge_capacity
) * 100
# Test completed
self.test_completed.emit()
@ -236,6 +266,8 @@ class BatteryTester(QMainWindow):
self.plot_mutex = threading.Lock()
super().__init__()
self.last_logged_phase = None
# Color scheme
self.bg_color = "#2E3440"
self.fg_color = "#D8DEE9"
@ -490,6 +522,7 @@ class BatteryTester(QMainWindow):
self.continuous_mode_check.setChecked(True)
self.continuous_mode_check.setStyleSheet(f"color: {self.fg_color};")
button_layout.addWidget(self.continuous_mode_check)
self.continuous_mode_check.stateChanged.connect(self.handle_continuous_mode_change)
controls_layout.addWidget(button_frame)
self.main_layout.addWidget(controls_frame)
@ -519,6 +552,14 @@ class BatteryTester(QMainWindow):
}}
""")
def handle_continuous_mode_change(self, state):
"""Handle changes to continuous mode checkbox during operation"""
if not state and self.test_running: # If unchecked during test
self.status_bar.showMessage("Continuous mode disabled - will complete current cycle")
# Optional visual feedback
self.continuous_mode_check.setStyleSheet(f"color: {self.warning_color};")
QTimer.singleShot(2000, lambda: self.continuous_mode_check.setStyleSheet(f"color: {self.fg_color};"))
def setup_plot(self):
"""Configure the matplotlib plot"""
self.fig = Figure(figsize=(8, 5), dpi=100, facecolor=self.bg_color)
@ -606,27 +647,39 @@ class BatteryTester(QMainWindow):
@pyqtSlot(float, float, float)
def update_measurements(self, voltage, current, current_time):
"""Update measurements from the measurement thread"""
try:
# Nur Daten speichern, wenn der Test läuft
if not self.test_running:
return
with self.plot_mutex:
self.time_data.append(current_time)
self.voltage_data.append(voltage)
self.current_data.append(current)
# Update display
# Update display labels (immer)
self.voltage_label.setText(f"{voltage:.4f}")
self.current_label.setText(f"{current:.4f}")
self.time_label.setText(self.format_time(current_time))
# Throttle plot updates to avoid recursive repaint
# Plot-Updates drosseln (max. 10Hz)
now = time.time()
if not hasattr(self, '_last_plot_update'):
self._last_plot_update = 0
if now - self._last_plot_update > 0.1: # Update plot max 10 times per second
if now - self._last_plot_update >= 0.1: # 100ms minimum zwischen Updates
self._last_plot_update = now
QTimer.singleShot(0, self.update_plot)
except Exception as e:
print(f"Error in update_measurements: {e}")
def update_status(self):
"""Update status information periodically"""
now = time.time()
if not hasattr(self, '_last_log_time'):
self._last_log_time = now
if self.test_running:
# Update capacity calculations if in test mode
if self.measuring and self.time_data:
@ -643,8 +696,45 @@ class BatteryTester(QMainWindow):
self.charge_capacity += current_current * delta_t / 3600
self.charge_capacity_label.setText(f"{self.charge_capacity:.4f}")
# Logging (1x pro Sekunde)
if hasattr(self, 'log_writer') and (now - self._last_log_time >= 1.0):
if self.time_data:
current_time = self.time_data[-1]
voltage = self.voltage_data[-1]
current = self.current_data[-1]
self.log_writer.writerow([
f"{current_time:.3f}",
f"{voltage:.6f}",
f"{current:.6f}",
self.test_phase,
f"{self.capacity_ah:.4f}",
f"{self.charge_capacity:.4f}",
f"{self.coulomb_efficiency:.1f}",
f"{self.cycle_count}"
])
self.current_cycle_file.flush()
self._last_log_time = now
# Zyklusbeginn erkennen: Wechsel von Resting → Charge
if self.test_running:
phase = self.test_phase
if (phase == "Charge" and self.last_logged_phase != "Charge"):
self.create_cycle_log_file() # Neue Datei für neuen Zyklus
self.last_logged_phase = phase
def start_test(self):
"""Start the full battery test cycle"""
# Clean up any previous test
if hasattr(self, 'test_sequence_thread'):
self.test_sequence_thread.quit()
self.test_sequence_thread.wait(500)
if hasattr(self, 'test_sequence_worker'):
self.test_sequence_worker.deleteLater()
del self.test_sequence_thread
# Reset stop flag
self.request_stop = False
if not self.test_running:
try:
# Get parameters from UI
@ -666,23 +756,31 @@ class BatteryTester(QMainWindow):
if test_current > 0.2:
raise ValueError("Current must be ≤200mA (0.2A) for ADALM1000")
# Clear previous data
# Clear ALL previous data completely
with self.plot_mutex:
self.time_data.clear()
self.voltage_data.clear()
self.current_data.clear()
self.phase_data.clear()
# Reset capacities and timing
self.start_time = time.time()
self.last_update_time = self.start_time
self.capacity_ah = 0.0
self.charge_capacity = 0.0
self.coulomb_efficiency = 0.0
self.cycle_count = 0
# Reset plot with proper ranges
self.reset_plot()
# Reset measurement thread's timer and queues
if hasattr(self, 'measurement_thread'):
self.measurement_thread.start_time = time.time()
self.measurement_thread.voltage_window.clear()
self.measurement_thread.current_window.clear()
with self.measurement_thread.measurement_queue.mutex:
self.measurement_thread.measurement_queue.queue.clear()
# Generate filename and create log file
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.base_filename = os.path.join(self.log_dir, f"battery_test_{timestamp}")
self.create_cycle_log_file()
# Reset plot completely
self.reset_plot()
# Start test
self.test_running = True
@ -748,9 +846,17 @@ class BatteryTester(QMainWindow):
QMessageBox.critical(self, "Error", f"No write permissions in {self.log_dir}")
return False
# Generate unique filename
# Generate base filename on first cycle
if not hasattr(self, 'base_filename'):
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.filename = os.path.join(self.log_dir, f"battery_test_{timestamp}.csv")
self.base_filename = os.path.join(self.log_dir, f"battery_test_{timestamp}")
# Find next available cycle number
cycle_num = 1
while os.path.exists(f"{self.base_filename}_{cycle_num}.csv"):
cycle_num += 1
self.filename = f"{self.base_filename}_{cycle_num}.csv"
# Open new file
try:
@ -760,10 +866,10 @@ class BatteryTester(QMainWindow):
test_current = self.c_rate * self.capacity
test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A"
self.current_cycle_file.write(f"# ADALM1000 Battery Test Log\n")
self.current_cycle_file.write(f"# ADALM1000 Battery Test Log - Cycle {cycle_num}\n")
self.current_cycle_file.write(f"# Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
self.current_cycle_file.write(f"# Battery Capacity: {self.capacity} Ah\n")
self.current_cycle_file.write(f"# Test Current: {test_current:.3f} A (C/{1/self.c_rate:.1f})\n")
self.current_cycle_file.write(f"# Test Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n")
self.current_cycle_file.write(f"# Charge Cutoff: {self.charge_cutoff} V\n")
self.current_cycle_file.write(f"# Discharge Cutoff: {self.discharge_cutoff} V\n")
self.current_cycle_file.write(f"# Rest Time: {self.rest_time} hours\n")
@ -804,55 +910,116 @@ class BatteryTester(QMainWindow):
self.test_phase = "Idle"
self.phase_label.setText(self.test_phase)
# Stop test sequence worker if it exists and is not already deleted
if hasattr(self, 'test_sequence_worker'):
try:
if not sip.isdeleted(self.test_sequence_worker):
self.test_sequence_worker.stop()
except:
pass
# Reset device channels
if hasattr(self, 'dev'):
try:
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
self.dev.channels['A'].constant(0)
self.dev.channels['B'].mode = pysmu.Mode.HI_Z
except Exception as e:
print(f"Error resetting device: {e}")
# Clear all data buffers
self.time_data.clear()
self.voltage_data.clear()
self.current_data.clear()
self.phase_data.clear()
# Reset capacities
self.capacity_ah = 0.0
self.charge_capacity = 0.0
self.coulomb_efficiency = 0.0
QApplication.processEvents()
time.sleep(0.1)
# Reset plot
self.reset_plot()
# Update UI
self.status_bar.showMessage("Test stopped - Ready for new test")
self.stop_button.setEnabled(False)
self.start_button.setEnabled(True)
self.finalize_test()
def finalize_test(self):
"""Final cleanup after test completes or is stopped"""
try:
# 1. Stop any active measurement or test operations
self.measuring = False
self.test_running = False
# 2. Reset device to safe state
if hasattr(self, 'dev'):
try:
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
self.dev.channels['A'].constant(0)
self.dev.channels['B'].mode = pysmu.Mode.HI_Z
except Exception as e:
print(f"Error resetting device: {e}")
test_current = self.c_rate * self.capacity
print(f"Error resetting device in finalize: {e}")
# 3. Clean up test sequence thread safely
if hasattr(self, 'test_sequence_thread'):
try:
# Check if thread is still running
if self.test_sequence_thread.isRunning():
# First try to stop the worker if it exists
if hasattr(self, 'test_sequence_worker'):
try:
self.test_sequence_worker.stop()
except RuntimeError:
pass # Already deleted
# Quit the thread
self.test_sequence_thread.quit()
self.test_sequence_thread.wait(500)
except RuntimeError:
pass # Already deleted
except Exception as e:
print(f"Error stopping test sequence thread: {e}")
finally:
# Only try to delete if the object still exists
if hasattr(self, 'test_sequence_worker'):
try:
if not sip.isdeleted(self.test_sequence_worker):
self.test_sequence_worker.deleteLater()
except:
pass
# Remove references
if hasattr(self, 'test_sequence_thread'):
try:
if not sip.isdeleted(self.test_sequence_thread):
self.test_sequence_thread.deleteLater()
except:
pass
finally:
if hasattr(self, 'test_sequence_thread'):
del self.test_sequence_thread
# 4. Finalize log file
test_current = self.c_rate * self.capacity
test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A"
# Only try to close if file exists and is open
if hasattr(self, 'current_cycle_file') and self.current_cycle_file is not None:
try:
if self.log_buffer:
# Write any buffered data
if hasattr(self, 'log_buffer') and self.log_buffer:
self.log_writer.writerows(self.log_buffer)
self.log_buffer.clear()
# Write test summary
test_current = self.c_rate * self.capacity
test_conditions = self.test_conditions_input.text() if hasattr(self, 'test_conditions_input') else "N/A"
self.current_cycle_file.write("\n# TEST SUMMARY\n")
self.current_cycle_file.write(f"# Test Parameters:\n")
self.current_cycle_file.write(f"# - Battery Capacity: {self.capacity} Ah\n")
self.current_cycle_file.write(f"# - Test Current: {test_current:.3f} A (C/{1/self.c_rate:.1f})\n")
self.current_cycle_file.write(f"# - Test Current: {test_current:.4f} A (C/{1/self.c_rate:.1f})\n")
self.current_cycle_file.write(f"# - Charge Cutoff: {self.charge_cutoff} V\n")
self.current_cycle_file.write(f"# - Discharge Cutoff: {self.discharge_cutoff} V\n")
self.current_cycle_file.write(f"# - Test Conditions: {test_conditions}\n")
@ -868,21 +1035,25 @@ class BatteryTester(QMainWindow):
finally:
self.current_cycle_file = None
# 5. Reset UI and state
self.request_stop = False
self.start_button.setEnabled(True)
self.stop_button.setEnabled(False)
self.request_stop = False
# 6. Show completion message if test wasn't stopped by user
if not self.request_stop:
message = (
f"Test safely stopped after discharge phase | "
f"Cycle {self.cycle_count} completed | "
f"Final capacity: {self.capacity_ah:.3f}Ah"
f"Test completed | "
f"Cycle {self.cycle_count} | "
f"Capacity: {self.capacity_ah:.3f}Ah | "
f"Efficiency: {self.coulomb_efficiency:.1f}%"
)
self.status_bar.showMessage(message)
QMessageBox.information(
self,
"Test Completed",
f"Test was safely stopped after discharge phase.\n\n"
f"Test completed successfully.\n\n"
f"Test Parameters:\n"
f"- Capacity: {self.capacity} Ah\n"
f"- Current: {test_current:.3f} A (C/{1/self.c_rate:.1f})\n"
@ -895,26 +1066,47 @@ class BatteryTester(QMainWindow):
f"- Coulombic efficiency: {self.coulomb_efficiency:.1f}%"
)
except Exception as e:
print(f"Error in finalize_test: {e}")
import traceback
traceback.print_exc()
# Ensure we don't leave the UI in a locked state
self.start_button.setEnabled(True)
self.stop_button.setEnabled(False)
self.status_bar.showMessage("Error during test finalization")
def reset_plot(self):
"""Reset the plot completely for a new test"""
"""Completely reset the plot - clears all data and visuals"""
# 1. Clear line data
self.line_voltage.set_data([], [])
self.line_current.set_data([], [])
# 2. Clear data buffers
self.time_data.clear()
self.voltage_data.clear()
self.current_data.clear()
self.phase_data.clear()
# 3. Reset axes with appropriate ranges
voltage_padding = 0.2
min_voltage = max(0, self.discharge_cutoff - voltage_padding)
max_voltage = self.charge_cutoff + voltage_padding
self.ax.set_xlim(0, 10)
self.ax.set_xlim(0, 10) # Reset X axis
self.ax.set_ylim(min_voltage, max_voltage)
# Reset twin axis (current)
current_padding = 0.05
test_current = self.c_rate * self.capacity
max_current = test_current * 1.5
self.ax2.set_xlim(0, 10)
self.ax2.set_ylim(-max_current - current_padding, max_current + current_padding)
# 4. Clear any matplotlib internal caches
self.fig.canvas.draw_idle()
self.fig.canvas.flush_events()
# 5. Force immediate redraw
self.canvas.draw()
def write_cycle_summary(self):
@ -969,8 +1161,11 @@ class BatteryTester(QMainWindow):
self.canvas.draw_idle()
except Exception as e:
print(f"Plot error: {e}")
print(f"Plot update error: {e}")
import traceback
traceback.print_exc()
# Reset plot on error
with self.plot_mutex:
self.line_voltage.set_data([], [])
self.line_current.set_data([], [])
self.canvas.draw_idle()