01234567890123456789012345678901234567890123456789012345678901234567890123456789
1617181920212223242526272829303132333435363738394041424344454647484950515253545556 19061907190819091910191119121913191419151916191719181919192019211922192319241925 19261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972 633563366337633863396340634163426343634463456346634763486349635063516352635363546355635663576358635963606361636263636364 63656366636763686369637063716372637363746375637663776378637963806381638263836384 63876388638963906391639263936394639563966397639863996400640164026403640464056406 640764086409641064116412641364146415641664176418 6419642064216422 64236424642564266427642864296430643164326433643464356436643764386439644064416442 65736574657565766577657865796580658165826583658465856586658765886589659065916592659365946595659665976598659966006601660266036604660566066607660866096610661166126613 | <----SKIPPED LINES----> import statistics import sys import textwrap import time import tracemalloc import types import bs4 import dateutils import numpy import matplotlib import matplotlib.pyplot import psutil import pycurl import pytz import requests import tzlocal import unidecode # pylint: disable=line-too-long from constants import RASPBERRY_PI, MESSAGEBOARD_PATH, WEBSERVER_PATH, KEY, SECRET, SUBSCRIPTION_ID # pylint: enable=line-too-long import arduino if RASPBERRY_PI: import gpiozero # pylint: disable=E0401 import RPi.GPIO # pylint: disable=E0401 VERBOSE = False # additional messages logged SHUTDOWN_SIGNAL = '' REBOOT_SIGNAL = False # to be tracked in the dashboard messages so that we know when a # restart due to exit (vs. a long delay in some processing) happened INSTANCE_START_TIME = time.time() SIMULATION = False SIMULATION_COUNTER = 0 <----SKIPPED LINES----> last_query_time = 0 def GetFlightAwareJson(flight_number): """Scrapes the text json message from FlightAware for a given flight number. Given a flight number, loads the corresponding FlightAware webpage for that flight and extracts the relevant script that contains all the flight details from that page. But only queries at most once per fixed period of time so as to avoid being blocked. Args: flight_number: text flight number (i.e.: SWA1234) Returns: Two tuple: - Text representation of the json message from FlightAware. - Text string of error message, if any """ min_query_delay_seconds = 90 global last_query_time url = 'https://flightaware.com/live/flight/' + flight_number seconds_since_last_query = time.time() - last_query_time if seconds_since_last_query < min_query_delay_seconds: error_msg = ( 'Unable to query FA for URL since last query to FA was only %d seconds ' 'ago; min of %d seconds needed: %s' % ( seconds_since_last_query, min_query_delay_seconds, url)) return '', error_msg last_query_time = time.time() #Log( # 'last_query_time: %d; time.time(): %d;' # 'seconds_since_last_query: %d; min_query_delay_seconds: %d' % ( # last_query_time, time.time(), # seconds_since_last_query, min_query_delay_seconds)) try: response = requests.get(url, timeout=5) except requests.exceptions.RequestException as e: error_msg = 'Unable to query FA for URL due to %s: %s' % (e, url) return '', error_msg soup = bs4.BeautifulSoup(response.text, 'html.parser') l = soup.find_all('script') flight_script = None for script in l: if "trackpollBootstrap" in str(script): flight_script = str(script) break if not flight_script: error_msg = ( 'Unable to find trackpollBootstrap script in page: ' + response.text) Log(error_msg) return '', error_msg first_open_curly_brace = flight_script.find('{') last_close_curly_brace = flight_script.rfind('}') flight_json = flight_script[first_open_curly_brace:last_close_curly_brace+1] return flight_json, '' def Unidecode(s): """Convert a special unicode characters to closest ASCII representation.""" if s is not None: s = unidecode.unidecode(s) return s <----SKIPPED LINES----> expected_characters = SPLITFLAP_CHARS_PER_LINE * SPLITFLAP_LINE_COUNT missing_characters = max(0, expected_characters - len(message_array)) if missing_characters: for unused_n in range(missing_characters): message_array.append(0) extra_characters = max(0, len(message_array) - expected_characters) if extra_characters: Log('Message "%s" is too long at %d characters (max %d characters)' % (s, len(message_array), expected_characters)) message_array = message_array[:expected_characters] message_2d_array = [] for line_num in range(SPLITFLAP_LINE_COUNT): message_2d_array.append(message_array[ line_num * SPLITFLAP_CHARS_PER_LINE : (line_num + 1)*SPLITFLAP_CHARS_PER_LINE]) return message_2d_array def PublishMessage( s, subscription_id=SUBSCRIPTION_ID, key=KEY, secret=SECRET, timeout=5): """Publishes a text string to a Vestaboard. The message is pushed to the vestaboard splitflap display by way of its web services; see https://docs.vestaboard.com/introduction for more details. Args: s: String to publish. subscription_id: string subscription id from Vestaboard. key: string key from Vestaboard. secret: string secret from Vestaboard. timeout: Max duration in seconds that we should wait to establish a connection. """ error_code = False curl = pycurl.Curl() # See https://stackoverflow.com/questions/31826814/ # curl-post-request-into-pycurl-code # Set URL value curl.setopt( pycurl.URL, 'https://platform.vestaboard.com/subscriptions/%s/message' % subscription_id) curl.setopt(pycurl.HTTPHEADER, [ <----SKIPPED LINES----> curl.setopt(pycurl.POST, 1) curl.setopt(pycurl.WRITEFUNCTION, lambda x: None) # to keep stdout clean # preparing body the way pycurl.READDATA wants it body_as_dict = {'characters': StringToCharArray(s)} body_as_json_string = json.dumps(body_as_dict) # dict to json body_as_file_object = io.StringIO(body_as_json_string) # prepare and send. See also: pycurl.READFUNCTION to pass function instead curl.setopt(pycurl.READDATA, body_as_file_object) curl.setopt(pycurl.POSTFIELDSIZE, len(body_as_json_string)) failure_message = '' try: curl.perform() except pycurl.error as e: timing_message = CurlTimingDetailsToString(curl) failure_message = ( 'curl.perform() failed with message %s; timing details: %s' % (e, timing_message)) Log(failure_message) error_code = True else: # you may want to check HTTP response code, e.g. timing_message = CurlTimingDetailsToString(curl) status_code = curl.getinfo(pycurl.RESPONSE_CODE) if status_code != 200: failure_message = ( 'Server returned HTTP status code %d for message %s; ' 'timing details: %s' % (status_code, s, timing_message)) Log(failure_message) error_code = True curl.close() UpdateStatusLight( GPIO_ERROR_VESTABOARD_CONNECTION, error_code, failure_message) def CurlTimingDetailsToString(curl): """Extracts timing details of a curl request into a readable string.""" timing = {} timing['total-time'] = curl.getinfo(pycurl.TOTAL_TIME) timing['namelookup-time'] = curl.getinfo(pycurl.NAMELOOKUP_TIME) timing['connect-time'] = curl.getinfo(pycurl.CONNECT_TIME) timing['pretransfer-time'] = curl.getinfo(pycurl.PRETRANSFER_TIME) timing['redirect-time'] = curl.getinfo(pycurl.REDIRECT_TIME) timing['starttransfer-time'] = curl.getinfo(pycurl.STARTTRANSFER_TIME) results = [label + '=' + '%.4f' % timing[label] for label in timing] results = '; '.join(results) return results def TruncateEscapedLine(s): """Formats a single line of the personal message for the Vestaboard. The Vestaboard has line length limitations, a limited character set, <----SKIPPED LINES----> if message_type == FLAG_MSG_INSIGHT and not MessageMeetsDisplayCriteria( configuration): Log('Message %s purged') else: if isinstance(message_text, str): message_text = textwrap.wrap( message_text, width=SPLITFLAP_CHARS_PER_LINE) display_message = Screenify(message_text, False) Log(display_message, file=ALL_MESSAGE_FILE) # Saving this to disk allows us to identify # persistently whats currently on the screen PickleObjectToFile(message, PICKLE_SCREENS, True) screens.append(message) MaintainRollingWebLog(display_message, 25) if not SIMULATION: splitflap_message = Screenify(message_text, True) PublishMessage(splitflap_message) next_message_time = time.time() + configuration['setting_delay'] return next_message_time def DeleteMessageTypes(q, types_to_delete): """Delete messages from the queue if type is in the iterable types.""" if VERBOSE: messages_to_delete = [m for m in q if m[0] in types_to_delete] if messages_to_delete: Log('Deleting messages from queue due to new-found plane: %s' % messages_to_delete) updated_q = [m for m in q if m[0] not in types_to_delete] return updated_q def BootstrapInsightList(full_path=PICKLE_FLIGHTS): """(Re)populate flight pickle files with flight insight distributions. The set of insights generated for each flight is created at the time <----SKIPPED LINES----> |
01234567890123456789012345678901234567890123456789012345678901234567890123456789
1617181920212223242526272829303132333435363738394041424344454647484950515253545556 19061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928 1929193019311932193319341935193619371938 193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968 63316332633363346335633663376338633963406341634263436344634563466347634863496350635163526353635463556356635763586359636063616362636363646365636663676368636963706371637263736374637563766377637863796380638163826383 6386638763886389639063916392639363946395639663976398639964006401640264036404640564066407640864096410641164126413641464156416641764186419642064216422642364246425642664276428642964306431643264336434643564366437643864396440644164426443644464456446644764486449645064516452645364546455645664576458645964606461646264636464646564666467646864696470647164726473647464756476647764786479648064816482648364846485648664876488648964906491649264936494649564966497649864996500 66316632663366346635663666376638663966406641664266436644664566466647664866496650665166526653665466556656665766586659666066616662666366646665666666676668666966706671 | <----SKIPPED LINES----> import statistics import sys import textwrap import time import tracemalloc import types import bs4 import dateutils import numpy import matplotlib import matplotlib.pyplot import psutil import pycurl import pytz import requests import tzlocal import unidecode # pylint: disable=line-too-long from constants import RASPBERRY_PI, MESSAGEBOARD_PATH, WEBSERVER_PATH, KEY, SECRET, SUBSCRIPTION_ID, LOCAL_KEY, LOCAL_VESTABOARD_ADDRESS # pylint: enable=line-too-long import arduino if RASPBERRY_PI: import gpiozero # pylint: disable=E0401 import RPi.GPIO # pylint: disable=E0401 VERBOSE = False # additional messages logged SHUTDOWN_SIGNAL = '' REBOOT_SIGNAL = False # to be tracked in the dashboard messages so that we know when a # restart due to exit (vs. a long delay in some processing) happened INSTANCE_START_TIME = time.time() SIMULATION = False SIMULATION_COUNTER = 0 <----SKIPPED LINES----> last_query_time = 0 def GetFlightAwareJson(flight_number): """Scrapes the text json message from FlightAware for a given flight number. Given a flight number, loads the corresponding FlightAware webpage for that flight and extracts the relevant script that contains all the flight details from that page. But only queries at most once per fixed period of time so as to avoid being blocked. Args: flight_number: text flight number (i.e.: SWA1234) Returns: Two tuple: - Text representation of the json message from FlightAware. - Text string of error message, if any """ min_query_delay_seconds = 90 url = 'https://flightaware.com/live/flight/' + flight_number global last_query_time seconds_since_last_query = time.time() - last_query_time if last_query_time and seconds_since_last_query < min_query_delay_seconds: error_msg = ( 'Unable to query FA for URL since last query to FA was only %d seconds ' 'ago; min of %d seconds needed: %s' % ( seconds_since_last_query, min_query_delay_seconds, url)) return '', error_msg last_query_time = time.time() try: response = requests.get(url, timeout=5) except requests.exceptions.RequestException as e: error_msg = 'Unable to query FA for URL due to %s: %s' % (e, url) return '', error_msg soup = bs4.BeautifulSoup(response.text, 'html.parser') l = soup.find_all('script') flight_script = None for script in l: if 'trackpollBootstrap' in str(script): flight_script = str(script) break if not flight_script: error_msg = ( 'Unable to find trackpollBootstrap script in page: ' + response.text) Log(error_msg) return '', error_msg first_open_curly_brace = flight_script.find('{') last_close_curly_brace = flight_script.rfind('}') flight_json = flight_script[first_open_curly_brace:last_close_curly_brace+1] return flight_json, '' def Unidecode(s): """Convert a special unicode characters to closest ASCII representation.""" if s is not None: s = unidecode.unidecode(s) return s <----SKIPPED LINES----> expected_characters = SPLITFLAP_CHARS_PER_LINE * SPLITFLAP_LINE_COUNT missing_characters = max(0, expected_characters - len(message_array)) if missing_characters: for unused_n in range(missing_characters): message_array.append(0) extra_characters = max(0, len(message_array) - expected_characters) if extra_characters: Log('Message "%s" is too long at %d characters (max %d characters)' % (s, len(message_array), expected_characters)) message_array = message_array[:expected_characters] message_2d_array = [] for line_num in range(SPLITFLAP_LINE_COUNT): message_2d_array.append(message_array[ line_num * SPLITFLAP_CHARS_PER_LINE : (line_num + 1)*SPLITFLAP_CHARS_PER_LINE]) return message_2d_array def PublishMessageWeb( s, subscription_id=SUBSCRIPTION_ID, key=KEY, secret=SECRET, timeout=5): """Publishes a text string to a Vestaboard. The message is pushed to the vestaboard splitflap display by way of its web services; see https://docs.vestaboard.com/introduction for more details. TODO: rewrite to use the easier-to-follow requests library, more in line with PublishMessageLocal. Args: s: String to publish. subscription_id: string subscription id from Vestaboard. key: string key from Vestaboard. secret: string secret from Vestaboard. timeout: Max duration in seconds that we should wait to establish a connection. """ error_code = False curl = pycurl.Curl() # See https://stackoverflow.com/questions/31826814/ # curl-post-request-into-pycurl-code # Set URL value curl.setopt( pycurl.URL, 'https://platform.vestaboard.com/subscriptions/%s/message' % subscription_id) curl.setopt(pycurl.HTTPHEADER, [ <----SKIPPED LINES----> curl.setopt(pycurl.POST, 1) curl.setopt(pycurl.WRITEFUNCTION, lambda x: None) # to keep stdout clean # preparing body the way pycurl.READDATA wants it body_as_dict = {'characters': StringToCharArray(s)} body_as_json_string = json.dumps(body_as_dict) # dict to json body_as_file_object = io.StringIO(body_as_json_string) # prepare and send. See also: pycurl.READFUNCTION to pass function instead curl.setopt(pycurl.READDATA, body_as_file_object) curl.setopt(pycurl.POSTFIELDSIZE, len(body_as_json_string)) failure_message = '' try: curl.perform() except pycurl.error as e: timing_message = CurlTimingDetailsToString(curl) failure_message = ( 'curl.perform() failed with message %s; timing details: %s' % (e, timing_message)) # Using the remote webservice failed, but maybe the local API will # be more successful? If this succeeds, then we should not indicate # a failure on the status light / dashboard, but we should still log # the remote failure Log(failure_message) error_code = PublishMessageLocal(s, timeout=timeout, update_dashboard=False) else: # you may want to check HTTP response code, e.g. timing_message = CurlTimingDetailsToString(curl) status_code = curl.getinfo(pycurl.RESPONSE_CODE) if status_code != 200: failure_message = ( 'Server returned HTTP status code %d for message %s; ' 'timing details: %s' % (status_code, s, timing_message)) Log(failure_message) error_code = PublishMessageLocal( s, timeout=timeout, update_dashboard=False) # We've logged the error code from the external web service, but the # Vestaboard local API was able to recover from the error, so we need not # log the failure message / error code in the dashboard. if not error_code: failure_message = '' curl.close() UpdateStatusLight( GPIO_ERROR_VESTABOARD_CONNECTION, error_code, failure_message) def PublishMessageLocal( s, local_key=LOCAL_KEY, local_address=LOCAL_VESTABOARD_ADDRESS, timeout=5, update_dashboard=True): """Publishes a text string to a Vestaboard via local API. The message is pushed to the vestaboard splitflap display by way of its local API; see https://docs.vestaboard.com/local for more details. Args: s: String to publish. local_key: string key from Vestaboard for local API access. local_address: the address and port to the local Vestaboard service. timeout: Max duration in seconds that we should wait to establish a connection. update_dashboard: Boolean indicating whether this method should update the system dashboard, or if that should be left to the calling function. Returns: False if successful; error message string if failure occurs. """ error_code = False data = str(StringToCharArray(s)) headers = { 'X-Vestaboard-Local-Api-Key': local_key, 'Content-Type': 'application/x-www-form-urlencoded', } try: response = requests.post( local_address, headers=headers, data=data, timeout=timeout) except requests.exceptions.RequestException as e: error_msg = 'Unable to reach %s (%s): %s' % (local_address, response, e) Log(error_msg) error_code = True if not error_code: Log('Message sent to Vestaboard by way of local api') if update_dashboard: UpdateStatusLight( GPIO_ERROR_VESTABOARD_CONNECTION, error_code, error_msg) return error_code def CurlTimingDetailsToString(curl): """Extracts timing details of a curl request into a readable string.""" timing = {} timing['total-time'] = curl.getinfo(pycurl.TOTAL_TIME) timing['namelookup-time'] = curl.getinfo(pycurl.NAMELOOKUP_TIME) timing['connect-time'] = curl.getinfo(pycurl.CONNECT_TIME) timing['pretransfer-time'] = curl.getinfo(pycurl.PRETRANSFER_TIME) timing['redirect-time'] = curl.getinfo(pycurl.REDIRECT_TIME) timing['starttransfer-time'] = curl.getinfo(pycurl.STARTTRANSFER_TIME) results = [label + '=' + '%.4f' % timing[label] for label in timing] results = '; '.join(results) return results def TruncateEscapedLine(s): """Formats a single line of the personal message for the Vestaboard. The Vestaboard has line length limitations, a limited character set, <----SKIPPED LINES----> if message_type == FLAG_MSG_INSIGHT and not MessageMeetsDisplayCriteria( configuration): Log('Message %s purged') else: if isinstance(message_text, str): message_text = textwrap.wrap( message_text, width=SPLITFLAP_CHARS_PER_LINE) display_message = Screenify(message_text, False) Log(display_message, file=ALL_MESSAGE_FILE) # Saving this to disk allows us to identify # persistently whats currently on the screen PickleObjectToFile(message, PICKLE_SCREENS, True) screens.append(message) MaintainRollingWebLog(display_message, 25) if not SIMULATION: splitflap_message = Screenify(message_text, True) PublishMessageWeb(splitflap_message) next_message_time = time.time() + configuration['setting_delay'] return next_message_time def DeleteMessageTypes(q, types_to_delete): """Delete messages from the queue if type is in the iterable types.""" if VERBOSE: messages_to_delete = [m for m in q if m[0] in types_to_delete] if messages_to_delete: Log('Deleting messages from queue due to new-found plane: %s' % messages_to_delete) updated_q = [m for m in q if m[0] not in types_to_delete] return updated_q def BootstrapInsightList(full_path=PICKLE_FLIGHTS): """(Re)populate flight pickle files with flight insight distributions. The set of insights generated for each flight is created at the time <----SKIPPED LINES----> |