ESP32のMicroPythonで10DOF IMU Sensor(GY-91)に搭載されているMPU9250を使ってみる

2020/08/10 categories:ESP32| tags:ESP32|MicroPython|GY-91|MPU9250|PyQtGraph|

MicroPythonを入れたESP32でGY-91に搭載されているMPU9250から値を取得して、PC上のPyQtGraphに表示してみました。

接続

接続は下記の通りで、ESP32とGY-91をI2Cで接続するだけです。他にはGY-91には電源(3.3V)とGNDを接続します。

センサから値の取得

MicroPythonのi2c.readfrom_mem()を使って、デバイスのアドレス、読み込むメモリのアドレス、読み込むデータのバイト数を指定してデータを読み込みます。加速度の場合は、0x3B(下記コードではself.ACCEL_OUT)のメモリから6バイトのデータを読み込み、8ビット×2のデータを16ビットに変換して値が得られます。

data = self.i2c.readfrom_mem(self.address, self.ACCEL_OUT, 6)
x = self.data_convert(data[1], data[0])
y = self.data_convert(data[3], data[2])
z = self.data_convert(data[5], data[4])

def data_convert(self, data1, data2):
    value = data1 | (data2 << 8)
    if(value & (1 << 16 - 1)):
        value -= (1<<16)
    return value

センサの値をESP32からPCへ送信

データを送信するためにESP32でprint()を実行します。これによってシリアル通信経由でデータを送信することができます。送信内容はax:0.0,az:0.0…といった内容です。項目を「,」で区切り、軸を「:」で区切るようにしています。

while True:
    
    accel = mpu9250.read_accel()
    gyro = mpu9250.read_gyro()
    mag = mpu9250.read_magnet()

    text  = ','.join( [ 'a'+key+':'+str(accel[key]) for key in accel ] ) + ','
    text += ','.join( [ 'g'+key+':'+str( gyro[key]) for key in accel ] ) + ','
    text += ','.join( [ 'm'+key+':'+str(  mag[key]) for key in accel ] )

    print(text)

    utime.sleep(0.1)

送信内容は下記のような感じです。

ax:0.143,az:1.027,ay:0.079,gx:0.641,gz:0.336,gy:0.153,mx:15.951,mz:5.028,my:-39.635
ax:0.143,az:1.03,ay:0.079,gx:0.702,gz:0.397,gy:0.214,mx:15.772,mz:5.548,my:-40.178
ax:0.144,az:1.028,ay:0.078,gx:0.641,gz:0.336,gy:0.153,mx:15.234,mz:3.987,my:-40.359

PCでPyQtGraphを使って表示

ESP32から送られてきたデータを処理してPyQtGraphに表示しました。加速度、ジャイロ、地磁気の3つ分のPlotWidgetを作成して、1つのPlotWidgetにx、y、zの3つのplotItemを追加して表示しています。

50msec間隔でグラフの描画を実行するためにQtCore.QTimer()を使用しました。self.timer.timeout.connect()に渡したself.update_dataが50msec間隔で実行されます。そのタイミングでシリアル通信からのデータをself.serial.readline()で取得して、文字列の処理を行い、値を取得しています。処理された値をPlotDataItemに渡すことでグラフに描画されます。

    # timer
    self.timer = QtCore.QTimer()
    self.timer.timeout.connect(self.update_data)
    self.timer.start(50)

def update_data(self):
    # stop timer
    self.timer.stop()

    # clear
    self.plotwidget['a'].clear()
    self.plotwidget['g'].clear()
    self.plotwidget['m'].clear()

    # get serial
    try:
        line = self.serial.readline().decode()
        data = { data.split(':')[0]:data.split(':')[1] for data in line.split(',') }

        # increase time
        self.plot_data['t'] = np.append( self.plot_data['t'][1:], self.plot_data['t'][-1]+1 )

        # add data
        for key in self.plot_data:
            if key == 't':
                continue
            self.plot_data[key] = np.append( self.plot_data[key][1:], float(data[key]) )

            # set data
            self.plotwidget[key[0]].addItem(
                pyqtgraph.PlotDataItem(
                    x=self.plot_data['t'], y=self.plot_data[key], 
                    pen=pyqtgraph.mkPen(color=self.plot_data_color[key], width=3), antialias=True
                )
            )
    except:
        pass
    
    # start timer
    self.timer.start(50)

