Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
I
itii-mac-tp-etu
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Service Desk
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Operations
Operations
Metrics
Environments
Packages & Registries
Packages & Registries
Package Registry
Container Registry
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Mikaël BRIDAY
itii-mac-tp-etu
Commits
570253d1
Commit
570253d1
authored
Nov 28, 2020
by
Mikaël BRIDAY
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
update scripts, now thread safe
parent
15cd665f
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
141 additions
and
66 deletions
+141
-66
virtual/qemu_io.py
virtual/qemu_io.py
+42
-19
virtual/stepper.py
virtual/stepper.py
+48
-22
virtual/ultrasound.py
virtual/ultrasound.py
+51
-25
No files found.
virtual/qemu_io.py
View file @
570253d1
...
...
@@ -13,6 +13,8 @@ import struct
pipc_gpio_magic
=
0xdeadbeef
pipc_adc_magic
=
0xcafecafe
pipc_serial_magic
=
0xabcdef01
pipc_mcp_magic
=
0xfeedfeed
pipc_reset_magic
=
0xbadf00d
import
posix_ipc
as
pipc
mq_to_qemu
=
pipc
.
MessageQueue
(
"/to_qemu"
,
flags
=
pipc
.
O_CREAT
,
read
=
False
,
write
=
True
)
...
...
@@ -59,10 +61,19 @@ class MainWindow(QMainWindow):
self
.
setCentralWidget
(
widget
)
@
pyqtSlot
()
def
reset
(
self
):
"""called by QEmu at startup. It should send back information to QEmu """
print
(
'reset'
)
for
gpio
in
self
.
gpios
:
for
chkbox
in
self
.
gpios
[
gpio
]:
chkbox
.
setPalette
(
self
.
palDis
)
chkbox
.
setCheckState
(
Qt
.
Unchecked
)
def
buttonChecked
(
self
,
state
,
gpioId
,
pin
):
""" called when a checkbox is checked """
print
(
"send msg: GPIO "
+
str
(
gpioId
)
+
", pin "
+
str
(
pin
)
+
', state '
+
str
(
state
==
Qt
.
Checked
)
)
s
=
struct
.
pack
(
"=IIII"
,
pipc_magic
,
pin
,
state
==
Qt
.
Checked
,
gpioId
)
s
=
struct
.
pack
(
"=IIII"
,
pipc_
gpio_
magic
,
pin
,
state
==
Qt
.
Checked
,
gpioId
)
mq_to_qemu
.
send
(
s
)
...
...
@@ -85,27 +96,39 @@ class MainWindow(QMainWindow):
app
=
QApplication
(
sys
.
argv
)
window
=
MainWindow
()
def
receiver
():
while
True
:
msg
=
mq_from_qemu
.
receive
()
magic
=
struct
.
unpack
(
"=I"
,
msg
[
0
][
0
:
4
])
#get magic value. return a tuple.
#print("mg=",mg," pin=",pin," gpio=",gpio," state=",state)
if
magic
[
0
]
==
pipc_gpio_magic
:
magic
,
changed_out
,
dir_mask
,
output
,
gpio
=
struct
.
unpack
(
"=IHHHH"
,
msg
[
0
])
name
=
[
'A'
,
'B'
,
'C'
,
'D'
,
'F'
]
if
gpio
<
5
:
window
.
setGPIO
(
name
[
gpio
],
dir_mask
,
output
)
#TODO: thread safe?
elif
magic
[
0
]
==
pipc_serial_magic
:
pass
else
:
raise
Exception
(
"Wrong magic number in GPIO IPC message: 0x{val:08x}"
.
format
(
val
=
magic
[
0
])
)
#required if we get too many messages to let time for the UI.
time
.
sleep
(.
01
)
class
Receiver
(
QThread
):
resetSig
=
pyqtSignal
()
gpioSig
=
pyqtSignal
(
str
,
int
,
int
)
def
__init__self
():
QThread
.
__init__
(
self
)
def
run
(
self
):
while
True
:
msg
=
mq_from_qemu
.
receive
()
magic
=
struct
.
unpack
(
"=I"
,
msg
[
0
][
0
:
4
])
#get magic value. return a tuple.
#print("mg=",mg," pin=",pin," gpio=",gpio," state=",state)
if
magic
[
0
]
==
pipc_gpio_magic
:
magic
,
changed_out
,
dir_mask
,
output
,
gpio
=
struct
.
unpack
(
"=IHHHH"
,
msg
[
0
])
name
=
[
'A'
,
'B'
,
'C'
,
'D'
,
'F'
]
if
gpio
<
5
:
self
.
gpioSig
.
emit
(
name
[
gpio
],
dir_mask
,
output
)
elif
magic
[
0
]
==
pipc_serial_magic
:
pass
elif
magic
[
0
]
==
pipc_mcp_magic
:
pass
elif
magic
[
0
]
==
pipc_reset_magic
:
self
.
resetSig
.
emit
()
else
:
raise
Exception
(
"Wrong magic number in GPIO IPC message: 0x{val:08x}"
.
format
(
val
=
magic
[
0
])
)
#required if we get too many messages to let time for the UI.
time
.
sleep
(.
01
)
window
.
show
()
thread
=
threading
.
Thread
(
target
=
receiver
)
thread
.
daemon
=
True
thread
=
Receiver
()
thread
.
resetSig
.
connect
(
window
.
reset
)
thread
.
gpioSig
.
connect
(
window
.
setGPIO
)
thread
.
start
()
app
.
exec_
()
#event loop
virtual/stepper.py
View file @
570253d1
...
...
@@ -10,9 +10,11 @@ import sys,time
import
threading
# Communication part
import
struct
pipc_gpio_magic
=
0xdeadbeef
pipc_adc_magic
=
0xcafecafe
pipc_gpio_magic
=
0xdeadbeef
pipc_adc_magic
=
0xcafecafe
pipc_serial_magic
=
0xabcdef01
pipc_mcp_magic
=
0xfeedfeed
pipc_reset_magic
=
0xbadf00d
import
posix_ipc
as
pipc
mq_to_qemu
=
pipc
.
MessageQueue
(
"/to_qemu"
,
flags
=
pipc
.
O_CREAT
,
read
=
False
,
write
=
True
)
...
...
@@ -113,6 +115,19 @@ class MainWindow(QMainWindow):
self
.
setCentralWidget
(
widget
)
@
pyqtSlot
()
def
reset
(
self
):
print
(
'reset'
)
for
(
pin
,
chkbox
)
in
self
.
leds
:
chkbox
.
setPalette
(
self
.
palOff
)
for
(
pin
,
chkbox
)
in
self
.
buttons
:
chkbox
.
setCheckState
(
Qt
.
Unchecked
)
self
.
state
=
0
self
.
step
=
0
self
.
labelStepperState
.
setText
(
'state : '
+
str
(
self
.
state
))
self
.
labelStepperStep
.
setText
(
'step : '
+
str
(
self
.
step
))
self
.
slider
.
setValue
(
0
)
def
buttonChecked
(
self
,
state
,
pin
):
""" called when a checkbox is checked """
gpioId
=
1
#2 buttons on GPIOB
...
...
@@ -180,24 +195,34 @@ class MainWindow(QMainWindow):
app
=
QApplication
(
sys
.
argv
)
window
=
MainWindow
()
def
receiver
():
while
True
:
msg
=
mq_from_qemu
.
receive
()
magic
=
struct
.
unpack
(
"=I"
,
msg
[
0
][
0
:
4
])
#get magic value. return a tuple.
#print("mg=",mg," pin=",pin," gpio=",gpio," state=",state)
if
magic
[
0
]
==
pipc_gpio_magic
:
magic
,
changed_out
,
dir_mask
,
output
,
gpio
=
struct
.
unpack
(
"=IHHHH"
,
msg
[
0
])
name
=
[
'A'
,
'B'
,
'C'
,
'D'
,
'F'
]
if
gpio
<
5
:
window
.
setGPIO
(
name
[
gpio
],
dir_mask
,
output
)
#TODO: thread safe?
elif
magic
[
0
]
==
pipc_serial_magic
:
magic
,
char
=
struct
.
unpack
(
"=II"
,
msg
[
0
])
#print('char {0:c}'.format(char))
window
.
serial
.
insertPlainText
(
chr
(
char
&
0xff
))
else
:
raise
Exception
(
"Wrong magic number in GPIO IPC message: 0x{val:08x}"
.
format
(
val
=
magic
[
0
])
)
#required if we get too many messages to let time for the UI.
time
.
sleep
(.
01
)
class
Receiver
(
QThread
):
resetSig
=
pyqtSignal
()
gpioSig
=
pyqtSignal
(
str
,
int
,
int
)
mcpSig
=
pyqtSignal
(
str
,
int
,
int
)
def
__init__self
():
QThread
.
__init__
(
self
)
def
run
(
self
):
while
True
:
msg
=
mq_from_qemu
.
receive
()
magic
=
struct
.
unpack
(
"=I"
,
msg
[
0
][
0
:
4
])
#get magic value. return a tuple.
#print("mg=",mg," pin=",pin," gpio=",gpio," state=",state)
if
magic
[
0
]
==
pipc_gpio_magic
:
magic
,
changed_out
,
dir_mask
,
output
,
gpio
=
struct
.
unpack
(
"=IHHHH"
,
msg
[
0
])
name
=
[
'A'
,
'B'
,
'C'
,
'D'
,
'F'
]
if
gpio
<
5
:
self
.
gpioSig
.
emit
(
name
[
gpio
],
dir_mask
,
output
)
elif
magic
[
0
]
==
pipc_serial_magic
:
pass
elif
magic
[
0
]
==
pipc_mcp_magic
:
pass
elif
magic
[
0
]
==
pipc_reset_magic
:
self
.
resetSig
.
emit
()
else
:
raise
Exception
(
"Wrong magic number in GPIO IPC message: 0x{val:08x}"
.
format
(
val
=
magic
[
0
])
)
#required if we get too many messages to let time for the UI.
time
.
sleep
(.
01
)
def
warningPersistence
():
while
True
:
...
...
@@ -209,8 +234,9 @@ def warningPersistence():
window
.
show
()
thread
=
threading
.
Thread
(
target
=
receiver
)
thread
.
daemon
=
True
thread
=
Receiver
()
thread
.
resetSig
.
connect
(
window
.
reset
)
thread
.
gpioSig
.
connect
(
window
.
setGPIO
)
thread
.
start
()
threadWarning
=
threading
.
Thread
(
target
=
warningPersistence
)
...
...
virtual/ultrasound.py
View file @
570253d1
...
...
@@ -13,6 +13,8 @@ import struct
pipc_gpio_magic
=
0xdeadbeef
pipc_adc_magic
=
0xcafecafe
pipc_serial_magic
=
0xabcdef01
pipc_mcp_magic
=
0xfeedfeed
pipc_reset_magic
=
0xbadf00d
import
posix_ipc
as
pipc
mq_to_qemu
=
pipc
.
MessageQueue
(
"/to_qemu"
,
flags
=
pipc
.
O_CREAT
,
read
=
False
,
write
=
True
)
...
...
@@ -94,12 +96,12 @@ class MainWindow(QMainWindow):
#ADC pin state
gbox
=
QGroupBox
(
"ADC - potentiometer"
)
layout
=
QHBoxLayout
()
slider
=
QSlider
(
Qt
.
Horizontal
)
slider
.
setMinimum
(
0
)
slider
.
setMaximum
(
4095
)
layout
.
addWidget
(
slider
)
s
elf
.
s
lider
=
QSlider
(
Qt
.
Horizontal
)
s
elf
.
s
lider
.
setMinimum
(
0
)
s
elf
.
s
lider
.
setMaximum
(
4095
)
layout
.
addWidget
(
s
elf
.
s
lider
)
self
.
labelADC
=
QLabel
(
'0'
)
slider
.
valueChanged
.
connect
(
self
.
updateSlider
)
s
elf
.
s
lider
.
valueChanged
.
connect
(
self
.
updateSlider
)
layout
.
addWidget
(
self
.
labelADC
)
gbox
.
setLayout
(
layout
)
layoutV
.
addWidget
(
gbox
)
...
...
@@ -109,6 +111,17 @@ class MainWindow(QMainWindow):
self
.
setCentralWidget
(
widget
)
@
pyqtSlot
()
def
reset
(
self
):
"""called by QEmu at startup. It should send back information to QEmu """
print
(
'reset'
)
for
(
pin
,
chkbox
)
in
self
.
leds
:
chkbox
.
setPalette
(
self
.
palOff
)
for
(
pin
,
chkbox
)
in
self
.
buttons
:
chkbox
.
setCheckState
(
Qt
.
Unchecked
)
self
.
serial
.
clear
()
self
.
slider
.
setValue
(
0
)
def
buttonChecked
(
self
,
state
,
pin
):
""" called when a checkbox is checked """
gpioId
=
1
#2 buttons on GPIOB
...
...
@@ -137,24 +150,35 @@ class MainWindow(QMainWindow):
app
=
QApplication
(
sys
.
argv
)
window
=
MainWindow
()
def
receiver
():
while
True
:
msg
=
mq_from_qemu
.
receive
()
magic
=
struct
.
unpack
(
"=I"
,
msg
[
0
][
0
:
4
])
#get magic value. return a tuple.
#print("mg=",mg," pin=",pin," gpio=",gpio," state=",state)
if
magic
[
0
]
==
pipc_gpio_magic
:
magic
,
changed_out
,
dir_mask
,
output
,
gpio
=
struct
.
unpack
(
"=IHHHH"
,
msg
[
0
])
name
=
[
'A'
,
'B'
,
'C'
,
'D'
,
'F'
]
if
gpio
<
5
:
window
.
setGPIO
(
name
[
gpio
],
dir_mask
,
output
)
#TODO: thread safe?
elif
magic
[
0
]
==
pipc_serial_magic
:
magic
,
char
=
struct
.
unpack
(
"=II"
,
msg
[
0
])
#print('char {0:c}'.format(char))
window
.
serial
.
insertPlainText
(
chr
(
char
&
0xff
))
else
:
raise
Exception
(
"Wrong magic number in GPIO IPC message: 0x{val:08x}"
.
format
(
val
=
magic
[
0
])
)
#required if we get too many messages to let time for the UI.
time
.
sleep
(.
01
)
class
Receiver
(
QThread
):
resetSig
=
pyqtSignal
()
gpioSig
=
pyqtSignal
(
str
,
int
,
int
)
serialSig
=
pyqtSignal
(
str
)
def
__init__self
():
QThread
.
__init__
(
self
)
def
run
(
self
):
while
True
:
msg
=
mq_from_qemu
.
receive
()
magic
=
struct
.
unpack
(
"=I"
,
msg
[
0
][
0
:
4
])
#get magic value. return a tuple.
#print("mg=",mg," pin=",pin," gpio=",gpio," state=",state)
if
magic
[
0
]
==
pipc_gpio_magic
:
magic
,
changed_out
,
dir_mask
,
output
,
gpio
=
struct
.
unpack
(
"=IHHHH"
,
msg
[
0
])
name
=
[
'A'
,
'B'
,
'C'
,
'D'
,
'F'
]
if
gpio
<
5
:
self
.
gpioSig
.
emit
(
name
[
gpio
],
dir_mask
,
output
)
elif
magic
[
0
]
==
pipc_serial_magic
:
magic
,
char
=
struct
.
unpack
(
"=II"
,
msg
[
0
])
self
.
serialSig
.
emit
(
chr
(
char
&
0xff
))
elif
magic
[
0
]
==
pipc_mcp_magic
:
pass
elif
magic
[
0
]
==
pipc_reset_magic
:
self
.
resetSig
.
emit
()
else
:
raise
Exception
(
"Wrong magic number in GPIO IPC message: 0x{val:08x}"
.
format
(
val
=
magic
[
0
])
)
#required if we get too many messages to let time for the UI.
time
.
sleep
(.
01
)
def
warningPersistence
():
while
True
:
...
...
@@ -165,8 +189,10 @@ def warningPersistence():
window
.
show
()
thread
=
threading
.
Thread
(
target
=
receiver
)
thread
.
daemon
=
True
thread
=
Receiver
()
thread
.
resetSig
.
connect
(
window
.
reset
)
thread
.
gpioSig
.
connect
(
window
.
setGPIO
)
thread
.
serialSig
.
connect
(
window
.
serial
.
insertPlainText
)
thread
.
start
()
threadWarning
=
threading
.
Thread
(
target
=
warningPersistence
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment