"""Small HTTP service that stores tracking datapoints in PostgreSQL.

GET routes serve static pages and JSON listings; POST routes accept JSON
bodies (or, for ``/automation``, query parameters) and insert rows.
"""

from datetime import datetime
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
import json
import mimetypes
import re

import psycopg2


# Shape of the ``/YYYY-MM-DD`` route; the handler applies it with fullmatch.
date_pattern = re.compile(r"(\d{4})-(\d{2})-(\d{2})")


class TrackerHTTPRequestHandler(BaseHTTPRequestHandler):
    """Request handler exposing the tracker's GET and POST endpoints.

    All database access goes through the module-level ``conn``. Every unit of
    work runs inside ``with conn``, so it is committed on success and rolled
    back on failure; without the rollback a single failed statement would
    leave the shared connection in an aborted transaction.
    """

    # GET paths that map directly to a file under ``static/``.
    _STATIC_PAGES = {
        "/": "index.html",
        "/new_form": "new_form.html",
        "/timer": "timer.html",
    }

    # POST paths and the name of the method that handles each of them.
    _POST_ROUTES = {
        "/submit_payload": "_post_payload",
        "/submit_new_form": "_post_new_form",
        "/submit_task_time": "_post_task_time",
        "/sensors/airgradient/measures": "_post_airgradient_measure",
        "/automation": "_post_automation",
    }

    # ------------------------------------------------------------------
    # Response helpers
    # ------------------------------------------------------------------

    def _send_bytes(self, status, body=b"", content_type=None):
        """Send a complete response with the given status and raw body."""
        self.send_response(status)
        if content_type:
            self.send_header("Content-Type", content_type)
        if status != 204:
            # A 204 response carries no body, so no length is advertised.
            self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _send_json(self, payload):
        """Serialize ``payload`` as JSON and send it with status 200."""
        body = json.dumps(payload).encode("utf-8")
        self._send_bytes(200, body, "application/json")

    def _fetch_rows(self, query, params=None):
        """Run a read query and return all rows, ending the transaction."""
        with conn, conn.cursor() as cur:
            cur.execute(query, params)
            return cur.fetchall()

    def _read_json_body(self):
        """Return the request body parsed as a JSON object.

        A missing/invalid ``Content-Length``, undecodable bytes, malformed
        JSON, or a JSON value that is not an object all yield ``{}``.
        """
        try:
            length = int(self.headers["Content-Length"])
            if length < 0:
                raise ValueError("negative Content-Length")
            body = json.loads(self.rfile.read(length).decode("utf-8"))
        except (TypeError, ValueError, RecursionError):
            # TypeError: header absent (None); ValueError covers bad ints,
            # UnicodeDecodeError and json.JSONDecodeError.
            return {}
        return body if isinstance(body, dict) else {}

    # ------------------------------------------------------------------
    # Database writers
    # ------------------------------------------------------------------

    def insert(self, datatype=None, key=None, value=None, created=None):
        """Insert one row into ``datapoint``; falsy values are skipped.

        The ``key`` column is only populated when ``key`` is truthy.
        """
        if not value:
            return
        print("inserting", datatype, key, value)
        with conn, conn.cursor() as cur:
            if key:
                cur.execute(
                    "INSERT INTO datapoint (datatype, key, value, created) VALUES (%s, %s, %s, %s)",
                    (datatype, key, value, created),
                )
            else:
                cur.execute(
                    "INSERT INTO datapoint (datatype, value, created) VALUES (%s, %s, %s)",
                    (datatype, value, created),
                )

    def insert_book_datapoint(self, title, pages, complete, created=None):
        """Record a reading datapoint for the book named ``title``.

        When ``complete`` is truthy the book is also marked as completed.

        Raises:
            LookupError: no row in ``book`` has the given title.
        """
        with conn, conn.cursor() as cur:
            cur.execute("SELECT id FROM book where title = %s", (title,))
            row = cur.fetchone()
            if row is None:
                raise LookupError(f"unknown book title: {title!r}")
            book_id = row[0]
            print(book_id, pages, created)
            cur.execute(
                "INSERT INTO book_datapoint (book_id, pages, created) VALUES (%s, %s, %s)",
                (book_id, pages, created),
            )
            if complete:
                cur.execute("UPDATE book SET completed = TRUE where id = %s", (book_id,))

    # ------------------------------------------------------------------
    # GET
    # ------------------------------------------------------------------

    def send_static_file(self, path):
        """Send ``static/<path>`` with status 200, or 404 if it is missing."""
        try:
            with open(f"static/{path}", "rb") as f:
                body = f.read()
        except FileNotFoundError:
            self._send_bytes(404, b"not found")
            return
        self._send_bytes(200, body, mimetypes.guess_type(path)[0])

    def do_GET(self):
        """Dispatch a GET request; database failures produce a 500."""
        u = urlparse(self.path)
        print("GET", u.path)
        try:
            self._route_get(u.path)
        except psycopg2.Error as e:
            # Responses are only written after the query succeeded, so
            # nothing has been sent yet when we get here.
            print("ERROR", e)
            self._send_bytes(500, b"database error")

    def _route_get(self, path):
        """Serve the static page or JSON listing registered for ``path``."""
        if path in self._STATIC_PAGES:
            self.send_static_file(self._STATIC_PAGES[path])
        elif date_pattern.fullmatch(path[1:]):
            self._get_datapoints_for_day(path[1:])
        elif path == "/books":
            rows = self._fetch_rows("SELECT title FROM book WHERE completed = FALSE")
            self._send_json([row[0] for row in rows])
        elif path == "/tasks":
            rows = self._fetch_rows("SELECT type FROM task")
            self._send_json([row[0] for row in rows])
        elif path == "/forms":
            rows = self._fetch_rows("SELECT type, prompt, prompt_id, extra FROM form ORDER BY id")
            self._send_json([
                {
                    "type": row[0],
                    "prompt": row[1],
                    "prompt_id": row[2],
                    "extra": row[3],
                }
                for row in rows
            ])
        else:
            self._send_bytes(404, b'not found')

    def _get_datapoints_for_day(self, text):
        """Send every datapoint created on the ``YYYY-MM-DD`` day ``text``."""
        try:
            # Reject impossible dates (e.g. month 13) here instead of letting
            # the database raise on them.
            day = datetime.strptime(text, "%Y-%m-%d").date()
        except ValueError:
            self._send_bytes(400, b"invalid date")
            return
        rows = self._fetch_rows(
            "SELECT datatype, key, value FROM datapoint where date(created) = %s",
            (day,),
        )
        self._send_json([
            {"datatype": datatype, "key": key, "value": value}
            for datatype, key, value in rows
        ])

    # ------------------------------------------------------------------
    # POST
    # ------------------------------------------------------------------

    def do_POST(self):
        """Dispatch a POST request.

        Unknown paths get 404, a missing required body field gets 400, and a
        database failure gets 500.
        """
        u = urlparse(self.path)
        post_data = self._read_json_body()
        print("POST", u.path)
        handler_name = self._POST_ROUTES.get(u.path)
        if handler_name is None:
            self._send_bytes(404, b'not found')
            return
        try:
            getattr(self, handler_name)(u, post_data)
        except KeyError as e:
            self._send_bytes(400, f"missing field: {e.args[0]}".encode("utf-8"))
        except psycopg2.Error as e:
            print("ERROR", e)
            self._send_bytes(500, b"database error")

    def _post_payload(self, u, post_data):
        """Store a form payload: book datapoints plus generic datapoints.

        ``timestamp`` is divided by 1000 before conversion, i.e. it is treated
        as milliseconds since the epoch.
        """
        try:
            timestamp = datetime.fromtimestamp(int(post_data.pop("timestamp") / 1000))
        except (KeyError, TypeError, ValueError, OverflowError, OSError):
            self._send_bytes(400, b"missing or invalid timestamp")
            return

        for book in post_data.pop("books", None) or []:
            try:
                self.insert_book_datapoint(**book, created=timestamp)
            except Exception as e:
                # Deliberately best-effort: one bad book entry must not block
                # the rest of the payload. Eventually add error messaging.
                print("ERROR", e)

        for datatype, payload in post_data.items():
            if isinstance(payload, dict):
                for key, value in payload.items():
                    self.insert(datatype=datatype, key=key, value=value, created=timestamp)
            else:
                self.insert(datatype=datatype, value=payload, created=timestamp)
        self._send_bytes(204)

    def _post_new_form(self, u, post_data):
        """Create a ``form`` row; fields beyond the three named go to ``extra``."""
        prompt_type = post_data.pop("type")
        prompt = post_data.pop("prompt")
        prompt_id = post_data.pop("prompt_id")
        extra = json.dumps(post_data)
        with conn, conn.cursor() as cur:
            cur.execute(
                "INSERT INTO form (type, prompt, prompt_id, extra) VALUES (%s, %s, %s, %s)",
                (prompt_type, prompt, prompt_id, extra),
            )
        self._send_bytes(204)

    def _post_task_time(self, u, post_data):
        """Record the seconds spent on a task type."""
        task_type = post_data.pop("type")
        seconds = post_data.pop("seconds")
        with conn, conn.cursor() as cur:
            cur.execute(
                "INSERT INTO task_datapoint (type, seconds) VALUES (%s, %s)",
                (task_type, seconds),
            )
        self._send_bytes(204)

    def _post_airgradient_measure(self, u, post_data):
        """Store one sensor reading posted to the airgradient endpoint."""
        chip_id = post_data.pop("chip_id")
        co2 = post_data.pop("rco2")
        pm25 = post_data.pop("pm02")
        temp = post_data.pop("atmp")
        rhum = post_data.pop("rhum")
        with conn, conn.cursor() as cur:
            cur.execute(
                "INSERT INTO airgradient_measure (created, chip_id, co2, pm25, temp, rhum) VALUES (NOW(), %s, %s, %s, %s, %s)",
                (chip_id, co2, pm25, temp, rhum),
            )
        self._send_bytes(204)

    def _post_automation(self, u, post_data):
        """Store a geoposition taken from the ``lat``/``lon`` query parameters.

        Nothing is stored unless both parameters are present; the response is
        204 either way.
        """
        created = datetime.now()
        q = parse_qs(u.query)
        if "lat" in q and "lon" in q:
            # parse_qs maps each name to a *list* of values; store the first
            # value as text rather than handing the whole list to the driver.
            with conn, conn.cursor() as cur:
                cur.execute(
                    "INSERT INTO datapoint (datatype, key, value, created) VALUES (%s, %s, %s, %s)",
                    ("geoposition", "latitude", q["lat"][0], created),
                )
                cur.execute(
                    "INSERT INTO datapoint (datatype, key, value, created) VALUES (%s, %s, %s, %s)",
                    ("geoposition", "longitude", q["lon"][0], created),
                )
        self._send_bytes(204)


