arduino-2020-07-17-0822.py
01234567890123456789012345678901234567890123456789012345678901234567890123456789









756757758759760761762763764765766767768769770771772773774775 776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812 813814815816817818819820821822823824825826827828829830831832








866867868869870871872873874875876877878879880881882883884885886887888889890891892893 894     895896897898  899900901902903904905906 907908909910911912913914915916917918919920921922923924925926








936937938939940941942943944945946947948949950951952953954955 956957  958959960                    961962963964965966967968969970971972973974975976977978979980981 982983984985986987988989990 991992993994995996997998 9991000100110021003100410051006100710081009101010111012101310141015101610171018








15901591159215931594159515961597159815991600160116021603160416051606160716081609 1610161116121613161416151616161716181619



                            <----SKIPPED LINES---->




  for intensity in range(0, 101, 5):
    b = intensity / 100
    SendLEDTestMessage(*comms_args, *GammaRGB(rgb, gamma, brightness=b))


def ServoTestSweep(link, write_keys, write_format_tuple, altitude=45):
  """Sweep red laser around 360 degrees."""
  for azimuth in range(0, 360, 10):

    message_dict = GenerateServoMessage(
        laser=LASER_RED, angles=(azimuth, altitude))
    link.Write(DictToValueTuple(message_dict, write_keys, write_format_tuple))

    time.sleep(WRITE_DELAY_TIME)


last_angles = (0, 0)
last_laser = LASER_OFF
last_led = LED_OFF
def GenerateServoMessage(

    angles=None,
    laser=None,
    led=None,
    reset=False):
  """Creates a dictionary of messages for servo arduino.

  All values are optional; if not provided, the cached value last sent to the
  Arduino is used.
  """
  if angles:  # if angles provided, update the cache
    global last_angles
    last_angles = angles
  else:  # if angles not provided, use last-provided angles
    angles = last_angles

  if laser:
    global last_laser
    last_laser = laser
  else:
    laser = last_laser

  if led:
    global last_led
    last_led = led
  else:
    led = last_led

  d = {}
  d['azimuth'] = angles[0]
  d['altitude'] = angles[1]
  d['laser_red'] = laser[0]
  d['laser_green'] = laser[1]
  d['laser_blue'] = laser[2]
  d['led_red'] = led[0]
  d['led_green'] = led[1]
  d['led_blue'] = led[2]
  d['arduino_reset'] = reset


  return d


def HexColorToRGBTuple(hex_color):
  """Converts i.e.: #329a43 to (50, 154, 67)."""
  r = hex_color[1:3]
  g = hex_color[3:5]
  b = hex_color[5:7]
  return (int(r, 16), int(g, 16), int(b, 16))


def Gamma(c, gamma, max_value=MAX_PWM):
  """Converts a desired brightness 0..max to a gamma-corrected PWM 0..max."""
  # Based on https://hackaday.com/2016/08/23/
  # rgb-leds-how-to-master-gamma-and-hue-for-perfect-brightness/
  return ((c / max_value) ** gamma) * max_value


def PerceivedBrightness(rgb, gamma):




                            <----SKIPPED LINES---->






