From 85c4380bb756ccfa11fb0070dbcde490d80cbd4f Mon Sep 17 00:00:00 2001 From: Max Value Date: Wed, 5 Feb 2025 19:44:15 +0000 Subject: [PATCH] Removed lighting system + fixed the movement system ? left the recevier.py --- clock.py | 82 ++++++++++++++++++-------------------------------------- 1 file changed, 26 insertions(+), 56 deletions(-) diff --git a/clock.py b/clock.py index 5e5cfd7..4e50ff7 100755 --- a/clock.py +++ b/clock.py @@ -14,33 +14,38 @@ import os # fake GPIO dummy class for debuging class dummy(): - def __init__(self): + def __init__(self, debug=True): self.HIGH = "high" self.LOW = "low" self.BOARD = None self.OUT = "output" + self.debug = debug def setmode(self, _): pass def setup(self, pin, mode): if type(pin) == int: - print(f"pin {pin} mode set to {mode}") + self.print(f"pin {pin} mode set to {mode}") else: pin = tuple(str(p) for p in pin) - print(f"pins {', '.join(pin)} mode set to {mode}") + self.print(f"pins {', '.join(pin)} mode set to {mode}") def output(self, pin, state): if type(pin) == int: - print(f"pin {pin} set {state}") + self.print(f"pin {pin} set {state}") else: pin = tuple(str(p) for p in pin) - print(f"pins {', '.join(pin)} set {state}") + self.print(f"pins {', '.join(pin)} set {state}") def cleanup(self): pass + def print(self, output: str): + if self.debug: + print(output) + # use the dummy class as GPIO if the real GPIO module does not exist try: import RPi.GPIO except ModuleNotFoundError: print("RPi.GPIO Not found!\nDefaulting to dummy GPIO...") - GPIO = dummy() + GPIO = dummy(debug=False) # main class for handling all clock functions and threads class clock(): @@ -48,10 +53,8 @@ class clock(): # fixed variables self.url = "https://data.ozva.co.uk/shopping/clock.json" self.request_speed = 0.1 - self.light_speed = 0.5 # dynamic variables - self.cues = Array("I", [0 for i in range(512)]) self.last_position = Value("I", 0) self.current_position = Value("I", 0) self.target_position = Value("I", 0) @@ -62,6 +65,8 @@ class clock(): print("connecting to OzVa...") try: print("getting data...") + r = requests.get(self.url) + print("connected!") while True: t = time.time() @@ -76,10 +81,6 @@ class clock(): self.movement_speed.value = int(data["movementSpeed"]) self.function.value = data["function"] - int_cues = [255 if i == "true" else 0 for i in data["lightingCues"]] - for i, value in enumerate(int_cues): - self.cues[i] = value - duration = self.request_speed - (time.time() - t) time.sleep(duration * (duration >= 0)) @@ -96,11 +97,19 @@ class clock(): try: print("sending movement data...") + parked = True while True: t = time.time() - parked = self.current_position.value != self.target_position.value + if not parked and self.current_position.value == self.target_position.value: + print("Move completed") + + parked = self.current_position.value == self.target_position.value forward = self.target_position.value >= self.current_position.value + + if not parked: + print(f"moving... @ {self.current_position.value}") + if forward and (not parked): GPIO.output(13, GPIO.HIGH) GPIO.output(11, GPIO.HIGH) @@ -112,7 +121,7 @@ class clock(): GPIO.output((11, 13, 15), GPIO.LOW) self.current_position.value -= 1 - duration = self.request_speed - (time.time() - t) + duration = self.movement_speed.value / 1000 - (time.time() - t) time.sleep(duration * (duration >= 0)) except KeyboardInterrupt: @@ -123,52 +132,13 @@ class clock(): except SystemExit: os._exit(130) - def light(self): - print("starting lighting service...") - - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - packed_interface = struct.pack('256s', "wlo1".encode('utf_8')) - packed_address = fcntl.ioctl(sock.fileno(), 0x8915, packed_interface)[20:24] - ip = socket.inet_ntoa(packed_address) - print(f"bound to {ip}...") - - sender = sacn.sACNsender(bind_address=ip, source_name="doomsday") - sender.start() - sender.activate_output(2) - sender[2].multicast = True - - sender.dmx_data = tuple(self.cues) - - try: - print("sending lighting data...") - while True: - t = time.time() - - if sender.dmx_data != tuple(self.cues): - sender.dmx_data = tuple(self.cues) - - duration = self.request_speed - (time.time() - t) - time.sleep(duration * (duration >= 0)) - - except KeyboardInterrupt: - sender.stop() - - try: - sys.exit() - except SystemExit: - os._exit(130) - def start(self): - print("starting subprocesses!") move_loop = Process(target=self.move, daemon = True) move_loop.start() time.sleep(1) get_loop = Process(target=self.get, daemon = True) get_loop.start() time.sleep(1) - light_loop = Process(target=self.light, daemon = True) - light_loop.start() - time.sleep(1) if __name__ == "__main__": print("initializing...") @@ -176,9 +146,9 @@ if __name__ == "__main__": a.start() try: - while True: pass - httpd = HTTPServer(('localhost', 8000), SimpleHTTPRequestHandler) - httpd.serve_forever() + while 1: pass + # httpd = HTTPServer(('localhost', 8000), SimpleHTTPRequestHandler) + # httpd.serve_forever() except KeyboardInterrupt: try: sys.exit() -- 2.39.2