01234567890123456789012345678901234567890123456789012345678901234567890123456789
380381382383384385386387388389390391392393394395396397398399400 401402 403404405406407408409410411412413414 415416 417418419420421422423424425426427428429430431432433434435436 906907908909910911912913914915916917918919920921922923924925 926927928929930931932933934935936937938939940941942943944945 52555256525752585259526052615262526352645265526652675268526952705271527252735274 5275 52765277527852795280528152825283528452855286528752885289529052915292529352945295 52995300530153025303530453055306530753085309531053115312531353145315531653175318 53195320532153225323532453255326532753285329533053315332533353345335533653375338533953405341534253435344534553465347534853495350 |
<----SKIPPED LINES---->
# modified based on SIMULATION flag
if not file:
file = LOGFILE
# special case: for the main logfile, we always keep a rolling log
if not rolling and file == LOGFILE:
rolling = ROLLING_LOGFILE
try:
with open(file, 'a') as f:
# by excluding the timestamp, file diffs become easier between runs
if not SIMULATION or file == LOGFILE:
f.write('='*80+'\n')
f.write(str(datetime.datetime.now(TZ))+'\n')
f.write('\n')
f.write(str(message)+'\n')
except IOError:
Log('Unable to append to ' + file)
if rolling:
existing_log_lines = ReadFile(file).splitlines()
with open(rolling, 'w') as f:
f.write('\n'.join(existing_log_lines[-ROLLING_LOG_SIZE:]))
def UpdateRollingLogSize(configuration):
"""Set the global rolling_log_line_count based on settings file."""
if 'rolling_log_line_count' in configuration:
global ROLLING_LOG_SIZE
ROLLING_LOG_SIZE = configuration['rolling_log_line_count']
def LogTimes(times, threshold=0):
"""Logs elapsed time messages from a list tuples of epochs and identifiers."""
if threshold and times[-1][0] - times[0][0] < threshold:
return
msg = ''
for n, t in enumerate(times[:-1]):
msg += '%.2fs to get from reading %s to reading %s\n' % (
times[n + 1][0] - t[0], t[1], times[n + 1][1])
Log(msg)
def MaintainRollingWebLog(message, max_count, filename=None):
"""Maintains a rolling text file of at most max_count printed messages.
Newest data at top and oldest data at the end, of at most max_count messages,
where the delimiter between each message is identified by a special fixed string.
Args:
message: text message to prepend to the file.
max_count: maximum number of messages to keep in the file; the max_count+1st message
is deleted.
filename: the file to update.
"""
# can't define as a default parameter because ROLLING_MESSAGE_FILE name is potentially
# modified based on SIMULATION flag
<----SKIPPED LINES---->
"""Formats now on flight into a a 3-digit string like '12a' or ' 1p'."""
time_string = DisplayTime(flight)
if time_string:
hour_string = time_string[11:13]
hour_0_23 = int(hour_string)
is_pm = int(hour_0_23/12) == 1
hour_number = hour_0_23 % 12
if hour_number == 0:
hour_number = 12
out_string = str(hour_number).rjust(2)
if is_pm:
out_string += 'p'
else:
out_string += 'a'
else:
out_string = KEY_NOT_PRESENT_STRING
return out_string
def MinuteOfDay(ts=time.time()):
dt = datetime.datetime.fromtimestamp(ts, TZ)
minute_of_day = dt.hour * MINUTES_IN_HOUR + dt.minute
return minute_of_day
def HoursSinceMidnight(timezone=TIMEZONE):
"""Returns the float number of hours elapsed since midnight in the given timezone."""
tz = pytz.timezone(timezone)
now = datetime.datetime.now(tz)
seconds_since_midnight = (
now - now.replace(hour=0, minute=0, second=0, microsecond=0)).total_seconds()
hours = seconds_since_midnight / SECONDS_IN_HOUR
return hours
def HoursSinceFlight(now, then):
"""Returns the number of hours between a timestamp and a flight.
Args:
now: timezone-aware datetime representation of timestamp
<----SKIPPED LINES---->
curl.setopt(pycurl.POSTFIELDSIZE, len(body_as_json_string))
failure_message = ''
try:
curl.perform()
except pycurl.error as e:
failure_message = 'curl.perform() failed with message %s' % e
Log('curl.perform() failed with message %s' % e)
error_code = True
else:
# you may want to check HTTP response code, e.g.
status_code = curl.getinfo(pycurl.RESPONSE_CODE)
if status_code != 200:
Log('Server returned HTTP status code %d for message %s' % (status_code, s))
error_code = True
curl.close()
UpdateStatusLight(GPIO_ERROR_VESTABOARD_CONNECTION, error_code, failure_message)
def TruncateEscapedLine(s):
s = s.upper()
character_mapping = {
'[': '(',
'<': '(',
']': ')',
'>': ')',
'|': '/',
'\\': '/'}
for c in character_mapping:
s = s.replace(c, character_mapping[c])
l = 0
valid_characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$()-+&=;:''"%,./?'
validated_s = ''
valid_escape_values = list(range(70))
open_escape_char = '{'
close_escape_char = '}'
pointer = 0
while pointer < len(s) and l < SPLITFLAP_CHARS_PER_LINE:
if s[pointer] in valid_characters:
<----SKIPPED LINES---->
elif s[pointer] == open_escape_char:
end = s.find(close_escape_char, pointer)
if end == -1: # open escape did not close
pointer = len(s)
else:
try:
escape_value = int(s[pointer+1:end])
except ValueError:
escape_value = None
if escape_value in valid_escape_values:
validated_s += s[pointer:end+1]
l += 1
pointer = end
else:
pointer += 1
return validated_s
def PersonalMessage(configuration, message_queue):
if 'clear_board' in configuration:
RemoveSetting(configuration, 'clear_board')
message_queue.append((FLAG_MSG_CLEAR, ''))
minute_of_day = MinuteOfDay()
if (
not message_queue and
'personal_message_enabled' in configuration and
configuration['personal_message'] and
minute_of_day <= configuration['personal_off_time'] and
minute_of_day > configuration['personal_on_time'] + 1):
message = configuration['personal_message']
lines = [TruncateEscapedLine(l) for l in message.split('\n')]
message_queue.append((FLAG_MSG_PERSONAL, lines))
def ManageMessageQueue(message_queue, next_message_time, configuration):
"""Check time & if appropriate, display next message from queue.
Args:
message_queue: FIFO list of message tuples of (message type, message string).
next_message_time: epoch at which next message should be displayed
configuration: dictionary of configuration attributes.
Returns:
Next_message_time, potentially updated if a message has been displayed, or unchanged
if no message was displayed.
"""
if message_queue and (time.time() >= next_message_time or SIMULATION):
if SIMULATION: # drain the queue because the messages come so fast
messages_to_display = list(message_queue)
# passed by reference, so clear it out since we drained it to the display
<----SKIPPED LINES---->
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472 942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982 529252935294529552965297529852995300530153025303530453055306530753085309531053115312531353145315531653175318531953205321532253235324532553265327532853295330533153325333533453355336533753385339534053415342534353445345 53495350535153525353535453555356535753585359536053615362536353645365536653675368536953705371537253735374537553765377537853795380538153825383538453855386538753885389539053915392539353945395539653975398539954005401540254035404540554065407540854095410 |
<----SKIPPED LINES---->
# modified based on SIMULATION flag
if not file:
file = LOGFILE
# special case: for the main logfile, we always keep a rolling log
if not rolling and file == LOGFILE:
rolling = ROLLING_LOGFILE
try:
with open(file, 'a') as f:
# by excluding the timestamp, file diffs become easier between runs
if not SIMULATION or file == LOGFILE:
f.write('='*80+'\n')
f.write(str(datetime.datetime.now(TZ))+'\n')
f.write('\n')
f.write(str(message)+'\n')
except IOError:
Log('Unable to append to ' + file)
if rolling:
Tail(file, rolling, lines_to_keep=ROLLING_LOG_SIZE)
def Tail(in_name, rolling_name, max_line_length=100, lines_to_keep=1000):
"""Fast pythonic implementation of tail -n.
Args:
in_name: name of file for which we want the tail
rolling_name: name of file to write out
max_line_length: since this uses seek to find the block of text near the end
that has at most the lines_to_keep number of lines, we need to estimate the
max line length over that block of text. We can afford to be a little
conservative here.
lines_to_keep: how many lines to keep in the rolling file.
Returns:
Integer number of lines actually kept.
"""
with open(in_name, 'r') as f:
f.seek(0, os.SEEK_END)
f_length = f.tell()
bytes_to_read = min(max_line_length * lines_to_keep, f_length)
f.seek(f_length - bytes_to_read)
end_text = f.read()
lines = end_text.split('\n')
# perhaps the file was smaller than lines_to_keep lines, or many lines were
# longer than max_line_length; in that case, the resulting text block will
# potentially be smaller than lines_to_keep
lines_to_keep = min(lines_to_keep, len(lines))
with open(rolling_name, 'w') as f:
f.write('\n'.join(lines[-lines_to_keep:]))
return lines_to_keep
def UpdateRollingLogSize(configuration):
"""Set the global rolling_log_line_count based on settings file."""
if 'rolling_log_line_count' in configuration:
global ROLLING_LOG_SIZE
ROLLING_LOG_SIZE = configuration['rolling_log_line_count']
def LogTimes(times, threshold=0, title=''):
"""Logs elapsed time messages from a list tuples of epochs and identifiers."""
total_time = times[-1][0] - times[0][0]
if threshold and total_time < threshold:
return
msg = 'Code timing\n'
if title:
msg = '%s\n' % title
msg = 'Total time: %.2fs\n' % total_time
for n, t in enumerate(times[:-1]):
msg += '%.2fs to get from reading %s to reading %s\n' % (
times[n + 1][0] - t[0], t[1], times[n + 1][1])
Log(msg)
def MaintainRollingWebLog(message, max_count, filename=None):
"""Maintains a rolling text file of at most max_count printed messages.
Newest data at top and oldest data at the end, of at most max_count messages,
where the delimiter between each message is identified by a special fixed string.
Args:
message: text message to prepend to the file.
max_count: maximum number of messages to keep in the file; the max_count+1st message
is deleted.
filename: the file to update.
"""
# can't define as a default parameter because ROLLING_MESSAGE_FILE name is potentially
# modified based on SIMULATION flag
<----SKIPPED LINES---->
"""Formats now on flight into a a 3-digit string like '12a' or ' 1p'."""
time_string = DisplayTime(flight)
if time_string:
hour_string = time_string[11:13]
hour_0_23 = int(hour_string)
is_pm = int(hour_0_23/12) == 1
hour_number = hour_0_23 % 12
if hour_number == 0:
hour_number = 12
out_string = str(hour_number).rjust(2)
if is_pm:
out_string += 'p'
else:
out_string += 'a'
else:
out_string = KEY_NOT_PRESENT_STRING
return out_string
def MinuteOfDay(ts=time.time()):
"""Returns integer minute of day (0..1439) for the given timestamp or for now."""
dt = datetime.datetime.fromtimestamp(ts, TZ)
minute_of_day = dt.hour * MINUTES_IN_HOUR + dt.minute
return minute_of_day
def HoursSinceMidnight(timezone=TIMEZONE):
"""Returns the float number of hours elapsed since midnight in the given timezone."""
tz = pytz.timezone(timezone)
now = datetime.datetime.now(tz)
seconds_since_midnight = (
now - now.replace(hour=0, minute=0, second=0, microsecond=0)).total_seconds()
hours = seconds_since_midnight / SECONDS_IN_HOUR
return hours
def HoursSinceFlight(now, then):
"""Returns the number of hours between a timestamp and a flight.
Args:
now: timezone-aware datetime representation of timestamp
<----SKIPPED LINES---->
curl.setopt(pycurl.POSTFIELDSIZE, len(body_as_json_string))
failure_message = ''
try:
curl.perform()
except pycurl.error as e:
failure_message = 'curl.perform() failed with message %s' % e
Log('curl.perform() failed with message %s' % e)
error_code = True
else:
# you may want to check HTTP response code, e.g.
status_code = curl.getinfo(pycurl.RESPONSE_CODE)
if status_code != 200:
Log('Server returned HTTP status code %d for message %s' % (status_code, s))
error_code = True
curl.close()
UpdateStatusLight(GPIO_ERROR_VESTABOARD_CONNECTION, error_code, failure_message)
def TruncateEscapedLine(s):
"""Formats a single line of the personal message for the Vestaboard.
The Vestaboard has line length limitations, a limited character set, and escape
characters. This function:
- replaces some unsupported characters with very similar supported characters
- truncates the line after the max line length, allowing for escape characters
- truncates the line after an unsupported character that does not have a replacement
Args:
s: input string
Returns:
Reformatted potentially-truncated line.
"""
s = s.upper()
character_mapping = {
'[': '(',
'<': '(',
']': ')',
'>': ')',
'|': '/',
'\\': '/'}
for c in character_mapping:
s = s.replace(c, character_mapping[c])
l = 0
valid_characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$()-+&=;:''"%,./?'
validated_s = ''
valid_escape_values = list(range(70))
open_escape_char = '{'
close_escape_char = '}'
pointer = 0
while pointer < len(s) and l < SPLITFLAP_CHARS_PER_LINE:
if s[pointer] in valid_characters:
<----SKIPPED LINES---->
elif s[pointer] == open_escape_char:
end = s.find(close_escape_char, pointer)
if end == -1: # open escape did not close
pointer = len(s)
else:
try:
escape_value = int(s[pointer+1:end])
except ValueError:
escape_value = None
if escape_value in valid_escape_values:
validated_s += s[pointer:end+1]
l += 1
pointer = end
else:
pointer += 1
return validated_s
def PersonalMessage(configuration, message_queue):
"""Formats and displays the personal message.
A user-defined message can be displayed to the board whenever there isn't a
flight message during user-specified hours of the day. This clears the board, if
requested, and then adds that message to the queue.
Args:
configuration: the settings dictionary.
message_queue: the existing queue, to which the personal message - if any - is added.
"""
if 'clear_board' in configuration:
RemoveSetting(configuration, 'clear_board')
message_queue.append((FLAG_MSG_CLEAR, ''))
minute_of_day = MinuteOfDay()
if (
not message_queue and
'personal_message_enabled' in configuration and
configuration['personal_message'] and
minute_of_day <= configuration['personal_off_time'] and
minute_of_day > configuration['personal_on_time'] + 1):
message = configuration['personal_message']
lines = [TruncateEscapedLine(l) for l in message.split('\n')[:SPLITFLAP_LINE_COUNT]]
message_queue.append((FLAG_MSG_PERSONAL, lines))
def ManageMessageQueue(message_queue, next_message_time, configuration):
"""Check time & if appropriate, display next message from queue.
Args:
message_queue: FIFO list of message tuples of (message type, message string).
next_message_time: epoch at which next message should be displayed
configuration: dictionary of configuration attributes.
Returns:
Next_message_time, potentially updated if a message has been displayed, or unchanged
if no message was displayed.
"""
if message_queue and (time.time() >= next_message_time or SIMULATION):
if SIMULATION: # drain the queue because the messages come so fast
messages_to_display = list(message_queue)
# passed by reference, so clear it out since we drained it to the display
<----SKIPPED LINES---->
|