def ServoMain(to_arduino_q, to_parent_q, shutdown):
  """Main servo controller for projecting the plane position on a hemisphere.

  Takes the latest flight from the to_arduino_q and converts that to the current
  azimuth and altitude of the plane on a hemisphere.
  """
  sys.stderr = open(messageboard.STDERR_FILE, 'a')

  Log('Process started with process id %d' % os.getpid())

  # Ensures that the child can exit if the parent exits unexpectedly
  # docs.python.org/2/library/multiprocessing.html
  # #multiprocessing.Queue.cancel_join_thread
  to_arduino_q.cancel_join_thread()
  to_parent_q.cancel_join_thread()

  #pylint: disable = bad-whitespace
  write_config = (
      ('azimuth',       'f'),  # 4 bytes
      ('altitude',      'f'),  # 4 bytes
      ('laser_red',     '?'),  # 1 byte
      ('laser_green',   '?'),  # 1 byte
      ('laser_blue',    '?'),  # 1 byte
      ('led_red',       'H'),  # 2 bytes
      ('led_green',     'H'),  # 2 bytes
      ('led_blue',      'H'),  # 2 bytes

      ('arduino_reset', '?'),  # 1 byte





  )
  #pylint: enable = bad-whitespace
  write_keys, write_format_tuple, write_format_string = SplitFormat(
      write_config)



  # write_format: azimuth, altitude, R, G, & B intensity
  # read heartbeat: millis
  link = Serial(
      *SERVO_CONNECTION, read_timeout=60,
      error_pin=messageboard.GPIO_ERROR_ARDUINO_SERVO_CONNECTION,
      to_parent_q=to_parent_q,
      read_format='l', write_format=write_format_string, name='Servo')

  link.Open()

  last_flight = {}
  flight, json_desc_dict, configuration, additional_attr = InitialMessageValues(
      to_arduino_q)
  next_read = 0
  next_write = 0
  now = GetNow(json_desc_dict, additional_attr)
  gamma = GetGamma(configuration)
  brightness = 1.0
  brightness_step = 0.1

  while not shutdown.value:
    SetLoggingGlobals(configuration)
    if not to_arduino_q.empty():
      flight, json_desc_dict, configuration, additional_attr = to_arduino_q.get(
          block=False)

      if 'test_servos_ordinal' in configuration:
        messageboard.RemoveSetting(configuration, 'test_servos_ordinal')




                            <----SKIPPED LINES---->




        ServoTestDim(
            link, write_keys, write_format_tuple,
            HexColorToRGBTuple(configuration['led_color']), gamma)

      gamma = GetGamma(configuration)
      new_flight = DifferentFlights(flight, last_flight)
      if new_flight:
        Log('Flight changed from %s to %s' % (
            messageboard.DisplayFlightNumber(last_flight),
            messageboard.DisplayFlightNumber(flight)
        ), ser=link)

        # Turn off laser so line isn't traced while it moves to new position
        message_dict = GenerateServoMessage(laser=LASER_OFF)
        message_tuple = DictToValueTuple(
            message_dict, write_keys, write_format_tuple)
        link.Write(message_tuple)

      last_flight = flight


    if time.time() >= next_read:
      heartbeat = link.Read()  # simple ack message sent by servos


      next_read = time.time() + READ_DELAY_TIME_SERVO
      if heartbeat and VERBOSE:
        Log('Heartbeat read by Servo: %s' % str(heartbeat))





















    now = GetNow(json_desc_dict, additional_attr)

    current_angles = AzimuthAltitude(flight, now)
    if time.time() > next_write:
      if (current_angles and
          current_angles[1] >=
          configuration['minimum_altitude_servo_tracking'] and
          configuration['servo_mode'] in ('laser_only', 'both')):

        if VERBOSE:
          Log('Flight #: %s current_angles: %s' % (
              messageboard.DisplayFlightNumber(flight), str(current_angles)))

        laser_rgb = LaserRGBFlight(flight)
        if brightness > 0:
          brightness -= brightness_step
        rgb_tuple = GammaRGB(
            HexColorToRGBTuple(configuration['led_color']),
            gamma, brightness=brightness)
        message_dict = GenerateServoMessage(

            laser=laser_rgb, angles=current_angles, led=rgb_tuple)

      elif configuration['servo_mode'] == 'laser_only':
        if brightness > 0:
          brightness -= brightness_step
        rgb_tuple = GammaRGB(
            HexColorToRGBTuple(configuration['led_color']),
            gamma, brightness=brightness)
        message_dict = GenerateServoMessage(laser=LASER_OFF, led=rgb_tuple)


      else:
        if brightness < 1:
          brightness += brightness_step
        rgb_tuple = GammaRGB(
            HexColorToRGBTuple(configuration['led_color']),
            gamma, brightness=brightness)
        message_dict = GenerateServoMessage(laser=LASER_OFF, led=rgb_tuple)


      message_tuple = DictToValueTuple(
          message_dict, write_keys, write_format_tuple)
      link.Write(message_tuple)

      next_write = time.time() + WRITE_DELAY_TIME

  # One final write telling Arduino to do a software reset
  message_dict = GenerateServoMessage(laser=LASER_OFF, reset=True)
  message_tuple = DictToValueTuple(message_dict, write_keys, write_format_tuple)
  link.Write(message_tuple)

  link.Close(SHUTDOWN_TEXT)