動作の様子

ソースコード

ESP32用 main.py

from machine import Pin, I2C
import utime
from mpu9250 import MPU9250

def main():

    print('hello')

    i2c = I2C(scl=Pin(21), sda=Pin(22), freq=100000)

    print(i2c.scan())

    mpu9250 = MPU9250(i2c)
    mpu9250.setting(mpu9250.GFS_1000, mpu9250.AFS_16G)

    while True:
        
        accel = mpu9250.read_accel()
        gyro = mpu9250.read_gyro()
        mag = mpu9250.read_magnet()

        text  = ','.join( [ 'a'+key+':'+str(accel[key]) for key in accel ] ) + ','
        text += ','.join( [ 'g'+key+':'+str( gyro[key]) for key in accel ] ) + ','
        text += ','.join( [ 'm'+key+':'+str(  mag[key]) for key in accel ] )

        print(text)

        utime.sleep(0.1)

if __name__ == "__main__":
    main()

ESP32用 mpu9250.py

from micropython import const
import utime

class MPU9250():
    MPU9250_ADDRESS = const(0x68)
    DEVICE_ID       = const(0x71)

    SMPLRT_DIV     = const(0x19)
    CONFIG         = const(0x1A)
    GYRO_CONFIG    = const(0x1B)
    ACCEL_CONFIG   = const(0x1C)
    ACCEL_CONFIG_2 = const(0x1D)
    LP_ACCEL_ODR   = const(0x1E)
    WOM_THR        = const(0x1F)
    FIFO_EN        = const(0x23)
    I2C_MST_CTRL   = const(0x24)
    I2C_MST_STATUS = const(0x36)
    INT_PIN_CFG    = const(0x37)
    INT_ENABLE     = const(0x38)
    INT_STATUS     = const(0x3A)
    ACCEL_OUT      = const(0x3B)
    TEMP_OUT       = const(0x41)
    GYRO_OUT       = const(0x43)

    I2C_MST_DELAY_CTRL = const(0x67)
    SIGNAL_PATH_RESET  = const(0x68)
    MOT_DETECT_CTRL    = const(0x69)
    USER_CTRL          = const(0x6A)
    PWR_MGMT_1         = const(0x6B)
    PWR_MGMT_2         = const(0x6C)
    FIFO_R_W           = const(0x74)
    WHO_AM_I           = const(0x75)

    GFS_250  = const(0x00)
    GFS_500  = const(0x01)
    GFS_1000 = const(0x02)
    GFS_2000 = const(0x03)
    AFS_2G   = const(0x00)
    AFS_4G   = const(0x01)
    AFS_8G   = const(0x02)
    AFS_16G  = const(0x03)

    def __init__(self, i2c, address = MPU9250_ADDRESS):
        self.i2c = i2c
        self.address = address
        self.setting(self.GFS_250, self.AFS_2G)
        self.ak8963 = AK8963(self.i2c)

    def searchDevice(self):
        who_am_i = self.i2c.readfrom(self.address, self.WHO_AM_I)
        if(who_am_i == self.DEVICE_ID):
            return True
        else:
            return False

    def setting(self, gfs, afs):
        if gfs == self.GFS_250:
            self.gres = 250.0/32768.0
        elif gfs == self.GFS_500:
            self.gres = 500.0/32768.0
        elif gfs == self.GFS_1000:
            self.gres = 1000.0/32768.0
        else:  # gfs == GFS_2000
            self.gres = 2000.0/32768.0

        if afs == self.AFS_2G:
            self.ares = 2.0/32768.0
        elif afs == self.AFS_4G:
            self.ares = 4.0/32768.0
        elif afs == self.AFS_8G:
            self.ares = 8.0/32768.0
        else: # afs == AFS_16G:
            self.ares = 16.0/32768.0

        buffer = bytearray(1)
        self.i2c.writeto_mem(self.address, self.PWR_MGMT_1, b'\x00') # sleep off
        utime.sleep_ms(100)
        self.i2c.writeto_mem(self.address, self.PWR_MGMT_1, b'\x01') # auto select clock source
        utime.sleep_ms(100)
        self.i2c.writeto_mem(self.address, self.CONFIG, b'\x03') # DLPF_CFG
        self.i2c.writeto_mem(self.address, self.SMPLRT_DIV, b'\x04') # sample rate divider
        buffer[0] = gfs << 3
        self.i2c.writeto_mem(self.address, self.GYRO_CONFIG, buffer) # gyro full scale select
        buffer[0] = afs << 3
        self.i2c.writeto_mem(self.address, self.ACCEL_CONFIG, buffer) # accel full scale select
        self.i2c.writeto_mem(self.address, self.ACCEL_CONFIG_2, b'\x03') # A_DLPFCFG
        self.i2c.writeto_mem(self.address, self.INT_PIN_CFG, b'\x02') # BYPASS_EN
        utime.sleep_ms(100)

    def check_data_ready(self):
        drdy = self.i2c.readfrom(self.address, self.INT_STATUS)
        if drdy & 0x01:
            return True
        else:
            return False
            
    def read_accel(self):
        data = self.i2c.readfrom_mem(self.address, self.ACCEL_OUT, 6)
        x = self.data_convert(data[1], data[0])
        y = self.data_convert(data[3], data[2])
        z = self.data_convert(data[5], data[4])

        x = round(x*self.ares, 3)
        y = round(y*self.ares, 3)
        z = round(z*self.ares, 3)

        return {"x":x, "y":y, "z":z}

    def read_gyro(self):
        data = self.i2c.readfrom_mem(self.address, self.GYRO_OUT, 6)

        x = self.data_convert(data[1], data[0])
        y = self.data_convert(data[3], data[2])
        z = self.data_convert(data[5], data[4])

        x = round(x*self.gres, 3)
        y = round(y*self.gres, 3)
        z = round(z*self.gres, 3)

        return {"x":x, "y":y, "z":z}

    def read_magnet(self):
        return self.ak8963.read_magnet()

    def data_convert(self, data1, data2):
        value = data1 | (data2 << 8)
        if(value & (1 << 16 - 1)):
            value -= (1<<16)
        return value

class AK8963():
    AK8963_SLAVE_ADDRESS = const(0x0C)
    AK8963_ST1           = const(0x02)
    AK8963_MAGNET_OUT    = const(0x03)
    AK8963_CNTL1         = const(0x0A)
    AK8963_CNTL2         = const(0x0B)
    AK8963_ASAX          = const(0x10)
    AK8963_MODE_DOWN     = const(0x00)
    AK8963_MODE_ONE      = const(0x01)
    AK8963_MODE_C8HZ     = const(0x02)
    AK8963_MODE_C100HZ   = const(0x06)
    AK8963_BIT_14        = const(0x00)
    AK8963_BIT_16        = const(0x01)

    def __init__(self, i2c, address=0x76):
        self.i2c = i2c
        self.address = address
        self.setting(self.AK8963_MODE_C8HZ, self.AK8963_BIT_16)

    def setting(self, mode, mfs):
        if mfs == self.AK8963_BIT_14:
            self.mres = 4912.0/8190.0
        else: #  mfs == AK8963_BIT_16:
            self.mres = 4912.0/32760.0

        self.i2c.writeto_mem(self.AK8963_SLAVE_ADDRESS, self.AK8963_CNTL1, b'\x00')
        utime.sleep_ms(10)
        self.i2c.writeto_mem(self.AK8963_SLAVE_ADDRESS, self.AK8963_CNTL1, b'\x0F') # set read FuseROM mode
        utime.sleep_ms(10)
        data = self.i2c.readfrom_mem(self.AK8963_SLAVE_ADDRESS, self.AK8963_ASAX, 3) # read coef data

        self.magXcoef = (data[0] - 128) / 256.0 + 1.0
        self.magYcoef = (data[1] - 128) / 256.0 + 1.0
        self.magZcoef = (data[2] - 128) / 256.0 + 1.0

        self.i2c.writeto_mem(self.AK8963_SLAVE_ADDRESS, self.AK8963_CNTL1, b'\x00') # set power down mode
        utime.sleep_ms(10)
        buffer = bytearray(1)
        buffer[0] = (mfs << 4 | mode)
        self.i2c.writeto_mem(self.AK8963_SLAVE_ADDRESS, self.AK8963_CNTL1, buffer) # set scale&continous mode
        utime.sleep_ms(10)

    def read_magnet(self):
        x, y, z=0, 0, 0
        
        data = self.i2c.readfrom_mem(self.AK8963_SLAVE_ADDRESS, self.AK8963_MAGNET_OUT, 7)

        # check overflow
        if (data[6] & 0x08)!=0x08:
            x = self.data_convert(data[0], data[1])
            y = self.data_convert(data[2], data[3])
            z = self.data_convert(data[4], data[5])

            x = round(x * self.mres * self.magXcoef, 3)
            y = round(y * self.mres * self.magYcoef, 3)
            z = round(z * self.mres * self.magZcoef, 3)

        return {"x":x, "y":y, "z":z}

    def data_convert(self, data1, data2):
        value = data1 | (data2 << 8)
        if(value & (1 << 16 - 1)):
            value -= (1<<16)
        return value

