01234567890123456789012345678901234567890123456789012345678901234567890123456789
910111213141516171819202122232425262728 293031323334353637383940 4142434445464748495051525354555657585960 658659660661662663664665666667668669670671672673674675676677 678679680681682683684685686687688689690691692693694695696697698699 936937938939940941942943944945946947948949950951952953954955 956957958959960961962963964965966967968969970971972973974975 11501151115211531154115511561157115811591160116111621163116411651166116711681169 117011711172117311741175117611771178117911801181118211831184 118511861187118811891190119111921193119411951196119711981199120012011202120312041205 |
<----SKIPPED LINES---->
import termios
import time
import psutil
import serial
import serial.tools.list_ports
from pySerialTransfer import pySerialTransfer
from constants import RASPBERRY_PI, MESSAGEBOARD_PATH, WEBSERVER_PATH, SHUTDOWN_TEXT
import messageboard
ARDUINO_LOG = 'arduino_log.txt'
ARDUINO_ROLLING_LOG = 'arduino_rolling_log.txt'
SERIALS_LOG = 'arduino_serials_log.txt'
VERBOSE = False # log additional fine-grained details into ARDUINO_LOG
LOG_SERIALS = False # log serial data sent from Arduino to ARDUINO_LOG
SIMULATE_ARDUINO = False
SERVO_SIMULATED_IN = 'servo_in.txt'
SERVO_SIMULATED_OUT = 'servo_out.txt'
REMOTE_SIMULATED_IN = 'remote_in.txt'
REMOTE_SIMULATED_OUT = 'remote_out.txt'
if RASPBERRY_PI:
ARDUINO_LOG = MESSAGEBOARD_PATH + ARDUINO_LOG
SERIALS_LOG = MESSAGEBOARD_PATH + SERIALS_LOG
SERVO_SIMULATED_OUT = MESSAGEBOARD_PATH + SERVO_SIMULATED_OUT
SERVO_SIMULATED_IN = MESSAGEBOARD_PATH + SERVO_SIMULATED_IN
REMOTE_SIMULATED_OUT = MESSAGEBOARD_PATH + REMOTE_SIMULATED_OUT
REMOTE_SIMULATED_IN = MESSAGEBOARD_PATH + REMOTE_SIMULATED_IN
ARDUINO_ROLLING_LOG = WEBSERVER_PATH + ARDUINO_ROLLING_LOG
CONNECTION_FLAG_BLUETOOTH = 1
CONNECTION_FLAG_USB = 2
CONNECTION_FLAG_SIMULATED = 3
RASPBERRY_PI = psutil.sys.platform.title() == 'Linux'
SN_SERVO = '5583834303435111C1A0'
SERVO_CONNECTION = (CONNECTION_FLAG_BLUETOOTH, (2, '98:D3:11:FC:42:16', 1)) # with MOSFET
#SERVO_CONNECTION = (CONNECTION_FLAG_BLUETOOTH, (3, '98:D3:91:FD:B1:8F', 1)) # no MOSFET
SN_REMOTE = '75835343130351802272'
# directly connected to Serial2
# connected thru MOSFET to Serial1
REMOTE_CONNECTION = (CONNECTION_FLAG_BLUETOOTH, (1, '98:D3:91:FD:B3:C9', 1))
MIN_ALTITUDE = 5 # below this elevation degrees, turn off the tracking
if SIMULATE_ARDUINO:
<----SKIPPED LINES---->
messageboard.DisplayFlightNumber(last_flight),
messageboard.DisplayFlightNumber(flight)
), ser=link)
# Turn off laser so that line isn't traced while it moves to new position
link.Write((*last_angles, *LASER_RGB_OFF))
last_flight = flight
if time.time() >= next_read:
heartbeat = link.Read() # simple ack message sent by servos
next_read = time.time() + READ_DELAY_TIME
if heartbeat and VERBOSE:
Log(heartbeat)
now = GetNow(json_desc_dict, additional_attr)
current_angles = AzimuthAltitude(flight, now)
if current_angles and time.time() > next_write:
if current_angles[1] >= configuration['minimum_altitude_servo_tracking']:
Log('Flight #: %s current_angles: %s' % (
messageboard.DisplayFlightNumber(flight), str(current_angles)))
laser_rgb = LaserRGBFlight(flight)
link.Write((*current_angles, *laser_rgb))
last_angles = current_angles
else:
link.Write((*last_angles, *LASER_RGB_OFF))
next_write = time.time() + WRITE_DELAY_TIME
link.Close(SHUTDOWN_TEXT)
LASER_RGB_OFF = (0, 0, 0)
def LaserRGBFlight(flight):
"""Based on flight attributes, set the laser."""
if not flight:
return LASER_RGB_OFF
return 1, 0, 0
<----SKIPPED LINES---->
# whether the screen is on; if so, let's put that message at the front of the message
# queue, and delete any subsequent messages in queue because presumably the button
# was pushed either a) when the screen was off (so no messages in queue), or b)
# because the screen was on, but the last flight details got lost after other screens!
if command['last_plane']:
to_parent_q.put(('replay', ()))
log_lines.append(' |-->Requested last flight (re)display')
# a command might request a histogram; simply generate and save a histogram file to disk
if command['histogram_enabled']:
h_type = GetName(HISTOGRAM_TYPES, command['current_hist_type'])
h_history = GetName(HISTOGRAM_HISTORY, command['current_hist_history'])
to_parent_q.put(('histogram', (h_type, h_history)))
log_lines.append(' |-->Requested %s histogram with %s data' % (h_type, h_history))
# a command might update us on the display mode; based on the display mode, we might
# pass different attributes back to the remote
if display_mode != command['display_mode']:
log_lines.append(' |-->Display mode set to %d (%s)' % (
command['display_mode'], DISPLAY_MODE_NAMES[display_mode]))
# a command might tell us the battery is low
to_parent_q.put(
('pin', (messageboard.GPIO_ERROR_BATTERY_CHARGE, command['low_battery'])))
if low_battery != command['low_battery']:
log_lines.append(' |-->Low battery set to %d' % command['low_battery'])
if VERBOSE:
if log_lines:
log_lines.insert(0, '') # for improved formatting
Log('\n'.join(log_lines), link)
return command['display_mode'], command['low_battery']
def SecondsToShortString(s):
"""Converts a number of seconds to a three-character time representation (i.e.: 23M).
Converts seconds to a three character representation containing at most two digits,
potentially with a decimal point, and one character indicating time unit (S, M, H, or D).
<----SKIPPED LINES---->
('setting_off_time', 'H'),
('setting_delay', 'H'),
('last_plane', '?'),
('display_mode', 'H'),
('histogram_enabled', '?'),
('current_hist_type', 'H'),
('current_hist_history', 'H'),
('low_battery', '?'))
write_config = (
('setting_screen_enabled', '?'), # 1 bytes
('setting_max_distance', 'H'), # 2 bytes
('setting_max_altitude', 'L'), # 4 bytes
('setting_on_time', 'H'), # 2 bytes
('setting_off_time', 'H'), # 2 bytes
('setting_delay', 'H'), # 2 bytes
('line1', '9s'), # 9 bytes; 8 character plus terminator
('line2', '9s'), # 9 bytes; 8 character plus terminator
('line1_dec_mask', 'H'), # 2 bytes
('line2_dec_mask', 'H'), # 2 bytes
)
#pylint: enable = bad-whitespace
read_keys, unused_read_format_tuple, read_format_string = SplitFormat(read_config)
write_keys, write_format_tuple, write_format_string = SplitFormat(write_config)
values_d = {}
low_batt = False
to_parent_q.put(('pin', (messageboard.GPIO_ERROR_BATTERY_CHARGE, low_batt)))
link = Serial(
*REMOTE_CONNECTION, read_timeout=7,
error_pin=messageboard.GPIO_ERROR_ARDUINO_REMOTE_CONNECTION, to_parent_q=to_parent_q,
read_format=read_format_string, write_format=write_format_string, name='Remote')
link.Open()
display_mode = DISP_LAST_FLIGHT_NUMB_ORIG_DEST # initial state
flight, json_desc_dict, configuration, additional_attr = InitialMessageValues(
to_arduino_q)
next_read = 0
next_write = 0
while not shutdown.value:
if not to_arduino_q.empty():
to_arduino_message = to_arduino_q.get(block=False)
flight, json_desc_dict, configuration, additional_attr = to_arduino_message
if 'test_remote' in configuration:
messageboard.RemoveSetting(configuration, 'test_remote')
def TestDisplayMode(m):
SendRemoteMessage(
flight, json_desc_dict, configuration, additional_attr,
m, write_keys, write_format_tuple, link)
time.sleep(1)
<----SKIPPED LINES---->
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
9101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263 661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703 940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980 115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214 |
<----SKIPPED LINES---->
import termios
import time
import psutil
import serial
import serial.tools.list_ports
from pySerialTransfer import pySerialTransfer
from constants import RASPBERRY_PI, MESSAGEBOARD_PATH, WEBSERVER_PATH, SHUTDOWN_TEXT
import messageboard
ARDUINO_LOG = 'arduino_log.txt'
ARDUINO_ROLLING_LOG = 'arduino_rolling_log.txt'
SERIALS_LOG = 'arduino_serials_log.txt'
VERBOSE = False # log additional fine-grained details into ARDUINO_LOG
LOG_SERIALS = False # log serial data sent from Arduino to ARDUINO_LOG
SIMULATE_ARDUINO = False
REMOTE_DISPLAY_MODE = 'display_mode.txt'
SERVO_SIMULATED_IN = 'servo_in.txt'
SERVO_SIMULATED_OUT = 'servo_out.txt'
REMOTE_SIMULATED_IN = 'remote_in.txt'
REMOTE_SIMULATED_OUT = 'remote_out.txt'
if RASPBERRY_PI:
ARDUINO_LOG = MESSAGEBOARD_PATH + ARDUINO_LOG
SERIALS_LOG = MESSAGEBOARD_PATH + SERIALS_LOG
SERVO_SIMULATED_OUT = MESSAGEBOARD_PATH + SERVO_SIMULATED_OUT
SERVO_SIMULATED_IN = MESSAGEBOARD_PATH + SERVO_SIMULATED_IN
REMOTE_SIMULATED_OUT = MESSAGEBOARD_PATH + REMOTE_SIMULATED_OUT
REMOTE_SIMULATED_IN = MESSAGEBOARD_PATH + REMOTE_SIMULATED_IN
REMOTE_DISPLAY_MODE = MESSAGEBOARD_PATH + REMOTE_DISPLAY_MODE
ARDUINO_ROLLING_LOG = WEBSERVER_PATH + ARDUINO_ROLLING_LOG
CONNECTION_FLAG_BLUETOOTH = 1
CONNECTION_FLAG_USB = 2
CONNECTION_FLAG_SIMULATED = 3
RASPBERRY_PI = psutil.sys.platform.title() == 'Linux'
SN_SERVO = '5583834303435111C1A0'
SERVO_CONNECTION = (CONNECTION_FLAG_BLUETOOTH, (2, '98:D3:11:FC:42:16', 1)) # with MOSFET
#SERVO_CONNECTION = (CONNECTION_FLAG_BLUETOOTH, (3, '98:D3:91:FD:B1:8F', 1)) # no MOSFET
SN_REMOTE = '75835343130351802272'
# directly connected to Serial2
# connected thru MOSFET to Serial1
REMOTE_CONNECTION = (CONNECTION_FLAG_BLUETOOTH, (1, '98:D3:91:FD:B3:C9', 1))
MIN_ALTITUDE = 5 # below this elevation degrees, turn off the tracking
if SIMULATE_ARDUINO:
<----SKIPPED LINES---->
messageboard.DisplayFlightNumber(last_flight),
messageboard.DisplayFlightNumber(flight)
), ser=link)
# Turn off laser so that line isn't traced while it moves to new position
link.Write((*last_angles, *LASER_RGB_OFF))
last_flight = flight
if time.time() >= next_read:
heartbeat = link.Read() # simple ack message sent by servos
next_read = time.time() + READ_DELAY_TIME
if heartbeat and VERBOSE:
Log(heartbeat)
now = GetNow(json_desc_dict, additional_attr)
current_angles = AzimuthAltitude(flight, now)
if current_angles and time.time() > next_write:
if current_angles[1] >= configuration['minimum_altitude_servo_tracking']:
if VERBOSE:
Log('Flight #: %s current_angles: %s' % (
messageboard.DisplayFlightNumber(flight), str(current_angles)))
laser_rgb = LaserRGBFlight(flight)
link.Write((*current_angles, *laser_rgb))
last_angles = current_angles
else:
link.Write((*last_angles, *LASER_RGB_OFF))
next_write = time.time() + WRITE_DELAY_TIME
link.Close(SHUTDOWN_TEXT)
LASER_RGB_OFF = (0, 0, 0)
def LaserRGBFlight(flight):
"""Based on flight attributes, set the laser."""
if not flight:
return LASER_RGB_OFF
return 1, 0, 0
<----SKIPPED LINES---->
# whether the screen is on; if so, let's put that message at the front of the message
# queue, and delete any subsequent messages in queue because presumably the button
# was pushed either a) when the screen was off (so no messages in queue), or b)
# because the screen was on, but the last flight details got lost after other screens!
if command['last_plane']:
to_parent_q.put(('replay', ()))
log_lines.append(' |-->Requested last flight (re)display')
# a command might request a histogram; simply generate and save a histogram file to disk
if command['histogram_enabled']:
h_type = GetName(HISTOGRAM_TYPES, command['current_hist_type'])
h_history = GetName(HISTOGRAM_HISTORY, command['current_hist_history'])
to_parent_q.put(('histogram', (h_type, h_history)))
log_lines.append(' |-->Requested %s histogram with %s data' % (h_type, h_history))
# a command might update us on the display mode; based on the display mode, we might
# pass different attributes back to the remote
if display_mode != command['display_mode']:
log_lines.append(' |-->Display mode set to %d (%s)' % (
command['display_mode'], DISPLAY_MODE_NAMES[display_mode]))
messageboard.WriteFile(REMOTE_DISPLAY_MODE, str(command['display_mode']))
# a command might tell us the battery is low
to_parent_q.put(
('pin', (messageboard.GPIO_ERROR_BATTERY_CHARGE, command['low_battery'])))
if low_battery != command['low_battery']:
log_lines.append(' |-->Low battery set to %d' % command['low_battery'])
if VERBOSE:
if log_lines:
log_lines.insert(0, '') # for improved formatting
Log('\n'.join(log_lines), link)
return command['display_mode'], command['low_battery']
def SecondsToShortString(s):
"""Converts a number of seconds to a three-character time representation (i.e.: 23M).
Converts seconds to a three character representation containing at most two digits,
potentially with a decimal point, and one character indicating time unit (S, M, H, or D).
<----SKIPPED LINES---->
('setting_off_time', 'H'),
('setting_delay', 'H'),
('last_plane', '?'),
('display_mode', 'H'),
('histogram_enabled', '?'),
('current_hist_type', 'H'),
('current_hist_history', 'H'),
('low_battery', '?'))
write_config = (
('setting_screen_enabled', '?'), # 1 bytes
('setting_max_distance', 'H'), # 2 bytes
('setting_max_altitude', 'L'), # 4 bytes
('setting_on_time', 'H'), # 2 bytes
('setting_off_time', 'H'), # 2 bytes
('setting_delay', 'H'), # 2 bytes
('line1', '9s'), # 9 bytes; 8 character plus terminator
('line2', '9s'), # 9 bytes; 8 character plus terminator
('line1_dec_mask', 'H'), # 2 bytes
('line2_dec_mask', 'H'), # 2 bytes
('display_mode', 'H'), # 2 bytes
)
#pylint: enable = bad-whitespace
read_keys, unused_read_format_tuple, read_format_string = SplitFormat(read_config)
write_keys, write_format_tuple, write_format_string = SplitFormat(write_config)
values_d = {}
low_batt = False
to_parent_q.put(('pin', (messageboard.GPIO_ERROR_BATTERY_CHARGE, low_batt)))
link = Serial(
*REMOTE_CONNECTION, read_timeout=7,
error_pin=messageboard.GPIO_ERROR_ARDUINO_REMOTE_CONNECTION, to_parent_q=to_parent_q,
read_format=read_format_string, write_format=write_format_string, name='Remote')
link.Open()
# Read in the saved display mode, if it exists
display_mode = int(messageboard.ReadFile(REMOTE_DISPLAY_MODE, log_exception=False))
if not display_mode:
display_mode = DISP_LAST_FLIGHT_NUMB_ORIG_DEST
flight, json_desc_dict, configuration, additional_attr = InitialMessageValues(
to_arduino_q)
next_read = 0
next_write = 0
while not shutdown.value:
if not to_arduino_q.empty():
to_arduino_message = to_arduino_q.get(block=False)
flight, json_desc_dict, configuration, additional_attr = to_arduino_message
if 'test_remote' in configuration:
messageboard.RemoveSetting(configuration, 'test_remote')
def TestDisplayMode(m):
SendRemoteMessage(
flight, json_desc_dict, configuration, additional_attr,
m, write_keys, write_format_tuple, link)
time.sleep(1)
<----SKIPPED LINES---->
|