import pandas as pd #импорт библиотеки pandas from IPython.display import display from tkinter import ttk, filedialog, messagebox from tkinter import * import tkinter as tk import ttkthemes import tkinter.font as tkFont from datetime import datetime from scipy.optimize import fsolve, least_squares from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg import matplotlib.pyplot as plt #импорт библиотеки matplotlib import seaborn as sns #импорт библиотеки seaborn import mplcursors as mpl import numpy as np #импорт библиотеки numpy from mpl_interactions import ioff, panhandler import struct import os import pyodbc import psycopg2 from psycopg2 import Error from enum import Enum import math import re from pathlib import Path import csv import random import pyperclip from scipy.stats import kurtosis from openpyxl import Workbook, load_workbook import os import xml.etree.ElementTree as ET from scipy.signal import butter, bessel, filtfilt # ============================================================================= # #Библиотека стиля "киберпанк" для графика # import mplcyberpunk # plt.style.use("cyberpunk") # ============================================================================= #Таблица базы данных (акустические данные) TABLEACS = 'Acoustics' #Количество датчиков NUM_SENSORS = 4 #Количество отображаемых датчиков NUMBER_SENSORS_DISPLAYED = 0 FILE_EXCEL='./experiment_data_spm.xlsx' # Файл с информацией о подшипниках FILE_WITH_BEARINGS = './bearings.csv' #Создать виджеты вывода результатов def create_widgets_output_results(): global result_lb, statistics_txt, state_unit_txt, statistics_tv, statistics_sb, statistics_col_names, fig, ax, chart_canvas #Результат #result_lb = Label(result_frame, justify='left') # Создаем текстовый вид для вывода статистик #statistics_txt = tk.Text(result_frame, height=10, width=50) #statistics_col_names = ['Местоп. дефекта', 'Сила дефекта', 'Макс. ампл./СКО', 'Кол. пик. зона A', 'Кол. пик. зона B', 'Кол. пик. зона C', 'Кол. пик. зона D', 'Кол. пик. зона E' 'Направление(опора)', 'Макс. амплитуда', 'СКО', 'Куртосис', 'Момент 2-го порядка', 'Момент 4-го порядка'] #названия колонок таблицы Treeview statistics_col_names = ['Направление(опора)', 'Частота (Гц)', 'Макс. амплитуда', 'СКО', 'Fo', 'Дефект', 'Степень дефекта', 'Макс. ампл./СКО', 'Куртосис', 'Момент 2', 'Момент 4'] statistics_tv = ttk.Treeview(result_frame, columns=statistics_col_names, show="headings") statistics_sb = ttk.Scrollbar(result_frame, orient="vertical", command=statistics_tv.yview) statistics_tv.configure(yscrollcommand=statistics_sb.set) # Создаем текстовый вид для вывода состояния агрегата state_unit_txt = tk.Text(result_frame, height=16, width=35) #fig, ax = plt.subplots() #Масштабирование with plt.ioff(): fig, ax = plt.subplots() disconnect_zoom = zoom_factory(ax) # Включите прокрутку и панорамирование с помощью MPL # Библиотека взаимодействий функционирует подобно panhandler. #pan_handler = panhandler(fig) #display(fig.canvas) #Область, в которой будет размещаться график chart_canvas = FigureCanvasTkAgg(plt.gcf(), master=charts_frame) #Показать виджеты вывода результатов def show_widgets_output_results(): #result_lb.grid(row = 0, column = 0, sticky='nw') #statistics_txt.grid(row = 0, column = 1, sticky='nw', padx=2) statistics_tv.grid(row = 0, column = 1, sticky='nwne', pady=5, padx=2) statistics_sb.grid(row = 0, column = 1, sticky='nese', pady=5) state_unit_txt.grid(row = 0, column = 0, sticky='nw', padx=2) canvas_widget = chart_canvas.get_tk_widget() canvas_widget.pack(fill=BOTH, expand = True) #Скрыть виджеты вывода результатов def hide_widgets_output_results(): #result_lb.grid_forget() #statistics_txt.grid_forget() statistics_tv.grid_forget() state_unit_txt.grid_forget() #Выбрать Checkbutton def select_сheckbutton(): global one_condition_is_set one_condition_is_set = True if one_condition.get() == 1 else False return one_condition_is_set # ============================================================================= # if javascript.get() == 1: result = f"{result} JavaScript" # if java.get() == 1: result = f"{result} Java" # ============================================================================= #Создать виджеты информации о датчиках def create_widgets_info_sensors(): global time_peak_labels, time_peak_vars, time_peak_entries, coords_labels, coords_x_labels, coords_x_vars, coords_x_entries, coords_y_labels, coords_y_vars, coords_y_entries, load_signals_entries, load_signals_buttons #Названия для "Время задержки (датчик)" time_peak_labels = [] #Значения для "Время задержки (датчик)" time_peak_vars = [] #Ввод для "Время задержки (датчик)" time_peak_entries = [] #Название координат coords_labels = [] #Координаты датчика по X coords_x_labels = [] #Значения координат X coords_x_vars = [] #Вввод координат X coords_x_entries = [] #Координаты датчика по Y coords_y_labels = [] #Значения координат Y coords_y_vars = [] #Вввод координат Y coords_y_entries = [] #Названиея сигналов load_signals_entries = [] #Кнопки загрузки сигналов load_signals_buttons = [] #Счётчик строк counter_row = 0 inform_about_sensor_frame.grid(row = 0, column = 0, columnspan=2, sticky='nwne') speed_sound_metal_lb.grid(row = 1, column = 0, columnspan=2, sticky='nw') speed_sound_metal_ent.grid(row = 2, column = 0, columnspan=2, sticky='nwne') for i in range(NUM_SENSORS): time_peak_lb = Label(control_frame, text=str(f'Время задержки для {i+1} датчика (c)')) time_peak_var = StringVar(value='0') time_peak_ent = ttk.Entry(control_frame, textvariable=time_peak_var) time_peak_lb.grid(row=3 + counter_row, column=0, columnspan=2, sticky='nw') time_peak_ent.grid(row=4 + counter_row, column=0, columnspan=2, sticky='nwne') if i == NUM_SENSORS-1: calculate_sts_values_btn.grid(row = 5 + counter_row, column = 0, columnspan=2, sticky='nwne') load_signals_frame.grid(row = 6 + counter_row, column = 0, columnspan=2, sticky='nwne') #Добавление time_peak_labels.append(time_peak_lb) time_peak_vars.append(time_peak_var) time_peak_entries.append(time_peak_ent) coord_lb = Label(inform_about_sensor_frame, text=f'Координаты {i+1} датчика (м)') coord_x_lb = Label(inform_about_sensor_frame, text='X') coord_x_var = StringVar(value='0') coord_x_ent = ttk.Entry(inform_about_sensor_frame, textvariable=coord_x_var) coord_y_lb = Label(inform_about_sensor_frame, text='Y') coord_y_var = StringVar(value='0') coord_y_ent = ttk.Entry(inform_about_sensor_frame, textvariable=coord_y_var) load_signal_ent = tk.Entry(load_signals_frame) # Создаем кнопку для выбора сигнала (массив) load_signal_btn = ttk.Button(load_signals_frame, text=f'Загрузить сигнал {i+1}', command=lambda i=i: load_signal_data_file(i)) coord_lb.grid(row = 0 + counter_row, column = 0, columnspan=3, sticky='nw') coord_x_lb.grid(row = 1 + counter_row, column = 0, sticky='ne') coord_x_ent.grid(row = 1 + counter_row, column = 1, sticky='nw') coord_y_lb.grid(row = 1 + counter_row, column = 2, sticky='ne') coord_y_ent.grid(row = 1 + counter_row, column = 3) load_signal_ent.grid(row = NUM_SENSORS * 2 * (counter_row + 1), column = 0, sticky='nw', pady=5) load_signal_btn.grid(row = NUM_SENSORS * 2 * (counter_row + 1), column = 1, sticky='nw', pady=5) coords_labels.append(coord_lb) coords_x_labels.append(coord_x_lb) coords_x_vars.append(coord_x_var) coords_x_entries.append(coord_x_ent) coords_y_labels.append(coord_y_lb) coords_y_vars.append(coord_y_var) coords_y_entries.append(coord_y_ent) load_signals_entries.append(load_signal_ent) load_signals_buttons.append(load_signal_btn) counter_row += 2 #Удалить виджеты информации о датчиках def delete_widgets_info_sensors(): for label in time_peak_labels: label.destroy() for entry in time_peak_entries: entry.destroy() # Очистить переменные for var in time_peak_vars: var.set('') # Очистить списки time_peak_labels.clear() time_peak_entries.clear() time_peak_vars.clear() for label in coords_labels: label.destroy() for label in coords_x_labels: label.destroy() for var in coords_x_vars: var.set('') for entry in coords_x_entries: entry.destroy() for label in coords_y_labels: label.destroy() for var in coords_y_vars: var.set('') for entry in coords_y_entries: entry.destroy() for entry in load_signals_entries: entry.destroy() for entry in load_signals_buttons: entry.destroy() coords_labels.clear() coords_x_labels.clear() coords_x_vars.clear() coords_x_entries.clear() coords_y_labels.clear() coords_y_vars.clear() coords_y_entries.clear() load_signals_entries.clear() load_signals_buttons.clear() #Создать виджеты для ввода данных def create_widgets_for_input(): global speed_sound_metal_lb, speed_sound_metal_var, speed_sound_metal_ent, calculate_sts_values_btn, \ duration_signal_lb, duration_signal_var, duration_signal_ent,var_sko_lb, var_sko_var, var_sko_ent, value_thinning_lb, value_thinning_var, value_thinning_ent, date_col_names, datetime_tv,signal_col_names, \ datetime_sb, signal_tv, signal_sb, update_bd_btn, update_btn, one_condition, one_condition_ckbtn, num_revs_lb, num_revs_var, num_revs_ent speed_sound_metal_lb = Label(control_frame, text='Скорость распространения звука в металле (м/c)') speed_sound_metal_var = StringVar(value='0')#v = s/t = 0.87/0.00044 = 1977.27 calculate_sts_values_btn = ttk.Button(control_frame, text="Рассчитать значения", command=lambda:find_points(СalculationStatus.speed_and_delay.value)) # ============================================================================= # modes_work = ["1. Начальные значения", "2. Расчёт скорости", "3. Определение координат"] # # mode_work_lb = Label(control_frame, text='Режим') # # по умолчанию будет выбран первый элемент из mode_lb # mode_work_var = StringVar(value=modes_work[0]) # #Combobox # modes_work_cb = ttk.Combobox(control_frame, textvariable=mode_work_var, values=modes_work, state="readonly") # #прикрепить функцию обработки к событию <> с помощью метода bind # modes_work_cb.bind("<>", on_switch_mode_work) # ============================================================================= #только цифры # ============================================================================= # vcmd = (control_frame.register(validate_input), '%P') # speed_sound_metal_ent = ttk.Entry(control_frame, textvariable=speed_sound_metal_var, validate="key", validatecommand=vcmd) # ============================================================================= speed_sound_metal_ent = ttk.Entry(control_frame, textvariable=speed_sound_metal_var) duration_signal_lb = Label(control_spm_frame, text='Длительность сигнала (с)') duration_signal_var = StringVar() duration_signal_ent = tk.Entry(control_spm_frame,textvariable=duration_signal_var) var_sko_lb = Label(control_spm_frame, text='n*СКО') var_sko_var = StringVar(value=1) var_sko_ent = tk.Entry(control_spm_frame, textvariable=var_sko_var) date_col_names = ['Дата'] datetime_tv = ttk.Treeview(control_spm_frame, columns=date_col_names, show="headings") #datetime_tv.column(date_col_names[0], width=120) datetime_sb = ttk.Scrollbar(control_spm_frame, orient="vertical", command=datetime_tv.yview) datetime_tv.configure(yscrollcommand=datetime_sb.set) signal_col_names = ['Направление', 'Опора', 'Агрегат', 'Комментарий', 'Id'] signal_tv = ttk.Treeview(control_spm_frame, columns= signal_col_names, show="headings") signal_tv.column(signal_col_names[0], width=40) signal_tv.column(signal_col_names[1], width=20) signal_tv.column(signal_col_names[2], width=20) signal_tv.column(signal_col_names[3], width=40) #Скрытая колонка signal_tv.column(signal_col_names[4], stretch=NO, minwidth=0, width=0) signal_sb = ttk.Scrollbar(control_spm_frame, orient="vertical", command=signal_tv.yview) signal_tv.configure(yscrollcommand=signal_sb.set) #Кнопка обновления update_btn = ttk.Button(control_spm_frame, text="Рассчитать", command=lambda:find_points(СalculationStatus.statistical_data.value)) one_condition = IntVar(value=0) one_condition_ckbtn = ttk.Checkbutton(control_spm_frame, text="1 усл. поиска пика", variable=one_condition, command=select_сheckbutton) value_thinning_lb = Label(control_spm_frame, text='Прореживание') value_thinning_var = StringVar(value = 0) value_thinning_ent = tk.Entry(control_spm_frame,textvariable=value_thinning_var) num_revs_lb = Label(control_spm_frame, text='Кол. оборотов') num_revs_var = StringVar(value = 2) num_revs_ent = tk.Entry(control_spm_frame,textvariable=num_revs_var) # Параметры настроек params = load_settings() #Кнопка обновления значений в таблицах (дата сигнал) update_bd_btn = ttk.Button(control_spm_frame, text="Обновить", command=connect_to_db_mssql if params["dbms"].get() == DbmsNames.mssql.value or params["dbms"].get() == str() else connect_to_db_postgresql) #Показать виджеты для ввода данных def show_widgets_for_input(): # ============================================================================= # mode_work_lb.pack(anchor=NW) # modes_work_cb.pack(anchor=NW,fill=X) # ============================================================================= # ============================================================================= # speed_sound_metal_lb.pack(anchor=NW) # speed_sound_metal_ent.pack(anchor=NW,fill=X) # # ============================================================================= #speed_sound_metal_lb.grid(row = 0, column = 0, columnspan=2, sticky='nw') #speed_sound_metal_ent.grid(row = 1, column = 0, sticky='nwne') #calculate_sts_values_btn.grid(row = 1, column = 1, sticky='nwne') #Задать положение контейнера #inform_about_sensor_frame.pack(anchor=NW) inform_about_sensor_frame.grid(row = 8, column = 0, columnspan=2, sticky='nwne') load_signals_frame.grid(row = 9, column = 0, columnspan=2, sticky='nwne') one_condition_ckbtn.grid(row = 0, column = 2, sticky='nw', pady=5, padx=5) update_btn.grid(row = 1, column = 2, sticky='ne', pady=5) duration_signal_lb.grid(row = 0, column = 0, sticky='nw') duration_signal_ent.grid(row = 1, column = 0, sticky='nw') num_revs_lb.grid(row = 0, column = 1, sticky='nw') num_revs_ent.grid(row = 1, column = 1, sticky='nw') var_sko_lb.grid(row = 2, column = 0, sticky='nw') var_sko_ent.grid(row = 3, column = 0, sticky='nw', pady=5) value_thinning_lb.grid(row = 2, column = 1, sticky='nw') value_thinning_ent.grid(row = 3, column = 1, sticky='nw', pady=5) datetime_tv.grid(row = 4, column = 0, sticky='nw', pady=5) datetime_sb.grid(row = 4, column = 0, sticky='nese', pady=5) signal_tv.grid(row = 4, column = 1, sticky='nwne', columnspan=2, pady=5, padx=5) signal_sb.grid(row = 4, column = 1, sticky='nese', columnspan=2, pady=5) update_bd_btn.grid(row = 5, column = 2, sticky='se', pady=5, padx=5) # ============================================================================= # #Переключить режим работы # def on_switch_mode_work(event): # test # # ============================================================================= # Функция, задающая систему уравнений для метода Ньютона def equations(xy): x, y = xy residuals_list = [] for i in range(NUM_SENSORS): eq = (x_coords[i] - x)**2 + (y_coords[i] - y)**2 - dists[i]**2 residuals_list.append(eq) return residuals_list # Функция, задающая систему уравнений для метода наименьших квадратов def residual(xy): x, y = xy residuals_list = [] for i in range(NUM_SENSORS): eq = (x_coords[i] - x)**2 + (y_coords[i] - y)**2 - dists[i]**2 residuals_list.append(eq) return residuals_list #1 вариант без цикла # ============================================================================= # x, y = xy # eq1 = (x1 - x)**2 + (y1 - y)**2 - d1**2 # eq2 = (x2 - x)**2 + (y2 - y)**2 - d2**2 # eq3 = (x3 - x)**2 + (y3 - y)**2 - d3**2 # return [eq1, eq2, eq3] # # ============================================================================= #Проверить заполнение полей def get_filling_fields(): if not 'coords_x_entries' in globals(): messagebox.showwarning('Предупреждение','Отсутствуют необходимые компоненты для расчёта!') return False if speed_sound_metal_ent.get() == str() or any(time_peak_ent.get() == str() for time_peak_ent in time_peak_entries) \ or any(coord_x_ent.get() == str() for coord_x_ent in coords_x_entries) or any(coord_y_ent.get() == str() for coord_y_ent in coords_y_entries) or any(load_signal_ent.get() == str() for load_signal_ent in load_signals_entries): messagebox.showwarning('Предупреждение','Заполните все поля для нахождения дефекта!') return True else: return False #нигде не используется #расчёт скорости def calculate_speed(): #Проверить заполнение полей fields_filled = get_filling_fields() if fields_filled or not 'coords_x_entries' in globals(): return first_x = float(coords_x_entries[NumberingSensors.first.value].get()) first_y = float(coords_y_entries[NumberingSensors.first.value].get()) second_x = float(coords_x_entries[NumberingSensors.second.value].get()) second_y = float(coords_y_entries[NumberingSensors.second.value].get()) time_peak_ax = float(time_peak_entries[NumberingSensors.first.value].get()) if float(time_peak_entries[NumberingSensors.first.value].get()) != 0 or float(time_peak_entries[NumberingSensors.first.value].get()) != None else float(time_peak_entries[NumberingSensors.second.value].get()) #Расстояние между первым и вторым датчиком first_distance = np.sqrt(pow((first_x - second_x),2) + pow((first_y - second_y),2)) #Скорость speeds = first_distance/time_peak_ax speed_sound_metal_var.set(speeds) #Нахождение точек в декартовой системе координат def find_points(сalculation_status): global dists, x_coords, y_coords, name_points, coords_defect, x_center, y_center #Проверить заполнение полей fields_filled = get_filling_fields() if fields_filled or not 'coords_x_entries' in globals(): return #Скорость звука в металле speed_sound_metal_var = float(speed_sound_metal_ent.get()) #Расстояния dists = [] #Массив точек x_coords = [] y_coords = [] state_unit_txt.delete('1.0', 'end') # Очищаем предыдущий вывод state_unit_txt.insert('end', "Точки местонахождения датчиков\n") for i in range(NUM_SENSORS): #Добавление расстояний dists.append(speed_sound_metal_var * float(time_peak_entries[i].get())) #Добавление координат x_coords.append(float(coords_x_entries[i].get())) y_coords.append(float(coords_y_entries[i].get())) state_unit_txt.insert('end', f"{i+1} датчик: (x{i+1}: {round(x_coords[i], 7)}, y{i+1}: {round(y_coords[i], 7)})\n") # Начальное предположение для метода наименьших квадратов initial_guess = np.array([0, 0]) # Решение методом наименьших квадратов #Координаты дефекта coords_defect = least_squares(residual, initial_guess) state_unit_txt.insert('end', "Решение методом наименьших квадратов (м)\n") state_unit_txt.insert('end', f"X: [x: {round(coords_defect.x[0], 7)} y: {round(coords_defect.x[1], 7)}]\n") # ============================================================================= # # Решение методом Ньютона # solution = fsolve(equations, [0, 0]) # print("Решение методом Ньютона:", solution) # result_lb['text'] += f"Решение методом Ньютона: {solution}" # ============================================================================= if сalculation_status == СalculationStatus.statistical_data.value: #Обновить график update_chart() #Показать виджеты вывода результатов show_widgets_output_results() if duration_signal_ent.get() != str() and duration_signal_ent.get() != str() and var_sko_ent.get() != str(): try: calculate_statistical_data(сalculation_status) except Exception as e: messagebox.showerror('Ошибка', f'Ошибка при расчёте: {e}') #calculate_statistical_data(сalculation_status) else: messagebox.showwarning('Предупреждение','Заполните все поля для расчёта статистик!') return # ============================================================================= # def update_statistics(): # #Размер массива # array_size = array_signal.size # # Вычисления статистик # mean = np.mean(array_signal) # moment2 = np.mean((array_signal - mean) ** 2) # moment3 = np.mean((array_signal - mean) ** 3) # #moment4 = np.mean((array_signal - mean) ** 4) # std_dev = np.std(array_signal) # #kurtosis_value = kurtosis(array_signal) # skewness = moment3 / (std_dev ** 3) # rms_value = np.sqrt(np.mean(np.square(array_signal))) # # #до выбросов # # ============================================================================= # # # Указание размера среза # # #горизонтальный # # #desired_size = 23586 # # #осевой # # #desired_size = 23592 # # #вертикальный # # #desired_size = 23560 # # # # #до выбросов # # # Обрезка массива начиная с начала и указав нужный размер # # trimmed_array = array_signal [:desired_size] # # # # #Сумма масcива # # array_sum = trimmed_array.sum() # # #Размер массива # # array_size = trimmed_array.size # # #Среднее аврифметическое # # x_mean = (1/array_size) * array_sum # # # Вычисление суммы (xi - x_)^2 # # sum_squared_diff = np.sum((trimmed_array - x_mean) ** 2) # # #Стандартное отклонение # # standard_deviation = sum_squared_diff/array_size # # # # #СКЗ # # rms_value_ = np.sqrt(np.mean(np.square(trimmed_array))) # # ============================================================================= # # #Куртозис # # Вычисление суммы (xi - x_) # sum_one = np.sum((array_signal - mean)/array_size) # kurtosis_value = sum_one/std_dev ** 4 # # # # Текст лейблов # statistics_txt.delete('1.0', 'end') # Очищаем предыдущий вывод # statistics_txt.insert('end', f"Выбран файл: {os.path.basename(file_path)}\n\n") # statistics_txt.insert('end', f"Среднее значение: {mean:.20f}\n") # statistics_txt.insert('end', f"Момент 2-го порядка (дисперсия): {moment2:.3f}\n") # #statistics_txt.insert('end', f"Момент 3-го порядка: {moment3:.3f}\n") # #statistics_txt.insert('end', f"Момент 4-го порядка: {moment4:.3f}\n") # statistics_txt.insert('end', f"Эксцесс (куртосис): {kurtosis_value:.20f}\n") # statistics_txt.insert('end', f"Асимметрия: {skewness:.3f}\n") # statistics_txt.insert('end', f"СКЗ (RMS) виброскорости: {rms_value:.3f}\n") # statistics_txt.insert('end', f"СКО: {std_dev:.3f}\n") # # #Количество отсчётов # num_counts = 100 # #Нахождение первого пика # for i in range(array_size): # #Последний индекс массива # last_index_array = i+1 # #Массив от начала до индекса last_index_array # temp_array = array_signal[:last_index_array] # #Расчёт СКЗ # rms_value_sel = np.sqrt(np.mean(np.square(temp_array))) # #Предельное значение # limit_value = rms_value_sel * 3 # #Последнее значение пика в массиве # temp_value_peak = temp_array[-1] # #Если СКЗ меньше последнего значения в массиве # if limit_value < abs(temp_value_peak): # #Массив от последнего найденного индекса по условию limit_value < abs(temp_value_peak ) до выбранного количества отсчётов # array_search_peaks = array_signal[last_index_array:last_index_array + num_counts] # if array_search_peaks.size >= num_counts: # break_outer_loop = False # for j in range(num_counts): # #Если значение в массиве после значения последнего пика больше # if abs(array_search_peaks[j]) > abs(temp_value_peak/1.2): # break_outer_loop = True # # прерываем внутренний цикл # break # if break_outer_loop: # # прерываем внешний цикл # break # # # print(temp_array.size) # print(temp_array) # print(temp_value_peak) # # # # # #Показать виджеты вывода результатов # show_widgets_output_results() # # #Загрузка сигнала # def load_signal_data(): # global array_signal, file_path # # Выбор файла и чтение значений # file_path = filedialog.askopenfilename() # with open(file_path, 'r', encoding='utf-8') as file: # array_signal = np.array([float(line.strip()) for line in file]) # # update_statistics() # ============================================================================= #Получить единицы измерения выбранного сигнала def get_units_measurement(): match units: case AcsUnits.mv.value: return "мВ" case AcsUnits.accel.value: return "Ускорение (мм/с2)" case AcsUnits.speed.value: return "Скорость (мм/с)" case AcsUnits.mm.value: return "Перемещение (мм)" case AcsUnits.mkm.value: return "Перемещение (мкм)" case AcsUnits.imp.value: return "Импульсы" case AcsUnits.misc.value: return "Доп.единицы" case AcsUnits.accel_g.value: return "g" case AcsUnits.voltage.value: return "Вольт (B)" case AcsUnits.current.value: return "Ампер (A)" case AcsUnits.power.value: return "Ватт (Вт)" case AcsUnits.tempr_c.value: return "Цельсия (C)" case AcsUnits.tempr_f.value: return "Фаренгейт (F)" case AcsUnits.bear_failure.value: return "Дефект подш." case AcsUnits.shift_mm.value: return "Перемещение (мил)" case AcsUnits.freq_hz.value: return "Fo (Гц)" case AcsUnits.phase.value: return "Фаза на Fo (град.)" case AcsUnits.accel_percent.value: return "Ускорение (%)" case AcsUnits.speed_percent.value: return "Скорость (%)" case AcsUnits.shift_percent.value: return "Перемещение (%)" case AcsUnits.freq_rpm.value: return "Fo (Об/мин)" case _: return "Неизвестные единицы" class AcsUnits(Enum): mv = 0 # мВ accel = 1 # ускорение (мм/с2) speed = 2 # скорость (мм/с) mm = 3 # перемещение (мм) mkm = 4 # перемещение (мкм) imp = 5 # энергетические импульсы (ЕИ -Диамех) misc = 6 # дополнительные единицы accel_g = 7 # g voltage = 8 # вольт (B) current = 9 # ампер (A) power = 10 # ватт (Вт) tempr_c = 11 # Цельсия (C) tempr_f = 12 # Фаренгейт (F) bear_failure = 13 # Дефект подшипника shift_mm = 14 # перемещение (мил) freq_hz = 16 # Оборотная частота (Гц) phase = 17 # Фаза на оборотной частоте (град.) accel_percent = 18 # ускорение (%) speed_percent = 19 # скорость (%) shift_percent = 20 # перемещение (%) freq_rpm = 24 # Оборотная частота (Об/мин) #Направления измерений class MeasurementDirections(Enum): axial = 0 vertical = 1 horizontal = 2 vertical_two = 3 #Части подшипника class BearingParts(Enum): outer = 0 inner = 1 rolls = 2 separator = 3 #Зоны пиков class PeakZones(Enum): zone_a = 0 zone_b = 1 zone_c = 2 zone_d = 3 zone_e = 4 zone_x = 5 #Cостояния статистик class StatisticalStates(Enum): original = 'Исходное' final = 'Конечное' #Нумерация датчиков class NumberingSensors(Enum): first = 0 second = 1 third = 2 #Начальные состояния подшипника class InitialConditionsBearing(Enum): unknown = 'Неизвестно' known = 'Известно' #Статусы расчёта class СalculationStatus(Enum): speed_and_delay = 0 statistical_data = 1 #Названия СУБД class DbmsNames(Enum): mssql = "Microsoft SQL" postgresql = "PostgreSQL" #Типы фильтрации class TypesFilter(Enum): butterworth = "Баттерворта" bessel = "Бесселя" #Режимы фильтрации class ModesFilter(Enum): low_freq = "Нижних частот" high_freq = "Верхних частот" bandpass = "Полосовой" cutting = "Режекторный" #Наборы фильтрации class SetsFilter(Enum): center_freq_width = "Частоту центра и ширину" boundary_freq = "Частоты границ" #Состояние фильтрации сигнала class StateFilter(Enum): on = "Включить" off = "Выключить" #Состояние местоположения дефекта class StateLocationDefect(Enum): on = "Включить" off = "Выключить" #Режимы отображения зон class ZoneDisplayModes(Enum): on = "Включить" off = "Выключить" #Загрузка сигнала из файла def load_signal_data_file(i): file_path = filedialog.askopenfilename() if file_path: try: signal_values = np.loadtxt(file_path) arrays[f'arraysign{i}'] = signal_values load_signals_entries[i].delete(0, tk.END) load_signals_entries[i].insert(tk.END, f"{i+1} датчик (файл)") # ============================================================================= # if MeasurementDirections.axial.value == i: # load_signal_ax_ent.delete(0, tk.END) # load_signal_ax_ent.insert(tk.END, os.path.basename(file_path)) # if MeasurementDirections.vertical.value == i: # load_signal_vert_ent.delete(0, tk.END) # load_signal_vert_ent.insert(tk.END, os.path.basename(file_path)) # if MeasurementDirections.horizontal.value == i: # load_signal_hor_ent.delete(0, tk.END) # load_signal_hor_ent.insert(tk.END, os.path.basename(file_path)) # if MeasurementDirections.vertical_two.value == i: # load_signal_vert_two_ent.delete(0, tk.END) # load_signal_vert_two_ent.insert(tk.END, os.path.basename(file_path)) # ============================================================================= #calculate_button.configure(state='normal') except FileNotFoundError: print("Error: File not found!") except ValueError: print("Error: Invalid file format!") # ============================================================================= # #Чудо алгоритм по поиску первого пика в сигнале # def find_first_peak(signal, sok): # n = float(var_sko_ent.get()) * sok # if signal is not None: # # Проходим по сигналу и ищем элемент, который больше или равен 4 * sok # for i in range(len(signal)): # if signal[i] >= n: # # Проверяем, что в следующих 5 элементах есть хотя бы один элемент, больший или равный 4 * sok # #if any(signal[j] >= n for j in range(i+1, min(i+6, len(signal)))): # return i # #def find_first_peak(signal, sok): # #n = float(var_sko_ent.get()) * sok # #if signal is not None: # # Проходим по сигналу и ищем элемент, который больше или равен 4 * sok # #for i in range(len(signal)): # #if signal[i] >= n: # # Проверяем, что в следующих 5 элементах есть хотя бы один элемент, больший или равный 4 * sok # #if any(signal[j] >= n for j in range(i+1, min(i+6, len(signal)))): # #return i # # ============================================================================= #поиск первого пика в сигнале (1 вариант) def find_first_peak(signal, sok): """Поиск первого пика в сигнале. Аргументы: signal : 1D-массив чисел. Сигнал для поиска пика. sok : число. Стандартное отклонение сигнала. Возвращает: int. Индекс первого пика в сигнале, если он существует, иначе None. """ # Вычисляем пороговое значение для поиска первого пика n = abs(float(var_sko_ent.get()) * sok) # Проверяем, что сигнал задан if signal is not None: # Проходим по сигналу и ищем элемент, который больше или равен пороговому значению for i in range(len(signal)): if abs(signal[i]) >= n: #Если стоит галочка на одно условие if select_сheckbutton() == True: # Если условие выполнено, возвращаем индекс первого пика return i # Проверяем, что в следующих 5 элементах есть хотя бы два элемента, большие или равны 4 * sok if sum(abs(signal[j]) >= 3*sok for j in range(i+1, min(i+6, len(signal)))) >= 2: # Если условие выполнено, возвращаем индекс первого пика return i #поиск первого пика в сигнале (2 вариант) # ============================================================================= # def find_first_peak(signal, maxampl): # # Проверяем, что сигнал задан # if signal is not None: # # Проходим по сигналу и ищем элемент, который больше или равен пороговому значению # for i in range(len(signal)): # if abs(signal[i]) >= maxampl: # return i # ============================================================================= # ============================================================================= # def compare_peak_delays(): # delays = [] # for i in range(3): # for j in range(i+1, 3): # signal_i = arrays[f'arraysign{i}'] # signal_j = arrays[f'arraysign{j}'] # if signal_i is not None and signal_j is not None: # peak_i = find_first_peak(signal_i, np.std(signal_i, ddof=0)) # peak_j = find_first_peak(signal_j, np.std(signal_j, ddof=0)) # if peak_i is not None and peak_j is not None: # sampling_rate = 1 / float(duration_signal_ent.get()) # peak_i_time = peak_i / len(signal_i) * sampling_rate # peak_j_time = peak_j / len(signal_j) * sampling_rate # delay = abs(peak_i_time - peak_j_time) # delays.append((f'Сигнал # {i+1} - Сигнал # {j+1}', delay)) # return delays # ============================================================================= #Расчёт статистических данных def calculate_statistical_data(сalculation_status): global first_peak_time #Загрузка параметров params = load_settings() first_peak_time = 0 new_data = [] first_peak_time_arr= [] #result_text.delete('1.0', tk.END) # Текст лейблов #statistics_txt.delete('1.0', 'end') # Очищаем предыдущий вывод for i in range(NUM_SENSORS): signal = arrays[f'arraysign{i}'] #Фильтруем сигнал, если фильтрация включена if params["on_off_filtering"].get() == StateFilter.on.value: try: #Фильтровать сигнал signal = get_filter_signal(sample_rate, signal) except Exception as e: messagebox.showwarning("Предупреждение", f"Ошибка фильтрации сигнала: {e}") return if signal is not None: mean = np.mean(signal) #skz = np.mean(signal) #skz = np.sqrt(np.mean(np.square(signal))) sok = np.std(signal, ddof=0) #moment2 = np.sqrt(np.mean(signal**2)) moment2 = np.var(signal) moment4 = np.mean((signal - mean) ** 4) kurtosis = np.mean(signal**4) / np.mean(signal**2)**2 first_peak_idx = find_first_peak(signal, sok) #максимальная амплитуда maxampl = max(signal, key=abs) #Сила дефекта strength_defect = round(abs(maxampl)/sok, 3) #first_peak_idx = find_first_peak(signal, maxampl) if first_peak_idx is not None: sampling_rate = 1 / float(duration_signal_ent.get()) first_peak_time = (first_peak_idx / len(signal) * sampling_rate) * 1000 first_peak_time_arr.append(first_peak_time) # ============================================================================= # #result = f'Signal #{i+1}: СКЗ = {skz:.7f}, СКО = {sok:.7f}, Момент-2 = {moment2:.7f}, Куртосис = {kurtosis:.7f}, Время первого пика = {first_peak_time:.7f}\n' # statistics_txt.insert('end', f"Макс. Амплитуда: {maxampl:.3f}\n\n") # statistics_txt.insert('end', f"Направление: {name_points[i]}\n\n") # statistics_txt.insert('end', f"Время первого пика: {first_peak_time:.6f} мс.\n") # statistics_txt.insert('end', f"Момент 2-го порядка (дисперсия): {moment2:.4f}\n") # statistics_txt.insert('end', f"Момент 4-го порядка: {moment4:.4f}\n") # statistics_txt.insert('end', f"Эксцесс (куртосис): {kurtosis:.4f}\n") # #statistics_txt.insert('end', f"СКЗ (RMS) виброскорости: {skz:.4f}\n") # statistics_txt.insert('end', f"СКО: {sok:.3f}\n\n") # ============================================================================= # Данные для добавления в таблицу #new_data = [[datetime.now(), datetime.now(), name_points[i], 1886.88, -4.42, skz, sok, kurtosis, moment2, "0.44 мкс"]] # Данные для добавления в таблицу new_data.append([selected_date, name_points[i], round(first_peak_time,8), maxampl, round(sok,5), strength_defect, round(kurtosis,5), round(moment2, 5), round(moment4,5), x_coords[i], y_coords[i]]) else: # ============================================================================= # statistics_txt.insert('end', f"Макс. Амплитуда: {maxampl:.3f}\n\n") # statistics_txt.insert('end', f"Направление: {name_points[i]}\n\n") # statistics_txt.insert('end', f"Время первого пика: {first_peak_time:.6f} мс.\n") # statistics_txt.insert('end', f"Момент 2-го порядка (дисперсия): {moment2:.4f}\n") # statistics_txt.insert('end', f"Момент 4-го порядка: {moment4:.4f}\n") # statistics_txt.insert('end', f"Эксцесс (куртосис): {kurtosis:.4f}\n") # #statistics_txt.insert('end', f"СКЗ (RMS) виброскорости: {skz:.4f}\n") # statistics_txt.insert('end', f"СКО: {sok:.3f}\n\n") # ============================================================================= # Данные для добавления в таблицу new_data.append([selected_date, name_points[i], round(first_peak_time,8), maxampl,round(sok,5), strength_defect, round(kurtosis,5), round(moment2, 5), round(moment4,5), x_coords[i], y_coords[i]]) #result = f'Signal #{i+1}: СКЗ = {skz:.7f}, СКО = {sok:.7f}, Момент-2 = {moment2:.7f}, Куртосис = {kurtosis:.7f}, Время первого пика не найдено\n' #result_text.insert(tk.END, result) #Минимальное значение из массива пиков, чтобы определить какой датчик принимает удар первым min_value = min(first_peak_time_arr) #Индекс минимального значения index_min_value = first_peak_time_arr.index(min_value) #Задержки delays = [None] * NUM_SENSORS #Скорости speeds = [None] * NUM_SENSORS #удар возле датчика impact_sensor = [None] * NUM_SENSORS #расстояние от вертикального до осевого (гипотенуза) #distance_from_vert_ax = np.sqrt(pow(float(coordinates_first_x_ent.get()),2) + pow(float(coordinates_second_y_ent.get()),2)) #Массив с координатами X #coordinates_x_arr = [float(coordinates_first_x_ent.get()), float(coordinates_second_x_ent.get()),float(coordinates_third_x_ent.get()), float(coordinates_fourth_x_ent.get())] #Сумма скоростей sum_speeds = 0 #Расчёт скорости и задержки for i in range(NUM_SENSORS): if i != index_min_value: #Задержка delays[i] = first_peak_time_arr[i] - first_peak_time_arr[index_min_value] #Расстояние distance = np.sqrt(pow((x_coords[i] - x_coords[index_min_value]),2) + pow((y_coords[i] - y_coords[index_min_value]),2)) #Скорость speeds[i] = distance/(delays[i]/1000) #Обновление задержек в поле "Время задержки" time_peak_vars[i].set(delays[i]/1000) sum_speeds += speeds[i] else: impact_sensor[i] = 'Удар' #Обновление задержек в поле "Время задержки" time_peak_vars[i].set(0) if сalculation_status == СalculationStatus.speed_and_delay.value: #Обновление скорости в поле speed_sound_metal_var.set(sum_speeds/(NUM_SENSORS-1)) # ============================================================================= # #Скорость, до осевого # speed_sound_metal_var.set(speeds[0] if speeds[0] != None else speeds[1]) # ============================================================================= # ============================================================================= # # if axial_impact.get() == 1: # delays[1] = first_peak_time_arr[1] - first_peak_time_arr[0] # delays[2] = first_peak_time_arr[2] - first_peak_time_arr[0] # delays[3] = first_peak_time_arr[3] - first_peak_time_arr[0] # # # ============================================================================= # # speeds[1] = distance_from_vert_ax/(delays[1]/1000) # # speeds[2] = float(coordinates_second_y_ent.get())/(delays[2]/1000) # # ============================================================================= # # speeds[1] = float(coordinates_second_x_ent.get())/(delays[1]/1000) # speeds[2] = float(coordinates_third_x_ent.get())/(delays[2]/1000) # speeds[3] = float(coordinates_fourth_x_ent.get())/(delays[3]/1000) # # impact_sensor[0] = 'Удар' # if vert_impact.get() == 1: # delays[0] = first_peak_time_arr[0] - first_peak_time_arr[1] # delays[2] = first_peak_time_arr[2] - first_peak_time_arr[1] # delays[3] = first_peak_time_arr[3] - first_peak_time_arr[1] # # # ============================================================================= # # speeds[0] = distance_from_vert_ax/(delays[0]/1000) # # speeds[2] = float(coordinates_first_x_ent.get())/(delays[2]/1000) # # ============================================================================= # # speeds[0] = float(coordinates_first_x_ent.get())/(delays[0]/1000) # speeds[2] = float(coordinates_third_x_ent.get())/(delays[2]/1000) # speeds[3] = float(coordinates_fourth_x_ent.get())/(delays[3]/1000) # # impact_sensor[1] = 'Удар' # if hor_impact.get() == 1: # delays[0] = first_peak_time_arr[0] - first_peak_time_arr[2] # delays[1] = first_peak_time_arr[1] - first_peak_time_arr[2] # delays[3] = first_peak_time_arr[3] - first_peak_time_arr[2] # # # ============================================================================= # # speeds[0] = distance_from_vert_ax/(delays[0]/1000) # # speeds[1] = float(coordinates_second_y_ent.get())/(delays[1]/1000) # # ============================================================================= # # speeds[0] = float(coordinates_first_x_ent.get())/(delays[0]/1000) # speeds[1] = float(coordinates_second_x_ent.get())/(delays[1]/1000) # speeds[3] = float(coordinates_fourth_x_ent.get())/(delays[3]/1000) # # impact_sensor[2] = 'Удар' # if hor_impact_two.get() == 1: # delays[0] = first_peak_time_arr[0] - first_peak_time_arr[3] # delays[1] = first_peak_time_arr[1] - first_peak_time_arr[3] # delays[2] = first_peak_time_arr[2] - first_peak_time_arr[3] # # # ============================================================================= # # speeds[0] = distance_from_vert_ax/(delays[0]/1000) # # speeds[1] = float(coordinates_second_y_ent.get())/(delays[1]/1000) # # ============================================================================= # # speeds[0] = float(coordinates_first_x_ent.get())/(delays[0]/1000) # speeds[1] = float(coordinates_second_x_ent.get())/(delays[1]/1000) # speeds[2] = float(coordinates_third_x_ent.get())/(delays[2]/1000) # # impact_sensor[3] = 'Удар' # # #Скорость, до осевого # speed_sound_metal_var.set(speeds[0]) # #Задержки, если удар по 4 датчику (2. Вертикальный) # ============================================================================= for i in range(len(new_data)): #Добавление новых значений (столбцов) new_data[i].extend([delays[i], speeds[i], impact_sensor[i]]) # ============================================================================= # delays = compare_peak_delays() # for delay in delays: # statistics_txt.insert('end', f'Задержка между {delay[0]}: {delay[1]:.7f} сек.\n') # ============================================================================= if сalculation_status == СalculationStatus.statistical_data.value: #Сохранение в Excel save_excel_file(new_data) #Создать файл Excel def сreate_file_excel(): wb = Workbook() ws = wb.active headers = ["Дата и время", "Датчик", "Точка начала первого пика, в мс.", "Макс. амплитуда сигнала", "СКО", "Макс. ампл./СКО", "Куртосис", "Момент 2", "Момент 4", "X (м)", "Y (м)", "Задержка (мс)", "Скорость (м/c)", "Рядом с датчиком"] for col_num, header in enumerate(headers, 1): ws.cell(row=1, column=col_num, value=header) #Сохранение в Excel def save_excel_file(new_data): # Путь к файлу Excel file_path = "experiment_data.xlsx" # Если файл не существует, создаем новый файл Excel if not os.path.exists(file_path): #Создать файл Excel сreate_file_excel() else: # Если файл существует, открываем его wb = load_workbook(file_path) ws = wb.active # Определяем номер эксперимента #exp_number = (ws.max_row // NUM_SENSORS) + 1 # Поскольку на каждый эксперимент приходится 4 строки #ws.append([f"Номер эксперимента №{exp_number} (поиск пиков при СКО*{float(var_sko_ent.get())})"]) # Получаем даты из первой колонки dates = [ws.cell(row=row, column=1).value for row in range(2, ws.max_row+1)] # Определяем номер эксперимента numbers = [num.split("№")[1].split()[0] for num in dates if "№" in num] # Преобразование извлеченных чисел в целые числа valid_numbers = [int(num) for num in numbers if num.isdigit()] # Вычисление максимального числа и увеличение на 1 exp_number = max(valid_numbers + [0]) + 1 ws.append([f"Номер эксперимента №{exp_number} (поиск пиков при СКО*{float(var_sko_ent.get())})"]) # Добавляем данные в таблицу for row_data in new_data: ws.append(row_data) # Сохраняем книгу Excel wb.save(file_path) # Закрыть файл Excel wb.close() #Загрузка статистических данных из таблицы Excel def load_statistical_data_spm(): global statistical_data_excel_df if os.path.exists(FILE_EXCEL): #Таблица с данными из Excel statistical_data_excel_df = pd.read_excel(FILE_EXCEL) else: statistical_data_excel_df = pd.DataFrame() #Создать файл Excel для метода spm def сreate_file_excel_spm(): # Если файл не существует, создаем новый файл Excel if not os.path.exists(FILE_EXCEL): wb = Workbook() ws = wb.active headers = ["Дата и время", "Направление", "Частота (Гц)", "Макс. амплитуда сигнала", "СКО", "Fo", "Дефект", "Степень дефекта", "Макс. ампл./СКО", "Куртосис", "Момент 2", "Момент 4", "Агрегат", "Состояние"] for col_num, header in enumerate(headers, 1): ws.cell(row=1, column=col_num, value=header) # Сохраняем книгу Excel wb.save(FILE_EXCEL) # Отрегулировать ширину столбца в зависимости от длины текста заголовка def adjust_column_width_by_header(): for col in statistics_tv["columns"]: header_text = col max_width = tkFont.Font().measure(header_text) # Добавляем немного отступа statistics_tv.column(col, width=max_width + 20) #Расчёт статистических данных для метода spm def calculate_statistical_data_spm(cursor): global first_peak_time #Создать файл Excel сreate_file_excel_spm() #Загрузка статистических данных из таблицы Excel load_statistical_data_spm() #Загрузка параметров params = load_settings() #Это исходная запись в Excel для агрегата is_original_record = False #запись с состоянием "Исходное" original_record = pd.DataFrame() if os.path.exists(FILE_EXCEL): #Загрузка статистических данных из таблицы Excel load_statistical_data_spm() # Проверка наличия записи с состоянием "Исходное" для агрегата original_record = statistical_data_excel_df[(statistical_data_excel_df['Агрегат'] == f'{unit_id}') & (statistical_data_excel_df['Состояние'] == StatisticalStates.original.value)] # ============================================================================= # if not original_record.empty: # is_original_record = True # else: # is_original_record = False # ============================================================================= #Если запись "Исходное" для агрегата присутствует в таблице Excel # ============================================================================= # if f'{unit_id}' in statistical_data_excel_df['Агрегат'].values: # is_original_record = True # ============================================================================= #Если параметр в настройказх == "Неизвестно" и есть запись с состоянием "Исходное" if params["bearing_condition"].get() == InitialConditionsBearing.unknown.value and original_record.empty: is_original_record = True # ============================================================================= # else: # is_original_record = False # ============================================================================= #Если нет записи с состоянием "Исходное" if not original_record.empty: is_original_record = True first_peak_time = 0 new_data = [] first_peak_time_arr= [] for i in range(NUM_SENSORS): signal = arrays[f'arraysign{i}'] #Степень дефекта degree_defect, name_defect_table, fo, upperfreq_signal = on_inds_tax_signal(None, cursor, i) if params["on_off_filtering"].get() == StateFilter.on.value: try: #Фильтровать сигнал signal = get_filter_signal(sample_rate, signal) except Exception as e: messagebox.showwarning("Предупреждение", f"Ошибка фильтрации сигнала: {e}") return if signal is not None: mean = np.mean(signal) #skz = np.mean(signal) #skz = np.sqrt(np.mean(np.square(signal))) sok = round(np.std(signal, ddof=0),3) #moment2 = np.sqrt(np.mean(signal**2)) moment2 = round(np.var(signal),3) moment4 = round(np.mean((signal - mean) ** 4),3) kurtosis = round(np.mean(signal**4) / np.mean(signal**2)**2,3) first_peak_idx = find_first_peak(signal, sok) #максимальная амплитуда maxampl = round(max(signal, key=abs),3) #Сила дефекта strength_defect = round(abs(maxampl)/sok,3) if first_peak_idx is not None: # Данные для добавления в таблицу new_data.append([selected_date, name_points[i], upperfreq_signal, maxampl, sok, fo, name_defect_table, degree_defect, strength_defect, kurtosis, moment2, moment4, unit_id, StatisticalStates.original.value if is_original_record == False else StatisticalStates.final.value]) else: # Данные для добавления в таблицу new_data.append([selected_date, name_points[i], upperfreq_signal, maxampl, sok, fo, name_defect_table, degree_defect, strength_defect, kurtosis, moment2, moment4, unit_id, StatisticalStates.original.value if is_original_record == False else StatisticalStates.final.value]) #очистить таблицу Treeview statistics_tv.delete(*statistics_tv.get_children()) #Добавление колонок в в таблицу Treeview for col in statistics_col_names: statistics_tv.heading(col, text=col) statistics_tv.column(col, stretch=False) #stretch=False - отключаем растяжение # ============================================================================= # #Размер колонок # column_sizes = ['5','5','5','5','100','80','80','80','80', '80'] # #Добавление колонок в в таблицу Treeview # for col, size in zip(statistics_col_names, column_sizes): # statistics_tv.heading(col, text=col) # statistics_tv.column(col, width=size) # ============================================================================= #Конечные данные # Добавление заголовка "Конечные" в таблицу Treeview statistics_tv.insert("", "end", text="", values=("Конечные",) + ("",)*(len(statistics_col_names)-1)) # Добавление данных в таблицу Treeview for index, data_row in enumerate(new_data, start=1): # Удалить первый элемент (дату) из строки данных data_without_date = data_row[1:] statistics_tv.insert("", "end", text=index, values=data_without_date) #Исходные данные initial_data_df = statistical_data_excel_df[(statistical_data_excel_df['Состояние'] == StatisticalStates.original.value) & (statistical_data_excel_df['Агрегат'] == f'{unit_id}')] if len(statistical_data_excel_df) != 0 else pd.DataFrame() #Исходные if len(initial_data_df) != 0: statistics_tv.insert("", "end", text="", values=("Исходные",) + ("",)*(len(statistics_col_names)-1)) # Преобразование данных DataFrame в список списков, исключая первый столбец (дату) initial_data_list = initial_data_df.values.tolist() # Добавление данных из initial_data_df в таблицу Treeview for index, data_row in enumerate(initial_data_list, start=len(new_data) + 1): # Начинаем с последнего индекса new_data data_without_date = data_row[1:] statistics_tv.insert("", "end", text=index, values=data_without_date) # Отрегулировать ширину столбца в зависимости от длины текста заголовка adjust_column_width_by_header() #Сохранение в Excel save_excel_file_spm(new_data) #Сохранение в Excel для метода spm def save_excel_file_spm(new_data): # Путь к файлу Excel #file_path = "experiment_data_spm.xlsx" # ============================================================================= # # Если файл не существует, создаем новый файл Excel # if not os.path.exists(FILE_EXCEL): # wb = Workbook() # ws = wb.active # # headers = ["Дата и время", "Направление", "Макс. амплитуда сигнала", # "СКО", "Степень дефекта", "Макс. ампл./СКО", "Куртосис", "Момент 2", "Момент 4", "Агрегат", "Состояние"] # for col_num, header in enumerate(headers, 1): # ws.cell(row=1, column=col_num, value=header) # else: # ============================================================================= # Если файл существует, открываем его wb = load_workbook(FILE_EXCEL) ws = wb.active # Определяем номер эксперимента #exp_number = (ws.max_row // NUM_SENSORS) + 1 # Поскольку на каждый эксперимент приходится 4 строки # Получаем даты из первой колонки dates = [ws.cell(row=row, column=1).value for row in range(2, ws.max_row+1)] # Подсчитываем количество уникальных дат unique_dates = set(dates) # Подсчет всех дат в первой колонке all_dates = len(dates) # Определяем номер эксперимента exp_number = max([int(num.split("№")[1]) for num in dates if "№" in num] + [0]) + 1 ws.append([f"Номер эксперимента №{exp_number}"]) # Добавляем данные в таблицу, пропуская даты, которые уже присутствуют 3 или 4 раза for row_data in new_data: if dates.count(row_data[0]) < NUM_SENSORS: ws.append(row_data) dates.append(row_data[0]) else: return # Сохраняем книгу Excel wb.save(FILE_EXCEL) # Закрыть файл Excel wb.close() #Загрузка статистических данных из таблицы Excel load_statistical_data_spm() #Соединение с базой данных (MS SQL) def connect_to_db_mssql(): # Очистка таблиц clear_table_widgets() # Параметры настроек params = load_settings() # для подключения к базе данных SERVERNAME = '.\SQLEXPRESS' if params["mssql_server_name"].get() == str() else params["mssql_server_name"].get() DATABASEMSSQL = 'Sadko' if params["data_base_mssql"].get() == str() else params["data_base_mssql"].get() DRIVER = "{SQL Server Native Client 11.0}" USERMSSQL = 'sadko' if params["user_mssql"].get() == str() else params["user_mssql"].get() PASSWORDMSSQL = 'sadko' if params["password_mssql"].get() == str() else params["password_mssql"].get() PASSWORDMSSQL = 'sadko' if params["password_mssql"].get() == str() else params["password_mssql"].get() TRUSTEDCONNECT = 'yes' if params["trusted_connect"].get() == str() else params["trusted_connect"].get() try: conn = pyodbc.connect(f'DRIVER={DRIVER};SERVER={SERVERNAME};PORT=1433;DATABASE={DATABASEMSSQL};UID={USERMSSQL};PWD={PASSWORDMSSQL};Trusted_Connection={TRUSTEDCONNECT}') load_times(conn) except (Exception, pyodbc.Error) as e: # print("Ошибка при подключении к MSSQL", e) messagebox.showerror("Ошибка", e) return e #Соединение с базой данных (PostgreSQL) def connect_to_db_postgresql(): # Очистка таблиц clear_table_widgets() # Параметры настроек params = load_settings() # для подключения к базе данных SERVERNAME = '192.168.0.12' if params["postgresql_server_name"].get() == str() else params["postgresql_server_name"].get() DATABASEPOSTGRESQL = 'postgres' if params["data_base_postgresql"].get() == str() else params["data_base_postgresql"].get() USERPOSTGRESQL = 'sadko' if params["user_postgresql"].get() == str() else params["user_postgresql"].get() PASSWORDPOSTGRESQL = 'sadko' if params["password_postgresql"].get() == str() else params["password_postgresql"].get() try: conn = psycopg2.connect( host=SERVERNAME, database=DATABASEPOSTGRESQL, user=USERPOSTGRESQL, password=PASSWORDPOSTGRESQL ) load_times(conn) except (Exception, psycopg2.Error) as e: # print("Ошибка при подключении к PostgreSQL", e) messagebox.showerror("Ошибка", e) return e # ============================================================================= # #Соединение с базой данных (Trusted_Connection=yes) # def connect_to_db_mssql(): # #Очистка таблиц # clear_table_widgets() # #Параметры настроек # params = load_settings() # #для подключения к базе данных # SERVERNAME = '.\SQLEXPRESS' if params["mssql_server_name"].get() == str() else params["mssql_server_name"].get() # DATABASEMSSQL = 'Sadko' if params["data_base_mssql"].get() == str() else params["data_base_mssql"].get() # DRIVER = "{SQL Server Native Client 11.0}" # USERMSSQL = 'sadko' if params["user_mssql"].get() == str() else params["user_mssql"].get() # PASSWORDMSSQL = 'sadko' if params["password_mssql"].get() == str() else params["password_mssql"].get() # try: # conn = pyodbc.connect('DRIVER='+DRIVER+';SERVER='+SERVERNAME+';PORT=1433;DATABASE='+DATABASEMSSQL+';UID='+USERMSSQL+';PWD='+PASSWORDMSSQL+';Trusted_Connection=yes') # load_times(conn) # except (Exception, pyodbc.Error) as e: # #print("Ошибка при подключении к MSSQL", e) # messagebox.showerror("Ошибка", e) # return e # ============================================================================= #Загрузка времени def load_times(conn): # Параметры настроек params = load_settings() cursor = conn.cursor() #последние 100 записей даты query = f"SELECT DISTINCT TOP 100 mdate FROM {TABLEACS} WHERE type = 2 ORDER BY mdate DESC" if params["dbms"].get() == DbmsNames.mssql.value or params["dbms"].get() == str() else f"SELECT DISTINCT mdate FROM {TABLEACS} WHERE type = 2 ORDER BY mdate DESC LIMIT 100" cursor.execute(query) # ============================================================================= # row = cursor.fetchone() # # #массив с амплитудами # float_values = get_floats(row[0]) # ============================================================================= # ============================================================================= #Событие при выборе даты datetime_tv.bind('<>', lambda event, cursor=cursor: on_load_signals(event, cursor)) data_from_db = cursor.fetchall() if len(data_from_db) != 0: for col in date_col_names: datetime_tv.heading(col, text=col) datetime_tv.column(col, width=120) for row in data_from_db: values = [row[0]] datetime_tv.insert('', 'end', values=values) #Очистка таблиц def clear_table_widgets(): datetime_tv.delete(*datetime_tv.get_children()) signal_tv.delete(*signal_tv.get_children()) #Загрузка сигналов def on_load_signals(event, cursor): global selected_date, time_signal, NUM_SENSORS, sample_rate, unit_id, units #Очистить таблицу signal_tv.delete(*signal_tv.get_children()) if not datetime_tv.selection(): return # Очистить график ax.clear() # Очистка предыдущего графика #ax.cla() # Очистка словаря c линиями plotted_lines.clear() # Вернуть выбранную дату selected_date = datetime_tv.item(datetime_tv.selection())['values'][0] #2024-05-07 14:58:16 заменить на 2024-05-07T14:58:16 для MS SQL 2021 #selected_date = selected_date.replace(' ', 'T') #Загрузка сигналов query = f"SELECT direction, ptno, unitid, comment, units, acsid FROM {TABLEACS} WHERE mdate = '{selected_date}' and type = 2 and direction in (4,1,2)" cursor.execute(query) data_from_db = cursor.fetchall() if len(data_from_db) != 0: for col in signal_col_names: signal_tv.heading(col, text=col) # Определить словарь для сопоставления числовых значений с их метками direction_labels = {1: 'вертикальное', 2: 'горизонтальное', 4: 'осевое'} # Заменить числовые значения их метками #data_from_db = [(direction_labels[row[0]], ) for row in data_from_db] #Агрегат выбранного сигнала unit_id = data_from_db[0][2] units = data_from_db[0][4] # Добавить помеченные данные в виджет Treeview for index, row in enumerate(data_from_db): values = [direction_labels[row[0]], row[1], row[2], row[3], row[5]] signal_tv.insert('', 'end', text=index, values=values) # ============================================================================= # for row in data_from_db: # values = [row[0]] # signal_tv.insert('', 'end', values=values) # ============================================================================= query = f"SELECT Size, UpperFreq FROM {TABLEACS} WHERE mdate = '{selected_date}' and direction in (4,1,2) and type = 2" cursor.execute(query) data_from_db = cursor.fetchall() if len(data_from_db) != 0: for row in data_from_db: values = [row[0],row[1]] #Частота дискретизации sample_rate = values[1]*2.5 #Расчёт времени сигнала time_signal = values[0]/(sample_rate) duration_signal_var.set(time_signal) else: duration_signal_var.set() #Количество отображаемых датчиков NUMBER_SENSORS_DISPLAYED = NUM_SENSORS #Количество строк в таблице с сигналами all_rows = signal_tv.get_children() #Количество датчиков NUM_SENSORS = len(all_rows) if NUMBER_SENSORS_DISPLAYED != NUM_SENSORS: #Если есть глобальная переменная time_peak_labels if 'time_peak_labels' in globals(): #Удалить виджеты информации о датчиках delete_widgets_info_sensors() #Создать виджеты информации о датчиках create_widgets_info_sensors() #подписи точек #name_points = ['О(1)','В(1)','Г(1)' ,'В(2)'] if NUM_SENSORS == 4 else ['О(1)','В(1)','Г(1)'] #добавить массив сигнала add_arrays_amplitudes(cursor, selected_date) if var_sko_ent.get() != str(): try: #Расчёт статистик для метода spm calculate_statistical_data_spm(cursor) except Exception as e: messagebox.showerror('Ошибка', f'Ошибка при расчёте: {e}') #calculate_statistical_data_spm(sample_rate) else: messagebox.showwarning('Предупреждение','Заполните все поля для расчёта статистик!') return #Загрузить сигнал с таходатчика signal_tv.bind('<>', lambda event, cursor=cursor: on_inds_tax_signal(event, cursor, None)) #Событие при выборе сигнала #signal_tv.bind('<>', lambda event, cursor=cursor: on_load_signal_array(event, cursor)) class FoCalculator: def __init__(self, max_deviation): assert max_deviation > 0 self._max_deviation = max_deviation self._prev = float('nan') self._suspected = [] def get_fo(self, period_idxs, sample_rate, surge_protect=True): if sample_rate <= 0: raise ValueError("sample_rate must be positive") fo = 0.0 if len(period_idxs) > 1: avg_prd = self.get_average_period(period_idxs) fo = sample_rate / avg_prd return self.surge_protect(fo) if surge_protect else fo def get_average_period(self, period_idxs): if len(period_idxs) <= 1: return 0 first = period_idxs[0] last = period_idxs[-1] if first < last: period_idxs = list(reversed(period_idxs)) assert period_idxs[0] > period_idxs[1] num = len(period_idxs) - 1 average = sum(period_idxs[i-1] - period_idxs[i] for i in range(1, len(period_idxs))) if num > 0: average /= num assert average >= 0 return average def surge_protect(self, value): accepted = value # если буфер пустой, записываем первое значение без проверок if math.isnan(self._prev): self._prev = value else: # Проверка на резкое изменение значения # ============================================================================= # is_abnormal = self.is_abnormal(value) # is_suspected = len(self._suspected) < cap(self._suspected) and is_abnormal # was_surge = len(self._suspected) == cap(self._suspected) and not is_abnormal # not_surge = len(self._suspected) == cap(self._suspected) and is_abnormal # ============================================================================= is_abnormal = self.is_abnormal(value) cap = len(self._suspected) is_suspected = cap < len(self._suspected) and is_abnormal was_surge = cap == len(self._suspected) and not is_abnormal not_surge = cap == len(self._suspected) and is_abnormal # Новый кандидат, возможно, выброс if is_suspected: self._suspected.append(value) accepted = self._prev # если был выброс, убираем его elif was_surge: self._suspected.clear() # предыдущий не был выбросом elif not_surge: self._prev = sum(self._suspected) / len(self._suspected) self._suspected.clear() # предыдущий был норм. elif not self._suspected: self._prev = value # значение восстановлено до фиксации выброса, # начинаем контроль заново else: assert not is_abnormal assert self._suspected self._suspected.clear() self._prev = value return accepted def is_abnormal(self, value): return value != 0 and abs(value - self._prev) > self._max_deviation # #Поиск индексов всех импульсов от датчика оборотов в сигнале def find_pulse_indices(values, threshold, by_front, filter_=True, lo_idx=0, search_len=0): if search_len == 0: search_len = len(values) end_idx = min(lo_idx + search_len, len(values)) threshold = get_threshold_value(values, threshold, lo_idx, search_len) is_tax = not by_front fronts = [] falls = [] for i in range(end_idx - 1, lo_idx - 1, -1): if values[i] <= 0 and is_tax: fronts.append(i) is_tax = False elif values[i] > threshold and not is_tax: is_tax = True falls.append(i) indices = fronts if by_front else falls #return indices if filter_ and len(indices) >= 2: filter_indices(indices, by_front) assert len(indices) > 1 return indices #Получить порог компаратора для поиска импульсов таходатчика def get_threshold_value(values, threshold, lo_idx, search_len): if threshold <= 0: threshold = 0.5 max_value = max(values[lo_idx:lo_idx+search_len]) return max_value * threshold if threshold < 1 else threshold # Фильтрация ложных фронтов / спадов def filter_indices(period_idxs, by_front): max_period = get_max_period(period_idxs) max_pulse_len = int(max_period * 0.2) indices_to_remove = [] for i in range(1, len(period_idxs)): if period_idxs[i - 1] - period_idxs[i] < max_pulse_len: if by_front: indices_to_remove.append(i - 1) else: indices_to_remove.append(i) # Удаляем элементы из списка в обратном порядке, чтобы не нарушить индексы for idx in reversed(indices_to_remove): period_idxs.pop(idx) return max_period #Максимальный период между тахоимпульсами (кол-во отсчетов) def get_max_period(period_idxs): max_prd = 0 for i in range(1, len(period_idxs)): prd_len = period_idxs[i - 1] - period_idxs[i] if prd_len > max_prd: max_prd = prd_len return max_prd #Индексы с таходатчика def on_inds_tax_signal(event, cursor, signal_num): #Проверить выбрана ли в таблице с сигналами строка selected_items = signal_tv.selection() if not selected_items and signal_num == None: return #Загрузка параметров params = load_settings() if signal_num == None: #загрузка массива одного выбранного сигнала signal, size_signal, ptno_signal, upperfreq_signal = get_signal(cursor) else: signal, size_signal, ptno_signal, upperfreq_signal = arrays[f'arraysign{signal_num}'], sizes_signals[signal_num], ptnos_signals[signal_num], upperfreqs_signals[signal_num] #Выбор сигнала с таходатчика query = f"SELECT packeddata, upperfreq FROM {TABLEACS} WHERE mdate = '{selected_date}' and direction = 16 and type = 2" cursor.execute(query) acs_data = cursor.fetchone() if (acs_data == None and signal_num == None) or (NUM_SENSORS-1 == signal_num and acs_data == None): messagebox.showwarning("Предупреждение", "Нет сигнала с таходатчика") #return #массив с амплитудами (сигнал с таходатчика) tax_sig = get_floats(acs_data[0]) if acs_data != None else 0 #Если нет таходатчика, задаём отображаемые обороты = 1 #num_revs_var.set(1 if acs_data == None else num_revs_ent.get()) #Индексы таходатчика indices = [None] * len(signal) # ============================================================================= # j = 0 # for i in range(1, len(tax_sig)-1): # if tax_sig[i] <= 0 and tax_sig[i+1] >= 0 and tax_sig[i-1] >= 0: # indices[j] = i # j = j+1 # ============================================================================= #Последний и первый индекс сигнала #first_last_signal = [len(signal), 0] #Частота дискретизации sample_rate = upperfreq_signal * 2.5 #Фильтруем сигнал, если фильтрация включена if params["on_off_filtering"].get() == StateFilter.on.value: try: #Фильтровать сигнал signal = get_filter_signal(sample_rate, signal) except Exception as e: messagebox.showwarning("Предупреждение", f"Ошибка фильтрации сигнала: {e}") return #Во сколько раз меньше записанный сигнал smaller_rec_signal = sample_rate/size_signal #Количество оборотов в записанном сигнале num_revs_rec_signal = float(params["rotation_speed"].get())/smaller_rec_signal #Количество отсчётов в одном обороте counts_in_rev = size_signal/num_revs_rec_signal #Индексы без таходатчика indices_no_tax = [] counts_sum = 0 for i in range(int(num_revs_rec_signal)+1): indices_no_tax.append(int(counts_sum)) counts_sum += counts_in_rev #изменение порядка элементов на обратный indices_no_tax = list(reversed(indices_no_tax)) #Индексы таходатчика в обратном порядке threshold = max(tax_sig) * 0.5 if acs_data != None else 0 #Если нет таходатчика, то берётся весь сигнал indices = find_pulse_indices(tax_sig, threshold, True) if acs_data != None else indices_no_tax # ============================================================================= # if len(indices) >= 2: # indices_ = list(indices) # #Фильтрация ложных фронтов / спадов # max_period = filter_indices(indices_, True) # # #проверить есть ли значение в массиве # if max_period in indices: # #удалить значения из массива, которые находятся до max_period # index = indices.index(max_period) # indices = indices[index:] # ============================================================================= #изменение порядка элементов на обратный indices = list(reversed(indices)) #удалить все значения None из массива indices while None in indices: indices.remove(None) #Количестов оборотов num_revs = len(indices) - 1 #num_revs_var.set(num_revs) if num_revs <= 0: messagebox.showwarning("Предупреждение", "Нет оборотов") return # Создание нового списка с первым и последним значением chunk_indices = [indices[0], indices[1]] #максимальное расхождение между текущим и предыдущим значением #частоты вращения abnormal_deviation = 3.0 # Гц fo_calc = FoCalculator(abnormal_deviation) # частота вращения для сигнала fo = fo_calc.get_fo(chunk_indices, sample_rate) #Заданное количество оборотов select_num_revs = int(num_revs_ent.get()) if num_revs < select_num_revs: messagebox.showwarning("Предупреждение", "Уменьшите количество оборотов") return #количество пиков на каждом обороте num_peaks_revs = [None] * len(signal) #максимальное значение пика в каждом обороте max_peak_values = [None] * len(signal) #СКО #sok = np.std(combined_signal_arr, ddof=0) sok = np.std(signal, ddof=0) # Вычисляем пороговое значение для поиска пиков #threshold = abs(float(var_sko_ent.get()) * sok) limit_zone_b = 3 if params["zone_b"].get() == str() else int(params["zone_b"].get()) limit_zone_c = 5 if params["zone_c"].get() == str() else int(params["zone_c"].get()) limit_zone_d = 20 if params["zone_d"].get() == str() else int(params["zone_d"].get()) limit_zone_e = 40 if params["zone_e"].get() == str() else int(params["zone_e"].get()) limit_zone_x = limit_zone_e + 60 #Получить значения коэффициентов для компонентов подшипника coef_outer_clip, coef_inner_clip, coef_rolls, coef_separator = get_values_coeffs_bearing() if get_values_coeffs_bearing() != None else (None, None, None, None) #Коэффициент наружного кольца coef_outer_clip = 2.57 if coef_outer_clip == None else float(coef_outer_clip) #Коэффициент внутреннего кольца coef_inner_clip = 4.42 if coef_inner_clip == None else float(coef_inner_clip) #Коэффициент тел качения coef_rolls = 3.51 if coef_rolls == None else float(coef_rolls) #Коэффициент сепаратора coef_separator = 3 if coef_separator == None else float(coef_separator) #Превышение куртосиса в n раз kurtosis_for_n_times = 2 if params["limit_kurtosis"].get() == str() else float(params["limit_kurtosis"].get()) #Превышение дисперсии в n раз variance_for_n_times = 2 if params["limit_variance"].get() == str() else float(params["limit_variance"].get()) #Лимит пиков за один оборот в зоне C limit_peaks_rev_in_c_zone = 5 if params["limit_peaks_rev_in_c_zone"].get() == str() else float(params["limit_peaks_rev_in_c_zone"].get()) #Начальное сотстояние подшипника bearing_condition = "Неизвестно" if params["bearing_condition"].get() == str() else params["bearing_condition"].get() #Если начальное состояние подшипника неизвестно, то коэффициенты зон могут быть уменьшены на 1 if params["bearing_condition"].get() == InitialConditionsBearing.unknown.value: limit_zone_b = limit_zone_b - 1 limit_zone_c = limit_zone_c - 1 limit_zone_d = limit_zone_d - 1 limit_zone_e = limit_zone_e - 1 limit_zone_x = limit_zone_x - 1 #Значения зон zone_values = list(range(1, limit_zone_x)) # Вычисляем пороговые значения для поиска пиков threshold_values = [x * sok for x in zone_values] #зоны zones = [None] * select_num_revs #Количество пиков для каждой зоны num_peaks_zone = [0] * 6 #значения зоны b values_zone_b = [] #Количество пиков для каждой зоны (по всем трём сигналам) num_peaks_zone_three_signals = [0] * 6 #Кусок сигнала в n оборотов revs = signal[indices[0]:indices[0+select_num_revs]] #Индексы максимальных пиков в колоколах indexes_peaks_bells = get_indexes_peaks_bells(revs) #массив с оборотами revs_united = [] #Сумма массивов с оборотами sum_rev_size = 0 #Количество максимальных пиков для каждого оборота сигнала for i in range(select_num_revs): #Кусок сигнала в один оборот rev = signal[indices[i]:indices[i+1]] #Перевод в абсолютные значения rev = np.abs(rev) max_peak_values.append(max(rev)) # ============================================================================= # #Определение зоны на которой находится максимальный пик # for j in range(len(threshold_values)-1): # #максимальные значения, превышающие заданное пороговое значение # max_exceeding = [r for r in rev if r > threshold_values[j]] # if len(max_exceeding) == 0: # zones[i] = j # break # ============================================================================= #максимальные значения, превышающие заданное пороговое значение #max_exceeding = [r for r in rev if r > threshold] #Добавить в массив с оборотами оборот revs_united.extend(rev) #Количество пиков для каждой зоны (по оборотам в количестве 2 штук) for k in range(sum_rev_size, len(revs_united)): #Для зоны A for j in range(math.ceil(sok)): if math.ceil(sok) < threshold_values[j]: break if revs_united[k] > threshold_values[j] and revs_united[k] <= math.ceil(sok): num_peaks_zone[PeakZones.zone_a.value] += 1 break # Для зоны B for j in range(limit_zone_b): if revs_united[k] > threshold_values[j] and revs_united[k] <= (limit_zone_b * sok): num_peaks_zone[PeakZones.zone_b.value] += 1 #значения зоны b values_zone_b.append(revs_united[k]) break #Для зоны C for j in range(limit_zone_b, limit_zone_c): if revs_united[k] > threshold_values[j] and revs_united[k] <= (limit_zone_c * sok): num_peaks_zone[PeakZones.zone_c.value] += 1 break if k in indexes_peaks_bells: #print(revs_united[k]) #Для зоны D for j in range(limit_zone_c, limit_zone_d): if revs_united[k] > threshold_values[j] and revs_united[k] <= (limit_zone_d * sok): num_peaks_zone[PeakZones.zone_d.value] += 1 break #Для зоны E for j in range(limit_zone_d, limit_zone_e): if revs_united[k] > threshold_values[j] and revs_united[k] <= (limit_zone_e * sok): num_peaks_zone[PeakZones.zone_e.value] += 1 break #Для зоны X for j in range(limit_zone_e, limit_zone_x-1): if revs_united[k] > threshold_values[j] and revs_united[k] <= ((limit_zone_x-1) * sok): num_peaks_zone[PeakZones.zone_x.value] += 1 break sum_rev_size += len(rev) #Количество пиков для каждой зоны (по всем трём сигналам) # ============================================================================= # for k in range(len(combined_signal_arr)): # #Для зоны A # num_peaks_zone_three_signals[PeakZones.zone_a.value] += sum(1 for j in range(math.ceil(sok)) if combined_signal_arr[k] > threshold_values[j] and combined_signal_arr[k] <= math.ceil(sok)) # # #Для зоны B # num_peaks_zone_three_signals[PeakZones.zone_b.value] += sum(1 for j in range(limit_zone_b) if combined_signal_arr[k] > threshold_values[j] and combined_signal_arr[k] <= (limit_zone_b * sok)) # # #Для зоны C # num_peaks_zone_three_signals[PeakZones.zone_c.value] += sum(1 for j in range(limit_zone_b, limit_zone_c) if combined_signal_arr[k] > threshold_values[j] and combined_signal_arr[k] <= (limit_zone_c * sok)) # # #Для зоны D # num_peaks_zone_three_signals[PeakZones.zone_d.value] += sum(1 for j in range(limit_zone_c, limit_zone_d) if combined_signal_arr[k] > threshold_values[j] and combined_signal_arr[k] <= (limit_zone_d * sok)) # # #Для зоны E # num_peaks_zone_three_signals[PeakZones.zone_e.value] += sum(1 for j in range(limit_zone_d, limit_zone_e) if combined_signal_arr[k] > threshold_values[j] and combined_signal_arr[k] <= (limit_zone_e * sok)) # # #Для зоны X # num_peaks_zone_three_signals[PeakZones.zone_x.value] += sum(1 for j in range(limit_zone_e, limit_zone_x-1) if combined_signal_arr[k] > threshold_values[j] and combined_signal_arr[k] <= ((limit_zone_x-1) * sok)) # ============================================================================= #количество пиков в каждом обороте #num_peaks_revs[i] = len(max_exceeding) #удалить все значения None из массива num_peaks_revs while None in num_peaks_revs: num_peaks_revs.remove(None) #удалить все значения None из массива max_peak_values while None in max_peak_values: max_peak_values.remove(None) # ============================================================================= # arrays_signal_revs = [] # for i in range(select_num_revs): # #Кусок сигнала в один оборот # rev_arr = signal[indices[i]:indices[i+1]] # arrays_signal_revs.append(rev_arr) # # #Объединить массивы с оборотами в один # combined_signal_revs = np.concatenate(arrays_signal_revs) # ============================================================================= #Кусок сигнала с таходитчика в n оборотов #tax_sig_rev = tax_sig[indices[0]:indices[0+select_num_revs]] #размер массива с оборотами size_revs = len(revs) #Размер сигнала #size_signal = len(signal) #Во сколько раз меньше размер куска сигнала от основного сигнала #how_many_times = math.ceil(size_signal/size_revs) #Верхняя частота сигнала #upperfreq_signal = acs_data[1] #Шаг step = (1 / (upperfreq_signal * 2.5)) * 1000 #Шаг от точки до точки #step = ((time_signal * 1000)/how_many_times)/size_revs # Создаем массив x-координат x_time = [i * step for i in range(size_revs)] #Построение графика сигнала #update_chart_signal(np.abs(revs), x_time) #Показать виджеты вывода результатов show_widgets_output_results() state_unit_txt.delete('1.0', 'end') # Очищаем предыдущий вывод state_unit_txt.insert('end', f"Частота: {upperfreq_signal} Гц\n") state_unit_txt.insert('end', f"Размер сигнала: {size_signal}\n") state_unit_txt.insert('end', f"Отображено оборотов: {select_num_revs}\n") state_unit_txt.insert('end', f"Всего оборотов: {num_revs}\n") #частота вращения state_unit_txt.insert('end', f"Частота вращения: {fo:.3f} Гц\n\n") # ============================================================================= # #Зона A (от 0 до значения СКО) # zone_a = 0 # #Зона B (от границы зоны А до 3*СКО называется зоной В) # zone_b = 0 # #Зона C (от 3*СКО до 5*СКО) # zone_c = 0 # ##Зона D (от границы зоны С до СКО*20) # zone_d = 0 # #Зона E (от границей зоны Д до СКО*40) # zone_e = 0 # #Зона X (При превышении пика зоны Е) # zone_x = 0 # # for i in range(len(zones)): # if zones[i] <= sok and zones[i] >= 0: # zone_a = zone_a+1 # # if zones[i] <= 3 * sok and zones[i] >= sok: # zone_b = zone_b+1 # # if zones[i] <= 5 * sok and zones[i] >= 3 * sok: # zone_c = zone_c+1 # # if zones[i] <= 20 * sok and zones[i] >= 5 * sok: # zone_d = zone_d+1 # # if zones[i] <= 40 * sok and zones[i] >= 20 * sok: # zone_e = zone_e+1 # # if zones[i] <= 100 * sok and zones[i] >= 40 * sok: # zone_x = zone_x+1 # # # if zone_a == select_num_revs: # state_unit_txt.insert('end', f"Нормальное состояние подшипника (Зона A)\n") # # if zone_b == select_num_revs: # state_unit_txt.insert('end', f"Нормальное состояние подшипника (Зона B)\n") # # if zone_c == select_num_revs: # state_unit_txt.insert('end', f"Ухудшение смазки (Зона С)\n") # # if zone_d == select_num_revs: # state_unit_txt.insert('end', f"Износ наружного кольца подшипника 4. Средний дефект: Среднее разрушение в подшипнике (Зона D)\n") # # if zone_e == select_num_revs: # state_unit_txt.insert('end', f"Сильный дефект: Сильное разрушение в подшипнике (Зона E)\n") # # if zone_x == select_num_revs: # state_unit_txt.insert('end', f"Сильный дефект: Сильное разрушение в подшипнике (Превышение зоны E)\n") # ============================================================================= final_kurtosis = 0 final_variance = 0 original_kurtosis = 0 original_variance = 0 initial_data_df = statistical_data_excel_df[(statistical_data_excel_df['Состояние'] == StatisticalStates.original.value) & (statistical_data_excel_df['Агрегат'] == f'{unit_id}')] if len(initial_data_df) !=0: # Вернуть выбранное направление selected_direction = signal_tv.item(selected_items)['values'][0] # Вернуть выбранную опору selected_ptno = signal_tv.item(selected_items)['values'][1] #Сокращённое название направления измерения direction_cut_labels = {'вертикальное': 'В', 'горизонтальное': 'Г', 'осевое': 'О'} # Преобразование, например, из вертикальное 4 в В(4) direction_short = direction_cut_labels[selected_direction] if selected_direction in direction_cut_labels else None #направление и опора, например, В(4) direction_and_ptno = f"{direction_short}({selected_ptno})" #Исходные данные из таблицы Excel для выбранного сигнала sel_orig_data_signal_df = initial_data_df[(initial_data_df['Направление'] == direction_and_ptno)] if len(sel_orig_data_signal_df) == 0: messagebox.showerror('Ошибка', f'Ошибка при расчёте. Скорее всего количество исходных данных не совпадает с конечными данными (количество датчиков не совпадает)') # Извлечение значения исходного куртосиса original_kurtosis = sel_orig_data_signal_df.iloc[0, sel_orig_data_signal_df.columns.get_loc('Куртосис')] # Извлечение значения исходной дисперсии original_variance = sel_orig_data_signal_df.iloc[0, sel_orig_data_signal_df.columns.get_loc('Момент 2')] #Конечные данные endpoints_data_df = statistical_data_excel_df[(statistical_data_excel_df['Состояние'] == StatisticalStates.final.value) & (statistical_data_excel_df['Агрегат'] == f'{unit_id}') & (statistical_data_excel_df['Дата и время'] == f'{selected_date}')] if endpoints_data_df.empty: messagebox.showwarning("Информация", "Вы выбрали сигнал с исходным состоянием статиcтических данных. Выберите последний записанный сигнал, чтобы поставить диагноз!") #return #Конечные данные из таблицы Excel для выбранного сигнала sel_end_data_signal_df = endpoints_data_df[(endpoints_data_df['Направление'] == direction_and_ptno)] # Извлечение значения конечного куртосиса final_kurtosis = sel_end_data_signal_df.iloc[0, sel_end_data_signal_df.columns.get_loc('Куртосис')] # Извлечение значения конечной дисперсии final_variance = sel_end_data_signal_df.iloc[0, sel_end_data_signal_df.columns.get_loc('Момент 2')] #Определение дефекта #Нормальное количество пиков для наружного кольца norm_num_peaks_outer = coef_outer_clip * select_num_revs #Нормальное количество пиков для внутреннего кольца norm_num_peaks_inner = coef_inner_clip * select_num_revs #Нормальное количество пиков для тел качения norm_num_peaks_rolls = coef_rolls * select_num_revs #Нормальное количество пиков для сепаратора norm_num_peaks_separator = coef_separator * select_num_revs #Нормальное количество пиков частей подшипника norm_num_peaks_bearing_parts = [norm_num_peaks_outer, norm_num_peaks_inner, norm_num_peaks_rolls, norm_num_peaks_separator] #Определить индекс по значению, чтобы определить какой дефект ближе ind_defect = norm_num_peaks_bearing_parts.index(max(norm_num_peaks_bearing_parts)) #название дефекта name_defect = "" #название дефекта для таблицы name_defect_table = "" #Надпись "Есть дефект" there_defect = "Есть дефект" #Если пики есть в зоне D if num_peaks_zone[PeakZones.zone_d.value] != 0: if ind_defect == BearingParts.outer.value and num_peaks_zone[PeakZones.zone_d.value] > norm_num_peaks_outer: #Если включено местоположение дефекта в настройках if params["on_off_location_defect"].get() == StateLocationDefect.on.value: name_defect = f"Износ наружного кольца подшипника {ptno_signal}" name_defect_table = "Наружного кольца" else: name_defect = there_defect name_defect_table = there_defect elif ind_defect == BearingParts.inner.value and num_peaks_zone[PeakZones.zone_d.value] > norm_num_peaks_inner: #Если включено местоположение дефекта в настройках if params["on_off_location_defect"].get() == StateLocationDefect.on.value: name_defect = f"Износ внутреннего кольца подшипника {ptno_signal}" name_defect_table = "Внутреннего кольца" else: name_defect = there_defect name_defect_table = there_defect elif ind_defect == BearingParts.rolls.value and num_peaks_zone[PeakZones.zone_d.value] > norm_num_peaks_rolls: #Если включено местоположение дефекта в настройках if params["on_off_location_defect"].get() == StateLocationDefect.on.value: name_defect = f"Износ тел качения подшипника {ptno_signal}" name_defect_table = "Тел качения" else: name_defect = there_defect name_defect_table = there_defect elif ind_defect == BearingParts.outer.value and num_peaks_zone[PeakZones.zone_d.value] > norm_num_peaks_rolls: #Если включено местоположение дефекта в настройках if params["on_off_location_defect"].get() == StateLocationDefect.on.value: name_defect = f"Износ сепаратора подшипника {ptno_signal}" name_defect_table = "Сепаратора" else: name_defect = there_defect name_defect_table = there_defect else: name_defect = there_defect name_defect_table = there_defect #Если пики есть в зоне E if num_peaks_zone[PeakZones.zone_e.value] != 0 and num_peaks_zone[PeakZones.zone_d.value] == 0: if ind_defect == BearingParts.outer.value and num_peaks_zone[PeakZones.zone_e.value] > norm_num_peaks_outer: #Если включено местоположение дефекта в настройках if params["on_off_location_defect"].get() == StateLocationDefect.on.value: name_defect = f"Износ наружного кольца подшипника {ptno_signal}" name_defect_table = "Наружного кольца" else: name_defect = there_defect name_defect_table = there_defect elif ind_defect == BearingParts.inner.value and num_peaks_zone[PeakZones.zone_e.value] > norm_num_peaks_inner: #Если включено местоположение дефекта в настройках if params["on_off_location_defect"].get() == StateLocationDefect.on.value: name_defect = f"Износ внутреннего кольца подшипника {ptno_signal}" name_defect_table = "Внутреннего кольца" else: name_defect = there_defect name_defect_table = there_defect elif ind_defect == BearingParts.rolls.value and num_peaks_zone[PeakZones.zone_e.value] > norm_num_peaks_rolls: #Если включено местоположение дефекта в настройках if params["on_off_location_defect"].get() == StateLocationDefect.on.value: name_defect = f"Износ тел качения подшипника {ptno_signal}" name_defect_table = "Тел качения" else: name_defect = there_defect name_defect_table = there_defect elif ind_defect == BearingParts.outer.value and num_peaks_zone[PeakZones.zone_e.value] > norm_num_peaks_rolls: #Если включено местоположение дефекта в настройках if params["on_off_location_defect"].get() == StateLocationDefect.on.value: name_defect = f"Износ сепаратора подшипника {ptno_signal}" name_defect_table = "Сепаратора" else: name_defect = there_defect name_defect_table = there_defect else: name_defect = there_defect name_defect_table = there_defect #Определение степени дефекта #крайняя зона extreme_zone = 0 #крайние зоны для определения двух дефектов (Зоны С, D, E) extreme_zones = [] #определение последней зоны, в которой есть пики for i in range(len(num_peaks_zone)): # ============================================================================= # #1 вариант # if num_peaks_zone[i] == 0: # extreme_zone = i-1 # break # else: # extreme_zone = PeakZones.zone_e.value # ============================================================================= #2 вариант if num_peaks_zone[i] > 0: extreme_zone = i extreme_zones.append(i) #Предельные значения куртосиса limit_kurtosis = original_kurtosis * kurtosis_for_n_times #Предельные значения дисперсии limit_variance = original_variance * variance_for_n_times #Значения пиков попавших в зону С за n количество оборотов peaks_revs_in_с_zone = limit_peaks_rev_in_c_zone * select_num_revs #Количество пиков за один оборот в зоне C peaks_rev_in_с_zone = num_peaks_zone[PeakZones.zone_c.value]/select_num_revs #Загрузка параметров params = load_settings() #Начальное сотстояние подшипника bearing_condition = "Неизвестно" if params["bearing_condition"].get() == str() else params["bearing_condition"].get() #Степень дефекта degree_defect = "" state_unit_txt.insert('end', f"Диагноз: \n") if extreme_zone == PeakZones.zone_a.value: state_unit_txt.insert('end', f"Нормальное состояние подшипника (Зона A)\n") if extreme_zone == PeakZones.zone_b.value: state_unit_txt.insert('end', f"Нормальное состояние подшипника (Зона B)\n") if (PeakZones.zone_c.value in extreme_zones and final_variance > limit_variance and peaks_rev_in_с_zone >= peaks_revs_in_с_zone) or (PeakZones.zone_c.value in extreme_zones and bearing_condition != InitialConditionsBearing.known.value): state_unit_txt.insert('end', f"Ухудшение смазки (Зона С)\n") if final_kurtosis > limit_kurtosis or bearing_condition != InitialConditionsBearing.known.value: if PeakZones.zone_d.value in extreme_zones and not PeakZones.zone_e.value in extreme_zones and not PeakZones.zone_x.value in extreme_zones: state_unit_txt.insert('end', f"{name_defect}. Слабый дефект. Слабое разрушение в подшипнике (Зона D)\n") degree_defect = 'Слабый' if PeakZones.zone_e.value in extreme_zones and not PeakZones.zone_x.value in extreme_zones: state_unit_txt.insert('end', f"{name_defect}. Средний дефект. Среднее разрушение в подшипнике (Зона E)\n") degree_defect = 'Средний' if PeakZones.zone_x.value in extreme_zones: state_unit_txt.insert('end', f"{name_defect}. Сильный дефект. Сильное разрушение в подшипнике (Превышение зоны E)\n") degree_defect = 'Сильный' messagebox.showwarning("Предупреждение","Превышение зоны E") elif extreme_zone != PeakZones.zone_a.value and extreme_zone != PeakZones.zone_b.value and num_peaks_zone[PeakZones.zone_d.value] == 0 and num_peaks_zone[PeakZones.zone_e.value] == 0 and num_peaks_zone[PeakZones.zone_x.value] == 0: state_unit_txt.insert('end', f"Нормальное состояние подшипника\n") # ============================================================================= # else: # state_unit_txt.insert('end', f"Наличие дефекта\n") # ============================================================================= state_unit_txt.insert('end', f"\nКоличество пиков для каждой зоны:\n") state_unit_txt.insert('end', f"Зона A: {num_peaks_zone[PeakZones.zone_a.value]}\n") state_unit_txt.insert('end', f"Зона B: {num_peaks_zone[PeakZones.zone_b.value]}\n") state_unit_txt.insert('end', f"Зона C: {num_peaks_zone[PeakZones.zone_c.value]}\n") state_unit_txt.insert('end', f"Зона D: {num_peaks_zone[PeakZones.zone_d.value]}\n") state_unit_txt.insert('end', f"Зона E: {num_peaks_zone[PeakZones.zone_e.value]}\n") #название оси Y name_axis_Y = "СКО" if int(value_thinning_var.get()) > 0 else get_units_measurement() #Получить единицы измерения выбранного сигнала #Отображение зон на графике (True - отображение) displaying_zones = True if params["on_off_zone"].get() == ZoneDisplayModes.on.value else False #Если значение в поле для ввода прореживания не равно 0 if int(value_thinning_var.get()) > 0: #Работа с прореживанием сигнала #максимальное значение в зоне b value_max_zone_b = max(values_zone_b) #значение прореживания thinning_value = int(value_thinning_var.get()) #прореженный сигнал за выбранное количество оборотов thinned_signal = [] #Значение границы зоны B/СКО value_max_zone_b_sko = value_max_zone_b/sok #Прореживание for i in range(0, size_revs, thinning_value): max_value = max(signal[i:i+thinning_value]) #максимальное значение/СКО max_value_sko = max_value/sok #Если в выборке амплитуда больше, чем граница зоны В, то отображается значение самой большой амплитуды if max_value_sko > value_max_zone_b_sko: thinned_signal.append(max_value_sko) #Если амплитуда не превышает зону В, то значение принимает верхнюю границу зоны В else: thinned_signal.append(value_max_zone_b_sko) #Если амплитуда не превышает зону В, то знечение принимает значение 0 #thinned_signal.append(0) #без деления на СКО # ============================================================================= # #Прореживание # for i in range(0, size_revs, thinning_value): # max_value = max(signal[i:i+thinning_value]) # #Если в выборке амплитуда больше, чем граница зоны В, то отображается значение самой большой амплитуды # if max_value > value_max_zone_b: # thinned_signal.append(max_value) # #Если амплитуда не превышает зону В, то значение принимает верхнюю границу зоны В # else: # thinned_signal.append(value_max_zone_b) # #Если амплитуда не превышает зону В, то знечение принимает значение 0 # #thinned_signal.append(0) # ============================================================================= # Создаем массив x-координат для прореженного сигнала x_time_thinned = [i * (step*thinning_value) for i in range(len(thinned_signal))] #Границы зон (без *СКО) border_zones = limit_zone_b, limit_zone_c, limit_zone_d, limit_zone_e if signal_num == None: #Построение графика сигнала с прореживанием update_chart_signal(thinned_signal, x_time_thinned, name_axis_Y, border_zones, displaying_zones) else: #Границы зон border_zone_b = limit_zone_b * sok border_zone_с = limit_zone_c * sok border_zone_d = limit_zone_d * sok border_zone_e = limit_zone_e * sok border_zones = (border_zone_b, border_zone_с, border_zone_d, border_zone_e) if signal_num == None: #Построение графика сигнала без прореживания update_chart_signal(np.abs(revs), x_time, name_axis_Y, border_zones, displaying_zones) if signal_num != None: # Очищаем предыдущий вывод state_unit_txt.delete('1.0', 'end') if degree_defect == '': degree_defect = "Нет" if name_defect_table == '': name_defect_table = "Нет" #Возвращаем степень дефекта, название дефекта, частоту вращения return (degree_defect, name_defect_table, round(fo,2), upperfreq_signal) # ============================================================================= # print(num_peaks_revs) # print(max_peak_values) # ============================================================================= #Индексы максимальных пиков в колоколах def get_indexes_peaks_bells(signal): #находим СКО для #sko = np.std(signal, ddof=0) #Перевод в абсолютные значения #signal = np.abs(signal) #Индексы таходатчика в обратном порядке threshold = max(signal) * 0.5 #Если нет таходатчика, то берётся весь сигнал indices = find_pulse_indices(signal, threshold, True) #изменение порядка элементов на обратный indices = list(reversed(indices)) #зоны колоколов bell_zones = [] #максимальные пики зон колоколов max_peaks_bell_zones = [] for i in range(len(indices)): if i+1 < len(indices): #зона колокола bell_zones = signal[indices[i]:indices[i+1]] max_peaks_bell_zones.append(max(bell_zones)) else: bell_zones = signal[indices[i]:len(signal)] max_peaks_bell_zones.append(max(bell_zones)) break #bell_zones = signal[indices[0]:indices[1]] #max_value = max(bell_zones) #Индексы пиков зон колоколов по значению indexes_peaks_bell_zones = [i for i, x in enumerate(signal) if x in max_peaks_bell_zones] return indexes_peaks_bell_zones # ============================================================================= # # Вычисляем пороговое значение для поиска первого пика # n = abs(3 * sko) # # Проверяем, что сигнал задан # if signal is not None: # # #Перый индекс (начало колокола) # first_index = 0 # #Второй индекс (конец колокола) # second_index = 0 # # #зоны колоколов # bell_zones = [] # # # Проходим по сигналу и ищем элемент, который больше или равен пороговому значению # while second_index < len(signal): # # Находим начало колокола # for i in range(second_index, len(signal)): # if abs(signal[i]) >= n: # first_index = i # bell_zones.append(first_index) # break # else: # # если не найдено начало колокола, выходим из цикла # break # # # Находим конец колокола # for i in range(first_index, len(signal)): # if sum(abs(signal[j]) < n for j in range(i+1, min(i+6, len(signal)))) >= 2: # second_index = i # bell_zones.append(second_index) # break # else: # # если не найден конец колокола, выходим из цикла # break # # # Обновляем second_index для следующего поиска # second_index += 1 # # # print(bell_zones) # ============================================================================= #return i # ============================================================================= # #Если стоит галочка на одно условие # if select_сheckbutton() == True: # # Если условие выполнено, возвращаем индекс первого пика # return i # ============================================================================= # ============================================================================= # # Проверяем, что в следующих 5 элементах есть хотя бы два элемента, большие или равны 4 * sok # if sum(abs(signal[j]) >= 3*sok for j in range(i+1, min(i+6, len(signal)))) >= 2: # # Если условие выполнено, возвращаем индекс первого пика # return i # ============================================================================= #загрузка массива одного выбранного сигнала def get_signal(cursor): if not signal_tv.selection(): return # ============================================================================= # # Вернуть выбранное направление измерения # selected_direction = signal_tv.item(signal_tv.selection())['values'][0] # # Вернуть выбранную опору # selected_ptno = signal_tv.item(signal_tv.selection())['values'][1] # # # Определить словарь для сопоставления числовых значений с их метками # direction_labels = {'вертикальное': 1, 'горизонтальное': 2, 'осевое': 4} # #Числовое значение для метки # selected_signal_value = direction_labels.get(selected_direction) # ============================================================================= # Вернуть выбранный acsid selected_acsid = signal_tv.item(signal_tv.selection())['values'][4] #query = f"SELECT packeddata, size, ptno, upperfreq FROM {TABLEACS} WHERE mdate = '{selected_date}' and direction = {selected_signal_value} and ptno = {selected_ptno} and type = 2" query = f"SELECT packeddata, size, ptno, upperfreq FROM {TABLEACS} WHERE mdate = '{selected_date}' and acsid = {selected_acsid} and type = 2" cursor.execute(query) acs_data = cursor.fetchone() #массив с амплитудами signal = get_floats(acs_data[0]) size_signal = acs_data[1] ptno_signal = acs_data[2] upperfreq_signal = acs_data[3] return (signal, size_signal, ptno_signal, upperfreq_signal) #загрузка массива сигнала (не используется) def on_load_signal_array(event, cursor): if not signal_tv.selection(): return # Вернуть выбранное направление измерения selected_direction = signal_tv.item(signal_tv.selection())['values'][0] # Вернуть выбранную опору selected_ptno = signal_tv.item(signal_tv.selection())['values'][1] # Определить словарь для сопоставления числовых значений с их метками direction_labels = {'вертикальное': 1, 'горизонтальное': 2, 'осевое': 4} #Числовое значение для метки selected_signal_value = direction_labels.get(selected_direction) query = f"SELECT packeddata, upperfreq FROM {TABLEACS} WHERE mdate = '{selected_date}' and direction = {selected_signal_value} and ptno = {selected_ptno} and type = 2" cursor.execute(query) acs_data = cursor.fetchone() #массив с амплитудами signal = get_floats(acs_data[0]) #Размер сигнала size_signal = len(signal) #Верхняя частота сигнала upperfreq_signal = acs_data[1] #Шаг от точки до точки step = (time_signal * 1000)/size_signal # Создаем массив x-координат x_time = [i * step for i in range(size_signal)] #значение прореживания thinning_value = 4 size_max_value = math.ceil(size_signal/thinning_value) max_value = [None] * size_max_value j = 0 #Прореживание for i in range(0, size_signal, 4): max_value[j] = max(signal[i:i+thinning_value]) j = j+1 #Построение графика сигнала update_chart_signal(np.abs(signal), x_time) #Построение графика сигнала def update_chart_signal(signal, x_time, name_axis_Y, border_zones, displaying_zones): global ix, iy #Получить выбранную строку item = signal_tv.selection()[0] signal_name = signal_tv.item(item, 'text') if signal_name in plotted_lines: return # Если линия уже построена, ничего не делаем #Получить значения из столбцов выбранной строки values = signal_tv.item(item, 'values') #направление direction = values[0] #Опора ptno = values[1] if direction == 'вертикальное': color = 'r' elif direction == 'горизонтальное': color = 'g' else: color = 'b' #Границы зон border_zone_b, border_zone_c, border_zone_d, border_zone_e = border_zones #Отображение зон на графике (True - отображение) #and len(plotted_lines) == 0 if displaying_zones == True: #Границы зон #border_zone_b, border_zone_c, border_zone_d, border_zone_e = border_zones #Линии зон ax.axhline(y=border_zone_b, color=color, linestyle='--') # ось y ax.axhline(y=border_zone_c, color=color, linestyle='--') ax.axhline(y=border_zone_d, color=color, linestyle='--') ax.axhline(y=border_zone_e, color=color, linestyle='--') #Подписи зон ax.text(max(x_time), border_zone_b, 'Зона B', color=color, fontsize=15, ha='left', va='top') ax.text(max(x_time), border_zone_c, 'Зона C', color=color, fontsize=15, ha='left', va='top') ax.text(max(x_time), border_zone_d, 'Зона D', color=color, fontsize=15, ha='left', va='top') ax.text(max(x_time), border_zone_e, 'Зона E', color=color, fontsize=15, ha='left', va='top') # Установка границ для оси x ax.set_xlim(0, max(x_time)) #Включить сетку plt.grid(True) #Название осей plt.xlabel('Время, мс') plt.ylabel(name_axis_Y) # ============================================================================= # if ptno == 1: # linestyle = '-' # elif ptno == 2: # linestyle = '--' # elif ptno == 3: # linestyle = '-.' # else: # linestyle = '-.' # ============================================================================= #сплошная линия для построения linestyle = '-' line, = ax.plot(x_time, signal, linestyle=linestyle, color=color, label=f"{direction}({ptno})") plotted_lines[signal_name] = line ax.legend() chart_canvas.draw() # Добавляем всплывающие подсказки (координаты) с использованием библиотеки mplcursors #mpl.cursor(hover=True) mpl.cursor(hover=mpl.HoverMode.Transient) #события мыши для перемещения по графику fig.canvas.mpl_connect('button_press_event', on_click) fig.canvas.mpl_connect('button_release_event', on_release) # Добавляем обработчик события двойного нажатия левой кнопки мыши fig.canvas.mpl_connect('button_press_event', lambda event: on_scale_original_signal(signal, x_time, border_zone_e, event)) #Масштабируем график с сигналом в исходное состояние при двойном нажатии рядом с графиком def on_scale_original_signal(signal, x_time, border_zone_e, event): # Проверяем, что это двойное нажатие левой кнопки мыши if event.dblclick and event.button == 1: value_max_sig = max(signal) #отклонение deviation = 1 # Сбрасываем границы графика к исходному состоянию ax.set_xlim(0, max(x_time)) ax.set_ylim(0, value_max_sig+deviation if value_max_sig > border_zone_e else border_zone_e+deviation) # Перерисовываем график chart_canvas.draw() #Старое построение # ============================================================================= # # Построение графика seaborn # plt.figure(figsize=(8,6)) # # Очищаем предыдущий график # clear_chart(plt) # # #Масштабирование # with plt.ioff(): # figure, axis = plt.subplots() # disconnect_zoom = zoom_factory(axis) # # Включите прокрутку и панорамирование с помощью MPL # # Библиотека взаимодействий функционирует подобно panhandler. # pan_handler = panhandler(figure) # display(figure.canvas) # # #Построение графика # plt.plot(x_time, signal) # #Включить сетку # plt.grid(True) # # #Название осей # plt.xlabel('Время, мс') # plt.ylabel('Y') # # # Добавляем всплывающие подсказки (координаты) с использованием библиотеки mplcursors # mpl.cursor(hover=True) # #Область, в которой будет размещаться график # chart_canvas = FigureCanvasTkAgg(plt.gcf(), master=charts_frame) # canvas_widget = chart_canvas.get_tk_widget() # #Задать положение # canvas_widget.pack(fill=BOTH, expand = True) # ============================================================================= #увеличение начинается в той точке а которой находится курсор def zoom_factory(ax, base_scale=2.): def zoom_fun(event): # get the current x and y limits cur_xlim = ax.get_xlim() cur_ylim = ax.get_ylim() xdata = event.xdata # get event x location ydata = event.ydata # get event y location if event.button == 'up': # deal with zoom in scale_factor = 1 / base_scale elif event.button == 'down': # deal with zoom out scale_factor = base_scale else: # deal with something that should never happen scale_factor = 1 new_width = (cur_xlim[1] - cur_xlim[0]) * scale_factor new_height = (cur_ylim[1] - cur_ylim[0]) * scale_factor # Calculate new limits based on zoom location if xdata is not None and ydata is not None: ax.set_xlim([xdata - new_width / 2, xdata + new_width / 2]) ax.set_ylim([ydata - new_height / 2, ydata + new_height / 2]) ax.figure.canvas.draw() else: ax.set_xlim([cur_xlim[0] * scale_factor, cur_xlim[1] * scale_factor]) ax.set_ylim([cur_ylim[0] * scale_factor, cur_ylim[1] * scale_factor]) ax.figure.canvas.draw() fig = ax.get_figure() # get the figure of interest fig.canvas.mpl_connect('scroll_event', zoom_fun) # return the function return zoom_fun #добавить массив амплитуд def add_arrays_amplitudes(cursor, selected_date): global combined_signal_arr, name_points, arrays, arrays_amplitudes, sizes_signals, ptnos_signals, upperfreqs_signals #global arrays_amplitudes #направления измерения (4 - осевое(1), 1 - вертикальное(1), 2 - горизонтальное(1), 1 - вертикальное(2)) directions = [4, 1, 2] #название точек name_points = [] #Размеры сигналов sizes_signals = [] #Опоры сигналов ptnos_signals = [] #Верхние частоты сигналов upperfreqs_signals = [] #массив амплитуд arrays = {f'arraysign{i+1}': None for i in range(NUM_SENSORS)} #массив амплитуд из БД arrays_amplitudes = {f'arraysign{i+1}': None for i in range(NUM_SENSORS)} query = f"SELECT ptno FROM {TABLEACS} WHERE mdate = '{selected_date}' and type = 2 and direction != 16" cursor.execute(query) #Список кортежей ptno_tuple_list = cursor.fetchall() # Преобразование списка кортежей в список значений ptno_list = list(map(lambda x: x[0], ptno_tuple_list)) ptno_list.sort() #Список опор props_list = ptno_list if len(props_list) == 0: return #удаления дубликатов из списка props_list = list(set(props_list)) #массивы с сигналами arrays_to_concatenate = [] #счётчик для массива counter_arr = 0 #массивы амплитуд #arrays_amplitudes = [] #с первой опоры (3 сигнала) for k in range(len(props_list)): #Идентификатор добавлен для Falcon(Если на одной опоре несколько одинаковых направлений, то в таблице с расчётами отображалось только одно) for d in range(len(directions)): query = f"SELECT acsid FROM {TABLEACS} WHERE mdate = '{selected_date}' and direction = {directions[d]} and ptno = {props_list[k]} and type = 2" cursor.execute(query) acsid_tuple_list = cursor.fetchall() # Преобразование списка кортежей в список значений acsid_list = list(map(lambda x: x[0], acsid_tuple_list)) acsid_list.sort() #Список id acsid_list = acsid_list if len(acsid_list) == 0: break if acsid_list != None: for a in range(len(acsid_list)): #for i in range(len(directions)): #Загрузка сигналов query = f"SELECT packeddata, ptno, size, upperfreq FROM {TABLEACS} WHERE mdate = '{selected_date}' and direction = {directions[d]} and ptno = {props_list[k]} and acsid = {acsid_list[a]} and type = 2" cursor.execute(query) row = cursor.fetchone() if row != None: #массив с амплитудами float_values = get_floats(row[0]) ptno_signal = row[1] size_signal = row[2] upperfreq_signal = row[3] arrays_amplitudes[f'arraysign{counter_arr}'] = np.array(float_values) sizes_signals.append(size_signal) ptnos_signals.append(ptno_signal) upperfreqs_signals.append(upperfreq_signal) #Добавить массив сигнала arrays_to_concatenate.append(arrays_amplitudes[f'arraysign{counter_arr}']) if directions[d] == 4: name_points.append(f"О({row[1]})") if directions[d] == 1: name_points.append(f"В({row[1]})") if directions[d] == 2: name_points.append(f"Г({row[1]})") counter_arr += 1 # ============================================================================= # for i in range(len(directions)): # #Загрузка сигналов # query = f"SELECT packeddata FROM {TABLEACS} WHERE mdate = '{selected_date}' and direction = {directions[i]} and ptno in (1, 4) and type = 2" # cursor.execute(query) # row = cursor.fetchone() # #массив с амплитудами # float_values = get_floats(row[0]) # arrays_amplitudes[f'arraysign{i}'] = np.array(float_values) # #Добавить массив сигнала # arrays_to_concatenate.append(arrays_amplitudes[f'arraysign{i}']) # # # #со 2,3,4 опоры 1 сигнал (вертикальный) # #Загрузка сигналов # query = f"SELECT packeddata FROM {TABLEACS} WHERE mdate = '{selected_date}' and direction = 1 and ptno in (2,3,4) and type = 2" # cursor.execute(query) # row = cursor.fetchone() # if row != None: # #массив с амплитудами # float_values = get_floats(row[0]) # arrays_amplitudes[f'arraysign{3}'] = np.array(float_values) # #Добавить массив сигнала # arrays_to_concatenate.append(arrays_amplitudes[f'arraysign{3}']) # ============================================================================= #Объединить массивы с сигналами в один combined_signal_arr = np.concatenate(arrays_to_concatenate) #Приведение массива к абсолютным значениям combined_signal_arr = np.abs(combined_signal_arr) #Загрузка сигнала из БД load_signal_data_db() #Загрузка сигнала из БД def load_signal_data_db(): for i in range(NUM_SENSORS): arrays[f'arraysign{i}'] = arrays_amplitudes[f'arraysign{i}'] load_signals_entries[i].delete(0, tk.END) load_signals_entries[i].insert(tk.END, f"{i+1} датчик (БД)") #Распаковка массива def get_floats(bytes): return list(struct.unpack('f' * (len(bytes) // 4), bytes)) #Обновить график def update_chart(): global ix, iy # Очищаем предыдущий график ax.clear() #Построение графика plt.plot(x_coords,y_coords) plt.scatter(coords_defect.x[0], coords_defect.x[1], color='red') #Включить сетку plt.grid(True) #Названия точек # ============================================================================= # for i, name in enumerate(name_points): # plt.text(x_coords[i], y_coords[i], name, fontsize=12, ha='right') # ============================================================================= name_coords = [f'{i+1} датчик' for i in range(len(x_coords))] #Названия точек for i, name in enumerate(name_coords): plt.text(x_coords[i], y_coords[i], name, fontsize=12, ha='right') #Названиe точки X plt.text(coords_defect.x[0], coords_defect.x[1], 'Дефект', fontsize=12, ha='right') #Название осей plt.xlabel('X') plt.ylabel('Y') linestyle = '-' chart_canvas.draw() # Добавляем всплывающие подсказки (координаты) с использованием библиотеки mplcursors #mpl.cursor(hover=True) mpl.cursor(hover=mpl.HoverMode.Transient) #события мыши для перемещения по графику fig.canvas.mpl_connect('button_press_event', on_click) fig.canvas.mpl_connect('button_release_event', on_release) #Координаты датчиков и дефекта вместе x_defect_coords = x_coords x_defect_coords.append(coords_defect.x[0]) y_defect_coords = y_coords y_defect_coords.append(coords_defect.x[1]) # Добавляем обработчик события двойного нажатия левой кнопки мыши fig.canvas.mpl_connect('button_press_event', lambda event: on_scale_original_defect(x_defect_coords, y_defect_coords, event)) # Обработка событий мыши для перемещения по графику def on_click(event): global ix, iy ix, iy = event.xdata, event.ydata def on_release(event): global ix, iy new_ix, new_iy = event.xdata, event.ydata if ix is not None and iy is not None and new_ix is not None and new_iy is not None: dx = new_ix - ix dy = new_iy - iy ax.set_xlim(ax.get_xlim() - dx) ax.set_ylim(ax.get_ylim() - dy) plt.draw() #Масштабируем график местоположения дефекта в исходное состояние при двойном нажатии рядом с графиком def on_scale_original_defect(x_defect_coords, y_defect_coords, event): # Проверяем, что это двойное нажатие левой кнопки мыши if event.dblclick and event.button == 1: #Минимальное и максимальное значение координат x x_coords_mix = min(x_defect_coords) x_coords_max = max(x_defect_coords) #Минимальное и максимальное значение координат y y_coords_mix = min(y_defect_coords) y_coords_max = max(y_defect_coords) #отклонение deviation = 0.1 # Сбрасываем границы графика к исходному состоянию ax.set_xlim(x_coords_mix-deviation, x_coords_max+deviation) ax.set_ylim(y_coords_mix-deviation, y_coords_max+deviation) # Перерисовываем график chart_canvas.draw() #Старое построение # ============================================================================= # # Построение графика seaborn # plt.figure(figsize=(8,6)) # # Очищаем предыдущий график # clear_chart(plt) # # #Масштабирование # with plt.ioff(): # figure, axis = plt.subplots() # disconnect_zoom = zoom_factory(axis) # # Включите прокрутку и панорамирование с помощью MPL # # Библиотека взаимодействий функционирует подобно panhandler. # pan_handler = panhandler(figure) # display(figure.canvas) # # #Построение графика # plt.plot(x_coords,y_coords) # plt.scatter(coords_defect.x[0], coords_defect.x[1], color='red') # #Включить сетку # plt.grid(True) # # #Названия точек # for i, name in enumerate(name_points): # plt.text(x_coords[i], y_coords[i], name, fontsize=12, ha='right') # # #Названиe точки X # plt.text(coords_defect.x[0], coords_defect.x[1], 'X', fontsize=12, ha='right') # # #Название осей # plt.xlabel('X') # plt.ylabel('Y') # # # Добавляем всплывающие подсказки (координаты) с использованием библиотеки mplcursors # mpl.cursor(hover=True) # #Область, в которой будет размещаться график # chart_canvas = FigureCanvasTkAgg(plt.gcf(), master=charts_frame) # canvas_widget = chart_canvas.get_tk_widget() # #Задать положение # canvas_widget.pack(fill=BOTH, expand = True) # ============================================================================= #проверка на ввод def validate_input(char): if char.isdigit() or char == '.': return True else: # Запретить ввод, если символ не является цифрой return False #Очистить график def clear_chart(plt): # Очищаем предыдущий график for widget in charts_frame.winfo_children(): widget.destroy() plt.clf() #Настройки class SettingsWindow: def __init__(self, parent): self.parent = parent self.parent.title("Настройки") # Загрузка параметров из файла при создании окна self.params = load_settings() #Панель настроек myframe=Frame(parent,relief=SOLID, width=50, height=90, bd=1) #Задать положение контейнера myframe.grid(row=0, column=0, sticky="nw", padx=5, pady=5) self.settings_cvs=Canvas(myframe) settings_frame = Frame(self.settings_cvs) #скроллбар settings_sb_ver=Scrollbar(myframe,orient="vertical",command=self.settings_cvs.yview) self.settings_cvs.configure(yscrollcommand=settings_sb_ver.set) settings_sb_ver.pack(side="right",fill="y") self.settings_cvs.pack(side="left") self.settings_cvs.create_window((0,0),window=settings_frame,anchor='nw') #функция on_update_scroll_settings обновляет параметры прокрутки холста при изменении размеров рамки settings_frame.bind("", self.on_update_scroll) #Действия с настройками activity_frame = ttk.Frame(parent, borderwidth=1, padding=[1,1]) activity_frame.grid(row=2, column=0, sticky=SE, padx=5, pady=2) choosing_dbms_frame = ttk.Frame(settings_frame, borderwidth=1, relief=RIDGE, padding=[1,1]) choosing_dbms_frame.pack(fill=X) Label(choosing_dbms_frame, text="Выбор СУБД", font=("Arial", 10, "bold")).pack(anchor=N, pady=2) self.mssql_name = "Microsoft SQL" self.postgresql_name = "PostgreSQL" # по умолчанию будет выбран элемент с value=mssql_name self.dbms_var = StringVar(value=self.mssql_name if self.params["dbms"].get() == str() else self.params["dbms"].get()) self.mssql_rbtn = ttk.Radiobutton(settings_frame, text=self.mssql_name, value=self.mssql_name, variable=self.dbms_var) self.mssql_rbtn.pack(fill=X) self.postgresql_rbtn = ttk.Radiobutton(settings_frame, text=self.postgresql_name, value=self.postgresql_name, variable=self.dbms_var) self.postgresql_rbtn.pack(fill=X) # Привязка функции к Radiobutton #self.dbms_var.trace("w", lambda *args: self.on_select_dbms()) connection_mssql_frame = ttk.Frame(settings_frame, borderwidth=1, relief=RIDGE, padding=[1,1]) connection_mssql_frame.pack(fill=X) Label(connection_mssql_frame, text="Подключение к MS SQL", font=("Arial", 10, "bold")).pack(anchor=N, pady=2) # Создание элементов управления для параметров Label(settings_frame, text="Название сервера").pack(anchor=NW) self.mssql_server_name_var = StringVar(value='.\SQLEXPRESS' if self.params["mssql_server_name"].get() == str() else self.params["mssql_server_name"].get()) self.mssql_server_name_ent = ttk.Entry(settings_frame, textvariable=self.mssql_server_name_var, width=50) self.mssql_server_name_ent.pack(fill=X) # Создание элементов управления для параметров Label(settings_frame, text="Название базы данных").pack(anchor=NW) self.data_base_mssql_var = StringVar(value='Sadko' if self.params["data_base_mssql"].get() == str() else self.params["data_base_mssql"].get()) self.data_base_mssql_ent = ttk.Entry(settings_frame, textvariable=self.data_base_mssql_var) self.data_base_mssql_ent.pack(fill=X) # Создание элементов управления для параметров Label(settings_frame, text="Имя пользователя").pack(anchor=NW) self.user_mssql_var = StringVar(value='sadko' if self.params["user_mssql"].get() == str() else self.params["user_mssql"].get()) self.user_mssql_ent = ttk.Entry(settings_frame, textvariable=self.user_mssql_var) self.user_mssql_ent.pack(fill=X) # Создание элементов управления для параметров Label(settings_frame, text="Пароль").pack(anchor=NW) self.password_mssql_var = StringVar(value='sadko' if self.params["password_mssql"].get() == str() else self.params["password_mssql"].get()) self.password_mssql_ent = ttk.Entry(settings_frame, textvariable=self.password_mssql_var) self.password_mssql_ent.pack(fill=X) self.trusted_connect = ['yes', 'no'] trusted_connect_lb = Label(settings_frame, text='Доверенное соединение').pack(anchor=NW) # по умолчанию будет выбран первый элемент из trusted_connect self.trusted_connect_var = StringVar(value=self.trusted_connect[0] if self.params["trusted_connect"].get() == str() else self.params["trusted_connect"].get()) #Combobox self.trusted_connect_cb = ttk.Combobox(settings_frame, textvariable=self.trusted_connect_var, values=self.trusted_connect, state="readonly") self.trusted_connect_cb.pack(fill=X, pady=[0,15]) connection_postgresql_frame = ttk.Frame(settings_frame, borderwidth=1, relief=RIDGE, padding=[1,1]) connection_postgresql_frame.pack(fill=X) Label(connection_postgresql_frame, text="Подключение к PostgreSQL", font=("Arial", 10, "bold")).pack(anchor=N, pady=2) # Создание элементов управления для параметров Label(settings_frame, text="Название сервера").pack(anchor=NW) self.postgresql_server_name_var = StringVar(value='localhost' if self.params["postgresql_server_name"].get() == str() else self.params["postgresql_server_name"].get()) self.postgresql_server_name_ent = ttk.Entry(settings_frame, textvariable=self.postgresql_server_name_var, width=50) self.postgresql_server_name_ent.pack(fill=X) # Создание элементов управления для параметров Label(settings_frame, text="Название базы данных").pack(anchor=NW) self.data_base_postgresql_var = StringVar(value='postgres' if self.params["data_base_postgresql"].get() == str() else self.params["data_base_postgresql"].get()) self.data_base_postgresql_ent = ttk.Entry(settings_frame, textvariable=self.data_base_postgresql_var) self.data_base_postgresql_ent.pack(fill=X) # Создание элементов управления для параметров Label(settings_frame, text="Имя пользователя").pack(anchor=NW) self.user_postgresql_var = StringVar(value='sadko' if self.params["user_postgresql"].get() == str() else self.params["user_postgresql"].get()) self.user_postgresql_ent = ttk.Entry(settings_frame, textvariable=self.user_postgresql_var) self.user_postgresql_ent.pack(fill=X) # Создание элементов управления для параметров Label(settings_frame, text="Пароль").pack(anchor=NW) self.password_postgresql_var = StringVar(value='sadko' if self.params["password_postgresql"].get() == str() else self.params["password_postgresql"].get()) self.password_postgresql_ent = ttk.Entry(settings_frame, textvariable=self.password_postgresql_var) self.password_postgresql_ent.pack(fill=X, pady=[0,15]) limit_values_zones_frame = ttk.Frame(settings_frame, borderwidth=1, relief=RIDGE, padding=[1,1]) limit_values_zones_frame.pack(fill=X) Label(limit_values_zones_frame, text="Предельные значения для зон", font=("Arial", 10, "bold")).pack(anchor=N, pady=2) self.on_off_zone = ['Включить', 'Отключить'] on_off_zone_lb = Label(settings_frame, text='Отображение зон на графике').pack(anchor=NW) # по умолчанию будет выбран первый элемент из on_off_zone self.on_off_zone_var = StringVar(value=self.on_off_zone[1] if self.params["on_off_zone"].get() == str() else self.params["on_off_zone"].get()) #Combobox self.on_off_zone_cb = ttk.Combobox(settings_frame, textvariable=self.on_off_zone_var, values=self.on_off_zone, state="readonly") self.on_off_zone_cb.pack(fill=X) Label(settings_frame, text="Зона B (n*СКО)").pack(anchor=NW) self.zone_b_var = StringVar(value='3' if self.params["zone_b"].get() == str() else self.params["zone_b"].get()) self.zone_b_ent = ttk.Spinbox(settings_frame, from_=1, to=1000000, textvariable=self.zone_b_var) self.zone_b_ent.pack(fill=X) Label(settings_frame, text="Зона C (n*СКО)").pack(anchor=NW) self.zone_c_var = StringVar(value='5' if self.params["zone_c"].get() == str() else self.params["zone_c"].get()) self.zone_c_ent = ttk.Spinbox(settings_frame, from_=1, to=1000000, textvariable=self.zone_c_var) self.zone_c_ent.pack(fill=X) Label(settings_frame, text="Зона D (n*СКО)").pack(anchor=NW) self.zone_d_var = StringVar(value='20' if self.params["zone_d"].get() == str() else self.params["zone_d"].get()) self.zone_d_ent = ttk.Spinbox(settings_frame, from_=1, to=1000000, textvariable=self.zone_d_var) self.zone_d_ent.pack(fill=X) Label(settings_frame, text="Зона E (n*СКО)").pack(anchor=NW) self.zone_e_var = StringVar(value='40' if self.params["zone_e"].get() == str() else self.params["zone_e"].get()) self.zone_e_ent = ttk.Spinbox(settings_frame, from_=1, to=1000000, textvariable=self.zone_e_var) self.zone_e_ent.pack(fill=X, pady=[0,15]) coef_values_frame = ttk.Frame(settings_frame, borderwidth=1, relief=RIDGE, padding=[1,1]) coef_values_frame.pack(fill=X) Label(coef_values_frame, text="Значения коэффициентов", font=("Arial", 10, "bold")).pack(anchor=N, pady=2) #Список значений self.type_bearings = self.read_bearings_from_csv() if self.read_bearings_from_csv() != None else ['305'] type_bearing_lb = Label(settings_frame, text='Тип подшипника').pack(anchor=NW) # по умолчанию будет выбран первый элемент из type_bearing self.type_bearing_var = StringVar(value=self.type_bearings[0] if self.params["type_bearing"].get() == str() else self.params["type_bearing"].get()) #Combobox self.type_bearing_cb = ttk.Combobox(settings_frame, textvariable=self.type_bearing_var, values=self.type_bearings) self.type_bearing_cb.pack(fill=X) #прикрепить функцию обработки к событию <> с помощью метода bind self.type_bearing_cb.bind("<>", self.on_change_values_coeffs) #Данные коэффициентов по выбранному подшипнику coef_outer_clip, coef_inner_clip, coef_rolls, coef_separator = self.on_change_values_coeffs(None) if self.read_bearings_from_csv() != None else (None, None, None, None) Label(settings_frame, text="Коэффициент наружного кольца").pack(anchor=NW) self.coef_outer_clip_var = StringVar(value='2.57' if coef_outer_clip == None else coef_outer_clip) self.coef_outer_clip_ent = ttk.Spinbox(settings_frame, from_=1, to=1000000, textvariable=self.coef_outer_clip_var) self.coef_outer_clip_ent.pack(fill=X) Label(settings_frame, text="Коэффициент внутреннего кольца").pack(anchor=NW) self.coef_inner_clip_var = StringVar(value='4.42' if coef_inner_clip == None else coef_inner_clip) self.coef_inner_clip_ent = ttk.Spinbox(settings_frame, from_=1, to=1000000, textvariable=self.coef_inner_clip_var) self.coef_inner_clip_ent.pack(fill=X) Label(settings_frame, text="Коэффициент тел качения").pack(anchor=NW) self.coef_rolls_var = StringVar(value='3.51' if coef_rolls == None else coef_rolls) self.coef_rolls_ent = ttk.Spinbox(settings_frame, from_=1, to=1000000, textvariable=self.coef_rolls_var) self.coef_rolls_ent.pack(fill=X) Label(settings_frame, text="Коэффициент сепаратора").pack(anchor=NW) self.coef_separator_var = StringVar(value='3' if coef_separator == None else coef_separator) self.coef_separator_ent = ttk.Spinbox(settings_frame, from_=1, to=1000000, textvariable=self.coef_separator_var) self.coef_separator_ent.pack(fill=X) self.add_bearing_btn = ttk.Button(settings_frame,text="Добавить/изменить подшипник", command=self.add_or_change_bearing) self.add_bearing_btn.pack(fill=X, pady=5) self.delete_bearing_btn = ttk.Button(settings_frame,text="Удалить подшипник", command=self.delete_bearing) self.delete_bearing_btn.pack(fill=X, pady=[0,15]) statistical_values_frame = ttk.Frame(settings_frame, borderwidth=1, relief=RIDGE, padding=[1,1]) statistical_values_frame.pack(fill=X) Label(statistical_values_frame, text="Статистические данные", font=("Arial", 10, "bold")).pack(anchor=N, pady=2) Label(settings_frame, text="Куртосис * n").pack(anchor=NW) self.limit_kurtosis_var = StringVar(value='2' if self.params["limit_kurtosis"].get() == str() else self.params["limit_kurtosis"].get()) self.limit_kurtosis_ent = ttk.Spinbox(settings_frame, from_=1, to=1000000, textvariable=self.limit_kurtosis_var) self.limit_kurtosis_ent.pack(fill=X) Label(settings_frame, text="Дисперсия * n").pack(anchor=NW) self.limit_variance_var = StringVar(value='2' if self.params["limit_variance"].get() == str() else self.params["limit_variance"].get()) self.limit_variance_ent = ttk.Spinbox(settings_frame, from_=1, to=1000000, textvariable=self.limit_variance_var) self.limit_variance_ent.pack(fill=X) Label(settings_frame, text="Лимит пиков за 1 оборот для зоны С").pack(anchor=NW) self.limit_peaks_rev_in_c_zone_var = StringVar(value='5' if self.params["limit_peaks_rev_in_c_zone"].get() == str() else self.params["limit_peaks_rev_in_c_zone"].get()) self.limit_peaks_rev_in_c_zone_ent = ttk.Spinbox(settings_frame, from_=1, to=1000000, textvariable=self.limit_peaks_rev_in_c_zone_var) self.limit_peaks_rev_in_c_zone_ent.pack(fill=X) self.bearing_condition = ['Известно', 'Неизвестно'] bearing_condition_lb = Label(settings_frame, text='Начальное состояние подшипника').pack(anchor=NW) # по умолчанию будет выбран первый элемент из bearing_condition self.bearing_condition_var = StringVar(value=self.bearing_condition[0] if self.params["bearing_condition"].get() == str() else self.params["bearing_condition"].get()) #Combobox self.bearing_condition_cb = ttk.Combobox(settings_frame, textvariable=self.bearing_condition_var, values=self.bearing_condition, state="readonly") self.bearing_condition_cb.pack(fill=X, pady=[0,15]) working_with_signal_frame = ttk.Frame(settings_frame, borderwidth=1, relief=RIDGE, padding=[1,1]) working_with_signal_frame.pack(fill=X) Label(working_with_signal_frame, text="Работа с сигналом", font=("Arial", 10, "bold")).pack(anchor=N, pady=2) Label(settings_frame, text="Частота вращения (Гц) (если нет таходатчика)").pack(anchor=NW) self.rotation_speed_var = StringVar(value='50' if self.params["rotation_speed"].get() == str() else self.params["rotation_speed"].get()) self.rotation_speed_ent = ttk.Spinbox(settings_frame, from_=1, to=1000000, textvariable=self.rotation_speed_var) self.rotation_speed_ent.pack(fill=X, pady=[0,15]) on_off_filtering_frame = ttk.Frame(settings_frame, borderwidth=1, relief=RIDGE, padding=[1,1]) on_off_filtering_frame.pack(fill=X) Label(on_off_filtering_frame, text="Включить/Выключить фильтрацию сигнала", font=("Arial", 10, "bold")).pack(anchor=N, pady=2) self.on_filtering = "Включить" self.off_filtering = "Выключить" # по умолчанию будет выбран элемент с value=on_filtering self.on_off_filtering_var = StringVar(value=self.off_filtering if self.params["on_off_filtering"].get() == str() else self.params["on_off_filtering"].get()) self.on_filtering_rbtn = ttk.Radiobutton(settings_frame, text=self.on_filtering, value=self.on_filtering, variable=self.on_off_filtering_var) self.on_filtering_rbtn.pack(fill=X) self.off_filtering_rbtn = ttk.Radiobutton(settings_frame, text=self.off_filtering, value=self.off_filtering, variable=self.on_off_filtering_var) self.off_filtering_rbtn.pack(fill=X, pady=[0,15]) on_off_location_defect_frame = ttk.Frame(settings_frame, borderwidth=1, relief=RIDGE, padding=[1,1]) on_off_location_defect_frame.pack(fill=X) Label(on_off_location_defect_frame, text="Включить/Выключить местоположение дефекта", font=("Arial", 10, "bold")).pack(anchor=N, pady=2) self.on_location_defect = "Включить" self.off_location_defect = "Выключить" # по умолчанию будет выбран элемент с value=on_location_defect self.on_off_location_defect_var = StringVar(value=self.off_location_defect if self.params["on_off_location_defect"].get() == str() else self.params["on_off_location_defect"].get()) self.on_location_defect_rbtn = ttk.Radiobutton(settings_frame, text=self.on_location_defect, value=self.on_location_defect, variable=self.on_off_location_defect_var) self.on_location_defect_rbtn.pack(fill=X) self.off_location_defect_rbtn = ttk.Radiobutton(settings_frame, text=self.off_location_defect, value=self.off_location_defect, variable=self.on_off_location_defect_var) self.off_location_defect_rbtn.pack(fill=X) signal_filtering_frame = ttk.Frame(settings_frame, borderwidth=1, relief=RIDGE, padding=[1,1]) signal_filtering_frame.pack(fill=X) Label(signal_filtering_frame, text="Фильтрация сигнала", font=("Arial", 10, "bold")).pack(anchor=N, pady=2) self.type_filtration = ['Баттерворта', 'Бесселя'] type_filtration_lb = Label(settings_frame, text='Тип фильтрации').pack(anchor=NW) # по умолчанию будет выбран первый элемент из type_filtration self.type_filtration_var = StringVar(value=self.type_filtration[0] if self.params["type_filtration"].get() == str() else self.params["type_filtration"].get()) #Combobox self.type_filtration_cb = ttk.Combobox(settings_frame, textvariable=self.type_filtration_var, values=self.type_filtration, state="readonly") self.type_filtration_cb.pack(fill=X) self.filtering_mode = ['Нижних частот', 'Верхних частот', 'Полосовой', 'Режекторный'] filtering_mode_lb = Label(settings_frame, text='Режим фильтрации').pack(anchor=NW) # по умолчанию будет выбран первый элемент из filtering_mode self.filtering_mode_var = StringVar(value=self.filtering_mode[0] if self.params["filtering_mode"].get() == str() else self.params["filtering_mode"].get()) #Combobox self.filtering_mode_cb = ttk.Combobox(settings_frame, textvariable=self.filtering_mode_var, values=self.filtering_mode, state="readonly") self.filtering_mode_cb.pack(fill=X) #прикрепить функцию обработки к событию <> с помощью метода bind self.filtering_mode_cb.bind("<>", self.on_control_filter_mode) Label(settings_frame, text="Порядок фильтра").pack(anchor=NW) self.filter_order_var = StringVar(value='6' if self.params["filter_order"].get() == str() else self.params["filter_order"].get()) self.filter_order_ent = ttk.Spinbox(settings_frame, from_=1, to=100, textvariable=self.filter_order_var) self.filter_order_ent.pack(fill=X) self.set_filter = ['Частоту центра и ширину', 'Частоты границ'] set_filter_lb = Label(settings_frame, text='Задать').pack(anchor=NW) # по умолчанию будет выбран первый элемент из set_filter self.set_filter_var = StringVar(value=self.set_filter[0] if self.params["set_filter"].get() == str() else self.params["set_filter"].get()) #Combobox self.set_filter_cb = ttk.Combobox(settings_frame, textvariable=self.set_filter_var, values=self.set_filter, state="readonly" if self.filtering_mode_var.get() == ModesFilter.bandpass.value or self.filtering_mode_var.get() == ModesFilter.cutting.value else 'disabled') self.set_filter_cb.pack(fill=X) #прикрепить функцию обработки к событию <> с помощью метода bind self.set_filter_cb.bind("<>", self.on_control_filter_set) self.freq_lower_var = StringVar(value="Нижн. частота среза" if (self.set_filter_var.get() == SetsFilter.boundary_freq.value and self.set_filter_cb['state'] == "readonly") or (self.filtering_mode_var.get() == ModesFilter.low_freq.value or self.filtering_mode_var.get() == ModesFilter.high_freq.value) else "Частота центра полосы") self.upper_freq_var = StringVar(value="Верх. частота среза" if (self.set_filter_var.get() == SetsFilter.boundary_freq.value and self.set_filter_cb['state'] == "readonly") or (self.filtering_mode_var.get() == ModesFilter.low_freq.value or self.filtering_mode_var.get() == ModesFilter.high_freq.value) else "Относительная ширина") self.freq_lower_and_center_bands_lb = Label(settings_frame, textvariable=self.freq_lower_var).pack(anchor=NW) self.freq_lower_and_center_bands_var = StringVar(value='500' if self.params["freq_lower_and_center_bands"].get() == str() else self.params["freq_lower_and_center_bands"].get()) self.freq_lower_and_center_bands_ent = ttk.Spinbox(settings_frame, from_=1, to=1000000, textvariable=self.freq_lower_and_center_bands_var, state='enabled' if self.filtering_mode_var.get() == ModesFilter.high_freq.value or self.filtering_mode_var.get() == ModesFilter.bandpass.value or self.filtering_mode_var.get() == ModesFilter.cutting.value else 'disabled') self.freq_lower_and_center_bands_ent.pack(fill=X) self.upper_freq_and_relative_width_lb = Label(settings_frame, textvariable=self.upper_freq_var).pack(anchor=NW) self.upper_freq_and_relative_width_var = StringVar(value='1000' if self.params["upper_freq_and_relative_width"].get() == str() else self.params["upper_freq_and_relative_width"].get()) self.upper_freq_and_relative_width_ent = ttk.Spinbox(settings_frame, from_=1, to=1000000, textvariable=self.upper_freq_and_relative_width_var, state='enabled' if self.filtering_mode_var.get() == ModesFilter.low_freq.value or self.filtering_mode_var.get() == ModesFilter.bandpass.value or self.filtering_mode_var.get() == ModesFilter.cutting.value else 'disabled') self.upper_freq_and_relative_width_ent.pack(fill=X, pady=5) # Кнопки ttk.Button(activity_frame, text="Применить", command=self.apply_settings).grid(row=0, column=0, padx=5) ttk.Button(activity_frame, text="Отмена", command=self.cancel).grid(row=0, column=1) #Применить настройки def apply_settings(self): #Если файла с подшипниками нет if not Path(FILE_WITH_BEARINGS).is_file(): messagebox.showwarning("Информация", f"Добавьте подшипник для сохранения настроек!") return #Данные подшипников data_bearings_df = pd.read_csv(FILE_WITH_BEARINGS) #Данные подшипника data_bearing = data_bearings_df[(data_bearings_df['bearings'].astype(str) == self.type_bearing_var.get())] if len(data_bearing) == 0: messagebox.showwarning("Информация", f"Добавьте подшипник {self.type_bearing_var.get()} перед сохранением настроек!") return self.params["dbms"].set(self.dbms_var.get()) # Получение значений из Entry и установка их в StringVar if self.mssql_server_name_ent.get() != str() and self.postgresql_server_name_ent.get() != str(): self.params["mssql_server_name"].set(self.mssql_server_name_ent.get()) self.params["postgresql_server_name"].set(self.postgresql_server_name_ent.get()) else: messagebox.showerror("Предупреждение", "Заполните поле 'Название сервера'!") return if self.data_base_mssql_ent.get() != str() and self.data_base_postgresql_ent.get(): self.params["data_base_mssql"].set(self.data_base_mssql_ent.get()) self.params["data_base_postgresql"].set(self.data_base_postgresql_ent.get()) else: messagebox.showerror("Предупреждение", "Заполните поле 'Название базы данных'!") return if self.user_mssql_ent.get() != str() and self.user_postgresql_ent.get() != str(): self.params["user_mssql"].set(self.user_mssql_ent.get()) self.params["user_postgresql"].set(self.user_postgresql_ent.get()) else: messagebox.showerror("Предупреждение", "Заполните поле 'Имя пользователя'!") return if self.password_mssql_ent.get() != str() and self.password_postgresql_ent.get() != str(): self.params["password_mssql"].set(self.password_mssql_ent.get()) self.params["password_postgresql"].set(self.password_postgresql_ent.get()) else: messagebox.showerror("Предупреждение", "Заполните поле 'Имя пользователя'!") return if self.zone_b_ent.get() != str(): self.params["zone_b"].set(self.zone_b_ent.get()) else: messagebox.showerror("Предупреждение", "Заполните поле 'Зона B'!") return if self.zone_c_ent.get() != str(): self.params["zone_c"].set(self.zone_c_ent.get()) else: messagebox.showerror("Предупреждение", "Заполните поле 'Зона C'!") return if self.zone_d_ent.get() != str(): self.params["zone_d"].set(self.zone_d_ent.get()) else: messagebox.showerror("Предупреждение", "Заполните поле 'Зона D'!") return if self.zone_e_ent.get() != str(): self.params["zone_e"].set(self.zone_e_ent.get()) else: messagebox.showerror("Предупреждение", "Заполните поле 'Зона E'!") return if self.type_bearing_cb.get() != str(): self.params["type_bearing"].set(self.type_bearing_cb.get()) else: messagebox.showerror("Предупреждение", "Заполните поле 'Тип подшипника'!") return if self.coef_outer_clip_ent.get() == str(): messagebox.showerror("Предупреждение", "Заполните поле 'Коэффициент наружного кольца'!") return if self.coef_inner_clip_ent.get() == str(): messagebox.showerror("Предупреждение", "Заполните поле 'Коэффициент внутреннего кольца'!") return if self.coef_rolls_ent.get() == str(): messagebox.showerror("Предупреждение", "Заполните поле 'Коэффициент тел качения'!") return if self.coef_separator_ent.get() == str(): messagebox.showerror("Предупреждение", "Заполните поле 'Коэффициент сепаратора'!") return if self.limit_kurtosis_ent.get() != str(): self.params["limit_kurtosis"].set(self.limit_kurtosis_ent.get()) else: messagebox.showerror("Предупреждение", "Заполните поле 'Куртосис * n'!") return if self.limit_variance_ent.get() != str(): self.params["limit_variance"].set(self.limit_variance_ent.get()) else: messagebox.showerror("Предупреждение", "Заполните поле 'Дисперсия * n'!") return if self.limit_peaks_rev_in_c_zone_ent.get() != str(): self.params["limit_peaks_rev_in_c_zone"].set(self.limit_peaks_rev_in_c_zone_ent.get()) else: messagebox.showerror("Предупреждение", "Заполните поле 'Лимит пиков за 1 оборот для зоны С'!") return if self.rotation_speed_ent.get() != str(): self.params["rotation_speed"].set(self.rotation_speed_ent.get()) else: messagebox.showerror("Предупреждение", "Заполните поле 'Частота вращения (Гц)'!") return if self.filter_order_ent.get() != str(): self.params["filter_order"].set(self.filter_order_ent.get()) else: messagebox.showerror("Предупреждение", "Заполните поле 'Порядок фильтра'!") return if self.freq_lower_and_center_bands_ent.get() != str(): self.params["freq_lower_and_center_bands"].set(self.freq_lower_and_center_bands_ent.get()) else: messagebox.showerror("Предупреждение", "Заполните поле 'Нижн. частота среза' или 'Частота центра полосы'!") return if self.upper_freq_and_relative_width_ent.get() != str(): self.params["upper_freq_and_relative_width"].set(self.upper_freq_and_relative_width_ent.get()) else: messagebox.showerror("Предупреждение", "Заполните поле 'Верх. частота среза' или 'Относительная ширина''!") return self.params["on_off_filtering"].set(self.on_off_filtering_var.get()) self.params["on_off_location_defect"].set(self.on_off_location_defect_var.get()) self.params["bearing_condition"].set(self.bearing_condition_cb.get()) self.params["trusted_connect"].set(self.trusted_connect_cb.get()) self.params["type_filtration"].set(self.type_filtration_cb.get()) self.params["filtering_mode"].set(self.filtering_mode_cb.get()) self.params["set_filter"].set(self.set_filter_cb.get()) self.params["on_off_zone"].set(self.on_off_zone_cb.get()) # Сохранение параметров в XML файл root = ET.Element("Settings") for param_name, param_value in self.params.items(): ET.SubElement(root, param_name).text = str(param_value.get()) tree = ET.ElementTree(root) tree.write("settings.xml") error = connect_to_db_mssql() if self.dbms_var.get() == DbmsNames.mssql.value else connect_to_db_postgresql() if error is None: # Изменение команды кнопки update_bd_btn.configure(command=connect_to_db_mssql if self.dbms_var.get() == DbmsNames.mssql.value else connect_to_db_postgresql) messagebox.showinfo("Сохранено", "Настройки сохранены!") #Отмена def cancel(self): # Освобождаем захват при закрытии окна self.parent.grab_release() self.parent.destroy() #Обновляет параметры прокрутки холста при изменении размеров рамки def on_update_scroll(self, event): self.settings_cvs.configure(scrollregion=self.settings_cvs.bbox("all"),width=365,height=390) # ============================================================================= # #Выбрать СУБД # def on_select_dbms(self): # #если равно "Microsoft SQL" # if self.dbms_var.get() == self.mssql_name: # print(1) # else: # print(2) # ============================================================================= #Управление режимами фильтрации def on_control_filter_mode(self, event): #self.on_control_filter_set(event) match self.filtering_mode_var.get(): case ModesFilter.low_freq.value: self.freq_lower_and_center_bands_ent.configure(state='disabled') self.upper_freq_and_relative_width_ent.configure(state='enabled') self.set_filter_cb.configure(state='disabled') self.freq_lower_var.set('Нижн. частота среза') self.upper_freq_var.set('Верх. частота среза') case ModesFilter.high_freq.value: self.freq_lower_and_center_bands_ent.configure(state='enabled') self.upper_freq_and_relative_width_ent.configure(state='disabled') self.set_filter_cb.configure(state='disabled') self.freq_lower_var.set('Нижн. частота среза') self.upper_freq_var.set('Верх. частота среза') case ModesFilter.bandpass.value: self.freq_lower_and_center_bands_ent.configure(state='enabled') self.upper_freq_and_relative_width_ent.configure(state='enabled') self.set_filter_cb.configure(state='readonly') if self.set_filter_var.get() == SetsFilter.center_freq_width.value: self.freq_lower_var.set('Частота центра полосы') self.upper_freq_var.set('Относительная ширина') else: self.freq_lower_var.set('Нижн. частота среза') self.upper_freq_var.set('Верх. частота среза') case ModesFilter.cutting.value: self.freq_lower_and_center_bands_ent.configure(state='enabled') self.upper_freq_and_relative_width_ent.configure(state='enabled') self.set_filter_cb.configure(state='readonly') if self.set_filter_var.get() == SetsFilter.center_freq_width.value: self.freq_lower_var.set('Частота центра полосы') self.upper_freq_var.set('Относительная ширина') else: self.freq_lower_var.set('Нижн. частота среза') self.upper_freq_var.set('Верх. частота среза') #Управление набором фильтрации def on_control_filter_set(self, event): match self.set_filter_var.get(): case SetsFilter.center_freq_width.value: self.freq_lower_var.set('Частота центра полосы') self.upper_freq_var.set('Относительная ширина') case SetsFilter.boundary_freq.value: self.freq_lower_var.set('Нижн. частота среза') self.upper_freq_var.set('Верх. частота среза') #изменить значения коэффициентов для компонентов подшипника def on_change_values_coeffs(self, event): #Если файла с подшипниками нет if not Path(FILE_WITH_BEARINGS).is_file(): #messagebox.showwarning("Информация", f"Файл {FILE_WITH_BEARINGS} не найден!") return None #Данные подшипников data_bearings_df = pd.read_csv(FILE_WITH_BEARINGS) #Данные подшипника #data_bearing = data_bearings_df[(data_bearings_df['bearings'] == self.type_bearing_var.get())] data_bearing = data_bearings_df[(data_bearings_df['bearings'].astype(str) == self.type_bearing_var.get())] #Данные коэффициентов по выбранному подшипнику coef_outer_clip = data_bearing['coef_outer_clip'].values[0] coef_inner_clip = data_bearing['coef_inner_clip'].values[0] coef_rolls = data_bearing['coef_rolls'].values[0] coef_separator = data_bearing['coef_separator'].values[0] if event != None: #Обновить данные в полях для коэффициентов self.coef_outer_clip_var.set(coef_outer_clip) self.coef_inner_clip_var.set(coef_inner_clip) self.coef_rolls_var.set(coef_rolls) self.coef_separator_var.set(coef_separator) return (coef_outer_clip, coef_inner_clip, coef_rolls, coef_separator) #Добавить подшипник def add_or_change_bearing(self): # Данные подшипника data_bearing = [[self.type_bearing_var.get(), self.coef_outer_clip_var.get(), self.coef_inner_clip_var.get(), self.coef_rolls_var.get(), self.coef_separator_var.get()]] # Определяем файл для записи with open(FILE_WITH_BEARINGS, 'a', newline='') as file: writer = csv.writer(file) # Если файл не пуст, то добавляем заголовки if file.tell() == 0: writer.writerow(["bearings", "coef_outer_clip", "coef_inner_clip", "coef_rolls", "coef_separator"]) with open(FILE_WITH_BEARINGS, 'r') as csvfile: reader = csv.DictReader(csvfile) #Если есть этот подшипник, то просто изменить данные for row in reader: if row['bearings'] == self.type_bearing_cb.get(): confirm_update = messagebox.askyesno("Подтверждение", f"Вы действительно хотите изменить данные подшипника {self.type_bearing_var.get()}?") if confirm_update: # Данные подшипников data_bearings_df = pd.read_csv(FILE_WITH_BEARINGS) # Изменить значение в столбце "bearings" и других столбцах для строки, где bearings == self.type_bearing_var.get() data_bearings_df.loc[data_bearings_df['bearings'].astype(str) == self.type_bearing_var.get(), ['bearings', 'coef_outer_clip', 'coef_inner_clip', 'coef_rolls', 'coef_separator']] = [ self.type_bearing_var.get(), self.coef_outer_clip_var.get(), self.coef_inner_clip_var.get(), self.coef_rolls_var.get(), self.coef_separator_var.get() ] # Запись изменений в CSV-файл data_bearings_df.to_csv(FILE_WITH_BEARINGS, index=False) messagebox.showinfo("Информация", f"Данные подшипника {self.type_bearing_var.get()} изменены!") else: messagebox.showinfo("Отмена", "Изменение данных подшипника отменено!") return # Записываем данные writer.writerows(data_bearing) messagebox.showinfo("Информация", f"Подшипник {self.type_bearing_var.get()} добавлен!") # Обновляем список подшипников type_bearings = self.read_bearings_from_csv() self.type_bearing_cb['values'] = type_bearings self.type_bearing_cb.set(self.type_bearing_var.get()) self.apply_settings() #Удаление подшипника def delete_bearing(self): #Если файла с подшипниками нет if not Path(FILE_WITH_BEARINGS).is_file(): messagebox.showwarning("Информация", f"Файл {FILE_WITH_BEARINGS} не найден!") return #Данные подшипников data_bearings_df = pd.read_csv(FILE_WITH_BEARINGS) # Спросить пользователя, действительно ли он хочет удалить подшипник confirm_delete = messagebox.askyesno("Подтверждение", f"Вы действительно хотите удалить подшипник {self.type_bearing_var.get()}?") if confirm_delete: # Удалить строку, где bearings == self.type_bearing_var.get() data_bearings_df = data_bearings_df[data_bearings_df['bearings'].astype(str) != self.type_bearing_var.get()] # Запись изменений в CSV-файл data_bearings_df.to_csv(FILE_WITH_BEARINGS, index=False) messagebox.showinfo("Информация", f"Подшипник {self.type_bearing_var.get()} удалён!") # Обновляем список подшипников type_bearings = self.read_bearings_from_csv() self.type_bearing_cb['values'] = type_bearings self.type_bearing_cb.set(type_bearings[0]) self.apply_settings() else: messagebox.showinfo("Отмена", "Удаление подшипника отменено!") #Чтение названий подшипников из csv def read_bearings_from_csv(self): bearings = [] try: with open(FILE_WITH_BEARINGS, 'r') as file: reader = csv.DictReader(file) for row in reader: bearings.append(row['bearings']) except FileNotFoundError: #messagebox.showwarning("Информация", f"Файл {FILE_WITH_BEARINGS} не найден!") bearings = None return bearings return bearings #Получить значения коэффициентов для компонентов подшипника def get_values_coeffs_bearing(): #Если файла с подшипниками нет if not Path(FILE_WITH_BEARINGS).is_file(): return None #Загрузка параметров params = load_settings() #Данные подшипников data_bearings_df = pd.read_csv(FILE_WITH_BEARINGS) #Данные подшипника data_bearing = data_bearings_df[(data_bearings_df['bearings'].astype(str) == str(params["type_bearing"].get()))] #Данные коэффициентов по выбранному подшипнику coef_outer_clip = data_bearing['coef_outer_clip'].values[0] coef_inner_clip = data_bearing['coef_inner_clip'].values[0] coef_rolls = data_bearing['coef_rolls'].values[0] coef_separator = data_bearing['coef_separator'].values[0] return (coef_outer_clip, coef_inner_clip, coef_rolls, coef_separator) #Загрузка настроек в params def load_settings(): # Параметры params = { "dbms": StringVar(), "mssql_server_name": StringVar(), "data_base_mssql": StringVar(), "user_mssql": StringVar(), "password_mssql": StringVar(), "trusted_connect": StringVar(), "postgresql_server_name": StringVar(), "data_base_postgresql": StringVar(), "user_postgresql": StringVar(), "password_postgresql": StringVar(), "zone_b": StringVar(), "zone_c": StringVar(), "zone_d": StringVar(), "zone_e": StringVar(), "type_bearing": StringVar(), "limit_kurtosis": StringVar(), "limit_variance": StringVar(), "limit_peaks_rev_in_c_zone": StringVar(), "bearing_condition": StringVar(), "rotation_speed": StringVar(), "type_filtration": StringVar(), "filtering_mode": StringVar(), "filter_order": StringVar(), "set_filter": StringVar(), "freq_lower_and_center_bands": StringVar(), "upper_freq_and_relative_width": StringVar(), "on_off_filtering": StringVar(), "on_off_location_defect": StringVar(), "on_off_zone": StringVar() # Добавьте другие параметры здесь } try: tree = ET.parse("settings.xml") root = tree.getroot() for param in root: param_name = param.tag param_value = param.text if param_name in params: params[param_name].set(param_value) except FileNotFoundError: #messagebox.showwarning("Предупреждение", "Файл настроек не найден. Будут использованы значения по умолчанию!") print(FileNotFoundError) return params #Показать окно настроек def show_settings(): settings_window = Toplevel() app = SettingsWindow(settings_window) #Убрать сворачивание и разворачивание окна settings_window.attributes('-toolwindow', True) #Запрет на изменение размера окна settings_window.resizable(width=False, height=False) #Размещение окна в центре экрана w = settings_window.winfo_screenwidth() h = settings_window.winfo_screenheight() w = w // 2 # середина экрана h = h // 2 w = w - 200 # смещение от середины h = h - 250 settings_window.geometry(f'400x450+{w}+{h}') # Захватываем фокус settings_window.grab_set() settings_window.mainloop() #Применение фильтра для сигнала def apply_filter(sample_rate, data, filter_type, mode, order, f1, f2=None): nyquist = 0.5 * sample_rate f1_norm = f1 / nyquist if f2: f2_norm = f2 / nyquist if filter_type == TypesFilter.butterworth.value: if mode == ModesFilter.low_freq.value: b, a = butter(order, f1_norm, btype='low') elif mode == ModesFilter.high_freq.value: b, a = butter(order, f1_norm, btype='high') elif mode == ModesFilter.bandpass.value: b, a = butter(order, [f1_norm, f2_norm], btype='band') elif mode == ModesFilter.cutting.value: b, a = butter(order, [f1_norm, f2_norm], btype='bandstop') else: #raise ValueError("Указан недопустимый режим!") messagebox.showerror("Ошибка","Указан недопустимый режим фильтрации!") elif filter_type == TypesFilter.bessel.value: if mode == ModesFilter.low_freq.value: b, a = bessel(order, f1_norm, btype='low', norm='phase') elif mode == ModesFilter.high_freq.value: b, a = bessel(order, f1_norm, btype='high', norm='phase') elif mode == ModesFilter.bandpass.value: b, a = bessel(order, [f1_norm, f2_norm], btype='band', norm='phase') elif mode == ModesFilter.cutting.value: b, a = bessel(order, [f1_norm, f2_norm], btype='bandstop', norm='phase') else: #raise ValueError("Указан недопустимый режим!") messagebox.showerror("Ошибка","Указан недопустимый режим фильтрации!") else: #raise ValueError("Указан неверный тип фильтра!") messagebox.showerror("Ошибка","Указан неверный тип фильтра!") filtered_data = filtfilt(b, a, data) return filtered_data #Фильтр Баттерворта def butterworth_filter(sample_rate, data, mode, order, f1, f2=None): return apply_filter(sample_rate, data, TypesFilter.butterworth.value, mode, order, f1, f2) #Фильтр Бесселя def bessel_filter(sample_rate, data, mode, order, f1, f2=None): return apply_filter(sample_rate, data, TypesFilter.bessel.value, mode, order, f1, f2) #Фильтровать сигнал def get_filter_signal(sample_rate, signal): # Параметры настроек params = load_settings() mode = params["filtering_mode"].get() filter_type = params["type_filtration"].get() order = int(params["filter_order"].get()) f1 = float(params["freq_lower_and_center_bands"].get()) f2 = float(params["upper_freq_and_relative_width"].get()) if mode in [ModesFilter.bandpass.value, ModesFilter.cutting.value] else None #Фильтрованный сигнал filtered_signal = [] if filter_type == TypesFilter.butterworth.value: filtered_signal = butterworth_filter(sample_rate, signal, mode, order, f1, f2) elif filter_type == TypesFilter.bessel.value: filtered_signal = bessel_filter(sample_rate, signal, mode, order, f1, f2) return filtered_signal #Обновляет параметры прокрутки холста при изменении размеров рамки def on_update_scroll_settings(event): control_cvs.configure(scrollregion=control_cvs.bbox("all"),width=430,height=600) # Создание главного окна main_window = Tk() main_window.title("Дефект") #Размещение окна в центре экрана w = main_window.winfo_screenwidth() h = main_window.winfo_screenheight() w = w // 2 # середина экрана h = h // 2 w = w - 400 # смещение от середины h = h - 400 main_window.geometry(f'800x800+{w}+{h}') #Установка темы приложения style = ttkthemes.ThemedStyle(main_window) style.set_theme("breeze") #подписи точек name_points = ['О(1)','В(1)','Г(1)' ,'В(2)'] #массив амплитуд arrays = {f'arraysign{i+1}': None for i in range(NUM_SENSORS)} #массив амплитуд из БД arrays_amplitudes = {f'arraysign{i+1}': None for i in range(NUM_SENSORS)} # Словарь для отслеживания построенных линий на графике plotted_lines = {} # Создание меню фрейма menu_frame = ttk.Frame(main_window) menu_frame.grid(row=0, column=0, columnspan=2) #Каждый виджет, кроме окна, располагается в определенном родительском контейнере #Панель управления myframe=Frame(main_window,relief=SOLID, width=50, height=100, bd=1) #Задать положение контейнера myframe.grid(row=1, column=0, sticky="nw", padx=5, pady=5) control_cvs=Canvas(myframe) control_frame = Frame(control_cvs) #скроллбар для панели управления control_sb_ver=Scrollbar(myframe,orient="vertical",command=control_cvs.yview) control_cvs.configure(yscrollcommand=control_sb_ver.set) control_sb_ver.pack(side="right",fill="y") control_cvs.pack(side="left") control_cvs.create_window((0,0),window=control_frame,anchor='nw') #функция on_update_scroll_settings обновляет параметры прокрутки холста при изменении размеров рамки control_frame.bind("", on_update_scroll_settings) #Область панели управления #control_frame = ttk.Frame(main_window, borderwidth=1, padding=[1,1]) #Задать положение контейнера #control_frame.grid(row=1, column=0, rowspan=2, sticky="nw", padx=5, pady=5) #Область панели управления для метода spm control_spm_frame = ttk.Frame(main_window, borderwidth=1, padding=[1,1]) #Задать положение контейнера control_spm_frame.grid(row=2, column=0, rowspan=2, sticky="nw", padx=5, pady=5) #Информация о датчиках inform_about_sensor_frame = ttk.Frame(control_frame, borderwidth=1, padding=[1,1]) #Загруженные сигналы (из файла или из БД) load_signals_frame = ttk.Frame(control_frame, borderwidth=1, padding=[1,1]) #Область для отображения графиков charts_frame = ttk.Frame(main_window, borderwidth=1, relief=SOLID, padding=[1,1]) #Задать положение контейнера charts_frame.grid(row=1, column=1, rowspan=2, sticky="nwse", padx=5, pady=5) #Label(charts_frame, text="Нет данных", font=("Arial", 10, "bold")).pack(anchor=S) #Панель для вывода результатов result_frame = ttk.Frame(main_window, borderwidth=1, relief=SOLID, padding=[1,1]) #Задать положение контейнера result_frame.grid(row=3, column=1, rowspan=2, sticky="nwse", padx=3, pady=3) # Растягиваем столбец 1 result_frame.grid_columnconfigure(1, weight=1) # Определение функций для загрузки таблиц menu_bar = Menu(menu_frame) file_menu = Menu(menu_bar, tearoff=0) menu_bar.add_cascade(label="Настройки", command = show_settings) # Привязываем меню к главному окну main_window.config(menu=menu_bar) #Создать виджеты для ввода данных create_widgets_for_input() #Создать виджеты вывода результатов create_widgets_output_results() # Параметры настроек params = load_settings() if params["dbms"].get() == DbmsNames.mssql.value or params["dbms"].get() == str(): #Соединение с базой данных (MS SQL) connect_to_db_mssql() else: #Соединение с базой данных (PostgreSQL) connect_to_db_postgresql() #Показать виджеты для ввода данных show_widgets_for_input() # Растягиваем столбец 1 main_window.grid_columnconfigure(1, weight=1) # Растягиваем строку 1 main_window.grid_rowconfigure(1, weight=1) # Запуск главного цикла событий main_window.mainloop()