01234567890123456789012345678901234567890123456789012345678901234567890123456789
178179180181182183184185186187188189190191192193194195196197 198199200201202203204205206207208209210211212213214215216217218 274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314 |
<----SKIPPED LINES---->
self.write_format = write_format
self.read_timeout = read_timeout
self.error_pin = error_pin
self.to_parent_q = to_parent_q
self.last_read = 0
self.last_receipt = 0
self.reset_flag = True
self.link = None
self.name = name
self.__simulated_reads__ = None
if self.connection_type == CONNECTION_FLAG_BLUETOOTH:
self.open_function = OpenBluetooth
elif self.connection_type == CONNECTION_FLAG_USB:
self.open_function = OpenUSB
elif self.connection_type == CONNECTION_FLAG_SIMULATED:
self.open_function = None
if self.error_pin: # Error turned on when main initiated; turned off when connected
self.to_parent_q.put(('pin', (self.error_pin, True)))
self.start_time = time.time()
def __str__(self):
return self.name
def Open(self):
"""Opens an instantiated serial connection for reading and writing."""
if self.connection_type == CONNECTION_FLAG_SIMULATED:
lines = []
if os.path.exists(self.connection_tuple[0]):
with open(self.connection_tuple[0], 'r') as f:
for line in f:
if line.strip():
lines.append(eval(line)) # pylint: disable=W0123
else:
Log('File %s does not exist for simulated commands to Arudino'
% self.connection_tuple[0], self.link)
self.__simulated_reads__ = lines
<----SKIPPED LINES---->
Args:
format_string: String of the form expected by struct.pack
bytes_read: if passed, the bytes that are read are appended to the list.
Returns:
Tuple of values matching that as identified in format_string.
"""
if self.connection_type == CONNECTION_FLAG_SIMULATED:
if (
self.__simulated_reads__ and
time.time() - self.start_time > self.__simulated_reads__[0][0]): # time for next
next_line = self.__simulated_reads__.pop(0)
return next_line[1]
return ()
if not format_string:
format_string = self.read_format
try:
data = Read(self.link, format_string, bytes_read=bytes_read)
except OSError as e:
failure_message = 'Failed to read: %s' % e
if self.error_pin:
self.to_parent_q.put(('pin', (self.error_pin, True, failure_message)))
self.Reopen(log_message=failure_message)
return self.Read(format_string=format_string)
self.last_read = time.time()
if data:
self.last_receipt = time.time()
if LOG_SERIALS:
ts = time.time() - self.start_time
str_data = str(['%7.2f' % d if isinstance(d, float) else str(d) for d in data])
with open(SERIALS_LOG, 'a') as f:
f.write('%10.3f RECD@%s: %s\n' % (ts, self.name, str_data))
if self.read_timeout and self.last_read - self.last_receipt > self.read_timeout:
failure_message = 'Heartbeat not received in %.2f seconds (expected: %.2f) on %s' % (
self.last_read - self.last_receipt, self.read_timeout, self.name)
if self.error_pin:
self.to_parent_q.put(('pin', (self.error_pin, True, failure_message)))
self.Reopen(log_message=failure_message)
return self.Read(format_string=format_string)
<----SKIPPED LINES---->
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316 |
<----SKIPPED LINES---->
self.write_format = write_format
self.read_timeout = read_timeout
self.error_pin = error_pin
self.to_parent_q = to_parent_q
self.last_read = 0
self.last_receipt = 0
self.reset_flag = True
self.link = None
self.name = name
self.__simulated_reads__ = None
if self.connection_type == CONNECTION_FLAG_BLUETOOTH:
self.open_function = OpenBluetooth
elif self.connection_type == CONNECTION_FLAG_USB:
self.open_function = OpenUSB
elif self.connection_type == CONNECTION_FLAG_SIMULATED:
self.open_function = None
if self.error_pin: # Error turned on when main initiated; turned off when connected
error_message = 'Process %s (%s) initialized into error state' % (os.getpid(), str(self.name))
Log(error_message)
self.to_parent_q.put(('pin', (self.error_pin, True, error_message)))
self.start_time = time.time()
def __str__(self):
return self.name
def Open(self):
"""Opens an instantiated serial connection for reading and writing."""
if self.connection_type == CONNECTION_FLAG_SIMULATED:
lines = []
if os.path.exists(self.connection_tuple[0]):
with open(self.connection_tuple[0], 'r') as f:
for line in f:
if line.strip():
lines.append(eval(line)) # pylint: disable=W0123
else:
Log('File %s does not exist for simulated commands to Arudino'
% self.connection_tuple[0], self.link)
self.__simulated_reads__ = lines
<----SKIPPED LINES---->
Args:
format_string: String of the form expected by struct.pack
bytes_read: if passed, the bytes that are read are appended to the list.
Returns:
Tuple of values matching that as identified in format_string.
"""
if self.connection_type == CONNECTION_FLAG_SIMULATED:
if (
self.__simulated_reads__ and
time.time() - self.start_time > self.__simulated_reads__[0][0]): # time for next
next_line = self.__simulated_reads__.pop(0)
return next_line[1]
return ()
if not format_string:
format_string = self.read_format
try:
data = Read(self.link, format_string, bytes_read=bytes_read)
except OSError as e:
failure_message = 'Failed to read from %s: %s' % (self.name, e)
if self.error_pin:
self.to_parent_q.put(('pin', (self.error_pin, True, failure_message)))
self.Reopen(log_message=failure_message)
return self.Read(format_string=format_string)
self.last_read = time.time()
if data:
self.last_receipt = time.time()
if LOG_SERIALS:
ts = time.time() - self.start_time
str_data = str(['%7.2f' % d if isinstance(d, float) else str(d) for d in data])
with open(SERIALS_LOG, 'a') as f:
f.write('%10.3f RECD@%s: %s\n' % (ts, self.name, str_data))
if self.read_timeout and self.last_read - self.last_receipt > self.read_timeout:
failure_message = 'Heartbeat not received in %.2f seconds (expected: %.2f) on %s' % (
self.last_read - self.last_receipt, self.read_timeout, self.name)
if self.error_pin:
self.to_parent_q.put(('pin', (self.error_pin, True, failure_message)))
self.Reopen(log_message=failure_message)
return self.Read(format_string=format_string)
<----SKIPPED LINES---->
|