Dateien nach "OnlyMeasuring" hochladen
Both channel in HI-Z Mode
This commit is contained in:
parent
1d58aba999
commit
7f470eb266
812
OnlyMeasuring/adalm1000_logger - Only measuring.py
Normal file
812
OnlyMeasuring/adalm1000_logger - Only measuring.py
Normal file
@ -0,0 +1,812 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
import csv
|
||||||
|
import json
|
||||||
|
import queue
|
||||||
|
import threading
|
||||||
|
from datetime import datetime
|
||||||
|
import tkinter as tk
|
||||||
|
from tkinter import ttk, messagebox
|
||||||
|
import pysmu
|
||||||
|
import numpy as np
|
||||||
|
import matplotlib
|
||||||
|
matplotlib.use('TkAgg')
|
||||||
|
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
|
||||||
|
from matplotlib.figure import Figure
|
||||||
|
|
||||||
|
class DeviceDisconnectedError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class ModernADALM1000Logger:
|
||||||
|
def __init__(self, root):
|
||||||
|
# Color scheme
|
||||||
|
self.bg_color = "#2E3440"
|
||||||
|
self.fg_color = "#D8DEE9"
|
||||||
|
self.accent_color = "#5E81AC"
|
||||||
|
self.warning_color = "#BF616A"
|
||||||
|
self.success_color = "#A3BE8C"
|
||||||
|
|
||||||
|
# Main window configuration
|
||||||
|
self.root = root
|
||||||
|
self.root.title("ADALM1000 - Modern Logger")
|
||||||
|
self.root.geometry("1000x800")
|
||||||
|
self.root.minsize(800, 700)
|
||||||
|
self.root.configure(bg=self.bg_color)
|
||||||
|
|
||||||
|
# Device and measurement state
|
||||||
|
self.session_active = False
|
||||||
|
self.measuring = False
|
||||||
|
self.interval = 1.0
|
||||||
|
self.shunt_resistor = 0.1
|
||||||
|
self.log_dir = os.path.expanduser("~/adalm1000/logs")
|
||||||
|
self.config_dir = os.path.expanduser("~/adalm1000/config")
|
||||||
|
os.makedirs(self.config_dir, exist_ok=True)
|
||||||
|
|
||||||
|
# Calibration data
|
||||||
|
self.calibration = {'A': {'gain': 1.0, 'offset': 0.0}, 'B': {'gain': 1.0, 'offset': 0.0}}
|
||||||
|
self.cal_applied = tk.BooleanVar(value=True)
|
||||||
|
|
||||||
|
# Threading and data
|
||||||
|
self.device_lock = threading.Lock()
|
||||||
|
self.measurement_event = threading.Event()
|
||||||
|
self.data_queue = queue.Queue(maxsize=100)
|
||||||
|
|
||||||
|
|
||||||
|
# Data buffers using numpy for better performance
|
||||||
|
self.time_data = []
|
||||||
|
self.voltage_a_data = []
|
||||||
|
self.voltage_b_data = []
|
||||||
|
self.current_data = []
|
||||||
|
self.data_index = 0
|
||||||
|
|
||||||
|
# Initialize UI and device
|
||||||
|
self.setup_ui()
|
||||||
|
self.load_calibration()
|
||||||
|
self.init_device()
|
||||||
|
self.root.after(100, self.update_calibration_display)
|
||||||
|
|
||||||
|
# Ensure proper cleanup
|
||||||
|
self.root.protocol("WM_DELETE_WINDOW", self.on_close)
|
||||||
|
|
||||||
|
def handle_device_error(self, error):
|
||||||
|
"""Handle device connection errors and update UI accordingly"""
|
||||||
|
error_msg = str(error)
|
||||||
|
print(f"Device error: {error_msg}") # Log to console
|
||||||
|
|
||||||
|
# Update UI in safe thread context
|
||||||
|
self.root.after_idle(lambda: self.status_light.itemconfig(self.status_indicator, fill='red'))
|
||||||
|
|
||||||
|
# Safe status light update
|
||||||
|
if hasattr(self, 'status_light') and hasattr(self, 'status_indicator'):
|
||||||
|
self.status_light.itemconfig(self.status_indicator, fill='red')
|
||||||
|
|
||||||
|
# Update connection label and status
|
||||||
|
self.connection_label.config(text="Disconnected")
|
||||||
|
|
||||||
|
if "No ADALM1000 detected" in error_msg:
|
||||||
|
self.status_var.set("Device not found - check USB connection")
|
||||||
|
else:
|
||||||
|
self.status_var.set(f"Device error: {error_msg}")
|
||||||
|
|
||||||
|
# Disable controls
|
||||||
|
self.session_active = False
|
||||||
|
self.measuring = False
|
||||||
|
if hasattr(self, 'start_button'):
|
||||||
|
self.start_button.config(state=tk.DISABLED, text="START LOGGING")
|
||||||
|
if hasattr(self, 'calibrate_button'):
|
||||||
|
self.calibrate_button.config(state=tk.DISABLED)
|
||||||
|
|
||||||
|
# Clear plot + buffers
|
||||||
|
self.line_a.set_data([], [])
|
||||||
|
self.line_current.set_data([], [])
|
||||||
|
self.ax.set_xlim(0, 1)
|
||||||
|
self.ax2.set_xlim(0, 1)
|
||||||
|
self.canvas.draw()
|
||||||
|
|
||||||
|
self.time_data.clear()
|
||||||
|
self.voltage_a_data.clear()
|
||||||
|
self.voltage_b_data.clear()
|
||||||
|
self.current_data.clear()
|
||||||
|
self.data_index = 0
|
||||||
|
|
||||||
|
# Attempt to clean up the session
|
||||||
|
if hasattr(self, 'session'):
|
||||||
|
try:
|
||||||
|
self.session.end()
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
del self.session
|
||||||
|
|
||||||
|
# Enable reconnect option
|
||||||
|
if hasattr(self, 'reconnect_btn'):
|
||||||
|
self.reconnect_btn.config(state=tk.NORMAL)
|
||||||
|
|
||||||
|
# Show popup message if window still exists
|
||||||
|
if self.root.winfo_exists():
|
||||||
|
messagebox.showerror(
|
||||||
|
"Device Connection Error",
|
||||||
|
f"Could not connect to ADALM1000:\n\n{error_msg}\n\n"
|
||||||
|
"1. Check USB cable connection\n"
|
||||||
|
"2. Try the Reconnect button\n"
|
||||||
|
"3. Restart the application if problem persists"
|
||||||
|
)
|
||||||
|
|
||||||
|
def setup_ui(self):
|
||||||
|
"""Configure the user interface"""
|
||||||
|
self.style = ttk.Style()
|
||||||
|
self.style.theme_use('clam')
|
||||||
|
|
||||||
|
# Configure styles
|
||||||
|
self.style.configure('.', background=self.bg_color, foreground=self.fg_color)
|
||||||
|
self.style.configure('TFrame', background=self.bg_color)
|
||||||
|
self.style.configure('TLabel', background=self.bg_color, foreground=self.fg_color)
|
||||||
|
self.style.configure('TButton', background=self.accent_color, foreground=self.fg_color,
|
||||||
|
padding=6, font=('Helvetica', 10, 'bold'))
|
||||||
|
self.style.map('TButton',
|
||||||
|
background=[('active', self.accent_color), ('disabled', '#4C566A')],
|
||||||
|
foreground=[('active', self.fg_color), ('disabled', '#D8DEE9')])
|
||||||
|
self.style.configure('TEntry', fieldbackground="#3B4252", foreground=self.fg_color)
|
||||||
|
self.style.configure('Header.TLabel', font=('Helvetica', 14, 'bold'), foreground=self.accent_color)
|
||||||
|
self.style.configure('Value.TLabel', font=('Helvetica', 12, 'bold'))
|
||||||
|
self.style.configure('Status.TLabel', font=('Helvetica', 10))
|
||||||
|
self.style.configure('Warning.TButton', background=self.warning_color)
|
||||||
|
self.style.configure('TLabelframe', background=self.bg_color, foreground=self.accent_color)
|
||||||
|
|
||||||
|
# Main layout
|
||||||
|
self.content_frame = ttk.Frame(self.root)
|
||||||
|
self.content_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
|
||||||
|
|
||||||
|
# Header area
|
||||||
|
header_frame = ttk.Frame(self.content_frame)
|
||||||
|
header_frame.pack(fill=tk.X, pady=(0, 20))
|
||||||
|
|
||||||
|
ttk.Label(header_frame, text="ADALM1000 Measurement Logger", style='Header.TLabel').pack(side=tk.LEFT)
|
||||||
|
|
||||||
|
# Status indicator
|
||||||
|
self.status_light = tk.Canvas(header_frame, width=20, height=20, bg=self.bg_color, bd=0, highlightthickness=0)
|
||||||
|
self.status_light.pack(side=tk.RIGHT, padx=10)
|
||||||
|
self.status_indicator = self.status_light.create_oval(2, 2, 18, 18, fill='red')
|
||||||
|
self.connection_label = ttk.Label(header_frame, text="Disconnected")
|
||||||
|
self.connection_label.pack(side=tk.RIGHT)
|
||||||
|
|
||||||
|
# Reconnect button
|
||||||
|
self.reconnect_btn = ttk.Button(header_frame, text="Reconnect", command=self.reconnect_device)
|
||||||
|
self.reconnect_btn.pack(side=tk.RIGHT, padx=10)
|
||||||
|
|
||||||
|
# Measurement display
|
||||||
|
display_frame = ttk.LabelFrame(self.content_frame, text=" Live Measurements ", padding=15)
|
||||||
|
display_frame.pack(fill=tk.BOTH, expand=False)
|
||||||
|
|
||||||
|
# Measurement values
|
||||||
|
measurement_labels = [
|
||||||
|
("Circuit Voltage (CH A)", "V"),
|
||||||
|
("Reference Voltage (CH B)", "V"),
|
||||||
|
("Shunt Voltage (A - B)", "V"),
|
||||||
|
("Current", "A")
|
||||||
|
]
|
||||||
|
|
||||||
|
for i, (label, unit) in enumerate(measurement_labels):
|
||||||
|
ttk.Label(display_frame, text=f"{label}:", font=('Helvetica', 11)).grid(row=i, column=0, sticky=tk.W, pady=5)
|
||||||
|
value_label = ttk.Label(display_frame, text="0.000", style='Value.TLabel')
|
||||||
|
value_label.grid(row=i, column=1, sticky=tk.W, padx=10)
|
||||||
|
ttk.Label(display_frame, text=unit).grid(row=i, column=2, sticky=tk.W)
|
||||||
|
|
||||||
|
if i == 0:
|
||||||
|
self.voltage_a_label = value_label
|
||||||
|
elif i == 1:
|
||||||
|
self.voltage_b_ref_label = value_label # NEW: CH B direct
|
||||||
|
elif i == 2:
|
||||||
|
self.shunt_voltage_label = value_label # NEW: A - B
|
||||||
|
else:
|
||||||
|
self.current_label = value_label
|
||||||
|
|
||||||
|
# Control area (single row now)
|
||||||
|
controls_frame = ttk.Frame(self.content_frame)
|
||||||
|
controls_frame.pack(fill=tk.X, pady=(10, 10), padx=0)
|
||||||
|
|
||||||
|
# Left side: Start + Interval + Shunt
|
||||||
|
left_control_frame = ttk.Frame(controls_frame)
|
||||||
|
left_control_frame.pack(side=tk.LEFT)
|
||||||
|
|
||||||
|
self.start_button = ttk.Button(left_control_frame, text="START LOGGING", command=self.toggle_measurement)
|
||||||
|
self.start_button.pack(side=tk.LEFT, padx=(0, 12))
|
||||||
|
|
||||||
|
ttk.Label(left_control_frame, text="Interval:").pack(side=tk.LEFT, padx=(0, 4))
|
||||||
|
self.interval_entry = ttk.Entry(left_control_frame, width=6)
|
||||||
|
self.interval_entry.pack(side=tk.LEFT)
|
||||||
|
self.interval_entry.insert(0, "1.0")
|
||||||
|
ttk.Label(left_control_frame, text="s").pack(side=tk.LEFT, padx=(4, 12))
|
||||||
|
|
||||||
|
ttk.Label(left_control_frame, text="Shunt:").pack(side=tk.LEFT, padx=(0, 4))
|
||||||
|
self.shunt_entry = ttk.Entry(left_control_frame, width=6)
|
||||||
|
self.shunt_entry.pack(side=tk.LEFT)
|
||||||
|
self.shunt_entry.insert(0, "0.1")
|
||||||
|
ttk.Label(left_control_frame, text="Ω").pack(side=tk.LEFT, padx=(4, 12))
|
||||||
|
|
||||||
|
# Right side: Calibration stuff
|
||||||
|
cal_control_frame = ttk.Frame(controls_frame)
|
||||||
|
cal_control_frame.pack(side=tk.RIGHT)
|
||||||
|
|
||||||
|
self.cal_check = ttk.Checkbutton(cal_control_frame, text="Apply Calibration", variable=self.cal_applied, command=self.toggle_calibration)
|
||||||
|
self.cal_check.pack(side=tk.LEFT, padx=(0, 10))
|
||||||
|
|
||||||
|
self.cal_status = ttk.Label(cal_control_frame, text="No calibration", foreground=self.warning_color)
|
||||||
|
self.cal_status.pack(side=tk.LEFT, padx=(0, 10))
|
||||||
|
|
||||||
|
self.calibrate_button = ttk.Button(cal_control_frame, text="CALIBRATE", command=self.run_calibration)
|
||||||
|
self.calibrate_button.pack(side=tk.LEFT)
|
||||||
|
|
||||||
|
# Plot area
|
||||||
|
self.plot_frame = ttk.Frame(self.content_frame)
|
||||||
|
self.plot_frame.pack(fill=tk.BOTH, expand=True, padx=20, pady=(0, 5))
|
||||||
|
self.setup_plot()
|
||||||
|
|
||||||
|
# Status bar
|
||||||
|
self.status_var = tk.StringVar()
|
||||||
|
self.status_var.set("Ready")
|
||||||
|
self.status_label = ttk.Label(self.root, textvariable=self.status_var, style='Status.TLabel', padding=(0, 5), anchor=tk.W)
|
||||||
|
self.status_label.place(x=20, relx=0, rely=1.0, anchor='sw', relwidth=0.96, height=28)
|
||||||
|
|
||||||
|
def setup_plot(self):
|
||||||
|
"""Configure the matplotlib plot"""
|
||||||
|
self.fig = Figure(figsize=(8, 5), dpi=100, facecolor='#2E3440')
|
||||||
|
self.fig.subplots_adjust(left=0.1, right=0.9, top=0.9, bottom=0.15)
|
||||||
|
self.ax = self.fig.add_subplot(111)
|
||||||
|
self.ax.set_facecolor('#3B4252')
|
||||||
|
|
||||||
|
# Voltage A (left axis)
|
||||||
|
self.line_a, = self.ax.plot([], [], color='#00BFFF', label='Voltage A (V)', linewidth=2)
|
||||||
|
self.ax.set_ylabel("Voltage (V)", color='#00BFFF')
|
||||||
|
self.ax.tick_params(axis='y', labelcolor='#00BFFF')
|
||||||
|
|
||||||
|
# Current (right axis)
|
||||||
|
self.ax2 = self.ax.twinx()
|
||||||
|
self.line_current, = self.ax2.plot([], [], 'r-', label='Current (A)', linewidth=2)
|
||||||
|
self.ax2.set_ylabel("Current (A)", color='r')
|
||||||
|
self.ax2.tick_params(axis='y', labelcolor='r')
|
||||||
|
|
||||||
|
self.ax.set_xlabel('Time (s)', color=self.fg_color)
|
||||||
|
self.ax.set_title('Circuit Voltage and Current', color=self.fg_color)
|
||||||
|
self.ax.tick_params(axis='x', colors=self.fg_color)
|
||||||
|
self.ax.grid(True, color='#4C566A')
|
||||||
|
|
||||||
|
self.ax.legend(loc='upper left')
|
||||||
|
self.ax2.legend(loc='upper right')
|
||||||
|
|
||||||
|
# Embed plot
|
||||||
|
self.canvas = FigureCanvasTkAgg(self.fig, master=self.plot_frame)
|
||||||
|
self.canvas.draw()
|
||||||
|
canvas_widget = self.canvas.get_tk_widget()
|
||||||
|
canvas_widget.configure(bg='#2E3440', bd=0, highlightthickness=0)
|
||||||
|
canvas_widget.pack(side=tk.TOP, fill=tk.X, expand=True, pady=(10, 0))
|
||||||
|
|
||||||
|
def init_device(self):
|
||||||
|
"""Initialize the ADALM1000 device"""
|
||||||
|
try:
|
||||||
|
if hasattr(self, 'session'):
|
||||||
|
try:
|
||||||
|
if self.session_active:
|
||||||
|
self.session.end()
|
||||||
|
del self.session
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Cleanup error: {e}")
|
||||||
|
|
||||||
|
self.session = pysmu.Session(ignore_dataflow=True, queue_size=10000)
|
||||||
|
if not self.session.devices:
|
||||||
|
raise Exception("No ADALM1000 detected - check connections")
|
||||||
|
|
||||||
|
self.dev = self.session.devices[0]
|
||||||
|
self.dev.channels['A'].mode = pysmu.Mode.HI_Z
|
||||||
|
self.dev.channels['B'].mode = pysmu.Mode.HI_Z
|
||||||
|
self.session.start(0)
|
||||||
|
|
||||||
|
self.status_light.itemconfig(self.status_indicator, fill='green')
|
||||||
|
self.connection_label.config(text="Connected")
|
||||||
|
self.status_var.set("Device connected | Ready to measure")
|
||||||
|
self.session_active = True
|
||||||
|
|
||||||
|
self.start_button.config(state=tk.NORMAL)
|
||||||
|
self.calibrate_button.config(state=tk.NORMAL)
|
||||||
|
|
||||||
|
if hasattr(self, 'reconnect_btn'):
|
||||||
|
self.reconnect_btn.config(state=tk.DISABLED)
|
||||||
|
|
||||||
|
if not self.measurement_event.is_set():
|
||||||
|
self.start_measurement_thread()
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.handle_device_error(e)
|
||||||
|
|
||||||
|
def reconnect_device(self):
|
||||||
|
"""Reconnect the device with proper cleanup"""
|
||||||
|
self.status_var.set("Attempting to reconnect...")
|
||||||
|
self.measuring = False
|
||||||
|
self.measurement_event.clear()
|
||||||
|
|
||||||
|
# Wait for threads to finish
|
||||||
|
if hasattr(self, 'measurement_thread'):
|
||||||
|
self.measurement_thread.join(timeout=1.0)
|
||||||
|
if hasattr(self, 'queue_listener'):
|
||||||
|
self.queue_listener.join(timeout=0.5)
|
||||||
|
|
||||||
|
# Reset plot + buffers before reinitializing
|
||||||
|
self.handle_device_error("Reconnecting...")
|
||||||
|
|
||||||
|
# Try to reinitialize device
|
||||||
|
self.init_device()
|
||||||
|
|
||||||
|
|
||||||
|
def start_measurement_thread(self):
|
||||||
|
"""Start the measurement thread"""
|
||||||
|
if not self.measurement_event.is_set():
|
||||||
|
self.measurement_event.set()
|
||||||
|
self.measurement_thread = threading.Thread(
|
||||||
|
target=self.measure,
|
||||||
|
daemon=True
|
||||||
|
)
|
||||||
|
self.measurement_thread.start()
|
||||||
|
self.start_queue_listener()
|
||||||
|
|
||||||
|
def start_queue_listener(self):
|
||||||
|
"""Start listening to the data queue"""
|
||||||
|
self.queue_listener = threading.Thread(
|
||||||
|
target=self.listen_to_data_queue,
|
||||||
|
daemon=True
|
||||||
|
)
|
||||||
|
self.queue_listener.start()
|
||||||
|
|
||||||
|
def listen_to_data_queue(self):
|
||||||
|
"""Process data from the queue"""
|
||||||
|
while self.measurement_event.is_set():
|
||||||
|
try:
|
||||||
|
data = self.data_queue.get(timeout=0.1)
|
||||||
|
self.process_data_item(data)
|
||||||
|
except queue.Empty:
|
||||||
|
continue
|
||||||
|
|
||||||
|
def process_data_item(self, data):
|
||||||
|
"""Process a single data item from the queue"""
|
||||||
|
if data[0] == 'error':
|
||||||
|
self.handle_error(data[1])
|
||||||
|
else:
|
||||||
|
current_time, voltage_a, voltage_b, current = data
|
||||||
|
shunt_voltage = voltage_a - voltage_b
|
||||||
|
|
||||||
|
# Always append new data
|
||||||
|
self.time_data.append(current_time)
|
||||||
|
self.voltage_a_data.append(voltage_a)
|
||||||
|
self.voltage_b_data.append(voltage_b)
|
||||||
|
self.current_data.append(current)
|
||||||
|
self.data_index += 1
|
||||||
|
|
||||||
|
# Update UI
|
||||||
|
self.update_measurement_display(voltage_a, voltage_b, shunt_voltage, current)
|
||||||
|
|
||||||
|
# Update plot periodically
|
||||||
|
if self.data_index % 5 == 0 or self.data_index == 1:
|
||||||
|
self.root.after_idle(self.update_plot)
|
||||||
|
|
||||||
|
# Save data if logging
|
||||||
|
if self.measuring:
|
||||||
|
self.save_measurement_data(voltage_a, voltage_b, shunt_voltage, current)
|
||||||
|
|
||||||
|
def measure(self):
|
||||||
|
"""Measurement loop using interval-based averaging"""
|
||||||
|
self.start_time = time.time()
|
||||||
|
next_time = self.start_time
|
||||||
|
|
||||||
|
raw_va = []
|
||||||
|
raw_vb = []
|
||||||
|
|
||||||
|
while self.measurement_event.is_set():
|
||||||
|
try:
|
||||||
|
with self.device_lock:
|
||||||
|
if not self.session_active:
|
||||||
|
self.data_queue.put(('error', "Device disconnected during measurement"))
|
||||||
|
break
|
||||||
|
|
||||||
|
samples = self.dev.read(5, 500, True)
|
||||||
|
if not samples or not isinstance(samples[0], tuple) or len(samples[0]) != 2:
|
||||||
|
raise DeviceDisconnectedError("ADALM1000 is no longer responding")
|
||||||
|
|
||||||
|
raw_va.extend([s[0][0] for s in samples])
|
||||||
|
raw_vb.extend([s[1][0] for s in samples])
|
||||||
|
|
||||||
|
now = time.time()
|
||||||
|
if now >= next_time:
|
||||||
|
if self.cal_applied.get():
|
||||||
|
raw_va_cal = [(v * self.calibration['A']['gain']) + self.calibration['A']['offset'] for v in raw_va]
|
||||||
|
raw_vb_cal = [(v * self.calibration['B']['gain']) + self.calibration['B']['offset'] for v in raw_vb]
|
||||||
|
else:
|
||||||
|
raw_va_cal = raw_va
|
||||||
|
raw_vb_cal = raw_vb
|
||||||
|
|
||||||
|
voltage_a = sum(raw_va_cal) / len(raw_va_cal) if raw_va_cal else 0
|
||||||
|
voltage_b = sum(raw_vb_cal) / len(raw_vb_cal) if raw_vb_cal else 0
|
||||||
|
shunt_voltage = voltage_a - voltage_b
|
||||||
|
current = shunt_voltage / self.shunt_resistor if self.shunt_resistor > 0 else 0
|
||||||
|
current_time = now - self.start_time
|
||||||
|
|
||||||
|
self.data_queue.put((current_time, voltage_a, voltage_b, current))
|
||||||
|
|
||||||
|
next_time += self.interval
|
||||||
|
raw_va.clear()
|
||||||
|
raw_vb.clear()
|
||||||
|
|
||||||
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
except DeviceDisconnectedError as e:
|
||||||
|
self.root.after(0, lambda err=e: self.handle_device_error(err))
|
||||||
|
break
|
||||||
|
except Exception as e:
|
||||||
|
self.root.after(0, lambda err=e: self.handle_device_error(f"Measurement error: {err}"))
|
||||||
|
break
|
||||||
|
|
||||||
|
def toggle_measurement(self):
|
||||||
|
"""Start/Stop measurement logging"""
|
||||||
|
if not self.measuring:
|
||||||
|
try:
|
||||||
|
self.interval = max(0.1, float(self.interval_entry.get()))
|
||||||
|
self.shunt_resistor = max(0.001, float(self.shunt_entry.get()))
|
||||||
|
except ValueError as e:
|
||||||
|
messagebox.showerror("Input Error", f"Invalid number: {str(e)}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Clear all data buffers when starting new measurement
|
||||||
|
self.time_data.clear()
|
||||||
|
self.voltage_a_data.clear()
|
||||||
|
self.voltage_b_data.clear()
|
||||||
|
self.current_data.clear()
|
||||||
|
|
||||||
|
# Setup new log file
|
||||||
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||||
|
self.filename = os.path.join(self.log_dir, f"measurement_{timestamp}.csv")
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(self.filename, 'w', newline='') as f:
|
||||||
|
writer = csv.writer(f)
|
||||||
|
writer.writerow(["Time(s)", "Voltage_A(V)", "Voltage_B(V)", "Shunt_Voltage(V)", "Current(A)", "Calibration_Applied"])
|
||||||
|
except Exception as e:
|
||||||
|
messagebox.showerror("File Error", f"Cannot create file: {str(e)}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Reset data buffers
|
||||||
|
self.time_data.clear()
|
||||||
|
self.voltage_a_data.clear()
|
||||||
|
self.voltage_b_data.clear()
|
||||||
|
self.current_data.clear()
|
||||||
|
self.data_index = 0
|
||||||
|
|
||||||
|
# Start measurement
|
||||||
|
self.measuring = True
|
||||||
|
self.start_button.config(text="STOP LOGGING", style='Warning.TButton')
|
||||||
|
self.status_var.set(f"Logging to: {os.path.basename(self.filename)}")
|
||||||
|
self.start_time = time.time()
|
||||||
|
|
||||||
|
if not self.measurement_event.is_set():
|
||||||
|
self.start_measurement_thread()
|
||||||
|
else:
|
||||||
|
self.measuring = False
|
||||||
|
self.start_button.config(text="START LOGGING", style='TButton')
|
||||||
|
self.status_var.set(f"Ready | Last file: {os.path.basename(self.filename)}")
|
||||||
|
|
||||||
|
def toggle_calibration(self):
|
||||||
|
"""Toggle calibration application and update display"""
|
||||||
|
self.update_calibration_display()
|
||||||
|
|
||||||
|
def run_calibration(self):
|
||||||
|
"""Start calibration procedure with proper initialization"""
|
||||||
|
if hasattr(self, 'cal_window') and self.cal_window.winfo_exists():
|
||||||
|
self.cal_window.lift()
|
||||||
|
return
|
||||||
|
|
||||||
|
# Store current state
|
||||||
|
self.original_measuring = self.measuring
|
||||||
|
self.original_cal_state = self.cal_applied.get()
|
||||||
|
self.cal_applied.set(False)
|
||||||
|
|
||||||
|
# Create calibration window
|
||||||
|
self.cal_window = tk.Toplevel(self.root)
|
||||||
|
self.cal_window.title("Calibration")
|
||||||
|
self.cal_window.geometry("500x500")
|
||||||
|
self.cal_window.configure(bg=self.bg_color)
|
||||||
|
|
||||||
|
# Calibration instructions
|
||||||
|
ttk.Label(self.cal_window, text="ADALM1000 Calibration", style='Header.TLabel').pack(pady=10)
|
||||||
|
|
||||||
|
steps = [
|
||||||
|
"1. Short both CHA and CHB inputs",
|
||||||
|
"2. Click 'Measure Zero' when ready",
|
||||||
|
"3. Apply known voltage to CHA",
|
||||||
|
"4. Enter reference voltage and click 'Calibrate CHA'",
|
||||||
|
"5. Repeat for CHB if needed",
|
||||||
|
"6. Click 'Save Calibration' when done"
|
||||||
|
]
|
||||||
|
|
||||||
|
for step in steps:
|
||||||
|
ttk.Label(self.cal_window, text=step, style='TLabel').pack(anchor=tk.W, padx=20, pady=2)
|
||||||
|
|
||||||
|
# Calibration controls
|
||||||
|
btn_frame = ttk.Frame(self.cal_window)
|
||||||
|
btn_frame.pack(pady=10)
|
||||||
|
|
||||||
|
ttk.Button(btn_frame, text="Measure Zero",
|
||||||
|
command=self.start_zero_calibration).pack(side=tk.LEFT, padx=5)
|
||||||
|
|
||||||
|
self.ref_voltage = ttk.Entry(btn_frame, width=8)
|
||||||
|
self.ref_voltage.pack(side=tk.LEFT, padx=5)
|
||||||
|
self.ref_voltage.insert(0, "3.3")
|
||||||
|
|
||||||
|
ttk.Button(btn_frame, text="Calibrate CHA",
|
||||||
|
command=lambda: self.start_channel_calibration('A')).pack(side=tk.LEFT, padx=5)
|
||||||
|
ttk.Button(btn_frame, text="Calibrate CHB",
|
||||||
|
command=lambda: self.start_channel_calibration('B')).pack(side=tk.LEFT, padx=5)
|
||||||
|
|
||||||
|
# Save/Load buttons
|
||||||
|
btn_frame2 = ttk.Frame(self.cal_window)
|
||||||
|
btn_frame2.pack(pady=10)
|
||||||
|
|
||||||
|
self.save_cal_btn = ttk.Button(btn_frame2, text="Save Calibration",
|
||||||
|
command=self.save_calibration)
|
||||||
|
self.save_cal_btn.pack(side=tk.LEFT, padx=5)
|
||||||
|
|
||||||
|
ttk.Button(btn_frame2, text="Load Calibration",
|
||||||
|
command=self.load_calibration).pack(side=tk.LEFT, padx=5)
|
||||||
|
|
||||||
|
# Results display
|
||||||
|
self.cal_results = tk.Text(self.cal_window, height=10, width=50,
|
||||||
|
bg="#3B4252", fg=self.fg_color,
|
||||||
|
font=('Monospace', 9))
|
||||||
|
self.cal_results.pack(pady=10)
|
||||||
|
self.cal_results.insert(tk.END, "Calibration results will appear here...")
|
||||||
|
self.cal_results.config(state=tk.DISABLED)
|
||||||
|
|
||||||
|
self.cal_window.protocol("WM_DELETE_WINDOW", self.on_cal_window_close)
|
||||||
|
|
||||||
|
def start_zero_calibration(self):
|
||||||
|
"""Start zero calibration in a thread with status updates"""
|
||||||
|
self.update_cal_results("Starting zero measurement...")
|
||||||
|
threading.Thread(target=self.measure_zero, daemon=True).start()
|
||||||
|
|
||||||
|
def start_channel_calibration(self, channel):
|
||||||
|
"""Start channel calibration in a thread with validation"""
|
||||||
|
try:
|
||||||
|
ref_voltage = float(self.ref_voltage.get())
|
||||||
|
if ref_voltage <= 0:
|
||||||
|
raise ValueError("Reference voltage must be positive")
|
||||||
|
|
||||||
|
self.update_cal_results(f"Starting {channel} calibration with {ref_voltage}V reference...")
|
||||||
|
threading.Thread(
|
||||||
|
target=lambda: self.calibrate_channel(channel, ref_voltage),
|
||||||
|
daemon=True
|
||||||
|
).start()
|
||||||
|
except ValueError as e:
|
||||||
|
self.update_cal_results(f"Error: {str(e)}")
|
||||||
|
|
||||||
|
def measure_zero(self):
|
||||||
|
"""Perform zero offset measurement with averaging"""
|
||||||
|
try:
|
||||||
|
with self.device_lock:
|
||||||
|
# Take multiple samples for stability
|
||||||
|
samples = []
|
||||||
|
for _ in range(5): # Take 5 batches of samples
|
||||||
|
batch = self.dev.read(20, 500, True) # 20 samples, 500ms timeout
|
||||||
|
if batch:
|
||||||
|
samples.extend(batch)
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
if not samples:
|
||||||
|
raise Exception("No valid samples received")
|
||||||
|
|
||||||
|
# Calculate averages
|
||||||
|
voltage_a = np.mean([s[0][0] for s in samples])
|
||||||
|
voltage_b = np.mean([s[1][0] for s in samples])
|
||||||
|
|
||||||
|
# Update calibration
|
||||||
|
self.calibration['A']['offset'] = -voltage_a
|
||||||
|
self.calibration['B']['offset'] = -voltage_b
|
||||||
|
|
||||||
|
# Update results
|
||||||
|
result_text = (
|
||||||
|
f"Zero Calibration Complete:\n"
|
||||||
|
f"Channel A Offset: {-voltage_a:.6f} V\n"
|
||||||
|
f"Channel B Offset: {-voltage_b:.6f} V\n\n"
|
||||||
|
f"Now apply known voltage and calibrate each channel."
|
||||||
|
)
|
||||||
|
self.update_cal_results(result_text)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.update_cal_results(f"Zero Calibration Failed:\n{str(e)}")
|
||||||
|
|
||||||
|
def calibrate_channel(self, channel, ref_voltage):
|
||||||
|
"""Perform gain calibration for specified channel"""
|
||||||
|
try:
|
||||||
|
with self.device_lock:
|
||||||
|
# Take multiple samples for stability
|
||||||
|
samples = []
|
||||||
|
for _ in range(5): # Take 5 batches of samples
|
||||||
|
batch = self.dev.read(20, 500, True) # 20 samples, 500ms timeout
|
||||||
|
if batch:
|
||||||
|
samples.extend(batch)
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
if not samples:
|
||||||
|
raise Exception("No valid samples received")
|
||||||
|
|
||||||
|
# Calculate average measured voltage
|
||||||
|
measured = np.mean([s[0][0] if channel == 'A' else s[1][0] for s in samples])
|
||||||
|
if abs(measured) < 0.01: # Sanity check
|
||||||
|
raise Exception(f"Measured voltage too low ({measured:.4f}V) - check connection")
|
||||||
|
|
||||||
|
# Calculate and apply gain
|
||||||
|
gain = ref_voltage / measured
|
||||||
|
self.calibration[channel]['gain'] = gain
|
||||||
|
|
||||||
|
# Update results
|
||||||
|
result_text = (
|
||||||
|
f"Channel {channel} Calibration Complete:\n"
|
||||||
|
f"Reference Voltage: {ref_voltage:.4f} V\n"
|
||||||
|
f"Measured Voltage: {measured:.6f} V\n"
|
||||||
|
f"Calculated Gain: {gain:.6f}\n\n"
|
||||||
|
f"Don't forget to Save Calibration!"
|
||||||
|
)
|
||||||
|
self.update_cal_results(result_text)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.update_cal_results(f"Channel {channel} Calibration Failed:\n{str(e)}")
|
||||||
|
|
||||||
|
def load_calibration(self):
|
||||||
|
"""Load calibration from file and update display"""
|
||||||
|
config_file = os.path.join(self.config_dir, "calibration.json")
|
||||||
|
if os.path.exists(config_file):
|
||||||
|
try:
|
||||||
|
with open(config_file, 'r') as f:
|
||||||
|
data = json.load(f)
|
||||||
|
if 'A' in data and 'B' in data:
|
||||||
|
self.calibration = data
|
||||||
|
self.cal_applied.set(True)
|
||||||
|
self.update_calibration_display()
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Failed to load calibration: {str(e)}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def update_calibration_display(self):
|
||||||
|
"""Update calibration status display"""
|
||||||
|
if self.cal_applied.get():
|
||||||
|
if self.calibration == {'A': {'gain': 1.0, 'offset': 0.0}, 'B': {'gain': 1.0, 'offset': 0.0}}:
|
||||||
|
self.cal_status.config(text="No calibration", foreground=self.warning_color)
|
||||||
|
else:
|
||||||
|
self.cal_status.config(text="Calibration active", foreground=self.success_color)
|
||||||
|
else:
|
||||||
|
self.cal_status.config(text="Calibration available", foreground=self.warning_color)
|
||||||
|
|
||||||
|
def update_cal_results(self, text):
|
||||||
|
"""Thread-safe results display update"""
|
||||||
|
if hasattr(self, 'cal_results'):
|
||||||
|
self.cal_results.config(state=tk.NORMAL)
|
||||||
|
self.cal_results.delete(1.0, tk.END)
|
||||||
|
self.cal_results.insert(tk.END, text)
|
||||||
|
self.cal_results.config(state=tk.DISABLED)
|
||||||
|
self.cal_window.update()
|
||||||
|
|
||||||
|
def save_calibration(self):
|
||||||
|
"""Save calibration to file with error handling"""
|
||||||
|
try:
|
||||||
|
config_file = os.path.join(self.config_dir, "calibration.json")
|
||||||
|
os.makedirs(self.config_dir, exist_ok=True)
|
||||||
|
|
||||||
|
with open(config_file, 'w') as f:
|
||||||
|
json.dump(self.calibration, f, indent=4)
|
||||||
|
|
||||||
|
self.update_cal_results(
|
||||||
|
f"Calibration successfully saved to:\n{config_file}\n\n"
|
||||||
|
f"Channel A: Gain={self.calibration['A']['gain']:.6f}, Offset={self.calibration['A']['offset']:.6f}V\n"
|
||||||
|
f"Channel B: Gain={self.calibration['B']['gain']:.6f}, Offset={self.calibration['B']['offset']:.6f}V"
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
self.update_cal_results(f"Failed to save calibration:\n{str(e)}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def on_cal_window_close(self):
|
||||||
|
"""Handle calibration window closing properly"""
|
||||||
|
if hasattr(self, 'cal_window'):
|
||||||
|
# Restore original states
|
||||||
|
if hasattr(self, 'original_cal_state'):
|
||||||
|
self.cal_applied.set(self.original_cal_state)
|
||||||
|
|
||||||
|
# Clean up window
|
||||||
|
self.cal_window.destroy()
|
||||||
|
del self.cal_window
|
||||||
|
|
||||||
|
# Restart logging if it was running
|
||||||
|
if hasattr(self, 'original_measuring') and self.original_measuring:
|
||||||
|
self.toggle_measurement()
|
||||||
|
|
||||||
|
def update_measurement_display(self, voltage_a, voltage_b_ref, shunt_voltage, current):
|
||||||
|
self.voltage_a_label.config(text=f"{voltage_a:.4f}")
|
||||||
|
self.voltage_b_ref_label.config(text=f"{voltage_b_ref:.4f}")
|
||||||
|
self.shunt_voltage_label.config(text=f"{shunt_voltage:.4f}")
|
||||||
|
self.current_label.config(text=f"{current:.4f}")
|
||||||
|
|
||||||
|
status = "Logging" if self.measuring else "Measuring"
|
||||||
|
if self.cal_applied.get():
|
||||||
|
status += " | CALIBRATED"
|
||||||
|
self.status_var.set(f"{status} | Interval: {self.interval}s | Shunt: {self.shunt_resistor}Ω")
|
||||||
|
|
||||||
|
def update_plot(self):
|
||||||
|
"""Update the plot with current data"""
|
||||||
|
if not self.time_data:
|
||||||
|
return
|
||||||
|
|
||||||
|
self.line_a.set_data(self.time_data, self.voltage_a_data)
|
||||||
|
self.line_current.set_data(self.time_data, self.current_data)
|
||||||
|
|
||||||
|
# Auto-scale the plot
|
||||||
|
self.ax.relim()
|
||||||
|
self.ax.autoscale_view()
|
||||||
|
self.ax2.relim()
|
||||||
|
self.ax2.autoscale_view()
|
||||||
|
|
||||||
|
# Set x-axis to show from 0 to current max time
|
||||||
|
if len(self.time_data) > 1:
|
||||||
|
self.ax.set_xlim(0, self.time_data[-1])
|
||||||
|
self.ax2.set_xlim(0, self.time_data[-1])
|
||||||
|
|
||||||
|
self.canvas.draw()
|
||||||
|
|
||||||
|
def save_measurement_data(self, voltage_a, voltage_b, shunt_voltage, current):
|
||||||
|
"""Save measurement data to file"""
|
||||||
|
with open(self.filename, 'a', newline='') as f:
|
||||||
|
writer = csv.writer(f)
|
||||||
|
writer.writerow([
|
||||||
|
f"{time.time()-self.start_time:.3f}",
|
||||||
|
f"{voltage_a:.6f}",
|
||||||
|
f"{voltage_b:.6f}",
|
||||||
|
f"{shunt_voltage:.6f}",
|
||||||
|
f"{current:.6f}",
|
||||||
|
str(self.cal_applied.get())
|
||||||
|
])
|
||||||
|
|
||||||
|
def on_close(self):
|
||||||
|
"""Clean up on window close"""
|
||||||
|
self.measurement_event.clear()
|
||||||
|
|
||||||
|
if hasattr(self, 'measurement_thread'):
|
||||||
|
self.measurement_thread.join(timeout=1.0)
|
||||||
|
|
||||||
|
if hasattr(self, 'queue_listener'):
|
||||||
|
self.queue_listener.join(timeout=0.5)
|
||||||
|
|
||||||
|
if hasattr(self, 'session') and self.session:
|
||||||
|
try:
|
||||||
|
if self.session_active:
|
||||||
|
self.session.end()
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Clean UI state without triggering error popup
|
||||||
|
self.status_var.set("Session ended. Closing...")
|
||||||
|
if hasattr(self, 'status_light') and hasattr(self, 'status_indicator'):
|
||||||
|
self.status_light.itemconfig(self.status_indicator, fill='red')
|
||||||
|
if hasattr(self, 'connection_label'):
|
||||||
|
self.connection_label.config(text="Disconnected")
|
||||||
|
|
||||||
|
self.root.destroy()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
root = tk.Tk()
|
||||||
|
try:
|
||||||
|
app = ModernADALM1000Logger(root)
|
||||||
|
root.mainloop()
|
||||||
|
except Exception as e:
|
||||||
|
if root.winfo_exists():
|
||||||
|
messagebox.showerror("Fatal Error", f"Application failed: {str(e)}")
|
||||||
|
else:
|
||||||
|
print(f"Fatal Error: {e}")
|
||||||
|
try:
|
||||||
|
root.destroy()
|
||||||
|
except:
|
||||||
|
pass
|
||||||
Loading…
x
Reference in New Issue
Block a user