# fake GPIO dummy class for debuging
class dummy():
- def __init__(self):
+ def __init__(self, debug=True):
self.HIGH = "high"
self.LOW = "low"
self.BOARD = None
self.OUT = "output"
+ self.debug = debug
def setmode(self, _): pass
def setup(self, pin, mode):
if type(pin) == int:
- print(f"pin {pin} mode set to {mode}")
+ self.print(f"pin {pin} mode set to {mode}")
else:
pin = tuple(str(p) for p in pin)
- print(f"pins {', '.join(pin)} mode set to {mode}")
+ self.print(f"pins {', '.join(pin)} mode set to {mode}")
def output(self, pin, state):
if type(pin) == int:
- print(f"pin {pin} set {state}")
+ self.print(f"pin {pin} set {state}")
else:
pin = tuple(str(p) for p in pin)
- print(f"pins {', '.join(pin)} set {state}")
+ self.print(f"pins {', '.join(pin)} set {state}")
def cleanup(self): pass
+ def print(self, output: str):
+ if self.debug:
+ print(output)
+
# use the dummy class as GPIO if the real GPIO module does not exist
try: import RPi.GPIO
except ModuleNotFoundError:
print("RPi.GPIO Not found!\nDefaulting to dummy GPIO...")
- GPIO = dummy()
+ GPIO = dummy(debug=False)
# main class for handling all clock functions and threads
class clock():
# fixed variables
self.url = "https://data.ozva.co.uk/shopping/clock.json"
self.request_speed = 0.1
- self.light_speed = 0.5
# dynamic variables
- self.cues = Array("I", [0 for i in range(512)])
self.last_position = Value("I", 0)
self.current_position = Value("I", 0)
self.target_position = Value("I", 0)
print("connecting to OzVa...")
try:
print("getting data...")
+ r = requests.get(self.url)
+ print("connected!")
while True:
t = time.time()
self.movement_speed.value = int(data["movementSpeed"])
self.function.value = data["function"]
- int_cues = [255 if i == "true" else 0 for i in data["lightingCues"]]
- for i, value in enumerate(int_cues):
- self.cues[i] = value
-
duration = self.request_speed - (time.time() - t)
time.sleep(duration * (duration >= 0))
try:
print("sending movement data...")
+ parked = True
while True:
t = time.time()
- parked = self.current_position.value != self.target_position.value
+ if not parked and self.current_position.value == self.target_position.value:
+ print("Move completed")
+
+ parked = self.current_position.value == self.target_position.value
forward = self.target_position.value >= self.current_position.value
+
+ if not parked:
+ print(f"moving... @ {self.current_position.value}")
+
if forward and (not parked):
GPIO.output(13, GPIO.HIGH)
GPIO.output(11, GPIO.HIGH)
GPIO.output((11, 13, 15), GPIO.LOW)
self.current_position.value -= 1
- duration = self.request_speed - (time.time() - t)
+ duration = self.movement_speed.value / 1000 - (time.time() - t)
time.sleep(duration * (duration >= 0))
except KeyboardInterrupt:
except SystemExit:
os._exit(130)
- def light(self):
- print("starting lighting service...")
-
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- packed_interface = struct.pack('256s', "wlo1".encode('utf_8'))
- packed_address = fcntl.ioctl(sock.fileno(), 0x8915, packed_interface)[20:24]
- ip = socket.inet_ntoa(packed_address)
- print(f"bound to {ip}...")
-
- sender = sacn.sACNsender(bind_address=ip, source_name="doomsday")
- sender.start()
- sender.activate_output(2)
- sender[2].multicast = True
-
- sender.dmx_data = tuple(self.cues)
-
- try:
- print("sending lighting data...")
- while True:
- t = time.time()
-
- if sender.dmx_data != tuple(self.cues):
- sender.dmx_data = tuple(self.cues)
-
- duration = self.request_speed - (time.time() - t)
- time.sleep(duration * (duration >= 0))
-
- except KeyboardInterrupt:
- sender.stop()
-
- try:
- sys.exit()
- except SystemExit:
- os._exit(130)
-
def start(self):
- print("starting subprocesses!")
move_loop = Process(target=self.move, daemon = True)
move_loop.start()
time.sleep(1)
get_loop = Process(target=self.get, daemon = True)
get_loop.start()
time.sleep(1)
- light_loop = Process(target=self.light, daemon = True)
- light_loop.start()
- time.sleep(1)
if __name__ == "__main__":
print("initializing...")
a.start()
try:
- while True: pass
- httpd = HTTPServer(('localhost', 8000), SimpleHTTPRequestHandler)
- httpd.serve_forever()
+ while 1: pass
+ # httpd = HTTPServer(('localhost', 8000), SimpleHTTPRequestHandler)
+ # httpd.serve_forever()
except KeyboardInterrupt:
try:
sys.exit()