+#!./.venv/bin/python
+from multiprocessing import Process, Array, Value
import requests
import time
import sacn
-import RPi.GPIO as GPIO
-class clock:
+class dummy():
def __init__(self):
+ self.HIGH = "high"
+ self.LOW = "low"
+ self.BOARD = None
+ self.OUT = "output"
+ def setmode(_): pass
+ def setup(pin, mode):
+ if type(pin) == int:
+ print(f"pin {pin} mode set to {mode}")
+ else:
+ print(f"pins {', '.join(pin)} mode set to {mode}")
+
+ def output(pin, state):
+ if type(pin) == int:
+ print(f"pin {pin} set {state}")
+ else:
+ print(f"pins {', '.join(pin)} set {state}")
+
+ def cleanup(): pass
+
+try: import RPi.GPIO
+except ModuleNotFoundError:
+ print("RPi.GPIO Not found!\nDefaulting to dummy GPIO...")
+ GPIO = dummy()
+
+class clock():
+ def __init__(self):
+ # fixed variables
self.url = "https://data.ozva.co.uk/shopping/clock.json"
-
self.request_speed = 0.1
-
self.light_speed = 0.5
- self.cues = 0
- self.current_position = 0
- self.last_position = 0
- self.movement_speed = 10
- self.function = "linear"
+ # dynamic variables
+ self.cues = Array("I", [0])
+ self.current_position = Value("I", 0)
+ self.last_position = Value("I", 0)
+ self.movement_speed = Value("I", 10)
+ self.function = Value("u", "linear")
get_loop = Process(target=self.get(), daemon = True)
move_loop = Process(target=self.move(), daemon = True)
light_loop.start()
def get(self):
+ print("connecting to OzVa...")
try:
+ print("getting data...")
while True:
t = time.time()
data = r.json()
new_target = int(data["currentPosition"])
- if new_target != self.target_position:
- self.last_position = self.current_position
- self.target_position = new_target
+ if new_target != self.target_position.value:
+ self.last_position.value = self.current_position.value
+ self.target_position.value = new_target
- self.movement_speed = int(data["movementSpeed"])
- self.function = data["function"]
- self.cues = data["lightingCues"]
+ self.movement_speed.value = int(data["movementSpeed"])
+ self.function.value = data["function"]
+ self.cues.value = [255 if i == "true" else 0 for i in data["lightingCues"]]
time.sleep(self.request_speed - (time.time() - t))
- except:
- pass
+ except KeyboardInterrupt:
+ try:
+ sys.exit()
+ except SystemExit:
+ os._exit(130)
def move(self):
+ print("starting motor service...")
GPIO.setmode(GPIO.BOARD)
GPIO.setup((11, 13, 15), GPIO.OUT)
try:
+ print("sending movement data...")
while True:
t = time.time()
- parked = self.current_position != self.target_position
- forward = self.target_position >= self.current_position
+ parked = self.current_position.value != self.target_position.value
+ forward = self.target_position.value >= self.current_position.value
if forward and (not parked):
GPIO.output(13, GPIO.HIGH)
GPIO.output(11, GPIO.HIGH)
GPIO.output((11, 13, 15), GPIO.LOW)
- time.sleep(self.movement_speed - (time.time() - t))
+ time.sleep(self.movement_speed.value - (time.time() - t))
- except:
+ except KeyboardInterrupt:
GPIO.cleanup()
+ try:
+ sys.exit()
+ except SystemExit:
+ os._exit(130)
+
def light(self):
+ print("starting lighting service...")
sender = sacn.sACNsender()
sender.start()
sender.activate_output(2)
sender[2].multicast = True
try:
+ print("sending lighting data...")
while True:
t = time.time()
- data = tuple(255 if i == "true" else 0 for i in self.cues)
- sender.dmx_data = data
+ sender.dmx_data = tuple(self.cues.value)
time.sleep(self.light_speed - (time.time() - t))
- except:
+ except KeyboardInterrupt:
sender.stop()
+ try:
+ sys.exit()
+ except SystemExit:
+ os._exit(130)
+
+if __name__ == "__main__":
+ print("initializing...")
+ a = clock()