Compare commits

..

25 Commits

Author SHA1 Message Date
Jan
bd76230e40 MainCode/adalm1000_logger.py aktualisiert
Neue datei mit suffix per cycle
kurve wird nur während test gezeichnet
(D)
2025-07-10 19:45:56 +02:00
Jan
803c7086d4 MainCode/adalm1000_logger.py aktualisiert
Neue datei pro cyclus
(C)
2025-07-10 19:31:16 +02:00
Jan
f4dc4506b3 MainCode/adalm1000_logger.py aktualisiert
Logging funktioniert wieder zuverlässig wie früher 

Kein Datenverlust bei Crash, da flush() sofort schreibt 

Bei langen Tests bleibt die Datei schlank, falls du das Intervall auf 1s limitierst 
(C)
2025-07-10 19:15:14 +02:00
Jan
401f19d237 MainCode/adalm1000_logger.py aktualisiert
Alles sollte funktionieren.
(D)
2025-07-09 18:45:36 +02:00
Jan
d368cec550 MainCode/adalm1000_logger.py aktualisiert
everything workes except coninious mode checkbox
2025-07-09 18:01:02 +02:00
Jan
982d6c46b2 MainCode/adalm1000_logger.py aktualisiert
These changes should:

    Make the plot start at zero correctly

    Prevent crashes when stopping tests

    Allow multiple test cycles to run properly

    Maintain stable operation during start/stop sequences
C&D
2025-07-09 14:38:08 +02:00
Jan
0305463857 MainCode/adalm1000_logger.py aktualisiert
1. Verbesserte auto_scale_axes() Methode:
(D)
2025-06-13 18:21:17 +02:00
Jan
12fb82ce80 MainCode/adalm1000_logger.py aktualisiert
Got it working, from the push before.
(Deepseek)
2025-06-13 18:16:00 +02:00
Jan
81ccc8ccd7 MainCode/adalm1000_logger.py aktualisiert
Test phase display issue: When continuous mode is disabled and the discharge voltage is already below cutoff, the phase still shows "Discharge" instead of switching to "Idle".

    Graph update issue: The plot sometimes fails to update when starting a new test series.
(Deepseek)
2025-06-13 18:04:59 +02:00
Jan
1a6ebb2fab MainCode/adalm1000_logger.py aktualisiert
Clear log_buffer after every cycle, because logs are getting smaler the more cycles.
(D)
2025-05-28 17:06:24 +02:00
Jan
c697388157 MainCode/adalm1000_logger.py aktualisiert
 User unchecks Continuous Mode during discharge:

    Status updates immediately: "Continuous Mode disabled..."

    Discharge continues until cutoff voltage.

    Test stops after discharge (no rest phase or new cycle).

 User leaves Continuous Mode enabled:

    Test continues looping indefinitely (original behavior).
(D)
2025-05-27 22:09:52 +02:00
Jan
34be33434f MainCode/adalm1000_logger.py aktualisiert
This makes the stop operation more thorough and provides better visual feedback to the user. The plot will now clearly show that the test has been stopped and reset.
(D)
2025-05-26 13:38:12 +02:00
Jan
876fecb466 MainCode/adalm1000_logger.py aktualisiert
This change ensures that:

    The plot data buffers are cleared

    The plot lines are reset to empty data

    The axes are reset to default ranges (0-1)

    The canvas is redrawn to show the cleared plot
(Deepseek)
2025-05-26 13:35:44 +02:00
Jan
25322bc59d MainCode/adalm1000_logger.py aktualisiert
Safer:

    Application shutdown

    Thread cleanup

    Error recovery

    Reconnection scenarios
(Deepseek)
2025-05-25 17:08:26 +02:00
Jan
24cc224138 MainCode/adalm1000_logger.py aktualisiert
Separate Log Files: You've successfully implemented separate log files for each cycle without cycle numbers in filenames.

    1Hz UI Updates: The measurement display updates at 1Hz as requested.

    Throttled Plot Updates: Plot updates are properly throttled to prevent lag.

    Error Handling: Good error handling throughout the code.

    Thread Safety: Proper use of threading for measurements and test sequences.
(Deepseek)
2025-05-24 13:23:14 +02:00
Jan
1c928e22fc MainCode/adalm1000_logger.py aktualisiert
Show an error message when disconnected

    Automatically attempt to reconnect

    Keep retrying until successful

    Provide clear status updates throughout the process
2025-05-24 01:36:51 +02:00
Jan
6db656c71b MainCode/adalm1000_logger.py aktualisiert
change cyclecount to beginning

