arduino-2020-06-12-1234.py
01234567890123456789012345678901234567890123456789012345678901234567890123456789









178179180181182183184185186187188189190191192193194195196197  198199200201202203204205206207208209210211212213214215216217218








274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314











                            <----SKIPPED LINES---->




    self.write_format = write_format
    self.read_timeout = read_timeout
    self.error_pin = error_pin
    self.to_parent_q = to_parent_q
    self.last_read = 0
    self.last_receipt = 0
    self.reset_flag = True
    self.link = None
    self.name = name

    self.__simulated_reads__ = None

    if self.connection_type == CONNECTION_FLAG_BLUETOOTH:
      self.open_function = OpenBluetooth
    elif self.connection_type == CONNECTION_FLAG_USB:
      self.open_function = OpenUSB
    elif self.connection_type == CONNECTION_FLAG_SIMULATED:
      self.open_function = None

    if self.error_pin:  # Error turned on when main initiated; turned off when connected


      self.to_parent_q.put(('pin', (self.error_pin, True)))

    self.start_time = time.time()

  def __str__(self):
    return self.name

  def Open(self):
    """Opens an instantiated serial connection for reading and writing."""
    if self.connection_type == CONNECTION_FLAG_SIMULATED:
      lines = []
      if os.path.exists(self.connection_tuple[0]):
        with open(self.connection_tuple[0], 'r') as f:
          for line in f:
            if line.strip():
              lines.append(eval(line))  # pylint: disable=W0123
      else:
        Log('File %s does not exist for simulated commands to Arudino'
            % self.connection_tuple[0], self.link)
      self.__simulated_reads__ = lines





                            <----SKIPPED LINES---->




    Args:
      format_string: String of the form expected by struct.pack
      bytes_read: if passed, the bytes that are read are appended to the list.

    Returns:
      Tuple of values matching that as identified in format_string.
    """
    if self.connection_type == CONNECTION_FLAG_SIMULATED:
      if (
          self.__simulated_reads__ and
          time.time() - self.start_time > self.__simulated_reads__[0][0]):  # time for next
        next_line = self.__simulated_reads__.pop(0)
        return next_line[1]
      return ()

    if not format_string:
      format_string = self.read_format
    try:
      data = Read(self.link, format_string, bytes_read=bytes_read)
    except OSError as e:
      failure_message = 'Failed to read: %s' % e
      if self.error_pin:
        self.to_parent_q.put(('pin', (self.error_pin, True, failure_message)))
      self.Reopen(log_message=failure_message)
      return self.Read(format_string=format_string)
    self.last_read = time.time()
    if data:
      self.last_receipt = time.time()
      if LOG_SERIALS:
        ts = time.time() - self.start_time
        str_data = str(['%7.2f' % d if isinstance(d, float) else str(d) for d in data])
        with open(SERIALS_LOG, 'a') as f:
          f.write('%10.3f RECD@%s: %s\n' % (ts, self.name, str_data))
    if self.read_timeout and self.last_read - self.last_receipt > self.read_timeout:
      failure_message = 'Heartbeat not received in %.2f seconds (expected: %.2f) on %s' % (
          self.last_read - self.last_receipt, self.read_timeout, self.name)
      if self.error_pin:
        self.to_parent_q.put(('pin', (self.error_pin, True, failure_message)))
      self.Reopen(log_message=failure_message)
      return self.Read(format_string=format_string)





                            <----SKIPPED LINES---->





01234567890123456789012345678901234567890123456789012345678901234567890123456789









178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220








276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316











                            <----SKIPPED LINES---->




    self.write_format = write_format
    self.read_timeout = read_timeout
    self.error_pin = error_pin
    self.to_parent_q = to_parent_q
    self.last_read = 0
    self.last_receipt = 0
    self.reset_flag = True
    self.link = None
    self.name = name

    self.__simulated_reads__ = None

    if self.connection_type == CONNECTION_FLAG_BLUETOOTH:
      self.open_function = OpenBluetooth
    elif self.connection_type == CONNECTION_FLAG_USB:
      self.open_function = OpenUSB
    elif self.connection_type == CONNECTION_FLAG_SIMULATED:
      self.open_function = None

    if self.error_pin:  # Error turned on when main initiated; turned off when connected
      error_message = 'Process %s (%s) initialized into error state' % (os.getpid(), str(self.name))
      Log(error_message)
      self.to_parent_q.put(('pin', (self.error_pin, True, error_message)))

    self.start_time = time.time()

  def __str__(self):
    return self.name

  def Open(self):
    """Opens an instantiated serial connection for reading and writing."""
    if self.connection_type == CONNECTION_FLAG_SIMULATED:
      lines = []
      if os.path.exists(self.connection_tuple[0]):
        with open(self.connection_tuple[0], 'r') as f:
          for line in f:
            if line.strip():
              lines.append(eval(line))  # pylint: disable=W0123
      else:
        Log('File %s does not exist for simulated commands to Arudino'
            % self.connection_tuple[0], self.link)
      self.__simulated_reads__ = lines





                            <----SKIPPED LINES---->




    Args:
      format_string: String of the form expected by struct.pack
      bytes_read: if passed, the bytes that are read are appended to the list.

    Returns:
      Tuple of values matching that as identified in format_string.
    """
    if self.connection_type == CONNECTION_FLAG_SIMULATED:
      if (
          self.__simulated_reads__ and
          time.time() - self.start_time > self.__simulated_reads__[0][0]):  # time for next
        next_line = self.__simulated_reads__.pop(0)
        return next_line[1]
      return ()

    if not format_string:
      format_string = self.read_format
    try:
      data = Read(self.link, format_string, bytes_read=bytes_read)
    except OSError as e:
      failure_message = 'Failed to read from %s: %s' % (self.name, e)
      if self.error_pin:
        self.to_parent_q.put(('pin', (self.error_pin, True, failure_message)))
      self.Reopen(log_message=failure_message)
      return self.Read(format_string=format_string)
    self.last_read = time.time()
    if data:
      self.last_receipt = time.time()
      if LOG_SERIALS:
        ts = time.time() - self.start_time
        str_data = str(['%7.2f' % d if isinstance(d, float) else str(d) for d in data])
        with open(SERIALS_LOG, 'a') as f:
          f.write('%10.3f RECD@%s: %s\n' % (ts, self.name, str_data))
    if self.read_timeout and self.last_read - self.last_receipt > self.read_timeout:
      failure_message = 'Heartbeat not received in %.2f seconds (expected: %.2f) on %s' % (
          self.last_read - self.last_receipt, self.read_timeout, self.name)
      if self.error_pin:
        self.to_parent_q.put(('pin', (self.error_pin, True, failure_message)))
      self.Reopen(log_message=failure_message)
      return self.Read(format_string=format_string)





                            <----SKIPPED LINES---->