ESP32のMicroPythonで10DOF IMU Sensor(GY-91)に搭載されているMPU9250を使ってみる
2020/08/10 categories:ESP32| tags:ESP32|MicroPython|GY-91|MPU9250|PyQtGraph|
MicroPythonを入れたESP32でGY-91に搭載されているMPU9250から値を取得して、PC上のPyQtGraphに表示してみました。
接続
接続は下記の通りで、ESP32とGY-91をI2Cで接続するだけです。他にはGY-91には電源(3.3V)とGNDを接続します。
センサから値の取得
MicroPythonのi2c.readfrom_mem()を使って、デバイスのアドレス、読み込むメモリのアドレス、読み込むデータのバイト数を指定してデータを読み込みます。加速度の場合は、0x3B(下記コードではself.ACCEL_OUT)のメモリから6バイトのデータを読み込み、8ビット×2のデータを16ビットに変換して値が得られます。
data = self.i2c.readfrom_mem(self.address, self.ACCEL_OUT, 6)
x = self.data_convert(data[1], data[0])
y = self.data_convert(data[3], data[2])
z = self.data_convert(data[5], data[4])
def data_convert(self, data1, data2):
value = data1 | (data2 << 8)
if(value & (1 << 16 - 1)):
value -= (1<<16)
return value
センサの値をESP32からPCへ送信
データを送信するためにESP32でprint()を実行します。これによってシリアル通信経由でデータを送信することができます。送信内容はax:0.0,az:0.0…といった内容です。項目を「,」で区切り、軸を「:」で区切るようにしています。
while True:
accel = mpu9250.read_accel()
gyro = mpu9250.read_gyro()
mag = mpu9250.read_magnet()
text = ','.join( [ 'a'+key+':'+str(accel[key]) for key in accel ] ) + ','
text += ','.join( [ 'g'+key+':'+str( gyro[key]) for key in accel ] ) + ','
text += ','.join( [ 'm'+key+':'+str( mag[key]) for key in accel ] )
print(text)
utime.sleep(0.1)
送信内容は下記のような感じです。
ax:0.143,az:1.027,ay:0.079,gx:0.641,gz:0.336,gy:0.153,mx:15.951,mz:5.028,my:-39.635
ax:0.143,az:1.03,ay:0.079,gx:0.702,gz:0.397,gy:0.214,mx:15.772,mz:5.548,my:-40.178
ax:0.144,az:1.028,ay:0.078,gx:0.641,gz:0.336,gy:0.153,mx:15.234,mz:3.987,my:-40.359
PCでPyQtGraphを使って表示
ESP32から送られてきたデータを処理してPyQtGraphに表示しました。加速度、ジャイロ、地磁気の3つ分のPlotWidgetを作成して、1つのPlotWidgetにx、y、zの3つのplotItemを追加して表示しています。
50msec間隔でグラフの描画を実行するためにQtCore.QTimer()を使用しました。self.timer.timeout.connect()に渡したself.update_dataが50msec間隔で実行されます。そのタイミングでシリアル通信からのデータをself.serial.readline()で取得して、文字列の処理を行い、値を取得しています。処理された値をPlotDataItemに渡すことでグラフに描画されます。
# timer
self.timer = QtCore.QTimer()
self.timer.timeout.connect(self.update_data)
self.timer.start(50)
def update_data(self):
# stop timer
self.timer.stop()
# clear
self.plotwidget['a'].clear()
self.plotwidget['g'].clear()
self.plotwidget['m'].clear()
# get serial
try:
line = self.serial.readline().decode()
data = { data.split(':')[0]:data.split(':')[1] for data in line.split(',') }
# increase time
self.plot_data['t'] = np.append( self.plot_data['t'][1:], self.plot_data['t'][-1]+1 )
# add data
for key in self.plot_data:
if key == 't':
continue
self.plot_data[key] = np.append( self.plot_data[key][1:], float(data[key]) )
# set data
self.plotwidget[key[0]].addItem(
pyqtgraph.PlotDataItem(
x=self.plot_data['t'], y=self.plot_data[key],
pen=pyqtgraph.mkPen(color=self.plot_data_color[key], width=3), antialias=True
)
)
except:
pass
# start timer
self.timer.start(50)
動作の様子
ソースコード
ESP32用 main.py
from machine import Pin, I2C
import utime
from mpu9250 import MPU9250
def main():
print('hello')
i2c = I2C(scl=Pin(21), sda=Pin(22), freq=100000)
print(i2c.scan())
mpu9250 = MPU9250(i2c)
mpu9250.setting(mpu9250.GFS_1000, mpu9250.AFS_16G)
while True:
accel = mpu9250.read_accel()
gyro = mpu9250.read_gyro()
mag = mpu9250.read_magnet()
text = ','.join( [ 'a'+key+':'+str(accel[key]) for key in accel ] ) + ','
text += ','.join( [ 'g'+key+':'+str( gyro[key]) for key in accel ] ) + ','
text += ','.join( [ 'm'+key+':'+str( mag[key]) for key in accel ] )
print(text)
utime.sleep(0.1)
if __name__ == "__main__":
main()
ESP32用 mpu9250.py
from micropython import const
import utime
class MPU9250():
MPU9250_ADDRESS = const(0x68)
DEVICE_ID = const(0x71)
SMPLRT_DIV = const(0x19)
CONFIG = const(0x1A)
GYRO_CONFIG = const(0x1B)
ACCEL_CONFIG = const(0x1C)
ACCEL_CONFIG_2 = const(0x1D)
LP_ACCEL_ODR = const(0x1E)
WOM_THR = const(0x1F)
FIFO_EN = const(0x23)
I2C_MST_CTRL = const(0x24)
I2C_MST_STATUS = const(0x36)
INT_PIN_CFG = const(0x37)
INT_ENABLE = const(0x38)
INT_STATUS = const(0x3A)
ACCEL_OUT = const(0x3B)
TEMP_OUT = const(0x41)
GYRO_OUT = const(0x43)
I2C_MST_DELAY_CTRL = const(0x67)
SIGNAL_PATH_RESET = const(0x68)
MOT_DETECT_CTRL = const(0x69)
USER_CTRL = const(0x6A)
PWR_MGMT_1 = const(0x6B)
PWR_MGMT_2 = const(0x6C)
FIFO_R_W = const(0x74)
WHO_AM_I = const(0x75)
GFS_250 = const(0x00)
GFS_500 = const(0x01)
GFS_1000 = const(0x02)
GFS_2000 = const(0x03)
AFS_2G = const(0x00)
AFS_4G = const(0x01)
AFS_8G = const(0x02)
AFS_16G = const(0x03)
def __init__(self, i2c, address = MPU9250_ADDRESS):
self.i2c = i2c
self.address = address
self.setting(self.GFS_250, self.AFS_2G)
self.ak8963 = AK8963(self.i2c)
def searchDevice(self):
who_am_i = self.i2c.readfrom(self.address, self.WHO_AM_I)
if(who_am_i == self.DEVICE_ID):
return True
else:
return False
def setting(self, gfs, afs):
if gfs == self.GFS_250:
self.gres = 250.0/32768.0
elif gfs == self.GFS_500:
self.gres = 500.0/32768.0
elif gfs == self.GFS_1000:
self.gres = 1000.0/32768.0
else: # gfs == GFS_2000
self.gres = 2000.0/32768.0
if afs == self.AFS_2G:
self.ares = 2.0/32768.0
elif afs == self.AFS_4G:
self.ares = 4.0/32768.0
elif afs == self.AFS_8G:
self.ares = 8.0/32768.0
else: # afs == AFS_16G:
self.ares = 16.0/32768.0
buffer = bytearray(1)
self.i2c.writeto_mem(self.address, self.PWR_MGMT_1, b'\x00') # sleep off
utime.sleep_ms(100)
self.i2c.writeto_mem(self.address, self.PWR_MGMT_1, b'\x01') # auto select clock source
utime.sleep_ms(100)
self.i2c.writeto_mem(self.address, self.CONFIG, b'\x03') # DLPF_CFG
self.i2c.writeto_mem(self.address, self.SMPLRT_DIV, b'\x04') # sample rate divider
buffer[0] = gfs << 3
self.i2c.writeto_mem(self.address, self.GYRO_CONFIG, buffer) # gyro full scale select
buffer[0] = afs << 3
self.i2c.writeto_mem(self.address, self.ACCEL_CONFIG, buffer) # accel full scale select
self.i2c.writeto_mem(self.address, self.ACCEL_CONFIG_2, b'\x03') # A_DLPFCFG
self.i2c.writeto_mem(self.address, self.INT_PIN_CFG, b'\x02') # BYPASS_EN
utime.sleep_ms(100)
def check_data_ready(self):
drdy = self.i2c.readfrom(self.address, self.INT_STATUS)
if drdy & 0x01:
return True
else:
return False
def read_accel(self):
data = self.i2c.readfrom_mem(self.address, self.ACCEL_OUT, 6)
x = self.data_convert(data[1], data[0])
y = self.data_convert(data[3], data[2])
z = self.data_convert(data[5], data[4])
x = round(x*self.ares, 3)
y = round(y*self.ares, 3)
z = round(z*self.ares, 3)
return {"x":x, "y":y, "z":z}
def read_gyro(self):
data = self.i2c.readfrom_mem(self.address, self.GYRO_OUT, 6)
x = self.data_convert(data[1], data[0])
y = self.data_convert(data[3], data[2])
z = self.data_convert(data[5], data[4])
x = round(x*self.gres, 3)
y = round(y*self.gres, 3)
z = round(z*self.gres, 3)
return {"x":x, "y":y, "z":z}
def read_magnet(self):
return self.ak8963.read_magnet()
def data_convert(self, data1, data2):
value = data1 | (data2 << 8)
if(value & (1 << 16 - 1)):
value -= (1<<16)
return value
class AK8963():
AK8963_SLAVE_ADDRESS = const(0x0C)
AK8963_ST1 = const(0x02)
AK8963_MAGNET_OUT = const(0x03)
AK8963_CNTL1 = const(0x0A)
AK8963_CNTL2 = const(0x0B)
AK8963_ASAX = const(0x10)
AK8963_MODE_DOWN = const(0x00)
AK8963_MODE_ONE = const(0x01)
AK8963_MODE_C8HZ = const(0x02)
AK8963_MODE_C100HZ = const(0x06)
AK8963_BIT_14 = const(0x00)
AK8963_BIT_16 = const(0x01)
def __init__(self, i2c, address=0x76):
self.i2c = i2c
self.address = address
self.setting(self.AK8963_MODE_C8HZ, self.AK8963_BIT_16)
def setting(self, mode, mfs):
if mfs == self.AK8963_BIT_14:
self.mres = 4912.0/8190.0
else: # mfs == AK8963_BIT_16:
self.mres = 4912.0/32760.0
self.i2c.writeto_mem(self.AK8963_SLAVE_ADDRESS, self.AK8963_CNTL1, b'\x00')
utime.sleep_ms(10)
self.i2c.writeto_mem(self.AK8963_SLAVE_ADDRESS, self.AK8963_CNTL1, b'\x0F') # set read FuseROM mode
utime.sleep_ms(10)
data = self.i2c.readfrom_mem(self.AK8963_SLAVE_ADDRESS, self.AK8963_ASAX, 3) # read coef data
self.magXcoef = (data[0] - 128) / 256.0 + 1.0
self.magYcoef = (data[1] - 128) / 256.0 + 1.0
self.magZcoef = (data[2] - 128) / 256.0 + 1.0
self.i2c.writeto_mem(self.AK8963_SLAVE_ADDRESS, self.AK8963_CNTL1, b'\x00') # set power down mode
utime.sleep_ms(10)
buffer = bytearray(1)
buffer[0] = (mfs << 4 | mode)
self.i2c.writeto_mem(self.AK8963_SLAVE_ADDRESS, self.AK8963_CNTL1, buffer) # set scale&continous mode
utime.sleep_ms(10)
def read_magnet(self):
x, y, z=0, 0, 0
data = self.i2c.readfrom_mem(self.AK8963_SLAVE_ADDRESS, self.AK8963_MAGNET_OUT, 7)
# check overflow
if (data[6] & 0x08)!=0x08:
x = self.data_convert(data[0], data[1])
y = self.data_convert(data[2], data[3])
z = self.data_convert(data[4], data[5])
x = round(x * self.mres * self.magXcoef, 3)
y = round(y * self.mres * self.magYcoef, 3)
z = round(z * self.mres * self.magZcoef, 3)
return {"x":x, "y":y, "z":z}
def data_convert(self, data1, data2):
value = data1 | (data2 << 8)
if(value & (1 << 16 - 1)):
value -= (1<<16)
return value
PC用 main.py
## -*- coding: utf-8 -*-
import sys
from PyQt5 import QtCore, QtGui, QtWidgets
from pyqtgraph import PlotWidget
import pyqtgraph
import numpy as np
import serial
class MainWindow(QtWidgets.QMainWindow):
def __init__(self, parent=None):
super(MainWindow, self).__init__(parent)
self.plot_data = {
'ax':np.full(100, 0),
'ay':np.full(100, 0),
'az':np.full(100, 0),
'gx':np.full(100, 0),
'gy':np.full(100, 0),
'gz':np.full(100, 0),
'mx':np.full(100, 0),
'my':np.full(100, 0),
'mz':np.full(100, 0),
't' :np.arange(100)
}
self.plot_data_color = {
'ax':'#FF0000',
'ay':'#00FF00',
'az':'#0000FF',
'gx':'#DDDD00',
'gy':'#00DDDD',
'gz':'#DD00DD',
'mx':'#808020',
'my':'#208080',
'mz':'#802080'
}
self.resize(600, 600)
self.setStyleSheet("QMainWindow {background: 'white';}")
self.serial = serial.Serial('COM3', 115200, timeout=None)
# leyout
self.centralwidget = QtWidgets.QWidget(self)
self.setCentralWidget(self.centralwidget)
self.verticalLayout = QtWidgets.QVBoxLayout(self.centralwidget)
# pot widget
self.plotwidget = { axis:PlotWidget(self) for axis in ['a','g','m'] }
titles = { 'a':'Acceleration','g':'Gyro','m':'Geomagnetism' }
for key in self.plotwidget:
self.plotwidget[key].setBackground("#FFFFFFFF")
plotitem = self.plotwidget[key].plotItem
plotitem.setLabels(bottom='time', left=titles[key])
plotitem.getAxis('bottom').setPen( pyqtgraph.mkPen(color='#000000') )
plotitem.getAxis('left').setPen( pyqtgraph.mkPen(color='#000000') )
# leyout
self.verticalLayout.addWidget(self.plotwidget['a'])
self.verticalLayout.addWidget(self.plotwidget['g'])
self.verticalLayout.addWidget(self.plotwidget['m'])
# timer
self.timer = QtCore.QTimer()
self.timer.timeout.connect(self.update_data)
self.timer.start(50)
def update_data(self):
# stop timer
self.timer.stop()
# clear
self.plotwidget['a'].clear()
self.plotwidget['g'].clear()
self.plotwidget['m'].clear()
# get serial
try:
line = self.serial.readline().decode()
data = { data.split(':')[0]:data.split(':')[1] for data in line.split(',') }
# increase time
self.plot_data['t'] = np.append( self.plot_data['t'][1:], self.plot_data['t'][-1]+1 )
# add data
for key in self.plot_data:
if key == 't':
continue
self.plot_data[key] = np.append( self.plot_data[key][1:], float(data[key]) )
# set data
self.plotwidget[key[0]].addItem(
pyqtgraph.PlotDataItem(
x=self.plot_data['t'], y=self.plot_data[key],
pen=pyqtgraph.mkPen(color=self.plot_data_color[key], width=3), antialias=True
)
)
except:
pass
# start timer
self.timer.start(50)
def main():
app = QtWidgets.QApplication(sys.argv)
mainwindow = MainWindow(None)
mainwindow.show()
app.exec()
if __name__ == '__main__':
main()