01234567890123456789012345678901234567890123456789012345678901234567890123456789
309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351 376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418 791792793794795796797798799800801802803804805806807808809810 811812813814815816817818819820821822823824825826827828829830 | <----SKIPPED LINES----> format_string = self.read_format try: data = Read(self.link, format_string, bytes_read=bytes_read) # on OSError, record a disconnection and attempt to reconnect & re-read except OSError as e: failure_message = 'Failed to read from %s: %s' % (self.name, e) if self.error_pin: self.to_parent_q.put(('pin', (self.error_pin, True, failure_message))) self.Reopen(log_message=failure_message) return self.Read(format_string=format_string) self.last_read = time.time() if data: self.last_receipt = time.time() if LOG_SERIALS: now = time.time() elapsed = now - self.start_time str_data = str( ['%7.2f' % d if isinstance(d, float) else str(d) for d in data]) now_time = messageboard.EpochDisplayTime(now, '%H:%M:%S.%f') message = '%s %10.3f RECD@%s: %s\n' % ( now_time, elapsed, self.name, str_data) with open(SERIALS_LOG, 'a') as f: f.write(message) messageboard.Tail( SERIALS_LOG, ROLLING_SERIALS_LOG, max_line_length=130, lines_to_keep=messageboard.ROLLING_LOG_SIZE) # If we haven't received data (despite reading) in more than the timeout, # close and reopen. if ( self.read_timeout and self.last_read - self.last_receipt > self.read_timeout): failure_message = ( 'Heartbeat not received in %.2f seconds (expected: %.2f) on %s' % ( self.last_read - self.last_receipt, self.read_timeout, self.name)) if self.error_pin: self.to_parent_q.put(('pin', (self.error_pin, True, failure_message))) self.Reopen(log_message=failure_message) return self.Read(format_string=format_string) <----SKIPPED LINES----> now = time.time() elapsed = now - self.start_time str_values = str( ['%7.2f' % v if isinstance(v, float) else str(v) for v in values]) if self.connection_type == CONNECTION_FLAG_SIMULATED: with open(self.connection_tuple[1], 'a') as f: f.write('%10.3f: %s\n' % (elapsed, str_values)) return if not format_string: format_string = self.write_format try: Write(self.link, values, format_string) except OSError as e: failure_message = 'Failed to write: %s' % e if self.error_pin: self.to_parent_q.put(('pin', (self.error_pin, True, failure_message))) self.Reopen(log_message=failure_message) self.Write(values) if LOG_SERIALS: now_time = messageboard.EpochDisplayTime(now, '%H:%M:%S.%f') message = '%s %10.3f SENT@%s: %s\n' % ( now_time, elapsed, self.name, str_values) with open(SERIALS_LOG, 'a') as f: f.write(message) messageboard.Tail( SERIALS_LOG, ROLLING_SERIALS_LOG, max_line_length=130, lines_to_keep=messageboard.ROLLING_LOG_SIZE) def HasReset(self): """Indicates exactly once whether serial has reset since last called.""" if self.connection_type == CONNECTION_FLAG_SIMULATED: raise NotImplementedError('Not implemented for simulations') flag = self.reset_flag self.reset_flag = False return flag def RunCommand(cmd, sleep_seconds=1, log=True): """Runs shell command, checking if it completed within timeout.""" <----SKIPPED LINES----> d['laser_green'] = laser[1] d['laser_blue'] = laser[2] d['led_red'] = led[0] d['led_green'] = led[1] d['led_blue'] = led[2] d['arduino_reset'] = reset return d def HexColorToRGBTuple(hex_color): """Converts i.e.: #329a43 to (50, 154, 67).""" r = hex_color[1:3] g = hex_color[3:5] b = hex_color[5:7] return (int(r, 16), int(g, 16), int(b, 16)) def Gamma(c, gamma): """Converts a desired brightness 0..255 to a gamma-corrected PWM 0..255.""" return round(((c / MAX_PWM) ** gamma) * MAX_PWM) def GammaRGB(rgb, gamma): """Gamma converts an RGB tuple.""" return Gamma(rgb[0], gamma), Gamma(rgb[1], gamma), Gamma(rgb[2], gamma) def GetGamma(configuration): """Returns gamma stored in configuration file.""" return configuration['servo_rgb_gamma'] / 10 # gamma saved as whole number def ServoMain(to_arduino_q, to_parent_q, shutdown): """Main servo controller for projecting the plane position on a hemisphere. Takes the latest flight from the to_arduino_q and converts that to the current azimuth and altitude of the plane on a hemisphere. """ sys.stderr = open(messageboard.STDERR_FILE, 'a') <----SKIPPED LINES----> |
01234567890123456789012345678901234567890123456789012345678901234567890123456789
309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351 376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418 791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832 | <----SKIPPED LINES----> format_string = self.read_format try: data = Read(self.link, format_string, bytes_read=bytes_read) # on OSError, record a disconnection and attempt to reconnect & re-read except OSError as e: failure_message = 'Failed to read from %s: %s' % (self.name, e) if self.error_pin: self.to_parent_q.put(('pin', (self.error_pin, True, failure_message))) self.Reopen(log_message=failure_message) return self.Read(format_string=format_string) self.last_read = time.time() if data: self.last_receipt = time.time() if LOG_SERIALS: now = time.time() elapsed = now - self.start_time str_data = str( ['%7.2f' % d if isinstance(d, float) else str(d) for d in data]) now_str = messageboard.EpochDisplayTime(now, '%Y-%m-%d %H:%M:%S.%f') message = '%s %10.3f RECD@%s: %s\n' % ( now_str, elapsed, self.name, str_data) with open(SERIALS_LOG, 'a') as f: f.write(message) messageboard.Tail( SERIALS_LOG, ROLLING_SERIALS_LOG, max_line_length=130, lines_to_keep=messageboard.ROLLING_LOG_SIZE) # If we haven't received data (despite reading) in more than the timeout, # close and reopen. if ( self.read_timeout and self.last_read - self.last_receipt > self.read_timeout): failure_message = ( 'Heartbeat not received in %.2f seconds (expected: %.2f) on %s' % ( self.last_read - self.last_receipt, self.read_timeout, self.name)) if self.error_pin: self.to_parent_q.put(('pin', (self.error_pin, True, failure_message))) self.Reopen(log_message=failure_message) return self.Read(format_string=format_string) <----SKIPPED LINES----> now = time.time() elapsed = now - self.start_time str_values = str( ['%7.2f' % v if isinstance(v, float) else str(v) for v in values]) if self.connection_type == CONNECTION_FLAG_SIMULATED: with open(self.connection_tuple[1], 'a') as f: f.write('%10.3f: %s\n' % (elapsed, str_values)) return if not format_string: format_string = self.write_format try: Write(self.link, values, format_string) except OSError as e: failure_message = 'Failed to write: %s' % e if self.error_pin: self.to_parent_q.put(('pin', (self.error_pin, True, failure_message))) self.Reopen(log_message=failure_message) self.Write(values) if LOG_SERIALS: now_str = messageboard.EpochDisplayTime(now, '%Y-%m-%d %H:%M:%S.%f') message = '%s %10.3f SENT@%s: %s\n' % ( now_str, elapsed, self.name, str_values) with open(SERIALS_LOG, 'a') as f: f.write(message) messageboard.Tail( SERIALS_LOG, ROLLING_SERIALS_LOG, max_line_length=130, lines_to_keep=messageboard.ROLLING_LOG_SIZE) def HasReset(self): """Indicates exactly once whether serial has reset since last called.""" if self.connection_type == CONNECTION_FLAG_SIMULATED: raise NotImplementedError('Not implemented for simulations') flag = self.reset_flag self.reset_flag = False return flag def RunCommand(cmd, sleep_seconds=1, log=True): """Runs shell command, checking if it completed within timeout.""" <----SKIPPED LINES----> d['laser_green'] = laser[1] d['laser_blue'] = laser[2] d['led_red'] = led[0] d['led_green'] = led[1] d['led_blue'] = led[2] d['arduino_reset'] = reset return d def HexColorToRGBTuple(hex_color): """Converts i.e.: #329a43 to (50, 154, 67).""" r = hex_color[1:3] g = hex_color[3:5] b = hex_color[5:7] return (int(r, 16), int(g, 16), int(b, 16)) def Gamma(c, gamma): """Converts a desired brightness 0..255 to a gamma-corrected PWM 0..255.""" # Based on https://hackaday.com/2016/08/23/ # rgb-leds-how-to-master-gamma-and-hue-for-perfect-brightness/ return round(((c / MAX_PWM) ** gamma) * MAX_PWM) def GammaRGB(rgb, gamma): """Gamma converts an RGB tuple.""" return Gamma(rgb[0], gamma), Gamma(rgb[1], gamma), Gamma(rgb[2], gamma) def GetGamma(configuration): """Returns gamma stored in configuration file.""" return configuration['servo_rgb_gamma'] / 10 # gamma saved as whole number def ServoMain(to_arduino_q, to_parent_q, shutdown): """Main servo controller for projecting the plane position on a hemisphere. Takes the latest flight from the to_arduino_q and converts that to the current azimuth and altitude of the plane on a hemisphere. """ sys.stderr = open(messageboard.STDERR_FILE, 'a') <----SKIPPED LINES----> |