Select Git revision
report3_grade.py

tuhe authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
report3_grade.py 53.75 KiB
"""
Example student code. This file is automatically generated from the files in the instructor-directory
"""
import numpy as np
from tabulate import tabulate
from datetime import datetime
import pyfiglet
import unittest
import inspect
import os
import argparse
import time
# Command-line interface used when a student runs a report script directly.
parser = argparse.ArgumentParser(
    description='Evaluate your report.',
    epilog="""Example:
To run all tests in a report:
> python assignment1_dp.py
To run only question 2 or question 2.1
> python assignment1_dp.py -q 2
> python assignment1_dp.py -q 2.1
Note this scripts does not grade your report. To grade your report, use:
> python report1_grade.py
Finally, note that if your report is part of a module (package), and the report script requires part of that package, the -m option for python may be useful.
For instance, if the report file is in Documents/course_package/report3_complete.py, and `course_package` is a python package, then change directory to 'Documents/` and run:
> python -m course_package.report1
see https://docs.python.org/3.9/using/cmdline.html
""",
    formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument('-q', nargs='?', type=str, default=None, help='Only evaluate this question (e.g.: -q 2)')
# Every remaining option is a plain on/off switch, so register them from a table.
for _flag, _help in (
    ('--showexpected', 'Show the expected/desired result'),
    ('--showcomputed', 'Show the answer your code computes'),
    ('--unmute', 'Show result of print(...) commands in code'),
    ('--passall', 'Automatically pass all tests. Useful when debugging.'),
):
    parser.add_argument(_flag, action="store_true", help=_help)
del _flag, _help
def evaluate_report_student(report, question=None, qitem=None, unmute=None, passall=None, ignore_missing_file=False, show_tol_err=False):
    """Run the tests of `report` and print a provisional (not yet registered) score.

    Command-line options are read from the module-level `parser`; explicit
    arguments take precedence over the corresponding command-line values.

    Args:
        report: The report object to evaluate; passed on to `evaluate_report`.
        question: 1-based question number to run, or None to use `-q` from the
            command line (all questions if that is absent too).
        qitem: Item within the question; filled in from `-q A.B` when given.
        unmute: Whether to show output of the tested code; defaults to `--unmute`.
        passall: Whether to pass every test; defaults to `--passall`.
        ignore_missing_file: Skip the check for the pre-computed answer file.
        show_tol_err: Forwarded unchanged to `evaluate_report`.

    Returns:
        The `results` dictionary produced by `evaluate_report`.

    Raises:
        FileNotFoundError: If the report declares a `computed_answers_file`
            that does not exist and `ignore_missing_file` is false.
    """
    args = parser.parse_args()
    if question is None and args.q is not None:
        question = args.q
        if "." in question:
            # "-q 2.1" selects item 1 of question 2.
            question, qitem = [int(v) for v in question.split(".")]
        else:
            question = int(question)

    # The guard previously tested for the misspelled attribute "computed_answer_file"
    # while reading `computed_answers_file`; both now refer to the same attribute.
    answers_file = getattr(report, "computed_answers_file", None)
    if answers_file is not None and not os.path.isfile(answers_file) and not ignore_missing_file:
        raise FileNotFoundError(
            "> Error: The pre-computed answer file "
            + os.path.abspath(answers_file)
            + " does not exist. Check your package installation"
        )

    if unmute is None:
        unmute = args.unmute
    if passall is None:
        passall = args.passall

    results, table_data = evaluate_report(report, question=question, show_progress_bar=not unmute, qitem=qitem,
                                          verbose=False, passall=passall, show_expected=args.showexpected,
                                          show_computed=args.showcomputed, unmute=unmute,
                                          show_tol_err=show_tol_err)

    if question is None:
        print("Provisional evaluation")
        print(tabulate(table_data))
        print(" ")

    # Index 1 is the frame of whoever called this function, so this lookup must
    # stay directly in this function body (a helper would shift the index).
    fr = inspect.getouterframes(inspect.currentframe())[1].filename
    # Strip the trailing ".py" of the calling script to derive its grade script.
    gfile = os.path.basename(fr)[:-3] + "_grade.py"
    # `gfile` has no directory part, so it is looked up in the working directory.
    if os.path.exists(gfile):
        print("Note your results have not yet been registered. \nTo register your results, please run the file:")
        print(">>>", gfile)
        print("In the same manner as you ran this file.")

    return results
def upack(q):
    """Split a score mapping into weight, possible and obtained columns.

    Args:
        q: Mapping whose values are dicts with the keys 'w', 'possible' and
            'obtained'.

    Returns:
        A tuple of three 1-D numpy arrays (weights, possible, obtained), with
        one entry per value of `q`. All three are empty when `q` is empty.
    """
    rows = [(item['w'], item['possible'], item['obtained']) for item in q.values()]
    # reshape(-1, 3) keeps the array two-dimensional even with zero rows, so the
    # column slices below no longer raise IndexError for an empty mapping.
    table = np.asarray(rows).reshape(-1, 3)
    return table[:, 0], table[:, 1], table[:, 2]
class UnitgradeTextRunner(unittest.TextTestRunner):
    """A `unittest.TextTestRunner` subclass that currently adds no behavior of its own."""

    def __init__(self, *positional, **options):
        # Hand every argument straight to the standard runner.
        super(UnitgradeTextRunner, self).__init__(*positional, **options)
class SequentialTestLoader(unittest.TestLoader):
    """Test loader that returns test methods in definition order rather than alphabetically."""

    def getTestCaseNames(self, testCaseClass):
        """Return the test method names of `testCaseClass` in definition order.

        Names defined on base `unittest.TestCase` classes come before names
        first defined on derived classes; a method overridden in a subclass
        keeps the position of its first definition. Names that are not found
        in the `__dict__` of any TestCase class in the MRO (for example tests
        inherited from a plain mixin) are placed last, in the order produced
        by the standard loader, instead of raising ValueError.

        Args:
            testCaseClass: The test case class to inspect.

        Returns:
            A list of test method names.
        """
        test_names = super().getTestCaseNames(testCaseClass)
        # Rank every attribute by where it is first defined, walking the MRO
        # from the most basic class towards the most derived one.
        position = {}
        for klass in reversed(testCaseClass.mro()):
            if issubclass(klass, unittest.TestCase):
                for attr in klass.__dict__:
                    position.setdefault(attr, len(position))
        unranked = len(position)
        # list.sort is stable, so unranked names keep the loader's own order.
        test_names.sort(key=lambda name: position.get(name, unranked))
        return test_names
def evaluate_report(report, question=None, qitem=None, passall=False, verbose=False, show_expected=False,
                    show_computed=False, unmute=False, show_help_flag=True, silent=False,
                    show_progress_bar=True,
                    show_tol_err=False,
                    big_header=True):
    """Run the questions of `report`, print a score summary and return the scores.

    Each entry of `report.questions` is a `(test case class, weight)` pair. The
    points for a question are the weight scaled by the fraction of its tests
    that succeeded, truncated to an int.

    Args:
        report: Report object providing `title`, `questions`, `nL` and
            optionally `version`. Its `possible` and `obtained` attributes are
            set to the totals.
        question: 1-based number of the only question to run, or None for all.
        show_help_flag: Append the "--help" hint to the title line.
        show_progress_bar: Stored on `UTextResult` before each question is run.
        big_header: Print the figlet banner instead of the plain program name.
        qitem, passall, verbose, show_expected, show_computed, unmute, silent,
        show_tol_err: Accepted but not referenced in this function body.

    Returns:
        A tuple `(results, table_data)`: `results` has the keys 'total'
        (obtained, possible) and 'details' (per-question scores keyed by
        0-based question index); `table_data` is a list of `[label, score]`
        rows ending with the total.
    """
    started = datetime.now()
    if big_header:
        banner_art = pyfiglet.figlet_format("UnitGrade", font="doom")
        # Keep only the rows of the banner that contain visible characters.
        banner = "\n".join([row for row in banner_art.splitlines() if len(row.strip()) > 0])
    else:
        banner = "Unitgrade"
    started_text = started.strftime("%d/%m/%Y %H:%M:%S")
    print(banner + " v" + __version__ + ", started: " + started_text + "\n")

    headline = report.title
    if hasattr(report, "version") and report.version is not None:
        headline += " version " + report.version
    help_hint = "(use --help for options)" if show_help_flag else ""
    print(headline, help_hint)

    table_data = []
    t_start = time.time()
    score = {}
    loader = SequentialTestLoader()
    for index, (q, w) in enumerate(report.questions):
        number = index + 1
        if question is not None and number != question:
            continue
        suite = loader.loadTestsFromTestCase(q)
        if hasattr(q, 'question_title'):
            qtitle = q.question_title()
        else:
            qtitle = q.__qualname__
        q_title_print = "Question %i: %s" % (number, qtitle)
        print(q_title_print, end="")
        q.possible = 0
        q.obtained = 0

        # The result class is configured through class attributes before the run.
        UTextResult.q_title_print = q_title_print
        UTextResult.show_progress_bar = show_progress_bar
        UTextResult.number = index
        UTextResult.nL = report.nL
        res = UTextTestRunner(verbosity=2, resultclass=UTextResult).run(suite)

        tests_run = res.testsRun
        passed = len(res.successes)
        # Every executed test must be accounted for as a success, error or failure.
        assert passed + len(res.errors) + len(res.failures) == res.testsRun

        if tests_run > 0:
            points = int(w * passed * 1.0 / tests_run)
        else:
            points = 0
        # 'possible' in the score record is the question weight, not the test count.
        score[index] = {'w': w, 'possible': w, 'obtained': points, 'items': {}, 'title': qtitle}
        q.obtained = points
        q.possible = tests_run

        label = f" * q{number}) Total"
        value = f" {q.obtained}/{w}"
        # Pad with dots so the line is report.nL characters wide.
        filler = "." * (report.nL - len(label) - len(value))
        print(label + filler + value)
        print(" ")
        table_data.append([f"q{number}) Total", f"{q.obtained}/{w}"])

    _, possible_col, obtained_col = upack(score)
    possible = int(msum(possible_col))
    obtained = int(msum(obtained_col))  # plain Python ints rather than numpy scalars
    report.possible = possible
    report.obtained = obtained

    finished_text = datetime.now().strftime("%H:%M:%S")
    elapsed = int(time.time() - t_start)
    minutes, seconds = divmod(elapsed, 60)

    def plrl(count, unit):
        """Format `count` with `unit`, adding a plural "s" unless count is 1."""
        suffix = "s" if count != 1 else ""
        return str(count) + " " + unit + suffix

    total_text = f"{report.obtained}/{report.possible}"
    dprint(first=f"Total points at {finished_text} ({plrl(minutes, 'minute')}, {plrl(seconds, 'second')})",
           last=total_text, nL=report.nL)

    table_data.append(["Total", total_text])
    results = {'total': (obtained, possible), 'details': score}
    return results, table_data
from tabulate import tabulate
from datetime import datetime
import inspect
import json
import os
import bz2
import pickle
import os
def bzwrite(json_str, token):
    """Write the text `json_str` to the file `token` using bz2 compression.

    Args:
        json_str: Text to store.
        token: Path of the file to create or overwrite.
    """
    # bz2.open is fetched by name on purpose: to get around obfuscation issues.
    opener = getattr(bz2, 'open')
    with opener(token, "wt") as stream:
        stream.write(json_str)
def gather_imports(imp):
    """Zip the ``.py`` sources belonging to an imported module or package.

    For a plain module (it has `__file__` but no `__path__`) every ``.py``
    file below the module's directory is archived with paths relative to that
    directory. Otherwise the top-level package of `imp` is archived with paths
    relative to the package's parent directory, so archive names start with
    the package name.

    Args:
        imp: An imported module or package object.

    Returns:
        A tuple `(resources, top_package)`. `resources` is a dict with the
        keys 'zipfile' (bytes of the zip archive), 'top_package' (the directory
        that was walked) and 'module_import' (True for a plain module).
    """
    import io
    import zipfile

    if hasattr(imp, '__file__') and not hasattr(imp, '__path__'):
        # A single-file module: collect from the directory that contains it.
        top_package = os.path.dirname(imp.__file__)
        module_import = True
    else:
        root_package = __import__(imp.__name__.split('.')[0])
        # __path__ is a plain list for regular packages and a list-like object
        # for namespace packages; list() handles both, whereas the private
        # `_path` attribute used before exists only on the latter.
        top_package = list(root_package.__path__)[0]
        module_import = False

    archive_root = top_package if module_import else os.path.dirname(top_package)
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w') as archive:
        for root, _dirs, filenames in os.walk(top_package):
            for filename in filenames:
                if filename.endswith(".py"):
                    fpath = os.path.join(root, filename)
                    archive.write(fpath, os.path.relpath(fpath, archive_root))

    resources = {
        'zipfile': zip_buffer.getvalue(),
        'top_package': top_package,
        'module_import': module_import,
    }
    return resources, top_package
import argparse
# Command-line interface of the grading script (rebinds the module-level `parser`).
parser = argparse.ArgumentParser(
    description='Evaluate your report.',
    epilog="""Use this script to get the score of your report. Example:
> python report1_grade.py
Finally, note that if your report is part of a module (package), and the report script requires part of that package, the -m option for python may be useful.
For instance, if the report file is in Documents/course_package/report3_complete.py, and `course_package` is a python package, then change directory to 'Documents/` and run:
> python -m course_package.report1
see https://docs.python.org/3.9/using/cmdline.html
""",
    formatter_class=argparse.RawTextHelpFormatter,
)
# Both options are plain on/off switches.
for _flag, _help in (
    ('--noprogress', 'Disable progress bars'),
    ('--autolab', 'Show Autolab results'),
):
    parser.add_argument(_flag, action="store_true", help=_help)
del _flag, _help
def gather_upload_to_campusnet(report, output_dir=None):
    """Evaluate `report`, bundle the listed sources and write the hand-in ``.token`` file.

    The token is a pickle of the results dictionary returned by
    `evaluate_report`, extended with a 'sources' entry. Its file name encodes
    the report class name, the obtained and possible points and, when
    `report.version` is set, the version.

    Args:
        report: Report instance to evaluate. Its `pack_imports` attribute is
            rebound to include `individual_imports` when the latter is non-empty.
        output_dir: Directory for the token file; defaults to the current
            working directory.
    """
    args = parser.parse_args()
    results, table_data = evaluate_report(report, show_help_flag=False, show_expected=False, show_computed=False,
                                          silent=True,
                                          show_progress_bar=not args.noprogress,
                                          big_header=not args.autolab)

    sources = {}
    print("")
    # NOTE(review): the source listing below is skipped entirely with --autolab;
    # this nesting was reconstructed from a flattened source - confirm.
    if not args.autolab:
        if len(report.individual_imports) > 0:
            print("By uploading the .token file, you verify the files:")
            for m in report.individual_imports:
                print(">", m.__file__)
            print("Are created/modified individually by you in agreement with DTUs exam rules")
            # Build a new list rather than using `+=`: the in-place form extends
            # the existing list object, which may be shared (e.g. a class-level
            # default), so repeated calls would keep appending to it.
            report.pack_imports = list(report.pack_imports) + list(report.individual_imports)

        if len(report.pack_imports) > 0:
            print("Including files in upload...")
            for k, m in enumerate(report.pack_imports):
                nimp, top_package = gather_imports(m)
                _, report_relative_location, module_import = report._import_base_relative()
                nimp['report_relative_location'] = report_relative_location
                nimp['report_module_specification'] = module_import
                nimp['name'] = m.__name__
                sources[k] = nimp
                print(f" * {m.__name__}")

    results['sources'] = sources

    if output_dir is None:
        output_dir = os.getcwd()

    payload_out_base = report.__class__.__name__ + "_handin"
    obtain, possible = results['total']
    vstring = "_v" + report.version if report.version is not None else ""

    token = "%s_%i_of_%i%s.token" % (payload_out_base, obtain, possible, vstring)
    token = os.path.normpath(os.path.join(output_dir, token))

    with open(token, 'wb') as f:
        pickle.dump(results, f)

    if not args.autolab:
        print(" ")
        print("To get credit for your results, please upload the single unmodified file: ")
        print(">", token)
def source_instantiate(name, report1_source, payload):
    """Re-create a report object from embedded source code and a pickled payload.

    ``report1_source`` is executed in this module's global namespace, ``payload``
    (a hex string) is decoded to bytes and unpickled, and the object that
    ``name`` evaluates to is then called with ``payload=<unpickled data>`` and
    ``strict=True``.

    Both the source and the payload are executed/unpickled without any checks,
    so they must originate from a trusted place.

    Args:
        name: Expression (normally the report class name) evaluated after the
            source has been executed.
        report1_source: Python source code that defines ``name``.
        payload: Hex-encoded pickle passed to the constructor as ``payload``.

    Returns:
        The object returned by calling ``name`` with the payload.
    """
    # `exec` is looked up through eval instead of being called directly --
    # presumably so the call survives the code-obfuscation step; TODO confirm.
    run_source = eval("exec")
    run_source(report1_source, globals())
    answers = pickle.loads(bytes.fromhex(payload))
    report_cls = eval(name)
    return report_cls(payload=answers, strict=True)
report1_source = 'import os\n\n# DONT\'t import stuff here since install script requires __version__\n\ndef cache_write(object, file_name, verbose=True):\n import compress_pickle\n dn = os.path.dirname(file_name)\n if not os.path.exists(dn):\n os.mkdir(dn)\n if verbose: print("Writing cache...", file_name)\n with open(file_name, \'wb\', ) as f:\n compress_pickle.dump(object, f, compression="lzma")\n if verbose: print("Done!")\n\n\ndef cache_exists(file_name):\n # file_name = cn_(file_name) if cache_prefix else file_name\n return os.path.exists(file_name)\n\n\ndef cache_read(file_name):\n import compress_pickle # Import here because if you import in top the __version__ tag will fail.\n # file_name = cn_(file_name) if cache_prefix else file_name\n if os.path.exists(file_name):\n try:\n with open(file_name, \'rb\') as f:\n return compress_pickle.load(f, compression="lzma")\n except Exception as e:\n print("Tried to load a bad pickle file at", file_name)\n print("If the file appears to be automatically generated, you can try to delete it, otherwise download a new version")\n print(e)\n # return pickle.load(f)\n else:\n return None\n\n\n\n"""\ngit add . 
&& git commit -m "Options" && git push && pip install git+ssh://git@gitlab.compute.dtu.dk/tuhe/unitgrade_v1.git --upgrade\n"""\nimport numpy as np\nimport sys\nimport re\nimport threading\nimport tqdm\nimport pickle\nimport os\nfrom io import StringIO\nimport io\nfrom unittest.runner import _WritelnDecorator\nfrom typing import Any\nimport inspect\nimport textwrap\nimport colorama\nfrom colorama import Fore\nfrom functools import _make_key, RLock\nfrom collections import namedtuple\nimport unittest\nimport time\n\n_CacheInfo = namedtuple("CacheInfo", ["hits", "misses", "maxsize", "currsize"])\n\ncolorama.init(autoreset=True) # auto resets your settings after every output\n\ndef gprint(s):\n print(f"{Fore.GREEN}{s}")\n\nmyround = lambda x: np.round(x) # required.\nmsum = lambda x: sum(x)\nmfloor = lambda x: np.floor(x)\n\n\ndef setup_dir_by_class(C, base_dir):\n name = C.__class__.__name__\n return base_dir, name\n\n\nclass Logger(object):\n def __init__(self, buffer):\n assert False\n self.terminal = sys.stdout\n self.log = buffer\n\n def write(self, message):\n self.terminal.write(message)\n self.log.write(message)\n\n def flush(self):\n # this flush method is needed for python 3 compatibility.\n pass\n\n\nclass Capturing(list):\n def __init__(self, *args, stdout=None, unmute=False, **kwargs):\n self._stdout = stdout\n self.unmute = unmute\n super().__init__(*args, **kwargs)\n\n def __enter__(self, capture_errors=True): # don\'t put arguments here.\n self._stdout = sys.stdout if self._stdout == None else self._stdout\n self._stringio = StringIO()\n if self.unmute:\n sys.stdout = Logger(self._stringio)\n else:\n sys.stdout = self._stringio\n\n if capture_errors:\n self._sterr = sys.stderr\n sys.sterr = StringIO() # memory hole it\n self.capture_errors = capture_errors\n return self\n\n def __exit__(self, *args):\n self.extend(self._stringio.getvalue().splitlines())\n del self._stringio # free up some memory\n sys.stdout = self._stdout\n if self.capture_errors:\n 
sys.sterr = self._sterr\n\n\nclass Capturing2(Capturing):\n def __exit__(self, *args):\n lines = self._stringio.getvalue().splitlines()\n txt = "\\n".join(lines)\n numbers = extract_numbers(txt)\n self.extend(lines)\n del self._stringio # free up some memory\n sys.stdout = self._stdout\n if self.capture_errors:\n sys.sterr = self._sterr\n\n self.output = txt\n self.numbers = numbers\n\n\n# @classmethod\n# class OrderedClassMembers(type):\n# def __prepare__(self, name, bases):\n# assert False\n# return collections.OrderedDict()\n#\n# def __new__(self, name, bases, classdict):\n# ks = list(classdict.keys())\n# for b in bases:\n# ks += b.__ordered__\n# classdict[\'__ordered__\'] = [key for key in ks if key not in (\'__module__\', \'__qualname__\')]\n# return type.__new__(self, name, bases, classdict)\n\n\nclass Report:\n title = "report title"\n version = None\n questions = []\n pack_imports = []\n individual_imports = []\n nL = 120 # Maximum line width\n\n @classmethod\n def reset(cls):\n for (q, _) in cls.questions:\n if hasattr(q, \'reset\'):\n q.reset()\n\n @classmethod\n def mfile(clc):\n return inspect.getfile(clc)\n\n def _file(self):\n return inspect.getfile(type(self))\n\n def _import_base_relative(self):\n if hasattr(self.pack_imports[0], \'__path__\'):\n root_dir = self.pack_imports[0].__path__._path[0]\n else:\n root_dir = self.pack_imports[0].__file__\n\n root_dir = os.path.dirname(root_dir)\n relative_path = os.path.relpath(self._file(), root_dir)\n modules = os.path.normpath(relative_path[:-3]).split(os.sep)\n return root_dir, relative_path, modules\n\n def __init__(self, strict=False, payload=None):\n working_directory = os.path.abspath(os.path.dirname(self._file()))\n self.wdir, self.name = setup_dir_by_class(self, working_directory)\n # self.computed_answers_file = os.path.join(self.wdir, self.name + "_resources_do_not_hand_in.dat")\n for (q, _) in self.questions:\n q.nL = self.nL # Set maximum line length.\n\n if payload is not None:\n 
self.set_payload(payload, strict=strict)\n\n def main(self, verbosity=1):\n # Run all tests using standard unittest (nothing fancy).\n loader = unittest.TestLoader()\n for q, _ in self.questions:\n start = time.time() # A good proxy for setup time is to\n suite = loader.loadTestsFromTestCase(q)\n unittest.TextTestRunner(verbosity=verbosity).run(suite)\n total = time.time() - start\n q.time = total\n\n def _setup_answers(self, with_coverage=False):\n if with_coverage:\n for q, _ in self.questions:\n q._with_coverage = True\n q._report = self\n\n self.main() # Run all tests in class just to get that out of the way...\n report_cache = {}\n for q, _ in self.questions:\n # print(self.questions)\n if hasattr(q, \'_save_cache\'):\n q()._save_cache()\n print("q is", q())\n q()._cache_put(\'time\', q.time) # = q.time\n report_cache[q.__qualname__] = q._cache2\n else:\n report_cache[q.__qualname__] = {\'no cache see _setup_answers in framework.py\': True}\n if with_coverage:\n for q, _ in self.questions:\n q._with_coverage = False\n return report_cache\n\n def set_payload(self, payloads, strict=False):\n for q, _ in self.questions:\n q._cache = payloads[q.__qualname__]\n\n\ndef rm_progress_bar(txt):\n # More robust version. Apparently length of bar can depend on various factors, so check for order of symbols.\n nlines = []\n for l in txt.splitlines():\n pct = l.find("%")\n ql = False\n if pct > 0:\n i = l.find("|", pct + 1)\n if i > 0 and l.find("|", i + 1) > 0:\n ql = True\n if not ql:\n nlines.append(l)\n return "\\n".join(nlines)\n\n\ndef extract_numbers(txt):\n # txt = rm_progress_bar(txt)\n numeric_const_pattern = r\'[-+]? (?: (?: \\d* \\. \\d+ ) | (?: \\d+ \\.? ) )(?: [Ee] [+-]? 
\\d+ ) ?\'\n rx = re.compile(numeric_const_pattern, re.VERBOSE)\n all = rx.findall(txt)\n all = [float(a) if (\'.\' in a or "e" in a) else int(a) for a in all]\n if len(all) > 500:\n print(txt)\n raise Exception("unitgrade_v1.unitgrade_v1.py: Warning, too many numbers!", len(all))\n return all\n\n\nclass ActiveProgress():\n def __init__(self, t, start=True, title="my progress bar", show_progress_bar=True, file=None):\n if file == None:\n file = sys.stdout\n self.file = file\n self.t = t\n self._running = False\n self.title = title\n self.dt = 0.01\n self.n = int(np.round(self.t / self.dt))\n self.show_progress_bar = show_progress_bar\n self.pbar = None\n\n if start:\n self.start()\n\n def start(self):\n self._running = True\n if self.show_progress_bar:\n self.thread = threading.Thread(target=self.run)\n self.thread.start()\n self.time_started = time.time()\n\n def terminate(self):\n if not self._running:\n raise Exception("Stopping a stopped progress bar. ")\n self._running = False\n if self.show_progress_bar:\n self.thread.join()\n if self.pbar is not None:\n self.pbar.update(1)\n self.pbar.close()\n self.pbar = None\n\n self.file.flush()\n return time.time() - self.time_started\n\n def run(self):\n self.pbar = tqdm.tqdm(total=self.n, file=self.file, position=0, leave=False, desc=self.title, ncols=100,\n bar_format=\'{l_bar}{bar}| [{elapsed}<{remaining}]\')\n\n for _ in range(self.n - 1): # Don\'t terminate completely; leave bar at 99% done until terminate.\n if not self._running:\n self.pbar.close()\n self.pbar = None\n break\n\n time.sleep(self.dt)\n self.pbar.update(1)\n\ndef dprint(first, last, nL, extra = "", file=None, dotsym=\'.\', color=\'white\'):\n if file == None:\n file = sys.stdout\n\n # ss = self.item_title_print\n # state = "PASS" if success else "FAILED"\n dot_parts = (dotsym * max(0, nL - len(last) - len(first)))\n # if self.show_progress_bar or True:\n print(first + dot_parts, end="", file=file)\n # else:\n # print(dot_parts, end="", 
file=self.cc.file)\n last += extra\n # if tsecs >= 0.5:\n # state += " (" + str(tsecs) + " seconds)"\n print(last, file=file)\n\n\nclass UTextResult(unittest.TextTestResult):\n nL = 80\n number = -1 # HAcky way to set question number.\n show_progress_bar = True\n cc = None\n\n def __init__(self, stream, descriptions, verbosity):\n super().__init__(stream, descriptions, verbosity)\n self.successes = []\n\n def printErrors(self) -> None:\n self.printErrorList(\'ERROR\', self.errors)\n self.printErrorList(\'FAIL\', self.failures)\n\n def addError(self, test, err):\n super(unittest.TextTestResult, self).addFailure(test, err)\n self.cc_terminate(success=False)\n\n def addFailure(self, test, err):\n super(unittest.TextTestResult, self).addFailure(test, err)\n self.cc_terminate(success=False)\n\n def addSuccess(self, test: unittest.case.TestCase) -> None:\n self.successes.append(test)\n self.cc_terminate()\n\n def cc_terminate(self, success=True):\n if self.show_progress_bar or True:\n tsecs = np.round(self.cc.terminate(), 2)\n self.cc.file.flush()\n ss = self.item_title_print\n\n state = "PASS" if success else "FAILED"\n\n dot_parts = (\'.\' * max(0, self.nL - len(state) - len(ss)))\n if self.show_progress_bar or True:\n print(self.item_title_print + dot_parts, end="", file=self.cc.file)\n else:\n print(dot_parts, end="", file=self.cc.file)\n\n if tsecs >= 0.5:\n state += " (" + str(tsecs) + " seconds)"\n print(state, file=self.cc.file)\n\n def startTest(self, test):\n # j =self.testsRun\n self.testsRun += 1\n # item_title = self.getDescription(test)\n item_title = test.shortDescription() # Better for printing (get from cache).\n if item_title == None:\n # For unittest framework where getDescription may return None.\n item_title = self.getDescription(test)\n self.item_title_print = " * q%i.%i) %s" % (UTextResult.number + 1, self.testsRun, item_title)\n estimated_time = 10\n if self.show_progress_bar or True:\n self.cc = ActiveProgress(t=estimated_time, 
title=self.item_title_print, show_progress_bar=self.show_progress_bar, file=sys.stdout)\n else:\n print(self.item_title_print + (\'.\' * max(0, self.nL - 4 - len(self.item_title_print))), end="")\n\n self._test = test\n self._stdout = sys.stdout\n sys.stdout = io.StringIO()\n\n def stopTest(self, test):\n sys.stdout = self._stdout\n super().stopTest(test)\n\n def _setupStdout(self):\n if self._previousTestClass == None:\n total_estimated_time = 1\n if hasattr(self.__class__, \'q_title_print\'):\n q_title_print = self.__class__.q_title_print\n else:\n q_title_print = "<unnamed test. See unitgrade_v1.py>"\n\n cc = ActiveProgress(t=total_estimated_time, title=q_title_print, show_progress_bar=self.show_progress_bar)\n self.cc = cc\n\n def _restoreStdout(self): # Used when setting up the test.\n if self._previousTestClass is None:\n q_time = self.cc.terminate()\n q_time = np.round(q_time, 2)\n sys.stdout.flush()\n if self.show_progress_bar:\n print(self.cc.title, end="")\n print(" " * max(0, self.nL - len(self.cc.title)) + (" (" + str(q_time) + " seconds)" if q_time >= 0.5 else ""))\n\n\nclass UTextTestRunner(unittest.TextTestRunner):\n def __init__(self, *args, **kwargs):\n stream = io.StringIO()\n super().__init__(*args, stream=stream, **kwargs)\n\n def _makeResult(self):\n # stream = self.stream # not you!\n stream = sys.stdout\n stream = _WritelnDecorator(stream)\n return self.resultclass(stream, self.descriptions, self.verbosity)\n\n\ndef cache(foo, typed=False):\n """ Magic cache wrapper\n https://github.com/python/cpython/blob/main/Lib/functools.py\n """\n maxsize = None\n def wrapper(self, *args, **kwargs):\n key = (self.cache_id(), ("@cache", foo.__name__, _make_key(args, kwargs, typed)))\n if not self._cache_contains(key):\n value = foo(self, *args, **kwargs)\n self._cache_put(key, value)\n else:\n value = self._cache_get(key)\n return value\n\n return wrapper\n\n\ndef get_hints(ss):\n if ss == None:\n return None\n try:\n ss = textwrap.dedent(ss)\n ss = 
ss.replace(\'\'\'"""\'\'\', "").strip()\n hints = ["hints:", ]\n j = np.argmax([ss.lower().find(h) for h in hints])\n h = hints[j]\n ss = ss[ss.find(h) + len(h) + 1:]\n ss = "\\n".join([l for l in ss.split("\\n") if not l.strip().startswith(":")])\n ss = textwrap.dedent(ss)\n ss = ss.strip()\n return ss\n except Exception as e:\n print("bad hints", ss, e)\n\n\nclass UTestCase(unittest.TestCase):\n _outcome = None # A dictionary which stores the user-computed outcomes of all the tests. This differs from the cache.\n _cache = None # Read-only cache. Ensures method always produce same result.\n _cache2 = None # User-written cache.\n _with_coverage = False\n _report = None # The report used. This is very, very hacky and should always be None. Don\'t rely on it!\n\n def capture(self):\n if hasattr(self, \'_stdout\') and self._stdout is not None:\n file = self._stdout\n else:\n # self._stdout = sys.stdout\n # sys._stdout = io.StringIO()\n file = sys.stdout\n return Capturing2(stdout=file)\n\n @classmethod\n def question_title(cls):\n """ Return the question title """\n return cls.__doc__.strip().splitlines()[0].strip() if cls.__doc__ is not None else cls.__qualname__\n\n @classmethod\n def reset(cls):\n print("Warning, I am not sure UTestCase.reset() is needed anymore and it seems very hacky.")\n cls._outcome = None\n cls._cache = None\n cls._cache2 = None\n\n def _callSetUp(self):\n if self._with_coverage:\n if not hasattr(self._report, \'covcache\'):\n self._report.covcache = {}\n import coverage\n self.cov = coverage.Coverage()\n self.cov.start()\n self.setUp()\n\n def _callTearDown(self):\n self.tearDown()\n if self._with_coverage:\n from pathlib import Path\n from snipper import snipper\n self.cov.stop()\n data = self.cov.get_data()\n base, _, _ = self._report._import_base_relative()\n for file in data.measured_files():\n file = os.path.normpath(file)\n root = Path(base)\n child = Path(file)\n if root in child.parents:\n with open(child, \'r\') as f:\n s = 
f.read()\n lines = s.splitlines()\n garb = \'GARBAGE\'\n\n lines2 = snipper.censor_code(lines, keep=True)\n assert len(lines) == len(lines2)\n\n for l in data.contexts_by_lineno(file):\n if lines2[l].strip() == garb:\n if self.cache_id() not in self._report.covcache:\n self._report.covcache[self.cache_id()] = {}\n\n rel = os.path.relpath(child, root)\n cc = self._report.covcache[self.cache_id()]\n j = 0\n for j in range(l, -1, -1):\n if "def" in lines2[j] or "class" in lines2[j]:\n break\n from snipper.snipper import gcoms\n fun = lines2[j]\n comments, _ = gcoms("\\n".join(lines2[j:l]))\n if rel not in cc:\n cc[rel] = {}\n cc[rel][fun] = (l, "\\n".join(comments))\n self._cache_put((self.cache_id(), \'coverage\'), self._report.covcache)\n\n def shortDescriptionStandard(self):\n sd = super().shortDescription()\n if sd is None:\n sd = self._testMethodName\n return sd\n\n def shortDescription(self):\n sd = self.shortDescriptionStandard()\n title = self._cache_get((self.cache_id(), \'title\'), sd)\n return title if title is not None else sd\n\n @property\n def title(self):\n return self.shortDescription()\n\n @title.setter\n def title(self, value):\n self._cache_put((self.cache_id(), \'title\'), value)\n\n def _get_outcome(self):\n if not (self.__class__, \'_outcome\') or self.__class__._outcome is None:\n self.__class__._outcome = {}\n return self.__class__._outcome\n\n def _callTestMethod(self, testMethod):\n t = time.time()\n self._ensure_cache_exists() # Make sure cache is there.\n if self._testMethodDoc is not None:\n self._cache_put((self.cache_id(), \'title\'), self.shortDescriptionStandard())\n\n self._cache2[(self.cache_id(), \'assert\')] = {}\n res = testMethod()\n elapsed = time.time() - t\n self._get_outcome()[self.cache_id()] = res\n self._cache_put((self.cache_id(), "time"), elapsed)\n\n def cache_id(self):\n c = self.__class__.__qualname__\n m = self._testMethodName\n return c, m\n\n def __init__(self, *args, **kwargs):\n super().__init__(*args, 
**kwargs)\n self._load_cache()\n self._assert_cache_index = 0\n\n def _ensure_cache_exists(self):\n if not hasattr(self.__class__, \'_cache\') or self.__class__._cache == None:\n self.__class__._cache = dict()\n if not hasattr(self.__class__, \'_cache2\') or self.__class__._cache2 == None:\n self.__class__._cache2 = dict()\n\n def _cache_get(self, key, default=None):\n self._ensure_cache_exists()\n return self.__class__._cache.get(key, default)\n\n def _cache_put(self, key, value):\n self._ensure_cache_exists()\n self.__class__._cache2[key] = value\n\n def _cache_contains(self, key):\n self._ensure_cache_exists()\n return key in self.__class__._cache\n\n def wrap_assert(self, assert_fun, first, *args, **kwargs):\n # sys.stdout = self._stdout\n key = (self.cache_id(), \'assert\')\n if not self._cache_contains(key):\n print("Warning, framework missing", key)\n self.__class__._cache[\n key] = {} # A new dict. We manually insert it because we have to use that the dict is mutable.\n cache = self._cache_get(key)\n id = self._assert_cache_index\n if not id in cache:\n print("Warning, framework missing cache index", key, "id =", id)\n _expected = cache.get(id, f"Key {id} not found in cache; framework files missing. Please run deploy()")\n\n # The order of these calls is important. If the method assert fails, we should still store the correct result in cache.\n cache[id] = first\n self._cache_put(key, cache)\n self._assert_cache_index += 1\n assert_fun(first, _expected, *args, **kwargs)\n\n def assertEqualC(self, first: Any, msg: Any = ...) -> None:\n self.wrap_assert(self.assertEqual, first, msg)\n\n def _cache_file(self):\n return os.path.dirname(inspect.getfile(self.__class__)) + "/unitgrade_v1/" + self.__class__.__name__ + ".pkl"\n\n def _save_cache(self):\n # get the class name (i.e. 
what to save to).\n cfile = self._cache_file()\n if not os.path.isdir(os.path.dirname(cfile)):\n os.makedirs(os.path.dirname(cfile))\n\n if hasattr(self.__class__, \'_cache2\'):\n with open(cfile, \'wb\') as f:\n pickle.dump(self.__class__._cache2, f)\n\n # But you can also set cache explicitly.\n def _load_cache(self):\n if self._cache is not None: # Cache already loaded. We will not load it twice.\n return\n # raise Exception("Loaded cache which was already set. What is going on?!")\n cfile = self._cache_file()\n if os.path.exists(cfile):\n try:\n with open(cfile, \'rb\') as f:\n data = pickle.load(f)\n self.__class__._cache = data\n except Exception as e:\n print("Bad cache", cfile)\n print(e)\n else:\n print("Warning! data file not found", cfile)\n\n def _feedErrorsToResult(self, result, errors):\n """ Use this to show hints on test failure. """\n if not isinstance(result, UTextResult):\n er = [e for e, v in errors if v != None]\n\n if len(er) > 0:\n hints = []\n key = (self.cache_id(), \'coverage\')\n if self._cache_contains(key):\n CC = self._cache_get(key)\n for id in CC:\n if id == self.cache_id():\n cl, m = id\n gprint(f"> An error occured while solving: {cl}.{m}. 
The files/methods you need to edit are:") # For the test {id} in {file} you should edit:")\n for file in CC[id]:\n rec = CC[id][file]\n gprint(f"> * {file}")\n for l in rec:\n _, comments = CC[id][file][l]\n hint = get_hints(comments)\n\n if hint != None:\n hints.append(hint)\n gprint(f"> - {l}")\n\n er = er[0]\n doc = er._testMethodDoc\n if doc is not None:\n hint = get_hints(er._testMethodDoc)\n if hint is not None:\n hints = [hint] + hints\n if len(hints) > 0:\n gprint("> Hints:")\n gprint(textwrap.indent("\\n".join(hints), "> "))\n\n super()._feedErrorsToResult(result, errors)\n\n def startTestRun(self):\n # print("asdfsdaf 11", file=sys.stderr)\n super().startTestRun()\n # print("asdfsdaf")\n\n def _callTestMethod(self, method):\n # print("asdfsdaf")\n super()._callTestMethod(method)\n\n\ndef hide(func):\n return func\n\n\ndef makeRegisteringDecorator(foreignDecorator):\n """\n Returns a copy of foreignDecorator, which is identical in every\n way(*), except also appends a .decorator property to the callable it\n spits out.\n """\n\n def newDecorator(func):\n # Call to newDecorator(method)\n # Exactly like old decorator, but output keeps track of what decorated it\n R = foreignDecorator(func) # apply foreignDecorator, like call to foreignDecorator(method) would have done\n R.decorator = newDecorator # keep track of decorator\n # R.original = func # might as well keep track of everything!\n return R\n\n newDecorator.__name__ = foreignDecorator.__name__\n newDecorator.__doc__ = foreignDecorator.__doc__\n return newDecorator\n\nhide = makeRegisteringDecorator(hide)\n\ndef methodsWithDecorator(cls, decorator):\n """\n Returns all methods in CLS with DECORATOR as the\n outermost decorator.\n\n DECORATOR must be a "registering decorator"; one\n can make any decorator "registering" via the\n makeRegisteringDecorator function.\n\n import inspect\n ls = list(methodsWithDecorator(GeneratorQuestion, deco))\n for f in ls:\n print(inspect.getsourcelines(f) ) # How to get 
all hidden questions.\n """\n for maybeDecorated in cls.__dict__.values():\n if hasattr(maybeDecorated, \'decorator\'):\n if maybeDecorated.decorator == decorator:\n print(maybeDecorated)\n yield maybeDecorated\n# 817\n\n\nimport numpy as np\nfrom tabulate import tabulate\nfrom datetime import datetime\nimport pyfiglet\nimport unittest\nimport inspect\nimport os\nimport argparse\nimport time\n\nparser = argparse.ArgumentParser(description=\'Evaluate your report.\', epilog="""Example: \nTo run all tests in a report: \n\n> python assignment1_dp.py\n\nTo run only question 2 or question 2.1\n\n> python assignment1_dp.py -q 2\n> python assignment1_dp.py -q 2.1\n\nNote this scripts does not grade your report. To grade your report, use:\n\n> python report1_grade.py\n\nFinally, note that if your report is part of a module (package), and the report script requires part of that package, the -m option for python may be useful.\nFor instance, if the report file is in Documents/course_package/report3_complete.py, and `course_package` is a python package, then change directory to \'Documents/` and run:\n\n> python -m course_package.report1\n\nsee https://docs.python.org/3.9/using/cmdline.html\n""", formatter_class=argparse.RawTextHelpFormatter)\nparser.add_argument(\'-q\', nargs=\'?\', type=str, default=None, help=\'Only evaluate this question (e.g.: -q 2)\')\nparser.add_argument(\'--showexpected\', action="store_true", help=\'Show the expected/desired result\')\nparser.add_argument(\'--showcomputed\', action="store_true", help=\'Show the answer your code computes\')\nparser.add_argument(\'--unmute\', action="store_true", help=\'Show result of print(...) commands in code\')\nparser.add_argument(\'--passall\', action="store_true", help=\'Automatically pass all tests. 
Useful when debugging.\')\n\ndef evaluate_report_student(report, question=None, qitem=None, unmute=None, passall=None, ignore_missing_file=False, show_tol_err=False):\n args = parser.parse_args()\n if question is None and args.q is not None:\n question = args.q\n if "." in question:\n question, qitem = [int(v) for v in question.split(".")]\n else:\n question = int(question)\n\n if hasattr(report, "computed_answer_file") and not os.path.isfile(report.computed_answers_file) and not ignore_missing_file:\n raise Exception("> Error: The pre-computed answer file", os.path.abspath(report.computed_answers_file), "does not exist. Check your package installation")\n\n if unmute is None:\n unmute = args.unmute\n if passall is None:\n passall = args.passall\n\n results, table_data = evaluate_report(report, question=question, show_progress_bar=not unmute, qitem=qitem, verbose=False, passall=passall, show_expected=args.showexpected, show_computed=args.showcomputed,unmute=unmute,\n show_tol_err=show_tol_err)\n\n\n if question is None:\n print("Provisional evaluation")\n tabulate(table_data)\n table = table_data\n print(tabulate(table))\n print(" ")\n\n fr = inspect.getouterframes(inspect.currentframe())[1].filename\n gfile = os.path.basename(fr)[:-3] + "_grade.py"\n if os.path.exists(gfile):\n print("Note your results have not yet been registered. 
\\nTo register your results, please run the file:")\n print(">>>", gfile)\n print("In the same manner as you ran this file.")\n\n\n return results\n\n\ndef upack(q):\n # h = zip([(i[\'w\'], i[\'possible\'], i[\'obtained\']) for i in q.values()])\n h =[(i[\'w\'], i[\'possible\'], i[\'obtained\']) for i in q.values()]\n h = np.asarray(h)\n return h[:,0], h[:,1], h[:,2],\n\nclass UnitgradeTextRunner(unittest.TextTestRunner):\n def __init__(self, *args, **kwargs):\n super().__init__(*args, **kwargs)\n\nclass SequentialTestLoader(unittest.TestLoader):\n def getTestCaseNames(self, testCaseClass):\n test_names = super().getTestCaseNames(testCaseClass)\n # testcase_methods = list(testCaseClass.__dict__.keys())\n ls = []\n for C in testCaseClass.mro():\n if issubclass(C, unittest.TestCase):\n ls = list(C.__dict__.keys()) + ls\n testcase_methods = ls\n test_names.sort(key=testcase_methods.index)\n return test_names\n\ndef evaluate_report(report, question=None, qitem=None, passall=False, verbose=False, show_expected=False, show_computed=False,unmute=False, show_help_flag=True, silent=False,\n show_progress_bar=True,\n show_tol_err=False,\n big_header=True):\n\n now = datetime.now()\n if big_header:\n ascii_banner = pyfiglet.figlet_format("UnitGrade", font="doom")\n b = "\\n".join( [l for l in ascii_banner.splitlines() if len(l.strip()) > 0] )\n else:\n b = "Unitgrade"\n dt_string = now.strftime("%d/%m/%Y %H:%M:%S")\n print(b + " v" + __version__ + ", started: " + dt_string+ "\\n")\n # print("Started: " + dt_string)\n s = report.title\n if hasattr(report, "version") and report.version is not None:\n s += " version " + report.version\n print(s, "(use --help for options)" if show_help_flag else "")\n # print(f"Loaded answers from: ", report.computed_answers_file, "\\n")\n table_data = []\n t_start = time.time()\n score = {}\n loader = SequentialTestLoader()\n\n for n, (q, w) in enumerate(report.questions):\n if question is not None and n+1 != question:\n continue\n suite = 
loader.loadTestsFromTestCase(q)\n qtitle = q.question_title() if hasattr(q, \'question_title\') else q.__qualname__\n q_title_print = "Question %i: %s"%(n+1, qtitle)\n print(q_title_print, end="")\n q.possible = 0\n q.obtained = 0\n q_ = {} # Gather score in this class.\n UTextResult.q_title_print = q_title_print # Hacky\n UTextResult.show_progress_bar = show_progress_bar # Hacky.\n UTextResult.number = n\n UTextResult.nL = report.nL\n\n res = UTextTestRunner(verbosity=2, resultclass=UTextResult).run(suite)\n\n possible = res.testsRun\n obtained = len(res.successes)\n\n assert len(res.successes) + len(res.errors) + len(res.failures) == res.testsRun\n\n obtained = int(w * obtained * 1.0 / possible ) if possible > 0 else 0\n score[n] = {\'w\': w, \'possible\': w, \'obtained\': obtained, \'items\': q_, \'title\': qtitle}\n q.obtained = obtained\n q.possible = possible\n\n s1 = f" * q{n+1}) Total"\n s2 = f" {q.obtained}/{w}"\n print(s1 + ("."* (report.nL-len(s1)-len(s2) )) + s2 )\n print(" ")\n table_data.append([f"q{n+1}) Total", f"{q.obtained}/{w}"])\n\n ws, possible, obtained = upack(score)\n possible = int( msum(possible) )\n obtained = int( msum(obtained) ) # Cast to python int\n report.possible = possible\n report.obtained = obtained\n now = datetime.now()\n dt_string = now.strftime("%H:%M:%S")\n\n dt = int(time.time()-t_start)\n minutes = dt//60\n seconds = dt - minutes*60\n plrl = lambda i, s: str(i) + " " + s + ("s" if i != 1 else "")\n\n dprint(first = "Total points at "+ dt_string + " (" + plrl(minutes, "minute") + ", "+ plrl(seconds, "second") +")",\n last=""+str(report.obtained)+"/"+str(report.possible), nL = report.nL)\n\n # print(f"Completed at "+ dt_string + " (" + plrl(minutes, "minute") + ", "+ plrl(seconds, "second") +"). 
Total")\n\n table_data.append(["Total", ""+str(report.obtained)+"/"+str(report.possible) ])\n results = {\'total\': (obtained, possible), \'details\': score}\n return results, table_data\n\n\nfrom tabulate import tabulate\nfrom datetime import datetime\nimport inspect\nimport json\nimport os\nimport bz2\nimport pickle\nimport os\n\ndef bzwrite(json_str, token): # to get around obfuscation issues\n with getattr(bz2, \'open\')(token, "wt") as f:\n f.write(json_str)\n\ndef gather_imports(imp):\n resources = {}\n m = imp\n # for m in pack_imports:\n # print(f"*** {m.__name__}")\n f = m.__file__\n # dn = os.path.dirname(f)\n # top_package = os.path.dirname(__import__(m.__name__.split(\'.\')[0]).__file__)\n # top_package = str(__import__(m.__name__.split(\'.\')[0]).__path__)\n\n if hasattr(m, \'__file__\') and not hasattr(m, \'__path__\'): # Importing a simple file: m.__class__.__name__ == \'module\' and False:\n top_package = os.path.dirname(m.__file__)\n module_import = True\n else:\n top_package = __import__(m.__name__.split(\'.\')[0]).__path__._path[0]\n module_import = False\n\n # top_package = os.path.dirname(__import__(m.__name__.split(\'.\')[0]).__file__)\n # top_package = os.path.dirname(top_package)\n import zipfile\n # import strea\n # zipfile.ZipFile\n import io\n # file_like_object = io.BytesIO(my_zip_data)\n zip_buffer = io.BytesIO()\n with zipfile.ZipFile(zip_buffer, \'w\') as zip:\n # zip.write()\n for root, dirs, files in os.walk(top_package):\n for file in files:\n if file.endswith(".py"):\n fpath = os.path.join(root, file)\n v = os.path.relpath(os.path.join(root, file), os.path.dirname(top_package) if not module_import else top_package)\n zip.write(fpath, v)\n\n resources[\'zipfile\'] = zip_buffer.getvalue()\n resources[\'top_package\'] = top_package\n resources[\'module_import\'] = module_import\n return resources, top_package\n\n if f.endswith("__init__.py"):\n for root, dirs, files in os.walk(os.path.dirname(f)):\n for file in files:\n if 
file.endswith(".py"):\n # print(file)\n # print()\n v = os.path.relpath(os.path.join(root, file), top_package)\n with open(os.path.join(root, file), \'r\') as ff:\n resources[v] = ff.read()\n else:\n v = os.path.relpath(f, top_package)\n with open(f, \'r\') as ff:\n resources[v] = ff.read()\n return resources\n\nimport argparse\nparser = argparse.ArgumentParser(description=\'Evaluate your report.\', epilog="""Use this script to get the score of your report. Example:\n\n> python report1_grade.py\n\nFinally, note that if your report is part of a module (package), and the report script requires part of that package, the -m option for python may be useful.\nFor instance, if the report file is in Documents/course_package/report3_complete.py, and `course_package` is a python package, then change directory to \'Documents/` and run:\n\n> python -m course_package.report1\n\nsee https://docs.python.org/3.9/using/cmdline.html\n""", formatter_class=argparse.RawTextHelpFormatter)\nparser.add_argument(\'--noprogress\', action="store_true", help=\'Disable progress bars\')\nparser.add_argument(\'--autolab\', action="store_true", help=\'Show Autolab results\')\n\ndef gather_upload_to_campusnet(report, output_dir=None):\n n = report.nL\n args = parser.parse_args()\n results, table_data = evaluate_report(report, show_help_flag=False, show_expected=False, show_computed=False, silent=True,\n show_progress_bar=not args.noprogress,\n big_header=not args.autolab)\n # print(" ")\n # print("="*n)\n # print("Final evaluation")\n # print(tabulate(table_data))\n # also load the source code of missing files...\n\n sources = {}\n print("")\n if not args.autolab:\n if len(report.individual_imports) > 0:\n print("By uploading the .token file, you verify the files:")\n for m in report.individual_imports:\n print(">", m.__file__)\n print("Are created/modified individually by you in agreement with DTUs exam rules")\n report.pack_imports += report.individual_imports\n\n if len(report.pack_imports) > 
0:\n print("Including files in upload...")\n for k, m in enumerate(report.pack_imports):\n nimp, top_package = gather_imports(m)\n _, report_relative_location, module_import = report._import_base_relative()\n\n # report_relative_location = os.path.relpath(inspect.getfile(report.__class__), top_package)\n nimp[\'report_relative_location\'] = report_relative_location\n nimp[\'report_module_specification\'] = module_import\n nimp[\'name\'] = m.__name__\n sources[k] = nimp\n # if len([k for k in nimp if k not in sources]) > 0:\n print(f" * {m.__name__}")\n # sources = {**sources, **nimp}\n results[\'sources\'] = sources\n\n if output_dir is None:\n output_dir = os.getcwd()\n\n payload_out_base = report.__class__.__name__ + "_handin"\n\n obtain, possible = results[\'total\']\n vstring = "_v"+report.version if report.version is not None else ""\n\n token = "%s_%i_of_%i%s.token"%(payload_out_base, obtain, possible,vstring)\n token = os.path.normpath(os.path.join(output_dir, token))\n\n\n with open(token, \'wb\') as f:\n pickle.dump(results, f)\n\n if not args.autolab:\n print(" ")\n print("To get credit for your results, please upload the single unmodified file: ")\n print(">", token)\n # print("To campusnet without any modifications.")\n\n # print("Now time for some autolab fun")\n\ndef source_instantiate(name, report1_source, payload):\n eval("exec")(report1_source, globals())\n pl = pickle.loads(bytes.fromhex(payload))\n report = eval(name)(payload=pl, strict=True)\n # report.set_payload(pl)\n return report\n\n\n__version__ = "0.9.0"\n\n\nclass Week1(UTestCase):\n """ The first question for week 1. 
"""\n def test_add(self):\n from cs103.homework1 import add\n self.assertEqualC(add(2,2))\n self.assertEqualC(add(-100, 5))\n\n\nclass AutomaticPass(UTestCase):\n def test_student_passed(self):\n self.assertEqual(2,2)\n\n\nimport cs103\nclass Report3(Report):\n title = "CS 101 Report 3"\n questions = [(Week1, 20), (AutomaticPass, 10)] # Include a single question for 10 credits.\n pack_imports = [cs103]'
# Script tail: rebuild the report object from the embedded source and payload,
# then evaluate it and write the hand-in token next to this script.

# Hex-encoded pickle stream (the leading bytes 80 04 are the pickle protocol-4
# header). The copy of source_instantiate embedded in report1_source decodes it
# with pickle.loads(bytes.fromhex(...)) and passes the result to the report
# constructor as `payload`.
# NOTE(review): presumably holds the precomputed expected answers/timings for
# the Week1 and AutomaticPass questions -- confirm against the generator.
report1_payload = '80049568000000000000007d94288c055765656b31947d942868018c08746573745f6164649486948c066173736572749486947d94284b014aa1ffffff4b004b04758c0474696d6594473fb1eb1c00000000758c0d4175746f6d6174696350617373947d946808473fa78d300000000073752e'
# Name of the report class to instantiate; a class named Report3 is defined at
# the end of the text stored in report1_source.
name="Report3"
# Build the report instance. Per the copy of source_instantiate embedded in
# report1_source, this executes report1_source into the module globals and
# constructs the class called `name` with the decoded payload and strict=True.
# NOTE(review): the source_instantiate actually called here is the one defined
# earlier in this file -- confirm it matches the embedded copy.
report = source_instantiate(name, report1_source, report1_payload)
# Directory containing this script; used as the destination directory below.
output_dir = os.path.dirname(__file__)
# Evaluate the report and write the resulting .token file into output_dir
# (behaviour as defined by gather_upload_to_campusnet in report1_source).
gather_upload_to_campusnet(report, output_dir)