+#! .venv/bin/python
+
+from flask import Flask, Response, send_from_directory, jsonify, render_template, request
+from werkzeug.security import generate_password_hash, check_password_hash
+from apscheduler.schedulers.background import BackgroundScheduler
+from flask_httpauth import HTTPBasicAuth
+import threading
+import logging
+import sqlite3
+import zipfile
+import json
+import time
+import csv
+import os
+
+FORM_COUNT = 5
+GENERATE_COUNT = 10
+GENERATE_THRESHHOLD = 10
+BACKUP_TIME = 2
+BACKUP_COUNT = 5
+
+app = Flask(__name__)
+auth = HTTPBasicAuth()
+
+# get the secrets ready
+with open(f"{app.root_path}/secrets.json", "r", encoding="utf-8") as f:
+ users = json.loads(f.read())
+users = {k: generate_password_hash(v) for (k, v) in users.items()}
+
+@auth.verify_password
+def verify_password(username, password):
+ if username in users and \
+ check_password_hash(users.get(username), password):
+ return username
+
+# start logging
+logging.basicConfig(
+ level=logging.INFO,
+ format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
+ handlers=[
+ logging.StreamHandler(),
+ logging.FileHandler("form.log")
+ ]
+)
+logger = logging.getLogger("main")
+flask_logger = logging.getLogger('werkzeug')
+flask_logger.setLevel(logging.ERROR)
+
+logger.info("Logging active")
+
+# handle all major functions through templating
+@app.route("/", methods=["GET", "POST"])
+def page_home():
+
+ if "p" not in request.form:
+ return render_template("main.html", intro=True, failed=False)
+ elif "agree" not in request.form:
+ return render_template("main.html", intro=True, failed=True)
+ elif request.form["p"] == "1":
+ key = register_user()
+
+ return render_template(
+ "main.html",
+ key=key,
+ p=int(request.form["p"]),
+ questions=get_questions(key),
+ failed = False
+ )
+
+ # if the user is setup and has started answering questions
+ key = request.form["key"]
+
+ # check if the user has answered all the questions
+ kv = dict(request.form)
+
+ ids = [k[1:] for k in kv if k[0] == "q"]
+ for n in ids:
+ if kv[f"q{n}"] == "" or len(ids) != FORM_COUNT:
+ # rereturn the last form if questions havent been answered
+ return render_template(
+ "main.html",
+ key=key,
+ p=int(request.form["p"]) - 1,
+ questions=get_questions(key),
+ complete = kv,
+ failed = True
+ )
+
+ for n in ids:
+ answer_question(key, int(n), int(kv[f"q{n}"]))
+
+ logger.info(f"User {key} answered questions {ids}")
+
+ # artificial delay to stop the user spamming the next question button
+ # also makes it feel a bit more weighty
+ time.sleep(2)
+
+ check_generate(key)
+ return render_template(
+ "main.html",
+ key=key,
+ p=int(request.form["p"]),
+ questions=get_questions(key),
+ failed = False
+ )
+
+
+# return privacy policy
+@app.route("/privacy")
+def page_privacy():
+ return render_template("privacy.html")
+
+# register a user in the database as a table and in the tracker table
+def register_user():
+ key = os.urandom(16).hex()
+
+ # get the generic pre-written questions
+ with open("generic.json","r") as f:
+ generic = json.loads(f.read())
+
+ # add the user to the database
+ with sqlite3.connect(f"{app.root_path}/database.db") as connection:
+ cursor = connection.cursor()
+
+ # add user to the database
+ cursor.execute(f"""
+ CREATE TABLE u{key} (
+ rowid INTEGER PRIMARY KEY,
+ question TEXT,
+ option_1 TEXT,
+ option_2 TEXT,
+ option_3 TEXT,
+ option_4 TEXT,
+ created INT,
+ answer INT,
+ answer_time INT
+ );
+ """)
+
+ for question in generic:
+ cursor.execute(f"""
+ INSERT INTO u{key} (
+ question,
+ option_1,
+ option_2,
+ option_3,
+ option_4,
+ created
+ ) VALUES (
+ '{question['question']}',
+ '{question['option_1']}',
+ '{question['option_2']}',
+ '{question['option_3']}',
+ '{question['option_4']}',
+ {round(time.time())}
+ );
+ """)
+
+ cursor.execute(f"""
+ INSERT INTO master (key, counter, created) VALUES
+ ('u{key}', 0, {round(time.time())});
+ """)
+
+ connection.commit()
+
+ logger.info(f"User {key} generated")
+
+ return key
+
+# fill the question at rowid n with the answer int 1-4
+def answer_question(key: str, rowid: int, answer: int):
+ with sqlite3.connect(f"{app.root_path}/database.db") as connection:
+ cursor = connection.cursor()
+ cursor.execute(
+ f"""
+ UPDATE u{key} SET
+ answer = {answer}, answer_time = {round(time.time())}
+ WHERE rowid = {rowid};
+ """
+ )
+ cursor.execute(
+ f"UPDATE master SET counter = counter + 1 WHERE key = 'u{key}'"
+ )
+
+ return key
+
+# get FORM_COUNT questions from the database and increment the tracker
+def get_questions(key: str):
+ with sqlite3.connect(f"{app.root_path}/database.db") as connection:
+ cursor = connection.cursor()
+ """
+ can potentially save some overhead here by only selecting
+ the neccicary columns: question and option_{1..4}
+ """
+ selection = cursor.execute(
+ f"""
+ SELECT * FROM u{key}
+ WHERE answer IS NULL
+ ORDER BY
+ created DESC;
+ """
+ )
+
+ names = [description[0] for description in cursor.description]
+ result = [dict(zip(names, selection.fetchone())) for _ in range(FORM_COUNT)]
+
+ return result
+
+# check if in is neccicary to generate more questions
+def check_generate(key: str):
+ with sqlite3.connect(f"{app.root_path}/database.db") as connection:
+ cursor = connection.cursor()
+ result = cursor.execute(f"""
+ SELECT COUNT(*) FROM u{key} WHERE answer IS NULL;
+ """).fetchone()[0]
+
+ if result < GENERATE_THRESHHOLD:
+ if result < FORM_COUNT:
+ logger.error(f"User {key} has {result} unanswered questions < FORM_COUNT ({FORM_COUNT})")
+ else:
+ logger.info(f"User {key} has {result} unanswered questions < GENERATE_THRESHHOLD ({GENERATE_THRESHHOLD})")
+
+ """
+ if there is currently a generator for the user running then make sure we arent
+ making loads of new questions. possibly this should let someone know that there
+ is a question overrun?
+ """
+ for th in threading.enumerate():
+ if th.name == f"g{key}":
+ logger.warn(f"Generator {key} attempted to dispatch, but was already running")
+ return
+
+ logger.info(f"Generator {key} dispatched")
+ threading.Thread(target=generate_questions, args=(key,), name=f"g{key}").start()
+
+# add GENERATE_COUNT questions to the database for key x based off the previous questions
+def generate_questions(key: str):
+ start = time.time()
+ time.sleep(10)
+
+ # get the generic pre-written questions
+ with open("generic.json","r") as f:
+ generic = json.loads(f.read())
+
+ # add the user to the database
+ with sqlite3.connect(f"{app.root_path}/database.db") as connection:
+ cursor = connection.cursor()
+
+ for question in generic:
+ cursor.execute(f"""
+ INSERT INTO u{key} (
+ question,
+ option_1,
+ option_2,
+ option_3,
+ option_4,
+ created
+ ) VALUES (
+ '{question['question']}',
+ '{question['option_1']}',
+ '{question['option_2']}',
+ '{question['option_3']}',
+ '{question['option_4']}',
+ {round(time.time())}
+ );
+ """)
+
+ connection.commit()
+ logger.info(f"Generator {key} generated {GENERATE_COUNT} questions in {time.time() - start}s")
+
+@app.route("/auth")
+@auth.login_required
+def control():
+ with sqlite3.connect(f"{app.root_path}/database.db") as connection:
+ cursor = connection.cursor()
+
+ users = cursor.execute(f"""
+ SELECT COUNT(*) FROM master;
+ """).fetchone()[0]
+
+ answered = cursor.execute(f"""
+ SELECT SUM(counter)
+ FROM master;
+ """).fetchone()[0]
+
+ average = cursor.execute(f"""
+ SELECT AVG(counter)
+ FROM master;
+ """).fetchone()[0]
+
+ return render_template("auth.html", answered=answered, average=average, users=users)
+
+@app.route("/auth/data")
+def get_data():
+ for f in os.listdir(f"{app.root_path}/export"):
+ os.remove(f"{app.root_path}/export/{f}")
+
+ zfile = zipfile.ZipFile(f"{app.root_path}/export.zip", "w", compression=zipfile.ZIP_DEFLATED)
+
+ with sqlite3.connect(f"{app.root_path}/database.db") as connection:
+ cursor = connection.cursor()
+
+ tables = cursor.execute(
+ "SELECT name FROM sqlite_master WHERE type = 'table';").fetchall()
+ tables = [t[0] for t in tables]
+
+ for table in tables:
+ cursor.execute(f"SELECT * FROM {table};")
+ data = cursor.fetchall()
+
+ with open(f"{app.root_path}/export/{table}.csv", "w", newline="", encoding="utf-8") as f:
+ writer = csv.writer(f)
+ writer.writerow([header[0] for header in cursor.description])
+ writer.writerows(data)
+
+ zfile.write(f"{app.root_path}/export/{table}.csv", f"{table}.csv")
+
+ zfile.close()
+
+ return send_from_directory(app.root_path, "export.zip", as_attachment=True)
+
+def backup():
+ backup_time = round(time.time())
+ src = sqlite3.connect(f"{app.root_path}/database.db")
+ dst = sqlite3.connect(f"{app.root_path}/backup/backup{backup_time}.db")
+ src.backup(dst)
+
+ logger.info(f"Database backed up to 'backup/backup{backup_time}.db'")
+
+ backups = [x for x in os.listdir(f"{app.root_path}/backup") if x[0] == "b"]
+ if len(backups) > BACKUP_COUNT:
+ backups.sort(key=lambda x: int(x[6:-3]))
+
+ for i in range(len(backups) - BACKUP_COUNT):
+ os.remove(f"{app.root_path}/backup/{backups[i]}")
+ logger.warning(f"Removed backup database '{backups[i]}'")
+
+@app.route("/auth/backup")
+@auth.login_required
+def http_backup():
+ backup()
+ return "", 200
+
+@app.route("/auth/reset")
+@auth.login_required
+def reset():
+ backup_time = round(time.time())
+ with sqlite3.connect(f"{app.root_path}/database.db") as src:
+ dst = sqlite3.connect(f"{app.root_path}/backup/reset{backup_time}.db")
+ src.backup(dst)
+
+ logger.info("Reseting database")
+ logger.info(f"Database backed up to 'backup/reset{backup_time}.db'")
+
+ cursor = src.cursor()
+
+ cursor.execute("DELETE FROM master;")
+ tables = cursor.execute(
+ "SELECT name FROM sqlite_master WHERE type = 'table';").fetchall()
+
+ for table in tables:
+ if table[0][0] == "u":
+ cursor.execute(f"DROP TABLE {table[0]};")
+
+ return "", 200
+
+scheduler = BackgroundScheduler()
+scheduler.add_job(backup, 'interval', hours=BACKUP_TIME)
+scheduler.start()
+
+logger.info("Scheduled backups active")
+
+if __name__ == "__main__":
+ app.run(host='127.0.0.1', port=8000, debug=True)
--- /dev/null
+[
+ {
+ "question": "Example question 1",
+ "option_1": "answer 1",
+ "option_2": "answer 2",
+ "option_3": "answer 3",
+ "option_4": "answer 4"
+ },
+ {
+ "question": "Example question 2",
+ "option_1": "answer 1",
+ "option_2": "answer 2",
+ "option_3": "answer 3",
+ "option_4": "answer 4"
+ },
+ {
+ "question": "Example question 3",
+ "option_1": "answer 1",
+ "option_2": "answer 2",
+ "option_3": "answer 3",
+ "option_4": "answer 4"
+ },
+ {
+ "question": "Example question 4",
+ "option_1": "answer 1",
+ "option_2": "answer 2",
+ "option_3": "answer 3",
+ "option_4": "answer 4"
+ },
+ {
+ "question": "Example question 5",
+ "option_1": "answer 1",
+ "option_2": "answer 2",
+ "option_3": "answer 3",
+ "option_4": "answer 4"
+ },
+ {
+ "question": "Example question 6",
+ "option_1": "answer 1",
+ "option_2": "answer 2",
+ "option_3": "answer 3",
+ "option_4": "answer 4"
+ },
+ {
+ "question": "Example question 7",
+ "option_1": "answer 1",
+ "option_2": "answer 2",
+ "option_3": "answer 3",
+ "option_4": "answer 4"
+ },
+ {
+ "question": "Example question 8",
+ "option_1": "answer 1",
+ "option_2": "answer 2",
+ "option_3": "answer 3",
+ "option_4": "answer 4"
+ },
+ {
+ "question": "Example question 9",
+ "option_1": "answer 1",
+ "option_2": "answer 2",
+ "option_3": "answer 3",
+ "option_4": "answer 4"
+ },
+ {
+ "question": "Example question 10",
+ "option_1": "answer 1",
+ "option_2": "answer 2",
+ "option_3": "answer 3",
+ "option_4": "answer 4"
+ }
+]