from threading import Thread, Event

import time
import sys
from colorlog.escape_codes import escape_codes, parse_colors

import logging
# Module-level logger. ``getLogger()`` is called without a name, so this is the
# root logger. It is not referenced by any code visible in this section.
logger = logging.getLogger()

class ProgressData:
    """
    Record of what happened when a single task was executed.

    Parameters
    ----------
    input_data : any
        The value the task was executed with.
    proc_time : float
        Time spent executing the task.
    output_data : any, optional
        Whatever the executed function returned. Defaults to None.
    exception : :py:class:`Exception`, optional
        The exception raised by the task, or None when it finished cleanly.
    func : function, optional
        The executed function, kept only for reference.
    """

    def __init__(self, input_data, proc_time, output_data=None,
                 exception=None, func=None):
        # What went in and how long it took.
        self.input_data = input_data
        self.proc_time = proc_time

        # What came out: a result, an exception, or neither.
        self.output_data = output_data
        self.exception = exception

        # The callable that produced the above.
        self.func = func
class ProgressResult:
    """
    Custom iterable class to store `ProgressData` objects as they are
    produced during the execution of a set of tasks.

    This class implements append(), __len__() and __iter__().

    Parameters
    ----------
    results : list, optional
        A pre-populated list of `ProgressData` objects. The list is stored
        by reference (not copied). If omitted, a new empty list is created.

    Attributes
    ----------
    results : list
        The list of `ProgressData` objects.

    Raises
    ------
    TypeError
        If ``results`` is provided and is not a list.
    """

    # Shared message for the properties below, which all fail the same way
    # when ``results`` holds something that is not ProgressData-like.
    _INVALID_ITEM_MSG = ("An invalid object was found in 'results'. "
                         "Only ProgressData objects are valid.")

    def __init__(self, results=None):
        # Compare against None explicitly: a truthiness test would let falsy
        # non-list values (``()``, ``""``, ``0``, ``{}``) skip validation and
        # would silently replace a caller-supplied empty list with a new one.
        if results is None:
            results = []
        elif not isinstance(results, list):
            raise TypeError("A list was expected but a different data type was provided.")

        self.results = results

    @property
    def inputs(self):
        """list: Inputs from each `ProgressData` object stored in ``results``.

        Raises TypeError if an item lacks the expected attributes."""
        try:
            return [r.input_data for r in self.results]
        except AttributeError as err:
            raise TypeError(self._INVALID_ITEM_MSG) from err

    @property
    def outputs(self):
        """list of tuple: Outputs from each `ProgressData` object stored in
        ``results``. Each tuple contains the input and the output produced
        for that input.

        Raises TypeError if an item lacks the expected attributes."""
        try:
            return [(r.input_data, r.output_data) for r in self.results]
        except AttributeError as err:
            raise TypeError(self._INVALID_ITEM_MSG) from err

    @property
    def errors(self):
        """list of tuple: Errors from each `ProgressData` object stored in
        ``results``. Each tuple contains the input and the exception raised
        during the execution of a task with that input. Items whose
        ``exception`` is None are skipped.

        Raises TypeError if an item lacks the expected attributes."""
        try:
            return [(r.input_data, r.exception)
                    for r in self.results if r.exception is not None]
        except AttributeError as err:
            raise TypeError(self._INVALID_ITEM_MSG) from err

    def append(self, r):
        """Add a new `ProgressData` object to ``results``.

        Raises
        ------
        TypeError
            If ``r`` is not a `ProgressData` instance.
        """
        if not isinstance(r, ProgressData):
            raise TypeError("Only ProgressData objects are valid.")
        self.results.append(r)

    def __len__(self):
        """Return the number of stored `ProgressData` objects."""
        return len(self.results)

    def __iter__(self):
        """Iterate over the stored `ProgressData` objects in insertion order."""
        return iter(self.results)
class ProgressTracker:
    """
    A progress tracker for tasks queued in ``queue``.

    A daemon thread polls ``queue``; each item it dequeues is counted as one
    finished task, stored in ``results`` (when not None) and reflected in a
    progress bar written to stdout.

    Parameters
    ----------
    ntasks : int
        The number of tasks to be executed.
    queue : :py:class:`~multiprocessing.Queue`
        A queue to track the tasks' progress.
    task_name : str, optional
        A name to identify the set of tasks.

    Attributes
    ----------
    ntasks : int
        The number of tasks to be executed.
    queue : :py:class:`~multiprocessing.Queue`
        The queue to track the tasks' progress from.
    task_name : str, optional
        The name to identify the set of tasks.
    results : ProgressResult
        Store results and any errors found during the tasks' processing.
    nerrors : int
        Number of errors.

    Notes
    -----
    The polling thread only dequeues an item once ``queue.full()`` is true,
    so the queue is presumably expected to be bounded (e.g. ``maxsize=1``)
    -- TODO confirm against the code that creates the queue.
    """

    def __init__(self, ntasks, queue, task_name=None):
        self.ntasks = ntasks
        self.task_name = task_name
        self._progress = 0

        # used to communicate progress to the thread
        self.queue = queue
        # used to tell the thread when to finish
        self._event = Event()
        self._progress_bar = Thread(target=self._print_progress,
                                    args=(self._event, self.queue))
        self._progress_bar.daemon = True

        # Save results and any errors found during the task processing.
        self.results = ProgressResult()
        self.nerrors = 0

        self._running_times = []
        self._start_time = None
        self._end_time = None

    def _percentage(self):
        """Return the share of finished tasks as an integer percentage.

        Returns 0 when ``ntasks`` is not positive. The value is rounded to
        the nearest integer *after* scaling to 0-100, so float artifacts such
        as 28.999999999999996 can no longer be truncated to 28.
        """
        if self.ntasks > 0:
            return int(round(100.0 * self.progress / self.ntasks))
        return 0

    def _show_progress_bar(self, p, perc):
        """Write the progress line to stdout, overwriting the current line.

        Parameters
        ----------
        p : int
            Number of tasks finished so far.
        perc : int or float
            Percentage of tasks finished (0-100 scale).
        """
        task_name = ""
        if self.task_name:
            task_name = " - %s" % self.task_name

        # Round instead of truncating so a float slightly below an integer
        # still shows the intended percentage.
        pct = int(round(perc))
        # One bar cell per 2%; the bar area is 50 cells wide, so cap it there.
        bar = ("\u25A0" * min(pct // 2, 50)).ljust(50, ' ')

        msg = '%s%% [%s] %d/%d [Avg: %.2fs/task; Errors: %d]%s.' % (
            pct, bar, p, self.ntasks,
            self.avg_running_time, self.nerrors, task_name)

        format_str = '\r[%s] %s%s %s%s %s'
        progress_str = format_str % (time.strftime('%Y-%m-%d %H:%M:%S'),
                                     parse_colors("purple"),
                                     "PROGRESS".ljust(10, " "),
                                     escape_codes["reset"],
                                     "".rjust(26, " "),
                                     msg)

        sys.stdout.write(progress_str)
        sys.stdout.flush()

    def _print_progress(self, e, q):
        """Updates a progress bar on stdout anytime progress is made.

        Parameters
        ----------
        e : :py:class:`threading.Event`
            Set by ``end()`` to ask this loop to terminate.
        q : queue-like
            Source of progress items; polled through ``full()`` and read
            with ``get()``.
        """
        while True:
            while not q.full():
                # Stops the loop if function end() is called before the
                # queue gets full.
                if e.is_set():
                    break
                # wait for more progress to be made
                time.sleep(0.1)

            # If our event is set and the queue holds no pending (full)
            # progress, break out of the infinite loop and let this thread
            # terminate.
            if e.is_set() and not q.full():
                break

            # get the current progress data.
            progress_data = q.get()

            if progress_data is not None:
                self.results.append(progress_data)
                self._running_times.append(progress_data.proc_time)

                if progress_data.exception is not None:
                    self.nerrors += 1

            # NOTE(review): the original indentation was lost; it is assumed
            # that every dequeued item (even None) counts as one finished
            # task -- confirm against the upstream source.
            self.progress += 1
            self._show_progress_bar(self.progress, self._percentage())

    @property
    def progress(self):
        """int: Current number of tasks executed."""
        return self._progress

    @progress.setter
    def progress(self, value):
        self._progress = value

    @property
    def running_time(self):
        """float: Total running time, rounded to two decimals, or None
        until both ``start()`` and ``end()`` have been called."""
        # Explicit None checks: a timestamp of 0.0 is a valid value.
        if self._start_time is None or self._end_time is None:
            return None
        return round(self._end_time - self._start_time, 2)

    @property
    def avg_running_time(self):
        """float: Average running time per task, rounded to two decimals
        (0 when no task time has been recorded yet)."""
        if self._running_times:
            return round(sum(self._running_times) / len(self._running_times), 2)
        return 0

    def start(self):
        """
        Start the progress tracker.

        Records the start time and launches the polling thread. The
        underlying thread can only be started once.
        """
        self._start_time = round(time.time(), 2)
        self._progress_bar.start()

    def end(self):
        """
        Finish the progress tracker.

        Signals the polling thread to stop, waits for it to finish, records
        the end time and terminates the progress line on stdout.
        """
        self._event.set()
        # Joining a thread that was never started raises RuntimeError, so
        # only wait when the polling thread is actually running.
        if self._progress_bar.is_alive():
            self._progress_bar.join()
        self._end_time = round(time.time(), 2)
        sys.stdout.write('\n')