arduino-2020-07-17-0848.py
01234567890123456789012345678901234567890123456789012345678901234567890123456789









885886887888889890891892893894895896897898899900901902903904  905906907908909910911912913914915916917918919920921922923924925926927928  929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993 99499599699799899910001001100210031004100510061007100810091010101110121013








10161017101810191020102110221023102410251026102710281029103010311032103310341035  10361037103810391040104110421043104410451046104710481049105010511052105310541055











                            <----SKIPPED LINES---->





  #pylint: disable = bad-whitespace
  write_config = (
      ('azimuth',         'f'),  # 4 bytes
      ('altitude',        'f'),  # 4 bytes
      ('laser_red',       '?'),  # 1 byte
      ('laser_green',     '?'),  # 1 byte
      ('laser_blue',      '?'),  # 1 byte
      ('led_red',         'H'),  # 2 bytes
      ('led_green',       'H'),  # 2 bytes
      ('led_blue',        'H'),  # 2 bytes
      ('mode_switch_ack', '?'),  # 1 byte
      ('arduino_reset',   '?'),  # 1 byte
  )

  read_config = (
      ('mode_switch', '?'),  # 1 byte
      ('millis',      'L'),  # 4 bytes
  )
  #pylint: enable = bad-whitespace


  write_keys, write_format_tuple, write_format_string = SplitFormat(
      write_config)
  read_keys, read_format_tuple, read_format_string = SplitFormat(
      read_config)

  # write_format: azimuth, altitude, R, G, & B intensity
  # read heartbeat: mode change & millis
  link = Serial(
      *SERVO_CONNECTION, read_timeout=60,
      error_pin=messageboard.GPIO_ERROR_ARDUINO_SERVO_CONNECTION,
      to_parent_q=to_parent_q,
      read_format=read_format_string, write_format=write_format_string,
      name='Servo')
  link.Open()

  last_flight = {}
  flight, json_desc_dict, configuration, additional_attr = InitialMessageValues(
      to_arduino_q)
  next_read = 0
  next_write = 0
  now = GetNow(json_desc_dict, additional_attr)
  gamma = GetGamma(configuration)
  brightness = 1.0
  brightness_step = 0.1



  while not shutdown.value:
    SetLoggingGlobals(configuration)
    if not to_arduino_q.empty():
      flight, json_desc_dict, configuration, additional_attr = to_arduino_q.get(
          block=False)

      if 'test_servos_ordinal' in configuration:
        messageboard.RemoveSetting(configuration, 'test_servos_ordinal')
        ServoTestOrdinal(link, write_keys, write_format_tuple)
      elif 'test_servos_sweep' in configuration:
        messageboard.RemoveSetting(configuration, 'test_servos_sweep')
        ServoTestSweep(link, write_keys, write_format_tuple)
      elif 'test_servos_rgb' in configuration:
        messageboard.RemoveSetting(configuration, 'test_servos_rgb')
        ServoTestRgb(link, write_keys, write_format_tuple)
      elif 'test_servos_dim' in configuration:
        messageboard.RemoveSetting(configuration, 'test_servos_dim')
        ServoTestDim(
            link, write_keys, write_format_tuple,
            HexColorToRGBTuple(configuration['led_color']), gamma)

      gamma = GetGamma(configuration)
      new_flight = DifferentFlights(flight, last_flight)
      if new_flight:
        Log('Flight changed from %s to %s' % (
            messageboard.DisplayFlightNumber(last_flight),
            messageboard.DisplayFlightNumber(flight)
        ), ser=link)

        # Turn off laser so line isn't traced while it moves to new position
        message_dict = GenerateServoMessage(laser=LASER_OFF)
        message_tuple = DictToValueTuple(
            message_dict, write_keys, write_format_tuple)
        link.Write(message_tuple)

      last_flight = flight

    mode_switch_ack = False
    if time.time() >= next_read:
      bytes_read = []
      values_t = link.Read(bytes_read=bytes_read)
      values_d = dict(zip(read_keys, values_t))
      next_read = time.time() + READ_DELAY_TIME_SERVO
      if bytes_read and VERBOSE:
        Log('Read by Servo: %s' % str(values_d))

      if bytes_read and values_d['mode_switch']:

        # makes a copy so as to not modify underlying config; we don't want
        # to modify underlying because otherwise the settings will bounce
        # around (values read from disk -> new values set by arduino -> old
        # values from disk -> new values from disk after Arduino update).
        configuration = dict(configuration)

        # LED only -> Laser only -> Both
        sequence = ('led_only', 'laser_only', 'both')
        old_name = configuration['servo_mode']
        old_id = sequence.index(old_name)
        new_id = (old_id + 1) % len(sequence)
        new_name = sequence[new_id]
        Log('Hemisphere mode updated from %s to %s' % (old_name, new_name))
        configuration['servo_mode'] = new_name
        settings_string = messageboard.BuildSettings(configuration)
        to_parent_q.put(('update_configuration', (settings_string, )))

        mode_switch_ack = True

    now = GetNow(json_desc_dict, additional_attr)

    current_angles = AzimuthAltitude(flight, now)
    if time.time() > next_write:
      if (current_angles and
          current_angles[1] >=
          configuration['minimum_altitude_servo_tracking'] and
          configuration['servo_mode'] in ('laser_only', 'both')):

        if VERBOSE:
          Log('Flight #: %s current_angles: %s' % (
              messageboard.DisplayFlightNumber(flight), str(current_angles)))

        laser_rgb = LaserRGBFlight(flight)
        if brightness > 0:
          brightness -= brightness_step
        rgb_tuple = GammaRGB(
            HexColorToRGBTuple(configuration['led_color']),




                            <----SKIPPED LINES---->




            mode_switch_ack=mode_switch_ack,
            laser=laser_rgb, angles=current_angles, led=rgb_tuple)

      elif configuration['servo_mode'] == 'laser_only':
        if brightness > 0:
          brightness -= brightness_step
        rgb_tuple = GammaRGB(
            HexColorToRGBTuple(configuration['led_color']),
            gamma, brightness=brightness)
        message_dict = GenerateServoMessage(
            mode_switch_ack=mode_switch_ack, laser=LASER_OFF, led=rgb_tuple)

      else:
        if brightness < 1:
          brightness += brightness_step
        rgb_tuple = GammaRGB(
            HexColorToRGBTuple(configuration['led_color']),
            gamma, brightness=brightness)
        message_dict = GenerateServoMessage(
            mode_switch_ack=mode_switch_ack, laser=LASER_OFF, led=rgb_tuple)



      message_tuple = DictToValueTuple(
          message_dict, write_keys, write_format_tuple)
      link.Write(message_tuple)

      next_write = time.time() + WRITE_DELAY_TIME

  # One final write telling Arduino to do a software reset
  message_dict = GenerateServoMessage(laser=LASER_OFF, reset=True)
  message_tuple = DictToValueTuple(message_dict, write_keys, write_format_tuple)
  link.Write(message_tuple)

  link.Close(SHUTDOWN_TEXT)