def LaserRGBFlight(flight):
  """Based on flight attributes, set the laser."""
  # Possible assignment based on:
  #   - ascending / descending / level
  #   - to SFO / from SFO / other




                            <----SKIPPED LINES---->




          SendRemoteMessage(
              flight, json_desc_dict, configuration, additional_attr,
              m, write_keys, write_format_tuple, link)
          time.sleep(1)

        TestDisplayMode(DISP_LAST_FLIGHT_NUMB_ORIG_DEST)
        TestDisplayMode(DISP_LAST_FLIGHT_AZIMUTH_ELEVATION)
        TestDisplayMode(DISP_FLIGHT_COUNT_LAST_SEEN)
        TestDisplayMode(DISP_RADIO_RANGE)

    if time.time() >= next_write:
      next_write = SendRemoteMessage(
          flight, json_desc_dict, configuration, additional_attr,
          display_mode, write_keys, write_format_tuple, link)

    if time.time() >= next_read:
      bytes_read = []
      values_t = link.Read(bytes_read=bytes_read)
      values_d = dict(zip(read_keys, values_t))
      if values_d.get('confirmed'):

        Log(str(bytes_read) + '\n' + str(values_t) + '\n' + str(values_d))
        display_mode, low_batt = ExecuteArduinoCommand(
            values_d, configuration, display_mode, low_batt, to_parent_q, link)
      next_read = time.time() + READ_DELAY_TIME_REMOTE

  # send one final message telling Arduino to do a reset
  SendRemoteMessage(
      flight, json_desc_dict, configuration, additional_attr,
      display_mode, write_keys, write_format_tuple, link, reset=True)
  link.Close(SHUTDOWN_TEXT)

01234567890123456789012345678901234567890123456789012345678901234567890123456789









756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834








868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937








94794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055








1627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657



                            <----SKIPPED LINES---->




  for intensity in range(0, 101, 5):
    b = intensity / 100
    SendLEDTestMessage(*comms_args, *GammaRGB(rgb, gamma, brightness=b))


def ServoTestSweep(link, write_keys, write_format_tuple, altitude=45):
  """Sweep red laser around 360 degrees."""
  for azimuth in range(0, 360, 10):

    message_dict = GenerateServoMessage(
        laser=LASER_RED, angles=(azimuth, altitude))
    link.Write(DictToValueTuple(message_dict, write_keys, write_format_tuple))

    time.sleep(WRITE_DELAY_TIME)


last_angles = (0, 0)
last_laser = LASER_OFF
last_led = LED_OFF
def GenerateServoMessage(
    mode_switch_ack=False,
    angles=None,
    laser=None,
    led=None,
    reset=False):
  """Creates a dictionary of messages for servo arduino.

  All values are optional; if not provided, the cached value last sent to the
  Arduino is used.
  """
  if angles:  # if angles provided, update the cache
    global last_angles
    last_angles = angles
  else:  # if angles not provided, use last-provided angles
    angles = last_angles

  if laser:
    global last_laser
    last_laser = laser
  else:
    laser = last_laser

  if led:
    global last_led
    last_led = led
  else:
    led = last_led

  d = {}
  d['azimuth'] = angles[0]
  d['altitude'] = angles[1]
  d['laser_red'] = laser[0]
  d['laser_green'] = laser[1]
  d['laser_blue'] = laser[2]
  d['led_red'] = led[0]
  d['led_green'] = led[1]
  d['led_blue'] = led[2]
  d['arduino_reset'] = reset
  d['mode_switch_ack'] = mode_switch_ack

  return d


def HexColorToRGBTuple(hex_color):
  """Converts i.e.: #329a43 to (50, 154, 67)."""
  r = hex_color[1:3]
  g = hex_color[3:5]
  b = hex_color[5:7]
  return (int(r, 16), int(g, 16), int(b, 16))


def Gamma(c, gamma, max_value=MAX_PWM):
  """Converts a desired brightness 0..max to a gamma-corrected PWM 0..max."""
  # Based on https://hackaday.com/2016/08/23/
  # rgb-leds-how-to-master-gamma-and-hue-for-perfect-brightness/
  return ((c / max_value) ** gamma) * max_value


def PerceivedBrightness(rgb, gamma):




                            <----SKIPPED LINES---->






