Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
"""
git add . && git commit -m "Options" && git push && pip install git+ssh://git@gitlab.compute.dtu.dk/tuhe/unitgrade.git --upgrade
"""
import numpy as np
import sys
import re
import threading
import tqdm
import pickle
import os
from io import StringIO
import io
from unittest.runner import _WritelnDecorator
from typing import Any
import inspect
import textwrap
import colorama
from colorama import Fore
from functools import _make_key, RLock
from collections import namedtuple
import unittest
import time
_CacheInfo = namedtuple("CacheInfo", ["hits", "misses", "maxsize", "currsize"])
colorama.init(autoreset=True) # auto resets your settings after every output
def gprint(s):
print(f"{Fore.GREEN}{s}")
myround = lambda x: np.round(x) # required.
msum = lambda x: sum(x)
mfloor = lambda x: np.floor(x)
def setup_dir_by_class(C, base_dir):
name = C.__class__.__name__
return base_dir, name
class Logger(object):
def __init__(self, buffer):
assert False
self.terminal = sys.stdout
self.log = buffer
def write(self, message):
self.terminal.write(message)
self.log.write(message)
def flush(self):
# this flush method is needed for python 3 compatibility.
pass
class Capturing(list):
def __init__(self, *args, stdout=None, unmute=False, **kwargs):
self._stdout = stdout
self.unmute = unmute
super().__init__(*args, **kwargs)
def __enter__(self, capture_errors=True): # don't put arguments here.
self._stdout = sys.stdout if self._stdout == None else self._stdout
self._stringio = StringIO()
if self.unmute:
sys.stdout = Logger(self._stringio)
else:
sys.stdout = self._stringio
if capture_errors:
self._sterr = sys.stderr
sys.sterr = StringIO() # memory hole it
self.capture_errors = capture_errors
return self
def __exit__(self, *args):
self.extend(self._stringio.getvalue().splitlines())
del self._stringio # free up some memory
sys.stdout = self._stdout
if self.capture_errors:
sys.sterr = self._sterr
class Capturing2(Capturing):
def __exit__(self, *args):
lines = self._stringio.getvalue().splitlines()
txt = "\n".join(lines)
numbers = extract_numbers(txt)
self.extend(lines)
del self._stringio # free up some memory
sys.stdout = self._stdout
if self.capture_errors:
sys.sterr = self._sterr
self.output = txt
self.numbers = numbers
# @classmethod
# class OrderedClassMembers(type):
# def __prepare__(self, name, bases):
# assert False
# return collections.OrderedDict()
#
# def __new__(self, name, bases, classdict):
# ks = list(classdict.keys())
# for b in bases:
# ks += b.__ordered__
# classdict['__ordered__'] = [key for key in ks if key not in ('__module__', '__qualname__')]
# return type.__new__(self, name, bases, classdict)
class Report:
title = "report title"
version = None
questions = []
pack_imports = []
individual_imports = []
nL = 120 # Maximum line width
@classmethod
def reset(cls):
for (q, _) in cls.questions:
if hasattr(q, 'reset'):
q.reset()
@classmethod
def mfile(clc):
return inspect.getfile(clc)
def _file(self):
return inspect.getfile(type(self))
def _import_base_relative(self):
if hasattr(self.pack_imports[0], '__path__'):
root_dir = self.pack_imports[0].__path__._path[0]
else:
root_dir = self.pack_imports[0].__file__
root_dir = os.path.dirname(root_dir)
relative_path = os.path.relpath(self._file(), root_dir)
modules = os.path.normpath(relative_path[:-3]).split(os.sep)
return root_dir, relative_path, modules
def __init__(self, strict=False, payload=None):
working_directory = os.path.abspath(os.path.dirname(self._file()))
self.wdir, self.name = setup_dir_by_class(self, working_directory)
# self.computed_answers_file = os.path.join(self.wdir, self.name + "_resources_do_not_hand_in.dat")
for (q, _) in self.questions:
q.nL = self.nL # Set maximum line length.
if payload is not None:
self.set_payload(payload, strict=strict)
def main(self, verbosity=1):
# Run all tests using standard unittest (nothing fancy).
loader = unittest.TestLoader()
for q, _ in self.questions:
start = time.time() # A good proxy for setup time is to
suite = loader.loadTestsFromTestCase(q)
unittest.TextTestRunner(verbosity=verbosity).run(suite)
total = time.time() - start
q.time = total
def _setup_answers(self, with_coverage=False):
if with_coverage:
for q, _ in self.questions:
q._with_coverage = True
q._report = self
self.main() # Run all tests in class just to get that out of the way...
report_cache = {}
for q, _ in self.questions:
# print(self.questions)
if hasattr(q, '_save_cache'):
q()._save_cache()
print("q is", q())
q()._cache_put('time', q.time) # = q.time
report_cache[q.__qualname__] = q._cache2
else:
report_cache[q.__qualname__] = {'no cache see _setup_answers in unitgrade2.py': True}
if with_coverage:
for q, _ in self.questions:
q._with_coverage = False
return report_cache
def set_payload(self, payloads, strict=False):
for q, _ in self.questions:
q._cache = payloads[q.__qualname__]
def rm_progress_bar(txt):
# More robust version. Apparently length of bar can depend on various factors, so check for order of symbols.
nlines = []
for l in txt.splitlines():
pct = l.find("%")
ql = False
if pct > 0:
i = l.find("|", pct + 1)
if i > 0 and l.find("|", i + 1) > 0:
ql = True
if not ql:
nlines.append(l)
return "\n".join(nlines)
def extract_numbers(txt):
# txt = rm_progress_bar(txt)
numeric_const_pattern = r'[-+]? (?: (?: \d* \. \d+ ) | (?: \d+ \.? ) )(?: [Ee] [+-]? \d+ ) ?'
rx = re.compile(numeric_const_pattern, re.VERBOSE)
all = rx.findall(txt)
all = [float(a) if ('.' in a or "e" in a) else int(a) for a in all]
if len(all) > 500:
print(txt)
raise Exception("unitgrade.unitgrade.py: Warning, too many numbers!", len(all))
return all
class ActiveProgress():
def __init__(self, t, start=True, title="my progress bar", show_progress_bar=True, file=None):
if file == None:
file = sys.stdout
self.file = file
self.t = t
self._running = False
self.title = title
self.dt = 0.01
self.n = int(np.round(self.t / self.dt))
self.show_progress_bar = show_progress_bar
self.pbar = None
if start:
self.start()
def start(self):
self._running = True
if self.show_progress_bar:
self.thread = threading.Thread(target=self.run)
self.thread.start()
self.time_started = time.time()
def terminate(self):
if not self._running:
raise Exception("Stopping a stopped progress bar. ")
self._running = False
if self.show_progress_bar:
self.thread.join()
if self.pbar is not None:
self.pbar.update(1)
self.pbar.close()
self.pbar = None
self.file.flush()
return time.time() - self.time_started
def run(self):
self.pbar = tqdm.tqdm(total=self.n, file=self.file, position=0, leave=False, desc=self.title, ncols=100,
bar_format='{l_bar}{bar}| [{elapsed}<{remaining}]')
for _ in range(self.n - 1): # Don't terminate completely; leave bar at 99% done until terminate.
if not self._running:
self.pbar.close()
self.pbar = None
break
time.sleep(self.dt)
self.pbar.update(1)
def dprint(first, last, nL, extra = "", file=None, dotsym='.', color='white'):
if file == None:
file = sys.stdout
# ss = self.item_title_print
# state = "PASS" if success else "FAILED"
dot_parts = (dotsym * max(0, nL - len(last) - len(first)))
# if self.show_progress_bar or True:
print(first + dot_parts, end="", file=file)
# else:
# print(dot_parts, end="", file=self.cc.file)
last += extra
# if tsecs >= 0.5:
# state += " (" + str(tsecs) + " seconds)"
print(last, file=file)
class UTextResult(unittest.TextTestResult):
nL = 80
number = -1 # HAcky way to set question number.
show_progress_bar = True
cc = None
def __init__(self, stream, descriptions, verbosity):
super().__init__(stream, descriptions, verbosity)
self.successes = []
def printErrors(self) -> None:
self.printErrorList('ERROR', self.errors)
self.printErrorList('FAIL', self.failures)
def addError(self, test, err):
super(unittest.TextTestResult, self).addFailure(test, err)
self.cc_terminate(success=False)
def addFailure(self, test, err):
super(unittest.TextTestResult, self).addFailure(test, err)
self.cc_terminate(success=False)
def addSuccess(self, test: unittest.case.TestCase) -> None:
self.successes.append(test)
self.cc_terminate()
def cc_terminate(self, success=True):
if self.show_progress_bar or True:
tsecs = np.round(self.cc.terminate(), 2)
self.cc.file.flush()
ss = self.item_title_print
state = "PASS" if success else "FAILED"
dot_parts = ('.' * max(0, self.nL - len(state) - len(ss)))
if self.show_progress_bar or True:
print(self.item_title_print + dot_parts, end="", file=self.cc.file)
else:
print(dot_parts, end="", file=self.cc.file)
if tsecs >= 0.5:
state += " (" + str(tsecs) + " seconds)"
print(state, file=self.cc.file)
def startTest(self, test):
# j =self.testsRun
self.testsRun += 1
# item_title = self.getDescription(test)
item_title = test.shortDescription() # Better for printing (get from cache).
if item_title == None:
# For unittest framework where getDescription may return None.
item_title = self.getDescription(test)
self.item_title_print = " * q%i.%i) %s" % (UTextResult.number + 1, self.testsRun, item_title)
estimated_time = 10
if self.show_progress_bar or True:
self.cc = ActiveProgress(t=estimated_time, title=self.item_title_print, show_progress_bar=self.show_progress_bar, file=sys.stdout)
else:
print(self.item_title_print + ('.' * max(0, self.nL - 4 - len(self.item_title_print))), end="")
self._test = test
self._stdout = sys.stdout
sys.stdout = io.StringIO()
def stopTest(self, test):
sys.stdout = self._stdout
super().stopTest(test)
def _setupStdout(self):
if self._previousTestClass == None:
total_estimated_time = 1
if hasattr(self.__class__, 'q_title_print'):
q_title_print = self.__class__.q_title_print
else:
q_title_print = "<unnamed test. See unitgrade.py>"
cc = ActiveProgress(t=total_estimated_time, title=q_title_print, show_progress_bar=self.show_progress_bar)
self.cc = cc
def _restoreStdout(self): # Used when setting up the test.
if self._previousTestClass is None:
q_time = self.cc.terminate()
q_time = np.round(q_time, 2)
sys.stdout.flush()
if self.show_progress_bar:
print(self.cc.title, end="")
print(" " * max(0, self.nL - len(self.cc.title)) + (" (" + str(q_time) + " seconds)" if q_time >= 0.5 else ""))
class UTextTestRunner(unittest.TextTestRunner):
def __init__(self, *args, **kwargs):
stream = io.StringIO()
super().__init__(*args, stream=stream, **kwargs)
def _makeResult(self):
# stream = self.stream # not you!
stream = sys.stdout
stream = _WritelnDecorator(stream)
return self.resultclass(stream, self.descriptions, self.verbosity)
def cache(foo, typed=False):
""" Magic cache wrapper
https://github.com/python/cpython/blob/main/Lib/functools.py
"""
maxsize = None
def wrapper(self, *args, **kwargs):
key = (self.cache_id(), ("@cache", foo.__name__, _make_key(args, kwargs, typed)))
if not self._cache_contains(key):
value = foo(self, *args, **kwargs)
self._cache_put(key, value)
else:
value = self._cache_get(key)
return value
return wrapper
def get_hints(ss):
if ss == None:
return None
try:
ss = textwrap.dedent(ss)
ss = ss.replace('''"""''', "").strip()
hints = ["hints:", ]
j = np.argmax([ss.lower().find(h) for h in hints])
h = hints[j]
ss = ss[ss.find(h) + len(h) + 1:]
ss = "\n".join([l for l in ss.split("\n") if not l.strip().startswith(":")])
ss = textwrap.dedent(ss)
ss = ss.strip()
return ss
except Exception as e:
print("bad hints", ss, e)
class UTestCase(unittest.TestCase):
_outcome = None # A dictionary which stores the user-computed outcomes of all the tests. This differs from the cache.
_cache = None # Read-only cache. Ensures method always produce same result.
_cache2 = None # User-written cache.
_with_coverage = False
_report = None # The report used. This is very, very hacky and should always be None. Don't rely on it!
def capture(self):
if hasattr(self, '_stdout') and self._stdout is not None:
file = self._stdout
else:
# self._stdout = sys.stdout
# sys._stdout = io.StringIO()
file = sys.stdout
return Capturing2(stdout=file)
@classmethod
def question_title(cls):
""" Return the question title """
return cls.__doc__.strip().splitlines()[0].strip() if cls.__doc__ is not None else cls.__qualname__
@classmethod
def reset(cls):
print("Warning, I am not sure UTestCase.reset() is needed anymore and it seems very hacky.")
cls._outcome = None
cls._cache = None
cls._cache2 = None
def _callSetUp(self):
if self._with_coverage:
if not hasattr(self._report, 'covcache'):
self._report.covcache = {}
import coverage
self.cov = coverage.Coverage()
self.cov.start()
self.setUp()
def _callTearDown(self):
self.tearDown()
if self._with_coverage:
from pathlib import Path
from snipper import snipper
self.cov.stop()
data = self.cov.get_data()
base, _, _ = self._report._import_base_relative()
for file in data.measured_files():
file = os.path.normpath(file)
root = Path(base)
child = Path(file)
if root in child.parents:
with open(child, 'r') as f:
s = f.read()
lines = s.splitlines()
garb = 'GARBAGE'
lines2 = snipper.censor_code(lines, keep=True)
assert len(lines) == len(lines2)
for l in data.contexts_by_lineno(file):
if lines2[l].strip() == garb:
if self.cache_id() not in self._report.covcache:
self._report.covcache[self.cache_id()] = {}
rel = os.path.relpath(child, root)
cc = self._report.covcache[self.cache_id()]
j = 0
for j in range(l, -1, -1):
if "def" in lines2[j] or "class" in lines2[j]:
break
from snipper.snipper import gcoms
fun = lines2[j]
comments, _ = gcoms("\n".join(lines2[j:l]))
if rel not in cc:
cc[rel] = {}
cc[rel][fun] = (l, "\n".join(comments))
self._cache_put((self.cache_id(), 'coverage'), self._report.covcache)
def shortDescriptionStandard(self):
sd = super().shortDescription()
if sd is None:
sd = self._testMethodName
return sd
def shortDescription(self):
sd = self.shortDescriptionStandard()
title = self._cache_get((self.cache_id(), 'title'), sd)
return title if title is not None else sd
@property
def title(self):
return self.shortDescription()
@title.setter
def title(self, value):
self._cache_put((self.cache_id(), 'title'), value)
def _get_outcome(self):
if not (self.__class__, '_outcome') or self.__class__._outcome is None:
self.__class__._outcome = {}
return self.__class__._outcome
def _callTestMethod(self, testMethod):
t = time.time()
self._ensure_cache_exists() # Make sure cache is there.
if self._testMethodDoc is not None:
self._cache_put((self.cache_id(), 'title'), self.shortDescriptionStandard())
self._cache2[(self.cache_id(), 'assert')] = {}
res = testMethod()
elapsed = time.time() - t
self._get_outcome()[self.cache_id()] = res
self._cache_put((self.cache_id(), "time"), elapsed)
def cache_id(self):
c = self.__class__.__qualname__
m = self._testMethodName
return c, m
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._load_cache()
self._assert_cache_index = 0
def _ensure_cache_exists(self):
if not hasattr(self.__class__, '_cache') or self.__class__._cache == None:
self.__class__._cache = dict()
if not hasattr(self.__class__, '_cache2') or self.__class__._cache2 == None:
self.__class__._cache2 = dict()
def _cache_get(self, key, default=None):
self._ensure_cache_exists()
return self.__class__._cache.get(key, default)
def _cache_put(self, key, value):
self._ensure_cache_exists()
self.__class__._cache2[key] = value
def _cache_contains(self, key):
self._ensure_cache_exists()
return key in self.__class__._cache
def wrap_assert(self, assert_fun, first, *args, **kwargs):
# sys.stdout = self._stdout
key = (self.cache_id(), 'assert')
if not self._cache_contains(key):
print("Warning, framework missing", key)
self.__class__._cache[
key] = {} # A new dict. We manually insert it because we have to use that the dict is mutable.
cache = self._cache_get(key)
id = self._assert_cache_index
if not id in cache:
print("Warning, framework missing cache index", key, "id =", id)
_expected = cache.get(id, f"Key {id} not found in cache; framework files missing. Please run deploy()")
# The order of these calls is important. If the method assert fails, we should still store the correct result in cache.
cache[id] = first
self._cache_put(key, cache)
self._assert_cache_index += 1
assert_fun(first, _expected, *args, **kwargs)
def assertEqualC(self, first: Any, msg: Any = ...) -> None:
self.wrap_assert(self.assertEqual, first, msg)
def _cache_file(self):
return os.path.dirname(inspect.getfile(self.__class__)) + "/unitgrade/" + self.__class__.__name__ + ".pkl"
def _save_cache(self):
# get the class name (i.e. what to save to).
cfile = self._cache_file()
if not os.path.isdir(os.path.dirname(cfile)):
os.makedirs(os.path.dirname(cfile))
if hasattr(self.__class__, '_cache2'):
with open(cfile, 'wb') as f:
pickle.dump(self.__class__._cache2, f)
# But you can also set cache explicitly.
def _load_cache(self):
if self._cache is not None: # Cache already loaded. We will not load it twice.
return
# raise Exception("Loaded cache which was already set. What is going on?!")
cfile = self._cache_file()
if os.path.exists(cfile):
try:
with open(cfile, 'rb') as f:
data = pickle.load(f)
self.__class__._cache = data
except Exception as e:
print("Bad cache", cfile)
print(e)
else:
print("Warning! data file not found", cfile)
def _feedErrorsToResult(self, result, errors):
""" Use this to show hints on test failure. """
if not isinstance(result, UTextResult):
er = [e for e, v in errors if v != None]
if len(er) > 0:
hints = []
key = (self.cache_id(), 'coverage')
if self._cache_contains(key):
CC = self._cache_get(key)
for id in CC:
if id == self.cache_id():
cl, m = id
gprint(f"> An error occured while solving: {cl}.{m}. The files/methods you need to edit are:") # For the test {id} in {file} you should edit:")
for file in CC[id]:
rec = CC[id][file]
gprint(f"> * {file}")
for l in rec:
_, comments = CC[id][file][l]
hint = get_hints(comments)
if hint != None:
hints.append(hint)
gprint(f"> - {l}")
er = er[0]
doc = er._testMethodDoc
if doc is not None:
hint = get_hints(er._testMethodDoc)
if hint is not None:
hints = [hint] + hints
if len(hints) > 0:
gprint("> Hints:")
gprint(textwrap.indent("\n".join(hints), "> "))
super()._feedErrorsToResult(result, errors)
def startTestRun(self):
# print("asdfsdaf 11", file=sys.stderr)
super().startTestRun()
# print("asdfsdaf")
def _callTestMethod(self, method):
# print("asdfsdaf")
super()._callTestMethod(method)
def hide(func):
return func
def makeRegisteringDecorator(foreignDecorator):
"""
Returns a copy of foreignDecorator, which is identical in every
way(*), except also appends a .decorator property to the callable it
spits out.
"""
def newDecorator(func):
# Call to newDecorator(method)
# Exactly like old decorator, but output keeps track of what decorated it
R = foreignDecorator(func) # apply foreignDecorator, like call to foreignDecorator(method) would have done
R.decorator = newDecorator # keep track of decorator
# R.original = func # might as well keep track of everything!
return R
newDecorator.__name__ = foreignDecorator.__name__
newDecorator.__doc__ = foreignDecorator.__doc__
return newDecorator
hide = makeRegisteringDecorator(hide)
def methodsWithDecorator(cls, decorator):
"""
Returns all methods in CLS with DECORATOR as the
outermost decorator.
DECORATOR must be a "registering decorator"; one
can make any decorator "registering" via the
makeRegisteringDecorator function.
import inspect
ls = list(methodsWithDecorator(GeneratorQuestion, deco))
for f in ls:
print(inspect.getsourcelines(f) ) # How to get all hidden questions.
"""
for maybeDecorated in cls.__dict__.values():
if hasattr(maybeDecorated, 'decorator'):
if maybeDecorated.decorator == decorator:
print(maybeDecorated)
yield maybeDecorated
# 817