01234567890123456789012345678901234567890123456789012345678901234567890123456789
380381382383384385386387388389390391392393394395396397398399400 401402 403404405406407408409410411412413414 415416 417418419420421422423424425426427428429430431432433434435436 906907908909910911912913914915916917918919920921922923924925 926927928929930931932933934935936937938939940941942943944945 52555256525752585259526052615262526352645265526652675268526952705271527252735274 5275 52765277527852795280528152825283528452855286528752885289529052915292529352945295 52995300530153025303530453055306530753085309531053115312531353145315531653175318 53195320532153225323532453255326532753285329533053315332533353345335533653375338533953405341534253435344534553465347534853495350 | <----SKIPPED LINES----> # modified based on SIMULATION flag if not file: file = LOGFILE # special case: for the main logfile, we always keep a rolling log if not rolling and file == LOGFILE: rolling = ROLLING_LOGFILE try: with open(file, 'a') as f: # by excluding the timestamp, file diffs become easier between runs if not SIMULATION or file == LOGFILE: f.write('='*80+'\n') f.write(str(datetime.datetime.now(TZ))+'\n') f.write('\n') f.write(str(message)+'\n') except IOError: Log('Unable to append to ' + file) if rolling: existing_log_lines = ReadFile(file).splitlines() with open(rolling, 'w') as f: f.write('\n'.join(existing_log_lines[-ROLLING_LOG_SIZE:])) def UpdateRollingLogSize(configuration): """Set the global rolling_log_line_count based on settings file.""" if 'rolling_log_line_count' in configuration: global ROLLING_LOG_SIZE ROLLING_LOG_SIZE = configuration['rolling_log_line_count'] def LogTimes(times, threshold=0): """Logs elapsed time messages from a list tuples of epochs and identifiers.""" if threshold and times[-1][0] - times[0][0] < threshold: return msg = '' for n, t in enumerate(times[:-1]): msg += '%.2fs to get from reading %s to reading %s\n' % ( times[n + 1][0] - t[0], t[1], times[n + 1][1]) Log(msg) def MaintainRollingWebLog(message, max_count, filename=None): """Maintains a rolling text file of at most max_count printed messages. Newest data at top and oldest data at the end, of at most max_count messages, where the delimiter between each message is identified by a special fixed string. Args: message: text message to prepend to the file. max_count: maximum number of messages to keep in the file; the max_count+1st message is deleted. filename: the file to update. """ # can't define as a default parameter because ROLLING_MESSAGE_FILE name is potentially # modified based on SIMULATION flag <----SKIPPED LINES----> """Formats now on flight into a a 3-digit string like '12a' or ' 1p'.""" time_string = DisplayTime(flight) if time_string: hour_string = time_string[11:13] hour_0_23 = int(hour_string) is_pm = int(hour_0_23/12) == 1 hour_number = hour_0_23 % 12 if hour_number == 0: hour_number = 12 out_string = str(hour_number).rjust(2) if is_pm: out_string += 'p' else: out_string += 'a' else: out_string = KEY_NOT_PRESENT_STRING return out_string def MinuteOfDay(ts=time.time()): dt = datetime.datetime.fromtimestamp(ts, TZ) minute_of_day = dt.hour * MINUTES_IN_HOUR + dt.minute return minute_of_day def HoursSinceMidnight(timezone=TIMEZONE): """Returns the float number of hours elapsed since midnight in the given timezone.""" tz = pytz.timezone(timezone) now = datetime.datetime.now(tz) seconds_since_midnight = ( now - now.replace(hour=0, minute=0, second=0, microsecond=0)).total_seconds() hours = seconds_since_midnight / SECONDS_IN_HOUR return hours def HoursSinceFlight(now, then): """Returns the number of hours between a timestamp and a flight. Args: now: timezone-aware datetime representation of timestamp <----SKIPPED LINES----> curl.setopt(pycurl.POSTFIELDSIZE, len(body_as_json_string)) failure_message = '' try: curl.perform() except pycurl.error as e: failure_message = 'curl.perform() failed with message %s' % e Log('curl.perform() failed with message %s' % e) error_code = True else: # you may want to check HTTP response code, e.g. status_code = curl.getinfo(pycurl.RESPONSE_CODE) if status_code != 200: Log('Server returned HTTP status code %d for message %s' % (status_code, s)) error_code = True curl.close() UpdateStatusLight(GPIO_ERROR_VESTABOARD_CONNECTION, error_code, failure_message) def TruncateEscapedLine(s): s = s.upper() character_mapping = { '[': '(', '<': '(', ']': ')', '>': ')', '|': '/', '\\': '/'} for c in character_mapping: s = s.replace(c, character_mapping[c]) l = 0 valid_characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$()-+&=;:''"%,./?' validated_s = '' valid_escape_values = list(range(70)) open_escape_char = '{' close_escape_char = '}' pointer = 0 while pointer < len(s) and l < SPLITFLAP_CHARS_PER_LINE: if s[pointer] in valid_characters: <----SKIPPED LINES----> elif s[pointer] == open_escape_char: end = s.find(close_escape_char, pointer) if end == -1: # open escape did not close pointer = len(s) else: try: escape_value = int(s[pointer+1:end]) except ValueError: escape_value = None if escape_value in valid_escape_values: validated_s += s[pointer:end+1] l += 1 pointer = end else: pointer += 1 return validated_s def PersonalMessage(configuration, message_queue): if 'clear_board' in configuration: RemoveSetting(configuration, 'clear_board') message_queue.append((FLAG_MSG_CLEAR, '')) minute_of_day = MinuteOfDay() if ( not message_queue and 'personal_message_enabled' in configuration and configuration['personal_message'] and minute_of_day <= configuration['personal_off_time'] and minute_of_day > configuration['personal_on_time'] + 1): message = configuration['personal_message'] lines = [TruncateEscapedLine(l) for l in message.split('\n')] message_queue.append((FLAG_MSG_PERSONAL, lines)) def ManageMessageQueue(message_queue, next_message_time, configuration): """Check time & if appropriate, display next message from queue. Args: message_queue: FIFO list of message tuples of (message type, message string). next_message_time: epoch at which next message should be displayed configuration: dictionary of configuration attributes. Returns: Next_message_time, potentially updated if a message has been displayed, or unchanged if no message was displayed. """ if message_queue and (time.time() >= next_message_time or SIMULATION): if SIMULATION: # drain the queue because the messages come so fast messages_to_display = list(message_queue) # passed by reference, so clear it out since we drained it to the display <----SKIPPED LINES----> |
01234567890123456789012345678901234567890123456789012345678901234567890123456789
380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472 942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982 529252935294529552965297529852995300530153025303530453055306530753085309531053115312531353145315531653175318531953205321532253235324532553265327532853295330533153325333533453355336533753385339534053415342534353445345 53495350535153525353535453555356535753585359536053615362536353645365536653675368536953705371537253735374537553765377537853795380538153825383538453855386538753885389539053915392539353945395539653975398539954005401540254035404540554065407540854095410 | <----SKIPPED LINES----> # modified based on SIMULATION flag if not file: file = LOGFILE # special case: for the main logfile, we always keep a rolling log if not rolling and file == LOGFILE: rolling = ROLLING_LOGFILE try: with open(file, 'a') as f: # by excluding the timestamp, file diffs become easier between runs if not SIMULATION or file == LOGFILE: f.write('='*80+'\n') f.write(str(datetime.datetime.now(TZ))+'\n') f.write('\n') f.write(str(message)+'\n') except IOError: Log('Unable to append to ' + file) if rolling: Tail(file, rolling, lines_to_keep=ROLLING_LOG_SIZE) def Tail(in_name, rolling_name, max_line_length=100, lines_to_keep=1000): """Fast pythonic implementation of tail -n. Args: in_name: name of file for which we want the tail rolling_name: name of file to write out max_line_length: since this uses seek to find the block of text near the end that has at most the lines_to_keep number of lines, we need to estimate the max line length over that block of text. We can afford to be a little conservative here. lines_to_keep: how many lines to keep in the rolling file. Returns: Integer number of lines actually kept. """ with open(in_name, 'r') as f: f.seek(0, os.SEEK_END) f_length = f.tell() bytes_to_read = min(max_line_length * lines_to_keep, f_length) f.seek(f_length - bytes_to_read) end_text = f.read() lines = end_text.split('\n') # perhaps the file was smaller than lines_to_keep lines, or many lines were # longer than max_line_length; in that case, the resulting text block will # potentially be smaller than lines_to_keep lines_to_keep = min(lines_to_keep, len(lines)) with open(rolling_name, 'w') as f: f.write('\n'.join(lines[-lines_to_keep:])) return lines_to_keep def UpdateRollingLogSize(configuration): """Set the global rolling_log_line_count based on settings file.""" if 'rolling_log_line_count' in configuration: global ROLLING_LOG_SIZE ROLLING_LOG_SIZE = configuration['rolling_log_line_count'] def LogTimes(times, threshold=0, title=''): """Logs elapsed time messages from a list tuples of epochs and identifiers.""" total_time = times[-1][0] - times[0][0] if threshold and total_time < threshold: return msg = 'Code timing\n' if title: msg = '%s\n' % title msg = 'Total time: %.2fs\n' % total_time for n, t in enumerate(times[:-1]): msg += '%.2fs to get from reading %s to reading %s\n' % ( times[n + 1][0] - t[0], t[1], times[n + 1][1]) Log(msg) def MaintainRollingWebLog(message, max_count, filename=None): """Maintains a rolling text file of at most max_count printed messages. Newest data at top and oldest data at the end, of at most max_count messages, where the delimiter between each message is identified by a special fixed string. Args: message: text message to prepend to the file. max_count: maximum number of messages to keep in the file; the max_count+1st message is deleted. filename: the file to update. """ # can't define as a default parameter because ROLLING_MESSAGE_FILE name is potentially # modified based on SIMULATION flag <----SKIPPED LINES----> """Formats now on flight into a a 3-digit string like '12a' or ' 1p'.""" time_string = DisplayTime(flight) if time_string: hour_string = time_string[11:13] hour_0_23 = int(hour_string) is_pm = int(hour_0_23/12) == 1 hour_number = hour_0_23 % 12 if hour_number == 0: hour_number = 12 out_string = str(hour_number).rjust(2) if is_pm: out_string += 'p' else: out_string += 'a' else: out_string = KEY_NOT_PRESENT_STRING return out_string def MinuteOfDay(ts=time.time()): """Returns integer minute of day (0..1439) for the given timestamp or for now.""" dt = datetime.datetime.fromtimestamp(ts, TZ) minute_of_day = dt.hour * MINUTES_IN_HOUR + dt.minute return minute_of_day def HoursSinceMidnight(timezone=TIMEZONE): """Returns the float number of hours elapsed since midnight in the given timezone.""" tz = pytz.timezone(timezone) now = datetime.datetime.now(tz) seconds_since_midnight = ( now - now.replace(hour=0, minute=0, second=0, microsecond=0)).total_seconds() hours = seconds_since_midnight / SECONDS_IN_HOUR return hours def HoursSinceFlight(now, then): """Returns the number of hours between a timestamp and a flight. Args: now: timezone-aware datetime representation of timestamp <----SKIPPED LINES----> curl.setopt(pycurl.POSTFIELDSIZE, len(body_as_json_string)) failure_message = '' try: curl.perform() except pycurl.error as e: failure_message = 'curl.perform() failed with message %s' % e Log('curl.perform() failed with message %s' % e) error_code = True else: # you may want to check HTTP response code, e.g. status_code = curl.getinfo(pycurl.RESPONSE_CODE) if status_code != 200: Log('Server returned HTTP status code %d for message %s' % (status_code, s)) error_code = True curl.close() UpdateStatusLight(GPIO_ERROR_VESTABOARD_CONNECTION, error_code, failure_message) def TruncateEscapedLine(s): """Formats a single line of the personal message for the Vestaboard. The Vestaboard has line length limitations, a limited character set, and escape characters. This function: - replaces some unsupported characters with very similar supported characters - truncates the line after the max line length, allowing for escape characters - truncates the line after an unsupported character that does not have a replacement Args: s: input string Returns: Reformatted potentially-truncated line. """ s = s.upper() character_mapping = { '[': '(', '<': '(', ']': ')', '>': ')', '|': '/', '\\': '/'} for c in character_mapping: s = s.replace(c, character_mapping[c]) l = 0 valid_characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$()-+&=;:''"%,./?' validated_s = '' valid_escape_values = list(range(70)) open_escape_char = '{' close_escape_char = '}' pointer = 0 while pointer < len(s) and l < SPLITFLAP_CHARS_PER_LINE: if s[pointer] in valid_characters: <----SKIPPED LINES----> elif s[pointer] == open_escape_char: end = s.find(close_escape_char, pointer) if end == -1: # open escape did not close pointer = len(s) else: try: escape_value = int(s[pointer+1:end]) except ValueError: escape_value = None if escape_value in valid_escape_values: validated_s += s[pointer:end+1] l += 1 pointer = end else: pointer += 1 return validated_s def PersonalMessage(configuration, message_queue): """Formats and displays the personal message. A user-defined message can be displayed to the board whenever there isn't a flight message during user-specified hours of the day. This clears the board, if requested, and then adds that message to the queue. Args: configuration: the settings dictionary. message_queue: the existing queue, to which the personal message - if any - is added. """ if 'clear_board' in configuration: RemoveSetting(configuration, 'clear_board') message_queue.append((FLAG_MSG_CLEAR, '')) minute_of_day = MinuteOfDay() if ( not message_queue and 'personal_message_enabled' in configuration and configuration['personal_message'] and minute_of_day <= configuration['personal_off_time'] and minute_of_day > configuration['personal_on_time'] + 1): message = configuration['personal_message'] lines = [TruncateEscapedLine(l) for l in message.split('\n')[:SPLITFLAP_LINE_COUNT]] message_queue.append((FLAG_MSG_PERSONAL, lines)) def ManageMessageQueue(message_queue, next_message_time, configuration): """Check time & if appropriate, display next message from queue. Args: message_queue: FIFO list of message tuples of (message type, message string). next_message_time: epoch at which next message should be displayed configuration: dictionary of configuration attributes. Returns: Next_message_time, potentially updated if a message has been displayed, or unchanged if no message was displayed. """ if message_queue and (time.time() >= next_message_time or SIMULATION): if SIMULATION: # drain the queue because the messages come so fast messages_to_display = list(message_queue) # passed by reference, so clear it out since we drained it to the display <----SKIPPED LINES----> |