#!/usr/bin/env python3 ¡¡¡ OK !!! # Program fft_spectrum_gui_3can_py3_01.py # - Corrected g_scale. # - g_scale = (Vref / ADC resolution) * (300 mv/g) # - Error noted and corrected by Steve Ferry. # 08/02/2018 # - Added a timeout control to a while loop. # 12/01/2017 # Program fft_spectrum_gui_3can.py modified: # - From Python 2.7 to Python 3.5. # - Works with AVR program adxl335_3can_01.c # 20/12/2016 # Program fft_spectrum_gui_3can.py # Program fft_spectrum_gui.py modified: # - 3 data channels (3 axes) # 15/11/2016 # Program fft_spectrum_gui.py # - Based on program frame_tab_plot_07.py # - Sample acceleration data from a ADXL335 accelerometer. # - Plot sampled data and its FFT spectrumsu # - Save data on file and open files with saved data. # - Serial communication with microcontroller.e # - Serial port selection. # - RadioButtons to select a Window function to apply. # 13/07/2016 import matplotlib matplotlib.use('TkAgg') from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg, NavigationToolbar2TkAgg # implement the default mpl key bindings from matplotlib.backend_bases import key_press_handler from matplotlib.figure import Figure from scipy import fftpack, arange, signal import numpy as np import sys import tkinter as Tk from tkinter import filedialog from tkinter import messagebox from tkinter.scrolledtext import ScrolledText from tkinter import ttk import string import serial import serial.tools.list_ports import time datos_a_leer = 16384 # Amount of samples to read. #datos_a_leer = 128 sample_rate = 5000 # Sampling frequency (SPS). #g_scale = 3.0/512 # +- 3g. 10 bit ADC. Wrong value. g_scale = (3.3 / 1024) * (1000/300) #Right value. Thanks Steve. #g_scale = (Vref / ADC resolution) * 1/(300 mv/g) max_freq = 1500 # Maximum signal frequency, X and Y axis (accelerometer). max_freq_z = 500 # Maximum signal frequency, Z axis (accelerometer). g_canal_1 = [] #Global canal_1 g_canal_2 = [] #Global canal_2 g_canal_3 = [] #Global canal_3 t_timeout = 8 #Timeout time in seconds. def scan_serial(): """ Scans for serial ports""" portnames = [] ports = (list(serial.tools.list_ports.comports())) for index in range(len(ports)): portnames.append(ports[index][0]) return portnames def simpleParse(mainString, beginString, endString ): """Searches for a substring between beginString and endString""" posBeginString = mainString.find(beginString) + len(beginString) posEndString = mainString.find(endString) resultado = mainString[posBeginString:posEndString] return resultado def extraer_int_tag(datos_arch, tag): """ Extracts data from string datos_str, delimited by y and convets it to integer numbers (list of integers)""" str_canal = '' beginString = '<' + tag + '>' endString = '' str_parse = simpleParse(datos_arch, beginString, endString ) str_canal = str_parse.split(',') canal = [] n = len(str_canal) for i in range(n): canal.append(int(str_canal[i])) return canal def conv_str_tag(canal, tag): """ Convert every channel from int to str, separated by a coma and adds tags at the beggining and end. """ n = len(canal) s_canal = '<' + tag + '>' for i in range(n-1): s_canal = s_canal + str(canal[i]) + ',' s_canal = s_canal + str(canal[n-1]) + '' return s_canal def grabar(canal_1, canal_2, canal_3, archivo): """ Saves X and Y axis data on file archivo""" str_canal = '' str_canal += conv_str_tag(canal_1, 'L1') + '\n' str_canal += conv_str_tag(canal_2, 'L2') + '\n' str_canal += conv_str_tag(canal_3, 'L3') + '\n' str_aux = '' str_aux += '' + str(datos_a_leer) + '' + '\n' str_aux += '' + str(sample_rate) + '' + '\n' #str_aux += '' + str(ganancia) + '' + '\n' # Write to file arch = open(archivo, "w") arch.write(str_aux) arch.write(str_canal) arch.close() class Application: def __init__(self, parent): self.parent = parent self.frames() self.f_saved = True #Sampled data saved root.protocol("WM_DELETE_WINDOW", self.on_closing) def on_closing(self): if (self.f_saved==False): if messagebox.askokcancel("Quit", "Sampled data not saved. Do you wanto to quit?"): root.destroy() else: root.destroy() def frames(self): frame1 = Tk.Frame(root, bd=5, relief='raised', borderwidth=1) frame2 = Tk.Frame(root, bd=5, relief='raised') note = ttk.Notebook(frame2) self.tab1 = ttk.Frame(note) self.tab2 = ttk.Frame(note) note.add(self.tab1, text = "Frquency") note.add(self.tab2, text = "Time") # Positioning frame1.pack(side='left', fill='both', padx=5, pady=5) frame2.pack(side='right', fill='both', expand='true') boton_open = Tk.Button(frame1, text ="Open file", command=self.open_file) boton_save = Tk.Button(frame1, text ="Save to file", command=self.save_file) boton_scan = Tk.Button(frame1, text="Scan serial ports", command=self.scan_ports) boton_read = Tk.Button(frame1, text="Read serial data", command=self.read_serial) label1 = Tk.Label(frame1, text="Select Serial Port:") self.sel_puerto = ttk.Combobox(frame1, textvariable='', state="readonly") portnames = scan_serial() self.sel_puerto['values'] = portnames if (portnames != []): self.sel_puerto.current(0) self.text_message = ScrolledText(frame1, height=10, width=20) self.window_var = Tk.IntVar() self.window_var.set(1) #Option rectangular window radio_button1 = Tk.Radiobutton(frame1, text="Rectangular Window", variable=self.window_var, value=1, command=self.win_sel) radio_button2 = Tk.Radiobutton(frame1, text="Hann Window", variable=self.window_var, value=2, command=self.win_sel) radio_button3 = Tk.Radiobutton(frame1, text="Flattop Window", variable=self.window_var, value=3, command=self.win_sel) # Grid boton_open.grid(row=1, column=0, padx=5, pady=5) boton_save.grid(row=2, column=0, padx=5, pady=5) boton_scan.grid(row=3, column=0, padx=5, pady=5) label1.grid(row=4, column=0, padx=5, pady=5) self.sel_puerto.grid(row=5, column=0, padx=5, pady=5) boton_read.grid(row=6, column=0, padx=5, pady=5) self.text_message.grid(row=7, column=0, padx=5, pady=5) radio_button1.grid(row=8, column=0, sticky="W") radio_button2.grid(row=9, column=0, sticky="W") radio_button3.grid(row=10, column=0, sticky="W") #note.grid(row = 0, column=0) note.pack(side='top', fill='both', padx=5, pady=5) #Figure 1 fig1 = Figure(figsize=(10,9)) fig1.suptitle('Sampled signal - Acceleration') ax_11 = fig1.add_subplot(3,1,1) #ax_11.hold(False) ax_11.set_title("Channel X") ax_11.set_ylabel('g') ax_11.grid() #Shows grid. ax_12 = fig1.add_subplot(3,1,2) #ax_12.hold(False) ax_12.set_title("Channel Y") #ax_12.set_xlabel('ms') ax_12.set_ylabel('g') ax_12.grid() #Shows grid. ax_12 = fig1.add_subplot(3,1,3) #ax_12.hold(False) ax_12.set_title("Channel Z") ax_12.set_xlabel('ms') ax_12.set_ylabel('g') ax_12.grid() #Shows grid. #Figure 2 fig2 = Figure(figsize=(10,9)) fig2.suptitle('FFT spectrum') ax_21 = fig2.add_subplot(3,1,1) #ax_21.hold(False) ax_21.set_title("Channel X") ax_21.set_ylabel('g') ax_21.set_xlim(xmax=max_freq) ax_21.grid() ax_22 = fig2.add_subplot(3,1,2) #ax_22.hold(False) ax_22.set_title("Channel Y") #ax_22.set_xlabel('Hz') ax_22.set_xlim(xmax=max_freq) ax_22.set_ylabel('g') ax_22.grid() ax_23 = fig2.add_subplot(3,1,3) #ax_23.hold(False) ax_23.set_title("Channel Z") ax_23.set_xlabel('Hz') #ax_23.set_xlim(xmax=max_freq) ax_23.set_xlim(xmax=max_freq_z) ax_23.set_ylabel('g') ax_23.grid() # Canvas self.canvas2 = FigureCanvasTkAgg(fig1, master=self.tab2) self.canvas2.get_tk_widget().pack(side=Tk.TOP, fill=Tk.BOTH, expand=1) self.toolbar2 = NavigationToolbar2TkAgg(self.canvas2, self.tab2) self.toolbar2.update() self.canvas2._tkcanvas.pack(side=Tk.TOP, fill=Tk.BOTH, expand=1) self.canvas1 = FigureCanvasTkAgg(fig2, master=self.tab1) self.canvas1.get_tk_widget().pack(side=Tk.TOP, fill=Tk.BOTH, expand=1) self.toolbar1 = NavigationToolbar2TkAgg(self.canvas1, self.tab1) self.toolbar1.update() self.canvas1._tkcanvas.pack(side=Tk.TOP, fill=Tk.BOTH, expand=1) def read_serial(self): puerto = self.sel_puerto.get() print(puerto) message_string = "Port: {0} \n".format(puerto) self.show_message(self.text_message, message_string) estado_serial = False try: serial_avr = serial.Serial(port=puerto, baudrate=500000, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=0) time.sleep(2) # waiting the initialization... print("Initializing") message_string = "Initializing... \n" self.show_message(self.text_message, message_string) if (serial_avr.isOpen() == True): estado_serial = True else: estado_serial = False except (serial.SerialException, ValueError) as ex: #print "Can´t open serial port: " + str(ex) messagebox.showerror( "Result", "Can't open serial port: " + str(ex)) if (estado_serial == True): global g_canal_1, g_canal_2, g_canal_3, datos_a_leer canal_1 = [] canal_2 = [] canal_3 = [] buffer = [] paquete = [] valores = [] serial_avr.flushInput() serial_avr.flushOutput() valores_decod = [] conta_datos_rx = 0; #Received samples counter. print("Sending INI") message_string = "Sending INI \n" self.show_message(self.text_message, message_string) serial_avr.write(b'INI') #Start data sampling command. #serial_avr.write(chr(0x22)) #CRC 'INI'. Not used. serial_avr.write(b"\x7E") #End of packet. global t_timeout timeout_state = False t0 = time.time() #Start loop time stamp. while ((conta_datos_rx < datos_a_leer) and (timeout_state == False)): if serial_avr.inWaiting(): lectura = serial_avr.read(serial_avr.inWaiting()) buffer += lectura valores = [] if len(buffer) > 15: try: i = buffer.index(0x7E) except (ValueError): i = -1 #print("Buffer: {0}".format(buffer)) if i >= 0: paquete = buffer[:i] buffer = buffer[i+1:] #print("Paquete: {0}".format(paquete)) valores=[i for i in paquete] paquete = '' x = 0 while x < len(valores): if valores[x] == 0x7D: valores_decod.append(valores[x+1] ^ 0x20) x = x + 1 else: valores_decod.append(valores[x]) x = x + 1 canal1 = (valores_decod[0] * 256) + valores_decod[1] canal2 = (valores_decod[2] * 256) + valores_decod[3] canal3 = (valores_decod[4] * 256) + valores_decod[5] canal_1.append(canal1) canal_2.append(canal2) canal_3.append(canal3) #print("Canal 1: %s Canal2: %s " % (canal1, canal2)) valores = [] valores_decod = [] conta_datos_rx += 1 ; #print("conta_datos_rx = %s" %conta_datos_rx) #Check if t_timeout seconds have elapsed since time stamp t0 if ((time.time() - t0) > t_timeout): timeout_state = True #print("Serial port timeout") if (timeout_state == False): print("Sending PAR") self.text_message.config(state=Tk.NORMAL) #Enable to modify self.text_message.insert(Tk.END, "Sending PAR \n") self.text_message.config(state=Tk.DISABLED) #Disable - Read only root.update_idletasks() #Needed to make message visible serial_avr.write(b'PAR') #Stop data sampling. serial_avr.write(b"\x7E") #End of packet. serial_avr.close() #Close serial port. print("Amount of samples channel 1: %s" %len(canal_1)) print("Amount of samples channel 2: %s" %len(canal_2)) print("Amount of samples channel 3: %s" %len(canal_3)) message_string = "Amount of samples channel 1: {0} \n".format(len(canal_1)) message_string += "Amount of samples channel 2: {0} \n".format(len(canal_2)) message_string += "Amount of samples channel 3: {0} \n".format(len(canal_3)) self.show_message(self.text_message, message_string) #Keep a copy of the original values g_canal_1 = canal_1[:] #Copy list by value not by reference g_canal_2 = canal_2[:] g_canal_3 = canal_3[:] self.f_saved = False #Sampled data not saved self.window_var.set(1) #Option rectangular window self.plot(self.tab1, self.tab2, canal_1, canal_2, canal_3, win_var=1) else: serial_avr.write(b'PAR') #Stop data sampling. serial_avr.write(b"\x7E") #End of packet. serial_avr.close() #Close serial port. print("Serial port timeout") message_string = ("Serial port timeout \n") self.show_message(self.text_message, message_string) def show_message(self, text_message, message_string): """Shows messages on a scrollable textbox""" text_message.config(state=Tk.NORMAL) #Enable to modify text_message.insert(Tk.END, message_string) text_message.config(state=Tk.DISABLED) #Disable - Read only text_message.see("end") #Show the "end" of text root.update_idletasks() #Needed to make message visible def scan_ports(self): portnames = [] portnames = scan_serial() self.sel_puerto['values'] = portnames if (portnames != []): self.sel_puerto.current(0) def plot(self, tab1, tab2, canal_1, canal_2, canal_3, win_var=1): num_datos = len(canal_1) X = range(0, num_datos, 1) # Scale the signal in g's for indice in X: canal_1[indice] *= g_scale canal_2[indice] *= g_scale canal_3[indice] *= g_scale # Calculates medium value for each channel. vdc_canal_1 = 0 vdc_canal_2 = 0 vdc_canal_3 = 0 for indice in X: vdc_canal_1 += canal_1[indice] vdc_canal_2 += canal_2[indice] vdc_canal_3 += canal_3[indice] vdc_canal_1 = vdc_canal_1 / num_datos vdc_canal_2 = vdc_canal_2 / num_datos vdc_canal_3 = vdc_canal_3 / num_datos #print("Vdc Channel 1: {0}, Vdc Channel 2: {1}".format(vdc_canal_1, vdc_canal_2)) # Substract DC offset for indice in X: canal_1[indice] -= vdc_canal_1 canal_2[indice] -= vdc_canal_2 canal_3[indice] -= vdc_canal_3 #----------------- Plotting ---------- X1 = np.linspace(0, num_datos/5, num=num_datos) # X axis, 5000 sps, 1/5 ms. # Figure 1. Sampled signals. #Channel X ax_11, ax_12, ax_13 = self.canvas2.figure.get_axes() ax_11.clear() ax_11.plot(X1,canal_1) ax_11.set_title("Channel X") ax_11.set_ylabel('g') ax_11.grid() #Shows grid. #Channel Y ax_12.clear() ax_12.plot(X1,canal_2) ax_12.set_title("Channel Y") #ax_12.set_xlabel('ms') ax_12.set_ylabel('g') ax_12.grid() #Shows grid. #Channel Z ax_13.clear() ax_13.plot(X1,canal_3) ax_13.set_title("Channel Z") ax_13.set_xlabel('ms') ax_13.set_ylabel('g') ax_13.grid() #Shows grid. # Figure 2. FFT from signals. #Channel X canal_fft = [] canal_fft = canal_1 N = len(canal_fft) # length of the signal #Window function if(win_var == 2): w = signal.hann(N, sym=False) #Hann (Hanning) window elif(win_var == 3): w = signal.flattop(N, sym=False) #Flattop window else: w = 1 #Rectangular window T = 1.0 / sample_rate y = canal_fft yf = fftpack.fft(y*w)*(2/N) yf = yf[:int(N/2)] xf = np.linspace(0.0, 1.0/(2.0*T), N/2) ax_21, ax_22, ax_23 = self.canvas1.figure.get_axes() ax_21.clear() #ax_21.plot(xf, 2.0/N * np.abs(yf[:N/2])) ax_21.plot(xf, np.abs(yf)) ax_21.grid() ax_21.set_title("Channel X") ax_21.set_ylabel('g') ax_21.set_xlim(xmax=max_freq) #Channel Y canal_fft = [] canal_fft = canal_2 N = len(canal_fft) # length of the signal T = 1.0 / sample_rate y = canal_fft yf = fftpack.fft(y*w)*(2/N) yf = yf[:int(N/2)] xf = np.linspace(0.0, 1.0/(2.0*T), N/2) ax_22.clear() #ax_22.plot(xf, 2.0/N * np.abs(yf[:N/2])) ax_22.plot(xf, np.abs(yf)) ax_22.grid() ax_22.set_title("Channel Y") #ax_22.set_xlabel('Hz') ax_22.set_xlim(xmax=max_freq) ax_22.set_ylabel('g') #Channel Z canal_fft = [] canal_fft = canal_3 N = len(canal_fft) # length of the signal T = 1.0 / sample_rate y = canal_fft yf = fftpack.fft(y*w)*(2/N) yf = yf[:int(N/2)] xf = np.linspace(0.0, 1.0/(2.0*T), N/2) ax_23.clear() #ax_23.plot(xf, 2.0/N * np.abs(yf[:N/2])) ax_23.plot(xf, np.abs(yf)) ax_23.grid() ax_23.set_title("Channel Z") ax_23.set_xlabel('Hz') #ax_23.set_xlim(xmax=max_freq) ax_23.set_xlim(xmax=max_freq_z) ax_23.set_ylabel('g') self.canvas1.draw() self.canvas2.draw() def win_sel(self): """Window selection. Every time a window is selected, the FFT spectrum is calculated, applying the selected window function""" global g_canal_1, g_canal_2, g_canal_3 canal_1 = g_canal_1[:] #Copy list by value not by reference canal_2 = g_canal_2[:] canal_3 = g_canal_3[:] win_var = self.window_var.get() if(len(canal_1) != 0): #Apply only if data available self.plot(self.tab1, self.tab2, canal_1, canal_2, canal_3, win_var) def open_file(self): """Opens dialog to select a file, reads data from file and plots the data""" ftypes = [('Text files', '*.txt'), ('All files', '*')] dlg = filedialog.Open(root, filetypes = ftypes) fl = dlg.show() if fl != '': # Open file for reading arch = open(fl, "r") datos_arch = arch.read() # Searches for every channel, delimited by L1, L2 and L3 tags. canal_1 = extraer_int_tag(datos_arch, 'L1') canal_2 = extraer_int_tag(datos_arch, 'L2') canal_3 = extraer_int_tag(datos_arch, 'L3') print("Amount of samples in channel 1: %s" %len(canal_1)) print("Amount of samples on channel 2: %s" %len(canal_2)) print("Amount of samples on channel 3: %s" %len(canal_3)) message_string = "Amount of samples channel 1: {0} \n".format(len(canal_1)) message_string += "Amount of samples channel 2: {0} \n".format(len(canal_2)) message_string += "Amount of samples channel 3: {0} \n".format(len(canal_3)) self.show_message(self.text_message, message_string) global g_canal_1, g_canal_2, g_canal_3 #Keep a copy of the original values g_canal_1 = canal_1[:] #Copy list by value not by reference g_canal_2 = canal_2[:] g_canal_3 = canal_3[:] self.window_var.set(1) #Option rectangular window self.plot(self.tab1, self.tab2, canal_1, canal_2, canal_3, win_var=1) def save_file(self): ftypes = [('Text files', '*.txt'), ('All files', '*')] dlg = filedialog.SaveAs(root, filetypes = ftypes) fl = dlg.show() if fl != '': global g_canal_1, g_canal_2, g_canal_3 if (len(g_canal_1) > 0): grabar(g_canal_1, g_canal_2, g_canal_3, fl) self.f_saved = True #Sampled data saved else: print("No samled data to save") message_string = "No samled data to save\n" self.show_message(self.text_message, message_string) if __name__ == '__main__': root = Tk.Tk() root.title('FFT spectrum analyser for machinery vibration') app = Application(root) root.mainloop()