PC用 main.py

## -*- coding: utf-8 -*-
import sys
from PyQt5 import QtCore, QtGui, QtWidgets
from pyqtgraph import PlotWidget
import pyqtgraph
import numpy as np
import serial

class MainWindow(QtWidgets.QMainWindow):
    def __init__(self, parent=None):
        super(MainWindow, self).__init__(parent)

        self.plot_data = {
            'ax':np.full(100, 0),
            'ay':np.full(100, 0),
            'az':np.full(100, 0),
            'gx':np.full(100, 0),
            'gy':np.full(100, 0),
            'gz':np.full(100, 0),
            'mx':np.full(100, 0),
            'my':np.full(100, 0),
            'mz':np.full(100, 0),
            't' :np.arange(100)
        }

        self.plot_data_color = {
            'ax':'#FF0000',
            'ay':'#00FF00',
            'az':'#0000FF',
            'gx':'#DDDD00',
            'gy':'#00DDDD',
            'gz':'#DD00DD',
            'mx':'#808020',
            'my':'#208080',
            'mz':'#802080'
        }

        self.resize(600, 600)
        self.setStyleSheet("QMainWindow {background: 'white';}")
        self.serial = serial.Serial('COM3', 115200, timeout=None)

        # leyout
        self.centralwidget = QtWidgets.QWidget(self)
        self.setCentralWidget(self.centralwidget)
        self.verticalLayout = QtWidgets.QVBoxLayout(self.centralwidget)
        
        # pot widget
        self.plotwidget = { axis:PlotWidget(self) for axis in ['a','g','m'] }
        titles = { 'a':'Acceleration','g':'Gyro','m':'Geomagnetism' }
        for key in self.plotwidget:
            self.plotwidget[key].setBackground("#FFFFFFFF")
            plotitem = self.plotwidget[key].plotItem
            plotitem.setLabels(bottom='time', left=titles[key])
            plotitem.getAxis('bottom').setPen( pyqtgraph.mkPen(color='#000000') )
            plotitem.getAxis('left').setPen( pyqtgraph.mkPen(color='#000000') )
        
        # leyout
        self.verticalLayout.addWidget(self.plotwidget['a'])
        self.verticalLayout.addWidget(self.plotwidget['g'])
        self.verticalLayout.addWidget(self.plotwidget['m'])

        # timer
        self.timer = QtCore.QTimer()
        self.timer.timeout.connect(self.update_data)
        self.timer.start(50)

    def update_data(self):
        # stop timer
        self.timer.stop()

        # clear
        self.plotwidget['a'].clear()
        self.plotwidget['g'].clear()
        self.plotwidget['m'].clear()

        # get serial
        try:
            line = self.serial.readline().decode()
            data = { data.split(':')[0]:data.split(':')[1] for data in line.split(',') }

            # increase time
            self.plot_data['t'] = np.append( self.plot_data['t'][1:], self.plot_data['t'][-1]+1 )

            # add data
            for key in self.plot_data:
                if key == 't':
                    continue
                self.plot_data[key] = np.append( self.plot_data[key][1:], float(data[key]) )

                # set data
                self.plotwidget[key[0]].addItem(
                    pyqtgraph.PlotDataItem(
                        x=self.plot_data['t'], y=self.plot_data[key], 
                        pen=pyqtgraph.mkPen(color=self.plot_data_color[key], width=3), antialias=True
                    )
                )
        except:
            pass
        
        # start timer
        self.timer.start(50)

def main():
    app = QtWidgets.QApplication(sys.argv)
    mainwindow = MainWindow(None)
    mainwindow.show()
    app.exec()

if __name__ == '__main__':
    main()

Share post

Related Posts

Comments

comments powered by Disqus