Skip to content
Snippets Groups Projects

Stats

Merged
pmagrequested to merge
stats into master
2 files
+ 383
8
Compare changes
  • Side-by-side
  • Inline

Files

+ 355
26
@@ -2,14 +2,11 @@
# *****************************************************************************
# standard
import uuid
import math
from statistics import mean
# local, external
import pyomo.environ as pyo
# *****************************************************************************
# *****************************************************************************
@@ -40,7 +37,7 @@ def generate_pseudo_unique_key(key_list: tuple, max_iterations: int = 10) -> str
def discrete_sinusoid_matching_integral(
integration_result: float,
time_interval_durations: list,
min_to_max_ratio: float,
min_max_ratio: float,
phase_shift_radians: float = None,
) -> list:
"""
@@ -57,7 +54,7 @@ def discrete_sinusoid_matching_integral(
where:
a = b*(1-min_to_max_ratio)/(1+min_to_max_ratio)
a = b*(1-min_max_ratio)/(1+min_max_ratio)
b = integration_result/integration_period
@@ -71,7 +68,7 @@ def discrete_sinusoid_matching_integral(
The result of integrating the sinusoidal function for one period.
time_interval_durations : list
The time interval durations for each sample.
min_to_max_ratio : float
min_max_ratio : float
The ratio between the minimum and maximum values of the function.
phase_shift_radians : float, optional
The phase shift for the sinusoidal function. The default is None, which
@@ -90,7 +87,7 @@ def discrete_sinusoid_matching_integral(
b = integration_result / integration_period
a = b * (1 - min_to_max_ratio) / (1 + min_to_max_ratio)
a = b * (1 - min_max_ratio) / (1 + min_max_ratio)
alpha = 2 * math.pi / integration_period
@@ -127,7 +124,7 @@ def synch_profile(profile: list, reference_profile: list, synch: bool = True) ->
By default, the profiles are synched: the highest sample in one is placed
in the same position as the highest sample in the other; the second highest
sample is placede in the same position as the second highest sample in the
sample is placed in the same position as the second highest sample in the
other profile; and so on. Alternatively, the profiles can be synched in
reverse: the highest sample in one profile is placed in the same position
as the lowest sample in the other; and so on and so forth.
@@ -188,10 +185,10 @@ def synch_profile(profile: list, reference_profile: list, synch: bool = True) ->
def create_profile_using_time_weighted_state(
integration_result: float,
avg_state: list,
states: list,
time_interval_durations: list,
min_to_max_ratio: float,
state_correlates_with_output: bool = True,
min_max_ratio: float,
states_correlate_profile: bool = True,
) -> list:
"""
Returns a profile that approximates a sinusoidal function in discrete time.
@@ -210,7 +207,7 @@ def create_profile_using_time_weighted_state(
where:
a = b*(1-min_to_max_ratio)/(1+min_to_max_ratio)
a = b*(1-min_max_ratio)/(1+min_max_ratio)
b = integration_result/integration_period
@@ -222,13 +219,13 @@ def create_profile_using_time_weighted_state(
----------
integration_result : float
The result of integrating the sinusoidal function for one period.
avg_state : list
states : list
The average state during each time interval.
time_interval_durations : list
The time interval durations for each sample.
min_to_max_ratio : float
min_max_ratio : float
The ratio between the minimum and maximum values of the function.
state_correlates_with_output : bool, optional
states_correlate_profile : bool, optional
If True, the peak should happen when the state is at its highest point.
If False, the peak should happen when the state is at its lowest point.
The default is True.
@@ -246,26 +243,26 @@ def create_profile_using_time_weighted_state(
"""
if len(avg_state) != len(time_interval_durations):
if len(states) != len(time_interval_durations):
raise ValueError("The inputs are inconsistent.")
period = sum(time_interval_durations)
avg_time_interval_duration = mean(time_interval_durations)
avg_state_weighted = [
states_weighted = [
(
x_k * delta_k / avg_time_interval_duration
if state_correlates_with_output
if states_correlate_profile
else -x_k * delta_k / avg_time_interval_duration
)
for delta_k, x_k in zip(time_interval_durations, avg_state)
for delta_k, x_k in zip(time_interval_durations, states)
]
# find the peak
_sorted = sorted(
((state, index) for index, state in enumerate(avg_state_weighted)), reverse=True
((state, index) for index, state in enumerate(states_weighted)), reverse=True
)
# create new list for time durations starting with that of the peak
@@ -280,7 +277,7 @@ def create_profile_using_time_weighted_state(
new_profile = discrete_sinusoid_matching_integral(
integration_result=integration_result,
time_interval_durations=swapped_time_durations,
min_to_max_ratio=min_to_max_ratio,
min_max_ratio=min_max_ratio,
phase_shift_radians=(
math.pi / 2
- 0.5 * (time_interval_durations[_sorted[0][1]] / period) * 2 * math.pi
@@ -291,6 +288,257 @@ def create_profile_using_time_weighted_state(
n = len(time_interval_durations)
return [*new_profile[n - _sorted[0][1] :], *new_profile[0 : n - _sorted[0][1]]]
# *****************************************************************************
# *****************************************************************************
def generate_manual_state_correlated_profile(
integration_result: float,
states: list,
time_interval_durations: list,
deviation_gain: float
) -> list:
"""
Returns a profile matching a given integral and varying according to a
sequence of time intervals and the respective mean states.
The profile for interval i is defined as follows:
P[i] = (dt[i]/dt_mean)*( (x[i]-x_mean)*gain + offset)
where:
dt[i] is the time interval duration for interval i
dt_mean is the mean time interval duration
x[i] is the state for interval i
x_mean is the mean state for the entire profile
The offset is defined as the integration result divided by the number
time intervals, whereas the gain is user-defined and real-valued.
Parameters
----------
integration_result : float
The result of integrating the sinusoidal function for one period.
states : list
The average state during each time interval.
time_interval_durations : list
The time interval durations for each sample.
deviation_gain : float
DESCRIPTION.
Raises
------
ValueError
This error is raised if the list inputs do not have the same size.
Returns
-------
list
A profile matching the aforementioned characteristics.
"""
if len(states) != len(time_interval_durations):
raise ValueError("The inputs are inconsistent.")
dt_total = sum(time_interval_durations)
dt_mean = mean(time_interval_durations)
# x_mean = mean(states)
x_mean = sum(
deltat_k*x_k
for deltat_k, x_k in zip(time_interval_durations, states)
)/dt_total
beta = integration_result/len(states)
return [
((x_k-x_mean)*deviation_gain+beta )* deltat_k / dt_mean
for deltat_k, x_k in zip(time_interval_durations, states)
]
# *****************************************************************************
# *****************************************************************************
def generate_state_correlated_profile(
integration_result: float,
states: list,
time_interval_durations: list,
states_correlate_profile: bool,
min_max_ratio: float,
solver: str
) -> tuple:
"""
Returns a profile observing a number of conditions.
The profile must correlate with a set of states averaged over certain time
intervals, whose durations may be irregular. Integration of the profile
over all time intervals must also match a certain value. Finally, the peaks
must be related by a factor between 0 and 1.
This method relies on linear programming. An LP solver must be used.
The profile for interval i is defined as follows:
P[i] = (dt[i]/dt_mean)*( (x[i]-x_mean)*gain + offset)
where:
dt[i] is the time interval duration for interval i
dt_mean is the mean time interval duration
x[i] is the state for interval i
x_mean is the mean state for the entire profile
The offset is defined as the integration result divided by the number
time intervals, whereas the gain is determined via optimisation.
Parameters
----------
integration_result : float
The result of integrating the sinusoidal function for one period.
states : list
The average state during each time interval.
time_interval_durations : list
The duration of each time interval.
states_correlate_profile : bool
If True, higher state values must lead to higher profile values.
If False, lower state values must lead to higher profile values.
min_max_ratio : float
The ratio between the minimum and the maximum profile values.
solver : str
The name of the LP solver to use, according to Pyomo conventions.
Raises
------
ValueError
This error is raised if the list inputs do not have the same size.
Returns
-------
tuple
A tuple containing the profile, the gain and the offset.
"""
# *************************************************************************
# *************************************************************************
# TODO: find alternative solution, as this is most likely overkill
# internal model
def model(states_correlate_profile: bool) -> pyo.AbstractModel:
# abstract model
model = pyo.AbstractModel()
# sets
model.I = pyo.Set()
# decision variables
model.P_i = pyo.Var(model.I, domain=pyo.NonNegativeReals)
model.P_max = pyo.Var(domain=pyo.NonNegativeReals)
model.P_min = pyo.Var(domain=pyo.NonNegativeReals)
if states_correlate_profile:
model.alpha = pyo.Var(domain=pyo.PositiveReals)
else:
model.alpha = pyo.Var(domain=pyo.NegativeReals)
# parameters
model.param_R = pyo.Param()
model.param_VI = pyo.Param()
model.param_X_i = pyo.Param(model.I)
model.param_Y_i = pyo.Param(model.I)
def obj_f(m):
if states_correlate_profile:
return m.alpha # if positive
else:
return -m.alpha # if negative
# model.OBJ = pyo.Objective(rule=obj_f)
model.OBJ = pyo.Objective(rule=obj_f, sense=pyo.maximize)
# integral
def constr_integral_rule(m):
return sum(m.P_i[i] for i in m.I) == m.param_VI
model.constr_integral = pyo.Constraint(rule=constr_integral_rule)
# profile equations
def constr_profile_equations_rule(m,i):
return m.P_i[i] - m.param_X_i[i]*m.alpha == m.param_Y_i[i]
model.constr_profile_equations = pyo.Constraint(model.I, rule=constr_profile_equations_rule)
# upper bound
def constr_max_upper_bound_rule(m,i):
return m.P_i[i] <= m.P_max
model.constr_max_upper_bound = pyo.Constraint(model.I, rule=constr_max_upper_bound_rule)
# lower bound
def constr_max_lower_bound_rule(m,i):
return m.P_i[i] >= m.P_min
model.constr_max_lower_bound = pyo.Constraint(model.I, rule=constr_max_lower_bound_rule)
# ratio
def constr_min_max_rule(m,i):
return m.P_min == m.P_max*m.param_R
model.constr_min_max = pyo.Constraint(rule=constr_min_max_rule)
return model
number_time_steps = len(time_interval_durations)
if len(states) != number_time_steps:
raise ValueError("The inputs are inconsistent.")
# *************************************************************************
# *************************************************************************
dt_total = sum(time_interval_durations)
dt_mean = mean(time_interval_durations)
x_mean = sum(
deltat_k*x_k
for deltat_k, x_k in zip(time_interval_durations, states)
)/dt_total
beta = integration_result/number_time_steps
f = [dt_k/dt_mean for dt_k in time_interval_durations]
set_I = tuple(i for i in range(number_time_steps))
if len(set(states)) == 1:
# alpha = 0, return trivial profile
return (
[fi*beta for fi in f],
0,
beta
)
# create a dictionary with the data (using pyomo conventions)
data_dict = {
None: {
# sets
"I": {None: set_I},
# parameters
"param_VI": {None: integration_result},
"param_R": {None: min_max_ratio},
"param_X_i": {i:f[i]*(states[i]-x_mean) for i in set_I},
"param_Y_i": {i:f[i]*beta for i in set_I},
}
}
# *************************************************************************
# *************************************************************************
opt = pyo.SolverFactory(solver)
fit_model = model(states_correlate_profile=states_correlate_profile)
problem = fit_model.create_instance(data=data_dict)
opt.solve(problem, tee=False)
# return profile
return (
[pyo.value(problem.P_i[i]) for i in problem.I],
pyo.value(problem.alpha),
beta
)
# *****************************************************************************
# *****************************************************************************
@@ -300,7 +548,7 @@ def max_min_sinusoidal_profile(
integration_result: float or int,
period: float or int,
time_interval_duration: float or int,
min_to_max_ratio: float,
min_max_ratio: float,
) -> tuple:
"""
Returns the maximum and minimum amount for a given time interval, according
@@ -317,7 +565,7 @@ def max_min_sinusoidal_profile(
where:
a = b*(1-min_to_max_ratio)/(1+min_to_max_ratio)
a = b*(1-min_max_ratio)/(1+min_max_ratio)
b = integration_result/integration_period
@@ -331,7 +579,7 @@ def max_min_sinusoidal_profile(
The result of integrating the sinusoidal function for one period.
time_interval_durations : list
The time interval durations for each sample.
min_to_max_ratio : float
min_max_ratio : float
The ratio between the minimum and maximum values of the function.
phase_shift_radians : float, optional
The phase shift for the sinusoidal function. The default is None, which
@@ -345,7 +593,7 @@ def max_min_sinusoidal_profile(
"""
b = integration_result / period
a = b * (1 - min_to_max_ratio) / (1 + min_to_max_ratio)
a = b * (1 - min_max_ratio) / (1 + min_max_ratio)
alpha = 2 * math.pi / period
amplitude = a * (2 / alpha) * math.sin(alpha * time_interval_duration / 2)
@@ -354,6 +602,87 @@ def max_min_sinusoidal_profile(
b * time_interval_duration - amplitude,
)
# *****************************************************************************
# *****************************************************************************
def generate_profile(
integration_result: float,
time_interval_durations: list,
**kwargs
) -> tuple:
"""
Returns a profile according to a variety of methods.
Parameters
----------
integration_result : float
The value which must be obtained by adding up all samples.
time_interval_durations : list
A list with the durations of each time interval.
**kwargs :
A sequence of key and value pairs for use in subsequent methods.
Returns
-------
numpy array
Returns the desired profile.
"""
# generate the profile
if 'states' not in kwargs:
# min_max_ratio is necessary
# phase_shift_radians is optional
# states play no role
# # remove unnecessary arguments
# if 'deviation_gain' in kwargs:
# kwargs.pop('deviation_gain')
# if 'solver' in kwargs:
# kwargs.pop('solver')
return discrete_sinusoid_matching_integral(
integration_result=integration_result,
time_interval_durations=time_interval_durations,
**kwargs
)
elif 'deviation_gain' not in kwargs:
# states matter but the gain must be determined
if 'solver' in kwargs:
# - states_correlate_profile is necessary
# - min_max_ratio is necessary
# - solver is necessary
return generate_state_correlated_profile(
integration_result=integration_result,
time_interval_durations=time_interval_durations,
**kwargs
)[0]
else:
# - states_correlate_profile is necessary
# - min_max_ratio is necessary
# - solver is not necessary
return create_profile_using_time_weighted_state(
integration_result=integration_result,
time_interval_durations=time_interval_durations,
**kwargs)
else:
# states matter and the gain is predefined
# states are necessary
# deviation gain is necessary
# # remove unnecessary arguments
# if 'phase_shift_radians' in kwargs:
# kwargs.pop('phase_shift_radians')
return generate_manual_state_correlated_profile(
integration_result=integration_result,
time_interval_durations=time_interval_durations,
**kwargs
)
# integration_result: float,
# states: list,
# time_interval_durations: list,
# min_max_ratio: float,
# states_correlate_profile: bool = True,
# *****************************************************************************
# *****************************************************************************
Loading