import subprocess from datetime import datetime from config import config import requests from multiprocessing import Process class SpeakProcess(Process): def __init__(self, text): super().__init__() self.daemon = True self.text = text def run(self): print("speaking:", self.text) subprocess.run(["espeak", self.text]) class Event(): def __init__(self, dial_manager, phone_held, phone_hung_up): self.dial_manager = dial_manager self.phone_held = phone_held self.phone_hung_up = phone_hung_up def get_name(self): return "generic event" def get_text(self): return "event" def speak(self): process = subprocess.Popen(["espeak", self.get_text()]) while process.poll() is None: if self.phone_hung_up.wait(timeout=0.1): process.terminate() class FortuneEvent(Event): def get_name(self): return "fortune" def get_text(self): res = subprocess.run(["fortune", "-s"], stdout=subprocess.PIPE) return res.stdout.decode("utf-8") class DirectoryEvent(Event): def get_name(self): return "directory" def get_text(self): def recursive_build(prefix, sequences): sequence_list = [] for key in sequences.keys(): sequence = prefix+" "+str(key) if isinstance(sequences[key], dict): sequence_list += recursive_build(sequence, sequences[key]) else: sequence_list.append((sequence, sequences[key])) return sequence_list sequences = self.dial_manager.sequences text = "" for sequence, event in recursive_build("", sequences): text += f"For {event.get_name()} dial {sequence}. " return text class OperatorEvent(Event): def get_name(self): return "operator" def get_text(self): now = datetime.now() nearest_quarter = ((n.minute // 15 + 1) * 15) % 60 hour = now.hour % 12 day_half = "A M" if hour != now.hour: day_half = "P M" return f"Right now, it is {hour} {now.minute} {day_half}." class WeatherEvent(Event): def get_name(self): return "weather" def get_text(self): key = config["openweathermap"]["key"] lat = config["openweathermap"]["lat"] lon = config["openweathermap"]["lon"] location = config["openweathermap"]["location"] units = config["openweathermap"]["units"] onecall_url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={key}&units={units}" onecall = requests.get(onecall_url).json() current_temp = onecall["current"]["temp"] current_desc = onecall["current"]["weather"][0]["description"] weather = f"In {location}, it is {current_temp} degrees with {current_desc}. " high_temp = onecall["daily"][0]["temp"]["max"] pop = onecall["daily"][0]["pop"] * 100 forecast = f"Today there is a high of {high_temp} with a {pop} percent chance of percipitation." return f"{weather}{forecast}" class TimerEvent(Event): def get_name(self): return "timer" def get_text(self): start = datetime.now() self.phone_hung_up.wait() end = datetime.now() print(end - start) return "time" class RecordThread(Process): def __init__(self): super().__init__() self.daemon = True def run(self): subprocess.run(["time", "cat"]) class RecordEvent(Event): def get_name(self): return "recorder" def get_text(self): start = datetime.now() print("timer start", start) record_thread = RecordThread() record_thread.start() self.phone_hung_up.wait() print("terminating") record_thread.terminate() end = datetime.now() print(end - start) return str(end-start)