PyQt5のQSerialPortでシリアルポートのモニターを作ってみた
2021/04/20 categories:TOOL| tags:TOOL|PyQt5|Python|QSerialPort|
PyQt5のQSerialPortを使ってシリアルポートのモニターを作ってみました。
実装する機能
実装する機能は以下の4つです。
- シリアルポートの設定
- シリアルポートからテキストデータを送信
- シリアルポートで受信したデータをテキストとバイナリで表示
QMainWindowを表示
まずはQMainWindowを作成して、必要なUIを設定します。UIは以下のように機能別に分けてクラス化しています。
クラス名 | 機能 |
---|---|
SerialMonitor | トップのクラスでそれぞれのクラスとデータのやり取りを行う |
SerialDataView | シリアルポートで送受信したデータを表示するクラス |
SerialSendView | 送信するデータを入力するクラス |
ToolBar | シリアルポートの設定用UIのクラス |
UI部分のみのコードは以下の通りです。
# -*- coding: utf-8 -*-
import sys
from PyQt5 import QtWidgets, QtCore, QtGui
from PyQt5.QtSerialPort import QSerialPort, QSerialPortInfo
class SerialMonitor(QtWidgets.QMainWindow):
def __init__(self):
super(SerialMonitor, self).__init__()
self.serialDataView = SerialDataView(self)
self.serialSendView = SerialSendView(self)
self.setCentralWidget( QtWidgets.QWidget(self) )
layout = QtWidgets.QVBoxLayout( self.centralWidget() )
layout.addWidget(self.serialDataView)
layout.addWidget(self.serialSendView)
layout.setContentsMargins(3, 3, 3, 3)
self.toolBar = ToolBar(self)
self.addToolBar(self.toolBar)
self.setStatusBar( QtWidgets.QStatusBar(self) )
self.statusText = QtWidgets.QLabel(self)
self.statusBar().addWidget( self.statusText )
class SerialDataView(QtWidgets.QWidget):
def __init__(self, parent):
super(SerialDataView, self).__init__(parent)
self.serialData = QtWidgets.QTextEdit(self)
self.serialData.setReadOnly(True)
self.serialData.setFontFamily('Courier New')
self.serialDataHex = QtWidgets.QTextEdit(self)
self.serialDataHex.setReadOnly(True)
self.serialDataHex.setFontFamily('Courier New')
self.label = QtWidgets.QLabel('00 01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F')
self.label.setFont( QtGui.QFont('Courier New') )
self.label.setIndent(5)
self.setLayout( QtWidgets.QGridLayout(self) )
self.layout().addWidget(self.serialData, 0, 0, 2, 1)
self.layout().addWidget(self.label, 0, 1, 1, 1)
self.layout().addWidget(self.serialDataHex, 1, 1, 1, 1)
self.layout().setContentsMargins(2, 2, 2, 2)
class SerialSendView(QtWidgets.QWidget):
serialSendSignal = QtCore.pyqtSignal(str)
def __init__(self, parent):
super(SerialSendView, self).__init__(parent)
self.sendData = QtWidgets.QTextEdit(self)
self.sendData.setAcceptRichText(False)
self.sendButton = QtWidgets.QPushButton('Send')
self.setLayout( QtWidgets.QHBoxLayout(self) )
self.layout().addWidget(self.sendData)
self.layout().addWidget(self.sendButton)
self.layout().setContentsMargins(3, 3, 3, 3)
class ToolBar(QtWidgets.QToolBar):
def __init__(self, parent):
super(ToolBar, self).__init__(parent)
self.portOpenButton = QtWidgets.QPushButton('Open')
self.portOpenButton.setCheckable(True)
self.portNames = QtWidgets.QComboBox(self)
self.portNames.addItems([ port.portName() for port in QSerialPortInfo().availablePorts() ])
self.baudRates = QtWidgets.QComboBox(self)
self.baudRates.addItems([
'110', '300', '600', '1200', '2400', '4800', '9600', '14400', '19200', '28800',
'31250', '38400', '51200', '56000', '57600', '76800', '115200', '128000', '230400', '256000', '921600'
])
self.dataBits = QtWidgets.QComboBox(self)
self.dataBits.addItems(['5 bit', '6 bit', '7 bit', '8 bit'])
self._parity = QtWidgets.QComboBox(self)
self._parity.addItems(['No Parity', 'Even Parity', 'Odd Parity', 'Space Parity', 'Mark Parity'])
self.stopBits = QtWidgets.QComboBox(self)
self.stopBits.addItems(['One Stop', 'One And Half Stop', 'Two Stop'])
self._flowControl = QtWidgets.QComboBox(self)
self._flowControl.addItems(['No Flow Control', 'Hardware Control', 'Software Control'])
self.addWidget( self.portOpenButton )
self.addWidget( self.portNames)
self.addWidget( self.baudRates)
self.addWidget( self.dataBits)
self.addWidget( self._parity)
self.addWidget( self.stopBits)
self.addWidget( self._flowControl)
if __name__ == '__main__':
app = QtWidgets.QApplication(sys.argv)
window = SerialMonitor()
window.show()
app.exec()
表示結果は以下の通りです。
基本的にレイアウトにコンポーネントを追加しているだけです。シリアルポートに関連する部分はToolBarクラスのportNamesというオブジェクト名のQComboBoxに追加しているアイテムです。このUIにはPCが認識しているシリアルポートの一覧を表示するために、QSerialPortInfoクラスのavailablePortsでシリアルポートのリストを取得して、そのポートの名前をQSerialPort.portName()で取得して、取得結果をQComboBoxに追加しています。
それ以外のシリアルポート関連の項目は単純なリストとして追加して、ユーザーがUIで選択できるようにしています。ちなみに、フロー制御を選択できるようにしていますが、その機能は実装しないので、ただ選べるようにしただけです。
シリアルポートを開く
シリアルポートでデータを送受信するためにはシリアルポートを開く必要があります。ToolBarのportOpenButtonをクリックした時に発行されるシグナルにSerialMonitorのportOpenを接続して、接続したメソッド内でシリアルポートを開く処理を行います。
class SerialMonitor(QtWidgets.QMainWindow):
def __init__(self):
super(SerialMonitor, self).__init__()
self.port = QSerialPort()
def portOpen(self, flag):
if flag:
self.port.setBaudRate( self.toolBar.baudRate() )
self.port.setPortName( self.toolBar.portName() )
self.port.setDataBits( self.toolBar.dataBit() )
self.port.setParity( self.toolBar.parity() )
self.port.setStopBits( self.toolBar.stopBit() )
self.port.setFlowControl( self.toolBar.flowControl() )
r = self.port.open(QtCore.QIODevice.ReadWrite)
if not r:
self.statusText.setText('Port open error')
self.toolBar.portOpenButton.setChecked(False)
self.toolBar.serialControlEnable(True)
else:
self.statusText.setText('Port opened')
self.toolBar.serialControlEnable(False)
else:
self.port.close()
self.statusText.setText('Port closed')
self.toolBar.serialControlEnable(True)
QSerialPortにボーレートやポート名などの必要な項目を設定して、openすることでシリアル通信ができるようになります。このとき、指定したシリアルポートが他のプログラムに使用されているといった場合には、openの返り値がfalseとなります。
シリアルポートからデータを送信
SerialSendViewのsendButtonをクリックしたときにシリアルポートからデータが送信される処理を実装しました。まずはSerialSendViewクラス内でsendButtonのclickedシグナルにsendButtonClickedを接続します。そのメソッド内で、serialSendSignalを発行して、sendDataのテキストデータをシグナルの接続先に渡します。SerialMonitorではserialSendSignalシグナルにsendFromPortを接続していて、self.port.write( text.encode() )でSerialSendViewから受け取ったテキストデータをバイト列にエンコードして、シリアルポートから送信しています。データを送信後、SerialSendViewのsendDataはクリアして、SerialMonitorではserialDataViewに送信したテキストデータを表示しています。
class SerialMonitor(QtWidgets.QMainWindow):
def __init__(self):
self.serialSendView.serialSendSignal.connect(self.sendFromPort)
def sendFromPort(self, text):
self.port.write( text.encode() )
self.serialDataView.appendSerialText( text, QtGui.QColor(0, 0, 255) )
class SerialSendView(QtWidgets.QWidget):
serialSendSignal = QtCore.pyqtSignal(str)
def __init__(self, parent):
self.sendButton = QtWidgets.QPushButton('Send')
self.sendButton.clicked.connect(self.sendButtonClicked)
def sendButtonClicked(self):
self.serialSendSignal.emit( self.sendData.toPlainText() )
self.sendData.clear()
シリアルポートからデータを受信
シリアルポートでデータが受信可能になったときに発行されるreadyReadシグナルにreadFromPortを接続して、そのメソッド内でport.readAll()を実行することでシリアル通信で受信したデータを取得しています。readFromPortメソッド内では、QtCore.QTextStream(data).readAll()でバイト列を文字列に変換して、serialDataView.appendSerialTextでその文字列を渡しています。これらの処理は、if len(data) > 0:によってデータが存在していた場合のみ実行するようにしています。
class SerialMonitor(QtWidgets.QMainWindow):
def __init__(self):
self.port = QSerialPort()
self.port.readyRead.connect(self.readFromPort)
def readFromPort(self):
data = self.port.readAll()
if len(data) > 0:
self.serialDataView.appendSerialText( QtCore.QTextStream(data).readAll(), QtGui.QColor(255, 0, 0) )
シリアルポートで送受信したデータを表示
送受信したデータはSerialDataViewに表示します。テキストデータとしてserialDataに表示し、バイナリデータとしてserialDataHexに表示します。データの表示はappendSerialTextメソッドで処理しています。送信と受信を区別しやすくするために、送信を青色、受信を赤色としています。
テキストデータの追加はserialData.insertPlainText()で文字列の最後の位置に挿入するだけです。
バイナリデータを16進数表示の文字列として追加するには、そのままでは追加することが出来なく、ひと工夫が必要で、以下のように処理しました。
- 既に表示されている16進数表示の文字列を取得して、最後の行を取得
- 16進数表示として見やすくなるように、追加テキストデータを2文字ごとに分割してリストにする
- 最後の行に16バイト分表示されていなかった場合、テキストデータを追加した後に16バイト分表示されるように追加テキストデータを分割してappendListsに追加
- 上の処理後に残った追加テキストデータを16バイトごとに分割appendListsに追加
- appendListsの最後が16バイト未満だった場合は、そのデータの最後の改行を削除
- appendListsの要素をそれぞれ、スペースで結合してserialDataHexに追加
class SerialDataView(QtWidgets.QWidget):
def appendSerialText(self, appendText, color):
self.serialData.moveCursor(QtGui.QTextCursor.End)
self.serialData.setFontFamily('Courier New')
self.serialData.setTextColor(color)
self.serialDataHex.moveCursor(QtGui.QTextCursor.End)
self.serialDataHex.setFontFamily('Courier New')
self.serialDataHex.setTextColor(color)
self.serialData.insertPlainText(appendText)
lastData = self.serialDataHex.toPlainText().split('\n')[-1]
lastLength = math.ceil( len(lastData) / 3 )
appendLists = []
splitedByTwoChar = re.split( '(..)', appendText.encode().hex() )[1::2]
if lastLength > 0:
t = splitedByTwoChar[ : 16-lastLength ] + ['\n']
appendLists.append( ' '.join(t) )
splitedByTwoChar = splitedByTwoChar[ 16-lastLength : ]
appendLists += [ ' '.join(splitedByTwoChar[ i*16 : (i+1)*16 ] + ['\n']) for i in range( math.ceil(len(splitedByTwoChar)/16) ) ]
if len(appendLists[-1]) < 47:
appendLists[-1] = appendLists[-1][:-1]
for insertText in appendLists:
self.serialDataHex.insertPlainText(insertText)
self.serialData.moveCursor(QtGui.QTextCursor.End)
self.serialDataHex.moveCursor(QtGui.QTextCursor.End)
動作の様子
送信と受信を直結したケーブルをPCに接続して、送信したデータがそのまま受信側へ帰ってくるようにしてテストした結果は以下の通りです。
ソースコード
# -*- coding: utf-8 -*-
import sys
import math
import re
from PyQt5 import QtWidgets, QtCore, QtGui
from PyQt5.QtSerialPort import QSerialPort, QSerialPortInfo
class SerialMonitor(QtWidgets.QMainWindow):
def __init__(self):
super(SerialMonitor, self).__init__()
self.port = QSerialPort()
self.serialDataView = SerialDataView(self)
self.serialSendView = SerialSendView(self)
self.setCentralWidget( QtWidgets.QWidget(self) )
layout = QtWidgets.QVBoxLayout( self.centralWidget() )
layout.addWidget(self.serialDataView)
layout.addWidget(self.serialSendView)
layout.setContentsMargins(3, 3, 3, 3)
self.setWindowTitle('Serial Monitor')
### Tool Bar ###
self.toolBar = ToolBar(self)
self.addToolBar(self.toolBar)
### Status Bar ###
self.setStatusBar( QtWidgets.QStatusBar(self) )
self.statusText = QtWidgets.QLabel(self)
self.statusBar().addWidget( self.statusText )
### Signal Connect ###
self.toolBar.portOpenButton.clicked.connect(self.portOpen)
self.serialSendView.serialSendSignal.connect(self.sendFromPort)
self.port.readyRead.connect(self.readFromPort)
def portOpen(self, flag):
if flag:
self.port.setBaudRate( self.toolBar.baudRate() )
self.port.setPortName( self.toolBar.portName() )
self.port.setDataBits( self.toolBar.dataBit() )
self.port.setParity( self.toolBar.parity() )
self.port.setStopBits( self.toolBar.stopBit() )
self.port.setFlowControl( self.toolBar.flowControl() )
r = self.port.open(QtCore.QIODevice.ReadWrite)
if not r:
self.statusText.setText('Port open error')
self.toolBar.portOpenButton.setChecked(False)
self.toolBar.serialControlEnable(True)
else:
self.statusText.setText('Port opened')
self.toolBar.serialControlEnable(False)
else:
self.port.close()
self.statusText.setText('Port closed')
self.toolBar.serialControlEnable(True)
def readFromPort(self):
data = self.port.readAll()
if len(data) > 0:
self.serialDataView.appendSerialText( QtCore.QTextStream(data).readAll(), QtGui.QColor(255, 0, 0) )
def sendFromPort(self, text):
self.port.write( text.encode() )
self.serialDataView.appendSerialText( text, QtGui.QColor(0, 0, 255) )
class SerialDataView(QtWidgets.QWidget):
def __init__(self, parent):
super(SerialDataView, self).__init__(parent)
self.serialData = QtWidgets.QTextEdit(self)
self.serialData.setReadOnly(True)
self.serialData.setFontFamily('Courier New')
self.serialData.setSizePolicy(QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Expanding)
self.serialDataHex = QtWidgets.QTextEdit(self)
self.serialDataHex.setReadOnly(True)
self.serialDataHex.setFontFamily('Courier New')
self.serialDataHex.setFixedWidth(500)
self.serialDataHex.setSizePolicy(QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Expanding)
self.label = QtWidgets.QLabel('00 01 02 03 04 05 06 07 08 09 0A 0B 0C 0D 0E 0F')
self.label.setFont( QtGui.QFont('Courier New') )
self.label.setIndent(5)
self.setLayout( QtWidgets.QGridLayout(self) )
self.layout().addWidget(self.serialData, 0, 0, 2, 1)
self.layout().addWidget(self.label, 0, 1, 1, 1)
self.layout().addWidget(self.serialDataHex, 1, 1, 1, 1)
self.layout().setContentsMargins(2, 2, 2, 2)
def appendSerialText(self, appendText, color):
self.serialData.moveCursor(QtGui.QTextCursor.End)
self.serialData.setFontFamily('Courier New')
self.serialData.setTextColor(color)
self.serialDataHex.moveCursor(QtGui.QTextCursor.End)
self.serialDataHex.setFontFamily('Courier New')
self.serialDataHex.setTextColor(color)
self.serialData.insertPlainText(appendText)
lastData = self.serialDataHex.toPlainText().split('\n')[-1]
lastLength = math.ceil( len(lastData) / 3 )
appendLists = []
splitedByTwoChar = re.split( '(..)', appendText.encode().hex() )[1::2]
if lastLength > 0:
t = splitedByTwoChar[ : 16-lastLength ] + ['\n']
appendLists.append( ' '.join(t) )
splitedByTwoChar = splitedByTwoChar[ 16-lastLength : ]
appendLists += [ ' '.join(splitedByTwoChar[ i*16 : (i+1)*16 ] + ['\n']) for i in range( math.ceil(len(splitedByTwoChar)/16) ) ]
if len(appendLists[-1]) < 47:
appendLists[-1] = appendLists[-1][:-1]
for insertText in appendLists:
self.serialDataHex.insertPlainText(insertText)
self.serialData.moveCursor(QtGui.QTextCursor.End)
self.serialDataHex.moveCursor(QtGui.QTextCursor.End)
class SerialSendView(QtWidgets.QWidget):
serialSendSignal = QtCore.pyqtSignal(str)
def __init__(self, parent):
super(SerialSendView, self).__init__(parent)
self.sendData = QtWidgets.QTextEdit(self)
self.sendData.setAcceptRichText(False)
self.sendData.setSizePolicy(QtWidgets.QSizePolicy.Preferred, QtWidgets.QSizePolicy.Preferred)
self.sendButton = QtWidgets.QPushButton('Send')
self.sendButton.clicked.connect(self.sendButtonClicked)
self.sendButton.setSizePolicy(QtWidgets.QSizePolicy.Maximum, QtWidgets.QSizePolicy.Preferred)
self.setLayout( QtWidgets.QHBoxLayout(self) )
self.layout().addWidget(self.sendData)
self.layout().addWidget(self.sendButton)
self.layout().setContentsMargins(3, 3, 3, 3)
def sendButtonClicked(self):
self.serialSendSignal.emit( self.sendData.toPlainText() )
self.sendData.clear()
class ToolBar(QtWidgets.QToolBar):
def __init__(self, parent):
super(ToolBar, self).__init__(parent)
self.portOpenButton = QtWidgets.QPushButton('Open')
self.portOpenButton.setCheckable(True)
self.portOpenButton.setMinimumHeight(32)
self.portNames = QtWidgets.QComboBox(self)
self.portNames.addItems([ port.portName() for port in QSerialPortInfo().availablePorts() ])
self.portNames.setMinimumHeight(30)
self.baudRates = QtWidgets.QComboBox(self)
self.baudRates.addItems([
'110', '300', '600', '1200', '2400', '4800', '9600', '14400', '19200', '28800',
'31250', '38400', '51200', '56000', '57600', '76800', '115200', '128000', '230400', '256000', '921600'
])
self.baudRates.setCurrentText('115200')
self.baudRates.setMinimumHeight(30)
self.dataBits = QtWidgets.QComboBox(self)
self.dataBits.addItems(['5 bit', '6 bit', '7 bit', '8 bit'])
self.dataBits.setCurrentIndex(3)
self.dataBits.setMinimumHeight(30)
self._parity = QtWidgets.QComboBox(self)
self._parity.addItems(['No Parity', 'Even Parity', 'Odd Parity', 'Space Parity', 'Mark Parity'])
self._parity.setCurrentIndex(0)
self._parity.setMinimumHeight(30)
self.stopBits = QtWidgets.QComboBox(self)
self.stopBits.addItems(['One Stop', 'One And Half Stop', 'Two Stop'])
self.stopBits.setCurrentIndex(0)
self.stopBits.setMinimumHeight(30)
self._flowControl = QtWidgets.QComboBox(self)
self._flowControl.addItems(['No Flow Control', 'Hardware Control', 'Software Control'])
self._flowControl.setCurrentIndex(0)
self._flowControl.setMinimumHeight(30)
self.addWidget( self.portOpenButton )
self.addWidget( self.portNames)
self.addWidget( self.baudRates)
self.addWidget( self.dataBits)
self.addWidget( self._parity)
self.addWidget( self.stopBits)
self.addWidget( self._flowControl)
def serialControlEnable(self, flag):
self.portNames.setEnabled(flag)
self.baudRates.setEnabled(flag)
self.dataBits.setEnabled(flag)
self._parity.setEnabled(flag)
self.stopBits.setEnabled(flag)
self._flowControl.setEnabled(flag)
def baudRate(self):
return int(self.baudRates.currentText())
def portName(self):
return self.portNames.currentText()
def dataBit(self):
return int(self.dataBits.currentIndex() + 5)
def parity(self):
return self._parity.currentIndex()
def stopBit(self):
return self.stopBits.currentIndex()
def flowControl(self):
return self._flowControl.currentIndex()
if __name__ == '__main__':
app = QtWidgets.QApplication(sys.argv)
window = SerialMonitor()
window.show()
app.exec()