# NOTE: the "Newer"/"Older" page-navigation labels left here by the web repository view were removed.
# imports
# standard
import math
from statistics import mean
# local
# import numpy as np
# import networkx as nx
import pyomo.environ as pyo
# import src.topupopt.problems.esipp.utils as utils
from src.topupopt.data.misc.utils import generate_pseudo_unique_key
from src.topupopt.problems.esipp.problem import InfrastructurePlanningProblem
from src.topupopt.problems.esipp.network import Arcs, Network
from src.topupopt.problems.esipp.resource import ResourcePrice
from src.topupopt.problems.esipp.problem import simplify_peak_total_problem
from src.topupopt.problems.esipp.problem import is_peak_total_problem
from src.topupopt.problems.esipp.time import TimeFrame
# *****************************************************************************
# *****************************************************************************
def build_solve_ipp(
self,
solver: str = "glpk",
solver_options: dict = None,
use_sos_arcs: bool = False,
arc_sos_weight_key: str = (InfrastructurePlanningProblem.SOS1_ARC_WEIGHTS_NONE),
arc_use_real_variables_if_possible: bool = False,
use_sos_sense: bool = False,
sense_sos_weight_key: int = (
InfrastructurePlanningProblem.SOS1_SENSE_WEIGHT_NOMINAL_HIGHER
),
sense_use_real_variables_if_possible: bool = False,
sense_use_arc_interfaces: bool = False,
perform_analysis: bool = False,
plot_results: bool = False,
print_solver_output: bool = False,
irregular_time_intervals: bool = False,
networks: dict = None,
number_intraperiod_time_intervals: int = 4,
static_losses_mode=None,
mandatory_arcs: list = None,
max_number_parallel_arcs: dict = None,
arc_groups_dict: dict = None,
init_aux_sets: bool = False,
discount_rates: dict = None,
time_intervals: dict = None,
assessment_weights: dict = None,
simplify_problem: bool = False,
):
# Test helper: create an InfrastructurePlanningProblem, register the given
# networks and converters, apply the requested modelling options (mandatory
# arcs, SOS1 arc selection, SOS1 flow senses / arc interfaces, static loss
# placement, arc groups, parallel-arc limits, optional simplification), then
# instantiate and optimise the problem and return the problem object.
#
# NOTE(review): this listing has lost its indentation and appears to be missing
# lines. `converters` and `time_frame` are used below but are not declared in
# the visible signature, the tests in this file pass a `reporting_periods`
# keyword that is not declared either, two `if` statements have no body, and
# several calls are never closed. Restore the missing lines from version
# control before relying on this function.
# NOTE(review): `perform_analysis` and `plot_results` are not referenced
# anywhere in the visible body.
# 365 days x 24 h x 3600 s
reporting_period_duration = 365 * 24 * 3600
# NOTE(review): the bodies of the next two `if` statements (presumably default
# values for discount_rates and assessment_weights) are not visible here.
if type(discount_rates) != dict:
if type(assessment_weights) != dict:
# if type(reporting_periods) != dict:
# reporting_periods = {0: (0, 1)}
# fall back to an empty dict when no converters dict is available
if type(converters) != dict:
converters = {}
# time intervals
# default durations are only generated when the caller did not pass a dict
if type(time_intervals) != dict:
if irregular_time_intervals:
# durations grow linearly with k: the nominal duration is scaled by a factor
# going from 1 - 0.125 (k = 0) to 1 + 0.125 (k = last interval)
# NOTE(review): number_intraperiod_time_intervals == 1 divides by zero below
time_step_max_relative_variation = 0.25
intraperiod_time_interval_duration = [
(reporting_period_duration / number_intraperiod_time_intervals)
* (
1
+ (k / (number_intraperiod_time_intervals - 1) - 0.5)
* time_step_max_relative_variation
)
for k in range(number_intraperiod_time_intervals)
]
else:
# regular case: every interval gets an equal share of the reporting period
intraperiod_time_interval_duration = [
reporting_period_duration / number_intraperiod_time_intervals
for k in range(number_intraperiod_time_intervals)
]
# average time interval duration
# NOTE(review): only referenced by the commented-out time_weights code below
average_time_interval_duration = round(
mean(intraperiod_time_interval_duration)
)
# single entry under key 0 holding the tuple of interval durations
time_intervals = {0: tuple(dt for dt in intraperiod_time_interval_duration)}
# time weights
# relative weight of time period
# one interval twice as long as the average is worth twice
# one interval half as long as the average is worth half
# time_weights = [
# [time_period_duration/average_time_interval_duration
# for time_period_duration in intraperiod_time_interval_duration]
# for p in range(number_periods)]
time_weights = None # nothing yet
normalised_time_interval_duration = None # nothing yet
# create problem object
# NOTE(review): the remaining arguments and the closing parenthesis of this
# constructor call are not visible in this listing.
ipp = InfrastructurePlanningProblem(
reporting_periods=time_frame.reporting_periods,
time_intervals=time_intervals,
time_weights=time_weights,
normalised_time_interval_duration=normalised_time_interval_duration,
# add networks and systems
# NOTE(review): `networks` defaults to None, which has no .items()
for netkey, net in networks.items():
ipp.add_network(network_key=netkey, network=net)
# add converters
for cvtkey, cvt in converters.items():
ipp.add_converter(converter_key=cvtkey, converter=cvt)
# define arcs as mandatory
# each entry is split into its first element and the remainder (presumably
# the network key followed by the arc key -- confirm against the callers)
if type(mandatory_arcs) == list:
for full_arc_key in mandatory_arcs:
ipp.make_arc_mandatory(full_arc_key[0], full_arc_key[1:])
# if make_all_arcs_mandatory:
# for network_key in ipp.networks:
# for arc_key in ipp.networks[network_key].edges(keys=True):
# # preexisting arcs are no good
# if ipp.networks[network_key].edges[arc_key][
# Network.KEY_ARC_TECH].has_been_selected():
# ipp.make_arc_mandatory(network_key, arc_key)
# set up the use of sos for arc selection
if use_sos_arcs:
for network_key in ipp.networks:
for arc_key in ipp.networks[network_key].edges(keys=True):
# arcs whose technology has already been selected are skipped
if (
ipp.networks[network_key]
.edges[arc_key][Network.KEY_ARC_TECH]
.has_been_selected()
):
continue
# NOTE(review): only arc_key is passed positionally here; a network_key
# argument may be among the lines missing from this listing -- confirm
ipp.use_sos1_for_arc_selection(
arc_key,
use_real_variables_if_possible=(
arc_use_real_variables_if_possible
),
sos1_weight_method=arc_sos_weight_key,
)
# set up the use of sos for flow sense determination
if use_sos_sense:
for network_key in ipp.networks:
for arc_key in ipp.networks[network_key].edges(keys=True):
# NOTE(review): the rest of this condition is missing from this listing
if not ipp.networks[network_key].edges[arc_key][
continue
# NOTE(review): the parenthesis opened after
# use_real_variables_if_possible= is not closed in this listing
ipp.use_sos1_for_flow_senses(
arc_key,
use_real_variables_if_possible=(
sense_use_real_variables_if_possible
use_interface_variables=sense_use_arc_interfaces,
sos1_weight_method=sense_sos_weight_key,
)
elif sense_use_arc_interfaces: # set up the use of arc interfaces w/o sos1
for network_key in ipp.networks:
for arc_key in ipp.networks[network_key].edges(keys=True):
# arcs whose technology has already been selected are skipped
if (
ipp.networks[network_key]
.edges[arc_key][Network.KEY_ARC_TECH]
.has_been_selected()
):
continue
ipp.use_interface_variables_for_arc_selection(network_key, arc_key)
# static losses
# dispatch on the requested mode; any value that matches none of the four
# ipp.STATIC_LOSS_MODE_* constants raises ValueError
# NOTE(review): the default static_losses_mode=None ends up in the else branch
# unless one of those constants is itself None -- confirm
if static_losses_mode == ipp.STATIC_LOSS_MODE_ARR:
ipp.place_static_losses_arrival_node()
elif static_losses_mode == ipp.STATIC_LOSS_MODE_DEP:
ipp.place_static_losses_departure_node()
elif static_losses_mode == ipp.STATIC_LOSS_MODE_US:
ipp.place_static_losses_upstream()
elif static_losses_mode == ipp.STATIC_LOSS_MODE_DS:
ipp.place_static_losses_downstream()
else:
raise ValueError("Unknown static loss modelling mode.")
# *********************************************************************
# groups
# optional: one arc group is created per value of arc_groups_dict
if type(arc_groups_dict) != type(None):
for key in arc_groups_dict:
ipp.create_arc_group(arc_groups_dict[key])
# *********************************************************************
# maximum number of parallel arcs
# keys are indexed as (network_key, node_a, node_b); values are the limits
# NOTE(review): max_number_parallel_arcs defaults to None, which is not iterable
for key in max_number_parallel_arcs:
ipp.set_maximum_number_parallel_arcs(
network_key=key[0],
node_a=key[1],
node_b=key[2],
limit=max_number_parallel_arcs[key],
)
# *********************************************************************
# optionally replace the problem by the result of simplify_peak_total_problem
if simplify_problem:
ipp = simplify_peak_total_problem(ipp)
# *********************************************************************
# instantiate (disable the default case v-a-v fixed losses)
# ipp.instantiate(place_fixed_losses_upstream_if_possible=False)
ipp.instantiate(initialise_ancillary_sets=init_aux_sets)
# optimise
ipp.optimise(
solver_name=solver,
solver_options=solver_options,
output_options={},
print_solver_output=print_solver_output,
)
# return the problem object
return ipp
# *********************************************************************
# *********************************************************************
# *************************************************************************
# *************************************************************************
def test_single_network_single_arc_problem(self):
# One network with an import node and a source/sink node joined by a single
# directed arc, over one scenario, two reporting periods and three time
# intervals. Checks the reported model size, that the arc is selected, the
# arc flows and amplitude, the capex, the sdncf and the objective value.
#
# NOTE(review): this listing has lost its indentation and several lines:
# some calls and literals below are never closed and some keyword arguments
# are not visible. Restore them from version control.
# scenario
q = 0
# time
number_intervals = 3
tf = TimeFrame(
reporting_periods={q: [0, 1]},
reporting_period_durations={q: [365 * 24 * 3600, 365 * 24 * 3600]},
time_intervals={q: [0, 1, 2]},
time_interval_durations={q: [1, 1, 1]},
)
# 2 nodes: one import, one regular
mynet = Network()
node_IMP = generate_pseudo_unique_key(mynet.nodes())
# one ResourcePrice (price 1.0, no volume limit) per key yielded by tf.qpk()
# NOTE(review): the node key argument and the closing of the dict and of the
# call are not visible in this listing
mynet.add_import_node(
prices={
# (q, p, k): ResourcePrice(prices=1.0, volumes=None)
# for p in range(number_periods)
# for k in range(number_intervals)
qpk: ResourcePrice(prices=1.0, volumes=None)
for qpk in tf.qpk()
# other nodes
node_A = generate_pseudo_unique_key(mynet.nodes())
mynet.add_source_sink_node(
node_key=node_A,
# base_flow=[0.5, 0.0, 1.0],
base_flow={(q, 0): 0.50, (q, 1): 0.00, (q, 2): 1.00},
)
# arc IA
# NOTE(review): no efficiency argument is visible here, unlike in the other
# tests of this file; it may be among the missing lines
arc_tech_IA = Arcs(
efficiency_reverse=None,
static_loss=None,
capacity=[3],
minimum_cost=[2],
capacity_is_instantaneous=False,
validate=False,
)
mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)
# identify node types
mynet.identify_node_types()
# no sos, regular time intervals
# NOTE(review): this call is not closed in this listing, and the arguments that
# hand over the network (looked up as "mynet" below) are not visible
ipp = self.build_solve_ipp(
# solver=solver,
solver_options={},
# use_sos_arcs=use_sos_arcs,
# arc_sos_weight_key=sos_weight_key,
# arc_use_real_variables_if_possible=use_real_variables_if_possible,
# use_sos_sense=use_sos_sense,
# sense_sos_weight_key=sense_sos_weight_key,
# sense_use_real_variables_if_possible=sense_use_real_variables_if_possible,
# sense_use_arc_interfaces=use_arc_interfaces,
perform_analysis=False,
print_solver_output=False,
# irregular_time_intervals=irregular_time_intervals,
number_intraperiod_time_intervals=number_intervals,
mandatory_arcs=[],
max_number_parallel_arcs={},
# init_aux_sets=init_aux_sets,
# the problem must qualify as a peak-total problem
assert is_peak_total_problem(ipp)
# model size as reported in the solver results
assert ipp.results["Problem"][0]["Number of constraints"] == 24
assert ipp.results["Problem"][0]["Number of variables"] == 22
assert ipp.results["Problem"][0]["Number of nonzeros"] == 49
# *********************************************************************
# *********************************************************************
# validation
# the arc should be installed since it is required for feasibility
assert (
True
in ipp.networks["mynet"]
.edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
.options_selected
)
# the flows should be 1.0, 0.0 and 2.0
# NOTE(review): the tolerance arguments and closing parentheses of the next
# four math.isclose calls are not visible; the comparison against 0.0 only
# works with an abs_tol, since math.isclose defaults to abs_tol=0.0
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
1.0,
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 1)]),
0.0,
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 2)]),
2.0,
# arc amplitude should be two
assert math.isclose(
pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
2.0,
# capex should be four
assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)
# sdncf should be -5.7
assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -5.7, abs_tol=1e-3)
# the objective function should be -9.7
assert math.isclose(pyo.value(ipp.instance.obj_f), -9.7, abs_tol=1e-3)
# *************************************************************************
# *************************************************************************
def test_single_network_single_arc_problem_simpler(self):
# Same network layout as test_single_network_single_arc_problem (import node,
# source/sink node, one directed arc) but with a smaller expected model
# (20 constraints, 19 variables, 36 nonzeros). Besides arc selection, capex
# and objective, it compares the pprint() output of two constraints against
# expected strings.
#
# NOTE(review): this listing has lost its indentation and several lines, and
# contains repository-view residue (the "Pedro L. Magalhães" / "committed"
# lines), which is not Python. Restore the file from version control.
# NOTE(review): a simplify_problem argument is presumably passed to
# build_solve_ipp on one of the missing lines -- confirm.
# scenario
q = 0
# time
number_intervals = 3
tf = TimeFrame(
reporting_periods={q: [0, 1]},
reporting_period_durations={q: [365 * 24 * 3600, 365 * 24 * 3600]},
time_intervals={q: [0, 1, 2]},
time_interval_durations={q: [1, 1, 1]},
)
# 2 nodes: one import, one regular
mynet = Network()
# NOTE(review): node_IMP and node_A are used below, but only these
# commented-out definitions are visible; the expected strings further down
# mention 'thatimpnode', so fixed keys are presumably assigned on missing lines
# node_IMP = generate_pseudo_unique_key(mynet.nodes())
# one ResourcePrice (price 1.0, no volume limit) per key yielded by tf.qpk()
# NOTE(review): the closing of the dict and of this call is not visible
mynet.add_import_node(
prices={
# (q, p, k): ResourcePrice(prices=1.0, volumes=None)
# for p in range(number_periods)
# for k in range(number_intervals)
qpk: ResourcePrice(prices=1.0, volumes=None)
for qpk in tf.qpk()
# other nodes
# node_A = generate_pseudo_unique_key(mynet.nodes())
mynet.add_source_sink_node(
node_key=node_A,
# base_flow=[0.5, 0.0, 1.0],
base_flow={(q, 0): 0.50, (q, 1): 0.00, (q, 2): 1.00},
)
# arc IA
arc_tech_IA = Arcs(
efficiency_reverse=None,
static_loss=None,
capacity=[3],
minimum_cost=[2],
capacity_is_instantaneous=False,
validate=False,
)
mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)
# identify node types
mynet.identify_node_types()
# no sos, regular time intervals
# NOTE(review): this call is not closed in this listing and some of its
# arguments are not visible
ipp = self.build_solve_ipp(
# solver=solver,
solver_options={},
# use_sos_arcs=use_sos_arcs,
# arc_sos_weight_key=sos_weight_key,
# arc_use_real_variables_if_possible=use_real_variables_if_possible,
# use_sos_sense=use_sos_sense,
# sense_sos_weight_key=sense_sos_weight_key,
# sense_use_real_variables_if_possible=sense_use_real_variables_if_possible,
# sense_use_arc_interfaces=use_arc_interfaces,
perform_analysis=False,
print_solver_output=False,
# irregular_time_intervals=irregular_time_intervals,
number_intraperiod_time_intervals=number_intervals,
mandatory_arcs=[],
max_number_parallel_arcs={},
# init_aux_sets=init_aux_sets,
# the problem must qualify as a peak-total problem
assert is_peak_total_problem(ipp)
# model size as reported in the solver results
assert ipp.results["Problem"][0]["Number of constraints"] == 20
assert ipp.results["Problem"][0]["Number of variables"] == 19
assert ipp.results["Problem"][0]["Number of nonzeros"] == 36
# *********************************************************************
# *********************************************************************
# validation
# the arc should be installed since it is required for feasibility
assert (
True
in ipp.networks["mynet"]
.edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
.options_selected
)
# capex should be four
assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)
# the objective function should be -9.7
assert math.isclose(pyo.value(ipp.instance.obj_f), -9.7, abs_tol=1e-3)
# NOTE(review): the next two lines are repository-view residue, not code
Pedro L. Magalhães
committed
# TODO: create method to automate getting data from the command line
# local imports, used only by the stdout-capturing checks below
import io
import sys
from contextlib import redirect_stdout
# NOTE(review): the next two lines are repository-view residue, not code
Pedro L. Magalhães
committed
# print('wow wow wow')
# ipp.instance.constr_imp_flow_cost.pprint()
# expected pprint() output of the import flow cost constraint (four entries:
# 'peak' and 'total' for indices 0 and 1)
expected_string = """constr_imp_flow_cost : Size=4, Index=constr_imp_flow_cost_index, Active=True\n Key : Lower : Body : Upper : Active\n ('mynet', 'thatimpnode', 'peak', 0, 0) : 0.0 : 0*var_if_glqpks[mynet,thatimpnode,peak,0,0,0] - var_ifc_glqpk[mynet,thatimpnode,peak,0,0] : 0.0 : True\n ('mynet', 'thatimpnode', 'peak', 1, 0) : 0.0 : 0*var_if_glqpks[mynet,thatimpnode,peak,1,0,0] - var_ifc_glqpk[mynet,thatimpnode,peak,1,0] : 0.0 : True\n ('mynet', 'thatimpnode', 'total', 0, 0) : 0.0 : var_if_glqpks[mynet,thatimpnode,total,0,0,0] - var_ifc_glqpk[mynet,thatimpnode,total,0,0] : 0.0 : True\n ('mynet', 'thatimpnode', 'total', 1, 0) : 0.0 : var_if_glqpks[mynet,thatimpnode,total,1,0,0] - var_ifc_glqpk[mynet,thatimpnode,total,1,0] : 0.0 : True\n"""
# NOTE(review): the next two lines are repository-view residue, not code
Pedro L. Magalhães
committed
# capture what pprint() writes to stdout by swapping sys.stdout by hand
# NOTE(review): stdout is reset to sys.__stdout__ rather than to the stream
# that was active before, and is not reset at all if pprint() raises; the
# redirect_stdout form used for the next check restores the previous stream
cmd_output = io.StringIO()
sys.stdout = cmd_output
ipp.instance.constr_imp_flow_cost.pprint()
sys.stdout = sys.__stdout__
assert cmd_output.getvalue() == expected_string
# NOTE(review): the next two lines are repository-view residue, not code
Pedro L. Magalhães
committed
# the export flow revenue constraint is expected to be empty (Size=0)
expected_string = """constr_exp_flow_revenue : Size=0, Index=constr_exp_flow_revenue_index, Active=True\n Key : Lower : Body : Upper : Active\n"""
f = io.StringIO()
with redirect_stdout(f):
ipp.instance.constr_exp_flow_revenue.pprint()
assert f.getvalue() == expected_string
# NOTE(review): the next two lines are repository-view residue, not code
Pedro L. Magalhães
committed
# everything below is commented-out scratch code kept from earlier attempts
# try the whole model
# print('wow wow wow')
# ipp.instance.pprint()
# expected_string = """constr_imp_flow_cost : Size=4, Index=constr_imp_flow_cost_index, Active=True\n Key : Lower : Body : Upper : Active\n ('mynet', 'thatimpnode', 'peak', 0, 0) : 0.0 : 0*var_if_glqpks[mynet,thatimpnode,peak,0,0,0] - var_ifc_glqpk[mynet,thatimpnode,peak,0,0] : 0.0 : True\n ('mynet', 'thatimpnode', 'peak', 1, 0) : 0.0 : 0*var_if_glqpks[mynet,thatimpnode,peak,1,0,0] - var_ifc_glqpk[mynet,thatimpnode,peak,1,0] : 0.0 : True\n ('mynet', 'thatimpnode', 'total', 0, 0) : 0.0 : var_if_glqpks[mynet,thatimpnode,total,0,0,0] - var_ifc_glqpk[mynet,thatimpnode,total,0,0] : 0.0 : True\n ('mynet', 'thatimpnode', 'total', 1, 0) : 0.0 : var_if_glqpks[mynet,thatimpnode,total,1,0,0] - var_ifc_glqpk[mynet,thatimpnode,total,1,0] : 0.0 : True\n"""
# cmd_output = io.StringIO()
# sys.stdout = cmd_output
# ipp.instance.pprint()
# sys.stdout = sys.__stdout__
# assert cmd_output.getvalue() == expected_string
# from contextlib import redirect_stdout
# import io
# ipp.instance.constr_imp_flow_cost.pprint() # only one constraint
# f = io.StringIO()
# with redirect_stdout(f):
# # ipp.instance.pprint() # full model
# ipp.instance.constr_imp_flow_cost.pprint() # only one constraint
# expected_string = r"""constr_imp_flow_cost : Size=4, Index=constr_imp_flow_cost_index, Active=True
# Key : Lower : Body : Upper : Active
# ('mynet', 'thatimpnode', 'peak', 0, 0) : 0.0 : 0*var_if_glqpks[mynet,thatimpnode,peak,0,0,0] - var_ifc_glqpk[mynet,thatimpnode,peak,0,0] : 0.0 : True
# ('mynet', 'thatimpnode', 'peak', 1, 0) : 0.0 : 0*var_if_glqpks[mynet,thatimpnode,peak,1,0,0] - var_ifc_glqpk[mynet,thatimpnode,peak,1,0] : 0.0 : True
# ('mynet', 'thatimpnode', 'total', 0, 0) : 0.0 : var_if_glqpks[mynet,thatimpnode,total,0,0,0] - var_ifc_glqpk[mynet,thatimpnode,total,0,0] : 0.0 : True
# ('mynet', 'thatimpnode', 'total', 1, 0) : 0.0 : var_if_glqpks[mynet,thatimpnode,total,1,0,0] - var_ifc_glqpk[mynet,thatimpnode,total,1,0] : 0.0 : True
# """
# assert expected_string == f.getvalue()
# from contextlib import redirect_stdout
# import io
# f = io.StringIO()
# with redirect_stdout(f):
# print('foobar')
# print(12)
# 12+3
# print('Got stdout: "{0}"'.format(f.getvalue()))
# *************************************************************************
# *************************************************************************
def test_problem_increasing_imp_prices(self):
# Import node with a two-segment price (1.0 for the first 0.5 of volume, 2.0
# beyond that) feeding a source/sink node through one directed arc, over a
# single reporting period and a single time interval. Checks that the
# problem is not a peak-total problem, the reported model size, arc
# selection, flow, amplitude, capex, sdncf and objective value.
#
# NOTE(review): this listing has lost its indentation and several lines:
# some calls below are never closed and some expected values are not
# visible. Restore them from version control.
# scenario
q = 0
# time
number_intervals = 1
# periods
# NOTE(review): number_periods is only referenced by commented-out code in the
# visible part of this test
number_periods = 1
tf = TimeFrame(
reporting_periods={q: [0]},
reporting_period_durations={q: [365 * 24 * 3600]},
time_intervals={q: [0]},
time_interval_durations={q: [1]},
)
# 2 nodes: one import, one regular
mynet = Network()
node_IMP = generate_pseudo_unique_key(mynet.nodes())
# NOTE(review): the node key, the `prices={` opener and the closing of this
# call are not visible in this listing
mynet.add_import_node(
# (q, p, k): ResourcePrice(prices=[1.0, 2.0], volumes=[0.5, None])
# for p in range(number_periods)
# for k in range(number_intervals)
qpk: ResourcePrice(prices=[1.0, 2.0], volumes=[0.5, None])
for qpk in tf.qpk()
# other nodes
node_A = generate_pseudo_unique_key(mynet.nodes())
mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): 1.0})
# arc IA
arc_tech_IA = Arcs(
efficiency={(q, 0): 0.5},
efficiency_reverse=None,
static_loss=None,
capacity=[3],
minimum_cost=[2],
capacity_is_instantaneous=False,
validate=False,
)
mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)
# identify node types
mynet.identify_node_types()
# no sos, regular time intervals
# NOTE(review): this call is not closed in this listing; `reporting_periods` is
# passed although the visible build_solve_ipp signature does not declare it
ipp = self.build_solve_ipp(
# solver=solver,
solver_options={},
# use_sos_arcs=use_sos_arcs,
# arc_sos_weight_key=sos_weight_key,
# arc_use_real_variables_if_possible=use_real_variables_if_possible,
# use_sos_sense=use_sos_sense,
# sense_sos_weight_key=sense_sos_weight_key,
# sense_use_real_variables_if_possible=sense_use_real_variables_if_possible,
# sense_use_arc_interfaces=use_arc_interfaces,
perform_analysis=False,
print_solver_output=False,
# irregular_time_intervals=irregular_time_intervals,
number_intraperiod_time_intervals=number_intervals,
mandatory_arcs=[],
max_number_parallel_arcs={},
# init_aux_sets=init_aux_sets,
simplify_problem=False,
reporting_periods={0: (0,)},
# with a two-segment price this must not qualify as a peak-total problem
assert not is_peak_total_problem(ipp)
# model size as reported in the solver results
assert ipp.results["Problem"][0]["Number of constraints"] == 10
assert ipp.results["Problem"][0]["Number of variables"] == 11
assert ipp.results["Problem"][0]["Number of nonzeros"] == 20
# *********************************************************************
# *********************************************************************
# the arc should be installed since it is required for feasibility
assert (
True
in ipp.networks["mynet"]
.edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
.options_selected
)
# flow through the arc in the only time interval (q, 0)
# NOTE(review): the expected value, tolerance and closing parenthesis of this
# call are not visible in this listing
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
# arc amplitude (the original comment said it should be two)
# NOTE(review): the expected value and the closing of this call are not visible
assert math.isclose(
pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
# capex should be four
assert math.isclose(pyo.value(ipp.instance.var_capex), 4.0, abs_tol=1e-3)
# sdncf should be -3.5
assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), -3.5, abs_tol=1e-3)
# the objective function should be -7.5
assert math.isclose(pyo.value(ipp.instance.obj_f), -7.5, abs_tol=1e-3)
# *************************************************************************
# *************************************************************************
def test_problem_decreasing_exp_prices(self):
# Source/sink node (base flow -1.0) connected to an export node with a
# two-segment price (2.0 for the first 0.5 of volume, 1.0 beyond that)
# through one directed arc, over a single reporting period and a single
# time interval. Checks that the problem is not a peak-total problem, the
# reported model size, arc selection, flow, amplitude, capex, sdncf and
# objective value.
#
# NOTE(review): this listing has lost its indentation and several lines:
# some calls below are never closed and some expected values are not
# visible. Restore them from version control.
# scenario
q = 0
# time
number_intervals = 1
# periods
number_periods = 1
# NOTE(review): tf is not referenced again in the visible part of this test
tf = TimeFrame(
reporting_periods={q: [0]},
reporting_period_durations={q: [365 * 24 * 3600]},
time_intervals={q: [0]},
time_interval_durations={q: [1]},
)
# 2 nodes: one export, one regular
mynet = Network()
node_EXP = generate_pseudo_unique_key(mynet.nodes())
# one ResourcePrice per (q, p, k) combination
# NOTE(review): the node key, the `prices={` opener and the closing of this
# call are not visible in this listing
mynet.add_export_node(
(q, p, k): ResourcePrice(prices=[2.0, 1.0], volumes=[0.5, None])
for p in range(number_periods)
for k in range(number_intervals)
# other nodes
node_A = generate_pseudo_unique_key(mynet.nodes())
mynet.add_source_sink_node(node_key=node_A, base_flow={(q, 0): -1.0})
# arc AE: from node_A to the export node (the variable keeps the name
# arc_tech_IA)
arc_tech_IA = Arcs(
efficiency={(q, 0): 0.5},
efficiency_reverse=None,
static_loss=None,
capacity=[3],
minimum_cost=[2],
capacity_is_instantaneous=False,
validate=False,
)
mynet.add_directed_arc(node_key_a=node_A, node_key_b=node_EXP, arcs=arc_tech_IA)
# identify node types
mynet.identify_node_types()
# no sos, regular time intervals
# NOTE(review): this call is not closed in this listing; `reporting_periods` is
# passed although the visible build_solve_ipp signature does not declare it
ipp = self.build_solve_ipp(
# solver=solver,
solver_options={},
# use_sos_arcs=use_sos_arcs,
# arc_sos_weight_key=sos_weight_key,
# arc_use_real_variables_if_possible=use_real_variables_if_possible,
# use_sos_sense=use_sos_sense,
# sense_sos_weight_key=sense_sos_weight_key,
# sense_use_real_variables_if_possible=sense_use_real_variables_if_possible,
# sense_use_arc_interfaces=use_arc_interfaces,
perform_analysis=False,
print_solver_output=False,
# irregular_time_intervals=irregular_time_intervals,
number_intraperiod_time_intervals=number_intervals,
mandatory_arcs=[],
max_number_parallel_arcs={},
# init_aux_sets=init_aux_sets,
simplify_problem=False,
reporting_periods={0: (0,)},
# with a two-segment price this must not qualify as a peak-total problem
assert not is_peak_total_problem(ipp)
# model size as reported in the solver results
assert ipp.results["Problem"][0]["Number of constraints"] == 10
assert ipp.results["Problem"][0]["Number of variables"] == 11
assert ipp.results["Problem"][0]["Number of nonzeros"] == 20
# *********************************************************************
# *********************************************************************
# the arc should be installed since it is required for feasibility
assert (
True
in ipp.networks["mynet"]
.edges[(node_A, node_EXP, 0)][Network.KEY_ARC_TECH]
.options_selected
)
# flow through the arc in the only time interval (q, 0)
# NOTE(review): the expected value, tolerance and closing parenthesis of this
# call are not visible in this listing
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_EXP, 0, q, 0)]),
# arc amplitude (the original comment said it should be two)
# NOTE(review): the expected value and the closing of this call are not visible
assert math.isclose(
pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_A, node_EXP, 0)]),
# capex should be three
assert math.isclose(pyo.value(ipp.instance.var_capex), 3.0, abs_tol=1e-3)
# sdncf should be 1.0
assert math.isclose(pyo.value(ipp.instance.var_sdncf_q[q]), 1.0, abs_tol=1e-3)
# the objective function should be -2.0
assert math.isclose(pyo.value(ipp.instance.obj_f), -2.0, abs_tol=1e-3)
# *************************************************************************
# *************************************************************************
def test_problem_increasing_imp_decreasing_exp_prices(self):
# scenario
q = 0
# time
number_intervals = 2
# periods
number_periods = 1
tf = TimeFrame(
reporting_periods={q: [0]},
reporting_period_durations={q: [365 * 24 * 3600]},
time_intervals={q: [0]},
time_interval_durations={q: [1]},
)
# 3 nodes: one import, one export, one regular
mynet = Network()
node_IMP = generate_pseudo_unique_key(mynet.nodes())
mynet.add_import_node(
(q, p, k): ResourcePrice(prices=[1.0, 2.0], volumes=[0.5, None])
for p in range(number_periods)
for k in range(number_intervals)
# export node
node_EXP = generate_pseudo_unique_key(mynet.nodes())
mynet.add_export_node(
(q, p, k): ResourcePrice(prices=[2.0, 1.0], volumes=[0.5, None])
for p in range(number_periods)
for k in range(number_intervals)
# other nodes
node_A = generate_pseudo_unique_key(mynet.nodes())
mynet.add_source_sink_node(
node_key=node_A, base_flow={(q, 0): 1.0, (q, 1): -1.0}
)
# arc IA
arc_tech_IA = Arcs(
efficiency={(q, 0): 0.5, (q, 1): 0.5},
efficiency_reverse=None,
static_loss=None,
capacity=[3],
minimum_cost=[2],
capacity_is_instantaneous=False,
validate=False,
)
mynet.add_directed_arc(node_key_a=node_IMP, node_key_b=node_A, arcs=arc_tech_IA)
# arc AE
arc_tech_AE = Arcs(
efficiency={(q, 0): 0.5, (q, 1): 0.5},
efficiency_reverse=None,
static_loss=None,
capacity=[3],
minimum_cost=[2],
capacity_is_instantaneous=False,
validate=False,
)
mynet.add_directed_arc(node_key_a=node_A, node_key_b=node_EXP, arcs=arc_tech_AE)
# identify node types
mynet.identify_node_types()
# no sos, regular time intervals
ipp = self.build_solve_ipp(
# solver=solver,
solver_options={},
# use_sos_arcs=use_sos_arcs,
# arc_sos_weight_key=sos_weight_key,
# arc_use_real_variables_if_possible=use_real_variables_if_possible,
# use_sos_sense=use_sos_sense,
# sense_sos_weight_key=sense_sos_weight_key,
# sense_use_real_variables_if_possible=sense_use_real_variables_if_possible,
# sense_use_arc_interfaces=use_arc_interfaces,
perform_analysis=False,
print_solver_output=False,
# irregular_time_intervals=irregular_time_intervals,
number_intraperiod_time_intervals=number_intervals,
mandatory_arcs=[],
max_number_parallel_arcs={},
# init_aux_sets=init_aux_sets,
simplify_problem=False,
reporting_periods={0: (0,)},
assert not is_peak_total_problem(ipp)
assert ipp.results["Problem"][0]["Number of constraints"] == 23
assert ipp.results["Problem"][0]["Number of variables"] == 26
assert ipp.results["Problem"][0]["Number of nonzeros"] == 57
# *********************************************************************
# *********************************************************************
# the arc should be installed since it is required for feasibility
assert (
True
in ipp.networks["mynet"]
.edges[(node_IMP, node_A, 0)][Network.KEY_ARC_TECH]
.options_selected
)
# the arc should be installed since it is required for feasibility
assert (
True
in ipp.networks["mynet"]
.edges[(node_A, node_EXP, 0)][Network.KEY_ARC_TECH]
.options_selected
)
# interval 0: import only
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 0)]),
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_EXP, 0, q, 0)]),
# interval 1: export only
assert math.isclose(
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_IMP, node_A, 0, q, 1)]),
pyo.value(ipp.instance.var_v_glljqk[("mynet", node_A, node_EXP, 0, q, 1)]),
# IA amplitude
assert math.isclose(
pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_IMP, node_A, 0)]),
# AE amplitude
assert math.isclose(
pyo.value(ipp.instance.var_v_amp_gllj[("mynet", node_A, node_EXP, 0)]),