#! /usr/bin/env python
# use operating system env command to locate and execute Python by searching
# the PATH environment variable
# ------------------------------------------- ##
# Speak to Me - ver 1.39 [May 2023]
#
# -- Important Prep --
# ********************
# Pre-Install sox in order to play files with subprocess
#
# Add this to ~/.bashrc (to remove atomic8 module error) when importing
# texttospeech:
#   export LD_PRELOAD=/usr/lib/arm-linux-gnueabihf/libatomic.so.1.2.0
# ------------------------------------------- ##
#
# See main() for Argument list
#
from random import randrange

import signal
import socket
import urllib.request  # used to check for an internet connection
import os
import subprocess
import time
import sys
import re
import shlex  # use to split command and arguments into proper array
from subprocess import STDOUT
from datetime import date, timedelta, datetime as dt, timezone as tz
from timeit import default_timer as timer  # use as time between button presses

# GPIO requirements
import RPi.GPIO as GPIO

from dateutil.parser import parse as dtparse
from google.cloud import texttospeech
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# ################ Parameters that can be modified ####################
VOICE_GENDER = 'male'  # set to female or male
MAX_EVENTS = 10
FUTURE_EVENT_DAYS = 7

# Optional Audio Settings
# Folders are resolved relative to the folder that contains this script.
AUDIO_FOLDER_1 = 'your_audio_subfolder_of_mp3_files'
AUDIO_1_DESCRIPTION = 'description'
AUDIO_FOLDER_2 = 'your_audio_subfolder_of_mp3_files'
AUDIO_2_DESCRIPTION = 'description'

# MEALS and their TIMES
# Announce these meals and their times (MEAL_TIMES must be in ascending
# order and line up one-to-one with MEALS).
MEALS = ["breakfast", "lunch", "supper"]
MEAL_TIMES = ["7:20 AM", "11:15 AM", "4:15 PM"]

# Time (- mins) since previous meal started to notify that it just started
RECENT_MEAL_STARTED_MINS = -35  # (negative number of minutes from now)
# Describe when the first meal of day occurs on the next day
FIRST_MEAL_NEXT_DAY = MEALS[0] + ', tomorrow morning'

# Calendar OAuth Key File
CALENDAR_OAUTH_FILE = 'desktop-app.json'

# Subfolder used to store API/service credentials and the refresh token.
# It must already exist and contain CALENDAR_OAUTH_FILE and TTS_KEY_FILE.
CREDENTIALS_SUBFOLDER = '.credentials'

# Text to Speech Setup (retrieved from Google Console)
# Download JSON after adding Credentials - OAuth - Desktop App
# NOTE(review): the original comment asks whether this key comes from a
# service account - confirm against the Google Console setup.
TTS_KEY_FILE = 'calendar-text-to-speech-your-key.json'
# #########################################################################

# -------- application constants -------- #
APPLICATION_NAME = "It's a Button Box"
SCOPES = ['https://www.googleapis.com/auth/calendar.events.readonly']  # Wrap in []

# temp file names - created on the fly to play TTS phrases
TEMP_AUDIO_FILE = '/tmp/temp_phrase.mp3'
AUDIO_FILE_EXT = 'mp3'  # case insensitive audio file type being played

# This file gets created after the first successful login; delete it from
# the credentials folder if you need to force a new authentication.
CLIENT_TOKEN_FILE = 'calendar-token.json'

# Button constants - from J8 - pins 7,11,13,15; pin 9 used for ground
BUTTON_1_GPIO = 4   # YELLOW (back left)
BUTTON_2_GPIO = 22  # BLUE (back right)
BUTTON_3_GPIO = 27  # GREEN (front left)
BUTTON_4_GPIO = 17  # RED (front right)

YELLOW = BUTTON_1_GPIO
BLUE = BUTTON_2_GPIO
GREEN = BUTTON_3_GPIO
RED = BUTTON_4_GPIO
ALL_BUTTONS = (YELLOW, BLUE, GREEN, RED)

# unix sox = play ('play "'
# windows '"C:\Program Files (x86)\sox\play" "'
SOX_CMD = '/usr/bin/play '

# Regular expression to look for time format in sox log file
REGEX_TIME = r'(\d\d:\d\d:\d\d\.\d\d)'

