Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_data_utils.py 24.46 KiB
# imports

# standard
import random
import math
from statistics import mean

# local, internal
from src.topupopt.data.misc import utils

class TestDataUtils:

    def test_profile_synching2(self):
        integration_result = 10446
        ratio_min_avg = 0.2
        min_max_ratio = ratio_min_avg / (2 - ratio_min_avg)

        states = [
            2.66,
            2.34,
            3.54,
            7.42,
            11.72,
            16.94,
            17.94,
            17.98,
            14.1,
            10.48,
            6.74,
            3.16,
        ]

        time_interval_durations = [
            31,  # jan
            28,  # fev
            31,  # mar
            30,  # apr
            31,  # may
            30,  # june
            31,  # july
            31,  # august
            30,  # september
            31,  # october
            30,  # november
            31,  # december
        ]

        # *********************************************************************
        # *********************************************************************

        # state correlates with output

        new_profile = utils.create_profile_using_time_weighted_state(
            integration_result=integration_result,
            states=states,
            time_interval_durations=time_interval_durations,
            min_max_ratio=min_max_ratio,
            states_correlate_profile=False,
        )

        expected_result = [
            1500.0513102636057,
            1436.2189684321309,
            1500.0513102636044,
            1206.6051909345115,
            896.2493366213225,
            525.7218723351705,
            283.3475134825171,
            185.83058429361876,
            270.8127439165165,
            538.2566419011698,
            861.4985340132666,
            1241.355993542566,
        ]

        abs_tol = 1e-3

        assert math.isclose(sum(new_profile), integration_result, abs_tol=abs_tol)

        for sample, expected_sample in zip(new_profile, expected_result):
            assert math.isclose(sample, expected_sample, abs_tol=abs_tol)

        # *********************************************************************
        # *********************************************************************

        # state does not correlate with output

        # state correlates with output

        new_profile = utils.create_profile_using_time_weighted_state(
            integration_result=integration_result,
            states=states,
            time_interval_durations=time_interval_durations,
            min_max_ratio=min_max_ratio,
            states_correlate_profile=True,
        )

        expected_result = [
            274.3377308322865,
            166.45500417060902,
            274.33773083228533,
            510.54549399699533,
            878.1397044745678,
            1191.4288125963367,
            1491.041527613374,
            1588.5584568022714,
            1446.3379410149894,
            1236.132399194721,
            855.6521509182398,
            533.0330475533233,
        ]

        abs_tol = 1e-3

        assert math.isclose(sum(new_profile), integration_result, abs_tol=abs_tol)

        for sample, expected_sample in zip(new_profile, expected_result):
            assert math.isclose(sample, expected_sample, abs_tol=abs_tol)

        # *********************************************************************
        # *********************************************************************

        # find out the peaks of the sinusoidal profile

        pmax, pmin = utils.max_min_sinusoidal_profile(
            integration_result=integration_result,
            period=sum(time_interval_durations),
            time_interval_duration=mean(time_interval_durations),
            min_max_ratio=min_max_ratio,
        )

        expected_pmax, expected_pmin = 1558.972133279683, 182.02786672031687

        assert math.isclose(pmax, expected_pmax, abs_tol=1e-3)

        assert math.isclose(pmin, expected_pmin, abs_tol=1e-3)

        # *********************************************************************
        # *********************************************************************

        # raise exception

        error_triggered = False
        time_interval_durations.pop(0)
        try:
            new_profile = utils.create_profile_using_time_weighted_state(
                integration_result=integration_result,
                states=states,
                time_interval_durations=time_interval_durations,
                min_max_ratio=min_max_ratio,
                states_correlate_profile=True,
            )
        except ValueError:
            error_triggered = True
        assert error_triggered

        # *********************************************************************
        # *********************************************************************

    # *************************************************************************
    # *************************************************************************

    def test_profile_synching(self):
        # synch, normal, ex1

        profile = [1, 2, 3, 4]
        reference_profile = [2, 3, 4, 1]
        synched_profile = utils.synch_profile(profile, reference_profile)
        true_synched_profile = [2, 3, 4, 1]
        assert repr(synched_profile) == repr(true_synched_profile)

        # synch, normal, ex2

        profile = [-2, -1, 1, 2, 0]
        reference_profile = [2, 3, 4, 1, 5]
        synched_profile = utils.synch_profile(profile, reference_profile)
        true_synched_profile = [-1, 0, 1, -2, 2]
        assert repr(synched_profile) == repr(true_synched_profile)

        # synch, alternative, ex1

        profile = [1, 2, 3, 4]
        reference_profile = [2, 3, 4, 1]
        synched_profile = utils.synch_profile(profile, reference_profile, synch=False)
        true_synched_profile = [3, 2, 1, 4]
        assert repr(synched_profile) == repr(true_synched_profile)

    # *************************************************************************
    # *************************************************************************

    def test_profile_generation(self):
        # *********************************************************************

        # fixed time interval durations

        number_tests = 10

        for test_index in range(number_tests):
            integration_period = 365 * 24 * 3600

            number_intervals = random.randint(1, 8760)

            phase_shift_radians = 2 * math.pi * random.random()

            time_interval_durations = [
                round(integration_period / number_intervals)
                for i in range(number_intervals)
            ]

            integration_result = 100

            min_max_ratio = 0.2

            profile = utils.discrete_sinusoid_matching_integral(
                integration_result,
                time_interval_durations,
                min_max_ratio,
                phase_shift_radians=phase_shift_radians,
            )

            assert math.isclose(sum(profile), integration_result, abs_tol=0.01)

        # *********************************************************************

        # import matplotlib.pyplot as plt

        # # Data for plotting
        # x = [i for i in range(number_intervals)]
        # y = profile

        # fig, ax = plt.subplots()
        # ax.plot(x, y)

        # ax.set(xlabel='time (s)', ylabel='voltage (mV)',
        #         title='About as simple as it gets, folks')
        # ax.grid()

        # #fig.savefig("test.png")
        # plt.show()

        # *********************************************************************

        # variable time step durations

        number_tests = 10

        for test_index in range(number_tests):
            number_intervals = random.randint(10, 8760)

            time_interval_durations = [
                random.random() * 3.6e3 for i in range(number_intervals)
            ]

            integration_period = sum(time_interval_durations)

            phase_shift_radians = 2 * math.pi * random.random()

            integration_result = 100

            min_max_ratio = 0.2

            profile = utils.discrete_sinusoid_matching_integral(
                integration_result,
                time_interval_durations,
                min_max_ratio,
                phase_shift_radians=phase_shift_radians,
            )

            assert math.isclose(sum(profile), integration_result, abs_tol=0.01)

        # *********************************************************************

        # # import matplotlib.pyplot as plt

        # t = [sum(time_interval_durations[0:i])
        #      for i in range(len(time_interval_durations)+1)]

        # # Data for plotting
        # x = [(t[i+1]+t[i])*0.5
        #      for i in range(number_intervals)] # time interval's center point
        # y = profile

        # fig, ax = plt.subplots()
        # ax.plot(x, y)

        # ax.set(xlabel='time (s)', ylabel='voltage (mV)',
        #         title='About as simple as it gets, folks')
        # ax.grid()

        # #fig.savefig("test.png")
        # plt.show()

        # *********************************************************************

        # use the default phase shift

        integration_period = 365 * 24 * 3600

        number_intervals = random.randint(1, 8760)

        time_interval_durations = [
            round(integration_period / number_intervals)
            for i in range(number_intervals)
        ]

        integration_result = 100

        min_max_ratio = 0.2

        profile = utils.discrete_sinusoid_matching_integral(
            integration_result, time_interval_durations, min_max_ratio
        )

        assert math.isclose(sum(profile), integration_result, abs_tol=0.01)

    # *************************************************************************
    # *************************************************************************

    def test_key_generation(self):
        # generate_pseudo_unique_key

        key_list = (str(random.random()) for i in range(10))

        new_key = utils.generate_pseudo_unique_key(key_list=key_list)

        assert new_key not in key_list

        # use an empty key list

        new_key = utils.generate_pseudo_unique_key(key_list=[])

        assert new_key not in key_list

        # use zero iterations to force an error

        error_triggered = False
        try:
            new_key = utils.generate_pseudo_unique_key(
                key_list=key_list, max_iterations=0
            )
        except Exception:
            error_triggered = True
        assert error_triggered

        # use a seed number to trigger more iterations

        import uuid

        rand = random.Random()
        rand.seed(360)
        uuid.uuid4 = lambda: uuid.UUID(int=rand.getrandbits(128), version=4)

        key_list = [
            "3e225573-4e78-48c8-bb08-efbeeb795c22",
            "f6d30428-15d1-41e9-a952-0742eaaa5a31",
            "8c29b906-2518-41c5-ada8-07b83508b5b8",
            "f9a72a39-1422-4a02-af97-906ce79c32a3",
            "b6941a48-10cc-465d-bf53-178bd2939bd1",
        ]

        new_key = utils.generate_pseudo_unique_key(key_list=key_list)

        assert new_key not in key_list

    # *************************************************************************
    # *************************************************************************
    
    def test_state_correlated_profile(self):
        
        # correlation: direct, inverse
        # states: positive, negative
        # time intervals: regular irregular
        # 
        
        # profile with positive correlation, positive states, regular intervals
        number_time_intervals = 10
        states = [i+1 for i in range(number_time_intervals)]
        integration_result = 100
        time_interval_durations = [10 for i in range(number_time_intervals)]
        states_correlate_profile = True
        min_max_ratio = 0.2
        
        profile, a, b = utils.generate_state_correlated_profile(
            integration_result=integration_result, 
            states=states, 
            time_interval_durations=time_interval_durations, 
            states_correlate_profile=states_correlate_profile, 
            min_max_ratio=min_max_ratio, 
            solver='glpk'
            )
        
        # test profile 
        assert a > 0 and b > 0
        assert len(profile) == number_time_intervals
        assert math.isclose(sum(profile), integration_result, abs_tol=1e-3)
        assert math.isclose(min(profile), max(profile)*min_max_ratio, abs_tol=1e-3)
        assert max(profile) == profile[number_time_intervals-1]
        
        # profile with inverse correlation, positive states, regular intervals
        number_time_intervals = 10
        states = [i+1 for i in range(number_time_intervals)]
        integration_result = 100
        time_interval_durations = [10 for i in range(number_time_intervals)]
        states_correlate_profile = False
        min_max_ratio = 0.2
        
        profile, a, b = utils.generate_state_correlated_profile(
            integration_result=integration_result, 
            states=states, 
            time_interval_durations=time_interval_durations, 
            states_correlate_profile=states_correlate_profile, 
            min_max_ratio=min_max_ratio, 
            solver='glpk'
            )
        
        # test profile 
        assert a < 0 and b > 0
        assert len(profile) == number_time_intervals
        assert math.isclose(sum(profile), integration_result, abs_tol=1e-3)
        assert math.isclose(min(profile), max(profile)*min_max_ratio, abs_tol=1e-3)
        assert min(profile) == profile[number_time_intervals-1]
        

    # *************************************************************************
    # *************************************************************************
        
    def test_trigger_state_correlated_profile_error(self):
        
        # trigger an error
        number_time_intervals = 10
        states = [i+1 for i in range(number_time_intervals)]
        integration_result = 100
        time_interval_durations = [10 for i in range(number_time_intervals+1)]
        states_correlate_profile = True
        min_max_ratio = 0.2
        
        error_raised = False
        try:
            utils.generate_state_correlated_profile(
                integration_result=integration_result, 
                states=states, 
                time_interval_durations=time_interval_durations, 
                states_correlate_profile=states_correlate_profile, 
                min_max_ratio=min_max_ratio, 
                solver='glpk'
                )
        except ValueError:
            error_raised = True
        assert error_raised
    
    # *************************************************************************
    # *************************************************************************
    
    def test_manual_state_correlated_profile(self):
        
        # correlation: direct, inverse
        # states: positive, negative
        # time intervals: regular irregular
        
        # profile with positive correlation, positive states, regular intervals
        number_time_intervals = 10
        states = [i+1 for i in range(number_time_intervals)]
        integration_result = 100
        time_interval_durations = [10 for i in range(number_time_intervals)]
        deviation_gain = 1
        
        profile = utils.generate_manual_state_correlated_profile(
            integration_result=integration_result, 
            states=states, 
            time_interval_durations=time_interval_durations, 
            deviation_gain=deviation_gain
            )
        
        # test profile 
        assert len(profile) == number_time_intervals
        assert math.isclose(sum(profile), integration_result, abs_tol=1e-3)
        assert max(profile) == profile[number_time_intervals-1]
        
        # profile with inverse correlation, positive states, regular intervals
        number_time_intervals = 10
        states = [i+1 for i in range(number_time_intervals)]
        integration_result = 100
        time_interval_durations = [10 for i in range(number_time_intervals)]
        deviation_gain = -1
        
        profile = utils.generate_manual_state_correlated_profile(
            integration_result=integration_result, 
            states=states, 
            time_interval_durations=time_interval_durations, 
            deviation_gain=deviation_gain
            )
        
        # test profile 
        assert len(profile) == number_time_intervals
        assert math.isclose(sum(profile), integration_result, abs_tol=1e-3)
        assert min(profile) == profile[number_time_intervals-1]

    # *************************************************************************
    # *************************************************************************
        
    def test_trigger_manual_state_correlated_profile_error(self):
        
        # trigger an error
        number_time_intervals = 10
        states = [i+1 for i in range(number_time_intervals)]
        integration_result = 100
        time_interval_durations = [10 for i in range(number_time_intervals+1)]
        deviation_gain = -1
        
        error_raised = False
        try:
            utils.generate_manual_state_correlated_profile(
                integration_result=integration_result, 
                states=states, 
                time_interval_durations=time_interval_durations, 
                deviation_gain=deviation_gain
                )
        except ValueError:
            error_raised = True
        assert error_raised

    # *************************************************************************
    # *************************************************************************
    
    def test_create_profile_sinusoidal(self):
        
        number_intervals = 10
        integration_result = 100
        min_max_ratio = 0.25
        
        # sinusoidal profile
        
        profile = utils.generate_profile(
            integration_result=integration_result, 
            time_interval_durations=[1 for i in range(number_intervals)], 
            min_max_ratio=min_max_ratio, 
            )
        
        assert len(profile) == number_intervals
        assert math.isclose(sum(profile), integration_result, abs_tol=1e-3)
        
        # sinusoidal profile with phase shift
        
        profile = utils.generate_profile(
            integration_result=integration_result, 
            time_interval_durations=[1 for i in range(number_intervals)], 
            min_max_ratio=min_max_ratio, 
            phase_shift_radians=math.pi/2
            )
        
        assert len(profile) == number_intervals
        assert math.isclose(sum(profile), integration_result, abs_tol=1e-3)
        
        # use incorrect parameter
        
        error_raised = False
        try:
            profile = utils.generate_profile(
                integration_result=integration_result, 
                time_interval_durations=[1 for i in range(number_intervals)], 
                min_max_ratio=min_max_ratio, 
                deviation_gain=-1,
                )
        except TypeError:
            error_raised = True
        assert error_raised

    # *************************************************************************
    # *************************************************************************
        
    def test_create_profile_predefined_gain(self):
        
        number_intervals = 10
        integration_result = 100
        deviation_gain = 5
        states = [number_intervals-i*0.5 for i in range(number_intervals)]
        
        # predefined gain
        
        profile = utils.generate_profile(
            integration_result=integration_result, 
            time_interval_durations=[1 for i in range(number_intervals)], 
            states=states, 
            deviation_gain=deviation_gain
            )
        
        assert len(profile) == number_intervals
        assert math.isclose(sum(profile), integration_result, abs_tol=1e-3)      
        
        # predefined gain, opposite sign
        
        profile = utils.generate_profile(
            integration_result=integration_result, 
            time_interval_durations=[1 for i in range(number_intervals)], 
            states=states, 
            deviation_gain=-deviation_gain
            )
        
        assert len(profile) == number_intervals
        assert math.isclose(sum(profile), integration_result, abs_tol=1e-3)    
        
        # use incorrect parameter
        
        error_raised = False
        try:
            profile = utils.generate_profile(
                integration_result=integration_result, 
                time_interval_durations=[1 for i in range(number_intervals)], 
                states=states, 
                deviation_gain=-deviation_gain,
                phase_shift_radians=math.pi
                )
        except TypeError:
            error_raised = True
        assert error_raised

    # *************************************************************************
    # *************************************************************************
        
    def test_create_profile_via_sorting_sinusoid(self):
        
        number_intervals = 10
        integration_result = 100
        states_correlate_profile = True
        min_max_ratio = 0.25
        states = [number_intervals-i*0.5 for i in range(number_intervals)]
        
        # sorting and sinusoidal function        
        profile = utils.generate_profile(
            integration_result=integration_result, 
            time_interval_durations=[1 for i in range(number_intervals)], 
            min_max_ratio=min_max_ratio, 
            states=states, 
            states_correlate_profile=states_correlate_profile, 
            )
        
        assert len(profile) == number_intervals
        assert math.isclose(sum(profile), integration_result, abs_tol=1e-3)    
        

    # *************************************************************************
    # *************************************************************************
        
    def test_create_profile_via_optimisation(self):
        
        number_intervals = 10
        integration_result = 100
        states_correlate_profile = True
        min_max_ratio = 0.25
        solver = 'glpk'
        states = [number_intervals-i*0.5 for i in range(number_intervals)]
        
        # optimisation
        
        # states_correlate_profile is necessary
        # min_max_ratio is necessary
        # solver is necessary
        # states matter but the gain must be determined
        
        profile = utils.generate_profile(
            integration_result=integration_result, 
            time_interval_durations=[1 for i in range(number_intervals)], 
            min_max_ratio=min_max_ratio, 
            states=states, 
            states_correlate_profile=states_correlate_profile, 
            solver=solver
            )
        
        assert len(profile) == number_intervals
        assert math.isclose(sum(profile), integration_result, abs_tol=1e-3)    
        assert math.isclose(min(profile),max(profile)*min_max_ratio, abs_tol=1e-3)
        
        # optimisation but with states that do no warrant it
        states = [5 for i in range(number_intervals)]
        
        profile = utils.generate_profile(
            integration_result=integration_result, 
            time_interval_durations=[1 for i in range(number_intervals)], 
            min_max_ratio=min_max_ratio, 
            states=states, 
            states_correlate_profile=states_correlate_profile, 
            solver=solver
            )
        
        assert len(profile) == number_intervals
        assert math.isclose(sum(profile), integration_result, abs_tol=1e-3)   
        # the min to max ratio cannot be observed if the states do not change
        assert math.isclose(min(profile), max(profile), abs_tol=1e-3)
        
        # use incorrect parameter
        error_raised = False
        try:
            profile = utils.generate_profile(
                integration_result=integration_result, 
                time_interval_durations=[1 for i in range(number_intervals)], 
                min_max_ratio=min_max_ratio, 
                states=states, 
                states_correlate_profile=states_correlate_profile, 
                solver=solver,
                phase_shift_radians=math.pi
                )
        except TypeError:
            error_raised = True
        assert error_raised
        
    # *************************************************************************
    # *************************************************************************

# *****************************************************************************
# *****************************************************************************