]> OzVa Git service - doomsday-clock/commitdiff
Safty changes
authorwill <greenwoodw50@gmail.com>
Mon, 19 Aug 2024 13:35:16 +0000 (14:35 +0100)
committerwill <greenwoodw50@gmail.com>
Mon, 19 Aug 2024 13:35:16 +0000 (14:35 +0100)
Added Venv support
Added automatic dummy GPIO class for easy testing
Added safe "Value" and "Array" objects for further safety
Added readout text
Added if __name__ statement which initializes the class

clock.py [changed mode: 0644->0755]

old mode 100644 (file)
new mode 100755 (executable)
index 12e150c..661d0f1
--- a/clock.py
+++ b/clock.py
@@ -1,21 +1,48 @@
+#!./.venv/bin/python
+from multiprocessing import Process, Array, Value
 import requests
 import time
 import sacn
-import RPi.GPIO as GPIO
 
-class clock:
+class dummy():
        def __init__(self):
+               self.HIGH = "high"
+               self.LOW = "low"
+               self.BOARD = None
+               self.OUT = "output"
+       def setmode(_): pass
+       def setup(pin, mode):
+               if type(pin) == int:
+                       print(f"pin {pin} mode set to {mode}")
+               else:
+                       print(f"pins {', '.join(pin)} mode set to {mode}")
+
+       def output(pin, state):
+               if type(pin) == int:
+                       print(f"pin {pin} set {state}")
+               else:
+                       print(f"pins {', '.join(pin)} set {state}")
+
+       def cleanup(): pass
+
+try: import RPi.GPIO
+except ModuleNotFoundError:
+       print("RPi.GPIO Not found!\nDefaulting to dummy GPIO...")
+       GPIO = dummy()
+
+class clock():
+       def __init__(self):
+               # fixed variables
                self.url = "https://data.ozva.co.uk/shopping/clock.json"
-
                self.request_speed = 0.1
-
                self.light_speed = 0.5
-               self.cues = 0
 
-               self.current_position = 0
-               self.last_position = 0
-               self.movement_speed = 10
-               self.function = "linear"
+               # dynamic variables
+               self.cues = Array("I", [0])
+               self.current_position = Value("I", 0)
+               self.last_position = Value("I", 0)
+               self.movement_speed = Value("I", 10)
+               self.function = Value("u", "linear")
 
                get_loop = Process(target=self.get(), daemon = True)
                move_loop = Process(target=self.move(), daemon = True)
@@ -25,7 +52,9 @@ class clock:
                light_loop.start()
 
        def get(self):
+               print("connecting to OzVa...")
                try:
+                       print("getting data...")
                        while True:
                                t = time.time()
 
@@ -33,29 +62,34 @@ class clock:
                                data = r.json()
 
                                new_target = int(data["currentPosition"])
-                               if new_target != self.target_position:
-                                       self.last_position = self.current_position
-                                       self.target_position = new_target
+                               if new_target != self.target_position.value:
+                                       self.last_position.value = self.current_position.value
+                                       self.target_position.value = new_target
 
-                               self.movement_speed = int(data["movementSpeed"])
-                               self.function = data["function"]
-                               self.cues = data["lightingCues"]
+                               self.movement_speed.value = int(data["movementSpeed"])
+                               self.function.value = data["function"]
+                               self.cues.value = [255 if i == "true" else 0 for i in data["lightingCues"]]
 
                                time.sleep(self.request_speed - (time.time() - t))
 
-               except:
-                       pass
+               except KeyboardInterrupt:
+                       try:
+                               sys.exit()
+                       except SystemExit:
+                               os._exit(130)
 
        def move(self):
+               print("starting motor service...")
                GPIO.setmode(GPIO.BOARD)
                GPIO.setup((11, 13, 15), GPIO.OUT)
 
                try:
+                       print("sending movement data...")
                        while True:
                                t = time.time()
 
-                               parked = self.current_position != self.target_position
-                               forward = self.target_position >= self.current_position
+                               parked = self.current_position.value != self.target_position.value
+                               forward = self.target_position.value >= self.current_position.value
                                if forward and (not parked):
                                        GPIO.output(13, GPIO.HIGH)
                                        GPIO.output(11, GPIO.HIGH)
@@ -64,26 +98,40 @@ class clock:
 
                                GPIO.output((11, 13, 15), GPIO.LOW)
 
-                               time.sleep(self.movement_speed - (time.time() - t))
+                               time.sleep(self.movement_speed.value - (time.time() - t))
 
-               except:
+               except KeyboardInterrupt:
                        GPIO.cleanup()
 
+                       try:
+                               sys.exit()
+                       except SystemExit:
+                               os._exit(130)
+
        def light(self):
+               print("starting lighting service...")
                sender = sacn.sACNsender()
                sender.start()
                sender.activate_output(2)
                sender[2].multicast = True
 
                try:
+                       print("sending lighting data...")
                        while True:
                                t = time.time()
 
-                               data = tuple(255 if i == "true" else 0 for i in self.cues)
-                               sender.dmx_data = data
+                               sender.dmx_data = tuple(self.cues.value)
 
                                time.sleep(self.light_speed - (time.time() - t))
 
-               except:
+               except KeyboardInterrupt:
                        sender.stop()
 
+                       try:
+                               sys.exit()
+                       except SystemExit:
+                               os._exit(130)
+
+if __name__ == "__main__":
+       print("initializing...")
+       a = clock()