# Seconds to wait for the connectivity probe before giving up
CONNECT_TIMEOUT_SECS = 10

# -------- run-time state shared between handlers -------- #
DEBUG = False  # main() switches this on when the DEBUG argument is given
TIME_PRESSED = None
SOX_PROCESS = None
SOX_ID = None
SOX_LOG_FILE = None
SOX_LOG_FILE_HANDLE = None


# -----------------------------------------
# FUNCTIONS
# ******************************************************************************
def connected():
    """Check for Internet access and DNS lookup.

    Returns:
        True when https://google.com could be opened, False otherwise.
    """
    try:
        # The timeout keeps every spoken phrase from hanging on a dead link.
        with urllib.request.urlopen('https://google.com',
                                    timeout=CONNECT_TIMEOUT_SECS):
            return True
    except Exception:
        # Best-effort probe: any failure simply means "not connected".
        # (Unlike a bare except, Ctrl-C and SystemExit still propagate.)
        return False


def _stop_sox_playback(stopping_message):
    """Terminate the running sox process; return True if one was stopped."""
    global SOX_PROCESS, SOX_ID
    if SOX_ID is None or SOX_PROCESS is None:
        print("No SOX Process Found")
        return False
    print(stopping_message)
    # terminate() sends SIGTERM, the normal way to politely ask a program
    # to stop, and is a no-op when the process has already been reaped.
    SOX_PROCESS.terminate()
    SOX_PROCESS.wait()
    SOX_PROCESS = None
    SOX_ID = None
    return True


def _close_sox_log(closing_message):
    """Close and forget the sox log file handle, if there is one."""
    global SOX_LOG_FILE_HANDLE
    if SOX_LOG_FILE_HANDLE is not None:
        print(closing_message)
        SOX_LOG_FILE_HANDLE.close()
        SOX_LOG_FILE_HANDLE = None


def signal_handler(sig, frame):
    """Interrupt handler: stop playback, release the GPIO pins and exit.

    Args:
        sig: signal number delivered to the process (unused).
        frame: interrupted stack frame (unused).
    """
    print('Exiting Program from Ctrl-C ...')
    _stop_sox_playback("Killing Sox Process")
    _close_sox_log('Closing Sox Log File')
    # Stop GPIO processes
    GPIO.cleanup()
    time.sleep(0.5)
    sys.exit(0)


def cancel_GPIO_detection():
    """Remove edge detection from all four buttons."""
    for pin in ALL_BUTTONS:
        GPIO.remove_event_detect(pin)


def _read_sox_log(log_path, default_start):
    """Work out from a sox log what was played last and where it stopped.

    Args:
        log_path: path of the log written by the previous sox run.
        default_start: start position to use when no resume point is found.

    Returns:
        Tuple (last_played_file, start_time, finished_file, last_file_found).
        finished_file is True when the log reports 'Done' or no log exists.
    """
    last_played_file = ''
    start_time = default_start
    finished_file = False
    last_file_found = False
    last_progress_line = ''  # most recent 'In:' line seen in the log

    if not os.path.isfile(log_path):
        print('No Log File Found')
        return last_played_file, start_time, True, last_file_found

    print('Opening Log File: ' + str(log_path))
    print('Looking for extension: ' + AUDIO_FILE_EXT)
    with open(log_path, "r") as log:
        for line in log:
            if not last_file_found:
                if AUDIO_FILE_EXT in line:
                    last_file_found = True
                    last_played_file = line.strip()[:-1]  # drop trailing ":"
                    print("LAST PLAYED FILE: " + last_played_file)
            elif 'In:' in line:
                # In:0.68% 00:00:06.66 [00:16:16.14] Out:106k [ ===|=== ] ...
                last_progress_line = line
            elif 'Aborted' in line:
                # Play was interrupted when the file was played last
                match = re.search(REGEX_TIME, last_progress_line)
                if match:
                    start_time = match.group(0)
                    print("USING OLD START TIME: " + start_time)
                print('LAST PLAYED ' + last_played_file + ' AT: ' + start_time)
                finished_file = False
                break
            elif 'Done' in line:
                # File fully finished playing
                finished_file = True
                break
    return last_played_file, start_time, finished_file, last_file_found


