messageboard-2022-09-12-1636.py
01234567890123456789012345678901234567890123456789012345678901234567890123456789









1617181920212223242526272829303132333435363738394041424344454647484950515253545556








19061907190819091910191119121913191419151916191719181919192019211922192319241925  19261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972








633563366337633863396340634163426343634463456346634763486349635063516352635363546355635663576358635963606361636263636364   63656366636763686369637063716372637363746375637663776378637963806381638263836384








63876388638963906391639263936394639563966397639863996400640164026403640464056406    640764086409641064116412641364146415641664176418       6419642064216422                                                64236424642564266427642864296430643164326433643464356436643764386439644064416442








65736574657565766577657865796580658165826583658465856586658765886589659065916592659365946595659665976598659966006601660266036604660566066607660866096610661166126613











                            <----SKIPPED LINES---->




import statistics
import sys
import textwrap
import time
import tracemalloc
import types

import bs4
import dateutils
import numpy
import matplotlib
import matplotlib.pyplot
import psutil
import pycurl
import pytz
import requests
import tzlocal
import unidecode

# pylint: disable=line-too-long
from constants import RASPBERRY_PI, MESSAGEBOARD_PATH, WEBSERVER_PATH, KEY, SECRET, SUBSCRIPTION_ID
# pylint: enable=line-too-long

import arduino

if RASPBERRY_PI:
  import gpiozero  # pylint: disable=E0401
  import RPi.GPIO  # pylint: disable=E0401


VERBOSE = False  # additional messages logged

SHUTDOWN_SIGNAL = ''
REBOOT_SIGNAL = False

# to be tracked in the dashboard messages so that we know when a
# restart due to exit (vs. a long delay in some processing) happened
INSTANCE_START_TIME = time.time()

SIMULATION = False
SIMULATION_COUNTER = 0




                            <----SKIPPED LINES---->






last_query_time = 0
def GetFlightAwareJson(flight_number):
  """Scrapes the text json message from FlightAware for a given flight number.

  Given a flight number, loads the corresponding FlightAware webpage for that
  flight and extracts the relevant script that contains all the flight details
  from that page.  But only queries at most once per fixed period of time
  so as to avoid being blocked.

  Args:
    flight_number: text flight number (i.e.: SWA1234)

  Returns:
    Two tuple:
     - Text representation of the json message from FlightAware.
     - Text string of error message, if any
  """
  min_query_delay_seconds = 90


  global last_query_time
  url = 'https://flightaware.com/live/flight/' + flight_number

  seconds_since_last_query = time.time() - last_query_time
  if seconds_since_last_query < min_query_delay_seconds:
    error_msg = (
        'Unable to query FA for URL since last query to FA was only %d seconds '
        'ago; min of %d seconds needed: %s' % (
            seconds_since_last_query, min_query_delay_seconds, url))
    return '', error_msg

  last_query_time = time.time()
  #Log(
  #    'last_query_time: %d; time.time(): %d;'
  #    'seconds_since_last_query: %d; min_query_delay_seconds: %d' % (
  #        last_query_time, time.time(),
  #        seconds_since_last_query, min_query_delay_seconds))
  try:
    response = requests.get(url, timeout=5)
  except requests.exceptions.RequestException as e:
    error_msg = 'Unable to query FA for URL due to %s: %s' % (e, url)
    return '', error_msg
  soup = bs4.BeautifulSoup(response.text, 'html.parser')
  l = soup.find_all('script')
  flight_script = None
  for script in l:
    if "trackpollBootstrap" in str(script):
      flight_script = str(script)
      break
  if not flight_script:
    error_msg = (
        'Unable to find trackpollBootstrap script in page: ' + response.text)
    Log(error_msg)
    return '', error_msg
  first_open_curly_brace = flight_script.find('{')
  last_close_curly_brace = flight_script.rfind('}')
  flight_json = flight_script[first_open_curly_brace:last_close_curly_brace+1]
  return flight_json, ''