This will ensure your plot:

    Starts with a reasonable view of your expected voltage range

    Maintains good visibility of the key areas (charge/discharge cutoffs)

    Doesn't zoom out too far when there are measurement spikes

    Has better overall framing of the data
2025-05-24 01:33:36 +02:00
Jan
f50a641211 MainCode/adalm1000_logger.py aktualisiert
Make all measurements update live in the GUI

    Allow the stop button to immediately halt the test at any point in the cycle

    Still maintain proper cleanup and data saving when stopped
(Deepseek)
2025-05-24 01:20:08 +02:00
Jan
13148a64de MainCode/adalm1000_logger.py aktualisiert
Reducing unnecessary GUI updates

    Implementing buffered file I/O

    Throttling plot updates

    Only updating display elements when values change

    Using more efficient drawing methods for the plot
(Deepseek)
2025-05-23 23:41:21 +02:00
Jan
06c99bae38 MainCode/adalm1000_logger.py aktualisiert
Der Stop-Button setzt nur ein Flag (request_stop) statt sofort zu stoppen

    Die Entladephase überprüft dieses Flag und bricht ab, wenn es gesetzt ist

    Nach der Entladephase wird der Test nur beendet, wenn request_stop True ist

    Neue finalize_test Methode für konsistente Aufräumarbeiten

    Klare Statusmeldungen, die den Stop-Request anzeigen
(Deepseek)
2025-05-23 23:34:30 +02:00
Jan
a82cc2c981 MainCode/adalm1000_logger.py aktualisiert
Die Statusmeldung zeigt jetzt klar an, ob:
        Der Test normal weiterläuft ("Next: Charge to X.XV")
        Auf einen Interrupt gewartet wird ("Waiting for interrupt"
    Der Code prüft häufiger auf Interrupts, besonders zwischen den einzelnen Phasen
    Bei einem Interrupt wird der aktuelle Zyklus noch sauber zu Ende geführt, aber kein neuer gestartet
    Die Abschlussmeldung zeigt jetzt an, nach welchem Zyklus der Test unterbrochen wurde
(Deepseek)
2025-05-23 20:55:06 +02:00
Jan
07b86664c0 MainCode/adalm1000_logger.py aktualisiert
Cycling added
    Ich werde eine neue Variable continuous_mode hinzufügen, um den kontinuierlichen Betrieb zu steuern
    Die run_test_sequence() Methode wird modifiziert, um in einer Schleife zu laufen
    Die stop_test() Methode wird erweitert, um den kontinuierlichen Modus zu beenden
(Deepseek)
2025-05-23 20:38:50 +02:00
Jan
516e2a44b2 MainCode/adalm1000_logger.py aktualisiert
Timing angepasst (ChatGPT)
2025-05-23 20:27:54 +02:00
Jan
a9a871bff5 revert 165e27204b80d6368b69532100a7487eb48c853c
revert MainCode/adalm1000_logger.py aktualisiert

Charge Time handeling geändert (ChatGPT)
2025-05-23 20:26:55 +02:00
Jan
165e27204b MainCode/adalm1000_logger.py aktualisiert
Charge Time handeling geändert (ChatGPT)
2025-05-23 20:22:44 +02:00
2 changed files with 1178 additions and 1401 deletions

File diff suppressed because it is too large Load Diff

View File

@ -1,812 +0,0 @@
# -*- coding: utf-8 -*-
import os
import time
import csv
import json
import queue
import threading
from datetime import datetime
import tkinter as tk
from tkinter import ttk, messagebox
import pysmu
import numpy as np
import matplotlib
matplotlib.use('TkAgg')
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
class DeviceDisconnectedError(Exception):
pass
class ModernADALM1000Logger:
def __init__(self, root):
# Color scheme
self.bg_color = "#2E3440"
self.fg_color = "#D8DEE9"
self.accent_color = "#5E81AC"
self.warning_color = "#BF616A"
self.success_color = "#A3BE8C"
# Main window configuration
self.root = root
self.root.title("ADALM1000 - Modern Logger")
self.root.geometry("1000x800")
self.root.minsize(800, 700)
self.root.configure(bg=self.bg_color)
# Device and measurement state
self.session_active = False
self.measuring = False
self.interval = 1.0
self.shunt_resistor = 0.1
self.log_dir = os.path.expanduser("~/adalm1000/logs")
self.config_dir = os.path.expanduser("~/adalm1000/config")
os.makedirs(self.config_dir, exist_ok=True)
# Calibration data
self.calibration = {'A': {'gain': 1.0, 'offset': 0.0}, 'B': {'gain': 1.0, 'offset': 0.0}}
self.cal_applied = tk.BooleanVar(value=True)
# Threading and data
self.device_lock = threading.Lock()
self.measurement_event = threading.Event()
self.data_queue = queue.Queue(maxsize=100)
# Data buffers using numpy for better performance
self.time_data = []
self.voltage_a_data = []
self.voltage_b_data = []
self.current_data = []
self.data_index = 0
# Initialize UI and device
self.setup_ui()
self.load_calibration()
self.init_device()
self.root.after(100, self.update_calibration_display)
# Ensure proper cleanup
self.root.protocol("WM_DELETE_WINDOW", self.on_close)
def handle_device_error(self, error):
"""Handle device connection errors and update UI accordingly"""
error_msg = str(error)
print(f"Device error: {error_msg}") # Log to console
# Update UI in safe thread context
self.root.after_idle(lambda: self.status_light.itemconfig(self.status_indicator, fill='red'))
# Safe status light update
if hasattr(self, 'status_light') and hasattr(self, 'status_indicator'):
self.status_light.itemconfig(self.status_indicator, fill='red')
# Update connection label and status
self.connection_label.config(text="Disconnected")
if "No ADALM1000 detected" in error_msg:
self.status_var.set("Device not found - check USB connection")
else:
self.status_var.set(f"Device error: {error_msg}")
# Disable controls
self.session_active = False
self.measuring = False
if hasattr(self, 'start_button'):
self.start_button.config(state=tk.DISABLED, text="START LOGGING")
if hasattr(self, 'calibrate_button'):
self.calibrate_button.config(state=tk.DISABLED)
# Clear plot + buffers
self.line_a.set_data([], [])
self.line_current.set_data([], [])
self.ax.set_xlim(0, 1)
self.ax2.set_xlim(0, 1)
self.canvas.draw()
self.time_data.clear()
self.voltage_a_data.clear()
self.voltage_b_data.clear()
self.current_data.clear()
self.data_index = 0
# Attempt to clean up the session
if hasattr(self, 'session'):
try:
self.session.end()
except:
pass
del self.session
# Enable reconnect option
if hasattr(self, 'reconnect_btn'):
self.reconnect_btn.config(state=tk.NORMAL)
# Show popup message if window still exists
if self.root.winfo_exists():
messagebox.showerror(
"Device Connection Error",
f"Could not connect to ADALM1000:\n\n{error_msg}\n\n"
"1. Check USB cable connection\n"
"2. Try the Reconnect button\n"
"3. Restart the application if problem persists"
)
def setup_ui(self):
"""Configure the user interface"""
self.style = ttk.Style()
self.style.theme_use('clam')
# Configure styles
self.style.configure('.', background=self.bg_color, foreground=self.fg_color)
self.style.configure('TFrame', background=self.bg_color)
self.style.configure('TLabel', background=self.bg_color, foreground=self.fg_color)
self.style.configure('TButton', background=self.accent_color, foreground=self.fg_color,
padding=6, font=('Helvetica', 10, 'bold'))
self.style.map('TButton',
background=[('active', self.accent_color), ('disabled', '#4C566A')],
foreground=[('active', self.fg_color), ('disabled', '#D8DEE9')])
self.style.configure('TEntry', fieldbackground="#3B4252", foreground=self.fg_color)
self.style.configure('Header.TLabel', font=('Helvetica', 14, 'bold'), foreground=self.accent_color)
self.style.configure('Value.TLabel', font=('Helvetica', 12, 'bold'))
self.style.configure('Status.TLabel', font=('Helvetica', 10))
self.style.configure('Warning.TButton', background=self.warning_color)
self.style.configure('TLabelframe', background=self.bg_color, foreground=self.accent_color)
# Main layout
self.content_frame = ttk.Frame(self.root)
self.content_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# Header area
header_frame = ttk.Frame(self.content_frame)
header_frame.pack(fill=tk.X, pady=(0, 20))
ttk.Label(header_frame, text="ADALM1000 Measurement Logger", style='Header.TLabel').pack(side=tk.LEFT)
# Status indicator
self.status_light = tk.Canvas(header_frame, width=20, height=20, bg=self.bg_color, bd=0, highlightthickness=0)
self.status_light.pack(side=tk.RIGHT, padx=10)
self.status_indicator = self.status_light.create_oval(2, 2, 18, 18, fill='red')
self.connection_label = ttk.Label(header_frame, text="Disconnected")
self.connection_label.pack(side=tk.RIGHT)
# Reconnect button
self.reconnect_btn = ttk.Button(header_frame, text="Reconnect", command=self.reconnect_device)
self.reconnect_btn.pack(side=tk.RIGHT, padx=10)
# Measurement display
display_frame = ttk.LabelFrame(self.content_frame, text=" Live Measurements ", padding=15)
display_frame.pack(fill=tk.BOTH, expand=False)
# Measurement values
measurement_labels = [
("Circuit Voltage (CH A)", "V"),
("Reference Voltage (CH B)", "V"),
("Shunt Voltage (A - B)", "V"),
("Current", "A")
]
for i, (label, unit) in enumerate(measurement_labels):
ttk.Label(display_frame, text=f"{label}:", font=('Helvetica', 11)).grid(row=i, column=0, sticky=tk.W, pady=5)
value_label = ttk.Label(display_frame, text="0.000", style='Value.TLabel')
value_label.grid(row=i, column=1, sticky=tk.W, padx=10)
ttk.Label(display_frame, text=unit).grid(row=i, column=2, sticky=tk.W)
if i == 0:
self.voltage_a_label = value_label
elif i == 1:
self.voltage_b_ref_label = value_label # NEW: CH B direct
elif i == 2:
self.shunt_voltage_label = value_label # NEW: A - B
else:
self.current_label = value_label
# Control area (single row now)
controls_frame = ttk.Frame(self.content_frame)
controls_frame.pack(fill=tk.X, pady=(10, 10), padx=0)
# Left side: Start + Interval + Shunt
left_control_frame = ttk.Frame(controls_frame)
left_control_frame.pack(side=tk.LEFT)
self.start_button = ttk.Button(left_control_frame, text="START LOGGING", command=self.toggle_measurement)
self.start_button.pack(side=tk.LEFT, padx=(0, 12))
ttk.Label(left_control_frame, text="Interval:").pack(side=tk.LEFT, padx=(0, 4))
self.interval_entry = ttk.Entry(left_control_frame, width=6)
self.interval_entry.pack(side=tk.LEFT)
self.interval_entry.insert(0, "1.0")
ttk.Label(left_control_frame, text="s").pack(side=tk.LEFT, padx=(4, 12))
ttk.Label(left_control_frame, text="Shunt:").pack(side=tk.LEFT, padx=(0, 4))
self.shunt_entry = ttk.Entry(left_control_frame, width=6)
self.shunt_entry.pack(side=tk.LEFT)
self.shunt_entry.insert(0, "0.1")
ttk.Label(left_control_frame, text="Ω").pack(side=tk.LEFT, padx=(4, 12))
# Right side: Calibration stuff
cal_control_frame = ttk.Frame(controls_frame)
cal_control_frame.pack(side=tk.RIGHT)
self.cal_check = ttk.Checkbutton(cal_control_frame, text="Apply Calibration", variable=self.cal_applied, command=self.toggle_calibration)
self.cal_check.pack(side=tk.LEFT, padx=(0, 10))
self.cal_status = ttk.Label(cal_control_frame, text="No calibration", foreground=self.warning_color)
self.cal_status.pack(side=tk.LEFT, padx=(0, 10))
self.calibrate_button = ttk.Button(cal_control_frame, text="CALIBRATE", command=self.run_calibration)
self.calibrate_button.pack(side=tk.LEFT)
# Plot area
self.plot_frame = ttk.Frame(self.content_frame)
self.plot_frame.pack(fill=tk.BOTH, expand=True, padx=20, pady=(0, 5))
self.setup_plot()
# Status bar
self.status_var = tk.StringVar()
self.status_var.set("Ready")
self.status_label = ttk.Label(self.root, textvariable=self.status_var, style='Status.TLabel', padding=(0, 5), anchor=tk.W)
self.status_label.place(x=20, relx=0, rely=1.0, anchor='sw', relwidth=0.96, height=28)
def setup_plot(self):
"""Configure the matplotlib plot"""
self.fig = Figure(figsize=(8, 5), dpi=100, facecolor='#2E3440')
self.fig.subplots_adjust(left=0.1, right=0.9, top=0.9, bottom=0.15)
self.ax = self.fig.add_subplot(111)
self.ax.set_facecolor('#3B4252')
# Voltage A (left axis)
self.line_a, = self.ax.plot([], [], color='#00BFFF', label='Voltage A (V)', linewidth=2)
self.ax.set_ylabel("Voltage (V)", color='#00BFFF')
self.ax.tick_params(axis='y', labelcolor='#00BFFF')
# Current (right axis)
self.ax2 = self.ax.twinx()
self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2)
self.ax2.set_ylabel("Current (A)", color='r')
self.ax2.tick_params(axis='y', labelcolor='r')
self.ax.set_xlabel('Time (s)', color=self.fg_color)
self.ax.set_title('Circuit Voltage and Current', color=self.fg_color)
self.ax.tick_params(axis='x', colors=self.fg_color)
self.ax.grid(True, color='#4C566A')
self.ax.legend(loc='upper left')
self.ax2.legend(loc='upper right')
# Embed plot
self.canvas = FigureCanvasTkAgg(self.fig, master=self.plot_frame)
self.canvas.draw()
canvas_widget = self.canvas.get_tk_widget()
canvas_widget.configure(bg='#2E3440', bd=0, highlightthickness=0)
canvas_widget.pack(side=tk.TOP, fill=tk.X, expand=True, pady=(10, 0))
def init_device(self):
"""Initialize the ADALM1000 device"""
try:
if hasattr(self, 'session'):
try:
if self.session_active:
self.session.end()
del self.session
except Exception as e:
print(f"Cleanup error: {e}")
self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000)
if not self.session.devices:
raise Exception("No ADALM1000 detected - check connections")
self.dev = self.session.devices[0]
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
self.dev.channels['B'].mode = pysmu.Mode.HI_Z
self.session.start(0)
self.status_light.itemconfig(self.status_indicator, fill='green')
self.connection_label.config(text="Connected")
self.status_var.set("Device connected | Ready to measure")
self.session_active = True
self.start_button.config(state=tk.NORMAL)
self.calibrate_button.config(state=tk.NORMAL)
if hasattr(self, 'reconnect_btn'):
self.reconnect_btn.config(state=tk.DISABLED)
if not self.measurement_event.is_set():
self.start_measurement_thread()
except Exception as e:
self.handle_device_error(e)
def reconnect_device(self):
"""Reconnect the device with proper cleanup"""
self.status_var.set("Attempting to reconnect...")
self.measuring = False
self.measurement_event.clear()
# Wait for threads to finish
if hasattr(self, 'measurement_thread'):
self.measurement_thread.join(timeout=1.0)
if hasattr(self, 'queue_listener'):
self.queue_listener.join(timeout=0.5)
# Reset plot + buffers before reinitializing
self.handle_device_error("Reconnecting...")
# Try to reinitialize device
self.init_device()
def start_measurement_thread(self):
"""Start the measurement thread"""
if not self.measurement_event.is_set():
self.measurement_event.set()
self.measurement_thread = threading.Thread(
target=self.measure,
daemon=True
)
self.measurement_thread.start()
self.start_queue_listener()
def start_queue_listener(self):
"""Start listening to the data queue"""
self.queue_listener = threading.Thread(
target=self.listen_to_data_queue,
daemon=True
)
self.queue_listener.start()
def listen_to_data_queue(self):
"""Process data from the queue"""
while self.measurement_event.is_set():
try:
data = self.data_queue.get(timeout=0.1)
self.process_data_item(data)
except queue.Empty:
continue
def process_data_item(self, data):
"""Process a single data item from the queue"""
if data[0] == 'error':
self.handle_error(data[1])
else:
current_time, voltage_a, voltage_b, current = data
shunt_voltage = voltage_a - voltage_b
# Always append new data
self.time_data.append(current_time)
self.voltage_a_data.append(voltage_a)
self.voltage_b_data.append(voltage_b)
self.current_data.append(current)
self.data_index += 1
# Update UI
self.update_measurement_display(voltage_a, voltage_b, shunt_voltage, current)
# Update plot periodically
if self.data_index % 5 == 0 or self.data_index == 1:
self.root.after_idle(self.update_plot)
# Save data if logging
if self.measuring:
self.save_measurement_data(voltage_a, voltage_b, shunt_voltage, current)
def measure(self):
"""Measurement loop using interval-based averaging"""
self.start_time = time.time()
next_time = self.start_time
raw_va = []
raw_vb = []
while self.measurement_event.is_set():
try:
with self.device_lock:
if not self.session_active:
self.data_queue.put(('error', "Device disconnected during measurement"))
break
samples = self.dev.read(5, 500, True)
if not samples or not isinstance(samples[0], tuple) or len(samples[0]) != 2:
raise DeviceDisconnectedError("ADALM1000 is no longer responding")
raw_va.extend([s[0][0] for s in samples])
raw_vb.extend([s[1][0] for s in samples])
now = time.time()
if now >= next_time:
if self.cal_applied.get():
raw_va_cal = [(v * self.calibration['A']['gain']) + self.calibration['A']['offset'] for v in raw_va]
raw_vb_cal = [(v * self.calibration['B']['gain']) + self.calibration['B']['offset'] for v in raw_vb]
else:
raw_va_cal = raw_va
raw_vb_cal = raw_vb
voltage_a = sum(raw_va_cal) / len(raw_va_cal) if raw_va_cal else 0
voltage_b = sum(raw_vb_cal) / len(raw_vb_cal) if raw_vb_cal else 0
shunt_voltage = voltage_a - voltage_b
current = shunt_voltage / self.shunt_resistor if self.shunt_resistor > 0 else 0
current_time = now - self.start_time
self.data_queue.put((current_time, voltage_a, voltage_b, current))
next_time += self.interval
raw_va.clear()
raw_vb.clear()
time.sleep(0.01)
except DeviceDisconnectedError as e:
self.root.after(0, lambda err=e: self.handle_device_error(err))
break
except Exception as e:
self.root.after(0, lambda err=e: self.handle_device_error(f"Measurement error: {err}"))
break
def toggle_measurement(self):
"""Start/Stop measurement logging"""
if not self.measuring:
try:
self.interval = max(0.1, float(self.interval_entry.get()))
self.shunt_resistor = max(0.001, float(self.shunt_entry.get()))
except ValueError as e:
messagebox.showerror("Input Error", f"Invalid number: {str(e)}")
return
# Clear all data buffers when starting new measurement
self.time_data.clear()
self.voltage_a_data.clear()
self.voltage_b_data.clear()
self.current_data.clear()
# Setup new log file
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.filename = os.path.join(self.log_dir, f"measurement_{timestamp}.csv")
try:
with open(self.filename, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(["Time(s)", "Voltage_A(V)", "Voltage_B(V)", "Shunt_Voltage(V)", "Current(A)", "Calibration_Applied"])
except Exception as e:
messagebox.showerror("File Error", f"Cannot create file: {str(e)}")
return
# Reset data buffers
self.time_data.clear()
self.voltage_a_data.clear()
self.voltage_b_data.clear()
self.current_data.clear()
self.data_index = 0
# Start measurement
self.measuring = True
self.start_button.config(text="STOP LOGGING", style='Warning.TButton')
self.status_var.set(f"Logging to: {os.path.basename(self.filename)}")
self.start_time = time.time()
if not self.measurement_event.is_set():
self.start_measurement_thread()
else:
self.measuring = False
self.start_button.config(text="START LOGGING", style='TButton')
self.status_var.set(f"Ready | Last file: {os.path.basename(self.filename)}")
def toggle_calibration(self):
"""Toggle calibration application and update display"""
self.update_calibration_display()
def run_calibration(self):
"""Start calibration procedure with proper initialization"""
if hasattr(self, 'cal_window') and self.cal_window.winfo_exists():
self.cal_window.lift()
return
# Store current state
self.original_measuring = self.measuring
self.original_cal_state = self.cal_applied.get()
self.cal_applied.set(False)
# Create calibration window
self.cal_window = tk.Toplevel(self.root)
self.cal_window.title("Calibration")
self.cal_window.geometry("500x500")
self.cal_window.configure(bg=self.bg_color)
# Calibration instructions
ttk.Label(self.cal_window, text="ADALM1000 Calibration", style='Header.TLabel').pack(pady=10)
steps = [
"1. Short both CHA and CHB inputs",
"2. Click 'Measure Zero' when ready",
"3. Apply known voltage to CHA",
"4. Enter reference voltage and click 'Calibrate CHA'",
"5. Repeat for CHB if needed",
"6. Click 'Save Calibration' when done"
]
for step in steps:
ttk.Label(self.cal_window, text=step, style='TLabel').pack(anchor=tk.W, padx=20, pady=2)
# Calibration controls
btn_frame = ttk.Frame(self.cal_window)
btn_frame.pack(pady=10)
ttk.Button(btn_frame, text="Measure Zero",
command=self.start_zero_calibration).pack(side=tk.LEFT, padx=5)
self.ref_voltage = ttk.Entry(btn_frame, width=8)
self.ref_voltage.pack(side=tk.LEFT, padx=5)
self.ref_voltage.insert(0, "3.3")
ttk.Button(btn_frame, text="Calibrate CHA",
command=lambda: self.start_channel_calibration('A')).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_frame, text="Calibrate CHB",
command=lambda: self.start_channel_calibration('B')).pack(side=tk.LEFT, padx=5)
# Save/Load buttons
btn_frame2 = ttk.Frame(self.cal_window)
btn_frame2.pack(pady=10)
self.save_cal_btn = ttk.Button(btn_frame2, text="Save Calibration",
command=self.save_calibration)
self.save_cal_btn.pack(side=tk.LEFT, padx=5)
ttk.Button(btn_frame2, text="Load Calibration",
command=self.load_calibration).pack(side=tk.LEFT, padx=5)
# Results display
self.cal_results = tk.Text(self.cal_window, height=10, width=50,
bg="#3B4252", fg=self.fg_color,
font=('Monospace', 9))
self.cal_results.pack(pady=10)
self.cal_results.insert(tk.END, "Calibration results will appear here...")
self.cal_results.config(state=tk.DISABLED)
self.cal_window.protocol("WM_DELETE_WINDOW", self.on_cal_window_close)
def start_zero_calibration(self):
"""Start zero calibration in a thread with status updates"""
self.update_cal_results("Starting zero measurement...")
threading.Thread(target=self.measure_zero, daemon=True).start()
def start_channel_calibration(self, channel):
"""Start channel calibration in a thread with validation"""
try:
ref_voltage = float(self.ref_voltage.get())
if ref_voltage <= 0:
raise ValueError("Reference voltage must be positive")
self.update_cal_results(f"Starting {channel} calibration with {ref_voltage}V reference...")
threading.Thread(
target=lambda: self.calibrate_channel(channel, ref_voltage),
daemon=True
).start()
except ValueError as e:
self.update_cal_results(f"Error: {str(e)}")
def measure_zero(self):
"""Perform zero offset measurement with averaging"""
try:
with self.device_lock:
# Take multiple samples for stability
samples = []
for _ in range(5): # Take 5 batches of samples
batch = self.dev.read(20, 500, True) # 20 samples, 500ms timeout
if batch:
samples.extend(batch)
time.sleep(0.1)
if not samples:
raise Exception("No valid samples received")
# Calculate averages
voltage_a = np.mean([s[0][0] for s in samples])
voltage_b = np.mean([s[1][0] for s in samples])
# Update calibration
self.calibration['A']['offset'] = -voltage_a
self.calibration['B']['offset'] = -voltage_b
# Update results
result_text = (
f"Zero Calibration Complete:\n"
f"Channel A Offset: {-voltage_a:.6f} V\n"
f"Channel B Offset: {-voltage_b:.6f} V\n\n"
f"Now apply known voltage and calibrate each channel."
)
self.update_cal_results(result_text)
except Exception as e:
self.update_cal_results(f"Zero Calibration Failed:\n{str(e)}")
def calibrate_channel(self, channel, ref_voltage):
"""Perform gain calibration for specified channel"""
try:
with self.device_lock:
# Take multiple samples for stability
samples = []
for _ in range(5): # Take 5 batches of samples
batch = self.dev.read(20, 500, True) # 20 samples, 500ms timeout
if batch:
samples.extend(batch)
time.sleep(0.1)
if not samples:
raise Exception("No valid samples received")
# Calculate average measured voltage
measured = np.mean([s[0][0] if channel == 'A' else s[1][0] for s in samples])
if abs(measured) < 0.01: # Sanity check
raise Exception(f"Measured voltage too low ({measured:.4f}V) - check connection")
# Calculate and apply gain
gain = ref_voltage / measured
self.calibration[channel]['gain'] = gain
# Update results
result_text = (
f"Channel {channel} Calibration Complete:\n"
f"Reference Voltage: {ref_voltage:.4f} V\n"
f"Measured Voltage: {measured:.6f} V\n"
f"Calculated Gain: {gain:.6f}\n\n"
f"Don't forget to Save Calibration!"
)
self.update_cal_results(result_text)
except Exception as e:
self.update_cal_results(f"Channel {channel} Calibration Failed:\n{str(e)}")
def load_calibration(self):
"""Load calibration from file and update display"""
config_file = os.path.join(self.config_dir, "calibration.json")
if os.path.exists(config_file):
try:
with open(config_file, 'r') as f:
data = json.load(f)
if 'A' in data and 'B' in data:
self.calibration = data
self.cal_applied.set(True)
self.update_calibration_display()
return True
except Exception as e:
print(f"Failed to load calibration: {str(e)}")
return False
def update_calibration_display(self):
"""Update calibration status display"""
if self.cal_applied.get():
if self.calibration == {'A': {'gain': 1.0, 'offset': 0.0}, 'B': {'gain': 1.0, 'offset': 0.0}}:
self.cal_status.config(text="No calibration", foreground=self.warning_color)
else:
self.cal_status.config(text="Calibration active", foreground=self.success_color)
else:
self.cal_status.config(text="Calibration available", foreground=self.warning_color)
def update_cal_results(self, text):
"""Thread-safe results display update"""
if hasattr(self, 'cal_results'):
self.cal_results.config(state=tk.NORMAL)
self.cal_results.delete(1.0, tk.END)
self.cal_results.insert(tk.END, text)
self.cal_results.config(state=tk.DISABLED)
self.cal_window.update()
def save_calibration(self):
"""Save calibration to file with error handling"""
try:
config_file = os.path.join(self.config_dir, "calibration.json")
os.makedirs(self.config_dir, exist_ok=True)
with open(config_file, 'w') as f:
json.dump(self.calibration, f, indent=4)
self.update_cal_results(
f"Calibration successfully saved to:\n{config_file}\n\n"
f"Channel A: Gain={self.calibration['A']['gain']:.6f}, Offset={self.calibration['A']['offset']:.6f}V\n"
f"Channel B: Gain={self.calibration['B']['gain']:.6f}, Offset={self.calibration['B']['offset']:.6f}V"
)
return True
except Exception as e:
self.update_cal_results(f"Failed to save calibration:\n{str(e)}")
return False
def on_cal_window_close(self):
"""Handle calibration window closing properly"""
if hasattr(self, 'cal_window'):
# Restore original states
if hasattr(self, 'original_cal_state'):
self.cal_applied.set(self.original_cal_state)
# Clean up window
self.cal_window.destroy()
del self.cal_window
# Restart logging if it was running
if hasattr(self, 'original_measuring') and self.original_measuring:
self.toggle_measurement()
def update_measurement_display(self, voltage_a, voltage_b_ref, shunt_voltage, current):
self.voltage_a_label.config(text=f"{voltage_a:.4f}")
self.voltage_b_ref_label.config(text=f"{voltage_b_ref:.4f}")
self.shunt_voltage_label.config(text=f"{shunt_voltage:.4f}")
self.current_label.config(text=f"{current:.4f}")
status = "Logging" if self.measuring else "Measuring"
if self.cal_applied.get():
status += " | CALIBRATED"
self.status_var.set(f"{status} | Interval: {self.interval}s | Shunt: {self.shunt_resistor}Ω")
def update_plot(self):
"""Update the plot with current data"""
if not self.time_data:
return
self.line_a.set_data(self.time_data, self.voltage_a_data)
self.line_current.set_data(self.time_data, self.current_data)
# Auto-scale the plot
self.ax.relim()
self.ax.autoscale_view()
self.ax2.relim()
self.ax2.autoscale_view()
# Set x-axis to show from 0 to current max time
if len(self.time_data) > 1:
self.ax.set_xlim(0, self.time_data[-1])
self.ax2.set_xlim(0, self.time_data[-1])
self.canvas.draw()
def save_measurement_data(self, voltage_a, voltage_b, shunt_voltage, current):
"""Save measurement data to file"""
with open(self.filename, 'a', newline='') as f:
writer = csv.writer(f)
writer.writerow([
f"{time.time()-self.start_time:.3f}",
f"{voltage_a:.6f}",
f"{voltage_b:.6f}",
f"{shunt_voltage:.6f}",
f"{current:.6f}",
str(self.cal_applied.get())
])
def on_close(self):
"""Clean up on window close"""
self.measurement_event.clear()
if hasattr(self, 'measurement_thread'):
self.measurement_thread.join(timeout=1.0)
if hasattr(self, 'queue_listener'):
self.queue_listener.join(timeout=0.5)
if hasattr(self, 'session') and self.session:
try:
if self.session_active:
self.session.end()
except:
pass
# Clean UI state without triggering error popup
self.status_var.set("Session ended. Closing...")
if hasattr(self, 'status_light') and hasattr(self, 'status_indicator'):
self.status_light.itemconfig(self.status_indicator, fill='red')
if hasattr(self, 'connection_label'):
self.connection_label.config(text="Disconnected")
self.root.destroy()
if __name__ == "__main__":
root = tk.Tk()
try:
app = ModernADALM1000Logger(root)
root.mainloop()
except Exception as e:
if root.winfo_exists():
messagebox.showerror("Fatal Error", f"Application failed: {str(e)}")
else:
print(f"Fatal Error: {e}")
try:
root.destroy()
except:
pass