01234567890123456789012345678901234567890123456789012345678901234567890123456789
12 3 4 56 789101112131415161718192021 222324 252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 101 102103 104105106107108109110111112 113114115116117118119120121122123124125126127128129130131132133134135136137138139 140141142143144145 146147148149150151152153 154155156157158159160 161162163164165 166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 209 | #!/usr/bin/python3 import numbers import struct import sys import time import psutil import serial import serial.tools.list_ports from pySerialTransfer import pySerialTransfer import messageboard VERBOSE = True SIMULATE_ARDUINO = False if '-s' in sys.argv: SIMULATE_ARDUINO = True def SizeOf(format_string): bytes = 0 for c in format_string: if c in ('<', '>'): size = 0 elif c in ('l', 'f', 'i'): size = 4 else: raise('Unrecognized format character %s' % c) bytes += size return bytes def StuffInt(txfer_obj, int_to_send, start_pos=0): """Insert integer into pySerialtxfer TX buffer starting at the specified index.""" object_byte_size = 4 if VERBOSE: print('SENT (%d byte int): %f' % (object_byte_size, int_to_send)) return StuffObject(txfer_obj, (int_to_send, ), 'i', object_byte_size, start_pos=start_pos) def StuffFloat(txfer_obj, float_to_send, start_pos=0): """Insert float into pySerialtxfer TX buffer starting at the specified index.""" object_byte_size = 4 if VERBOSE: print('SENT (%d byte float): %f' % (object_byte_size, float_to_send)) return StuffObject(txfer_obj, (float_to_send, ), 'f', object_byte_size, start_pos=start_pos) def StuffStr(txfer_obj, string_to_send, max_length=None, start_pos=0): """Insert string into pySerialtxfer TX buffer starting at the specified index. Args: txfer_obj: see StuffObject string_to_send: see StuffObject max_length: if provided, the string is truncated to the requested size; otherwise defaults to the length of the string start_pos: see StuffObject Returns: start_pos for next object """ if max_length is None: max_length = len(string_to_send) format_string = '%ds' % max_length truncated_string = string_to_send[:max_length] truncated_string_b = bytes(truncated_string, 'ascii') if VERBOSE: print('SENT (%d byte str): %s' % (max_length, truncated_string)) return StuffObject( txfer_obj, (truncated_string_b, ), format_string, max_length, start_pos=start_pos) def StuffObject( txfer_obj, val, format_string, object_byte_size, start_pos=0): """Insert an object into pySerialtxfer TX buffer starting at the specified index. Args: txfer_obj: txfer - Transfer class instance to communicate over serial val: tuple of values to be inserted into TX buffer format_string: string used with struct.pack to pack the val object_byte_size: integer number of bytes of the object to pack start_pos: index of the last byte of the float in the TX buffer + 1 Returns: start_pos for next object """ val_bytes = struct.pack(format_string, *val) if len(val) > 1 and VERBOSE: print('SENT (%d byte struct): %s' % (object_byte_size, str(val))) if not SIMULATE_ARDUINO: try: for index in range(object_byte_size): txfer_obj.txBuff[index + start_pos] = val_bytes[index] except (OSError, serial.serialutil.SerialException) as e: LogMessage('Error %s in StuffObject with val %s and txfer_obj %s' % (e, str(val), str(txfer_obj))) return start_pos + index return object_byte_size + start_pos def OpenArduino(timeout=0, baud=115200, manufacturer='arduino', sn=None): """Finds a USB device with Arduino as its mfg and returns an open connection to it. Args: baud: The connection speed. timeout: Max duration in seconds that we should wait to establish a connection. Returns: Open link to the Arduino if found and opened; None otherwise. """ initial_time = time.time() arduino_port = None attempted = False while not attempted or time.time() - initial_time < timeout and not arduino_port: attempted = True ports = serial.tools.list_ports.comports(include_links=False) for port in ports: port_mfg = port.manufacturer if port_mfg: port_mfg = port_mfg.lower() match_mfg = not manufacturer or (port_mfg and manufacturer.lower() in port_mfg) port_sn = port.serial_number match_sn = not sn or port_sn == sn if match_mfg and match_sn: arduino_port = port.device break arduino = None if arduino_port: try: arduino = pySerialTransfer.SerialTransfer(arduino_port, baud=baud) time.sleep(2) # Need to give Arduino time before it is ready to receive except: pass return arduino def ReadArduinoSerial(link): """Returns the string, if any, waiting to be read from the open serial connection. Args: link: open pySerialTransfer instance to communicate over serial Returns: String of any contents from serial; empty string if nothing available. """ response = None if link.available(): if link.status < 0: print('ERROR: %s' % link.status) # RAISE ERROR HERE response = '' for index in range(link.bytesRead): response += chr(link.rxBuff[index]) if VERBOSE: print('RECD (%d bytes): %s' % (len(response), response)) return response def AzimuthAltitude(flight): """Provides current best-estimate location details given last known position. Given a flight dictionary, this determines the plane's best estimate for current location using its last-known position in the flight's lat / lon / speed / altitude / and track. Those attributes may have already been updated by messageboard using a more recently obtained radio signal from dump1090 than that in the canonical location, and if so, key flight_loc_now indicates the time at which those locations are current as of. Args: flight: dictionary of flight attributes. Returns: Returns a tuple of the azimuth and altitude, in degrees, at the current system time. """ geo_time = flight.get('flight_loc_now') lat = flight.get('lat') lon = flight.get('lon') speed = flight.get('speed') altitude = flight.get('altitude') track = flight.get('track') unused_vertrate = flight.get('vertrate') if all([isinstance(x, numbers.Number) for x in (lat, lon, speed, altitude, track)]): elapsed_time = time.time() - geo_time meters_traveled = messageboard.MetersTraveled(speed, elapsed_time) new_position = messageboard.TrajectoryLatLon((lat, lon), meters_traveled, track) distance = messageboard.HaversineDistanceMeters(new_position, messageboard.HOME) angles = messageboard.Angles( messageboard.HOME, messageboard.HOME_ALT, new_position, altitude / messageboard.FEET_IN_METER) azimuth = angles['azimuth_degrees'] altitude = angles['altitude_degrees'] return (azimuth, altitude) return None |
01234567890123456789012345678901234567890123456789012345678901234567890123456789
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 220221222223224225226227228229230231232233234235236237 238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275 276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876 | #!/usr/bin/python3 import contextlib import io import os import numbers import queue import random import struct import subprocess import sys import time import psutil import serial import serial.tools.list_ports from pySerialTransfer import pySerialTransfer import messageboard VERBOSE = True SIMULATE_ARDUINO = False if '-s' in sys.argv: SIMULATE_ARDUINO = True SIMULATION_FILE = 'remote_simulation.txt' #was arduino_simulation.txt SN_SERVO = '75835343130351802272' # arduino uno serial SN_REMOTE = '5583834303435111C1A0' # arduino mega serial SETTINGS_READ_TIME = 1 # read settings file once every n seconds LATEST_FLIGHT_READ_TIME = 1 # read flight file file once every n seconds COMMAND_DELAY_TIME = 1 # send a command to servos every n seconds # Serial interaction with remote COMMAND_START_CHAR = '<' COMMAND_END_CHAR = '>' KEY_NOT_PRESENT_STRING = 'N/A' MAX_CMD_LENGTH = 250 DISP_LAST_FLIGHT_NUMB_ORIG_DEST = 0 DISP_LAST_FLIGHT_AZIMUTH_ELEVATION = 1 DISP_FLIGHT_COUNT_LAST_SEEN = 2 DISP_RADIO_RANGE = 3 DISPLAY_MODE_NAMES = [ 'LAST_FLIGHT_NUMB_ORIG_DEST', 'LAST_FLIGHT_AZIMUTH_ELEVATION', 'FLIGHT_COUNT_LAST_SEEN', 'RADIO_RANGE'] SETTINGS_READ_TIME = 1 # read settings file once every n seconds LATEST_FLIGHT_READ_TIME = 1 # read flight file file once every n seconds COMMAND_DELAY_TIME = 0.2 # send a command to remote every n seconds def Log(s): print('ARDUINO: ' + s) # TODO: change back to logging class Serial(): def __init__( self, open_function, connection_tuple, baud=9600, open_timeout=5, read_format=None, write_format=None, read_timeout=0, error_pin=None): self.open_function = open_function self.connection_tuple = connection_tuple self.baud = baud self.open_timeout = open_timeout self.read_format = read_format self.write_format = write_format self.read_timeout = read_timeout self.error_pin = error_pin self.last_read = 0 self.last_receipt = 0 self.link = None if self.error_pin: messageboard.UpdateStatusLight(self.error_pin, True) def Open(self): self.link = self.open_function( self.connection_tuple, baud=self.baud, timeout=self.open_timeout) if self.error_pin: messageboard.UpdateStatusLight(self.error_pin, False) self.last_read = time.time() self.last_receipt = time.time() return self.link def Reopen(self, log_message=None): self.link = ReopenConnection( self.open_function, self.link, self.connection_tuple, baud=self.baud, timeout=self.open_timeout, log_message=log_message) if self.error_pin: messageboard.UpdateStatusLight(self.error_pin, False) self.last_read = time.time() self.last_receipt = time.time() def Close(self): self.link.close() def Available(self): self.link.available() def Read(self, format_string=None): if not format_string: format_string = self.read_format try: data = Read(self.link, format_string) except OSError as e: if self.error_pin: messageboard.UpdateStatusLight(self.error_pin, True) self.Reopen(log_message='Failed to read: %s' % e) return self.Read(format_string=format_string) self.last_read = time.time() if data: self.last_receipt = time.time() if self.read_timeout and self.last_read - self.last_receipt > self.read_timeout: if self.error_pin: messageboard.UpdateStatusLight(self.error_pin, True) self.Reopen(log_message='Heartbeat not received in %.2f seconds' % (self.last_read - self.last_receipt)) return self.Read(format_string=format_string) return data def Write(self, values, format_string=None): if not format_string: format_string = self.write_format try: Write(self.link, values, format_string) except OSError as e: if self.error_pin: messageboard.UpdateStatusLight(self.error_pin, True) self.Reopen(log_message='Failed to write: %s' % e) self.Write(values) def RunCommand(cmd, sleep_seconds=1, log=True): conn = subprocess.Popen(cmd, shell=True) time.sleep(sleep_seconds) conn.poll() if conn.returncode is None: Log('ERROR: %s did not complete within %d seconds' % (cmd, sleep_seconds)) sys.exit() if log: Log('%s completed' % cmd) def OpenBluetooth(connection_tuple, baud=9600, timeout=5): """Attempts to make connections for a number of attempts, exiting program on fail.""" rfcomm_device, bt_mac_address = connection_tuple attempts = 5 # max number of attempts to make a connection attempt = 1 link = None dev = '/dev/rfcomm%d' % rfcomm_device desc = '%s @ %s' % (bt_mac_address, dev) if not os.path.exists(dev): RunCommand('sudo rfcomm bind %d %s' % (rfcomm_device, bt_mac_address)) while attempt <= attempts: link = pySerialTransfer.SerialTransfer(dev, baud) # We think we made a connection; lets see if we can get a receipt start = time.time() try: while time.time() - start < timeout: b = ReadBytes(link) time.sleep(0.1) # avoid a tight loop soaking up all serial / RPi resources if b: Log('Handshake received at %s by receipt of bytes %s' % (desc, b)) return link Log('No handshake received at %s after %d seconds on attempt %d' % (desc, timeout, attempt)) except OSError as e: Log('Handshake error with %s on attempt %d: %s' % (desc, attempt, e)) attempt += 1 Log('ERROR: Failed to connect to %s after %d attempts' % (bt_mac_address, attempt - 1)) sys.exit() def OpenUSB(connection_tuple=('arduino', None), baud=9600, timeout=5): """Finds a USB device with Arduino as its mfg and returns an open connection to it. Args: baud: The connection speed. timeout: Max duration in seconds that we should wait to establish a connection. Returns: Open link to the Arduino if found and opened; None otherwise. """ manufacturer, sn = connection_tuple initial_time = time.time() arduino_port = None attempted = False while not attempted or time.time() - initial_time < timeout and not arduino_port: attempted = True ports = serial.tools.list_ports.comports(include_links=False) for port in ports: port_mfg = port.manufacturer if port_mfg: port_mfg = port_mfg.lower() match_mfg = not manufacturer or (port_mfg and manufacturer.lower() in port_mfg) port_sn = port.serial_number match_sn = not sn or port_sn == sn if match_mfg and match_sn: arduino_port = port.device break link = None if arduino_port: link = pySerialTransfer.SerialTransfer(arduino_port, baud=baud) time.sleep(2) # Need to give Arduino time before it is ready to receive else: # no USB-based matching port found raise serial.SerialException( 'ERROR: No USB port found for mfg %s and sn %s' % (manufacturer, sn)) return link def ReopenConnection( open_function, link, connection_tuple, baud=9600, timeout=3, log_message=None): """Close and reopen the serial.""" if log_message: Log(log_message) link.close() link = open_function(connection_tuple, baud=baud, timeout=timeout) return link def StuffTuple(link, t): start_pos = 0 for v in t: start_pos = link.tx_obj(v, start_pos) return start_pos def Write(link, values, format_string): """Sends the encapsulated string command on an open pySerialTransfer.""" packed_bytes = struct.pack(format_string, *values) print(format_string, values, packed_bytes) for n, b in enumerate(packed_bytes): link.txBuff[n] = b #link.tx_obj(b) link.send(len(packed_bytes)) Log('Sent %s' % str(values)) def Write2(link, values, unused_format_string): """Sends the encapsulated string command on an open pySerialTransfer.""" byte_count = StuffTuple(link, values) link.send(byte_count) Log('Sent %s' % str(values)) def ReadBytes(link): read_bytes = [] if link.available(): if link.status < 0: raise serial.SerialException('ERROR: %s' % link.status) read_bytes = [] for index in range(link.bytesRead): read_bytes.append(link.rxBuff[index]) return read_bytes def Unpack(read_bytes, format_string): try: data = struct.unpack(format_string, bytearray(read_bytes)) except struct.error as e: # exception desc is missing some key detail, so re-raise raise struct.error( '%s but only %d bytes provided (%s)' % (e, len(read_bytes), read_bytes)) data = [str(d, 'ascii').split('\x00')[0] if isinstance(d, bytes) else d for d in data] return data def Read(link, format_string): read_bytes = ReadBytes(link) data = () if read_bytes: data = Unpack(read_bytes, format_string) return data def AzimuthAltitude(flight): """Provides current best-estimate location details given last known position. Given a flight dictionary, this determines the plane's best estimate for current location using its last-known position in the flight's lat / lon / speed / altitude / and track. Those attributes may have already been updated by messageboard using a more recently obtained radio signal from dump1090 than that in the canonical location, and if so, key flight_loc_now indicates the time at which those locations are current as of. Args: flight: dictionary of flight attributes. Returns: Returns a tuple of the azimuth and altitude, in degrees, at the current system time. """ geo_time = flight.get('flight_loc_now') lat = flight.get('lat') lon = flight.get('lon') speed = flight.get('speed') altitude = flight.get('altitude') track = flight.get('track') unused_vertrate = flight.get('vertrate') if all([isinstance(x, numbers.Number) for x in (lat, lon, speed, altitude, track)]): elapsed_time = time.time() - geo_time meters_traveled = messageboard.MetersTraveled(speed, elapsed_time) new_position = messageboard.TrajectoryLatLon((lat, lon), meters_traveled, track) unused_distance = messageboard.HaversineDistanceMeters(new_position, messageboard.HOME) angles = messageboard.Angles( messageboard.HOME, messageboard.HOME_ALT, new_position, altitude / messageboard.FEET_IN_METER) azimuth = angles['azimuth_degrees'] altitude = angles['altitude_degrees'] return (azimuth, altitude) return None def DrainQueue(q): while not q.empty(): value = q.get() return value def InitialMessageValues(q): v = DrainQueue(q) if v: return v return {}, {} def ServoMain(to_arduino_q, unused_to_parent_q): Log('Process started with process id %d' % os.getpid()) link = Serial( OpenBluetooth, (RFCOMM_NUMBER_SERVO, HC05_MAC_ADDRESS_SERVO), read_timeout=5, error_pin=messageboard.GPIO_ERROR_ARDUINO_SERVO_CONNECTION) link.Open() servo_precision = 1 #servos = OpenUSB(sn=SN_SERVO) flight = {} altitude = 0 azimuth = 0 flight, json_desc_dict = InitialMessageValues(to_arduino_q) while True: if not to_arduino_q.empty(): flight, unused_json_desc_dict = to_arduino_q.get() link.Read('?') # simple ack message sent by servos current_angles = AzimuthAltitude(flight) if current_angles: azimuth_change = azimuth - current_angles[0] altitude_change = altitude - current_angles[1] if abs(azimuth_change) > servo_precision or abs(altitude_change) > servo_precision: (azimuth, altitude) = current_angles values = (azimuth, altitude) link.Write(values) time.sleep(COMMAND_DELAY_TIME) def SendAndWaitAck( remote, str_to_send, signature=None, timeout_time=3, max_send_attempts=3): """Sends a string to the open remote and optionally verifies receipt via a response. This sends a string to the remote. If a response is requested, this will attempt potentially transmits as requested, waiting after each transmit some number of seconds for a response that includes at minimum the key-value pair ack=signature (so the full response would be <ack=signature;>). Args: remote: open pySerialTransfer connection to the Arduino. str_to_send: string to send. signature: if a confirmation response is requested, the numeric value expected in that response. timeout_time: the max number of seconds per send attempt to wait for a response. max_send_attempts: the maximum number of times we will attempt to send. Returns: Boolean indicating whether the send attempt was successful (True) or not (False). """ byte_count = StuffStr(remote, str_to_send) send_attempt = 0 response = None first_send_time = time.time() if not SIMULATE_ARDUINO: while not response and send_attempt < max_send_attempts: remote.send(byte_count) send_attempt += 1 response_start_time = time.time() while not response and time.time() - response_start_time < timeout_time: response = ReadArduinoSerial(remote) response_stop_time = time.time() if VERBOSE: if response: print('Send attempts: %d; round trip %f; listening time on final receipt: %f' % ( send_attempt, (response_stop_time - first_send_time), (response_stop_time - response_start_time))) else: print('No response received after send attempt %d; total time elapsed: %f' % ( send_attempt, (response_stop_time - first_send_time))) if signature is not None: if not response: if VERBOSE: print('Acknowledgment required yet none received\n') return False d = ParseArduinoResponse(response) if d is None: if VERBOSE: print('Improperly formatted response string: %s\n' % response) return False if 'ack' not in d: if VERBOSE: print('Response does not have an acknowledgement "ack" key: %s\n' % response) return False signature_received = d['ack'] if signature_received != signature: if VERBOSE: print( 'Acknowledgement code %s does not match expected value %s\n' % (signature_received, signature)) return False return True def FloatToAlphanumericStr(x, decimals, total_length, sign=True): """Formats a float as a string without a decimal point. Since the decimal point is controlled independently on the alphanumeric display, numbers that include decimals must be sent without the decimal point. This formats a float as %+5.2f, for example - but without the decimal. Args: x: Value to format. decimals: how many digits should follow the (absent) decimal point. total_length: desired total length of the resulting string-ified number (inclusive of the absent decimal point. sign: boolean indicating whether a sign should be included for positive numbers. Returns: String as specified - for instance, 0.4 might be formatted as ' +04'. """ sign_str = '' if sign: sign_str = '+' format_string = '%' + sign_str + str(total_length) + '.' + str(decimals) + 'f' number_string = format_string % x number_string = number_string.replace('.', '') return number_string def DesiredState(settings, flight, display_mode): """Generates a dictionary defining the target state for the remote. Generates the complete set of key-value pairs that describe the desired state for the remote. Args: settings: dictionary representing the current state of the messageboard configuration. flight: dictionary describing the most recent flight. display_mode: integer specifying what display mode - and thus what attributes about the flight or other attributes - should be sent. Returns: Dictionary describing the full state desired for the remote. """ # Dictionary to send to remote d = {} d['setting_max_distance'] = settings['setting_max_distance'] d['setting_max_altitude'] = settings['setting_max_altitude'] d['setting_screen_enabled'] = 0 if 'setting_screen_enabled' in settings: d['setting_screen_enabled'] = 1 d['setting_on_time'] = settings['setting_on_time'] d['setting_off_time'] = settings['setting_off_time'] d['setting_delay'] = settings['setting_delay'] now = flight.get('now') # time flight was seen line1_decimal_mask = '' line2_decimal_mask = '' if display_mode == DISP_LAST_FLIGHT_NUMB_ORIG_DEST: # UAL1827 / SFO-LAX line1 = flight.get('flight_number', '') origin = ReplaceWithNA(flight.get('flight_origin', '')) destination = ReplaceWithNA(flight.get('flight_destination', '')) line2 = '%s-%s' % (origin, destination) elif display_mode == DISP_LAST_FLIGHT_AZIMUTH_ELEVATION: # AZM+193.1 / ALT +79.3 current_angles = AzimuthAltitude(flight) if current_angles: (azimuth, altitude) = current_angles line1 = 'AZM%s' % FloatToAlphanumericStr(azimuth, 1, 5, True) line1_decimal_mask = '00000010' line2 = 'ALT%s' % FloatToAlphanumericStr(altitude, 1, 5, True) line2_decimal_mask = '00000010' else: line1 = KEY_NOT_PRESENT_STRING line2 = KEY_NOT_PRESENT_STRING elif display_mode == DISP_FLIGHT_COUNT_LAST_SEEN: # 18 TODAY / T+ 14H line1 = '%2d TODAY' % flight.get('flight_count_today', 0) elapsed_time_str = 'UNK' if now: elapsed_time_str = SecondsToShortString(time.time() - now) line2 = 'T+ %s' % elapsed_time_str elif display_mode == DISP_RADIO_RANGE: # RNG 87MI / 9 PLANES radio_range = flight.get('radio_range_miles') radio_range_str = KEY_NOT_PRESENT_STRING if radio_range is not None: radio_range_str = '%2dMI' % round(radio_range) line1 = 'RNG %s' % radio_range_str radio_range_flights = flight.get('radio_range_flights', 0) plural = '' if radio_range_flights == 1: plural = 'S' line2 = '%d PLANE%s' % (radio_range_flights, plural) d['line1'] = line1 d['line2'] = line2 d['line1_decimal_mask'] = line1_decimal_mask d['line2_decimal_mask'] = line2_decimal_mask return d def StateToString(desired_state, current_state, ack=0): """Transforms the desired state dictionary to a command string. The dict describing key-value pairs is converted into a single command string in the following way: - key-value pairs matching that which are already known to the remote are not sent - an optional additional key-value pair of ack=random_int is added if a response from the remote is expected confirming receipt - format is transformed to an alphabetized list of <key1=value1;...;keyn=valuen;> Args: desired_state: Dictionary describing desired end state of remote. current_state: Dictionary describing current known state of remote. ack: Integer indicating whether the number of digits desired for an acknowledgement number; 0 indicates no acknowledgement requested. Returns: String as described above. """ s = {} for key in desired_state: if desired_state[key] != current_state.get(key): s[key] = desired_state[key] signature = None if ack: signature = random.randrange(1, 10**ack) s['ack'] = signature kv_str = messageboard.BuildSettings(s) cmd_str = '%s%s;%s' % (COMMAND_START_CHAR, kv_str, COMMAND_END_CHAR) if len(cmd_str) > MAX_CMD_LENGTH - 1: # the -1 is because C appends \0 to a string Log('Command to remote has len %d; max length is %d: %s' % ( len(cmd_str)+1, MAX_CMD_LENGTH, cmd_str)) return (cmd_str, signature) def ParseArduinoResponse(s): """Transforms a command string of form <key1=value1;...;keyn=valuen;> into a dict. Args: s: command string. Returns: Dictionary of the command string. """ if s[0] == COMMAND_START_CHAR and s[-1] == COMMAND_END_CHAR: return messageboard.ParseSettings(s[1:-1]) Log('Invalid remote response: %s' % s) return None def ExecuteArduinoCommand(command_str, settings, display_mode): """Executes the request as communicated in the command string. The remote may make one of the following requests: - Update a setting - (Re)display a recent flight - Display a histogram - Send information for a different display mode This function parses the command, in the form <key1=value1;...;keyn=valuen;>, and then executes as requested. Args: command_str: string response, including the start and end markers, from the remote. settings: dictionary representing the current state of the messageboard configuration. display_mode: integer specifying what display mode - and thus what attributes about the flight or other attributes - should be sent. Returns: The (potentially updated) display_mode. """ if not(command_str[0] == COMMAND_START_CHAR and command_str[-1] == COMMAND_END_CHAR): if VERBOSE: Log('Invalid remote command: %s' % command_str) print('Invalid command - missing delimiters: %s' % command_str) return display_mode command = messageboard.ParseSettings(command_str[1:-1]) # command might update a setting; see if there's a change, and if so, write to disk setting_change = False # remote sees on/off as 1/0 whereas messageboard.py & the web interface expect on/off if 'setting_screen_enabled' in command: arduino_screen_enabled = command['setting_screen_enabled'] if arduino_screen_enabled: command['setting_screen_enabled'] = 'on' else: command['setting_screen_enabled'] = 'off' setting_keys = ['setting_screen_enabled', 'setting_max_distance', 'setting_max_altitude', 'setting_on_time', 'setting_off_time', 'setting_delay'] for key in setting_keys: if key in command and command[key] != settings[key]: setting_change = True if setting_change: for key in setting_keys: if key in command: settings[key] = command[key] if VERBOSE: print( ' |-->Updated %s to %s in %s' % (key, command[key], messageboard.CONFIG_FILE)) messageboard.WriteFile(messageboard.CONFIG_FILE, messageboard.BuildSettings(settings)) # a command might request info about flight to be (re)displayed, irrespective of # whether the screen is on; if so, let's put that message at the front of the message # queue, and delete any subsequent messages in queue because presumably the button # was pushed either a) when the screen was off (so no messages in queue), or b) # because the screen was on, but the last flight details got lost after other screens! if 'last_plane' in command: messageboard.WriteFile(messageboard.LAST_FLIGHT_FILE, '') if VERBOSE: print(' |-->Requested last flight (re)display') # a command might request a histogram; simply generate and save a histogram file to disk if 'histogram' in command: histogram = {} histogram['type'] = 'messageboard' histogram['histogram'] = command['histogram'] histogram['histogram_history'] = command['histogram_history'] histogram['histogram_max_screens'] = '_1' # default value since not provided histogram['histogram_data_summary'] = 'off' # default value since not provided kv_string = messageboard.BuildSettings(histogram) messageboard.WriteFile(messageboard.HISTOGRAM_CONFIG_FILE, kv_string) if VERBOSE: print(' |-->Requested %s histogram with %s data' % ( command['histogram'], command['histogram_history'])) # a command might update us on the display mode; based on the display mode, we might # pass different attributes back to the remote if 'display_mode' in command: display_mode = command['display_mode'] if VERBOSE: print(' |-->Display mode updated to %d (%s)' % ( display_mode, DISPLAY_MODE_NAMES[display_mode])) if VERBOSE: print() return display_mode def SecondsToShortString(seconds): """Converts a number of seconds to a three-character time representation (i.e.: 87M). Converts seconds to a three character representation containing at most two digits (1-99) and one character indicating time unit (S, M, H, or D). Args: seconds: Number of seconds. Returns: String as described. """ s = round(seconds) if s < 99: return '%2dS' % s m = round(seconds / messageboard.SECONDS_IN_MINUTE) if m < 99: return '%2dM' % m h = round(seconds / messageboard.SECONDS_IN_HOUR) if h < 99: return '%2dH' % h d = round(seconds / (messageboard.SECONDS_IN_DAY)) return '%2dD' % d def SimulateCommand(potential_commands, counter, fraction_command=0.01, randomized=False): """Simulates the remote generating a command for remote-free testing. A command from the list of potential_commands is generated periodically, roughly fraction_command percent of the time. - not randomized: a command is returned every time (counter / fraction_command) rolls over to a new integer. The command returned is the next one in the list of potential commands. For instance, if fraction_command = 0.01, when counter=100, the first command in potential_commands is returned; when counter=200, the second command is returned, and so on. At the end of the list, we rotate back to the first command. - randomized: fraction_command percent of the time, a randomly selected command is sent. Args: potential_commands: A fully-formed string (potentially including the ack key-value pair) to sent to the remote. counter: integer indicating how many times a command could potentially have been generated; only relevant if randomized is False. fraction_command: The fraction (or percent) of the time a command is returned. randomized: Boolean indicating whether the command generation is deterministic or randomized. Returns: Potentially empty string representing a fully formed command that might come from an remote, had one been connected. """ # should we generate a command? generate = False if randomized: generate = random.random() < fraction_command else: generate = int(fraction_command * counter) != int(fraction_command * (counter - 1)) if not generate: return '' # which command should we pick? if randomized: index = random.randrange(len(potential_commands)) else: command_number = int(counter * fraction_command) - 1 index = command_number % len(potential_commands) return potential_commands[index] def ReplaceWithNA(s, function=None): """Replaces a messageboard not-known value with the alphanumeric not-known value.""" if s in (messageboard.KEY_NOT_PRESENT_STRING, 'None') or s is None: return KEY_NOT_PRESENT_STRING if function: return function(s) return s def RemoteMain(to_arduino_q, to_parent_q): Log('Process started with process id %d' % os.getpid()) # setting_screen_enabled: ? # setting_max_distance: h # setting_max_altitude: h # setting_on_time: h # setting_off_time: h # setting_delay: h # last_plane: ? # display_mode: h # histogram_enabled: ? # current_hist_type: 20s # current_hist_history: 20s format_string = '?hhhhh?h?20s20s' link = Serial( OpenBluetooth, (RFCOMM_NUMBER_REMOTE, HC05_MAC_ADDRESS_REMOTE), read_timeout=5, error_pin=messageboard.GPIO_ERROR_ARDUINO_SERVO_CONNECTION, format_string=format_string) remote = OpenUSB(sn=SN_REMOTE) current_state = {} settings_next_read = 0 flight_next_read = 0 display_mode = 1 simulation_counter = 0 flight = {} signature = None if SIMULATE_ARDUINO: potential_commands = messageboard.ReadFile(SIMULATION_FILE).splitlines() while True: # Read config settings once per second if time.time() > settings_next_read: settings = messageboard.ReadAndParseSettings(messageboard.CONFIG_FILE) settings_next_read = time.time() + SETTINGS_READ_TIME if VERBOSE: print('Read %s' % messageboard.CONFIG_FILE) if time.time() > flight_next_read: flight = messageboard.ReadAndParseSettings(messageboard.ARDUINO_FILE) flight_next_read = time.time() + LATEST_FLIGHT_READ_TIME if VERBOSE: print('Read %s' % messageboard.ARDUINO_FILE) # Open connection if not remote: remote = OpenUSB(sn=SN_REMOTE) current_state = {} if VERBOSE: print('Reopening connection') if remote: desired_state = DesiredState(settings, flight, display_mode) if desired_state != current_state: ack = 4 if SIMULATE_ARDUINO: ack = None (command_string, signature) = StateToString(desired_state, current_state, ack=ack) success = SendAndWaitAck( remote, command_string, signature=signature, timeout_time=3, max_send_attempts=3) if success: current_state.update(desired_state) if SIMULATE_ARDUINO: command = '' if potential_commands: simulation_counter += 1 command = SimulateCommand( potential_commands, simulation_counter, randomized=False) if command: print('RECD (SIM): %s' % command) else: command = ReadArduinoSerial(remote) if command: display_mode = ExecuteArduinoCommand(command, settings, display_mode) time.sleep(COMMAND_DELAY_TIME) |