CSVVisualizer/CSVVisualizer.py
Jan 2b393b48eb CSVVisualizer.py aktualisiert
Eine neue clean_data() Methode, die:

        Leere Zeilen entfernt

        Fehlende Werte mit linearer Interpolation füllt

        Fehlende Phasen mit der letzten bekannten Phase füllt

        Die Zeitachse sortiert

    Verbesserte Fehlerbehandlung für fehlende Werte

    Bessere Handhabung von Phasenwechseln in der Visualisierung

    Konsistente Behandlung der Zeitachse
(D)
2025-07-10 19:52:50 +02:00

243 lines
9.3 KiB
Python

# -*- coding: utf-8 -*-
import os
import csv
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from tkinter import Tk, filedialog, messagebox
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import tkinter as tk
from tkinter import ttk
import numpy as np
class CSVVisualizer:
def __init__(self, root):
self.root = root
self.root.protocol("WM_DELETE_WINDOW", self.cleanup)
self.root.title("ADALM1000 Log Visualizer")
self.root.geometry("1000x700")
# Farben für die Phasen (angepasst an adalm1000_logger.py)
self.phase_colors = {
"Charge": "#4E79A7", # Blau
"Discharge": "#E15759", # Rot
"Resting (Post-Charge)": "#59A14F", # Grün
"Resting (Post-Discharge)": "#EDC948", # Gelb
"Resting Between Cycles": "#B07AA1", # Lila
"Initial Discharge": "#FF9DA7", # Rosa
"Idle": "#CCCCCC" # Grau für inaktive Phasen
}
self.setup_ui()
def setup_ui(self):
"""Erstellt die Benutzeroberfläche"""
main_frame = ttk.Frame(self.root)
main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# Steuerleiste oben
control_frame = ttk.Frame(main_frame)
control_frame.pack(fill=tk.X, pady=(0, 10))
ttk.Button(control_frame, text="CSV auswählen", command=self.load_csv).pack(side=tk.LEFT, padx=5)
ttk.Button(control_frame, text="Grafik speichern", command=self.save_plot).pack(side=tk.LEFT, padx=5)
# Anzeige des aktuellen Dateipfads
self.file_label = ttk.Label(control_frame, text="Keine Datei ausgewählt")
self.file_label.pack(side=tk.LEFT, padx=10, expand=True)
# Plot-Bereich
self.plot_frame = ttk.Frame(main_frame)
self.plot_frame.pack(fill=tk.BOTH, expand=True)
# Platzhalter für den Plot
self.fig, (self.ax_voltage, self.ax_current) = plt.subplots(2, 1, figsize=(10, 8), sharex=True)
self.canvas = FigureCanvasTkAgg(self.fig, master=self.plot_frame)
self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
# Statusleiste
self.status_var = tk.StringVar()
self.status_var.set("Bereit")
ttk.Label(main_frame, textvariable=self.status_var, relief=tk.SUNKEN).pack(fill=tk.X, pady=(5, 0))
def load_csv(self):
"""Lädt CSV-Datei mit spezifischer Handhabung für ADALM1000-Logs"""
filepath = filedialog.askopenfilename(
title="ADALM1000 Log-Datei auswählen",
filetypes=[("CSV Files", "*.csv"), ("All Files", "*.*")]
)
if not filepath:
return
# Extrahiere Testparameter aus der Log-Datei
test_params = {}
with open(filepath, 'r') as f:
for line in f:
if line.startswith('# - '):
key, value = line[4:].strip().split(': ', 1)
test_params[key] = value
elif line.startswith('Time(s)'):
break
self.graph_title = "ADALM1000 Battery Test"
if test_params:
self.graph_title = (
f"ADALM1000 Test | "
f"Capacity: {test_params.get('Battery Capacity', 'N/A')} | "
f"Current: {test_params.get('Test Current', 'N/A')}"
)
try:
# Lese nur Datenzeilen (ignoriere Kommentare und leere Zeilen)
skip_rows = 0
with open(filepath, 'r') as f:
for line in f:
if line.startswith('Time(s)'):
break
skip_rows += 1
self.df = pd.read_csv(
filepath,
skiprows=skip_rows,
dtype={
'Time(s)': 'float32',
'Voltage(V)': 'float32',
'Current(A)': 'float32',
'Phase': 'str',
'Discharge_Capacity(Ah)': 'float32',
'Charge_Capacity(Ah)': 'float32',
'Coulomb_Eff(%)': 'float32',
'Cycle': 'int32'
}
)
# Bereinige die Daten
self.clean_data()
self.file_label.config(text=os.path.basename(filepath))
self.status_var.set(f"Daten geladen: {len(self.df)} Messungen")
self.update_plot()
except Exception as e:
messagebox.showerror("Fehler", f"Fehler beim Laden:\n{str(e)}")
self.status_var.set("Fehler beim Laden")
def clean_data(self):
"""Bereinigt die Daten und füllt fehlende Werte"""
# Bereinige Phasen-Namen
self.df['Phase'] = self.df['Phase'].str.strip()
# Entferne Zeilen mit komplett leeren Werten
self.df.dropna(how='all', inplace=True)
# Fülle fehlende Spannung/Strom Werte mit linearen Interpolation
self.df['Voltage(V)'] = self.df['Voltage(V)'].interpolate(method='linear')
self.df['Current(A)'] = self.df['Current(A)'].interpolate(method='linear')
# Fülle fehlende Phasen mit der letzten bekannten Phase
self.df['Phase'].fillna(method='ffill', inplace=True)
# Stelle sicher, dass die Zeit monoton steigend ist
self.df.sort_values('Time(s)', inplace=True)
self.df.reset_index(drop=True, inplace=True)
def update_plot(self):
"""Aktualisiert den Plot mit den aktuellen Daten"""
if not hasattr(self, 'df'):
return
df_clean = self.df.copy()
# Zeit relativ zum Start berechnen
start_time = df_clean["Time(s)"].min()
df_clean["Relative_Time"] = df_clean["Time(s)"] - start_time
# Plots zurücksetzen
self.ax_voltage.clear()
self.ax_current.clear()
# Spannung plotten (oberer Plot)
self.ax_voltage.plot(df_clean["Relative_Time"], df_clean["Voltage(V)"],
label="Spannung (V)", color="black", linewidth=1)
# Strom plotten (unterer Plot)
self.ax_current.plot(df_clean["Relative_Time"], df_clean["Current(A)"],
label="Strom (A)", color="#D76364", linewidth=1)
# Phasen als farbige Hintergründe (für beide Plots)
start_idx = 0
for i in range(1, len(df_clean)):
if df_clean.iloc[i]["Phase"] != df_clean.iloc[i-1]["Phase"] or i == len(df_clean) - 1:
end_idx = i
start_time_rel = df_clean.iloc[start_idx]["Relative_Time"]
end_time_rel = df_clean.iloc[end_idx]["Relative_Time"]
phase = df_clean.iloc[start_idx]["Phase"]
color = self.phase_colors.get(phase, "#CCCCCC")
self.ax_voltage.axvspan(start_time_rel, end_time_rel, facecolor=color, alpha=0.2)
self.ax_current.axvspan(start_time_rel, end_time_rel, facecolor=color, alpha=0.2)
start_idx = i
# Legende erstellen
patches = [mpatches.Patch(color=self.phase_colors[phase], label=phase)
for phase in self.phase_colors if phase in df_clean["Phase"].unique()]
self.ax_voltage.legend(handles=patches, loc="upper right")
self.ax_voltage.set_ylabel("Spannung (V)")
self.ax_current.set_ylabel("Strom (A)")
self.ax_current.set_xlabel("Zeit (s) seit Start")
self.ax_voltage.set_title(self.graph_title)
# Gitternetz für beide Plots
self.ax_voltage.grid(True, alpha=0.3)
self.ax_current.grid(True, alpha=0.3)
# Automatische Skalierung
self.ax_voltage.relim()
self.ax_voltage.autoscale_view()
self.ax_current.relim()
self.ax_current.autoscale_view()
# Canvas aktualisieren
self.fig.tight_layout()
self.canvas.draw()
self.status_var.set("Grafik aktualisiert")
def save_plot(self):
"""Speichert den aktuellen Plot als Bilddatei"""
if not hasattr(self, 'df'):
messagebox.showwarning("Warnung", "Keine Daten zum Speichern vorhanden")
return
filetypes = [
('PNG Image', '*.png'),
('PDF Document', '*.pdf'),
('SVG Vector', '*.svg')
]
default_filename = "adalm1000_plot.png"
filepath = filedialog.asksaveasfilename(
title="Grafik speichern",
initialfile=default_filename,
filetypes=filetypes,
defaultextension=".png"
)
if filepath:
try:
self.fig.savefig(filepath, dpi=300, bbox_inches='tight')
messagebox.showinfo("Erfolg", f"Grafik gespeichert als:\n{filepath}")
except Exception as e:
messagebox.showerror("Fehler", f"Speichern fehlgeschlagen:\n{str(e)}")
def cleanup(self):
"""Aufräumen vor dem Schließen"""
if hasattr(self, 'fig'):
plt.close(self.fig)
self.root.destroy()
if __name__ == "__main__":
root = Tk()
app = CSVVisualizer(root)
root.mainloop()