diff --git a/src/topupopt/data/misc/utils.py b/src/topupopt/data/misc/utils.py index 4855b627865718948127ffd0e5a980510c6a1a58..5f15ab0a895dbb65071bbfd61ec82a184c6230b4 100644 --- a/src/topupopt/data/misc/utils.py +++ b/src/topupopt/data/misc/utils.py @@ -2,14 +2,12 @@ # ***************************************************************************** # standard - import uuid - import math - from statistics import mean # local, external +import pyomo.environ as pyo # ***************************************************************************** # ***************************************************************************** @@ -127,7 +125,7 @@ def synch_profile(profile: list, reference_profile: list, synch: bool = True) -> By default, the profiles are synched: the highest sample in one is placed in the same position as the highest sample in the other; the second highest - sample is placede in the same position as the second highest sample in the + sample is placed in the same position as the second highest sample in the other profile; and so on. Alternatively, the profiles can be synched in reverse: the highest sample in one profile is placed in the same position as the lowest sample in the other; and so on and so forth. @@ -291,6 +289,228 @@ def create_profile_using_time_weighted_state( n = len(time_interval_durations) return [*new_profile[n - _sorted[0][1] :], *new_profile[0 : n - _sorted[0][1]]] +# ***************************************************************************** +# ***************************************************************************** + +def generate_manual_state_correlated_profile( + integration_result: float, + states: list, + time_interval_durations: list, + deviation_gain: float +) -> list: + """ + Returns a profile matching a given integral and varying according to a + sequence of time intervals and the respective mean states. + + The profile for interval i is defined as follows: + + P[i] = (dt[i]/dt_mean)*( (x[i]-x_mean)*gain + offset) + + where: + + dt[i] is the time interval duration for interval i + + dt_mean is the mean time interval duration + + x[i] is the state for interval i + + x_mean is the mean state for the entire profile + + The offset is defined as the integration result divided by the number + time intervals, whereas the gain is user-defined and real-valued. + + Parameters + ---------- + integration_result : float + The result of integrating the sinusoidal function for one period. + states : list + The average state during each time interval. + time_interval_durations : list + The time interval durations for each sample. + deviation_gain : float + DESCRIPTION. + + Raises + ------ + ValueError + This error is raised if the list inputs do not have the same size. + + Returns + ------- + list + A profile matching the aforementioned characteristics. + + """ + + if len(states) != len(time_interval_durations): + raise ValueError("The inputs are inconsistent.") + + dt_total = sum(time_interval_durations) + dt_mean = mean(time_interval_durations) + # x_mean = mean(states) + x_mean = sum( + deltat_k*x_k + for deltat_k, x_k in zip(time_interval_durations, states) + )/dt_total + beta = integration_result/len(states) + return [ + ((x_k-x_mean)*deviation_gain+beta )* deltat_k / dt_mean + for deltat_k, x_k in zip(time_interval_durations, states) + ] + +# ***************************************************************************** +# ***************************************************************************** + +def generate_state_correlated_profile( + integration_result: float, + states: list, + time_interval_durations: list, + states_correlate_profile: bool, + min_max_ratio: float, + solver: str +) -> list: + """ + Returns a profile observing a number of conditions. + + The profile must correlate with a set of states averaged over certain time + intervals, whose durations may be irregular. Integration of the profile + over all time intervals must also match a certain value. Finally, the peaks + must be related by a factor between 0 and 1. + + This method relies on linear programming. An LP solver must be used. + + Parameters + ---------- + integration_result : float + The result of integrating the sinusoidal function for one period. + states : list + The average state during each time interval. + time_interval_durations : list + The duration of each time interval. + states_correlate_profile : bool + If True, higher state values must lead to higher profile values. + If False, lower state values must lead to higher profile values. + min_max_ratio : float + The ratio between the minimum and the maximum profile values. + solver : str + The name of the LP solver to use, according to Pyomo conventions. + + Raises + ------ + ValueError + This error is raised if the list inputs do not have the same size. + + Returns + ------- + list + A profile matching the aforementioned characteristics. + + """ + + # ************************************************************************* + # ************************************************************************* + + # internal model + + def model(states_correlate_profile: bool) -> pyo.AbstractModel: + + # abstract model + model = pyo.AbstractModel() + + # sets + model.I = pyo.Set() + + # decision variables + model.P_i = pyo.Var(model.I, domain=pyo.NonNegativeReals) + model.P_max = pyo.Var(domain=pyo.NonNegativeReals) + model.P_min = pyo.Var(domain=pyo.NonNegativeReals) + if states_correlate_profile: + model.alpha = pyo.Var(domain=pyo.PositiveReals) + else: + model.alpha = pyo.Var(domain=pyo.NegativeReals) + + # parameters + model.param_R = pyo.Param() + model.param_VI = pyo.Param() + model.param_X_i = pyo.Param(model.I) + model.param_Y_i = pyo.Param(model.I) + + def obj_f(m): + if states_correlate_profile: + return m.alpha # if positive + else: + return -m.alpha # if negative + # model.OBJ = pyo.Objective(rule=obj_f) + model.OBJ = pyo.Objective(rule=obj_f, sense=pyo.maximize) + + # integral + def constr_integral_rule(m): + return sum(m.P_i[i] for i in m.I) == m.param_VI + model.constr_integral = pyo.Constraint(rule=constr_integral_rule) + + # profile equations + def constr_profile_equations_rule(m,i): + return m.P_i[i] - m.param_X_i[i]*m.alpha == m.param_Y_i[i] + model.constr_profile_equations = pyo.Constraint(model.I, rule=constr_profile_equations_rule) + + # upper bound + def constr_max_upper_bound_rule(m,i): + return m.P_i[i] <= m.P_max + model.constr_max_upper_bound = pyo.Constraint(model.I, rule=constr_max_upper_bound_rule) + + # lower bound + def constr_max_lower_bound_rule(m,i): + return m.P_i[i] >= m.P_min + model.constr_max_lower_bound = pyo.Constraint(model.I, rule=constr_max_lower_bound_rule) + + # ratio + def constr_min_max_rule(m,i): + return m.P_min == m.P_max*m.param_R + model.constr_min_max = pyo.Constraint(rule=constr_min_max_rule) + + return model + + number_time_steps = len(time_interval_durations) + if len(states) != number_time_steps: + raise ValueError("The inputs are inconsistent.") + + # ************************************************************************* + # ************************************************************************* + + + dt_total = sum(time_interval_durations) + dt_mean = mean(time_interval_durations) + # x_mean = mean(states) + x_mean = sum( + deltat_k*x_k + for deltat_k, x_k in zip(time_interval_durations, states) + )/dt_total + beta = integration_result/number_time_steps + f = [dt_k/dt_mean for dt_k in time_interval_durations] + set_I = tuple(i for i in range(number_time_steps)) + + # create a dictionary with the data (using pyomo conventions) + data_dict = { + None: { + # sets + "I": {None: set_I}, + # parameters + "param_VI": {None: integration_result}, + "param_R": {None: min_max_ratio}, + "param_X_i": {i:f[i]*(states[i]-x_mean) for i in set_I}, + "param_Y_i": {i:f[i]*beta for i in set_I}, + } + } + + # ************************************************************************* + # ************************************************************************* + + opt = pyo.SolverFactory(solver) + fit_model = model(states_correlate_profile=states_correlate_profile) + problem = fit_model.create_instance(data=data_dict) + opt.solve(problem, tee=False) + # return profile + return [pyo.value(problem.P_i[i]) for i in problem.I] # ***************************************************************************** # ***************************************************************************** diff --git a/tests/test_data_utils.py b/tests/test_data_utils.py index 2ddfbf0d01b79860a322ab8582bdb089c6fdc887..09d87af33c2727cddf6e865d30334248e7c32392 100644 --- a/tests/test_data_utils.py +++ b/tests/test_data_utils.py @@ -372,9 +372,164 @@ class TestDataUtils: assert new_key not in key_list - # ************************************************************************** - # ************************************************************************** + # ************************************************************************* + # ************************************************************************* + + def test_state_correlated_profile(self): + + # correlation: direct, inverse + # states: positive, negative + # time intervals: regular irregular + # + + # profile with positive correlation, positive states, regular intervals + number_time_intervals = 10 + states = [i+1 for i in range(number_time_intervals)] + integration_result = 100 + time_interval_durations = [10 for i in range(number_time_intervals)] + states_correlate_profile = True + min_max_ratio = 0.2 + + profile = utils.generate_state_correlated_profile( + integration_result=integration_result, + states=states, + time_interval_durations=time_interval_durations, + states_correlate_profile=states_correlate_profile, + min_max_ratio=min_max_ratio, + solver='glpk' + ) + + # test profile + assert len(profile) == number_time_intervals + assert math.isclose(sum(profile), integration_result, abs_tol=1e-3) + assert math.isclose(min(profile), max(profile)*min_max_ratio, abs_tol=1e-3) + assert max(profile) == profile[number_time_intervals-1] + + # profile with inverse correlation, positive states, regular intervals + number_time_intervals = 10 + states = [i+1 for i in range(number_time_intervals)] + integration_result = 100 + time_interval_durations = [10 for i in range(number_time_intervals)] + states_correlate_profile = False + min_max_ratio = 0.2 + + profile = utils.generate_state_correlated_profile( + integration_result=integration_result, + states=states, + time_interval_durations=time_interval_durations, + states_correlate_profile=states_correlate_profile, + min_max_ratio=min_max_ratio, + solver='glpk' + ) + + # test profile + assert len(profile) == number_time_intervals + assert math.isclose(sum(profile), integration_result, abs_tol=1e-3) + assert math.isclose(min(profile), max(profile)*min_max_ratio, abs_tol=1e-3) + assert min(profile) == profile[number_time_intervals-1] + + + # ************************************************************************* + # ************************************************************************* + + def test_trigger_state_correlated_profile_error(self): + + # trigger an error + number_time_intervals = 10 + states = [i+1 for i in range(number_time_intervals)] + integration_result = 100 + time_interval_durations = [10 for i in range(number_time_intervals+1)] + states_correlate_profile = True + min_max_ratio = 0.2 + + error_raised = False + try: + utils.generate_state_correlated_profile( + integration_result=integration_result, + states=states, + time_interval_durations=time_interval_durations, + states_correlate_profile=states_correlate_profile, + min_max_ratio=min_max_ratio, + solver='glpk' + ) + except ValueError: + error_raised = True + assert error_raised + + # ************************************************************************* + # ************************************************************************* + + def test_manual_state_correlated_profile(self): + + # correlation: direct, inverse + # states: positive, negative + # time intervals: regular irregular + # + + # profile with positive correlation, positive states, regular intervals + number_time_intervals = 10 + states = [i+1 for i in range(number_time_intervals)] + integration_result = 100 + time_interval_durations = [10 for i in range(number_time_intervals)] + deviation_gain = 1 + + profile = utils.generate_manual_state_correlated_profile( + integration_result=integration_result, + states=states, + time_interval_durations=time_interval_durations, + deviation_gain=deviation_gain + ) + + # test profile + assert len(profile) == number_time_intervals + assert math.isclose(sum(profile), integration_result, abs_tol=1e-3) + assert max(profile) == profile[number_time_intervals-1] + + # profile with inverse correlation, positive states, regular intervals + number_time_intervals = 10 + states = [i+1 for i in range(number_time_intervals)] + integration_result = 100 + time_interval_durations = [10 for i in range(number_time_intervals)] + deviation_gain = -1 + + profile = utils.generate_manual_state_correlated_profile( + integration_result=integration_result, + states=states, + time_interval_durations=time_interval_durations, + deviation_gain=deviation_gain + ) + + # test profile + assert len(profile) == number_time_intervals + assert math.isclose(sum(profile), integration_result, abs_tol=1e-3) + assert min(profile) == profile[number_time_intervals-1] + + # ************************************************************************* + # ************************************************************************* + + def test_trigger_manual_state_correlated_profile_error(self): + + # trigger an error + number_time_intervals = 10 + states = [i+1 for i in range(number_time_intervals)] + integration_result = 100 + time_interval_durations = [10 for i in range(number_time_intervals+1)] + deviation_gain = -1 + + error_raised = False + try: + utils.generate_manual_state_correlated_profile( + integration_result=integration_result, + states=states, + time_interval_durations=time_interval_durations, + deviation_gain=deviation_gain + ) + except ValueError: + error_raised = True + assert error_raised + # ************************************************************************* + # ************************************************************************* -# ****************************************************************************** -# ****************************************************************************** +# ***************************************************************************** +# *****************************************************************************