def LaserRGBFlight(flight):
  """Based on flight attributes, set the laser."""
  # Possible assignment based on:
  #   - ascending / descending / level
  #   - to SFO / from SFO / other




                            <----SKIPPED LINES---->





01234567890123456789012345678901234567890123456789012345678901234567890123456789









885886887888889890891892893894895896897898899900901902903904905906907908  909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968 9699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015








101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059











                            <----SKIPPED LINES---->





  #pylint: disable = bad-whitespace
  write_config = (
      ('azimuth',         'f'),  # 4 bytes
      ('altitude',        'f'),  # 4 bytes
      ('laser_red',       '?'),  # 1 byte
      ('laser_green',     '?'),  # 1 byte
      ('laser_blue',      '?'),  # 1 byte
      ('led_red',         'H'),  # 2 bytes
      ('led_green',       'H'),  # 2 bytes
      ('led_blue',        'H'),  # 2 bytes
      ('mode_switch_ack', '?'),  # 1 byte
      ('arduino_reset',   '?'),  # 1 byte
  )

  read_config = (
      ('mode_switch', '?'),  # 1 byte
      ('millis',      'L'),  # 4 bytes
  )
  #pylint: enable = bad-whitespace
  read_keys, unused_read_format_tuple, read_format_string = SplitFormat(
      read_config)
  write_keys, write_format_tuple, write_format_string = SplitFormat(
      write_config)



  # write_format: azimuth, altitude, R, G, & B intensity
  # read heartbeat: mode change & millis
  link = Serial(
      *SERVO_CONNECTION, read_timeout=60,
      error_pin=messageboard.GPIO_ERROR_ARDUINO_SERVO_CONNECTION,
      to_parent_q=to_parent_q,
      read_format=read_format_string, write_format=write_format_string,
      name='Servo')
  link.Open()

  last_flight = {}
  flight, json_desc_dict, configuration, additional_attr = InitialMessageValues(
      to_arduino_q)
  next_read = 0
  next_write = 0
  now = GetNow(json_desc_dict, additional_attr)
  gamma = GetGamma(configuration)
  brightness = 1.0
  brightness_step = 0.1

  mode_switch_ack = False

  while not shutdown.value:
    SetLoggingGlobals(configuration)
    if not to_arduino_q.empty():
      flight, json_desc_dict, configuration, additional_attr = to_arduino_q.get(
          block=False)

      if 'test_servos_ordinal' in configuration:
        messageboard.RemoveSetting(configuration, 'test_servos_ordinal')
        ServoTestOrdinal(link, write_keys, write_format_tuple)
      elif 'test_servos_sweep' in configuration:
        messageboard.RemoveSetting(configuration, 'test_servos_sweep')
        ServoTestSweep(link, write_keys, write_format_tuple)
      elif 'test_servos_rgb' in configuration:
        messageboard.RemoveSetting(configuration, 'test_servos_rgb')
        ServoTestRgb(link, write_keys, write_format_tuple)
      elif 'test_servos_dim' in configuration:
        messageboard.RemoveSetting(configuration, 'test_servos_dim')
        ServoTestDim(
            link, write_keys, write_format_tuple,
            HexColorToRGBTuple(configuration['led_color']), gamma)

      gamma = GetGamma(configuration)
      new_flight = DifferentFlights(flight, last_flight)
      if new_flight:
        Log('Flight changed from %s to %s' % (
            messageboard.DisplayFlightNumber(last_flight),
            messageboard.DisplayFlightNumber(flight)
        ), ser=link)

        # Turn off laser so line isn't traced while it moves to new position
        message_dict = GenerateServoMessage(laser=LASER_OFF)
        message_tuple = DictToValueTuple(
            message_dict, write_keys, write_format_tuple)
        link.Write(message_tuple)

      last_flight = flight


    if time.time() >= next_read:
      bytes_read = []
      values_t = link.Read(bytes_read=bytes_read)
      values_d = dict(zip(read_keys, values_t))
      next_read = time.time() + READ_DELAY_TIME_SERVO
      if bytes_read and VERBOSE:
        Log('Read by Servo: %s' % str(values_d))

      if bytes_read and values_d['mode_switch']:

        # makes a copy so as to not modify underlying config; we don't want
        # to modify underlying because otherwise the settings will bounce
        # around (values read from disk -> new values set by arduino -> old
        # values from disk -> new values from disk after Arduino update).
        configuration = dict(configuration)

        # LED only -> Laser only -> Both
        sequence = ('led_only', 'laser_only', 'both')
        old_name = configuration['servo_mode']
        old_id = sequence.index(old_name)
        new_id = (old_id + 1) % len(sequence)
        new_name = sequence[new_id]
        Log('Hemisphere mode updated from %s to %s' % (old_name, new_name))
        configuration['servo_mode'] = new_name
        settings_string = messageboard.BuildSettings(configuration)
        to_parent_q.put(('update_configuration', (settings_string, )))

        mode_switch_ack = True

    now = GetNow(json_desc_dict, additional_attr)

    current_angles = AzimuthAltitude(flight, now)
    if time.time() > next_write:
      if (current_angles and
          current_angles[1] >=
          configuration['minimum_altitude_servo_tracking'] and
          configuration['servo_mode'] in ('laser_only', 'both')):

        if VERBOSE:
          Log('Flight #: %s current_angles: %s' % (
              messageboard.DisplayFlightNumber(flight), str(current_angles)))

        laser_rgb = LaserRGBFlight(flight)
        if brightness > 0:
          brightness -= brightness_step
        rgb_tuple = GammaRGB(
            HexColorToRGBTuple(configuration['led_color']),




                            <----SKIPPED LINES---->




            mode_switch_ack=mode_switch_ack,
            laser=laser_rgb, angles=current_angles, led=rgb_tuple)

      elif configuration['servo_mode'] == 'laser_only':
        if brightness > 0:
          brightness -= brightness_step
        rgb_tuple = GammaRGB(
            HexColorToRGBTuple(configuration['led_color']),
            gamma, brightness=brightness)
        message_dict = GenerateServoMessage(
            mode_switch_ack=mode_switch_ack, laser=LASER_OFF, led=rgb_tuple)

      else:
        if brightness < 1:
          brightness += brightness_step
        rgb_tuple = GammaRGB(
            HexColorToRGBTuple(configuration['led_color']),
            gamma, brightness=brightness)
        message_dict = GenerateServoMessage(
            mode_switch_ack=mode_switch_ack, laser=LASER_OFF, led=rgb_tuple)

      mode_switch_ack = False

      message_tuple = DictToValueTuple(
          message_dict, write_keys, write_format_tuple)
      link.Write(message_tuple)

      next_write = time.time() + WRITE_DELAY_TIME

  # One final write telling Arduino to do a software reset
  message_dict = GenerateServoMessage(laser=LASER_OFF, reset=True)
  message_tuple = DictToValueTuple(message_dict, write_keys, write_format_tuple)
  link.Write(message_tuple)

  link.Close(SHUTDOWN_TEXT)


def LaserRGBFlight(flight):
  """Based on flight attributes, set the laser."""
  # Possible assignment based on:
  #   - ascending / descending / level
  #   - to SFO / from SFO / other




                            <----SKIPPED LINES---->