01234567890123456789012345678901234567890123456789012345678901234567890123456789
7374757677787980818283848586878889909192 9394 9596979899100101102103104105106107108109110111112113114 167168169170171172173174175176177178179180181182183184185186187188189 190191192193194195196197198199200201202 |
<----SKIPPED LINES---->
txfer_obj, (truncated_string_b, ), format_string, max_length, start_pos=start_pos)
def StuffObject(
txfer_obj, val, format_string, object_byte_size, start_pos=0):
"""Insert an object into pySerialtxfer TX buffer starting at the specified index.
Args:
txfer_obj: txfer - Transfer class instance to communicate over serial
val: tuple of values to be inserted into TX buffer
format_string: string used with struct.pack to pack the val
object_byte_size: integer number of bytes of the object to pack
start_pos: index of the last byte of the float in the TX buffer + 1
Returns:
start_pos for next object
"""
val_bytes = struct.pack(format_string, *val)
if len(val) > 1 and VERBOSE:
print('SENT (%d byte struct): %s' % (object_byte_size, str(val)))
for index in range(object_byte_size):
txfer_obj.txBuff[index + start_pos] = val_bytes[index]
return object_byte_size + start_pos
def OpenArduino(timeout=0, baud=115200, manufacturer='arduino', sn=None):
"""Finds a USB device with Arduino as its mfg and returns an open connection to it.
Args:
baud: The connection speed.
timeout: Max duration in seconds that we should wait to establish a connection.
Returns:
Open link to the Arduino if found and opened; None otherwise.
"""
initial_time = time.time()
arduino_port = None
attempted = False
while not attempted or time.time() - initial_time < timeout and not arduino_port:
attempted = True
ports = serial.tools.list_ports.comports(include_links=False)
for port in ports:
<----SKIPPED LINES---->
location using its last-known position in the flight's lat / lon / speed / altitude /
and track. Those attributes may have already been updated by messageboard using a
more recently obtained radio signal from dump1090 than that in the canonical location,
and if so, key flight_loc_now indicates the time at which those locations are current
as of.
Args:
flight: dictionary of flight attributes.
Returns:
Returns a tuple of the azimuth and altitude, in degrees, at the current system time.
"""
geo_time = flight.get('flight_loc_now')
lat = flight.get('lat')
lon = flight.get('lon')
speed = flight.get('speed')
altitude = flight.get('altitude')
track = flight.get('track')
unused_vertrate = flight.get('vertrate')
if all([isinstance(x, numbers.Number) for x in (lat, lon, speed, altitude, track)]):
meters_traveled = messageboard.MetersTraveled(speed, time.time() - geo_time)
new_position = messageboard.TrajectoryLatLon((lat, lon), meters_traveled, track)
angles = messageboard.Angles(
messageboard.HOME,
messageboard.HOME_ALT,
new_position,
altitude / messageboard.FEET_IN_METER)
azimuth = angles['azimuth_degrees']
altitude = angles['altitude_degrees']
return (azimuth, altitude)
return None
|
01234567890123456789012345678901234567890123456789012345678901234567890123456789
737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
<----SKIPPED LINES---->
txfer_obj, (truncated_string_b, ), format_string, max_length, start_pos=start_pos)
def StuffObject(
txfer_obj, val, format_string, object_byte_size, start_pos=0):
"""Insert an object into pySerialtxfer TX buffer starting at the specified index.
Args:
txfer_obj: txfer - Transfer class instance to communicate over serial
val: tuple of values to be inserted into TX buffer
format_string: string used with struct.pack to pack the val
object_byte_size: integer number of bytes of the object to pack
start_pos: index of the last byte of the float in the TX buffer + 1
Returns:
start_pos for next object
"""
val_bytes = struct.pack(format_string, *val)
if len(val) > 1 and VERBOSE:
print('SENT (%d byte struct): %s' % (object_byte_size, str(val)))
if not SIMULATE_ARDUINO:
try:
for index in range(object_byte_size):
txfer_obj.txBuff[index + start_pos] = val_bytes[index]
except (OSError, serial.serialutil.SerialException) as e:
LogMessage('Error %s in StuffObject with val %s and txfer_obj %s' % (e, str(val), str(txfer_obj)))
return start_pos + index
return object_byte_size + start_pos
def OpenArduino(timeout=0, baud=115200, manufacturer='arduino', sn=None):
"""Finds a USB device with Arduino as its mfg and returns an open connection to it.
Args:
baud: The connection speed.
timeout: Max duration in seconds that we should wait to establish a connection.
Returns:
Open link to the Arduino if found and opened; None otherwise.
"""
initial_time = time.time()
arduino_port = None
attempted = False
while not attempted or time.time() - initial_time < timeout and not arduino_port:
attempted = True
ports = serial.tools.list_ports.comports(include_links=False)
for port in ports:
<----SKIPPED LINES---->
location using its last-known position in the flight's lat / lon / speed / altitude /
and track. Those attributes may have already been updated by messageboard using a
more recently obtained radio signal from dump1090 than that in the canonical location,
and if so, key flight_loc_now indicates the time at which those locations are current
as of.
Args:
flight: dictionary of flight attributes.
Returns:
Returns a tuple of the azimuth and altitude, in degrees, at the current system time.
"""
geo_time = flight.get('flight_loc_now')
lat = flight.get('lat')
lon = flight.get('lon')
speed = flight.get('speed')
altitude = flight.get('altitude')
track = flight.get('track')
unused_vertrate = flight.get('vertrate')
if all([isinstance(x, numbers.Number) for x in (lat, lon, speed, altitude, track)]):
elapsed_time = time.time() - geo_time
meters_traveled = messageboard.MetersTraveled(speed, elapsed_time)
new_position = messageboard.TrajectoryLatLon((lat, lon), meters_traveled, track)
distance = messageboard.HaversineDistanceMeters(new_position, messageboard.HOME)
angles = messageboard.Angles(
messageboard.HOME,
messageboard.HOME_ALT,
new_position,
altitude / messageboard.FEET_IN_METER)
azimuth = angles['azimuth_degrees']
altitude = angles['altitude_degrees']
return (azimuth, altitude)
return None
|