*.db
*.zip
docs/*
+teleshopping.egg-info/
+teleshopping/__pycache__/
+teleshopping/api/__pycache__/
--- /dev/null
+Not sure yet! Closed source
--- /dev/null
+init: .venv/bin/pip
+ .venv/bin/pip install -r requirements.txt
+ .venv/bin/pip install -e .
+
+.venv/bin/pip:
+ python3 -m venv .venv
+
+.PHONY: init
--- /dev/null
+#!.venv/bin/python
+
+from flask import Flask
+from flask_httpauth import HTTPBasicAuth
+from datetime import datetime, timezone
+from os import path, environ, system
+from markupsafe import escape
+from flask_cors import CORS
+import json
+
+auth = HTTPBasicAuth()
+app = Flask(__name__)
+CORS(app)
+
+try:
+ app.root_path = ROOT = environ["XMDV_PATH"]
+except KeyError:
+ environ["XMDV_PATH"] = ROOT = app.root_path
+
+import teleshopping
+
+# get the static data ready
+
+auth.verify_password(teleshopping.verify_password)
+
+# pages
+app.route("/")(teleshopping.overlay)
+app.route("/autocue")(teleshopping.autocue)
+app.route("/hud")(teleshopping.hud)
+app.route("/sounds")(teleshopping.sounds)
+app.route("/chart")(teleshopping.chart)
+app.route("/extensions/<string:filename>")(teleshopping.extensions)
+
+# authenticated pages
+docs = auth.login_required(teleshopping.docs)
+app.route("/docs")(docs)
+paper_docs = auth.login_required(teleshopping.paper_docs)
+app.route("/docs/<string:filename>")(paper_docs)
+
+# admin pages
+admin = auth.login_required(teleshopping.admin)
+app.route("/admin")(admin)
+admin_page = auth.login_required(teleshopping.admin_page)
+app.route("/admin/<string:page>", methods=['GET', 'POST'])(admin_page)
+
+# the data api
+app.route("/api")(teleshopping.data)
+app.route("/api/items")(teleshopping.items)
+app.route("/api/clock")(teleshopping.clock)
+
+# the buy api
+app.route("/api/buy")(teleshopping.buy)
+# authenticated
+buy_data = auth.login_required(teleshopping.recent_data)
+app.route("/api/buy/data")(buy_data)
+all_data = auth.login_required(teleshopping.all_data)
+app.route("/api/buy/all")(all_data)
+buy_clear = auth.login_required(teleshopping.clear)
+app.route("/api/buy/clear")(buy_clear)
+
+# utility api
+app.route("/api/img/<int:item_id>")(teleshopping.img)
+# authenticated
+docs_generate = auth.login_required(teleshopping.docs_generate)
+app.route("/api/generate")(docs_generate)
+
+# utils
+app.before_request(teleshopping.check_database)
+
+if __name__ == "__main__":
+ app.run(host='127.0.0.1', port=8000, debug=True)
from distutils.core import setup
setup(name='teleshopping',
- version='1.0',
+ version='3.0',
py_modules=['teleshopping'],
)
+++ /dev/null
-#!.venv/bin/python
-
-from flask import Flask, Response, request, render_template, jsonify, send_from_directory
-from werkzeug.security import generate_password_hash, check_password_hash
-from jinja2 import Environment, FileSystemLoader
-from flask_httpauth import HTTPBasicAuth
-from datetime import datetime, timezone
-from os import path, environ, system
-from math import radians, cos, sin
-from markupsafe import escape
-from ast import literal_eval
-from flask_cors import CORS
-import sqlite3
-import json
-import time
-
-INCREMENT = 18
-
-app = Flask(__name__)
-CORS(app)
-
-auth = HTTPBasicAuth()
-try:
- app.root_path = environ["SHOPPING_PATH"]
-except KeyError:
- pass
-
-# get the secrets ready
-with open(path.join(app.root_path, "..", "secrets.json"), "r", encoding="utf-8") as f:
- users = json.loads(f.read())
-users = {k: generate_password_hash(v) for (k, v) in users.items()}
-
-# get the static data ready
-with open(path.join(app.root_path, "static", "static.json"), "r", encoding="utf-8") as f:
- static_data = json.loads(f.read())
-
-with open(path.join(app.root_path, "static", "info.json"), "r", encoding="utf-8") as f:
- static_info = json.loads(f.read())
-
-
-
-
-# make the password verifier function
-@auth.verify_password
-def verify_password(username, password):
- if username in users and \
- check_password_hash(users.get(username), password):
- return username
-
-# database helper function
-def database(field: str | list):
- query = f"SELECT {','.join(field) if type(field) == list else field} FROM state WHERE id=1;"
-
- with sqlite3.connect(path.join(app.root_path, "data.db")) as connection:
- cursor = connection.cursor()
- result = cursor.execute(query).fetchone()
-
- column_names = [description[0] for description in cursor.description]
-
- return dict(zip(column_names, result)) # combine column names with data into dict
-
-
-
-
-@app.route("/")
-def gfx_main():
- return Response(render_template("gfx.html"), mimetype="text/html")
-
-@app.route("/autocue")
-def gfx_page():
- request.method = "internal"
- data = api()
- return Response(render_template("autocue.html", data=data, item=static_data["items"][data["item_id"]]), mimetype="text/html")
-
-@app.route("/display")
-def display_page():
- return Response(render_template("display.html"), mimetype="text/html")
-
-@app.route("/sounds")
-def sounds_page():
- return Response(render_template("sounds.html"), mimetype="text/html")
-
-@app.route("/chart")
-def chart_page():
- return Response(render_template("chart.html"), mimetype="text/html")
-
-
-
-
-@app.route("/extensions/<string:filename>")
-def interface_page(filename):
- return send_from_directory("extensions", filename)
-
-
-
-
-@app.route("/docs")
-@auth.login_required
-def web_docs():
- return Response(render_template("docs.html", data=static_data, info=static_info, book=False), mimetype="text/html")
-
-@app.route("/docs/<string:filename>")
-@auth.login_required
-def paper_docs(filename):
- return send_from_directory("docs", filename, as_attachment=True)
-
-@app.route("/admin")
-@auth.login_required
-def admin_main():
- return Response(render_template("admin.html", mimetype="text/html"))
-
-@app.route("/admin/<string:page>", methods=['GET', 'POST'])
-@auth.login_required
-def admin_page(page):
- if page in ["text", "clock", "timer", "price"]:
- if request.method == "POST": # if posting with data update the database then return the new admin page
-
- post_data = request.form.to_dict()
- aggrigate_data = {}
-
- # perform all data clean up for special cases
- aggrigate_data = {}
- for key in post_data:
- # if a key begins with the word "aggrigate": aggrigate it with all other keys with the same body into a list
- if key[:9] == "aggrigate":
- new_key = key[10:-5]
- if new_key in aggrigate_data:
- aggrigate_data[new_key].append(int(post_data[key]))
- else:
- aggrigate_data.update({new_key: [int(post_data[key])]})
-
- # if the key is a timer, make the time fixed to the epoch instead of relative (& account for offset)
- if key[:3] == "end":
- aggrigate_data.update(
- {key: int(datetime.now(timezone.utc).timestamp()) + int(post_data[key])}
- )
-
- else: aggrigate_data.update({key: post_data[key]})
-
- with sqlite3.connect(path.join(app.root_path, "data.db")) as connection:
- cursor = connection.cursor()
- result = cursor.execute("SELECT * FROM pragma_table_info('state');").fetchall()[1:]
-
- types = {"TEXT":str, "INTEGER":int, "REAL":float}
- type_note = {c[1]:types[c[2]] for c in result}
-
- valid_data = {k: type_note[k].__call__(aggrigate_data[k]) for k in aggrigate_data if k in type_note}
-
- data_list = ', '.join([f"{k} = '{escape(v)}'" if type(v) == str else f"{k} = {v}" for (k,v) in valid_data.items()])
- query = f"""
- UPDATE state SET
- {data_list}
- WHERE id = 1;
- """
-
- print(query)
-
- cursor.execute(query)
- connection.commit()
-
- request.method = "internal" # set method to internal so that the api call returns only the python object
- if page == "clock":
- coords = [{
- "i":i,
- "x":45*cos(radians(i-90))+50,
- "y":45*sin(radians(i-90))+50
- } for i in list(range(0, 360, INCREMENT))]
- return Response(render_template("clock.html", data=api(), positions=coords), mimetype="text/html")
-
- if page == "text":
- return Response(render_template("text.html", data=api(), text=static_data['text']), mimetype="text/html")
-
- if page == "timer":
- return Response(render_template("timer.html", data=api()), mimetype="text/html")
-
- if page == "price":
- return Response(render_template("price.html", data=api(), items=static_data['items']), mimetype="text/html")
-
- else:
- return "", 404
-
-
-
-
-@app.route("/api")
-def api():
- if request.method in ["GET", "internal"]:
- data = database("*")
-
- focus_extra = {}
- for key, value in data.items():
- if key[:4] == "bool": data[key] = bool(value) # if the key starts with "bool" make the data a bool
- elif key[:4] == "list": data[key] = literal_eval(value) # if the key starts with "list" make the data a literal list
- elif key[:5] == "focus": # if the key starts with "focus" make the data into a bool with an additional dict entry for "showing" bool
- data[key] = bool((value-1)+abs(value-1)) # if 2 then True, if 1,0 then False
- focus_extra[f"bool_{key[6:]}"] = bool(value) # if 1,2 then True, if 0 then False
- data |= focus_extra
-
- if request.method == "internal": return data
- return jsonify(data)
- else:
- return "", 404
-
-@app.route("/api/items")
-def api_items():
- if request.method == "GET":
- return jsonify(static_data)
-
- else:
- return "", 404
-
-@app.route("/api/clock")
-def api_clock():
- if request.method == "GET":
- keys = ["current_position", "movement_speed", "movement_function"]
- return jsonify(database(keys))
-
- else:
- return "", 404
-
-@app.route("/api/img/<int:item_id>")
-def product_image(item_id):
- return send_from_directory(path.join("media", "products"), f"{item_id}.png")
-
-@app.route("/api/generate")
-@auth.login_required
-def generate_docs():
- # setup the jinja enviroment for use in latex
- latex_enviroment = Environment(
- loader=FileSystemLoader(path.join(app.root_path, "templates")),
- block_start_string = "|%",
- block_end_string = "%|",
- variable_start_string = "|~",
- variable_end_string = "~|",
- comment_start_string = "|#",
- comment_end_string = "#|",
- trim_blocks = True,
- lstrip_blocks = True
- )
-
- # render each latex document
- docs_path = path.join(app.root_path, "docs")
- for document in ["call-sheet.tex", "manifest-unsafe.tex", "manifest-safe.tex", "gfx-text.tex"]:
- template = latex_enviroment.get_template(document)
-
- with open(path.join(docs_path, document), "w", encoding="utf-8") as f:
- f.write(template.render(data=static_data, info=static_info))
-
- system(f"pdflatex -interaction='nonstopmode' -output-directory='{docs_path}' '{path.join(docs_path, document)}' > /dev/zero")
- system(f"pdflatex -interaction='nonstopmode' -output-directory='{docs_path}' '{path.join(docs_path, document)}' > /dev/zero")
-
- print(f" - Generated {document}")
-
- with open(path.join(docs_path, "documentation.html"), "w", encoding="utf-8") as f:
- f.write(render_template("docs.html", data=static_data, info=static_info, book=True))
-
- floorplan_path = path.join(app.root_path, 'static', 'floorplan.png')
- cameras_path = path.join(app.root_path, 'static', 'cameras.png')
- system(f"cp '{floorplan_path}' '{docs_path}'")
- system(f"cp '{cameras_path}' '{docs_path}'")
-
- system(f"ebook-convert '{path.join(docs_path, 'documentation.html')}' '{path.join(docs_path, 'documentation.mobi')}' --title='XMDV Teleshopping Documentation' --authors='William Greenwood' --comments='Valid for {static_info['shoot']['date']}' --language=en --change-justification=left --cover='{path.join(app.root_path, 'static', 'cover.jpg')}'")
-
- system(f"ebook-convert '{path.join(docs_path, 'documentation.html')}' '{path.join(docs_path, 'documentation.epub')}' --title='XMDV Teleshopping Documentation' --authors='William Greenwood' --comments='Valid for {static_info['shoot']['date']}' --language=en --change-justification=left --cover='{path.join(app.root_path, 'static', 'cover.jpg')}'")
-
- return "", 200
-
-
-
-
-@app.route("/api/buy")
-def buy():
- with sqlite3.connect(path.join(app.root_path, "data.db")) as connection:
- cursor = connection.cursor()
- cursor.execute(f"""
- INSERT INTO orders VALUES
- (NULL, {round(time.time())})
- """)
- connection.commit()
-
- return "", 200
-
-@app.route("/api/buy/data")
-@auth.login_required
-def buy_data():
- data = []
- with sqlite3.connect(path.join(app.root_path, "data.db")) as connection:
- cursor = connection.cursor()
-
- current = round(time.time() / 5) * 5
-
- for start in range(current - 600, current, 5):
- result = cursor.execute(f"""
- SELECT COUNT(*)
- FROM orders
- WHERE timestamp BETWEEN {start - 1} and {start + 5};
- """)
- result = result.fetchone()[0]
- data.append(result)
-
- return jsonify(data)
-
-@app.route("/api/buy/all")
-@auth.login_required
-def buy_all():
- data = []
- with sqlite3.connect(path.join(app.root_path, "data.db")) as connection:
- cursor = connection.cursor()
-
- for order in cursor.execute(f"""
- SELECT *
- FROM orders;
- """):
- data.append(order[1])
-
- return jsonify(data)
-
-@app.route("/api/buy/clear")
-@auth.login_required
-def buy_clear():
- with open(path.join(app.root_path, "schema2"), "r", encoding="utf-8") as f:
- schema = f.read()
-
- with sqlite3.connect(path.join(app.root_path, "data.db")) as connection:
- cursor = connection.cursor()
- cursor.execute("DROP TABLE orders;")
- cursor.execute(schema)
- connection.commit()
-
- return "", 200
-
-
-
-
-# if this is the first incoming request do a sanity check on the db
-# this might happen a few times because of threading, but it doesnt slow requests that much
-first_request = True
-@app.before_request
-def check_database():
- global first_request
- if first_request:
- with open(path.join(app.root_path, "schema1"), "r", encoding="utf-8") as f:
- schema, load = f.read().split("\n\n")
-
- with sqlite3.connect(path.join(app.root_path, "data.db")) as connection:
- cursor = connection.cursor()
- try:
- cursor.execute(load)
- except sqlite3.IntegrityError:
- print("Database is setup correctly")
- pass
- except sqlite3.OperationalError:
- print("Table missing or corrupt...")
- try: cursor.execute("DROP TABLE state;") # catch if there is no table "state"
- except: pass
- cursor.execute(schema)
- connection.commit()
-
- with open(path.join(app.root_path, "schema2"), "r", encoding="utf-8") as f:
- schema = f.read()
-
- with sqlite3.connect(path.join(app.root_path, "data.db")) as connection:
- cursor = connection.cursor()
- try: cursor.execute("DROP TABLE orders;")
- except: pass
- cursor.execute(schema)
- connection.commit()
-
- # generate_docs() # This might be a bad idea
-
- first_request = False
-
-
-
-
-if __name__ == "__main__":
- app.run(host='127.0.0.1', port=8000, debug=True)
--- /dev/null
+from .api import *
+from .admin import *
+from .pages import *
+from .utils import *
--- /dev/null
+from math import radians, cos, sin
+
+INCREMENT = 18
+
+def admin():
+ return Response(render_template("admin.html", mimetype="text/html"))
+
+def admin_page(page):
+ if page in ["text", "clock", "timer", "price"]:
+ if request.method == "POST": # if posting with data update the database then return the new admin page
+
+ post_data = request.form.to_dict()
+ aggrigate_data = {}
+
+ # perform all data clean up for special cases
+ aggrigate_data = {}
+ for key in post_data:
+ # if a key begins with the word "aggrigate": aggrigate it with all other keys with the same body into a list
+ if key[:9] == "aggrigate":
+ new_key = key[10:-5]
+ if new_key in aggrigate_data:
+ aggrigate_data[new_key].append(int(post_data[key]))
+ else:
+ aggrigate_data.update({new_key: [int(post_data[key])]})
+
+ # if the key is a timer, make the time fixed to the epoch instead of relative (& account for offset)
+ if key[:3] == "end":
+ aggrigate_data.update(
+ {key: int(datetime.now(timezone.utc).timestamp()) + int(post_data[key])}
+ )
+
+ else: aggrigate_data.update({key: post_data[key]})
+
+ with sqlite3.connect(path.join(app.root_path, "data.db")) as connection:
+ cursor = connection.cursor()
+ result = cursor.execute("SELECT * FROM pragma_table_info('state');").fetchall()[1:]
+
+ types = {"TEXT":str, "INTEGER":int, "REAL":float}
+ type_note = {c[1]:types[c[2]] for c in result}
+
+ valid_data = {k: type_note[k].__call__(aggrigate_data[k]) for k in aggrigate_data if k in type_note}
+
+ data_list = ', '.join([f"{k} = '{escape(v)}'" if type(v) == str else f"{k} = {v}" for (k,v) in valid_data.items()])
+ query = f"""
+ UPDATE state SET
+ {data_list}
+ WHERE id = 1;
+ """
+
+ print(query)
+
+ cursor.execute(query)
+ connection.commit()
+
+ request.method = "internal" # set method to internal so that the api call returns only the python object
+ if page == "clock":
+ coords = [{
+ "i":i,
+ "x":45*cos(radians(i-90))+50,
+ "y":45*sin(radians(i-90))+50
+ } for i in list(range(0, 360, INCREMENT))]
+ return Response(render_template("clock.html", data=api(), positions=coords), mimetype="text/html")
+
+ if page == "text":
+ return Response(render_template("text.html", data=api(), text=static_data['text']), mimetype="text/html")
+
+ if page == "timer":
+ return Response(render_template("timer.html", data=api()), mimetype="text/html")
+
+ if page == "price":
+ return Response(render_template("price.html", data=api(), items=static_data['items']), mimetype="text/html")
+
+ else:
+ return "", 404
--- /dev/null
+from .buy import *
+from .data import *
+from .utils import *
--- /dev/null
+from flask import jsonify
+from os import environ
+import sqlite3
+import time
+
+ROOT = environ["XMDV_PATH"]
+
+def buy():
+ with sqlite3.connect(f"{ROOT}/data.db") as connection:
+ cursor = connection.cursor()
+ cursor.execute(f"""
+ INSERT INTO orders VALUES
+ (NULL, {round(time.time())})
+ """)
+ connection.commit()
+
+ return "", 200
+
+def recent_data():
+ data = []
+ with sqlite3.connect(f"{ROOT}/data.db") as connection:
+ cursor = connection.cursor()
+
+ current = round(time.time() / 5) * 5
+
+ for start in range(current - 600, current, 5):
+ result = cursor.execute(f"""
+ SELECT COUNT(*)
+ FROM orders
+ WHERE timestamp BETWEEN {start - 1} and {start + 5};
+ """)
+ result = result.fetchone()[0]
+ data.append(result)
+
+ return jsonify(data)
+
+def all_data():
+ data = []
+ with sqlite3.connect(f"{ROOT}/data.db") as connection:
+ cursor = connection.cursor()
+
+ for order in cursor.execute(f"""
+ SELECT *
+ FROM orders;
+ """):
+ data.append(order[1])
+
+ return jsonify(data)
+
+def clear():
+ with open(f"{ROOT}/schema2", "r", encoding="utf-8") as f:
+ schema = f.read()
+
+ with sqlite3.connect(f"{ROOT}/data.db") as connection:
+ cursor = connection.cursor()
+ cursor.execute("DROP TABLE orders;")
+ cursor.execute(schema)
+ connection.commit()
+
+ return "", 200
--- /dev/null
+from flask import request, jsonify
+from ..utils import database
+from ast import literal_eval
+from os import environ
+import json
+
+ROOT = environ["XMDV_PATH"]
+
+with open(f"{ROOT}/static/static.json", "r", encoding="utf-8") as f:
+ static_data = json.loads(f.read())
+
+def data():
+ if request.method in ["GET", "internal"]:
+ data = database("*")
+
+ focus_extra = {}
+ for key, value in data.items():
+ if key[:4] == "bool": data[key] = bool(value) # if the key starts with "bool" make the data a bool
+ elif key[:4] == "list": data[key] = literal_eval(value) # if the key starts with "list" make the data a literal list
+ elif key[:5] == "focus": # if the key starts with "focus" make the data into a bool with an additional dict entry for "showing" bool
+ data[key] = bool((value-1)+abs(value-1)) # if 2 then True, if 1,0 then False
+ focus_extra[f"bool_{key[6:]}"] = bool(value) # if 1,2 then True, if 0 then False
+ data |= focus_extra
+
+ if request.method == "internal": return data
+ return jsonify(data)
+ else:
+ return "", 404
+
+def items():
+ if request.method == "GET":
+ return jsonify(static_data)
+
+ else:
+ return "", 404
+
+def clock():
+ if request.method == "GET":
+ keys = ["current_position", "movement_speed", "movement_function"]
+ return jsonify(database(keys))
+
+ else:
+ return "", 404
--- /dev/null
+from flask import send_from_directory, render_template
+from jinja2 import Environment, FileSystemLoader
+from os import environ, system
+import json
+
+ROOT = environ["XMDV_PATH"]
+
+# get static info and data
+with open(f"{ROOT}/static/static.json", "r", encoding="utf-8") as f:
+ static_data = json.loads(f.read())
+
+with open(f"{ROOT}/static/info.json", "r", encoding="utf-8") as f:
+ static_info = json.loads(f.read())
+
+def img(item_id):
+ return send_from_directory("static/products", f"{item_id}.png")
+
+def docs_generate():
+ # setup the jinja enviroment for use in latex
+ latex_enviroment = Environment(
+ loader=FileSystemLoader(f"{ROOT}/templates"),
+ block_start_string = "|%",
+ block_end_string = "%|",
+ variable_start_string = "|~",
+ variable_end_string = "~|",
+ comment_start_string = "|#",
+ comment_end_string = "#|",
+ trim_blocks = True,
+ lstrip_blocks = True
+ )
+
+ # render each latex document
+ docs_path = f"{ROOT}/docs"
+ for document in ["call-sheet.tex", "manifest-unsafe.tex", "manifest-safe.tex", "gfx-text.tex"]:
+ template = latex_enviroment.get_template(document)
+
+ with open(f"{docs_path}/{document}", "w", encoding="utf-8") as f:
+ f.write(template.render(data=static_data, info=static_info))
+
+ system(f"pdflatex -interaction='nonstopmode' -output-directory='{docs_path}' '{docs_path}/{document}' > /dev/zero")
+ system(f"pdflatex -interaction='nonstopmode' -output-directory='{docs_path}' '{docs_path}/{document}' > /dev/zero")
+
+ print(f" - Generated {document}")
+
+ with open(f"{docs_path}/documentation.html", "w", encoding="utf-8") as f:
+ f.write(render_template("docs.html", data=static_data, info=static_info, book=True))
+
+ floorplan_path = f"{ROOT}/static/floorplan.png"
+ cameras_path = f"{ROOT}/static/cameras.png"
+ system(f"cp '{floorplan_path}' '{docs_path}'")
+ system(f"cp '{cameras_path}' '{docs_path}'")
+
+ for book_type in ["epub", "mobi"]:
+ system(f"""
+ ebook-convert '{docs_path}/documentation.html' \\
+ '{docs_path}/documentation.{book_type}' \\
+ --title='XMDV Teleshopping Documentation' \\
+ --authors='William Greenwood' \\
+ --comments='Valid for {static_info['shoot']['date']}' \\
+ --language=en \\
+ --change-justification=left \\
+ --cover='{ROOT}/static/cover.jpg'
+ """)
+
+ return "", 200
--- /dev/null
+from flask import Flask, Response, request, render_template, send_from_directory
+from .api.data import data as api
+from os import environ
+import json
+
+ROOT = environ["XMDV_PATH"]
+
+with open(f"{ROOT}/static/static.json", "r", encoding="utf-8") as f:
+ static_data = json.loads(f.read())
+
+with open(f"{ROOT}/static/info.json", "r", encoding="utf-8") as f:
+ static_info = json.loads(f.read())
+
+def overlay():
+ return Response(
+ render_template("gfx.html"),
+ mimetype="text/html"
+ )
+
+def autocue():
+ request.method = "internal"
+ data = api()
+ return Response(
+ render_template(
+ "autocue.html",
+ data=data,
+ item=static_data["items"][data["item_id"]]
+ ),
+ mimetype="text/html"
+ )
+
+def hud():
+ return Response(render_template("display.html"), mimetype="text/html")
+
+def sounds():
+ return Response(render_template("sounds.html"), mimetype="text/html")
+
+def chart():
+ return Response(render_template("chart.html"), mimetype="text/html")
+
+def extensions(filename):
+ return send_from_directory("extensions", filename)
+
+def docs():
+ return Response(
+ render_template(
+ "docs.html",
+ data=static_data,
+ info=static_info,
+ book=False),
+ mimetype="text/html"
+ )
+
+def paper_docs(filename):
+ return send_from_directory("docs", filename, as_attachment=True)
--- /dev/null
+from werkzeug.security import generate_password_hash, check_password_hash
+from os import path, environ
+import sqlite3
+import json
+
+ROOT = environ["XMDV_PATH"]
+
+# if this is the first incoming request do a sanity check on the db
+# this might happen a few times because of threading, but it doesnt slow requests that much
+first_request = True
+def check_database():
+ global first_request
+ if first_request:
+ with open(f"{ROOT}/schema1", "r", encoding="utf-8") as f:
+ schema, load = f.read().split("\n\n")
+
+ with sqlite3.connect(f"{ROOT}/data.db") as connection:
+ cursor = connection.cursor()
+ try:
+ cursor.execute(load)
+ except sqlite3.IntegrityError:
+ print("Database is setup correctly")
+ pass
+ except sqlite3.OperationalError:
+ print("Table missing or corrupt...")
+ try: cursor.execute("DROP TABLE state;") # catch if there is no table "state"
+ except: pass
+ cursor.execute(schema)
+ connection.commit()
+
+ with open(f"{ROOT}/schema2", "r", encoding="utf-8") as f:
+ schema = f.read()
+
+ with sqlite3.connect(f"{ROOT}/data.db") as connection:
+ cursor = connection.cursor()
+ try: cursor.execute("DROP TABLE orders;")
+ except: pass
+ cursor.execute(schema)
+ connection.commit()
+
+ # generate_docs() # This might be a bad idea
+
+ first_request = False
+
+# helper function to get a dict from the database as kv pairs
+def database(field: str | list):
+ query = f"SELECT {','.join(field) if type(field) == list else field} FROM state WHERE id=1;"
+
+ with sqlite3.connect(f"{ROOT}/data.db") as connection:
+ cursor = connection.cursor()
+ result = cursor.execute(query).fetchone()
+
+ column_names = [description[0] for description in cursor.description]
+
+ return dict(zip(column_names, result)) # combine column names with data into dict
+
+# import password data
+with open(f"{ROOT}/../secrets.json", "r", encoding="utf-8") as f:
+ users = json.loads(f.read())
+users = {k: generate_password_hash(v) for (k, v) in users.items()}
+
+# make the password verifier function
+def verify_password(username, password):
+ if username in users and \
+ check_password_hash(users.get(username), password):
+ return username