def _choose_audio_file(mp3_folder, last_played_file, finished_file,
                       last_file_found, start_time, default_start,
                       new_file_phrase):
    """Pick the audio file to play next from mp3_folder.

    Resumes the last played file when it was interrupted, moves on to the
    following file when it finished, and wraps to the first file when the
    end of the (sorted) list is reached or the last file no longer exists.

    Returns:
        Tuple (audio_file, start_time); audio_file is '' when the folder
        holds no files with the AUDIO_FILE_EXT extension.
    """
    pattern = re.compile(r'\.' + re.escape(AUDIO_FILE_EXT) + '$', re.IGNORECASE)
    candidates = sorted(
        name for name in os.listdir(mp3_folder) if pattern.search(name))

    first_file = ''
    for file_name in candidates:
        full_path = os.path.join(mp3_folder, file_name)
        if not last_file_found:
            # play first file in list
            print('Playing first file - no previous file was played')
            print("Playing first file found: " + full_path)
            return full_path, start_time
        if first_file == '':
            # remember this file in case the last one played doesn't exist
            first_file = full_path
        if finished_file and last_played_file == '':
            # previous iteration matched a finished file: play its successor
            say_phrase(new_file_phrase)
            print('Starting With New File: ' + full_path)
            return full_path, start_time
        if file_name in last_played_file:
            if finished_file:
                last_played_file = ''  # continue loop one more time if possible
            else:
                return full_path, start_time

    if first_file != '':
        # start from the top
        print('STARTING FROM THE TOP')
        say_phrase('Starting From Top')
        time.sleep(0.25)  # wait after say phrase
        return first_file, default_start
    return '', start_time


def _start_sox(audio_file, start_time, log_path):
    """Start sox playing audio_file from start_time, logging to log_path."""
    global SOX_PROCESS, SOX_LOG_FILE_HANDLE, SOX_ID, TIME_PRESSED
    print('Opening: ' + log_path + ' at ' + start_time)
    # Use a list of arguments (shell=False) so file names containing
    # spaces or shell characters are passed to sox untouched.
    play_command = shlex.split(SOX_CMD) + [audio_file, '-V1', 'trim', start_time]
    print(shlex.join(play_command))
    print("Opening SOX Process")
    with open(log_path, 'w') as log_handle:
        SOX_PROCESS = subprocess.Popen(play_command, stdout=log_handle,
                                       stderr=STDOUT, shell=False)
    SOX_LOG_FILE_HANDLE = log_handle
    SOX_ID = SOX_PROCESS.pid
    # Remember when playback began so the next press can stop it.
    TIME_PRESSED = timer()


def _play_from_folder(folderName, description, default_start,
                      new_file_phrase):
    """Announce and start the next audio file of a folder.

    Returns:
        True when a sox process was started, False when the folder holds
        no playable file. Exits the program when the folder is missing.
    """
    mp3_folder = os.path.join(home_dir, folderName)
    log_path = os.path.join(mp3_folder, 'sox-play.log')
    print('Audio Folder: ' + mp3_folder)
    if not os.path.isdir(mp3_folder):
        if DEBUG:
            print('No Audio Folder found: ' + mp3_folder)
        say_phrase('Audio Folder for ' + description + ' Could Not Be Found')
        sys.exit(0)

    print('Playing :' + description)
    say_phrase('Playing ' + description + ', press button again to stop , ')

    # use log file to get name of last played audio file and where we left off
    last_played_file, start_time, finished_file, last_file_found = \
        _read_sox_log(log_path, default_start)
    audio_file, start_time = _choose_audio_file(
        mp3_folder, last_played_file, finished_file, last_file_found,
        start_time, default_start, new_file_phrase)
    if audio_file == '':
        return False
    _start_sox(audio_file, start_time, log_path)
    return True


def _play_news():
    """Play (or resume) the fixed news recording used by the GREEN button."""
    cbc_file = '/home/speaktome/news/cbc_news.mp3'
    log_path = '/home/speaktome/news/sox-play.log'
    _, start_time, _, _ = _read_sox_log(log_path, "0")
    _start_sox(cbc_file, start_time, log_path)


def button_pressed_callback(channel):
    """Handle a button edge event reported by RPi.GPIO.

    A press more than 2 seconds after playback started stops the playback;
    a press within 2 seconds is ignored as a false trigger; otherwise the
    action of whichever button currently reads high is carried out.

    NOTE(review): detection is registered for FALLING edges while the
    dispatch below tests for a high input level - confirm on the hardware.

    Args:
        channel: GPIO channel number that triggered the event.
    """
    print('Button Pressed: ' + str(channel))
    print('GPIO input: ' + str(GPIO.input(channel)))

    stop_playback = False
    ignore_button = False
    if TIME_PRESSED is not None:
        time_since_last_pressed = timer() - TIME_PRESSED
        print("SECONDS SINCE BUTTON FIRST PRESSED: "
              + str(time_since_last_pressed))
        if time_since_last_pressed > 2.0:
            # Stop any running sox play
            stop_playback = _stop_sox_playback(
                'Audio Stopping: ID: ' + str(SOX_ID))
            _close_sox_log('Closing Log File')
            time.sleep(0.5)
        else:
            # Ignore any quick false button presses
            print("!! Ignoring Button Press !!")
            ignore_button = True

    if ignore_button or stop_playback:
        return

    # process button press
    time.sleep(0.1)
    if GPIO.input(YELLOW):
        if DEBUG:
            print("Button 1 pressed!")
        get_start_end_dates()
        # Get Today's events
        announce_events(local_date_time, today_end, 'TODAY', addDetail=False)
        # Get Tomorrow's events
        announce_events(tomorrow_min, tomorrow_max, 'TOMORROW', addDetail=False)
    elif GPIO.input(BLUE):
        if DEBUG:
            print("Button 2 pressed!")
        # Get Next Meal Time
        next_meal_time()
    elif GPIO.input(GREEN):
        if DEBUG:
            print("Button 3 pressed!")
        _play_news()
        # playAudioFromFolder(AUDIO_FOLDER_1, AUDIO_1_DESCRIPTION)
    elif GPIO.input(RED):
        if DEBUG:
            print("Button 4 pressed!")
        _play_from_folder(AUDIO_FOLDER_2, AUDIO_2_DESCRIPTION, "0",
                          'Starting a New Audio File')


def start_GPIO_detection():
    """Register button_pressed_callback for falling edges on every button."""
    for pin in ALL_BUTTONS:
        GPIO.add_event_detect(pin, GPIO.FALLING,
                              callback=button_pressed_callback, bouncetime=100)


def setup_GPIO():
    """Configure the four button pins as pulled-down inputs and start detection."""
    GPIO.setmode(GPIO.BCM)  # BOARD uses PIN #
    # set up as an input, pulled down (0V), connected to 3V3 on button press
    for pin in ALL_BUTTONS:
        GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
    start_GPIO_detection()


# say dates and times correctly (maybe use SSML in future to get ordinals)
def minutes(m):
    """Return the spoken form of a minute value ('' for 0, 'Oh 9' for 9)."""
    if m == 0:
        return ''
    if m < 10:
        # say OH 9 instead of 9
        return 'Oh ' + str(m)
    return str(m)


# use example from https://stackoverflow.com/a/5891598/2419128
def suffix(d):
    """Return the English ordinal suffix ('st', 'nd', 'rd', 'th') for day d."""
    if 11 <= d <= 13:
        return 'th'
    return {1: 'st', 2: 'nd', 3: 'rd'}.get(d % 10, 'th')


def custom_strftime(format, t):
    """Format t for speech.

    Works like t.strftime(format) with these extras: '{S}' becomes the day
    with its ordinal suffix, '{mm}' becomes the spoken minutes, and AM/PM
    is replaced by the general time of day.

    Args:
        format: strftime format, optionally containing '{S}' and '{mm}'.
        t: datetime to format.

    Returns:
        The formatted string.
    """
    strTime = t.strftime(format).replace('{S}', str(t.day) + suffix(t.day))
    print('hour of day: ' + str(t.hour))
    # Change AM/PM to reflect the general time of day
    if t.hour < 5:
        # Before 5 AM the plain "AM" is deliberately left as it is.
        pass
    elif t.hour < 12:
        strTime = strTime.replace('AM', 'in the morning')
    elif t.hour < 17:
        strTime = strTime.replace('PM', 'in the afternoon')
    else:
        # 5 PM onwards (the original tested "hour >= 5", which made every
        # PM time "in the evening" and the afternoon branch unreachable)
        strTime = strTime.replace('PM', 'in the evening')
    return strTime.replace('{mm}', minutes(t.minute))


def setup_calendar():
    """Authorize against Google Calendar and build the global `service`.

    Reads a stored token from the credentials folder when there is one,
    refreshes it or runs the local-server OAuth flow when it is missing or
    invalid, and saves the resulting token for the next run. Exits the
    program when the API reports an HttpError.
    """
    global service
    creds = None
    # The token json file stores the user's access and refresh token.
    # Created automatically when authorization flow completes first time.
    token_path = os.path.join(credential_dir, CLIENT_TOKEN_FILE)
    credentials_path = os.path.join(credential_dir, CALENDAR_OAUTH_FILE)
    if os.path.exists(token_path):
        creds = Credentials.from_authorized_user_file(token_path, SCOPES)
        if DEBUG:
            print('reading previous credentials from token')
    try:
        # If there are no (valid) credentials available, let the user log in.
        # First time - a url will be displayed. Copy/paste into browser when
        # logged into Google Console
        if (not creds) or (not creds.valid):
            if DEBUG:
                print('creds invalid or missing')
            if creds and creds.expired and creds.refresh_token:
                if DEBUG:
                    print('refreshing request')
                creds.refresh(Request())
                if DEBUG:
                    print('Creds refreshed')
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    credentials_path, SCOPES)
                if DEBUG:
                    print('loading cred using local server')
                creds = flow.run_local_server(port=0)
            # Save the credentials for the next run
            with open(token_path, 'w') as token:
                token.write(creds.to_json())
        # Call the Calendar API
        service = build('calendar', 'v3', credentials=creds)
    except HttpError as error:
        print('An error occurred in setup_calendar: %s' % error)
        sys.exit(0)


def say_phrase(sayAloud, audioFile=TEMP_AUDIO_FILE, tempo=0.8):
    """Speak a phrase through Google Text-to-Speech and sox.

    Without an Internet connection the pre-recorded NO_INTERNET phrase is
    played instead.

    Args:
        sayAloud: text to synthesize.
        audioFile: mp3 file the synthesized audio is written to.
        tempo: sox tempo factor; values below 1 slow the speech down.
    """
    if not connected():
        print('No Internet Connection')
        audioFile = os.path.join(home_dir, 'phrases/NO_INTERNET.MP3')
    else:
        synthesis_input = texttospeech.SynthesisInput(text=sayAloud)
        # Perform the text-to-speech request on the text input with the
        # selected voice parameters and audio file type
        response = client.synthesize_speech(
            input=synthesis_input, voice=voice, audio_config=audio_config)
        # Write the response to the mp3 file.
        with open(audioFile, "wb") as out:
            out.write(response.audio_content)
    # use sox to play mp3 file and slow it down slightly; an argument list
    # (no shell) keeps unusual file names from being re-parsed
    subprocess.call(
        shlex.split(SOX_CMD) + [audioFile, '-V1', '-q', 'tempo', str(tempo)])


def _event_title(event):
    """Return the event's summary, or a placeholder for untitled events."""
    return event.get('summary') or 'an untitled event'


def announce_events(startTime, endTime, openingText, addDetail):
    """Announce the calendar events between startTime and endTime.

    All-day events are announced first, then timed events. In DEBUG mode
    the phrases are printed instead of spoken.

    Args:
        startTime: lower bound of the window (ISO timestamp with offset).
        endTime: upper bound of the window (ISO timestamp with offset).
        openingText: heading for the window, e.g. 'TODAY'.
        addDetail: when true, include the full date with every event.
    """
    ALL_DAY_FORMAT = '%A %B {S}'
    # Use SUMMARY format for 'TODAY', 'TOMORROW'
    DATE_TIME_DETAIL = '%A %B {S}, AT %-I {mm} %p'
    DATE_TIME_SUMMARY = ', AT %-I {mm} %p'
    print(startTime)
    print(endTime)
    try:
        events_result = service.events().list(
            showDeleted=False, maxResults=9999, calendarId='primary',
            timeMin=startTime, timeMax=endTime, singleEvents=True,
            orderBy='startTime').execute()
    except HttpError as error:
        print('An error occurred in announce_events: %s' % error)
        say_phrase('Error Announcing Events')
        return

    if DEBUG:
        print(events_result)
    events = events_result.get('items', [])
    if not events:
        if DEBUG:
            print(openingText, ': NO EVENTS')
        else:
            say_phrase('You have nothing scheduled, ' + openingText,
                       TEMP_AUDIO_FILE)
        return

    if DEBUG:
        print(openingText)
    else:
        say_phrase(openingText, TEMP_AUDIO_FILE)

    # Full day events
    num_events = 0
    for event in events:
        if 'date' not in event['start']:
            continue
        start = event['start'].get('dateTime', event['start'].get('date'))
        phrase = (' is ' if num_events == 0 else ' and ') + _event_title(event)
        if addDetail:
            stime = custom_strftime(ALL_DAY_FORMAT, dtparse(start))
            phrase = phrase + ', on ' + stime
        if DEBUG:
            print('Event: ', num_events, 'Phrase: ', phrase)
        else:
            say_phrase(phrase, TEMP_AUDIO_FILE)
        num_events = num_events + 1

    # Appointments and timed events
    num_events = 0
    for event in events:
        if 'dateTime' not in event['start']:
            continue
        start = event['start'].get('dateTime', event['start'].get('date'))
        if addDetail:
            stime = custom_strftime(DATE_TIME_DETAIL, dtparse(start))
            phrase = 'On ' + stime + ', you have ' + _event_title(event)
        else:
            phrase = ' You have ' if num_events == 0 else ' and '
            stime = custom_strftime(DATE_TIME_SUMMARY, dtparse(start))
            phrase = phrase + _event_title(event) + stime
        if DEBUG:
            print('Event: ', num_events, 'Phrase: ', phrase)
        else:
            say_phrase(phrase, TEMP_AUDIO_FILE)
        num_events = num_events + 1


def get_start_end_dates():
    """Compute the ISO timestamps bounding today, tomorrow and the future.

    Sets the globals local_date_time, today_end, tomorrow_min,
    tomorrow_max, futureStart and futureEnd; every value carries the local
    UTC offset.
    """
    global local_date_time, today_end
    global tomorrow_min, tomorrow_max
    global futureStart, futureEnd

    # https://stackoverflow.com/a/39079819/2419128
    now = dt.now().astimezone()  # timezone-aware local time
    if DEBUG:
        print(now)
    local_date_time = now.isoformat()
    if DEBUG:
        print('Local Date Time: %s' % local_date_time)

    # Seconds are zeroed so the windows meet on whole minutes instead of
    # inheriting whatever second the button happened to be pressed on.
    end_of_day = now.replace(hour=23, minute=59, second=0, microsecond=0)
    if DEBUG:
        print('End of Day: %s' % end_of_day)
    today_end = end_of_day.isoformat()
    if DEBUG:
        print('Today End: %s' % today_end)

    tomorrow_min = (end_of_day + timedelta(minutes=1)).isoformat()
    tomorrow_max = (end_of_day + timedelta(days=1)).isoformat()

    # Finally get events in future
    futureStart = (end_of_day + timedelta(days=1, minutes=1)).isoformat()
    futureEnd = (end_of_day + timedelta(days=FUTURE_EVENT_DAYS)).isoformat()


def all_calendar_events():
    """Announce today's, tomorrow's and the upcoming events in turn."""
    try:
        # Get Today's events
        announce_events(local_date_time, today_end, 'TODAY', addDetail=False)
        # Get Tomorrow's events
        announce_events(tomorrow_min, tomorrow_max, 'TOMORROW', addDetail=False)
        # Finally get events in future
        announce_events(futureStart, futureEnd, 'COMING UP', addDetail=True)
    except HttpError as error:
        print('An error occurred: %s' % error)
        say_phrase("Error Occurred Announcing Events")


def setup_tts(maleOrFemale='male'):
    """Create the Text-to-Speech client, voice and audio configuration.

    Sets the globals client, voice and audio_config.

    Args:
        maleOrFemale: 'male' selects the male voice; any other value
            selects the female voice.
    """
    global client, voice, audio_config
    # Instantiates a client
    client = texttospeech.TextToSpeechClient()

    # Build the voice request, select the language code ("en-US") and ssml
    # https://cloud.google.com/text-to-speech/docs/voices (all available voices)
    if maleOrFemale == 'male':
        voice_name = "en-US-Wavenet-D"
        gender = texttospeech.SsmlVoiceGender.MALE
    else:
        voice_name = "en-US-Wavenet-F"
        gender = texttospeech.SsmlVoiceGender.FEMALE
    voice = texttospeech.VoiceSelectionParams(
        language_code="en-US", name=voice_name, ssml_gender=gender)

    # Select the type of audio file you want returned
    audio_config = texttospeech.AudioConfig(
        audio_encoding=texttospeech.AudioEncoding.MP3)


def welcome_message():
    """Say (and print) today's date followed by the current time."""
    todayfmt = '%A %B {S}'
    stime = custom_strftime(todayfmt, dt.now())
    phrase = "Today is " + stime
    print(phrase)
    if not DEBUG:
        say_phrase(phrase)

    # Hour (12-hour clock) as a decimal number 1, 2, ... 12 without padding
    tmfmt = '%-l {mm} %p'
    stime = custom_strftime(tmfmt, dt.now())
    phrase = "The time is " + stime
    print('Welcome time: ', phrase)
    if not DEBUG:
        say_phrase(phrase)


def get_hrs_mins(mealtime):
    """Convert a meal time such as '4:15 PM' to 24-hour (hours, minutes).

    Args:
        mealtime: 'H:MM AM' / 'H:MM PM'; a time without AM/PM is taken as
            already being on the 24-hour clock.

    Returns:
        Tuple (hours, minutes) with hours in 0..23.
    """
    clock, _, meridiem = mealtime.strip().partition(' ')
    hrs_text, mins_text = clock.split(':')
    hrs = int(hrs_text)
    mins = int(mins_text)
    meridiem = meridiem.strip().upper()
    if meridiem in ('AM', 'PM'):
        # 12 AM is hour 0 and 12 PM is hour 12, so fold 12 down to 0 first
        hrs %= 12
        if meridiem == 'PM':
            hrs += 12
    return hrs, mins


def next_meal_time():
    """Announce the time, any meal that just started and the next meal."""
    welcome_message()
    now = dt.now()

    # Seconds from now until each meal starts (negative once it has started)
    secs_until = []
    for meal_time in MEAL_TIMES:
        hrs, mins = get_hrs_mins(meal_time)
        start = now.replace(hour=hrs, minute=mins, second=0, microsecond=0)
        secs_until.append((start - now).total_seconds())

    # The next meal is the first one that has not started yet
    next_index = next(
        (i for i, secs in enumerate(secs_until) if secs > 0), None)

    # Check to see if the meal before it just started
    if next_index is None:
        previous_index = len(secs_until) - 1
    else:
        previous_index = next_index - 1  # -1: nothing precedes the first meal
    if previous_index >= 0:
        lastmins = secs_until[previous_index] // 60
        if lastmins > RECENT_MEAL_STARTED_MINS:
            mins_ago = abs(int(lastmins))
            phrase = MEALS[previous_index] + ' just started ' + str(mins_ago)
            phrase += ' minute ago' if mins_ago == 1 else ' minutes ago'
            if DEBUG:
                print('LAST MEAL NOTICE: ', phrase)
            else:
                say_phrase(phrase)

    if next_index is None:
        phrase = "Your next meal is , " + FIRST_MEAL_NEXT_DAY
    else:
        hours, remainder = divmod(secs_until[next_index], 3600)
        mins_left = remainder // 60
        phrase = "Your next meal is, " + MEALS[next_index] + ', '
        if hours == 0:
            if mins_left > 5:
                phrase += 'starting in, ' + str(int(mins_left)) + ' minutes'
            else:
                phrase += 'starting NOW'
        elif hours == 1:
            if mins_left > 5:
                phrase += ('starting in 1 hour, and, '
                           + str(int(mins_left)) + ' minutes')
            else:
                phrase += 'starting in about 1 hour'
        elif mins_left > 15:
            phrase += ('starting in, ' + str(int(hours)) + ' hours, and, '
                       + str(int(mins_left)) + ' minutes')
        else:
            phrase += "starting in about, " + str(int(hours)) + " hours"

    if DEBUG:
        print('MEAL NOTICE: ', phrase)
    else:
        say_phrase(phrase)


def playAudioFromFolder(folderName, description):
    """Play the next audio file of a folder, resuming where it was stopped.

    Args:
        folderName: audio subfolder, relative to the script's folder.
        description: spoken description of the folder's content.
    """
    global SOX_LOG_FILE
    SOX_LOG_FILE = os.path.join(home_dir, folderName, 'sox-play.log')
    started = _play_from_folder(folderName, description, '00:00:00',
                                'Starting with a New File')
    if started:
        time.sleep(2.0)
        print("Opening SOX Process: " + str(SOX_ID))
    else:
        say_phrase('No Audio Files to play')
        print('No Audio Files to play (.' + AUDIO_FILE_EXT + ')')


def main():
    """Run the actions named on the command line, then wait for buttons.

    Arguments (case insensitive): TODAY, TOMORROW, FUTURE, MEALS,
    ALLEVENTS, AUDIO1, AUDIO2, DEBUG, NOSTART.
    """
    global DEBUG
    DEFAULT_ARGS = []  # ["TODAY", "MEALS"]
    ALLOW_ARGS = ["TODAY", "TOMORROW", "FUTURE", "MEALS", "ALLEVENTS",
                  "AUDIO1", "AUDIO2", "DEBUG", "NOSTART"]

    if len(sys.argv) == 1:
        a = DEFAULT_ARGS
    else:
        print(f'Arguments of the script: {sys.argv[1:]=}')
        a = [x.upper() for x in sys.argv[1:]]
        bad_args = [arg for arg in a if arg not in ALLOW_ARGS]
        for arg in bad_args:
            print('NOT ALLOWED: ', arg)
        if bad_args:
            for allowed in ALLOW_ARGS:
                print(f"Arguments Allowed: {allowed}")
            sys.exit(0)

    DEBUG = "DEBUG" in a
    NOSTART = "NOSTART" in a

    if not connected():
        print('No Internet Connection to Google. Please try again in a few minutes')
        sys.exit(0)
    print('Connected to internet')

    setup_tts(maleOrFemale=VOICE_GENDER)
    print('Setup Complete')
    setup_calendar()

    # Authorize calendar events extract and set times
    get_start_end_dates()

    if "ALLEVENTS" in a:
        all_calendar_events()
    elif "AUDIO1" in a:
        playAudioFromFolder(AUDIO_FOLDER_1, AUDIO_1_DESCRIPTION)
    elif "AUDIO2" in a:
        playAudioFromFolder(AUDIO_FOLDER_2, AUDIO_2_DESCRIPTION)
    else:
        if "MEALS" in a:
            next_meal_time()
        if "TODAY" in a:
            announce_events(local_date_time, today_end, 'TODAY',
                            addDetail=False)
        if "TOMORROW" in a:
            announce_events(tomorrow_min, tomorrow_max, 'TOMORROW',
                            addDetail=False)
        if "FUTURE" in a:
            announce_events(futureStart, futureEnd, 'COMING UP',
                            addDetail=True)

    # Setup Button Press Checks
    setup_GPIO()
    print('Waiting for Ctrl-C')

    # Implement handler if Ctrl-C stops play
    if not NOSTART:
        signal.signal(signal.SIGINT, signal_handler)
        signal.pause()
    sys.exit(0)


if __name__ == '__main__':
    print('Initializing ...')
    # Get path of running script
    home_dir = os.path.realpath(os.path.dirname(__file__))

    # Initialize application
    credential_dir = os.path.join(home_dir, CREDENTIALS_SUBFOLDER)
    if not os.path.exists(credential_dir):
        print('MISSING FOLDER AND CREDENTIAL KEY FILES: ', credential_dir)
        sys.exit(0)

    tts_credentials = os.path.join(credential_dir, TTS_KEY_FILE)
    if not os.path.isfile(tts_credentials):
        print('MISSING CREDENTIAL FILE: ', tts_credentials)
        sys.exit(0)

    # credentials for tts
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = tts_credentials
    main()