From 3868fe82e1399e8a525cf585d3d680b2a30120ca Mon Sep 17 00:00:00 2001 From: will Date: Mon, 19 Aug 2024 14:35:16 +0100 Subject: [PATCH] Safty changes Added Venv support Added automatic dummy GPIO class for easy testing Added safe "Value" and "Array" objects for further safety Added readout text Added if __name__ statement which initializes the class --- clock.py | 96 ++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 72 insertions(+), 24 deletions(-) mode change 100644 => 100755 clock.py diff --git a/clock.py b/clock.py old mode 100644 new mode 100755 index 12e150c..661d0f1 --- a/clock.py +++ b/clock.py @@ -1,21 +1,48 @@ +#!./.venv/bin/python +from multiprocessing import Process, Array, Value import requests import time import sacn -import RPi.GPIO as GPIO -class clock: +class dummy(): def __init__(self): + self.HIGH = "high" + self.LOW = "low" + self.BOARD = None + self.OUT = "output" + def setmode(_): pass + def setup(pin, mode): + if type(pin) == int: + print(f"pin {pin} mode set to {mode}") + else: + print(f"pins {', '.join(pin)} mode set to {mode}") + + def output(pin, state): + if type(pin) == int: + print(f"pin {pin} set {state}") + else: + print(f"pins {', '.join(pin)} set {state}") + + def cleanup(): pass + +try: import RPi.GPIO +except ModuleNotFoundError: + print("RPi.GPIO Not found!\nDefaulting to dummy GPIO...") + GPIO = dummy() + +class clock(): + def __init__(self): + # fixed variables self.url = "https://data.ozva.co.uk/shopping/clock.json" - self.request_speed = 0.1 - self.light_speed = 0.5 - self.cues = 0 - self.current_position = 0 - self.last_position = 0 - self.movement_speed = 10 - self.function = "linear" + # dynamic variables + self.cues = Array("I", [0]) + self.current_position = Value("I", 0) + self.last_position = Value("I", 0) + self.movement_speed = Value("I", 10) + self.function = Value("u", "linear") get_loop = Process(target=self.get(), daemon = True) move_loop = Process(target=self.move(), daemon = True) @@ -25,7 +52,9 @@ class clock: light_loop.start() def get(self): + print("connecting to OzVa...") try: + print("getting data...") while True: t = time.time() @@ -33,29 +62,34 @@ class clock: data = r.json() new_target = int(data["currentPosition"]) - if new_target != self.target_position: - self.last_position = self.current_position - self.target_position = new_target + if new_target != self.target_position.value: + self.last_position.value = self.current_position.value + self.target_position.value = new_target - self.movement_speed = int(data["movementSpeed"]) - self.function = data["function"] - self.cues = data["lightingCues"] + self.movement_speed.value = int(data["movementSpeed"]) + self.function.value = data["function"] + self.cues.value = [255 if i == "true" else 0 for i in data["lightingCues"]] time.sleep(self.request_speed - (time.time() - t)) - except: - pass + except KeyboardInterrupt: + try: + sys.exit() + except SystemExit: + os._exit(130) def move(self): + print("starting motor service...") GPIO.setmode(GPIO.BOARD) GPIO.setup((11, 13, 15), GPIO.OUT) try: + print("sending movement data...") while True: t = time.time() - parked = self.current_position != self.target_position - forward = self.target_position >= self.current_position + parked = self.current_position.value != self.target_position.value + forward = self.target_position.value >= self.current_position.value if forward and (not parked): GPIO.output(13, GPIO.HIGH) GPIO.output(11, GPIO.HIGH) @@ -64,26 +98,40 @@ class clock: GPIO.output((11, 13, 15), GPIO.LOW) - time.sleep(self.movement_speed - (time.time() - t)) + time.sleep(self.movement_speed.value - (time.time() - t)) - except: + except KeyboardInterrupt: GPIO.cleanup() + try: + sys.exit() + except SystemExit: + os._exit(130) + def light(self): + print("starting lighting service...") sender = sacn.sACNsender() sender.start() sender.activate_output(2) sender[2].multicast = True try: + print("sending lighting data...") while True: t = time.time() - data = tuple(255 if i == "true" else 0 for i in self.cues) - sender.dmx_data = data + sender.dmx_data = tuple(self.cues.value) time.sleep(self.light_speed - (time.time() - t)) - except: + except KeyboardInterrupt: sender.stop() + try: + sys.exit() + except SystemExit: + os._exit(130) + +if __name__ == "__main__": + print("initializing...") + a = clock() -- 2.39.2