def setup_db():
    """Create every table the service uses if it does not exist yet."""
    # NOTE(review): "wind_guest" looks like a typo for "wind_gust", and
    # book_datapoint.book_id is declared SERIAL although it is a foreign key.
    # Both are left untouched because changing them alters the schema of
    # already-created databases; confirm before migrating.
    statements = (
        "CREATE TABLE IF NOT EXISTS datapoint (id SERIAL PRIMARY KEY, created TIMESTAMP, datatype TEXT, key TEXT, value TEXT);",
        "CREATE TABLE IF NOT EXISTS outside_weather (id SERIAL PRIMARY KEY, created TIMESTAMP, temp FLOAT8, humidity FLOAT8, pressure FLOAT8, uvi FLOAT8, dew_point FLOAT8, wind_speed FLOAT8, wind_guest FLOAT8, wind_deg FLOAT8);",
        "CREATE TABLE IF NOT EXISTS book (id SERIAL PRIMARY KEY, title TEXT, completed BOOLEAN);",
        "CREATE TABLE IF NOT EXISTS task (id SERIAL PRIMARY KEY, type TEXT);",
        "CREATE TABLE IF NOT EXISTS task_datapoint (id SERIAL PRIMARY KEY, type TEXT, seconds TEXT);",
        "CREATE TABLE IF NOT EXISTS book_datapoint (id SERIAL PRIMARY KEY, created TIMESTAMP, book_id SERIAL, pages TEXT, CONSTRAINT fk_book FOREIGN KEY(book_id) REFERENCES book(id));",
        "CREATE TABLE IF NOT EXISTS email (id SERIAL PRIMARY KEY, created TIMESTAMP, username TEXT, domain TEXT);",
        "CREATE TABLE IF NOT EXISTS form (id SERIAL PRIMARY KEY, type TEXT, prompt TEXT, prompt_id TEXT, extra JSON);",
        "CREATE TABLE IF NOT EXISTS airgradient_measure (id SERIAL PRIMARY KEY, created TIMESTAMP, chip_id INTEGER, co2 INTEGER, pm25 INTEGER, temp REAL, rhum INTEGER);",
    )
    with conn, conn.cursor() as cur:
        for statement in statements:
            cur.execute(statement)


# Shared connection used by every request; opened at import time.
# NOTE(review): credentials are hard-coded here; consider moving them to
# configuration.
conn = psycopg2.connect(host="db", dbname="tracking", user="tracking", password="password")


if __name__ == "__main__":
    setup_db()
    print("Starting http server")
    http = HTTPServer(("", 8000), TrackerHTTPRequestHandler)
    print("serving forever")
    http.serve_forever()