Continuing the discussion from Wanted: A good SPI mutex:
Given the following code:
Waveshare demo code for the 2.7" display using SPI calls.
#!/usr/bin/python
# -*- coding:utf-8 -*-
import sys
import os
picdir = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), 'pic')
libdir = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), 'lib')
if os.path.exists(libdir):
sys.path.append(libdir)
import logging
from waveshare_epd import epd2in7
import time
from PIL import Image,ImageDraw,ImageFont
import traceback
from di_mutex import DI_Mutex
mutex = DI_Mutex
logging.basicConfig(level=logging.DEBUG)
try:
logging.info("epd2in7 Demo")
epd = epd2in7.EPD()
'''2Gray(Black and white) display'''
logging.info("init and Clear")
try:
mutex.acquire()
epd.init()
finally:
mutex.release()
try:
mutex.acquire()
epd.Clear(0xFF)
finally:
mutex.release()
<snip rest of code that is, essentially, irrelevant.>
The di_mutex code used above:
# https://www.dexterindustries.com
#
# Copyright (c) 2019 Dexter Industries
# Modified 2024-07 by Jim Harris to include usage information
# Released under the MIT license (http://choosealicense.com/licenses/mit/).
# For more information see https://github.com/DexterInd/DI_Sensors/blob/master/LICENSE.md
#
# Generic Dexter Industries python mutex
#
# Usage:
# Insert and un-comment the code between the double lines.
#
# ============================
#from di_mutex import DI_Mutex
#
#spi_mutex = DI_Mutex("SPI")
#
#
#try:
# spi_mutex.acquire()
# # do protected SPI access stuff here
#finally:
# spi_mutex.release()
# ============================
from __future__ import print_function
from __future__ import division
import time
import fcntl
import os
import atexit
class DI_Mutex(object):
    """ Dexter Industries inter-process mutex backed by a lock file.

    Each mutex is identified by a name; the lock file is
    "/run/lock/DI_Mutex_<name>". acquire() takes an exclusive, non-blocking
    fcntl.lockf() lock on that file and polls until it succeeds; release()
    closes the file, which drops the lock.

    An INSTANCE must be created before use, e.g.:

        spi_mutex = DI_Mutex("SPI")
        try:
            spi_mutex.acquire()
            # protected access here
        finally:
            spi_mutex.release()

    or, equivalently, as a context manager:

        with DI_Mutex("SPI"):
            # protected access here
    """

    def __init__(self, name, loop_time = 0.0001):
        """ Initialize

        name      -- string appended to "/run/lock/DI_Mutex_" to form the
                     lock file path
        loop_time -- seconds to sleep between lock attempts and after a
                     release
        """
        self.Filename = "/run/lock/DI_Mutex_" + name
        self.LoopTime = loop_time
        self.Handle = None

        try:
            # Create the lock file if needed; close it right away instead of
            # leaving the file object for the garbage collector.
            with open(self.Filename, 'w'):
                pass
            if os.path.isfile(self.Filename):
                os.chmod(self.Filename, 0o777)
        except Exception:
            # Deliberately best-effort: a failure here is ignored and
            # acquire() opens the file again itself.
            pass

        # Register the exit method
        atexit.register(self.__exit_cleanup__)

    def __exit_cleanup__(self):
        """ Called at exit to clean up """
        self.release()

    def __enter__(self):
        """ Acquire the mutex on entering a "with" block; returns self """
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """ Release the mutex on leaving a "with" block

        Returns False so an exception raised inside the block propagates.
        """
        self.release()
        return False

    def acquire(self):
        """ Acquire the mutex

        Blocks, retrying every LoopTime seconds, until the exclusive lock on
        the lock file has been obtained. On return self.Handle is the open,
        locked file object.
        """
        while True:
            handle = None
            try:
                handle = open(self.Filename, 'w')
                # Rebind before locking, exactly as the original code did.
                self.Handle = handle
                fcntl.lockf(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
                return
            except IOError: # already locked by a different process
                pass
            except Exception as e:
                print(e)

            # The attempt failed: close the unlocked handle explicitly rather
            # than leaving it for the garbage collector, then wait before
            # retrying (this also keeps the generic-error path from spinning).
            if handle is not None:
                handle.close()
                self.Handle = None
            time.sleep(self.LoopTime)

    def release(self):
        """ Release the mutex

        Closes the lock file if it is open. Does nothing when the mutex is
        not currently held, so calling it more than once is harmless.
        """
        if self.Handle is not None and self.Handle is not True:
            self.Handle.close()
            self.Handle = None
            # NOTE(review): presumably this pause gives a waiting process a
            # chance to take the lock before this one re-acquires - confirm.
            time.sleep(self.LoopTime)
And finally, my sanity test code.
It essentially does nothing, but I used it to verify that the mutex code itself wasn’t failing.
#!/usr/bin/python3.7
import os
import sys
from di_mutex import DI_Mutex
# Sanity check: hold the "SPI" mutex while adjusting sys.path and importing
# the test libraries, then release it again.
test_library_dir = "/home/pi/Test_Libraries/"
mutex = DI_Mutex("SPI")

try:
    mutex.acquire()
    print("SPI mutex acquired")

    # Stand-in for the protected SPI access: show the module search path
    # before and after the test library directory is inserted.
    print("sys.path before insert = ", sys.path)
    sys.path.insert(1, test_library_dir)
    print("sys.path after insert = ", sys.path)

    # Imported here, after the insert, so the new search path is in effect.
    import easygopigo3 as easy
    import I2C_mutex

    easy_text = str(easy)
    i2c_mutex_text = str(I2C_mutex)
    print("str(easy) = ", easy_text)
    print("str(I2C_mutex) - ", i2c_mutex_text)
finally:
    mutex.release()
    print("SPI mutex released")
Notes:
The sanity test code, when run inside Thonny, works perfectly. However, the demo code, modified to use the Dexter mutex, always fails as follows:
>>> %Run epd_2in7_test.py
INFO:root:epd2in7 Demo
INFO:root:init and Clear
Traceback (most recent call last):
File "/home/pi/e-Paper/RaspberryPi_JetsonNano/python/examples/epd_2in7_test.py", line 29, in <module>
mutex.acquire()
TypeError: acquire() missing 1 required positional argument: 'self'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/pi/e-Paper/RaspberryPi_JetsonNano/python/examples/epd_2in7_test.py", line 32, in <module>
mutex.release()
TypeError: release() missing 1 required positional argument: 'self'
>>>
Adding “self” as a parameter (“mutex.acquire|release(self)”), as a prefix (“self.mutex.acquire|release()”), or both (“self.mutex.acquire|release(self)”):
try:
logging.info("epd2in7 Demo")
epd = epd2in7.EPD()
logging.info("init and Clear")
try:
mutex.acquire(self)
epd.init()
finally:
mutex.release(self)
try:
mutex.acquire(self)
epd.Clear(0xFF)
finally:
mutex.release(self)
generates the following error:
>>> %Run epd_2in7_test.py
INFO:root:epd2in7 Demo
INFO:root:init and Clear
Traceback (most recent call last):
File "/home/pi/e-Paper/RaspberryPi_JetsonNano/python/examples/epd_2in7_test.py", line 29, in <module>
mutex.acquire(self)
NameError: name 'self' is not defined
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/pi/e-Paper/RaspberryPi_JetsonNano/python/examples/epd_2in7_test.py", line 32, in <module>
mutex.release(self)
NameError: name 'self' is not defined
>>>
I am sure it is something both obvious and stupid, but for the life of me I cannot figure out what’s going wrong.
Thanks!