def ServoMain(to_arduino_q, to_parent_q, shutdown):
  """Main servo controller for projecting the plane position on a hemisphere.

  Takes the latest flight from the to_arduino_q and converts that to the current
  azimuth and altitude of the plane on a hemisphere.
  """
  sys.stderr = open(messageboard.STDERR_FILE, 'a')

  Log('Process started with process id %d' % os.getpid())

  # Ensures that the child can exit if the parent exits unexpectedly
  # docs.python.org/2/library/multiprocessing.html
  # #multiprocessing.Queue.cancel_join_thread
  to_arduino_q.cancel_join_thread()
  to_parent_q.cancel_join_thread()

  #pylint: disable = bad-whitespace
  write_config = (
      ('azimuth',         'f'),  # 4 bytes
      ('altitude',        'f'),  # 4 bytes
      ('laser_red',       '?'),  # 1 byte
      ('laser_green',     '?'),  # 1 byte
      ('laser_blue',      '?'),  # 1 byte
      ('led_red',         'H'),  # 2 bytes
      ('led_green',       'H'),  # 2 bytes
      ('led_blue',        'H'),  # 2 bytes
      ('mode_switch_ack', '?'),  # 1 byte
      ('arduino_reset',   '?'),  # 1 byte
  )

  read_config = (
      ('mode_switch', '?'),  # 1 byte
      ('millis',      'L'),  # 4 bytes
  )
  #pylint: enable = bad-whitespace
  write_keys, write_format_tuple, write_format_string = SplitFormat(
      write_config)
  read_keys, unused_read_format_tuple, read_format_string = SplitFormat(
      read_config)

  # write_format: azimuth, altitude, R, G, & B intensity
  # read heartbeat: mode change & millis
  link = Serial(
      *SERVO_CONNECTION, read_timeout=60,
      error_pin=messageboard.GPIO_ERROR_ARDUINO_SERVO_CONNECTION,
      to_parent_q=to_parent_q,
      read_format=read_format_string, write_format=write_format_string,
      name='Servo')
  link.Open()

  last_flight = {}
  flight, json_desc_dict, configuration, additional_attr = InitialMessageValues(
      to_arduino_q)
  next_read = 0
  next_write = 0
  now = GetNow(json_desc_dict, additional_attr)
  gamma = GetGamma(configuration)
  brightness = 1.0
  brightness_step = 0.1

  while not shutdown.value:
    SetLoggingGlobals(configuration)
    if not to_arduino_q.empty():
      flight, json_desc_dict, configuration, additional_attr = to_arduino_q.get(
          block=False)

      if 'test_servos_ordinal' in configuration:
        messageboard.RemoveSetting(configuration, 'test_servos_ordinal')




                            <----SKIPPED LINES---->




        ServoTestDim(
            link, write_keys, write_format_tuple,
            HexColorToRGBTuple(configuration['led_color']), gamma)

      gamma = GetGamma(configuration)
      new_flight = DifferentFlights(flight, last_flight)
      if new_flight:
        Log('Flight changed from %s to %s' % (
            messageboard.DisplayFlightNumber(last_flight),
            messageboard.DisplayFlightNumber(flight)
        ), ser=link)

        # Turn off laser so line isn't traced while it moves to new position
        message_dict = GenerateServoMessage(laser=LASER_OFF)
        message_tuple = DictToValueTuple(
            message_dict, write_keys, write_format_tuple)
        link.Write(message_tuple)

      last_flight = flight

    mode_switch_ack = False
    if time.time() >= next_read:
      bytes_read = []
      values_t = link.Read(bytes_read=bytes_read)
      values_d = dict(zip(read_keys, values_t))
      next_read = time.time() + READ_DELAY_TIME_SERVO
      if bytes_read and VERBOSE:
        Log('Read by Servo: %s' % str(values_d))

      if values_d['mode_switch']:

        # makes a copy so as to not modify underlying config; we don't want
        # to modify underlying because otherwise the settings will bounce
        # around (values read from disk -> new values set by arduino -> old
        # values from disk -> new values from disk after Arduino update).
        configuration = dict(configuration)

        # LED only -> Laser only -> Both
        sequence = ('led_only', 'laser_only', 'both')
        old_name = configuration['servo_mode']
        old_id = sequence.index(old_name)
        new_id = (old_id + 1) % len(sequence)
        new_name = sequence[new_id]
        Log('Hemisphere mode updated from %s to %s' % (old_name, new_name))
        configuration['servo_mode'] = new_name
        settings_string = messageboard.BuildSettings(configuration)
        to_parent_q.put(('update_configuration', (settings_string, )))
        mode_switch_ack = True

    now = GetNow(json_desc_dict, additional_attr)

    current_angles = AzimuthAltitude(flight, now)
    if time.time() > next_write:
      if (current_angles and
          current_angles[1] >=
          configuration['minimum_altitude_servo_tracking'] and
          configuration['servo_mode'] in ('laser_only', 'both')):

        if VERBOSE:
          Log('Flight #: %s current_angles: %s' % (
              messageboard.DisplayFlightNumber(flight), str(current_angles)))

        laser_rgb = LaserRGBFlight(flight)
        if brightness > 0:
          brightness -= brightness_step
        rgb_tuple = GammaRGB(
            HexColorToRGBTuple(configuration['led_color']),
            gamma, brightness=brightness)
        message_dict = GenerateServoMessage(
            mode_switch_ack=mode_switch_ack,
            laser=laser_rgb, angles=current_angles, led=rgb_tuple)

      elif configuration['servo_mode'] == 'laser_only':
        if brightness > 0:
          brightness -= brightness_step
        rgb_tuple = GammaRGB(
            HexColorToRGBTuple(configuration['led_color']),
            gamma, brightness=brightness)
        message_dict = GenerateServoMessage(
            mode_switch_ack=mode_switch_ack, laser=LASER_OFF, led=rgb_tuple)

      else:
        if brightness < 1:
          brightness += brightness_step
        rgb_tuple = GammaRGB(
            HexColorToRGBTuple(configuration['led_color']),
            gamma, brightness=brightness)
        message_dict = GenerateServoMessage(
            mode_switch_ack=mode_switch_ack, laser=LASER_OFF, led=rgb_tuple)

      message_tuple = DictToValueTuple(
          message_dict, write_keys, write_format_tuple)
      link.Write(message_tuple)

      next_write = time.time() + WRITE_DELAY_TIME

  # One final write telling Arduino to do a software reset
  message_dict = GenerateServoMessage(laser=LASER_OFF, reset=True)
  message_tuple = DictToValueTuple(message_dict, write_keys, write_format_tuple)
  link.Write(message_tuple)

  link.Close(SHUTDOWN_TEXT)


def LaserRGBFlight(flight):
  """Based on flight attributes, set the laser."""
  # Possible assignment based on:
  #   - ascending / descending / level
  #   - to SFO / from SFO / other




                            <----SKIPPED LINES---->




          SendRemoteMessage(
              flight, json_desc_dict, configuration, additional_attr,
              m, write_keys, write_format_tuple, link)
          time.sleep(1)

        TestDisplayMode(DISP_LAST_FLIGHT_NUMB_ORIG_DEST)
        TestDisplayMode(DISP_LAST_FLIGHT_AZIMUTH_ELEVATION)
        TestDisplayMode(DISP_FLIGHT_COUNT_LAST_SEEN)
        TestDisplayMode(DISP_RADIO_RANGE)

    if time.time() >= next_write:
      next_write = SendRemoteMessage(
          flight, json_desc_dict, configuration, additional_attr,
          display_mode, write_keys, write_format_tuple, link)

    if time.time() >= next_read:
      bytes_read = []
      values_t = link.Read(bytes_read=bytes_read)
      values_d = dict(zip(read_keys, values_t))
      if values_d.get('confirmed'):
        if VERBOSE:
          Log(str(bytes_read) + '\n' + str(values_t) + '\n' + str(values_d))
        display_mode, low_batt = ExecuteArduinoCommand(
            values_d, configuration, display_mode, low_batt, to_parent_q, link)
      next_read = time.time() + READ_DELAY_TIME_REMOTE

  # send one final message telling Arduino to do a reset
  SendRemoteMessage(
      flight, json_desc_dict, configuration, additional_attr,
      display_mode, write_keys, write_format_tuple, link, reset=True)
  link.Close(SHUTDOWN_TEXT)