Skip to content
Snippets Groups Projects
Commit 5497c30d authored by Pedro L. Magalhães's avatar Pedro L. Magalhães
Browse files

Added new method for generating state-correlated profiles.

parent 62e6ad6d
No related branches found
No related tags found
1 merge request!6Stats
......@@ -2,14 +2,12 @@
# *****************************************************************************
# standard
import uuid
import math
from statistics import mean
# local, external
import pyomo.environ as pyo
# *****************************************************************************
# *****************************************************************************
......@@ -127,7 +125,7 @@ def synch_profile(profile: list, reference_profile: list, synch: bool = True) ->
By default, the profiles are synched: the highest sample in one is placed
in the same position as the highest sample in the other; the second highest
sample is placede in the same position as the second highest sample in the
sample is placed in the same position as the second highest sample in the
other profile; and so on. Alternatively, the profiles can be synched in
reverse: the highest sample in one profile is placed in the same position
as the lowest sample in the other; and so on and so forth.
......@@ -291,6 +289,228 @@ def create_profile_using_time_weighted_state(
n = len(time_interval_durations)
return [*new_profile[n - _sorted[0][1] :], *new_profile[0 : n - _sorted[0][1]]]
# *****************************************************************************
# *****************************************************************************
def generate_manual_state_correlated_profile(
integration_result: float,
states: list,
time_interval_durations: list,
deviation_gain: float
) -> list:
"""
Returns a profile matching a given integral and varying according to a
sequence of time intervals and the respective mean states.
The profile for interval i is defined as follows:
P[i] = (dt[i]/dt_mean)*( (x[i]-x_mean)*gain + offset)
where:
dt[i] is the time interval duration for interval i
dt_mean is the mean time interval duration
x[i] is the state for interval i
x_mean is the mean state for the entire profile
The offset is defined as the integration result divided by the number
time intervals, whereas the gain is user-defined and real-valued.
Parameters
----------
integration_result : float
The result of integrating the sinusoidal function for one period.
states : list
The average state during each time interval.
time_interval_durations : list
The time interval durations for each sample.
deviation_gain : float
DESCRIPTION.
Raises
------
ValueError
This error is raised if the list inputs do not have the same size.
Returns
-------
list
A profile matching the aforementioned characteristics.
"""
if len(states) != len(time_interval_durations):
raise ValueError("The inputs are inconsistent.")
dt_total = sum(time_interval_durations)
dt_mean = mean(time_interval_durations)
# x_mean = mean(states)
x_mean = sum(
deltat_k*x_k
for deltat_k, x_k in zip(time_interval_durations, states)
)/dt_total
beta = integration_result/len(states)
return [
((x_k-x_mean)*deviation_gain+beta )* deltat_k / dt_mean
for deltat_k, x_k in zip(time_interval_durations, states)
]
# *****************************************************************************
# *****************************************************************************
def generate_state_correlated_profile(
integration_result: float,
states: list,
time_interval_durations: list,
states_correlate_profile: bool,
min_max_ratio: float,
solver: str
) -> list:
"""
Returns a profile observing a number of conditions.
The profile must correlate with a set of states averaged over certain time
intervals, whose durations may be irregular. Integration of the profile
over all time intervals must also match a certain value. Finally, the peaks
must be related by a factor between 0 and 1.
This method relies on linear programming. An LP solver must be used.
Parameters
----------
integration_result : float
The result of integrating the sinusoidal function for one period.
states : list
The average state during each time interval.
time_interval_durations : list
The duration of each time interval.
states_correlate_profile : bool
If True, higher state values must lead to higher profile values.
If False, lower state values must lead to higher profile values.
min_max_ratio : float
The ratio between the minimum and the maximum profile values.
solver : str
The name of the LP solver to use, according to Pyomo conventions.
Raises
------
ValueError
This error is raised if the list inputs do not have the same size.
Returns
-------
list
A profile matching the aforementioned characteristics.
"""
# *************************************************************************
# *************************************************************************
# internal model
def model(states_correlate_profile: bool) -> pyo.AbstractModel:
# abstract model
model = pyo.AbstractModel()
# sets
model.I = pyo.Set()
# decision variables
model.P_i = pyo.Var(model.I, domain=pyo.NonNegativeReals)
model.P_max = pyo.Var(domain=pyo.NonNegativeReals)
model.P_min = pyo.Var(domain=pyo.NonNegativeReals)
if states_correlate_profile:
model.alpha = pyo.Var(domain=pyo.PositiveReals)
else:
model.alpha = pyo.Var(domain=pyo.NegativeReals)
# parameters
model.param_R = pyo.Param()
model.param_VI = pyo.Param()
model.param_X_i = pyo.Param(model.I)
model.param_Y_i = pyo.Param(model.I)
def obj_f(m):
if states_correlate_profile:
return m.alpha # if positive
else:
return -m.alpha # if negative
# model.OBJ = pyo.Objective(rule=obj_f)
model.OBJ = pyo.Objective(rule=obj_f, sense=pyo.maximize)
# integral
def constr_integral_rule(m):
return sum(m.P_i[i] for i in m.I) == m.param_VI
model.constr_integral = pyo.Constraint(rule=constr_integral_rule)
# profile equations
def constr_profile_equations_rule(m,i):
return m.P_i[i] - m.param_X_i[i]*m.alpha == m.param_Y_i[i]
model.constr_profile_equations = pyo.Constraint(model.I, rule=constr_profile_equations_rule)
# upper bound
def constr_max_upper_bound_rule(m,i):
return m.P_i[i] <= m.P_max
model.constr_max_upper_bound = pyo.Constraint(model.I, rule=constr_max_upper_bound_rule)
# lower bound
def constr_max_lower_bound_rule(m,i):
return m.P_i[i] >= m.P_min
model.constr_max_lower_bound = pyo.Constraint(model.I, rule=constr_max_lower_bound_rule)
# ratio
def constr_min_max_rule(m,i):
return m.P_min == m.P_max*m.param_R
model.constr_min_max = pyo.Constraint(rule=constr_min_max_rule)
return model
number_time_steps = len(time_interval_durations)
if len(states) != number_time_steps:
raise ValueError("The inputs are inconsistent.")
# *************************************************************************
# *************************************************************************
dt_total = sum(time_interval_durations)
dt_mean = mean(time_interval_durations)
# x_mean = mean(states)
x_mean = sum(
deltat_k*x_k
for deltat_k, x_k in zip(time_interval_durations, states)
)/dt_total
beta = integration_result/number_time_steps
f = [dt_k/dt_mean for dt_k in time_interval_durations]
set_I = tuple(i for i in range(number_time_steps))
# create a dictionary with the data (using pyomo conventions)
data_dict = {
None: {
# sets
"I": {None: set_I},
# parameters
"param_VI": {None: integration_result},
"param_R": {None: min_max_ratio},
"param_X_i": {i:f[i]*(states[i]-x_mean) for i in set_I},
"param_Y_i": {i:f[i]*beta for i in set_I},
}
}
# *************************************************************************
# *************************************************************************
opt = pyo.SolverFactory(solver)
fit_model = model(states_correlate_profile=states_correlate_profile)
problem = fit_model.create_instance(data=data_dict)
opt.solve(problem, tee=False)
# return profile
return [pyo.value(problem.P_i[i]) for i in problem.I]
# *****************************************************************************
# *****************************************************************************
......
......@@ -372,9 +372,164 @@ class TestDataUtils:
assert new_key not in key_list
# **************************************************************************
# **************************************************************************
# *************************************************************************
# *************************************************************************
def test_state_correlated_profile(self):
# ******************************************************************************
# ******************************************************************************
# correlation: direct, inverse
# states: positive, negative
# time intervals: regular irregular
#
# profile with positive correlation, positive states, regular intervals
number_time_intervals = 10
states = [i+1 for i in range(number_time_intervals)]
integration_result = 100
time_interval_durations = [10 for i in range(number_time_intervals)]
states_correlate_profile = True
min_max_ratio = 0.2
profile = utils.generate_state_correlated_profile(
integration_result=integration_result,
states=states,
time_interval_durations=time_interval_durations,
states_correlate_profile=states_correlate_profile,
min_max_ratio=min_max_ratio,
solver='glpk'
)
# test profile
assert len(profile) == number_time_intervals
assert math.isclose(sum(profile), integration_result, abs_tol=1e-3)
assert math.isclose(min(profile), max(profile)*min_max_ratio, abs_tol=1e-3)
assert max(profile) == profile[number_time_intervals-1]
# profile with inverse correlation, positive states, regular intervals
number_time_intervals = 10
states = [i+1 for i in range(number_time_intervals)]
integration_result = 100
time_interval_durations = [10 for i in range(number_time_intervals)]
states_correlate_profile = False
min_max_ratio = 0.2
profile = utils.generate_state_correlated_profile(
integration_result=integration_result,
states=states,
time_interval_durations=time_interval_durations,
states_correlate_profile=states_correlate_profile,
min_max_ratio=min_max_ratio,
solver='glpk'
)
# test profile
assert len(profile) == number_time_intervals
assert math.isclose(sum(profile), integration_result, abs_tol=1e-3)
assert math.isclose(min(profile), max(profile)*min_max_ratio, abs_tol=1e-3)
assert min(profile) == profile[number_time_intervals-1]
# *************************************************************************
# *************************************************************************
def test_trigger_state_correlated_profile_error(self):
# trigger an error
number_time_intervals = 10
states = [i+1 for i in range(number_time_intervals)]
integration_result = 100
time_interval_durations = [10 for i in range(number_time_intervals+1)]
states_correlate_profile = True
min_max_ratio = 0.2
error_raised = False
try:
utils.generate_state_correlated_profile(
integration_result=integration_result,
states=states,
time_interval_durations=time_interval_durations,
states_correlate_profile=states_correlate_profile,
min_max_ratio=min_max_ratio,
solver='glpk'
)
except ValueError:
error_raised = True
assert error_raised
# *************************************************************************
# *************************************************************************
def test_manual_state_correlated_profile(self):
# correlation: direct, inverse
# states: positive, negative
# time intervals: regular irregular
#
# profile with positive correlation, positive states, regular intervals
number_time_intervals = 10
states = [i+1 for i in range(number_time_intervals)]
integration_result = 100
time_interval_durations = [10 for i in range(number_time_intervals)]
deviation_gain = 1
profile = utils.generate_manual_state_correlated_profile(
integration_result=integration_result,
states=states,
time_interval_durations=time_interval_durations,
deviation_gain=deviation_gain
)
# test profile
assert len(profile) == number_time_intervals
assert math.isclose(sum(profile), integration_result, abs_tol=1e-3)
assert max(profile) == profile[number_time_intervals-1]
# profile with inverse correlation, positive states, regular intervals
number_time_intervals = 10
states = [i+1 for i in range(number_time_intervals)]
integration_result = 100
time_interval_durations = [10 for i in range(number_time_intervals)]
deviation_gain = -1
profile = utils.generate_manual_state_correlated_profile(
integration_result=integration_result,
states=states,
time_interval_durations=time_interval_durations,
deviation_gain=deviation_gain
)
# test profile
assert len(profile) == number_time_intervals
assert math.isclose(sum(profile), integration_result, abs_tol=1e-3)
assert min(profile) == profile[number_time_intervals-1]
# *************************************************************************
# *************************************************************************
def test_trigger_manual_state_correlated_profile_error(self):
# trigger an error
number_time_intervals = 10
states = [i+1 for i in range(number_time_intervals)]
integration_result = 100
time_interval_durations = [10 for i in range(number_time_intervals+1)]
deviation_gain = -1
error_raised = False
try:
utils.generate_manual_state_correlated_profile(
integration_result=integration_result,
states=states,
time_interval_durations=time_interval_durations,
deviation_gain=deviation_gain
)
except ValueError:
error_raised = True
assert error_raised
# *************************************************************************
# *************************************************************************
# *****************************************************************************
# *****************************************************************************
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment