A simple demonstration of threading in PyQt
I do a lot of work with serial comms; TeraTerm is an excellent serial terminal, but sometimes a customised application is required, for example when dealing with binary data.
The following blog describes a simple Windows & Linux serial terminal, that can be adapted to handle special protocols; it also demonstrates the creation of Python threads, and can serve as a basis for other multi-threaded applications.
It won’t win any awards for style, but could come in handy next time you encounter an obscure serial protocol.
My code is compatible with Python 2.7 and 3.x, PyQt v4 and v5, running on Windows or Linux. This necessitates some rather clunky inclusions at the top of the file:
try: from PyQt4 import QtGui, QtCore from PyQt4.QtGui import QTextEdit, QWidget, QApplication, QVBoxLayout except: from PyQt5 import QtGui, QtCore from PyQt5.QtWidgets import QTextEdit, QWidget, QApplication, QVBoxLayout try: import Queue except: import queue as Queue
There is also an issue with the different ways Python 2 and 3 handle serial data; older versions assume that the data type is an ASCII string, while in later versions it is a binary type.
This can lead to all sorts of problems (such as unwanted exceptions) so I’ve included specific functions to ensure that all outgoing data is converted from the internal representation (which could be Unicode) to a string of bytes, and incoming data is converted from the external type (data bytes or string) to a string type:
# Convert a string to bytes (for Python 3) def str_bytes(s): return s.encode('latin-1') # Convert bytes to string (if Python 3) def bytes_str(d): return d if type(d) is str else "".join([chr(b) for b in d])
You’ll need to install pySerial by the usual methods (e.g. ‘pip install’).
The key problem with serial communications is the time it takes; the slower the data rate, the longer the delay before anything happens. Without threading (or any other mitigation scheme) the User Interface (UI) will lock up after each command, waiting for the response. At best, this makes the application appear sluggish and unresponsive; at worst, it can appear to have failed, waiting for a response that never comes.
Threading allows the UI to carry on interacting with the user, while simultaneously keeping the serial link alive. Creating a new thread is really easy in Python; you just subclass QThread, and instantiate it:
class SerialThread(QtCore.QThread): def __init__(self, portname, baudrate): QtCore.QThread.__init__(self) self.portname, self.baudrate = portname, baudrate ... class MyWidget(QWidget): ... self.serth = SerialThread(portname, baudrate)
I have chosen to supply the serial parameters (port name & baud rate) when the thread object is instantiated, so they are available when the thread is started; this is done by calling the start() method, which will call a run() method in the QThread class:
# In serial thread: def run(self): print("Opening %s at %u baud" % (self.portname, self.baudrate)) ... # In UI thread: self.serth.start()
The run() method needs to keep running in a perpetual loop, but it must be possible to terminate that loop when the program exits – Python won’t do it automatically. I use a global variable, that can be set false to terminate, e.g. in pseudocode:
def run(self): [starting: open serial port] while self.running: [check for incoming characters] [check for outgoing characters] [finished: close serial port]
When the application is closing, it terminates the thread by setting the boolean variable false, then (very importantly) waits for the thread to finish its execution:
def closeEvent(self, event): self.serth.running = False self.serth.wait()
The user will be entering keystrokes in the UI thread, and it is tempting to call the serial transmit function from that thread, but this isn’t a good idea; it is better to pass the keystrokes across to the serial thread for transmission, and we need a thread-safe method of doing this. That means we can’t just use a global shared string variable; Python does a lot of behind-the-scenes processing that could lead to an unpredictable result. Instead, we’ll use a First In First Out (FIFO) queue:
# In UI thread.. txq = Queue.Queue() ... txq.put(s) # Add string to queue ... # In serial thread.. if not txq.empty(): txd = txq.get() # Get string from queue
So the serial thread polls the transmit queue for any data, outputting it to the serial port.
We could use the same technique for received data; the serial thread could add it to a queue that is polled by the UI thread, and somehow trigger a UI redraw when the new data arrives, but I prefer to use a signal; the data is attached to that signal, and is received by a UI function that has registered a connection. The signal has to be in a class definition, and must specify the type of data that will be attached:
class MyWidget(QWidget): text_update = QtCore.pyqtSignal(str)
The signal is connected to a function that will process the data:
So now the serial thread just has to generate a signal when new data is received:
This technique would be quite adequate, but I do like having the output from all my ‘print’ function calls redirected to the same window; it makes for cleaner error reporting when things go wrong, rather than having a separate console with error messages. This is done by redirecting stdout to my widget, and adding write() and flush() handlers:
class MyWidget(QWidget): text_update = QtCore.pyqtSignal(str) def __init__(self, *args): ... self.text_update.connect(self.append_text) sys.stdout = self ... def write(self, text): self.text_update.emit(text) def flush(self): pass ... def append_text(self, text): [add text to UI display]
So now, every time I make a print() call, a signal is sent to my append_text function, where the display is updated. The use of a signal means that I can still call print() from any thread, without fear of strange cross-threading problems.
Polling in serial thread
The serial thread is just polling for incoming and outgoing characters, and if there are none, the processor will execute the ‘while’ loop really quickly. In the absence of any delays, it will consume a lot of CPU time just checking for things that don’t exist. This may appear harmless, but it is quite alarming for the user if the CPU fan starts spinning rapidly whenever your application is running, and a laptop user won’t be happy if you needlessly drain their battery by performing pointless tasks. So we need to add some harmless time-wasting to the polling loop, by frequently returning control to the operating system. This can be done by calling the ‘sleep’ function, but we still want the software to be responsive when some serial data actually arrives. A suitable compromise is to use a serial read-function with a timeout, so the software ‘blocks’ (i.e. stalls) until either some characters are received, or there is a timeout:
self.ser = serial.Serial(self.portname, self.baudrate, timeout=SER_TIMEOUT) ... s = self.ser.read(self.ser.in_waiting or 1)
In case you are unfamiliar with the usage, the ‘or’ function returns the left-hand side if it is true (non-zero), otherwise the right-hand side. So every read attempt is at least 1 character, or more characters if they are available. If none are present, the read function waits until the timeout, so when the serial line is idle, most time will be spent waiting in the operating system.
This has been kept as simple as possible, with just a text box as a main widget. One minor complication is that as standard, the text box (which is actually a QTextEdit control) will capture and display all keystrokes, so we need to subclass it to intercept the keys, and call a handler function that adds them to the serial transmit queue. I didn’t want to burden the text box with this functionality, so put the handler in its parent, which is the main widget:
class MyTextBox(QTextEdit): def __init__(self, *args): QTextEdit.__init__(self, *args) def keyPressEvent(self, event): self.parent().keypress_handler(event)
The keystroke handler in the main widget gets the character from the key event, and checks for a ctrl-V ‘paste’ request; I’ve included this feature because I find it useful to cut-and-paste frequently-used serial commands from a document, rather than re-typing them every time.
# In main widget: def keypress_handler(self, event): k = event.key() s = RETURN_CHAR if k==QtCore.Qt.Key_Return else event.text() if len(s)>0 and s==PASTE_CHAR: cb = QApplication.clipboard() self.serth.ser_out(cb.text()) else: self.serth.ser_out(s)
Interestingly, with PyQt5 you can cut-and-paste full 8-bit data (i.e. bytes with the high bit set), but this doesn’t seem to work in PyQt4, which only accepts the usual ASCII character set.
I haven’t included any menu options, you have to specify the COM port on the command-line using the -c option, and baud rate using -b. There is also a -x option to display incoming data in hexadecimal, for example:
Windows: python pyqt_serialterm.py -c com2 -b 9600 -x Linux: python pyqt_serialterm.py -c /dev/ttyUSB0 -b 115200
On Linux, if you want non-root access to a serial port, you will generally need to add your username to the ‘dialout’ group:
sudo usermod -a -G dialout $USER
..then log out & back in.
# PyQt serial terminal from iosoft.blog VERSION = "v0.09" try: from PyQt4 import QtGui, QtCore from PyQt4.QtGui import QTextEdit, QWidget, QApplication, QVBoxLayout except: from PyQt5 import QtGui, QtCore from PyQt5.QtWidgets import QTextEdit, QWidget, QApplication, QVBoxLayout try: import Queue except: import queue as Queue import sys, time, serial WIN_WIDTH, WIN_HEIGHT = 684, 400 # Window size SER_TIMEOUT = 0.1 # Timeout for serial Rx RETURN_CHAR = "\n" # Char to be sent when Enter key pressed PASTE_CHAR = "\x16" # Ctrl code for clipboard paste baudrate = 115200 # Default baud rate portname = "COM1" # Default port name hexmode = False # Flag to enable hex display # Convert a string to bytes def str_bytes(s): return s.encode('latin-1') # Convert bytes to string def bytes_str(d): return d if type(d) is str else "".join([chr(b) for b in d]) # Return hexadecimal values of data def hexdump(data): return " ".join(["%02X" % ord(b) for b in data]) # Return a string with high-bit chars replaced by hex values def textdump(data): return "".join(["[%02X]" % ord(b) if b>'\x7e' else b for b in data]) # Display incoming serial data def display(s): if not hexmode: sys.stdout.write(textdump(str(s))) else: sys.stdout.write(hexdump(s) + ' ') # Custom text box, catching keystrokes class MyTextBox(QTextEdit): def __init__(self, *args): QTextEdit.__init__(self, *args) def keyPressEvent(self, event): # Send keypress to parent's handler self.parent().keypress_handler(event) # Main widget class MyWidget(QWidget): text_update = QtCore.pyqtSignal(str) def __init__(self, *args): QWidget.__init__(self, *args) self.textbox = MyTextBox() # Create custom text box font = QtGui.QFont() font.setFamily("Courier New") # Monospaced font font.setPointSize(10) self.textbox.setFont(font) layout = QVBoxLayout() layout.addWidget(self.textbox) self.setLayout(layout) self.resize(WIN_WIDTH, WIN_HEIGHT) # Set window size self.text_update.connect(self.append_text) # Connect text update to handler sys.stdout = self # Redirect sys.stdout to self self.serth = SerialThread(portname, baudrate) # Start serial thread self.serth.start() def write(self, text): # Handle sys.stdout.write: update display self.text_update.emit(text) # Send signal to synchronise call with main thread def flush(self): # Handle sys.stdout.flush: do nothing pass def append_text(self, text): # Text display update handler cur = self.textbox.textCursor() cur.movePosition(QtGui.QTextCursor.End) # Move cursor to end of text s = str(text) while s: head,sep,s = s.partition("\n") # Split line at LF cur.insertText(head) # Insert text at cursor if sep: # New line if LF cur.insertBlock() self.textbox.setTextCursor(cur) # Update visible cursor def keypress_handler(self, event): # Handle keypress from text box k = event.key() s = RETURN_CHAR if k==QtCore.Qt.Key_Return else event.text() if len(s)>0 and s==PASTE_CHAR: # Detect ctrl-V paste cb = QApplication.clipboard() self.serth.ser_out(cb.text()) # Send paste string to serial driver else: self.serth.ser_out(s) # ..or send keystroke def closeEvent(self, event): # Window closing self.serth.running = False # Wait until serial thread terminates self.serth.wait() # Thread to handle incoming & outgoing serial data class SerialThread(QtCore.QThread): def __init__(self, portname, baudrate): # Initialise with serial port details QtCore.QThread.__init__(self) self.portname, self.baudrate = portname, baudrate self.txq = Queue.Queue() self.running = True def ser_out(self, s): # Write outgoing data to serial port if open self.txq.put(s) # ..using a queue to sync with reader thread def ser_in(self, s): # Write incoming serial data to screen display(s) def run(self): # Run serial reader thread print("Opening %s at %u baud %s" % (self.portname, self.baudrate, "(hex display)" if hexmode else "")) try: self.ser = serial.Serial(self.portname, self.baudrate, timeout=SER_TIMEOUT) time.sleep(SER_TIMEOUT*1.2) self.ser.flushInput() except: self.ser = None if not self.ser: print("Can't open port") self.running = False while self.running: s = self.ser.read(self.ser.in_waiting or 1) if s: # Get data from serial port self.ser_in(bytes_str(s)) # ..and convert to string if not self.txq.empty(): txd = str(self.txq.get()) # If Tx data in queue, write to serial port self.ser.write(str_bytes(txd)) if self.ser: # Close serial port when thread finished self.ser.close() self.ser = None if __name__ == "__main__": app = QApplication(sys.argv) opt = err = None for arg in sys.argv[1:]: # Process command-line options if len(arg)==2 and arg=="-": opt = arg.lower() if opt == '-x': # -X: display incoming data in hex hexmode = True opt = None else: if opt == '-b': # -B num: baud rate, e.g. '9600' try: baudrate = int(arg) except: err = "Invalid baudrate '%s'" % arg elif opt == '-c': # -C port: serial port name, e.g. 'COM1' portname = arg if err: print(err) sys.exit(1) w = MyWidget() w.setWindowTitle('PyQT Serial Terminal ' + VERSION) w.show() sys.exit(app.exec_()) # EOF
Copyright (c) Jeremy P Bentham 2019. Please credit this blog if you use the information or software in it.