<----SKIPPED LINES---->

# if any - is appended to the end of this pickle file. However, since this file is
# cached in working memory, flights older than 30 days are flushed from it periodically.
PICKLE_FLIGHTS = 'pickle/flights.pk'

# True splits all the flights created in simulation into separate date files, just like
# the non-simulated runs; False consolidates all flights into one pickle file.
SPLIT_SIMULATION_FLIGHT_PICKLE = False

# Status data about messageboard - is it running, etc. Specifically, holds tuples of
# data (timestamp, system_id, status), where system_id is either a GPIO pin id or 0
# to indicate the overall system, and status is a boolean.
PICKLE_DASHBOARD = 'pickle/dashboard.pk'

CACHED_ELEMENT_PREFIX = 'cached_'

# This web-exposed file is used for non-error messages that might highlight data or
# code logic worth checking into. It is only cleared out manually.
LOGFILE = 'log.txt'

# Identical to the LOGFILE, except it includes just the most recent n lines. Newest
# lines are at the end.
ROLLING_LOGFILE = 'rolling_log.txt'  # rolling copy of LOGFILE

# Users can trigger .png histograms analogous to the text ones from the web interface;
# this is the folder (within WEBSERVER_PATH) where those files are placed.
WEBSERVER_IMAGE_FOLDER = 'images/'

# Multiple histograms can be generated, e.g. for airline, aircraft, or day of week.
# The output files are named by the prefix & suffix, i.e.: prefix + type + . + suffix,
# as in histogram_aircraft.png. These names match up to the names expected by the html
# page that displays the images. Also, note that the suffix is interpreted by matplotlib
# to identify the image format to create.
HISTOGRAM_IMAGE_PREFIX = 'histogram_'
HISTOGRAM_IMAGE_SUFFIX = 'png'

# For those of the approximately ten different types of histograms _not_ generated,
# an empty image is copied into the location expected by the webpage instead; this is
# the location of that "empty" image file.
HISTOGRAM_EMPTY_IMAGE_FILE = 'empty.png'

# This file indicates a pending request for histograms - either png, text-based, or
# both; once it is processed, this file is deleted. The contents are concatenated
# key-value pairs, e.g. histogram=all;histogram_history=24h;.
HISTOGRAM_CONFIG_FILE = 'histogram.txt'

<----SKIPPED LINES---->

    file = LOGFILE
  # special case: for the main logfile, we always keep a rolling log
  if not rolling and file == LOGFILE:
    rolling = ROLLING_LOGFILE
  try:
    with open(file, 'a') as f:
      # by excluding the timestamp, file diffs become easier between runs
      if not SIMULATION or file == LOGFILE:
        f.write('='*80+'\n')
        f.write(str(datetime.datetime.now(TZ))+'\n')
        f.write('\n')
      f.write(str(message)+'\n')
  except IOError:
    Log('Unable to append to ' + file)

  if rolling:
    existing_log_lines = ReadFile(file).splitlines()
    with open(rolling, 'w') as f:
      f.write('\n'.join(existing_log_lines[-1000:]))


def LogTimes(times):
  """Logs elapsed time messages from a list of epochs."""
  msg = ''
  for n, t in enumerate(times[:-1]):
    msg += '%.2fs to get from reading %d to reading %d\n' % (times[n + 1] - t, n, n + 1)
  Log(msg)


def MaintainRollingWebLog(message, max_count, filename=None):
  """Maintains a rolling text file of at most max_count printed messages.

  Newest data at top and oldest data at the end, of at most max_count messages,
  where the delimiter between each message is identified by a special fixed string.

  Args:
    message: text message to prepend to the file.
    max_count: maximum number of messages to keep in the file; the max_count+1st
      message is deleted.

<----SKIPPED LINES---->

  return True


def PrependFileName(full_path, prefix):
  """Converts /dir/file.png to /dir/prefixfile.png."""
  directory, file_name = os.path.split(full_path)
  file_name = prefix+file_name
  return os.path.join(directory, file_name)


def UnpickleObjectFromFile(full_path, date_segmentation, max_days=None, filenames=False):
  """Load a repository of pickled data into memory.

  Args:
    full_path: name (potentially including path) of the pickled file
    date_segmentation: If true, searches for all files that have a yyyy-mm-dd
      prefix prepended to the file name specified in the full path, and loads
      them in sequence for unpickling; if false, uses the full_path as is and
      loads just that single file.
    max_days: Integer that, if specified, indicates the maximum number of days
      of files to load back in; otherwise, loads all.
    filenames: If true, rather than returning the list of data, returns a list
      of the filenames that would have been read.

  Returns:
    A list - either of the data, or of all the file names that would have
    been read.
""" if date_segmentation: directory, file = os.path.split(full_path) d = '[0-9]' sep = '-' date_format = d*4 + sep + d*2 + sep + d*2 # yyyy-mm-dd exp = date_format + sep + file pattern = re.compile(exp) files = os.listdir(directory) if max_days: # no need to read any files older than x days earliest_date = EpochDisplayTime(time.time() - max_days*SECONDS_IN_DAY, '%Y-%m-%d') files = [f for f in files if f[:10] >= earliest_date] files = sorted([os.path.join(directory, f) for f in files if pattern.match(f)]) else: if os.path.exists(full_path): files = [full_path] else: return [] data = [] if filenames: return files for file in files: try: with open(file, 'rb') as f: while True: data.append(pickle.load(f)) except (EOFError, pickle.UnpicklingError): <----SKIPPED LINES----> sys.exit() # phew - they're a subset; so they probably got the signal; just wait a few secs elif still_running_ids: n = 0 running_parents = FindRunningParents() while running_parents: if n == max_seconds: Log('Kill signal sent from this process %d to %s, but %s still ' 'running after waiting cume %d seconds; rebooting' % ( os.getpid(), str(already_running_ids), str(running_parents), n+1)) PerformGracefulShutdown((), (), True) if not n % 3: Log('Kill signal sent from this process %d to %s, but %s still ' 'running after waiting cume %d seconds' % ( os.getpid(), str(already_running_ids), str(running_parents), n)) n += 1 time.sleep(1) running_parents = FindRunningParents() def InitArduinos(configuration): """Initializes and starts the two arduino threads with new shared-memory queues.""" to_remote_q = multiprocessing.Queue() to_servo_q = multiprocessing.Queue() to_main_q = multiprocessing.Queue() shutdown_remote = multiprocessing.Value('i') # shared flag to initiate shutdown shutdown_servo = multiprocessing.Value('i') # shared flag to initiate shutdown shutdown = (shutdown_remote, shutdown_servo) return (to_remote_q, to_servo_q, to_main_q, shutdown) def RefreshArduinos( remote, servo, to_remote_q, to_servo_q, to_main_q, shutdown, flights, json_desc_dict, configuration): """Ensure arduinos are running, restarting if needed, & send them the current message""" remote, servo = ValidateArduinosRunning( remote, servo, to_remote_q, to_servo_q, to_main_q, shutdown, configuration) EnqueueArduinos(flights, json_desc_dict, configuration, to_servo_q, to_remote_q) return remote, servo <----SKIPPED LINES----> # This flag slows down simulation time around a flight, great for debugging the arduinos simulation_slowdown = bool('-f' in sys.argv) # Redirect any errors to a log file instead of the screen, and add a datestamp if not SIMULATION: sys.stderr = open(STDERR_FILE, 'a') Log('', STDERR_FILE) init_timing.append(time.time()) # time 1 Log('Starting up process %d' % os.getpid()) already_running_ids = FindRunningParents() if already_running_ids: for pid in already_running_ids: Log('Sending termination signal to %d' % pid) os.kill(pid, signal.SIGTERM) init_timing.append(time.time()) # time 2 SetPinMode() configuration = ReadAndParseSettings(CONFIG_FILE) startup_time = time.time() json_desc_dict = {} init_timing.append(time.time()) # time 3 flights = UnpickleObjectFromFile(PICKLE_FLIGHTS, True, max_days=MAX_INSIGHT_HORIZON_DAYS) # Clear the loaded flight of any cached data, identified by keys with a specific # suffix, since code fixes may change the values for some of those cached elements for flight in flights: for key in list(flight.keys()): if key.endswith(CACHED_ELEMENT_PREFIX): flight.pop(key) init_timing.append(time.time()) # time 4 # If we're 
# If we're displaying just a single insight message, we want it to be something
# unique, to the extent possible; this dict holds a count of the diff types of
# messages displayed so far
insight_message_distribution = {}

# bootstrap the flight insights distribution from the list of insights on each
# flight (i.e.: flight['insight_types'] for a given flight might look like
# [1, 2, 7, 9], or [], to indicate which insights were identified); this then
# transforms that into {0: 25, 1: 18, ...}, summing across all flights
missing_insights = []
for flight in flights:
  if 'insight_types' not in flight:
    missing_insights.append(
        '%s on %s' % (DisplayFlightNumber(flight), DisplayTime(flight, '%x %X')))
  distribution = flight.get('insight_types', [])
  for key in distribution:
    insight_message_distribution[key] = (
        insight_message_distribution.get(key, 0) + 1)
if missing_insights:
  Log('Flights missing insight distributions: %s' % ';'.join(missing_insights))

init_timing.append(time.time())  # time 5

# initialize objects required for arduinos, but we can only start them in the main
# loop, because the tail end of the init section needs to confirm that all other
# messageboard.py processes have exited!
to_remote_q, to_servo_q, to_main_q, shutdown = InitArduinos(configuration)
remote, servo = None, None

# used in simulation to print the hour of simulation once per simulated hour
prev_simulated_hour = ''

persistent_nearby_aircraft = {}  # key = flight number; value = last seen epoch
persistent_path = {}
histogram = {}

# Next up to print is index 0; this is a list of tuples:
#   tuple element #1: flag indicating the type of message that this is
#   tuple element #2: the message itself
message_queue = []
next_message_time = time.time()

# We repeat the loop every x seconds; this ensures that if the processing time is long,
# we don't wait another x seconds after processing completes
next_loop_time = time.time() + LOOP_DELAY_SECONDS

# These files are read only if the version on disk has been modified more recently
# than the last time it was read
last_dump_json_timestamp = 0

init_timing.append(time.time())  # time 6

WaitUntilKillComplete(already_running_ids)

init_timing.append(time.time())  # time 7
LogTimes(init_timing)
Log('Finishing initialization of %d; starting radio polling loop' % os.getpid())

while (not SIMULATION or SIMULATION_COUNTER < len(DUMP_JSONS)) and not SHUTDOWN_SIGNAL:
  last_heartbeat_time = Heartbeat(last_heartbeat_time)

  new_configuration = ReadAndParseSettings(CONFIG_FILE)
  CheckForNewFilterCriteria(configuration, new_configuration, message_queue, flights)
  configuration = new_configuration

  ResetLogs(configuration)  # clear the logs if requested

  # if this is a SIMULATION, then process every diff dump. But if it isn't a simulation,
  # then only read & do related processing for the next dump if the last-modified
  # timestamp indicates the file has been updated since it was last read.
  tmp_timestamp = 0
  if not SIMULATION:
    dump_json_exists = os.path.exists(DUMP_JSON_FILE)
    if dump_json_exists:
      tmp_timestamp = os.path.getmtime(DUMP_JSON_FILE)
  if (SIMULATION and DumpJsonChanges()) or (
      not SIMULATION and dump_json_exists and
      tmp_timestamp > last_dump_json_timestamp):
    last_dump_json_timestamp = tmp_timestamp

    (persistent_nearby_aircraft, flight, now,

<----SKIPPED LINES---->
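A note on the max_days cutoff in UnpickleObjectFromFile above: the filter keeps every file whose yyyy-mm-dd name prefix compares >= the cutoff date string, so subtracting a full max_days worth of seconds admits max_days + 1 distinct dates. A minimal standalone sketch of the arithmetic (EarliestKeptDate is a hypothetical helper standing in for EpochDisplayTime, which is assumed to format an epoch with the given strftime pattern):

import datetime

SECONDS_IN_DAY = 60 * 60 * 24

def EarliestKeptDate(now, seconds_back):
  """Formats the cutoff date the same way the f[:10] >= earliest_date filter compares it."""
  return (now - datetime.timedelta(seconds=seconds_back)).strftime('%Y-%m-%d')

now = datetime.datetime(2024, 5, 31)
max_days = 30

# As written above: prints 2024-05-01, so dates 05-01..05-31 all survive the
# comparison - 31 files kept for max_days=30.
print(EarliestKeptDate(now, max_days * SECONDS_IN_DAY))
# With the off-by-one corrected: prints 2024-05-02, keeping exactly 30 dates.
print(EarliestKeptDate(now, (max_days - 1) * SECONDS_IN_DAY))

The revised listing below applies exactly this (max_days - 1) correction, alongside the new configurable rolling-log size.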
<----SKIPPED LINES---->

# if any - is appended to the end of this pickle file. However, since this file is
# cached in working memory, flights older than 30 days are flushed from it periodically.
PICKLE_FLIGHTS = 'pickle/flights.pk'

# True splits all the flights created in simulation into separate date files, just like
# the non-simulated runs; False consolidates all flights into one pickle file.
SPLIT_SIMULATION_FLIGHT_PICKLE = False

# Status data about messageboard - is it running, etc. Specifically, holds tuples of
# data (timestamp, system_id, status), where system_id is either a GPIO pin id or 0
# to indicate the overall system, and status is a boolean.
PICKLE_DASHBOARD = 'pickle/dashboard.pk'

CACHED_ELEMENT_PREFIX = 'cached_'

# This web-exposed file is used for non-error messages that might highlight data or
# code logic worth checking into. It is only cleared out manually.
LOGFILE = 'log.txt'

# Identical to the LOGFILE, except it includes just the most recent n lines. Newest
# lines are at the end.
ROLLING_LOGFILE = 'rolling_log.txt'  # rolling copy of LOGFILE
# default number of lines in the rolling log; may be overridden by the settings file
ROLLING_LOG_SIZE = 1000

# Users can trigger .png histograms analogous to the text ones from the web interface;
# this is the folder (within WEBSERVER_PATH) where those files are placed.
WEBSERVER_IMAGE_FOLDER = 'images/'

# Multiple histograms can be generated, e.g. for airline, aircraft, or day of week.
# The output files are named by the prefix & suffix, i.e.: prefix + type + . + suffix,
# as in histogram_aircraft.png. These names match up to the names expected by the html
# page that displays the images. Also, note that the suffix is interpreted by matplotlib
# to identify the image format to create.
HISTOGRAM_IMAGE_PREFIX = 'histogram_'
HISTOGRAM_IMAGE_SUFFIX = 'png'

# For those of the approximately ten different types of histograms _not_ generated,
# an empty image is copied into the location expected by the webpage instead; this is
# the location of that "empty" image file.
HISTOGRAM_EMPTY_IMAGE_FILE = 'empty.png'

# This file indicates a pending request for histograms - either png, text-based, or
# both; once it is processed, this file is deleted. The contents are concatenated
# key-value pairs, e.g. histogram=all;histogram_history=24h;.
HISTOGRAM_CONFIG_FILE = 'histogram.txt'

<----SKIPPED LINES---->

    file = LOGFILE
  # special case: for the main logfile, we always keep a rolling log
  if not rolling and file == LOGFILE:
    rolling = ROLLING_LOGFILE
  try:
    with open(file, 'a') as f:
      # by excluding the timestamp, file diffs become easier between runs
      if not SIMULATION or file == LOGFILE:
        f.write('='*80+'\n')
        f.write(str(datetime.datetime.now(TZ))+'\n')
        f.write('\n')
      f.write(str(message)+'\n')
  except IOError:
    Log('Unable to append to ' + file)

  if rolling:
    existing_log_lines = ReadFile(file).splitlines()
    with open(rolling, 'w') as f:
      f.write('\n'.join(existing_log_lines[-ROLLING_LOG_SIZE:]))


def UpdateRollingLogSize(configuration):
  """Sets the global ROLLING_LOG_SIZE based on the settings file."""
  if 'rolling_log_line_count' in configuration:
    global ROLLING_LOG_SIZE
    ROLLING_LOG_SIZE = configuration['rolling_log_line_count']


def LogTimes(times):
  """Logs elapsed time messages from a list of epochs."""
  msg = ''
  for n, t in enumerate(times[:-1]):
    msg += '%.2fs to get from reading %d to reading %d\n' % (times[n + 1] - t, n, n + 1)
  Log(msg)


def MaintainRollingWebLog(message, max_count, filename=None):
  """Maintains a rolling text file of at most max_count printed messages.

  Newest data at top and oldest data at the end, of at most max_count messages,
  where the delimiter between each message is identified by a special fixed string.

  Args:
    message: text message to prepend to the file.
    max_count: maximum number of messages to keep in the file; the max_count+1st
      message is deleted.

<----SKIPPED LINES---->

  return True


def PrependFileName(full_path, prefix):
  """Converts /dir/file.png to /dir/prefixfile.png."""
  directory, file_name = os.path.split(full_path)
  file_name = prefix+file_name
  return os.path.join(directory, file_name)


def UnpickleObjectFromFile(full_path, date_segmentation, max_days=None, filenames=False):
  """Load a repository of pickled data into memory.

  Args:
    full_path: name (potentially including path) of the pickled file
    date_segmentation: If true, searches for all files that have a yyyy-mm-dd
      prefix prepended to the file name specified in the full path, and loads
      them in sequence for unpickling; if false, uses the full_path as is and
      loads just that single file.
    max_days: Integer that, if specified, indicates the maximum number of days
      of files to load back in; otherwise, loads all. That is, at most max_days
      files will be read.
    filenames: If true, rather than returning the list of data, returns a list
      of the filenames that would have been read.

  Returns:
    A list - either of the data, or of all the file names that would have
    been read.
""" if date_segmentation: directory, file = os.path.split(full_path) d = '[0-9]' sep = '-' date_format = d*4 + sep + d*2 + sep + d*2 # yyyy-mm-dd exp = date_format + sep + file pattern = re.compile(exp) files = os.listdir(directory) if max_days: # no need to read any files older than x days earliest_date = EpochDisplayTime( time.time() - (max_days - 1) * SECONDS_IN_DAY, '%Y-%m-%d') files = [f for f in files if f[:10] >= earliest_date] files = sorted([os.path.join(directory, f) for f in files if pattern.match(f)]) else: if os.path.exists(full_path): files = [full_path] else: return [] data = [] if filenames: return files for file in files: try: with open(file, 'rb') as f: while True: data.append(pickle.load(f)) except (EOFError, pickle.UnpicklingError): <----SKIPPED LINES----> sys.exit() # phew - they're a subset; so they probably got the signal; just wait a few secs elif still_running_ids: n = 0 running_parents = FindRunningParents() while running_parents: if n == max_seconds: Log('Kill signal sent from this process %d to %s, but %s still ' 'running after waiting cume %d seconds; rebooting' % ( os.getpid(), str(already_running_ids), str(running_parents), n+1)) PerformGracefulShutdown((), (), True) if not n % 3: Log('Kill signal sent from this process %d to %s, but %s still ' 'running after waiting cume %d seconds' % ( os.getpid(), str(already_running_ids), str(running_parents), n)) n += 1 time.sleep(1) running_parents = FindRunningParents() def InitArduinoVariables(): """Initializes and starts the two arduino threads with new shared-memory queues.""" to_remote_q = multiprocessing.Queue() to_servo_q = multiprocessing.Queue() to_main_q = multiprocessing.Queue() shutdown_remote = multiprocessing.Value('i') # shared flag to initiate shutdown shutdown_servo = multiprocessing.Value('i') # shared flag to initiate shutdown shutdown = (shutdown_remote, shutdown_servo) return (to_remote_q, to_servo_q, to_main_q, shutdown) def RefreshArduinos( remote, servo, to_remote_q, to_servo_q, to_main_q, shutdown, flights, json_desc_dict, configuration): """Ensure arduinos are running, restarting if needed, & send them the current message""" remote, servo = ValidateArduinosRunning( remote, servo, to_remote_q, to_servo_q, to_main_q, shutdown, configuration) EnqueueArduinos(flights, json_desc_dict, configuration, to_servo_q, to_remote_q) return remote, servo <----SKIPPED LINES----> # This flag slows down simulation time around a flight, great for debugging the arduinos simulation_slowdown = bool('-f' in sys.argv) # Redirect any errors to a log file instead of the screen, and add a datestamp if not SIMULATION: sys.stderr = open(STDERR_FILE, 'a') Log('', STDERR_FILE) init_timing.append(time.time()) # time 1 Log('Starting up process %d' % os.getpid()) already_running_ids = FindRunningParents() if already_running_ids: for pid in already_running_ids: Log('Sending termination signal to %d' % pid) os.kill(pid, signal.SIGTERM) init_timing.append(time.time()) # time 2 SetPinMode() configuration = ReadAndParseSettings(CONFIG_FILE) UpdateRollingLogSize(configuration) startup_time = time.time() json_desc_dict = {} init_timing.append(time.time()) # time 3 flights = UnpickleObjectFromFile(PICKLE_FLIGHTS, True, max_days=MAX_INSIGHT_HORIZON_DAYS) # Clear the loaded flight of any cached data, identified by keys with a specific # suffix, since code fixes may change the values for some of those cached elements for flight in flights: for key in list(flight.keys()): if key.endswith(CACHED_ELEMENT_PREFIX): flight.pop(key) 
init_timing.append(time.time())  # time 4

# If we're displaying just a single insight message, we want it to be something
# unique, to the extent possible; this dict holds a count of the diff types of
# messages displayed so far
insight_message_distribution = {}

# bootstrap the flight insights distribution from the list of insights on each
# flight (i.e.: flight['insight_types'] for a given flight might look like
# [1, 2, 7, 9], or [], to indicate which insights were identified); this then
# transforms that into {0: 25, 1: 18, ...}, summing across all flights
missing_insights = []
for flight in flights:
  if 'insight_types' not in flight:
    missing_insights.append(
        '%s on %s' % (DisplayFlightNumber(flight), DisplayTime(flight, '%x %X')))
  distribution = flight.get('insight_types', [])
  for key in distribution:
    insight_message_distribution[key] = (
        insight_message_distribution.get(key, 0) + 1)
if missing_insights:
  Log('Flights missing insight distributions: %s' % ';'.join(missing_insights))

init_timing.append(time.time())  # time 5

# initialize objects required for arduinos, but we can only start them in the main
# loop, because the tail end of the init section needs to confirm that all other
# messageboard.py processes have exited!
to_remote_q, to_servo_q, to_main_q, shutdown = InitArduinoVariables()
remote, servo = None, None

# used in simulation to print the hour of simulation once per simulated hour
prev_simulated_hour = ''

persistent_nearby_aircraft = {}  # key = flight number; value = last seen epoch
persistent_path = {}
histogram = {}

# Next up to print is index 0; this is a list of tuples:
#   tuple element #1: flag indicating the type of message that this is
#   tuple element #2: the message itself
message_queue = []
next_message_time = time.time()

# We repeat the loop every x seconds; this ensures that if the processing time is long,
# we don't wait another x seconds after processing completes
next_loop_time = time.time() + LOOP_DELAY_SECONDS

# These files are read only if the version on disk has been modified more recently
# than the last time it was read
last_dump_json_timestamp = 0

init_timing.append(time.time())  # time 6

WaitUntilKillComplete(already_running_ids)

init_timing.append(time.time())  # time 7
LogTimes(init_timing)
Log('Finishing initialization of %d; starting radio polling loop' % os.getpid())

while (not SIMULATION or SIMULATION_COUNTER < len(DUMP_JSONS)) and not SHUTDOWN_SIGNAL:
  last_heartbeat_time = Heartbeat(last_heartbeat_time)

  new_configuration = ReadAndParseSettings(CONFIG_FILE)
  UpdateRollingLogSize(new_configuration)
  CheckForNewFilterCriteria(configuration, new_configuration, message_queue, flights)
  configuration = new_configuration

  ResetLogs(configuration)  # clear the logs if requested

  # if this is a SIMULATION, then process every diff dump. But if it isn't a simulation,
  # then only read & do related processing for the next dump if the last-modified
  # timestamp indicates the file has been updated since it was last read.
  tmp_timestamp = 0
  if not SIMULATION:
    dump_json_exists = os.path.exists(DUMP_JSON_FILE)
    if dump_json_exists:
      tmp_timestamp = os.path.getmtime(DUMP_JSON_FILE)
  if (SIMULATION and DumpJsonChanges()) or (
      not SIMULATION and dump_json_exists and
      tmp_timestamp > last_dump_json_timestamp):
    last_dump_json_timestamp = tmp_timestamp

    (persistent_nearby_aircraft, flight, now,

<----SKIPPED LINES---->
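For reference, a minimal sketch of how the new rolling_log_line_count setting flows through the revised code: UpdateRollingLogSize overrides the module-level ROLLING_LOG_SIZE default, and the tail of Log() then trims the rolling file with that value. The dict literal stands in for whatever ReadAndParseSettings returns; if it yields string values rather than ints (an assumption - the settings parser isn't shown), an int() coercion like the one below would be needed before the slice:

ROLLING_LOG_SIZE = 1000  # module default, as in the listing


def UpdateRollingLogSize(configuration):
  """Overrides the module default when the settings key is present."""
  global ROLLING_LOG_SIZE
  if 'rolling_log_line_count' in configuration:
    # int() is a defensive assumption in case settings values arrive as strings
    ROLLING_LOG_SIZE = int(configuration['rolling_log_line_count'])


def TrimToRollingLog(lines):
  """Applies the same slice the Log() tail uses when rewriting ROLLING_LOGFILE."""
  return lines[-ROLLING_LOG_SIZE:]


UpdateRollingLogSize({'rolling_log_line_count': '3'})
print(TrimToRollingLog(['a', 'b', 'c', 'd', 'e']))  # ['c', 'd', 'e']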