#!./.venv/bin/python
+from http.server import HTTPServer, SimpleHTTPRequestHandler
from multiprocessing import Process, Array, Value
import requests
import time
import sacn
+import sys
+import os
class dummy():
def __init__(self):
self.LOW = "low"
self.BOARD = None
self.OUT = "output"
- def setmode(_): pass
- def setup(pin, mode):
+ def setmode(self, _): pass
+ def setup(self, pin, mode):
if type(pin) == int:
print(f"pin {pin} mode set to {mode}")
else:
+ pin = tuple(str(p) for p in pin)
print(f"pins {', '.join(pin)} mode set to {mode}")
- def output(pin, state):
+ def output(self, pin, state):
if type(pin) == int:
print(f"pin {pin} set {state}")
else:
+ pin = tuple(str(p) for p in pin)
print(f"pins {', '.join(pin)} set {state}")
- def cleanup(): pass
+ def cleanup(self): pass
try: import RPi.GPIO
except ModuleNotFoundError:
self.light_speed = 0.5
# dynamic variables
- self.cues = Array("I", [0])
+ self.cues = Array("I", [0 for i in range(512)])
self.last_position = Value("I", 0)
self.current_position = Value("I", 0)
self.target_position = Value("I", 0)
self.movement_speed = Value("I", 10)
self.function = Array("u", "linear")
- get_loop = Process(target=self.get(), daemon = True)
- move_loop = Process(target=self.move(), daemon = True)
- light_loop = Process(target=self.light(), daemon = True)
- get_loop.start()
- move_loop.start()
- light_loop.start()
-
def get(self):
print("connecting to OzVa...")
try:
self.movement_speed.value = int(data["movementSpeed"])
self.function.value = data["function"]
- self.cues.value = [255 if i == "true" else 0 for i in data["lightingCues"]]
- time.sleep(self.request_speed - (time.time() - t))
+ int_cues = [255 if i == "true" else 0 for i in data["lightingCues"]]
+ for i, value in enumerate(int_cues):
+ self.cues[i] = value
+
+ duration = self.request_speed - (time.time() - t)
+ time.sleep(duration * (duration >= 0))
except KeyboardInterrupt:
try:
if forward and (not parked):
GPIO.output(13, GPIO.HIGH)
GPIO.output(11, GPIO.HIGH)
+ GPIO.output((11, 13, 15), GPIO.LOW)
+ self.current_position.value += 1
+
elif (not forward) and (not parked):
GPIO.output(11, GPIO.HIGH)
+ GPIO.output((11, 13, 15), GPIO.LOW)
+ self.current_position.value -= 1
- GPIO.output((11, 13, 15), GPIO.LOW)
-
- time.sleep(self.movement_speed.value - (time.time() - t))
+ duration = self.request_speed - (time.time() - t)
+ time.sleep(duration * (duration >= 0))
except KeyboardInterrupt:
GPIO.cleanup()
sender.activate_output(2)
sender[2].multicast = True
+ sender.dmx_data = tuple(self.cues)
+
try:
print("sending lighting data...")
while True:
t = time.time()
- sender.dmx_data = tuple(self.cues.value)
+ if sender.dmx_data != tuple(self.cues):
+ sender.dmx_data = tuple(self.cues)
- time.sleep(self.light_speed - (time.time() - t))
+ duration = self.request_speed - (time.time() - t)
+ time.sleep(duration * (duration >= 0))
except KeyboardInterrupt:
sender.stop()
except SystemExit:
os._exit(130)
+ def start(self):
+ print("starting subprocesses!")
+ move_loop = Process(target=self.move, daemon = True)
+ move_loop.start()
+ time.sleep(1)
+ get_loop = Process(target=self.get, daemon = True)
+ get_loop.start()
+ time.sleep(1)
+ light_loop = Process(target=self.light, daemon = True)
+ light_loop.start()
+ time.sleep(1)
+
if __name__ == "__main__":
print("initializing...")
a = clock()
+ a.start()
+
+ try:
+ while True: pass
+ httpd = HTTPServer(('localhost', 8000), SimpleHTTPRequestHandler)
+ httpd.serve_forever()
+ except:
+ try:
+ sys.exit()
+ except SystemExit:
+ os._exit(130)
+