Commit 15cd665f authored by Mikaël BRIDAY's avatar Mikaël BRIDAY
Browse files

update scripts, now thread safe

parent c876662b
#! /usr/bin/env python3
# -*- coding: UTF-8 -*-
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
from PyQt5.QtGui import *
import sys,time
import threading
# Communication part
import struct
pipc_gpio_magic = 0xdeadbeef
pipc_adc_magic = 0xcafecafe
pipc_serial_magic = 0xabcdef01
pipc_mcp_magic = 0xfeedfeed
pipc_reset_magic = 0xbadf00d
import posix_ipc as pipc
mq_to_qemu = pipc.MessageQueue("/to_qemu",flags=pipc.O_CREAT, read=False, write=True)
mq_from_qemu = pipc.MessageQueue("/from_qemu",flags=pipc.O_CREAT, read=True, write=False)
class MainWindow(QMainWindow):
#for retinal persistence.
led = [False,False,False,False,False,False]
def __init__(self, *args, **kwargs):
super(MainWindow, self).__init__(*args,**kwargs)
self.setWindowTitle("QEmu GPIO interface for STM32F303 - Charlipexing")
self.leds = []
# bits 0 to 2 are led dirs, bits 4 to 6 are led States
self.charliePlexingState = 0
layoutV = QVBoxLayout()
self.palOn = QPalette()
self.palOn.setColor(QPalette.Base,Qt.darkGreen)
self.palOff = QPalette()
self.palOff.setColor(QPalette.Base,Qt.darkRed)
gbox=QGroupBox("Charlieplexing")
#Gpio pin state
layout = QHBoxLayout()
for i in range(6):
chkbox=QCheckBox("L"+str(i))
chkbox.setCheckable(False)
chkbox.setAutoFillBackground(True)
chkbox.setPalette(self.palOff)
self.leds.append(chkbox)
layout.addWidget(chkbox)
gbox.setLayout(layout)
layoutV.addWidget(gbox)
widget = QWidget()
widget.setLayout(layoutV)
self.setCentralWidget(widget)
@pyqtSlot()
def reset(self):
"""called by QEmu at startup. It should send back information to QEmu """
print('reset')
for chkbox in self.leds:
chkbox.setPalette(self.palOff)
def setGPIO(self,port,dir_mask,output):
update = False
if port == 'B': #pin0 is PB7
self.charliePlexingState &= ~0x9 #raz bit 0 and 3
self.charliePlexingState |= (dir_mask >> 7) & 1
self.charliePlexingState |= ((output >> 7) & 1) << 3
update = True
if port == 'F': #pin1,2 is PF0,1
self.charliePlexingState &= ~0x36 # raz bit 1,2,4, 5
self.charliePlexingState |= (dir_mask & 3) << 1 #set bits 1,2
self.charliePlexingState |= (output & 3) << 4 #set bits 4,5
update = True
if update:
mask=[0x1B,0x1B,0x36,0x36,0x2D,0x2D]
data=[0x0B,0x13,0x16,0x26,0x0D,0x25]
#Leds
for l in range(6):
if (self.charliePlexingState & mask[l]) == data[l]:
self.leds[l].setPalette(self.palOn)
self.led[l] = True
#else:
# self.leds[l].setPalette(self.palOff)
@pyqtSlot()
def switchLedOffs(self):
for i in range(6):
if not self.led[i]:
window.leds[i].setPalette(window.palOff)
self.led[i] = False
app = QApplication(sys.argv)
window = MainWindow()
class Receiver(QThread):
resetSig = pyqtSignal()
gpioSig = pyqtSignal(str,int,int)
def __init__self():
QThread.__init__(self)
def run(self):
while True:
msg = mq_from_qemu.receive()
magic = struct.unpack("=I",msg[0][0:4]) #get magic value. return a tuple.
#print("mg=",mg," pin=",pin," gpio=",gpio," state=",state)
if magic[0] == pipc_gpio_magic:
magic, changed_out, dir_mask,output,gpio = struct.unpack("=IHHHH",msg[0])
name = ['A','B','C','D','F']
if gpio < 5:
self.gpioSig.emit(name[gpio],dir_mask,output)
elif magic[0] == pipc_serial_magic:
pass
elif magic[0] == pipc_mcp_magic:
pass
elif magic[0] == pipc_reset_magic:
self.resetSig.emit()
else:
raise Exception("Wrong magic number in GPIO IPC message: 0x{val:08x}".format(val=magic[0]) )
#required if we get too many messages to let time for the UI.
time.sleep(.01)
class RetinalPersistence(QThread):
switchOffLedSig = pyqtSignal()
def __init__self():
QThread.__init__(self)
def run(self):
while True:
time.sleep(0.2)
self.switchOffLedSig.emit()
window.show()
thread = Receiver()
thread.resetSig.connect(window.reset)
thread.gpioSig.connect(window.setGPIO)
thread.start()
threadPersistence = RetinalPersistence()
threadPersistence.switchOffLedSig.connect(window.switchLedOffs)
threadPersistence.start()
app.exec_() #event loop
#! /usr/bin/env python3
# -*- coding: UTF-8 -*-
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
from PyQt5.QtGui import *
import sys,time
import threading
# Communication part
import struct
pipc_gpio_magic = 0xdeadbeef
pipc_adc_magic = 0xcafecafe
pipc_serial_magic = 0xabcdef01
pipc_mcp_magic = 0xfeedfeed
pipc_reset_magic = 0xbadf00d
import posix_ipc as pipc
mq_to_qemu = pipc.MessageQueue("/to_qemu",flags=pipc.O_CREAT, read=False, write=True)
mq_from_qemu = pipc.MessageQueue("/from_qemu",flags=pipc.O_CREAT, read=True, write=False)
#for retinal persistence.
led = [False,False,False,False,False,False]
class MainWindow(QMainWindow):
def __init__(self, *args, **kwargs):
super(MainWindow, self).__init__(*args,**kwargs)
self.setWindowTitle("QEmu MCP23S17 spi lab")
self.leds = [] #PB0, PB3 => tuple (pin,chkbox)
self.buttons = [] #PB6, PB1 => tuple (pin,chkbox)
self.warning = 0 # 0 = ok, 1 warning, 2 error
# bits 0 to 2 are led dirs, bits 4 to 6 are led States
self.charliePlexingState = 0
self.mcp = {'A':[],'B':[]}
layoutV = QVBoxLayout()
self.palOn = QPalette()
self.palOn.setColor(QPalette.Base,Qt.darkGreen)
self.palOff = QPalette()
self.palOff.setColor(QPalette.Base,Qt.darkRed)
self.palError = QPalette()
self.palError.setColor(QPalette.Window,Qt.red)
self.palWarning = QPalette()
self.palWarning.setColor(QPalette.Window,Qt.darkRed)
gbox=QGroupBox("user leds")
#Gpio pin state
#led D3
layout = QHBoxLayout()
chkbox_LED_D3=QCheckBox("D3 (PB0)")
chkbox_LED_D3.setCheckable(False)
chkbox_LED_D3.setAutoFillBackground(True)
chkbox_LED_D3.setPalette(self.palOff)
self.leds.append((0,chkbox_LED_D3))
layout.addWidget(chkbox_LED_D3)
#Gpio pin state
#led D13
chkbox_LED_D13=QCheckBox("D13 (PB3)")
chkbox_LED_D13.setCheckable(False)
chkbox_LED_D13.setAutoFillBackground(True)
chkbox_LED_D13.setPalette(self.palOff)
self.leds.append((3,chkbox_LED_D13))
layout.addWidget(chkbox_LED_D13)
gbox.setLayout(layout)
layoutV.addWidget(gbox)
gbox=QGroupBox("push buttons")
#Gpio pin state
#led D3
layout = QHBoxLayout()
chkbox_PB_D5=QCheckBox("D5 (PB6)")
chkbox_PB_D5.setCheckable(True)
self.buttons.append((6,chkbox_PB_D5))
layout.addWidget(chkbox_PB_D5)
#Gpio pin state
#led D13
chkbox_PB_D6=QCheckBox("D6 (PB1)")
chkbox_PB_D6.setCheckable(True)
self.buttons.append((1,chkbox_PB_D6))
layout.addWidget(chkbox_PB_D6)
gbox.setLayout(layout)
layoutV.addWidget(gbox)
for i,pb in self.buttons:
pb.stateChanged.connect(lambda x, i=i: self.buttonChecked(x,i))
gbox=QGroupBox("serial")
layout = QHBoxLayout()
self.serial = QTextEdit()
self.serial.setReadOnly(True)
layout.addWidget(self.serial)
gbox.setLayout(layout)
layoutV.addWidget(gbox)
#ADC pin state
gbox=QGroupBox("ADC - potentiometer")
layout = QHBoxLayout()
slider = QSlider(Qt.Horizontal)
slider.setMinimum(0)
slider.setMaximum(4095)
layout.addWidget(slider)
self.labelADC = QLabel('0')
slider.valueChanged.connect(self.updateSlider)
layout.addWidget(self.labelADC)
gbox.setLayout(layout)
layoutV.addWidget(gbox)
#MCP23S17
gbox=QGroupBox("MCP23S17")
layoutMCPV = QVBoxLayout()
layout = QHBoxLayout()
for i in range(8):
chkbox=QCheckBox("PA"+str(i))
chkbox.setCheckable(False)
chkbox.setAutoFillBackground(True)
chkbox.setPalette(self.palOff)
self.mcp['A'].append(chkbox)
layout.addWidget(chkbox)
layoutMCPV.addLayout(layout)
layout = QHBoxLayout()
for i in range(8):
chkbox=QCheckBox("PB"+str(i))
chkbox.setCheckable(True)
self.mcp['B'].append(chkbox)
layout.addWidget(chkbox)
chkbox.stateChanged.connect(lambda x, i=i: self.buttonMCP_PB(x,i))
layoutMCPV.addLayout(layout)
gbox.setLayout(layoutMCPV)
layoutV.addWidget(gbox)
#gbox.setLayout(layout)
#layoutV.addWidget(gbox)
#end
widget = QWidget()
widget.setLayout(layoutV)
self.setCentralWidget(widget)
@pyqtSlot()
def reset(self):
print('reset')
for (pin,chkbox) in self.leds:
chkbox.setPalette(self.palOff)
for (pin,chkbox) in self.buttons:
chkbox.setCheckState(Qt.Unchecked)
for led in self.mcp['A']:
led.setPalette(self.palOff)
for bp in self.mcp['B']:
bp.setCheckState(Qt.Unchecked)
def buttonMCP_PB(self,state,pin):
""" called when a button is checked (port B, MCP) """
gpioId = 1 #buttons on PORT B
#print("button PB"+str(pin)+": "+str(state))
s=struct.pack("=IIII",pipc_mcp_magic,pin,state==Qt.Checked,gpioId)
mq_to_qemu.send(s)
def buttonChecked(self,state,pin):
""" called when a checkbox is checked """
gpioId = 1 #2 buttons on GPIOB
#print("send msg: GPIO "+str(gpioId)+", pin "+str(pin)+', state '+str(state==Qt.Checked) )
s=struct.pack("=IIII",pipc_gpio_magic,pin,state==Qt.Checked,gpioId)
mq_to_qemu.send(s)
def updateSlider(self,val):
""" called each time the slider value is updated
"""
self.labelADC.setText(str(val))
#print("send msg to ADC 0:"+str(val))
s=struct.pack("=III",pipc_adc_magic,0,val)
mq_to_qemu.send(s)
def setGPIO(self,port,dir_mask,output):
#print('gpio '+str(port)+', dir'+str(dir_mask)+', out'+str(output))
if port == 'B': #pin0 is PB7
for (pin,widget) in self.leds:
pinMask = 1<<pin
if (dir_mask & pinMask == pinMask) and (output & pinMask == pinMask):
widget.setPalette(self.palOn)
else:
widget.setPalette(self.palOff)
def setMCPGPIO(self,port,dir_mask,output):
""" update MCP leds
"""
#print('gpio mcp '+str(port)+', dir'+str(dir_mask)+', out'+str(output))
if port == 'A': #PA => leds
pin = 0
for widget in self.mcp['A']:
pinMask = 1<<pin
if (dir_mask & pinMask == pinMask) and (output & pinMask == pinMask):
widget.setPalette(self.palOn)
else:
widget.setPalette(self.palOff)
pin += 1
app = QApplication(sys.argv)
window = MainWindow()
class Receiver(QThread):
resetSig = pyqtSignal()
gpioSig = pyqtSignal(str,int,int)
serialSig = pyqtSignal(str)
mcpSig = pyqtSignal(str,int,int)
def __init__self():
QThread.__init__(self)
def run(self):
while True:
msg = mq_from_qemu.receive()
magic = struct.unpack("=I",msg[0][0:4]) #get magic value. return a tuple.
#print("mg=",mg," pin=",pin," gpio=",gpio," state=",state)
if magic[0] == pipc_gpio_magic:
magic, changed_out, dir_mask,output,gpio = struct.unpack("=IHHHH",msg[0])
name = ['A','B','C','D','F']
if gpio < 5:
self.gpioSig.emit(name[gpio],dir_mask,output)
elif magic[0] == pipc_serial_magic:
magic, char = struct.unpack("=II",msg[0])
self.serialSig.emit(chr(char & 0xff))
elif magic[0] == pipc_mcp_magic:
magic, changed_out, dir_mask,output,gpio = struct.unpack("=IHHHH",msg[0])
#print("mg=",magic," dir=",dir_mask," gpio=",gpio," out=",output)
name = ['A','B']
if gpio < 2:
self.mcpSig.emit(name[gpio],dir_mask,output)
elif magic[0] == pipc_reset_magic:
self.resetSig.emit()
else:
raise Exception("Wrong magic number in GPIO IPC message: 0x{val:08x}".format(val=magic[0]) )
#required if we get too many messages to let time for the UI.
time.sleep(.01)
def warningPersistence():
while True:
time.sleep(2)
if window.warning > 0:
window.warning = 0
#print('reinit')
window.show()
thread = Receiver()
thread.resetSig.connect(window.reset)
thread.gpioSig.connect(window.setGPIO)
thread.serialSig.connect(window.serial.insertPlainText)
thread.mcpSig.connect(window.setMCPGPIO)
thread.start()
threadWarning = threading.Thread(target=warningPersistence)
threadWarning.daemon = True
threadWarning.start()
app.exec_() #event loop
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment