01234567890123456789012345678901234567890123456789012345678901234567890123456789
178179180181182183184185186187188189190191192193194195196197 198199200201202203204205206207208209210211212213214215216217218 274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314 | <----SKIPPED LINES----> self.write_format = write_format self.read_timeout = read_timeout self.error_pin = error_pin self.to_parent_q = to_parent_q self.last_read = 0 self.last_receipt = 0 self.reset_flag = True self.link = None self.name = name self.__simulated_reads__ = None if self.connection_type == CONNECTION_FLAG_BLUETOOTH: self.open_function = OpenBluetooth elif self.connection_type == CONNECTION_FLAG_USB: self.open_function = OpenUSB elif self.connection_type == CONNECTION_FLAG_SIMULATED: self.open_function = None if self.error_pin: # Error turned on when main initiated; turned off when connected self.to_parent_q.put(('pin', (self.error_pin, True))) self.start_time = time.time() def __str__(self): return self.name def Open(self): """Opens an instantiated serial connection for reading and writing.""" if self.connection_type == CONNECTION_FLAG_SIMULATED: lines = [] if os.path.exists(self.connection_tuple[0]): with open(self.connection_tuple[0], 'r') as f: for line in f: if line.strip(): lines.append(eval(line)) # pylint: disable=W0123 else: Log('File %s does not exist for simulated commands to Arudino' % self.connection_tuple[0], self.link) self.__simulated_reads__ = lines <----SKIPPED LINES----> Args: format_string: String of the form expected by struct.pack bytes_read: if passed, the bytes that are read are appended to the list. Returns: Tuple of values matching that as identified in format_string. """ if self.connection_type == CONNECTION_FLAG_SIMULATED: if ( self.__simulated_reads__ and time.time() - self.start_time > self.__simulated_reads__[0][0]): # time for next next_line = self.__simulated_reads__.pop(0) return next_line[1] return () if not format_string: format_string = self.read_format try: data = Read(self.link, format_string, bytes_read=bytes_read) except OSError as e: failure_message = 'Failed to read: %s' % e if self.error_pin: self.to_parent_q.put(('pin', (self.error_pin, True, failure_message))) self.Reopen(log_message=failure_message) return self.Read(format_string=format_string) self.last_read = time.time() if data: self.last_receipt = time.time() if LOG_SERIALS: ts = time.time() - self.start_time str_data = str(['%7.2f' % d if isinstance(d, float) else str(d) for d in data]) with open(SERIALS_LOG, 'a') as f: f.write('%10.3f RECD@%s: %s\n' % (ts, self.name, str_data)) if self.read_timeout and self.last_read - self.last_receipt > self.read_timeout: failure_message = 'Heartbeat not received in %.2f seconds (expected: %.2f) on %s' % ( self.last_read - self.last_receipt, self.read_timeout, self.name) if self.error_pin: self.to_parent_q.put(('pin', (self.error_pin, True, failure_message))) self.Reopen(log_message=failure_message) return self.Read(format_string=format_string) <----SKIPPED LINES----> |
01234567890123456789012345678901234567890123456789012345678901234567890123456789
178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316 | <----SKIPPED LINES----> self.write_format = write_format self.read_timeout = read_timeout self.error_pin = error_pin self.to_parent_q = to_parent_q self.last_read = 0 self.last_receipt = 0 self.reset_flag = True self.link = None self.name = name self.__simulated_reads__ = None if self.connection_type == CONNECTION_FLAG_BLUETOOTH: self.open_function = OpenBluetooth elif self.connection_type == CONNECTION_FLAG_USB: self.open_function = OpenUSB elif self.connection_type == CONNECTION_FLAG_SIMULATED: self.open_function = None if self.error_pin: # Error turned on when main initiated; turned off when connected error_message = 'Process %s (%s) initialized into error state' % (os.getpid(), str(self.name)) Log(error_message) self.to_parent_q.put(('pin', (self.error_pin, True, error_message))) self.start_time = time.time() def __str__(self): return self.name def Open(self): """Opens an instantiated serial connection for reading and writing.""" if self.connection_type == CONNECTION_FLAG_SIMULATED: lines = [] if os.path.exists(self.connection_tuple[0]): with open(self.connection_tuple[0], 'r') as f: for line in f: if line.strip(): lines.append(eval(line)) # pylint: disable=W0123 else: Log('File %s does not exist for simulated commands to Arudino' % self.connection_tuple[0], self.link) self.__simulated_reads__ = lines <----SKIPPED LINES----> Args: format_string: String of the form expected by struct.pack bytes_read: if passed, the bytes that are read are appended to the list. Returns: Tuple of values matching that as identified in format_string. """ if self.connection_type == CONNECTION_FLAG_SIMULATED: if ( self.__simulated_reads__ and time.time() - self.start_time > self.__simulated_reads__[0][0]): # time for next next_line = self.__simulated_reads__.pop(0) return next_line[1] return () if not format_string: format_string = self.read_format try: data = Read(self.link, format_string, bytes_read=bytes_read) except OSError as e: failure_message = 'Failed to read from %s: %s' % (self.name, e) if self.error_pin: self.to_parent_q.put(('pin', (self.error_pin, True, failure_message))) self.Reopen(log_message=failure_message) return self.Read(format_string=format_string) self.last_read = time.time() if data: self.last_receipt = time.time() if LOG_SERIALS: ts = time.time() - self.start_time str_data = str(['%7.2f' % d if isinstance(d, float) else str(d) for d in data]) with open(SERIALS_LOG, 'a') as f: f.write('%10.3f RECD@%s: %s\n' % (ts, self.name, str_data)) if self.read_timeout and self.last_read - self.last_receipt > self.read_timeout: failure_message = 'Heartbeat not received in %.2f seconds (expected: %.2f) on %s' % ( self.last_read - self.last_receipt, self.read_timeout, self.name) if self.error_pin: self.to_parent_q.put(('pin', (self.error_pin, True, failure_message))) self.Reopen(log_message=failure_message) return self.Read(format_string=format_string) <----SKIPPED LINES----> |