Skip to content
Snippets Groups Projects
Commit a230d4c9 authored by tuhe's avatar tuhe
Browse files

updates

parent ffc92b78
No related branches found
No related tags found
No related merge requests found
No preview for this file type
No preview for this file type
......@@ -45,12 +45,13 @@ class Logger(object):
pass
class Capturing(list):
def __init__(self, *args, unmute=False, **kwargs):
def __init__(self, *args, stdout=None, unmute=False, **kwargs):
self._stdout = stdout
self.unmute = unmute
super().__init__(*args, **kwargs)
def __enter__(self, capture_errors=True): # don't put arguments here.
self._stdout = sys.stdout
self._stdout = sys.stdout if self._stdout == None else self._stdout
self._stringio = StringIO()
if self.unmute:
sys.stdout = Logger(self._stringio)
......@@ -70,6 +71,20 @@ class Capturing(list):
if self.capture_errors:
sys.sterr = self._sterr
class Capturing2(Capturing):
def __exit__(self, *args):
lines = self._stringio.getvalue().splitlines()
txt = "\n".join(lines)
numbers = extract_numbers(txt)
self.extend(lines)
del self._stringio # free up some memory
sys.stdout = self._stdout
if self.capture_errors:
sys.sterr = self._sterr
self.output = txt
self.numbers = numbers
class QItem(unittest.TestCase):
title = None
......@@ -313,12 +328,13 @@ class Report():
modules = os.path.normpath(relative_path[:-3]).split(os.sep)
return root_dir, relative_path, modules
def __init__(self, strict=False, payload=None):
working_directory = os.path.abspath(os.path.dirname(self._file()))
self.wdir, self.name = setup_dir_by_class(self, working_directory)
# self.computed_answers_file = os.path.join(self.wdir, self.name + "_resources_do_not_hand_in.dat")
for (q,_) in self.questions:
q.nL = self.nL # Set maximum line length.
if payload is not None:
self.set_payload(payload, strict=strict)
......@@ -360,28 +376,6 @@ class Report():
for q, _ in self.questions:
q._cache = payloads[q.__qualname__]
# for item in q.items:
# if q.name not in payloads or item.name not in payloads[q.name]:
# s = f"> Broken resource dictionary submitted to unitgrade for question {q.name} and subquestion {item.name}. Framework will not work."
# if strict:
# raise Exception(s)
# else:
# print(s)
# else:
# item._correct_answer_payload = payloads[q.name][item.name]['payload']
# item.estimated_time = payloads[q.name][item.name].get("time", 1)
# q.estimated_time = payloads[q.name].get("time", 1)
# if "precomputed" in payloads[q.name][item.name]: # Consider removing later.
# item._precomputed_payload = payloads[q.name][item.name]['precomputed']
# try:
# if "title" in payloads[q.name][item.name]: # can perhaps be removed later.
# item.title = payloads[q.name][item.name]['title']
# except Exception as e: # Cannot set attribute error. The title is a function (and probably should not be).
# pass
# # print("bad", e)
# self.payloads = payloads
def rm_progress_bar(txt):
# More robust version. Apparently length of bar can depend on various factors, so check for order of symbols.
nlines = []
......@@ -404,10 +398,9 @@ def extract_numbers(txt):
all = [float(a) if ('.' in a or "e" in a) else int(a) for a in all]
if len(all) > 500:
print(txt)
raise Exception("unitgrade.unitgrade.py: Warning, many numbers!", len(all))
raise Exception("unitgrade.unitgrade.py: Warning, too many numbers!", len(all))
return all
class ActiveProgress():
def __init__(self, t, start=True, title="my progress bar",show_progress_bar=True):
self.t = t
......@@ -459,8 +452,9 @@ class ActiveProgress():
from unittest.suite import _isnotsuite
class MySuite(unittest.suite.TestSuite): # Not sure we need this one anymore.
pass
# class MySuite(unittest.suite.TestSuite): # Not sure we need this one anymore.
# raise Exception("no suite")
# pass
def instance_call_stack(instance):
s = "-".join(map(lambda x: x.__name__, instance.__class__.mro()))
......@@ -616,8 +610,11 @@ class UTextResult(unittest.TextTestResult):
n = UTextResult.number
item_title = self.getDescription(test)
item_title = item_title.split("\n")[0]
# item_title = item_title.split("\n")[0]
item_title = test.shortDescription() # Better for printing (get from cache).
if item_title == None:
# For unittest framework where getDescription may return None.
item_title = self.getDescription(test)
# test.countTestCases()
self.item_title_print = "*** q%i.%i) %s" % (n + 1, j + 1, item_title)
estimated_time = 10
......@@ -632,7 +629,7 @@ class UTextResult(unittest.TextTestResult):
def _setupStdout(self):
if self._previousTestClass == None:
total_estimated_time = 2
total_estimated_time = 1
if hasattr(self.__class__, 'q_title_print'):
q_title_print = self.__class__.q_title_print
else:
......@@ -688,7 +685,10 @@ def cache(foo, typed=False):
"""
maxsize = None
def wrapper(self, *args, **kwargs):
key = self.cache_id() + ("cache", _make_key(args, kwargs, typed))
key = (self.cache_id(), ("@cache", foo.__name__, _make_key(args, kwargs, typed)) )
# key = (self.cache_id(), '@cache')
# if self._cache_contains[key]
if not self._cache_contains(key):
value = foo(self, *args, **kwargs)
self._cache_put(key, value)
......@@ -700,15 +700,56 @@ def cache(foo, typed=False):
class UTestCase(unittest.TestCase):
_outcome = None # A dictionary which stores the user-computed outcomes of all the tests. This differs from the cache.
_cache = None # Read-only cache.
_cache2 = None # User-written cache
_cache = None # Read-only cache. Ensures method always produce same result.
_cache2 = None # User-written cache.
def capture(self):
return Capturing2(stdout=self._stdout)
@classmethod
def question_title(cls):
""" Return the question title """
return cls.__doc__.strip().splitlines()[0].strip() if cls.__doc__ != None else cls.__qualname__
@classmethod
def reset(cls):
print("Warning, I am not sure UTestCase.reset() is needed anymore and it seems very hacky.")
cls._outcome = None
cls._cache = None
cls._cache2 = None
def _callSetUp(self):
self._stdout = sys.stdout
import io
sys.stdout = io.StringIO()
super().setUp()
# print("Setting up...")
def _callTearDown(self):
sys.stdout = self._stdout
super().tearDown()
# print("asdfsfd")
def shortDescriptionStandard(self):
sd = super().shortDescription()
if sd == None:
sd = self._testMethodName
return sd
def shortDescription(self):
# self._testMethodDoc.strip().splitlines()[0].strip()
sd = self.shortDescriptionStandard()
title = self._cache_get( (self.cache_id(), 'title'), sd )
return title if title != None else sd
@property
def title(self):
return self.shortDescription()
@title.setter
def title(self, value):
self._cache_put((self.cache_id(), 'title'), value)
def _get_outcome(self):
if not (self.__class__, '_outcome') or self.__class__._outcome == None:
self.__class__._outcome = {}
......@@ -716,37 +757,31 @@ class UTestCase(unittest.TestCase):
def _callTestMethod(self, testMethod):
t = time.time()
self._ensure_cache_exists() # Make sure cache is there.
if self._testMethodDoc != None:
# Ensure the cache is eventually updated with the right docstring.
self._cache_put((self.cache_id(), 'title'), self.shortDescriptionStandard() )
# Fix temp cache here (for using the @cache decorator)
self._cache2[ (self.cache_id(), 'assert') ] = {}
res = testMethod()
elapsed = time.time() - t
# if res == None:
# res = {}
# res['time'] = elapsed
sd = self.shortDescription()
self._cache_put( (self.cache_id(), 'title'), self._testMethodName if sd == None else sd)
# self._test_fun_output = res
# self._cache_put( (self.cache_id(), 'title'), self.shortDescription() )
self._get_outcome()[self.cache_id()] = res
self._cache_put( (self.cache_id(), "time"), elapsed)
# This is my base test class. So what is new about it?
def cache_id(self):
c = self.__class__.__qualname__
m = self._testMethodName
return (c,m)
def unique_cache_id(self):
k0 = self.cache_id()
key = ()
for i in itertools.count():
key = k0 + (i,)
if not self._cache2_contains(key):
break
return key
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._load_cache()
self.cache_indexes = defaultdict(lambda: 0)
self._assert_cache_index = 0
# self.cache_indexes = defaultdict(lambda: 0)
def _ensure_cache_exists(self):
if not hasattr(self.__class__, '_cache') or self.__class__._cache == None:
......@@ -766,17 +801,22 @@ class UTestCase(unittest.TestCase):
self._ensure_cache_exists()
return key in self.__class__._cache
def _cache2_contains(self, key):
self._ensure_cache_exists()
return key in self.__class__._cache2
def wrap_assert(self, assert_fun, first, *args, **kwargs):
key = (self.cache_id(), 'assert')
if not self._cache_contains(key):
print("Warning, framework missing", key)
cache = self._cache_get(key, {})
id = self._assert_cache_index
if not id in cache:
print("Warning, framework missing cache index", key, "id =", id)
_expected = cache.get(id, first)
assert_fun(first, _expected, *args, **kwargs)
cache[id] = first
self._cache_put(key, cache)
self._assert_cache_index += 1
def assertEqualC(self, first: Any, msg: Any = ...) -> None:
id = self.unique_cache_id()
if not self._cache_contains(id):
print("Warning, framework missing key", id)
self.assertEqual(first, self._cache_get(id, first), msg)
self._cache_put(id, first)
self.wrap_assert(self.assertEqual, first, msg)
def _cache_file(self):
return os.path.dirname(inspect.getfile(self.__class__) ) + "/unitgrade/" + self.__class__.__name__ + ".pkl"
......
......@@ -5,7 +5,7 @@ import pyfiglet
from unitgrade2 import Hidden, myround, msum, mfloor, ActiveProgress
from unitgrade2 import __version__
import unittest
from unitgrade2.unitgrade2 import MySuite
# from unitgrade2.unitgrade2 import MySuite
from unitgrade2.unitgrade2 import UTextResult
import inspect
......@@ -64,53 +64,6 @@ def evaluate_report_student(report, question=None, qitem=None, unmute=None, pass
show_tol_err=show_tol_err)
# try: # For registering stats.
# import unitgrade_private
# import irlc.lectures
# import xlwings
# from openpyxl import Workbook
# import pandas as pd
# from collections import defaultdict
# dd = defaultdict(lambda: [])
# error_computed = []
# for k1, (q, _) in enumerate(report.questions):
# for k2, item in enumerate(q.items):
# dd['question_index'].append(k1)
# dd['item_index'].append(k2)
# dd['question'].append(q.name)
# dd['item'].append(item.name)
# dd['tol'].append(0 if not hasattr(item, 'tol') else item.tol)
# error_computed.append(0 if not hasattr(item, 'error_computed') else item.error_computed)
#
# qstats = report.wdir + "/" + report.name + ".xlsx"
#
# if os.path.isfile(qstats):
# d_read = pd.read_excel(qstats).to_dict()
# else:
# d_read = dict()
#
# for k in range(1000):
# key = 'run_'+str(k)
# if key in d_read:
# dd[key] = list(d_read['run_0'].values())
# else:
# dd[key] = error_computed
# break
#
# workbook = Workbook()
# worksheet = workbook.active
# for col, key in enumerate(dd.keys()):
# worksheet.cell(row=1, column=col+1).value = key
# for row, item in enumerate(dd[key]):
# worksheet.cell(row=row+2, column=col+1).value = item
#
# workbook.save(qstats)
# workbook.close()
#
# except ModuleNotFoundError as e:
# s = 234
# pass
if question is None:
print("Provisional evaluation")
tabulate(table_data)
......@@ -142,7 +95,12 @@ class UnitgradeTextRunner(unittest.TextTestRunner):
class SequentialTestLoader(unittest.TestLoader):
def getTestCaseNames(self, testCaseClass):
test_names = super().getTestCaseNames(testCaseClass)
testcase_methods = list(testCaseClass.__dict__.keys())
# testcase_methods = list(testCaseClass.__dict__.keys())
ls = []
for C in testCaseClass.mro():
if issubclass(C, unittest.TestCase):
ls = list(C.__dict__.keys()) + ls
testcase_methods = ls
test_names.sort(key=testcase_methods.index)
return test_names
......@@ -174,12 +132,12 @@ def evaluate_report(report, question=None, qitem=None, passall=False, verbose=Fa
for n, (q, w) in enumerate(report.questions):
# q = q()
q_hidden = False
# q_hidden = False
# q_hidden = issubclass(q.__class__, Hidden)
if question is not None and n+1 != question:
continue
suite = loader.loadTestsFromTestCase(q)
qtitle = q.__name__
qtitle = q.question_title() if hasattr(q, 'question_title') else q.__qualname__
q_title_print = "Question %i: %s"%(n+1, qtitle)
print(q_title_print, end="")
q.possible = 0
......@@ -193,77 +151,6 @@ def evaluate_report(report, question=None, qitem=None, passall=False, verbose=Fa
UTextResult.number = n
res = UTextTestRunner(verbosity=2, resultclass=UTextResult).run(suite)
# res = UTextTestRunner(verbosity=2, resultclass=unittest.TextTestResult).run(suite)
z = 234
# for j, item in enumerate(q.items):
# if qitem is not None and question is not None and j+1 != qitem:
# continue
#
# if q_with_outstanding_init is not None: # check for None bc. this must be called to set titles.
# # if not item.question.has_called_init_:
# start = time.time()
#
# cc = None
# if show_progress_bar:
# total_estimated_time = q.estimated_time # Use this. The time is estimated for the q itself. # sum( [q2.estimated_time for q2 in q_with_outstanding_init] )
# cc = ActiveProgress(t=total_estimated_time, title=q_title_print)
# from unitgrade import Capturing # DON'T REMOVE THIS LINE
# with eval('Capturing')(unmute=unmute): # Clunky import syntax is required bc. of minify issue.
# try:
# for q2 in q_with_outstanding_init:
# q2.init()
# q2.has_called_init_ = True
#
# # item.question.init() # Initialize the question. Useful for sharing resources.
# except Exception as e:
# if not passall:
# if not silent:
# print(" ")
# print("="*30)
# print(f"When initializing question {q.title} the initialization code threw an error")
# print(e)
# print("The remaining parts of this question will likely fail.")
# print("="*30)
#
# if show_progress_bar:
# cc.terminate()
# sys.stdout.flush()
# print(q_title_print, end="")
#
# q_time =np.round( time.time()-start, 2)
#
# print(" "* max(0,nL - len(q_title_print) ) + (" (" + str(q_time) + " seconds)" if q_time >= 0.1 else "") ) # if q.name in report.payloads else "")
# print("=" * nL)
# q_with_outstanding_init = None
#
# # item.question = q # Set the parent question instance for later reference.
# item_title_print = ss = "*** q%i.%i) %s"%(n+1, j+1, item.title)
#
# if show_progress_bar:
# cc = ActiveProgress(t=item.estimated_time, title=item_title_print)
# else:
# print(item_title_print + ( '.'*max(0, nL-4-len(ss)) ), end="")
# hidden = issubclass(item.__class__, Hidden)
# # if not hidden:
# # print(ss, end="")
# # sys.stdout.flush()
# start = time.time()
#
# (current, possible) = item.get_points(show_expected=show_expected, show_computed=show_computed,unmute=unmute, passall=passall, silent=silent)
# q_[j] = {'w': item.weight, 'possible': possible, 'obtained': current, 'hidden': hidden, 'computed': str(item._computed_answer), 'title': item.title}
# tsecs = np.round(time.time()-start, 2)
# if show_progress_bar:
# cc.terminate()
# sys.stdout.flush()
# print(item_title_print + ('.' * max(0, nL - 4 - len(ss))), end="")
#
# if not hidden:
# ss = "PASS" if current == possible else "*** FAILED"
# if tsecs >= 0.1:
# ss += " ("+ str(tsecs) + " seconds)"
# print(ss)
# ws, possible, obtained = upack(q_)
possible = res.testsRun
obtained = len(res.successes)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment