From c72abd79e4dbfd9b533d75bb00fea1ded9975d95 Mon Sep 17 00:00:00 2001 From: Mark Powers Date: Mon, 12 Jul 2021 03:15:06 +0100 Subject: Add config, weather event, and demo timer event demoing hang up event --- .gitignore | 1 + config.py | 4 ++++ dial.py | 19 ++++++++++--------- events.py | 49 ++++++++++++++++++++++++++++++++++++++----------- hangup.py | 5 ++++- main.py | 14 ++++++++------ 6 files changed, 65 insertions(+), 27 deletions(-) create mode 100644 config.py diff --git a/.gitignore b/.gitignore index 0d20b64..5d3ea03 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ *.pyc +*.ini diff --git a/config.py b/config.py new file mode 100644 index 0000000..8e1f6c3 --- /dev/null +++ b/config.py @@ -0,0 +1,4 @@ +import configparser + +config = configparser.ConfigParser() +config.read('config.ini') diff --git a/dial.py b/dial.py index 1d08a05..0d14a32 100644 --- a/dial.py +++ b/dial.py @@ -2,7 +2,6 @@ from datetime import datetime, timedelta import time import RPi.GPIO as GPIO import threading - import events BUTTON_GPIO = 25 @@ -10,10 +9,9 @@ REST_TIME = 0.3 # seconds DIAL_RESET_TIME = 5 # seconds class DialThread(threading.Thread): - def __init__(self, queue, pressed): + def __init__(self, queue): threading.Thread.__init__(self, args=(), kwargs=None) self.queue = queue - self.pressed = pressed self.daemon = True GPIO.setmode(GPIO.BCM) @@ -44,20 +42,23 @@ class DialThread(threading.Thread): time.sleep(0.01) def dial_over(self, rest_start): + # After REST_TIME seconds, assume the rotary dial stopped spinning return datetime.now() - rest_start > timedelta(seconds=REST_TIME) class DialManager: - def __init__(self): + def __init__(self, phone_held, phone_hung_up): self.sequence = [] self.update = datetime.now() - self._load_sequences() + self._load_sequences(phone_held, phone_hung_up) - def _load_sequences(self): + def _load_sequences(self, phone_held, phone_hung_up): self.sequences = { - 0: events.OperatorEvent(self), - 4: {1: {1: events.DirectoryEvent(self)}}, - 7: events.FortuneEvent(self), + 0: events.OperatorEvent(self, phone_held, phone_hung_up), + 1: events.WeatherEvent(self, phone_held, phone_hung_up), + 2: events.TimerEvent(self, phone_held, phone_hung_up), + 4: {1: {1: events.DirectoryEvent(self, phone_held, phone_hung_up)}}, + 7: events.FortuneEvent(self, phone_held, phone_hung_up), } print(self.sequences) diff --git a/events.py b/events.py index 77480b9..1ed185c 100644 --- a/events.py +++ b/events.py @@ -1,9 +1,13 @@ import subprocess from datetime import datetime +from config import config +import requests class Event(): - def __init__(self, dial_manager): + def __init__(self, dial_manager, phone_held, phone_hung_up): self.dial_manager = dial_manager + self.phone_held = phone_held + self.phone_hung_up = phone_hung_up def get_name(self): return "generic event" @@ -17,9 +21,6 @@ class Event(): subprocess.run(["espeak", text]) class FortuneEvent(Event): - def __init__(self, dial_manager): - super().__init__(dial_manager) - def get_name(self): return "fortune" @@ -28,9 +29,6 @@ class FortuneEvent(Event): return res.stdout.decode("utf-8") class DirectoryEvent(Event): - def __init__(self, dial_manager): - super().__init__(dial_manager) - def get_name(self): return "directory" @@ -47,14 +45,11 @@ class DirectoryEvent(Event): sequences = self.dial_manager.sequences text = "" for sequence, event in recursive_build("", sequences): - text += f"Dial {sequence} for {event.get_name()}. " + text += f"For {event.get_name()} dial {sequence}. " return text class OperatorEvent(Event): - def __init__(self, dial_manager): - super().__init__(dial_manager) - def get_name(self): return "operator" @@ -62,3 +57,35 @@ class OperatorEvent(Event): now = datetime.now() return f"Right now, it is {now.hour} {now.minute}. Dial 4 1 1 for directory." + +class WeatherEvent(Event): + def get_name(self): + return "weather" + + def get_text(self): + key = config["openweathermap"]["key"] + lat = config["openweathermap"]["lat"] + lon = config["openweathermap"]["lon"] + location = config["openweathermap"]["location"] + units = config["openweathermap"]["units"] + onecall_url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={key}&units={units}" + onecall = requests.get(onecall_url).json() + current_temp = onecall["current"]["temp"] + current_desc = onecall["current"]["weather"][0]["description"] + weather = f"In {location}, it is {current_temp} degrees with {current_desc}. " + high_temp = onecall["daily"][0]["temp"]["max"] + pop = onecall["daily"][0]["pop"] * 100 + forecast = f"Today there is a high of {high_temp} with a {pop} percent chance of percipitation." + return f"{weather}{forecast}" + + +class TimerEvent(Event): + def get_name(self): + return "timer" + + def get_text(self): + start = datetime.now() + self.phone_hung_up.wait() + end = datetime.now() + print(end - start) + return "time" diff --git a/hangup.py b/hangup.py index 2bfb1a7..cfe6bdf 100644 --- a/hangup.py +++ b/hangup.py @@ -5,9 +5,10 @@ import time BUTTON_GPIO = 24 class HangUpThread(threading.Thread): - def __init__(self, phone_held, dial_manager): + def __init__(self, phone_held, phone_hung_up, dial_manager): threading.Thread.__init__(self, args=(), kwargs=None) self.phone_held = phone_held + self.phone_hung_up = phone_hung_up self.dial_manager = dial_manager self.daemon = True @@ -18,8 +19,10 @@ class HangUpThread(threading.Thread): while True: if not GPIO.input(BUTTON_GPIO): self.phone_held.clear() + self.phone_hung_up.set() self.dial_manager.clear_sequence() else: self.phone_held.set() + self.phone_hung_up.clear() time.sleep(0.1) diff --git a/main.py b/main.py index ef51809..674e644 100755 --- a/main.py +++ b/main.py @@ -10,27 +10,29 @@ from threading import Event if __name__ == "__main__": queue = Queue() phone_held = Event() + phone_hung_up = Event() # start phone as on hook phone_held.clear() + phone_hung_up.set() - dial_thread = DialThread(queue, phone_held) - print("starting dial thread") + dial_thread = DialThread(queue) dial_thread.start() - dial_manager = DialManager() + dial_manager = DialManager(phone_held, phone_hung_up) - hang_up_thread = HangUpThread(phone_held, dial_manager) - print("starting hang up thread") + hang_up_thread = HangUpThread(phone_held, phone_hung_up, dial_manager) hang_up_thread.start() - print("main loop") while True: + # Wait for phone to be picked up phone_held.wait() + # Dial a number try: dialed = queue.get(block=True, timeout=0.1) except: continue response = dial_manager.dial(dialed) + # If we matched a sequence, play out event if response is not None: response.speak() -- cgit v1.2.3