01234567890123456789012345678901234567890123456789012345678901234567890123456789
309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351 376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418 791792793794795796797798799800801802803804805806807808809810 811812813814815816817818819820821822823824825826827828829830 |
<----SKIPPED LINES---->
format_string = self.read_format
try:
data = Read(self.link, format_string, bytes_read=bytes_read)
# on OSError, record a disconnection and attempt to reconnect & re-read
except OSError as e:
failure_message = 'Failed to read from %s: %s' % (self.name, e)
if self.error_pin:
self.to_parent_q.put(('pin', (self.error_pin, True, failure_message)))
self.Reopen(log_message=failure_message)
return self.Read(format_string=format_string)
self.last_read = time.time()
if data:
self.last_receipt = time.time()
if LOG_SERIALS:
now = time.time()
elapsed = now - self.start_time
str_data = str(
['%7.2f' % d if isinstance(d, float) else str(d) for d in data])
now_time = messageboard.EpochDisplayTime(now, '%H:%M:%S.%f')
message = '%s %10.3f RECD@%s: %s\n' % (
now_time, elapsed, self.name, str_data)
with open(SERIALS_LOG, 'a') as f:
f.write(message)
messageboard.Tail(
SERIALS_LOG,
ROLLING_SERIALS_LOG,
max_line_length=130,
lines_to_keep=messageboard.ROLLING_LOG_SIZE)
# If we haven't received data (despite reading) in more than the timeout,
# close and reopen.
if (
self.read_timeout and
self.last_read - self.last_receipt > self.read_timeout):
failure_message = (
'Heartbeat not received in %.2f seconds (expected: %.2f) on %s' % (
self.last_read - self.last_receipt, self.read_timeout, self.name))
if self.error_pin:
self.to_parent_q.put(('pin', (self.error_pin, True, failure_message)))
self.Reopen(log_message=failure_message)
return self.Read(format_string=format_string)
<----SKIPPED LINES---->
now = time.time()
elapsed = now - self.start_time
str_values = str(
['%7.2f' % v if isinstance(v, float) else str(v) for v in values])
if self.connection_type == CONNECTION_FLAG_SIMULATED:
with open(self.connection_tuple[1], 'a') as f:
f.write('%10.3f: %s\n' % (elapsed, str_values))
return
if not format_string:
format_string = self.write_format
try:
Write(self.link, values, format_string)
except OSError as e:
failure_message = 'Failed to write: %s' % e
if self.error_pin:
self.to_parent_q.put(('pin', (self.error_pin, True, failure_message)))
self.Reopen(log_message=failure_message)
self.Write(values)
if LOG_SERIALS:
now_time = messageboard.EpochDisplayTime(now, '%H:%M:%S.%f')
message = '%s %10.3f SENT@%s: %s\n' % (
now_time, elapsed, self.name, str_values)
with open(SERIALS_LOG, 'a') as f:
f.write(message)
messageboard.Tail(
SERIALS_LOG,
ROLLING_SERIALS_LOG,
max_line_length=130,
lines_to_keep=messageboard.ROLLING_LOG_SIZE)
def HasReset(self):
"""Indicates exactly once whether serial has reset since last called."""
if self.connection_type == CONNECTION_FLAG_SIMULATED:
raise NotImplementedError('Not implemented for simulations')
flag = self.reset_flag
self.reset_flag = False
return flag
def RunCommand(cmd, sleep_seconds=1, log=True):
"""Runs shell command, checking if it completed within timeout."""
<----SKIPPED LINES---->
d['laser_green'] = laser[1]
d['laser_blue'] = laser[2]
d['led_red'] = led[0]
d['led_green'] = led[1]
d['led_blue'] = led[2]
d['arduino_reset'] = reset
return d
def HexColorToRGBTuple(hex_color):
"""Converts i.e.: #329a43 to (50, 154, 67)."""
r = hex_color[1:3]
g = hex_color[3:5]
b = hex_color[5:7]
return (int(r, 16), int(g, 16), int(b, 16))
def Gamma(c, gamma):
"""Converts a desired brightness 0..255 to a gamma-corrected PWM 0..255."""
return round(((c / MAX_PWM) ** gamma) * MAX_PWM)
def GammaRGB(rgb, gamma):
"""Gamma converts an RGB tuple."""
return Gamma(rgb[0], gamma), Gamma(rgb[1], gamma), Gamma(rgb[2], gamma)
def GetGamma(configuration):
"""Returns gamma stored in configuration file."""
return configuration['servo_rgb_gamma'] / 10 # gamma saved as whole number
def ServoMain(to_arduino_q, to_parent_q, shutdown):
"""Main servo controller for projecting the plane position on a hemisphere.
Takes the latest flight from the to_arduino_q and converts that to the current
azimuth and altitude of the plane on a hemisphere.
"""
sys.stderr = open(messageboard.STDERR_FILE, 'a')
<----SKIPPED LINES---->
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351 376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418 791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832 |
<----SKIPPED LINES---->
format_string = self.read_format
try:
data = Read(self.link, format_string, bytes_read=bytes_read)
# on OSError, record a disconnection and attempt to reconnect & re-read
except OSError as e:
failure_message = 'Failed to read from %s: %s' % (self.name, e)
if self.error_pin:
self.to_parent_q.put(('pin', (self.error_pin, True, failure_message)))
self.Reopen(log_message=failure_message)
return self.Read(format_string=format_string)
self.last_read = time.time()
if data:
self.last_receipt = time.time()
if LOG_SERIALS:
now = time.time()
elapsed = now - self.start_time
str_data = str(
['%7.2f' % d if isinstance(d, float) else str(d) for d in data])
now_str = messageboard.EpochDisplayTime(now, '%Y-%m-%d %H:%M:%S.%f')
message = '%s %10.3f RECD@%s: %s\n' % (
now_str, elapsed, self.name, str_data)
with open(SERIALS_LOG, 'a') as f:
f.write(message)
messageboard.Tail(
SERIALS_LOG,
ROLLING_SERIALS_LOG,
max_line_length=130,
lines_to_keep=messageboard.ROLLING_LOG_SIZE)
# If we haven't received data (despite reading) in more than the timeout,
# close and reopen.
if (
self.read_timeout and
self.last_read - self.last_receipt > self.read_timeout):
failure_message = (
'Heartbeat not received in %.2f seconds (expected: %.2f) on %s' % (
self.last_read - self.last_receipt, self.read_timeout, self.name))
if self.error_pin:
self.to_parent_q.put(('pin', (self.error_pin, True, failure_message)))
self.Reopen(log_message=failure_message)
return self.Read(format_string=format_string)
<----SKIPPED LINES---->
now = time.time()
elapsed = now - self.start_time
str_values = str(
['%7.2f' % v if isinstance(v, float) else str(v) for v in values])
if self.connection_type == CONNECTION_FLAG_SIMULATED:
with open(self.connection_tuple[1], 'a') as f:
f.write('%10.3f: %s\n' % (elapsed, str_values))
return
if not format_string:
format_string = self.write_format
try:
Write(self.link, values, format_string)
except OSError as e:
failure_message = 'Failed to write: %s' % e
if self.error_pin:
self.to_parent_q.put(('pin', (self.error_pin, True, failure_message)))
self.Reopen(log_message=failure_message)
self.Write(values)
if LOG_SERIALS:
now_str = messageboard.EpochDisplayTime(now, '%Y-%m-%d %H:%M:%S.%f')
message = '%s %10.3f SENT@%s: %s\n' % (
now_str, elapsed, self.name, str_values)
with open(SERIALS_LOG, 'a') as f:
f.write(message)
messageboard.Tail(
SERIALS_LOG,
ROLLING_SERIALS_LOG,
max_line_length=130,
lines_to_keep=messageboard.ROLLING_LOG_SIZE)
def HasReset(self):
"""Indicates exactly once whether serial has reset since last called."""
if self.connection_type == CONNECTION_FLAG_SIMULATED:
raise NotImplementedError('Not implemented for simulations')
flag = self.reset_flag
self.reset_flag = False
return flag
def RunCommand(cmd, sleep_seconds=1, log=True):
"""Runs shell command, checking if it completed within timeout."""
<----SKIPPED LINES---->
d['laser_green'] = laser[1]
d['laser_blue'] = laser[2]
d['led_red'] = led[0]
d['led_green'] = led[1]
d['led_blue'] = led[2]
d['arduino_reset'] = reset
return d
def HexColorToRGBTuple(hex_color):
"""Converts i.e.: #329a43 to (50, 154, 67)."""
r = hex_color[1:3]
g = hex_color[3:5]
b = hex_color[5:7]
return (int(r, 16), int(g, 16), int(b, 16))
def Gamma(c, gamma):
"""Converts a desired brightness 0..255 to a gamma-corrected PWM 0..255."""
# Based on https://hackaday.com/2016/08/23/
# rgb-leds-how-to-master-gamma-and-hue-for-perfect-brightness/
return round(((c / MAX_PWM) ** gamma) * MAX_PWM)
def GammaRGB(rgb, gamma):
"""Gamma converts an RGB tuple."""
return Gamma(rgb[0], gamma), Gamma(rgb[1], gamma), Gamma(rgb[2], gamma)
def GetGamma(configuration):
"""Returns gamma stored in configuration file."""
return configuration['servo_rgb_gamma'] / 10 # gamma saved as whole number
def ServoMain(to_arduino_q, to_parent_q, shutdown):
"""Main servo controller for projecting the plane position on a hemisphere.
Takes the latest flight from the to_arduino_q and converts that to the current
azimuth and altitude of the plane on a hemisphere.
"""
sys.stderr = open(messageboard.STDERR_FILE, 'a')
<----SKIPPED LINES---->
|