Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • master
1 result

Target

Select target project
  • pmag/topupopt
1 result
Select Git revision
  • master
1 result
Show changes
Showing
with 10178 additions and 24354 deletions
......@@ -14,38 +14,35 @@ from statistics import mean
# *****************************************************************************
# *****************************************************************************
def generate_pseudo_unique_key(key_list: tuple,
max_iterations: int = 10) -> str:
def generate_pseudo_unique_key(key_list: tuple, max_iterations: int = 10) -> str:
"""Generates a pseudo-unique key that is not among a given list of keys."""
iteration = 0
while iteration < max_iterations:
new_key = str(uuid.uuid4())
if new_key not in key_list:
return new_key
iteration += 1
raise Exception(
'A unique key could not be found within '
+
str(max_iterations)
+
' iterations.'
"A unique key could not be found within " + str(max_iterations) + " iterations."
)
# *****************************************************************************
# *****************************************************************************
def discrete_sinusoid_matching_integral(
integration_result: float,
time_interval_durations: list,
min_to_max_ratio: float,
phase_shift_radians: float = None) -> list:
phase_shift_radians: float = None,
) -> list:
"""
Returns a profile that approximates a sinusoidal function in discrete time.
......@@ -98,39 +95,33 @@ def discrete_sinusoid_matching_integral(
alpha = 2 * math.pi / integration_period
if phase_shift_radians is None:
beta = 0
else:
beta = phase_shift_radians
t = [sum(time_interval_durations[0:i])
for i in range(len(time_interval_durations)+1)]
t = [
sum(time_interval_durations[0:i])
for i in range(len(time_interval_durations) + 1)
]
def _integral(a, b, alpha, beta, t, t0):
return (
-(a / alpha) * math.cos(alpha * t + beta)
+
b*(t-t0)
+
(a/alpha)*math.cos(alpha*t0+beta)
+ b * (t - t0)
+ (a / alpha) * math.cos(alpha * t0 + beta)
)
return [
_integral(a, b, alpha, beta, t[i+1], t[i])
for i in range(number_time_steps)
_integral(a, b, alpha, beta, t[i + 1], t[i]) for i in range(number_time_steps)
]
# *****************************************************************************
# *****************************************************************************
def synch_profile(
profile: list,
reference_profile: list,
synch: bool = True) -> list:
def synch_profile(profile: list, reference_profile: list, synch: bool = True) -> list:
"""
Rearranges a profile based on the samples of a reference profile.
......@@ -167,14 +158,11 @@ def synch_profile(
# 3rd lowest ref >> 3rd highest unsorted
if synch:
# regular synch
sorted_profile = sorted((p, i) for i, p in enumerate(profile))
sorted_ref_profile = sorted(
(r,i) for i, r in enumerate(reference_profile)
)
sorted_ref_profile = sorted((r, i) for i, r in enumerate(reference_profile))
return [
sorted_profile[sorted_ref_profile.index((r, i))][0]
......@@ -182,32 +170,29 @@ def synch_profile(
]
else:
# reverse synch
sorted_profile = sorted(
((p,i) for i, p in enumerate(profile)),
reverse=True
)
sorted_profile = sorted(((p, i) for i, p in enumerate(profile)), reverse=True)
sorted_ref_profile = sorted(
(r,i) for i, r in enumerate(reference_profile)
)
sorted_ref_profile = sorted((r, i) for i, r in enumerate(reference_profile))
return [
sorted_profile[sorted_ref_profile.index((r, i))][0]
for i, r in enumerate(reference_profile)
]
# *****************************************************************************
# *****************************************************************************
def create_profile_using_time_weighted_state(
integration_result: float,
avg_state: list,
time_interval_durations: list,
min_to_max_ratio: float,
state_correlates_with_output: bool = True) -> list:
state_correlates_with_output: bool = True,
) -> list:
"""
Returns a profile that approximates a sinusoidal function in discrete time.
......@@ -262,32 +247,32 @@ def create_profile_using_time_weighted_state(
"""
if len(avg_state) != len(time_interval_durations):
raise ValueError('The inputs are inconsistent.')
raise ValueError("The inputs are inconsistent.")
period = sum(time_interval_durations)
avg_time_interval_duration = mean(time_interval_durations)
avg_state_weighted = [
(x_k*delta_k/avg_time_interval_duration
if state_correlates_with_output else
-x_k*delta_k/avg_time_interval_duration)
(
x_k * delta_k / avg_time_interval_duration
if state_correlates_with_output
else -x_k * delta_k / avg_time_interval_duration
)
for delta_k, x_k in zip(time_interval_durations, avg_state)
]
# find the peak
_sorted = sorted(
((state,index) for index, state in enumerate(avg_state_weighted)),
reverse=True
((state, index) for index, state in enumerate(avg_state_weighted)), reverse=True
)
# create new list for time durations starting with that of the peak
swapped_time_durations = [
*time_interval_durations[_sorted[0][1] :],
*time_interval_durations[0:_sorted[0][1]]
*time_interval_durations[0 : _sorted[0][1]],
]
# create sinusoidal profile based on that peak
......@@ -298,23 +283,25 @@ def create_profile_using_time_weighted_state(
min_to_max_ratio=min_to_max_ratio,
phase_shift_radians=(
math.pi / 2
-
0.5*(time_interval_durations[_sorted[0][1]]/period)*2*math.pi
)
- 0.5 * (time_interval_durations[_sorted[0][1]] / period) * 2 * math.pi
),
)
# return profile in correct order
n = len(time_interval_durations)
return [*new_profile[n - _sorted[0][1] :], *new_profile[0 : n - _sorted[0][1]]]
# *****************************************************************************
# *****************************************************************************
def max_min_sinusoidal_profile(
integration_result: float or int,
period: float or int,
time_interval_duration: float or int,
min_to_max_ratio: float) -> tuple:
min_to_max_ratio: float,
) -> tuple:
"""
Returns the maximum and minimum amount for a given time interval, according
to a sinusoidal function of time.
......@@ -364,8 +351,9 @@ def max_min_sinusoidal_profile(
return (
b * time_interval_duration + amplitude,
b*time_interval_duration-amplitude
b * time_interval_duration - amplitude,
)
# *****************************************************************************
# *****************************************************************************
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -8,11 +8,11 @@ from numbers import Real
# TODO: change name to ResourceTariff
class ResourcePrice:
"""A class for piece-wise linear resource prices in network problems."""
def __init__(self, prices: list or int, volumes: list = None):
# how do we keep the size of the object as small as possible
# if the tariff is time-invariant, how can information be stored?
# - a flag
......@@ -25,11 +25,8 @@ class ResourcePrice:
# 2) price as a list, volume as None
# - all elements in the list need to be numeric and non-negative
(self._number_segments,
self.prices,
self.volumes) = self.validate_list(
prices,
volumes
(self._number_segments, self.prices, self.volumes) = self.validate_list(
prices, volumes
)
self._volume_invariant = self.is_volume_invariant()
......@@ -44,14 +41,11 @@ class ResourcePrice:
"""Validates the inputs provided in list format."""
if type(volumes) == list:
# ensure prices are also provided as a list
if type(prices) != list:
raise TypeError(
'The prices need to be provided as a list, if volumes are'+
' too.'
"The prices need to be provided as a list, if volumes are" + " too."
)
# prices and volumes are lists
......@@ -59,15 +53,11 @@ class ResourcePrice:
number_segments = len(volumes)
if number_segments != len(prices):
raise ValueError(
'The number of prices and volumes are inconsistent.'
)
raise ValueError("The number of prices and volumes are inconsistent.")
# check the elements
for segment_index in range(number_segments):
price = prices[segment_index]
volume = volumes[segment_index]
......@@ -75,57 +65,43 @@ class ResourcePrice:
# price
try:
if price < 0:
raise ValueError(
'The prices provided were not all positive.'
)
raise ValueError("The prices provided were not all positive.")
except TypeError:
raise TypeError(
'The prices were not all provided as numeric types.'
"The prices were not all provided as numeric types."
)
# volume
if type(volume) == type(None):
# if None, move to the next iteration
if segment_index == number_segments - 1:
# the last segment is None: unlimited volume
continue
else:
# intermediate segment:
raise ValueError(
'The intermediate segments cannot have volume '+
'limits.'
"The intermediate segments cannot have volume " + "limits."
)
else:
# if not None, make sure it is positive
try:
if volume <= 0:
raise ValueError(
'The volumes provided were not all positive.'
"The volumes provided were not all positive."
)
except TypeError:
raise TypeError(
'The volumes were not all provided as numeric '+
'types.'
"The volumes were not all provided as numeric " + "types."
)
# done
......@@ -133,21 +109,13 @@ class ResourcePrice:
return number_segments, prices, volumes
elif type(volumes) == type(None) or isinstance(volumes, Real):
# the prices must be numeric and positive
if not isinstance(prices, Real):
raise TypeError(
'The prices were not all provided as numeric types.'
)
raise TypeError("The prices were not all provided as numeric types.")
if prices < 0:
raise ValueError(
'The prices provided were not all positive.'
)
raise ValueError("The prices provided were not all positive.")
# the number of segments must be 1
......@@ -158,30 +126,25 @@ class ResourcePrice:
return number_segments, [prices], [volumes]
else:
raise TypeError('Unrecognised type for volumes.')
raise TypeError("Unrecognised type for volumes.")
# *************************************************************************
# *************************************************************************
def price_monotonically_increasing_with_volume(self) -> bool:
# check if we are to focus on one specific time interval or not
# if there is only one segment, return false
if self._number_segments == 1:
return True
# check one interval:
for segment_index in range(self._number_segments - 1):
# check consecutive segments
if self.prices[segment_index] > self.prices[segment_index + 1]:
# prices decreased
return False
......@@ -194,23 +157,19 @@ class ResourcePrice:
# *************************************************************************
def price_monotonically_decreasing_with_volume(self) -> bool:
# check if we are to focus on one specific time interval or not
# if there is only one segment, return false
if self._number_segments == 1:
return True
# check one interval:
for segment_index in range(self._number_segments - 1):
# check consecutive segments
if self.prices[segment_index] < self.prices[segment_index + 1]:
# prices increased
return False
......@@ -223,7 +182,6 @@ class ResourcePrice:
# *************************************************************************
def is_volume_capped(self) -> bool:
return not (type(self.volumes[-1]) == type(None))
# *************************************************************************
......@@ -269,9 +227,26 @@ class ResourcePrice:
# *************************************************************************
# *************************************************************************
def __eq__(self, o) -> bool:
"""Returns True if a given ResourcePrice is equivalent to another."""
return self.is_equivalent(o)
def __hash__(self):
return hash(
tuple((
self.number_segments(),
tuple(self.prices),
tuple(self.volumes)
))
)
# *************************************************************************
# *************************************************************************
# *****************************************************************************
# *****************************************************************************
# TODO: method to determine if qpk-keyed dict is time-invariant per q and p
def are_prices_time_invariant(resource_prices_qpk: dict) -> bool:
"""Returns True if all prices are identical per time interval."""
# check if there is only one or no (q,p,k) entry
......@@ -280,8 +255,7 @@ def are_prices_time_invariant(resource_prices_qpk: dict) -> bool:
# check if the entries for the same period and assessment are time invariant
entries_qp = set([qpk[0:2] for qpk in resource_prices_qpk])
qpk_qp = {
qp: [qpk for qpk in resource_prices_qpk if qp == qpk[0:2]]
for qp in entries_qp
qp: [qpk for qpk in resource_prices_qpk if qp == qpk[0:2]] for qp in entries_qp
}
# check if the tariffs per period and assessment are equivalent
for qp, qpk_list in qpk_qp.items():
......@@ -293,5 +267,6 @@ def are_prices_time_invariant(resource_prices_qpk: dict) -> bool:
# all tariffs are equivalent per period and assessment: they are invariant
return True
# *****************************************************************************
# *****************************************************************************
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.