def Unidecode(s):
  """Convert a special unicode characters to closest ASCII representation."""
  if s is not None:
    s = unidecode.unidecode(s)
  return s






                            <----SKIPPED LINES---->




  expected_characters = SPLITFLAP_CHARS_PER_LINE * SPLITFLAP_LINE_COUNT
  missing_characters = max(0, expected_characters - len(message_array))
  if missing_characters:
    for unused_n in range(missing_characters):
      message_array.append(0)
  extra_characters = max(0, len(message_array) - expected_characters)
  if extra_characters:
    Log('Message "%s" is too long at %d characters (max %d characters)'
        % (s, len(message_array), expected_characters))
    message_array = message_array[:expected_characters]

  message_2d_array = []
  for line_num in range(SPLITFLAP_LINE_COUNT):
    message_2d_array.append(message_array[
        line_num * SPLITFLAP_CHARS_PER_LINE :
        (line_num + 1)*SPLITFLAP_CHARS_PER_LINE])

  return message_2d_array


def PublishMessage(
    s,
    subscription_id=SUBSCRIPTION_ID,
    key=KEY,
    secret=SECRET,
    timeout=5):
  """Publishes a text string to a Vestaboard.

  The message is pushed to the vestaboard splitflap display by way of its
  web services; see https://docs.vestaboard.com/introduction for more details.




  Args:
    s: String to publish.
    subscription_id: string subscription id from Vestaboard.
    key: string key from Vestaboard.
    secret: string secret from Vestaboard.
    timeout: Max duration in seconds that we should wait to establish a
      connection.
  """
  error_code = False
  curl = pycurl.Curl()

  # See https://stackoverflow.com/questions/31826814/
  # curl-post-request-into-pycurl-code
  # Set URL value
  curl.setopt(
      pycurl.URL,
      'https://platform.vestaboard.com/subscriptions/%s/message'
      % subscription_id)
  curl.setopt(pycurl.HTTPHEADER, [




                            <----SKIPPED LINES---->




  curl.setopt(pycurl.POST, 1)

  curl.setopt(pycurl.WRITEFUNCTION, lambda x: None) # to keep stdout clean

  # preparing body the way pycurl.READDATA wants it
  body_as_dict = {'characters': StringToCharArray(s)}
  body_as_json_string = json.dumps(body_as_dict) # dict to json
  body_as_file_object = io.StringIO(body_as_json_string)

  # prepare and send. See also: pycurl.READFUNCTION to pass function instead
  curl.setopt(pycurl.READDATA, body_as_file_object)
  curl.setopt(pycurl.POSTFIELDSIZE, len(body_as_json_string))
  failure_message = ''
  try:
    curl.perform()
  except pycurl.error as e:
    timing_message = CurlTimingDetailsToString(curl)
    failure_message = (
        'curl.perform() failed with message %s; timing details: %s' %
        (e, timing_message))




    Log(failure_message)
    error_code = True
  else:
    # you may want to check HTTP response code, e.g.
    timing_message = CurlTimingDetailsToString(curl)
    status_code = curl.getinfo(pycurl.RESPONSE_CODE)
    if status_code != 200:
      failure_message = (
          'Server returned HTTP status code %d for message %s; '
          'timing details: %s' % (status_code, s, timing_message))
      Log(failure_message)
      error_code = True








  curl.close()
  UpdateStatusLight(
      GPIO_ERROR_VESTABOARD_CONNECTION, error_code, failure_message)


















































def CurlTimingDetailsToString(curl):
  """Extracts timing details of a curl request into a readable string."""
  timing = {}
  timing['total-time'] = curl.getinfo(pycurl.TOTAL_TIME)
  timing['namelookup-time'] = curl.getinfo(pycurl.NAMELOOKUP_TIME)
  timing['connect-time'] = curl.getinfo(pycurl.CONNECT_TIME)
  timing['pretransfer-time'] = curl.getinfo(pycurl.PRETRANSFER_TIME)
  timing['redirect-time'] = curl.getinfo(pycurl.REDIRECT_TIME)
  timing['starttransfer-time'] = curl.getinfo(pycurl.STARTTRANSFER_TIME)
  results = [label + '=' + '%.4f' % timing[label] for label in timing]
  results = '; '.join(results)
  return results


def TruncateEscapedLine(s):
  """Formats a single line of the personal message for the Vestaboard.

  The Vestaboard has line length limitations, a limited character set,




                            <----SKIPPED LINES---->




      if message_type == FLAG_MSG_INSIGHT and not MessageMeetsDisplayCriteria(
          configuration):
        Log('Message %s purged')

      else:
        if isinstance(message_text, str):
          message_text = textwrap.wrap(
              message_text,
              width=SPLITFLAP_CHARS_PER_LINE)
        display_message = Screenify(message_text, False)
        Log(display_message, file=ALL_MESSAGE_FILE)

        # Saving this to disk allows us to identify
        # persistently whats currently on the screen
        PickleObjectToFile(message, PICKLE_SCREENS, True)
        screens.append(message)

        MaintainRollingWebLog(display_message, 25)
        if not SIMULATION:
          splitflap_message = Screenify(message_text, True)
          PublishMessage(splitflap_message)

    next_message_time = time.time() + configuration['setting_delay']
  return next_message_time


def DeleteMessageTypes(q, types_to_delete):
  """Delete messages from the queue if type is in the iterable types."""
  if VERBOSE:
    messages_to_delete = [m for m in q if m[0] in types_to_delete]
    if messages_to_delete:
      Log('Deleting messages from queue due to new-found plane: %s'
          % messages_to_delete)
  updated_q = [m for m in q if m[0] not in types_to_delete]
  return updated_q


def BootstrapInsightList(full_path=PICKLE_FLIGHTS):
  """(Re)populate flight pickle files with flight insight distributions.

  The set of insights generated for each flight is created at the time




                            <----SKIPPED LINES---->





01234567890123456789012345678901234567890123456789012345678901234567890123456789









1617181920212223242526272829303132333435363738394041424344454647484950515253545556








19061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928  1929193019311932193319341935193619371938    193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968








63316332633363346335633663376338633963406341634263436344634563466347634863496350635163526353635463556356635763586359636063616362636363646365636663676368636963706371637263736374637563766377637863796380638163826383








6386638763886389639063916392639363946395639663976398639964006401640264036404640564066407640864096410641164126413641464156416641764186419642064216422642364246425642664276428642964306431643264336434643564366437643864396440644164426443644464456446644764486449645064516452645364546455645664576458645964606461646264636464646564666467646864696470647164726473647464756476647764786479648064816482648364846485648664876488648964906491649264936494649564966497649864996500








66316632663366346635663666376638663966406641664266436644664566466647664866496650665166526653665466556656665766586659666066616662666366646665666666676668666966706671











                            <----SKIPPED LINES---->




import statistics
import sys
import textwrap
import time
import tracemalloc
import types

import bs4
import dateutils
import numpy
import matplotlib
import matplotlib.pyplot
import psutil
import pycurl
import pytz
import requests
import tzlocal
import unidecode

# pylint: disable=line-too-long
from constants import RASPBERRY_PI, MESSAGEBOARD_PATH, WEBSERVER_PATH, KEY, SECRET, SUBSCRIPTION_ID, LOCAL_KEY, LOCAL_VESTABOARD_ADDRESS
# pylint: enable=line-too-long

import arduino

if RASPBERRY_PI:
  import gpiozero  # pylint: disable=E0401
  import RPi.GPIO  # pylint: disable=E0401


VERBOSE = False  # additional messages logged

SHUTDOWN_SIGNAL = ''
REBOOT_SIGNAL = False

# to be tracked in the dashboard messages so that we know when a
# restart due to exit (vs. a long delay in some processing) happened
INSTANCE_START_TIME = time.time()

SIMULATION = False
SIMULATION_COUNTER = 0




                            <----SKIPPED LINES---->






last_query_time = 0
def GetFlightAwareJson(flight_number):
  """Scrapes the text json message from FlightAware for a given flight number.

  Given a flight number, loads the corresponding FlightAware webpage for that
  flight and extracts the relevant script that contains all the flight details
  from that page.  But only queries at most once per fixed period of time
  so as to avoid being blocked.

  Args:
    flight_number: text flight number (i.e.: SWA1234)

  Returns:
    Two tuple:
     - Text representation of the json message from FlightAware.
     - Text string of error message, if any
  """
  min_query_delay_seconds = 90
  url = 'https://flightaware.com/live/flight/' + flight_number

  global last_query_time


  seconds_since_last_query = time.time() - last_query_time
  if last_query_time and seconds_since_last_query < min_query_delay_seconds:
    error_msg = (
        'Unable to query FA for URL since last query to FA was only %d seconds '
        'ago; min of %d seconds needed: %s' % (
            seconds_since_last_query, min_query_delay_seconds, url))
    return '', error_msg

  last_query_time = time.time()





  try:
    response = requests.get(url, timeout=5)
  except requests.exceptions.RequestException as e:
    error_msg = 'Unable to query FA for URL due to %s: %s' % (e, url)
    return '', error_msg
  soup = bs4.BeautifulSoup(response.text, 'html.parser')
  l = soup.find_all('script')
  flight_script = None
  for script in l:
    if 'trackpollBootstrap' in str(script):
      flight_script = str(script)
      break
  if not flight_script:
    error_msg = (
        'Unable to find trackpollBootstrap script in page: ' + response.text)
    Log(error_msg)
    return '', error_msg
  first_open_curly_brace = flight_script.find('{')
  last_close_curly_brace = flight_script.rfind('}')
  flight_json = flight_script[first_open_curly_brace:last_close_curly_brace+1]
  return flight_json, ''


def Unidecode(s):
  """Convert a special unicode characters to closest ASCII representation."""
  if s is not None:
    s = unidecode.unidecode(s)
  return s






                            <----SKIPPED LINES---->




  expected_characters = SPLITFLAP_CHARS_PER_LINE * SPLITFLAP_LINE_COUNT
  missing_characters = max(0, expected_characters - len(message_array))
  if missing_characters:
    for unused_n in range(missing_characters):
      message_array.append(0)
  extra_characters = max(0, len(message_array) - expected_characters)
  if extra_characters:
    Log('Message "%s" is too long at %d characters (max %d characters)'
        % (s, len(message_array), expected_characters))
    message_array = message_array[:expected_characters]

  message_2d_array = []
  for line_num in range(SPLITFLAP_LINE_COUNT):
    message_2d_array.append(message_array[
        line_num * SPLITFLAP_CHARS_PER_LINE :
        (line_num + 1)*SPLITFLAP_CHARS_PER_LINE])

  return message_2d_array


def PublishMessageWeb(
    s,
    subscription_id=SUBSCRIPTION_ID,
    key=KEY,
    secret=SECRET,
    timeout=5):
  """Publishes a text string to a Vestaboard.

  The message is pushed to the vestaboard splitflap display by way of its
  web services; see https://docs.vestaboard.com/introduction for more details.

  TODO: rewrite to use the easier-to-follow requests library, more in line
  with PublishMessageLocal.

  Args:
    s: String to publish.
    subscription_id: string subscription id from Vestaboard.
    key: string key from Vestaboard.
    secret: string secret from Vestaboard.
    timeout: Max duration in seconds that we should wait to establish a
      connection.
  """
  error_code = False
  curl = pycurl.Curl()

  # See https://stackoverflow.com/questions/31826814/
  # curl-post-request-into-pycurl-code
  # Set URL value
  curl.setopt(
      pycurl.URL,
      'https://platform.vestaboard.com/subscriptions/%s/message'
      % subscription_id)
  curl.setopt(pycurl.HTTPHEADER, [




                            <----SKIPPED LINES---->




  curl.setopt(pycurl.POST, 1)

  curl.setopt(pycurl.WRITEFUNCTION, lambda x: None) # to keep stdout clean

  # preparing body the way pycurl.READDATA wants it
  body_as_dict = {'characters': StringToCharArray(s)}
  body_as_json_string = json.dumps(body_as_dict) # dict to json
  body_as_file_object = io.StringIO(body_as_json_string)

  # prepare and send. See also: pycurl.READFUNCTION to pass function instead
  curl.setopt(pycurl.READDATA, body_as_file_object)
  curl.setopt(pycurl.POSTFIELDSIZE, len(body_as_json_string))
  failure_message = ''
  try:
    curl.perform()
  except pycurl.error as e:
    timing_message = CurlTimingDetailsToString(curl)
    failure_message = (
        'curl.perform() failed with message %s; timing details: %s' %
        (e, timing_message))
    # Using the remote webservice failed, but maybe the local API will
    # be more successful?  If this succeeds, then we should not indicate
    # a failure on the status light / dashboard, but we should still log
    # the remote failure
    Log(failure_message)
    error_code = PublishMessageLocal(s, timeout=timeout, update_dashboard=False)
  else:
    # you may want to check HTTP response code, e.g.
    timing_message = CurlTimingDetailsToString(curl)
    status_code = curl.getinfo(pycurl.RESPONSE_CODE)
    if status_code != 200:
      failure_message = (
          'Server returned HTTP status code %d for message %s; '
          'timing details: %s' % (status_code, s, timing_message))
      Log(failure_message)
      error_code = PublishMessageLocal(
          s, timeout=timeout, update_dashboard=False)

  # We've logged the error code from the external web service, but the
  # Vestaboard local API was able to recover from the error, so we need not
  # log the failure message / error code in the dashboard.
  if not error_code:
    failure_message = ''

  curl.close()
  UpdateStatusLight(
      GPIO_ERROR_VESTABOARD_CONNECTION, error_code, failure_message)


def PublishMessageLocal(
    s,
    local_key=LOCAL_KEY,
    local_address=LOCAL_VESTABOARD_ADDRESS,
    timeout=5,
    update_dashboard=True):
  """Publishes a text string to a Vestaboard via local API.

  The message is pushed to the vestaboard splitflap display by way of its
  local API; see https://docs.vestaboard.com/local for more details.

  Args:
    s: String to publish.
    local_key: string key from Vestaboard for local API access.
    local_address: the address and port to the local Vestaboard service.
    timeout: Max duration in seconds that we should wait to establish a
      connection.
    update_dashboard: Boolean indicating whether this method should update the
      system dashboard, or if that should be left to the calling function.

  Returns:
    False if successful; error message string if failure occurs.
  """
  error_code = False

  data = str(StringToCharArray(s))
  headers = {
      'X-Vestaboard-Local-Api-Key': local_key,
      'Content-Type': 'application/x-www-form-urlencoded',
  }

  try:
    response = requests.post(
        local_address,
        headers=headers, data=data, timeout=timeout)
  except requests.exceptions.RequestException as e:
    error_msg = 'Unable to reach %s (%s): %s' % (local_address, response, e)
    Log(error_msg)
    error_code = True
  if not error_code:
    Log('Message sent to Vestaboard by way of local api')

  if update_dashboard:
    UpdateStatusLight(
        GPIO_ERROR_VESTABOARD_CONNECTION, error_code, error_msg)
  return error_code


def CurlTimingDetailsToString(curl):
  """Extracts timing details of a curl request into a readable string."""
  timing = {}
  timing['total-time'] = curl.getinfo(pycurl.TOTAL_TIME)
  timing['namelookup-time'] = curl.getinfo(pycurl.NAMELOOKUP_TIME)
  timing['connect-time'] = curl.getinfo(pycurl.CONNECT_TIME)
  timing['pretransfer-time'] = curl.getinfo(pycurl.PRETRANSFER_TIME)
  timing['redirect-time'] = curl.getinfo(pycurl.REDIRECT_TIME)
  timing['starttransfer-time'] = curl.getinfo(pycurl.STARTTRANSFER_TIME)
  results = [label + '=' + '%.4f' % timing[label] for label in timing]
  results = '; '.join(results)
  return results


def TruncateEscapedLine(s):
  """Formats a single line of the personal message for the Vestaboard.

  The Vestaboard has line length limitations, a limited character set,




                            <----SKIPPED LINES---->




      if message_type == FLAG_MSG_INSIGHT and not MessageMeetsDisplayCriteria(
          configuration):
        Log('Message %s purged')

      else:
        if isinstance(message_text, str):
          message_text = textwrap.wrap(
              message_text,
              width=SPLITFLAP_CHARS_PER_LINE)
        display_message = Screenify(message_text, False)
        Log(display_message, file=ALL_MESSAGE_FILE)

        # Saving this to disk allows us to identify
        # persistently whats currently on the screen
        PickleObjectToFile(message, PICKLE_SCREENS, True)
        screens.append(message)

        MaintainRollingWebLog(display_message, 25)
        if not SIMULATION:
          splitflap_message = Screenify(message_text, True)
          PublishMessageWeb(splitflap_message)

    next_message_time = time.time() + configuration['setting_delay']
  return next_message_time


def DeleteMessageTypes(q, types_to_delete):
  """Delete messages from the queue if type is in the iterable types."""
  if VERBOSE:
    messages_to_delete = [m for m in q if m[0] in types_to_delete]
    if messages_to_delete:
      Log('Deleting messages from queue due to new-found plane: %s'
          % messages_to_delete)
  updated_q = [m for m in q if m[0] not in types_to_delete]
  return updated_q


def BootstrapInsightList(full_path=PICKLE_FLIGHTS):
  """(Re)populate flight pickle files with flight insight distributions.

  The set of insights generated for each flight is created at the time




                            <----SKIPPED LINES---->