01234567890123456789012345678901234567890123456789012345678901234567890123456789
7374757677787980818283848586878889909192 9394 9596979899100101102103104105106107108109110111112113114 167168169170171172173174175176177178179180181182183184185186187188189 190191192193194195196197198199200201202 | <----SKIPPED LINES----> txfer_obj, (truncated_string_b, ), format_string, max_length, start_pos=start_pos) def StuffObject( txfer_obj, val, format_string, object_byte_size, start_pos=0): """Insert an object into pySerialtxfer TX buffer starting at the specified index. Args: txfer_obj: txfer - Transfer class instance to communicate over serial val: tuple of values to be inserted into TX buffer format_string: string used with struct.pack to pack the val object_byte_size: integer number of bytes of the object to pack start_pos: index of the last byte of the float in the TX buffer + 1 Returns: start_pos for next object """ val_bytes = struct.pack(format_string, *val) if len(val) > 1 and VERBOSE: print('SENT (%d byte struct): %s' % (object_byte_size, str(val))) for index in range(object_byte_size): txfer_obj.txBuff[index + start_pos] = val_bytes[index] return object_byte_size + start_pos def OpenArduino(timeout=0, baud=115200, manufacturer='arduino', sn=None): """Finds a USB device with Arduino as its mfg and returns an open connection to it. Args: baud: The connection speed. timeout: Max duration in seconds that we should wait to establish a connection. Returns: Open link to the Arduino if found and opened; None otherwise. """ initial_time = time.time() arduino_port = None attempted = False while not attempted or time.time() - initial_time < timeout and not arduino_port: attempted = True ports = serial.tools.list_ports.comports(include_links=False) for port in ports: <----SKIPPED LINES----> location using its last-known position in the flight's lat / lon / speed / altitude / and track. Those attributes may have already been updated by messageboard using a more recently obtained radio signal from dump1090 than that in the canonical location, and if so, key flight_loc_now indicates the time at which those locations are current as of. Args: flight: dictionary of flight attributes. Returns: Returns a tuple of the azimuth and altitude, in degrees, at the current system time. """ geo_time = flight.get('flight_loc_now') lat = flight.get('lat') lon = flight.get('lon') speed = flight.get('speed') altitude = flight.get('altitude') track = flight.get('track') unused_vertrate = flight.get('vertrate') if all([isinstance(x, numbers.Number) for x in (lat, lon, speed, altitude, track)]): meters_traveled = messageboard.MetersTraveled(speed, time.time() - geo_time) new_position = messageboard.TrajectoryLatLon((lat, lon), meters_traveled, track) angles = messageboard.Angles( messageboard.HOME, messageboard.HOME_ALT, new_position, altitude / messageboard.FEET_IN_METER) azimuth = angles['azimuth_degrees'] altitude = angles['altitude_degrees'] return (azimuth, altitude) return None |
01234567890123456789012345678901234567890123456789012345678901234567890123456789
737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 | <----SKIPPED LINES----> txfer_obj, (truncated_string_b, ), format_string, max_length, start_pos=start_pos) def StuffObject( txfer_obj, val, format_string, object_byte_size, start_pos=0): """Insert an object into pySerialtxfer TX buffer starting at the specified index. Args: txfer_obj: txfer - Transfer class instance to communicate over serial val: tuple of values to be inserted into TX buffer format_string: string used with struct.pack to pack the val object_byte_size: integer number of bytes of the object to pack start_pos: index of the last byte of the float in the TX buffer + 1 Returns: start_pos for next object """ val_bytes = struct.pack(format_string, *val) if len(val) > 1 and VERBOSE: print('SENT (%d byte struct): %s' % (object_byte_size, str(val))) if not SIMULATE_ARDUINO: try: for index in range(object_byte_size): txfer_obj.txBuff[index + start_pos] = val_bytes[index] except (OSError, serial.serialutil.SerialException) as e: LogMessage('Error %s in StuffObject with val %s and txfer_obj %s' % (e, str(val), str(txfer_obj))) return start_pos + index return object_byte_size + start_pos def OpenArduino(timeout=0, baud=115200, manufacturer='arduino', sn=None): """Finds a USB device with Arduino as its mfg and returns an open connection to it. Args: baud: The connection speed. timeout: Max duration in seconds that we should wait to establish a connection. Returns: Open link to the Arduino if found and opened; None otherwise. """ initial_time = time.time() arduino_port = None attempted = False while not attempted or time.time() - initial_time < timeout and not arduino_port: attempted = True ports = serial.tools.list_ports.comports(include_links=False) for port in ports: <----SKIPPED LINES----> location using its last-known position in the flight's lat / lon / speed / altitude / and track. Those attributes may have already been updated by messageboard using a more recently obtained radio signal from dump1090 than that in the canonical location, and if so, key flight_loc_now indicates the time at which those locations are current as of. Args: flight: dictionary of flight attributes. Returns: Returns a tuple of the azimuth and altitude, in degrees, at the current system time. """ geo_time = flight.get('flight_loc_now') lat = flight.get('lat') lon = flight.get('lon') speed = flight.get('speed') altitude = flight.get('altitude') track = flight.get('track') unused_vertrate = flight.get('vertrate') if all([isinstance(x, numbers.Number) for x in (lat, lon, speed, altitude, track)]): elapsed_time = time.time() - geo_time meters_traveled = messageboard.MetersTraveled(speed, elapsed_time) new_position = messageboard.TrajectoryLatLon((lat, lon), meters_traveled, track) distance = messageboard.HaversineDistanceMeters(new_position, messageboard.HOME) angles = messageboard.Angles( messageboard.HOME, messageboard.HOME_ALT, new_position, altitude / messageboard.FEET_IN_METER) azimuth = angles['azimuth_degrees'] altitude = angles['altitude_degrees'] return (azimuth, altitude) return None |