01234567890123456789012345678901234567890123456789012345678901234567890123456789
567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859 303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345 363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405 |
<----SKIPPED LINES---->
import random
import struct
import subprocess
import sys
import termios
import time
import psutil
import serial
import serial.tools.list_ports
from pySerialTransfer import pySerialTransfer
from constants import (
RASPBERRY_PI, MESSAGEBOARD_PATH, WEBSERVER_PATH, SHUTDOWN_TEXT)
import messageboard
ARDUINO_LOG = 'arduino_log.txt'
ARDUINO_ROLLING_LOG = 'arduino_rolling_log.txt'
#SERIALS_LOG = 'arduino_serials_log.txt'
VERBOSE = False # log additional fine-grained details into ARDUINO_LOG
LOG_SERIALS = False # log serial data sent from Arduino to ARDUINO_LOG
SIMULATE_ARDUINO = False
REMOTE_DISPLAY_MODE = 'display_mode.txt'
SERVO_SIMULATED_IN = 'servo_in.txt'
SERVO_SIMULATED_OUT = 'servo_out.txt'
REMOTE_SIMULATED_IN = 'remote_in.txt'
REMOTE_SIMULATED_OUT = 'remote_out.txt'
if RASPBERRY_PI:
ARDUINO_LOG = MESSAGEBOARD_PATH + ARDUINO_LOG
#SERIALS_LOG = MESSAGEBOARD_PATH + SERIALS_LOG
SERVO_SIMULATED_OUT = MESSAGEBOARD_PATH + SERVO_SIMULATED_OUT
SERVO_SIMULATED_IN = MESSAGEBOARD_PATH + SERVO_SIMULATED_IN
REMOTE_SIMULATED_OUT = MESSAGEBOARD_PATH + REMOTE_SIMULATED_OUT
REMOTE_SIMULATED_IN = MESSAGEBOARD_PATH + REMOTE_SIMULATED_IN
REMOTE_DISPLAY_MODE = MESSAGEBOARD_PATH + REMOTE_DISPLAY_MODE
ARDUINO_ROLLING_LOG = WEBSERVER_PATH + ARDUINO_ROLLING_LOG
CONNECTION_FLAG_BLUETOOTH = 1
CONNECTION_FLAG_USB = 2
CONNECTION_FLAG_SIMULATED = 3
RASPBERRY_PI = psutil.sys.platform.title() == 'Linux'
SN_SERVO = '5583834303435111C1A0'
SERVO_CONNECTION = (CONNECTION_FLAG_BLUETOOTH, (2, '98:D3:11:FC:42:16', 1))
SN_REMOTE = '75835343130351802272'
REMOTE_CONNECTION = (CONNECTION_FLAG_BLUETOOTH, (1, '98:D3:91:FD:B3:C9', 1))
LASER_OFF = (False, False, False)
<----SKIPPED LINES---->
format_string = self.read_format
try:
data = Read(self.link, format_string, bytes_read=bytes_read)
# on OSError, record a disconnection and attempt to reconnect & re-read
except OSError as e:
failure_message = 'Failed to read from %s: %s' % (self.name, e)
if self.error_pin:
self.to_parent_q.put(('pin', (self.error_pin, True, failure_message)))
self.Reopen(log_message=failure_message)
return self.Read(format_string=format_string)
self.last_read = time.time()
if data:
self.last_receipt = time.time()
if LOG_SERIALS:
ts = time.time() - self.start_time
str_data = str(
['%7.2f' % d if isinstance(d, float) else str(d) for d in data])
message = '%10.3f RECD@%s: %s\n' % (ts, self.name, str_data)
Log(message)
#with open(SERIALS_LOG, 'a') as f:
# f.write(message)
# If we haven't received data (despite reading) in more than the timeout,
# close and reopen.
if (
self.read_timeout and
self.last_read - self.last_receipt > self.read_timeout):
failure_message = (
'Heartbeat not received in %.2f seconds (expected: %.2f) on %s' % (
self.last_read - self.last_receipt, self.read_timeout, self.name))
if self.error_pin:
self.to_parent_q.put(('pin', (self.error_pin, True, failure_message)))
self.Reopen(log_message=failure_message)
return self.Read(format_string=format_string)
if self.read_timeout and VERBOSE:
Log('%s: last_read: %.1f; last_receipt: %.1f; '
'delta: %.1f; read_timeout: %.1f' % (
self.name, self.last_read, self.last_receipt,
self.last_read - self.last_receipt, self.read_timeout))
elif VERBOSE:
<----SKIPPED LINES---->
ts = time.time() - self.start_time
str_values = str(
['%7.2f' % v if isinstance(v, float) else str(v) for v in values])
if self.connection_type == CONNECTION_FLAG_SIMULATED:
with open(self.connection_tuple[1], 'a') as f:
f.write('%10.3f: %s\n' % (ts, str_values))
return
if not format_string:
format_string = self.write_format
try:
Write(self.link, values, format_string)
except OSError as e:
failure_message = 'Failed to write: %s' % e
if self.error_pin:
self.to_parent_q.put(('pin', (self.error_pin, True, failure_message)))
self.Reopen(log_message=failure_message)
self.Write(values)
if LOG_SERIALS:
message = '%10.3f SENT@%s: %s\n' % (ts, self.name, str_values)
Log(message)
#with open(SERIALS_LOG, 'a') as f:
# f.write(message)
def HasReset(self):
"""Indicates exactly once whether serial has reset since last called."""
if self.connection_type == CONNECTION_FLAG_SIMULATED:
raise NotImplementedError('Not implemented for simulations')
flag = self.reset_flag
self.reset_flag = False
return flag
def RunCommand(cmd, sleep_seconds=1, log=True):
"""Runs shell command, checking if it completed within timeout."""
conn = subprocess.Popen(cmd, shell=True)
time.sleep(sleep_seconds)
conn.poll()
if conn.returncode is None:
Log('ERROR: %s did not complete within %d seconds'
% (cmd, sleep_seconds))
<----SKIPPED LINES---->
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859 303304305306307308309310311312313314315316317318319320321322 323324325326327328329330331332333334335336337338339340341342343344 362363364365366367368369370371372373374375376377378379380381 382383384385386387388389390391392393394395396397398399400401402403 |
<----SKIPPED LINES---->
import random
import struct
import subprocess
import sys
import termios
import time
import psutil
import serial
import serial.tools.list_ports
from pySerialTransfer import pySerialTransfer
from constants import (
RASPBERRY_PI, MESSAGEBOARD_PATH, WEBSERVER_PATH, SHUTDOWN_TEXT)
import messageboard
ARDUINO_LOG = 'arduino_log.txt'
ARDUINO_ROLLING_LOG = 'arduino_rolling_log.txt'
SERIALS_LOG = 'arduino_serials_log.txt'
VERBOSE = False # log additional fine-grained details into ARDUINO_LOG
LOG_SERIALS = False # log serial data sent from Arduino to ARDUINO_LOG
SIMULATE_ARDUINO = False
REMOTE_DISPLAY_MODE = 'display_mode.txt'
SERVO_SIMULATED_IN = 'servo_in.txt'
SERVO_SIMULATED_OUT = 'servo_out.txt'
REMOTE_SIMULATED_IN = 'remote_in.txt'
REMOTE_SIMULATED_OUT = 'remote_out.txt'
if RASPBERRY_PI:
ARDUINO_LOG = MESSAGEBOARD_PATH + ARDUINO_LOG
SERIALS_LOG = MESSAGEBOARD_PATH + SERIALS_LOG
SERVO_SIMULATED_OUT = MESSAGEBOARD_PATH + SERVO_SIMULATED_OUT
SERVO_SIMULATED_IN = MESSAGEBOARD_PATH + SERVO_SIMULATED_IN
REMOTE_SIMULATED_OUT = MESSAGEBOARD_PATH + REMOTE_SIMULATED_OUT
REMOTE_SIMULATED_IN = MESSAGEBOARD_PATH + REMOTE_SIMULATED_IN
REMOTE_DISPLAY_MODE = MESSAGEBOARD_PATH + REMOTE_DISPLAY_MODE
ARDUINO_ROLLING_LOG = WEBSERVER_PATH + ARDUINO_ROLLING_LOG
CONNECTION_FLAG_BLUETOOTH = 1
CONNECTION_FLAG_USB = 2
CONNECTION_FLAG_SIMULATED = 3
RASPBERRY_PI = psutil.sys.platform.title() == 'Linux'
SN_SERVO = '5583834303435111C1A0'
SERVO_CONNECTION = (CONNECTION_FLAG_BLUETOOTH, (2, '98:D3:11:FC:42:16', 1))
SN_REMOTE = '75835343130351802272'
REMOTE_CONNECTION = (CONNECTION_FLAG_BLUETOOTH, (1, '98:D3:91:FD:B3:C9', 1))
LASER_OFF = (False, False, False)
<----SKIPPED LINES---->
format_string = self.read_format
try:
data = Read(self.link, format_string, bytes_read=bytes_read)
# on OSError, record a disconnection and attempt to reconnect & re-read
except OSError as e:
failure_message = 'Failed to read from %s: %s' % (self.name, e)
if self.error_pin:
self.to_parent_q.put(('pin', (self.error_pin, True, failure_message)))
self.Reopen(log_message=failure_message)
return self.Read(format_string=format_string)
self.last_read = time.time()
if data:
self.last_receipt = time.time()
if LOG_SERIALS:
ts = time.time() - self.start_time
str_data = str(
['%7.2f' % d if isinstance(d, float) else str(d) for d in data])
message = '%10.3f RECD@%s: %s\n' % (ts, self.name, str_data)
with open(SERIALS_LOG, 'a') as f:
f.write(message)
# If we haven't received data (despite reading) in more than the timeout,
# close and reopen.
if (
self.read_timeout and
self.last_read - self.last_receipt > self.read_timeout):
failure_message = (
'Heartbeat not received in %.2f seconds (expected: %.2f) on %s' % (
self.last_read - self.last_receipt, self.read_timeout, self.name))
if self.error_pin:
self.to_parent_q.put(('pin', (self.error_pin, True, failure_message)))
self.Reopen(log_message=failure_message)
return self.Read(format_string=format_string)
if self.read_timeout and VERBOSE:
Log('%s: last_read: %.1f; last_receipt: %.1f; '
'delta: %.1f; read_timeout: %.1f' % (
self.name, self.last_read, self.last_receipt,
self.last_read - self.last_receipt, self.read_timeout))
elif VERBOSE:
<----SKIPPED LINES---->
ts = time.time() - self.start_time
str_values = str(
['%7.2f' % v if isinstance(v, float) else str(v) for v in values])
if self.connection_type == CONNECTION_FLAG_SIMULATED:
with open(self.connection_tuple[1], 'a') as f:
f.write('%10.3f: %s\n' % (ts, str_values))
return
if not format_string:
format_string = self.write_format
try:
Write(self.link, values, format_string)
except OSError as e:
failure_message = 'Failed to write: %s' % e
if self.error_pin:
self.to_parent_q.put(('pin', (self.error_pin, True, failure_message)))
self.Reopen(log_message=failure_message)
self.Write(values)
if LOG_SERIALS:
message = '%10.3f SENT@%s: %s\n' % (ts, self.name, str_values)
with open(SERIALS_LOG, 'a') as f:
f.write(message)
def HasReset(self):
"""Indicates exactly once whether serial has reset since last called."""
if self.connection_type == CONNECTION_FLAG_SIMULATED:
raise NotImplementedError('Not implemented for simulations')
flag = self.reset_flag
self.reset_flag = False
return flag
def RunCommand(cmd, sleep_seconds=1, log=True):
"""Runs shell command, checking if it completed within timeout."""
conn = subprocess.Popen(cmd, shell=True)
time.sleep(sleep_seconds)
conn.poll()
if conn.returncode is None:
Log('ERROR: %s did not complete within %d seconds'
% (cmd, sleep_seconds))
<----SKIPPED LINES---->
|