import subprocess
from datetime import datetime
from multiprocessing import Process

import requests

from config import config


class Event:
    """Base class for something that can be spoken aloud through ``espeak``.

    Subclasses override :meth:`get_name` (the label used when listing
    events) and :meth:`get_text` (the text that is spoken).
    """

    def __init__(self, dial_manager, phone_held, phone_hung_up):
        """Store the collaborators shared by every event.

        Args:
            dial_manager: Object exposing a ``sequences`` mapping (read by
                ``DirectoryEvent``).
            phone_held: Stored as-is; not used by the base class.
                NOTE(review): presumably a flag signalling the handset is
                lifted -- confirm against the caller.
            phone_hung_up: Stored as-is; subclasses call ``.wait()`` on it,
                so it is presumably a ``threading.Event``-like object --
                confirm against the caller.
        """
        self.dial_manager = dial_manager
        self.phone_held = phone_held
        self.phone_hung_up = phone_hung_up

    def get_name(self):
        """Return the label used for this event in the directory listing."""
        return "generic event"

    def get_text(self):
        """Return the text that :meth:`speak` should read out."""
        return "event"

    def speak(self):
        """Log the event text to stdout, then read it out with ``espeak``."""
        text = self.get_text()
        print("speaking:", text)
        subprocess.run(["espeak", text])


class FortuneEvent(Event):
    """Speak the output of the ``fortune`` command."""

    def get_name(self):
        """Return the directory label for this event."""
        return "fortune"

    def get_text(self):
        """Run ``fortune -s`` and return its stdout decoded as UTF-8."""
        res = subprocess.run(["fortune", "-s"], stdout=subprocess.PIPE)
        return res.stdout.decode("utf-8")


class DirectoryEvent(Event):
    """Speak every dialable sequence known to the dial manager."""

    def get_name(self):
        """Return the directory label for this event."""
        return "directory"

    def get_text(self):
        """Return one "For <name> dial <digits>. " sentence per event.

        ``dial_manager.sequences`` is walked as a nested mapping: a ``dict``
        value is a further level of keys, anything else is treated as an
        event whose ``get_name()`` labels the sequence.
        """

        def recursive_build(prefix, sequences):
            """Flatten the nested mapping into ``(spoken_digits, event)`` pairs."""
            sequence_list = []
            for key, value in sequences.items():
                # Only separate keys with a space once there is a prefix;
                # otherwise every sequence would start with a stray space.
                sequence = f"{prefix} {key}" if prefix else str(key)
                if isinstance(value, dict):
                    sequence_list += recursive_build(sequence, value)
                else:
                    sequence_list.append((sequence, value))
            return sequence_list

        return "".join(
            f"For {event.get_name()} dial {sequence}. "
            for sequence, event in recursive_build("", self.dial_manager.sequences)
        )


class OperatorEvent(Event):
    """Speak the current time and point the caller at the directory."""

    def get_name(self):
        """Return the directory label for this event."""
        return "operator"

    def get_text(self):
        """Return the local hour and minute plus the directory hint."""
        now = datetime.now()
        return f"Right now, it is {now.hour} {now.minute}. Dial 4 1 1 for directory."
class WeatherEvent(Event):
    """Speak current conditions and today's forecast from OpenWeatherMap."""

    # Seconds to wait for the weather API before giving up; without a
    # timeout a stalled connection would block the caller indefinitely.
    REQUEST_TIMEOUT = 10

    def get_name(self):
        """Return the directory label for this event."""
        return "weather"

    def get_text(self):
        """Fetch the One Call forecast and return it as a spoken sentence.

        Reads ``key``, ``lat``, ``lon``, ``location`` and ``units`` from
        ``config["openweathermap"]``.

        Raises:
            KeyError: If a config entry or an expected response field is
                missing.
            requests.exceptions.RequestException: On network failure or
                timeout.
        """
        settings = config["openweathermap"]
        location = settings["location"]

        # NOTE(review): OpenWeatherMap has been retiring One Call 2.5 in
        # favour of 3.0 -- confirm this endpoint still works for the
        # configured key.
        response = requests.get(
            "https://api.openweathermap.org/data/2.5/onecall",
            # Let requests URL-encode the values instead of interpolating
            # them into the query string by hand.
            params={
                "lat": settings["lat"],
                "lon": settings["lon"],
                "appid": settings["key"],
                "units": settings["units"],
            },
            timeout=self.REQUEST_TIMEOUT,
        )
        onecall = response.json()

        current = onecall["current"]
        current_temp = current["temp"]
        current_desc = current["weather"][0]["description"]
        weather = f"In {location}, it is {current_temp} degrees with {current_desc}. "

        today = onecall["daily"][0]
        high_temp = today["temp"]["max"]
        # "pop" is a 0..1 fraction in the response; round so float noise
        # such as 28.999999999999996 is not read out digit by digit.
        pop = round(today["pop"] * 100)
        forecast = (
            f"Today there is a high of {high_temp} "
            f"with a {pop} percent chance of precipitation."
        )
        return f"{weather}{forecast}"


class TimerEvent(Event):
    """Measure how long the caller stays on the line."""

    def get_name(self):
        """Return the directory label for this event."""
        return "timer"

    def get_text(self):
        """Block until ``phone_hung_up`` fires, print the elapsed time.

        The elapsed duration is only printed; the spoken text is the fixed
        string ``"time"``.
        """
        start = datetime.now()
        self.phone_hung_up.wait()
        end = datetime.now()
        print(end - start)
        return "time"


class RecordThread(Process):
    """Daemon child process that runs ``time cat``.

    Despite the name this is a ``multiprocessing.Process``, not a thread.
    NOTE(review): ``Process.terminate()`` does not terminate descendants,
    so the ``time``/``cat`` subprocess may outlive this process if it is
    still running when the caller terminates it.
    """

    def __init__(self):
        super().__init__()
        self.daemon = True

    def run(self):
        """Run ``time cat`` and wait for it to finish."""
        subprocess.run(["time", "cat"])


class RecordEvent(Event):
    """Run a ``RecordThread`` until the caller hangs up."""

    def get_name(self):
        """Return the directory label for this event."""
        return "recorder"

    def get_text(self):
        """Start the recorder, stop it on hang-up, return the elapsed time.

        Returns:
            ``str`` of the ``timedelta`` between starting the recorder and
            terminating it.
        """
        start = datetime.now()
        print("timer start", start)
        record_thread = RecordThread()
        record_thread.start()
        self.phone_hung_up.wait()
        print("terminating")
        record_thread.terminate()
        elapsed = datetime.now() - start
        # Reap the terminated child so it does not linger as a zombie.
        record_thread.join()
        print(elapsed)
        return str(elapsed)